Databases & Caching

This guide covers the database architecture and caching patterns used in the Heimdall platform.

Overview

Heimdall uses a dual-database architecture with Redis caching:

| Database    | Purpose                                                   | Driver   |
|-------------|-----------------------------------------------------------|----------|
| PostgreSQL  | Core application data (users, sessions, API keys, roles)  | SQLx     |
| TimescaleDB | Time-series data (audit events, storage files)            | SQLx     |
| Redis       | Caching, rate limiting, session tracking, pub/sub         | redis-rs |

┌─────────────────────────────────────────────────────────────────┐
│                      Rust API (Actix-web)                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │   PostgreSQL    │  │   TimescaleDB   │  │      Redis      │  │
│  │  (state.pool)   │  │  (state.tsdb)   │  │  (state.redis)  │  │
│  │                 │  │                 │  │                 │  │
│  │  Core Data      │  │  Time-Series    │  │  Caching        │  │
│  │  - Users        │  │  - AuditEvent   │  │  - Sessions     │  │
│  │  - Sessions     │  │  - StorageFile  │  │  - Permissions  │  │
│  │  - API Keys     │  │  - GpsData      │  │  - Rate Limits  │  │
│  │  - Roles        │  │                 │  │  - Pub/Sub      │  │
│  │  - OAuth        │  │  Hypertables    │  │                 │  │
│  │  - 2FA          │  │  with auto-     │  │                 │  │
│  │                 │  │  partitioning   │  │                 │  │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Why Dual-Database?

This separation provides:

  • Optimized time-series queries - TimescaleDB provides automatic partitioning and compression for audit/metrics data
  • Independent scaling - OLTP workloads (PostgreSQL) scale separately from analytics (TimescaleDB)
  • GeoIP enrichment - Audit events automatically enriched with location data
  • Automatic data retention - Configurable retention policies for compliance (see the sketch after this list)
  • Performance isolation - Heavy audit queries don't impact user-facing operations
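
For example, the data-retention bullet maps onto TimescaleDB's add_retention_policy. A minimal sketch, assuming a 365-day window and running the statement through state.tsdb; the actual policy and where it lives are not shown here, and in practice this would likely sit in a TSDB migration:

// Hypothetical retention policy: drop AuditEvent chunks older than one year.
sqlx::query(r#"SELECT add_retention_policy('"AuditEvent"', INTERVAL '365 days')"#)
    .execute(&state.tsdb)
    .await?;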

PostgreSQL (Primary Database)

Connection

// platform/api/src/state.rs
pub struct AppState {
    pub pool: PgPool,       // PostgreSQL connection pool
    pub tsdb: PgPool,       // TimescaleDB connection pool
    pub redis: RedisClient,
    // ...
}

Configuration

# config/default.toml
[database]
url = "postgres://user:pass@localhost:5432/heimdall"
max_connections = 10
min_connections = 1
connect_timeout = 30
idle_timeout = 600
max_lifetime = 1800
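
A hedged sketch of how these settings might map onto sqlx's PgPoolOptions when the pool is built; the cfg field names are assumptions based on the keys above, not the platform's actual config struct:

use std::time::Duration;
use sqlx::postgres::PgPoolOptions;

// Illustrative only: apply the [database] settings above to the pool builder.
let pool = PgPoolOptions::new()
    .max_connections(cfg.max_connections)                        // 10
    .min_connections(cfg.min_connections)                        // 1
    .acquire_timeout(Duration::from_secs(cfg.connect_timeout))   // 30s
    .idle_timeout(Duration::from_secs(cfg.idle_timeout))         // 600s
    .max_lifetime(Duration::from_secs(cfg.max_lifetime))         // 1800s
    .connect(&cfg.url)
    .await?;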

Core Tables

User Management

| Table             | Description                                 |
|-------------------|---------------------------------------------|
| User              | Main user entity with profile, 2FA settings |
| Platform          | Authentication providers (OAuth + email)    |
| PlatformAccount   | User credentials & linked OAuth accounts    |
| Session           | Active user sessions                        |
| UserBan           | Ban records with audit trail                |
| ScheduledDeletion | Account deletion grace periods              |

Authentication & Authorization

| Table          | Description                                     |
|----------------|-------------------------------------------------|
| Role           | Role definitions (Admin, Moderator, User, etc.) |
| Permission     | Resource:action permissions                     |
| UserRole       | User-role assignments (junction)                |
| RolePermission | Role-permission assignments (junction)          |
| ApiKey         | API keys for programmatic access                |
| ApiKeyRole     | API key role assignments                        |

Two-Factor Authentication

| Table                 | Description                      |
|-----------------------|----------------------------------|
| UserTwoFactor         | 2FA settings per user            |
| TwoFactorBackupCode   | Backup/recovery codes            |
| PendingTwoFactorSetup | Pending 2FA setup until verified |

OAuth 2.0 Provider

| Table                     | Description                     |
|---------------------------|---------------------------------|
| OAuthClient               | Registered OAuth applications   |
| OAuthAuthorizationCode    | Short-lived authorization codes |
| OAuthAccessToken          | Access tokens (hashed)          |
| OAuthRefreshToken         | Refresh tokens                  |
| OAuthConsent              | User consent records            |
| OAuthPendingAuthorization | Pending OAuth requests          |

Migrations

Migrations are stored in platform/migrations/ and run automatically on API startup.

# Run migrations manually
just migrate

# Create new migration
just migrate-create add_feature_name
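
The automatic run on startup likely corresponds to sqlx's embedded migrator. A minimal sketch, assuming the migrations directory is compiled in relative to the API crate (the exact path and wiring are assumptions):

// Illustrative startup hook: embed platform/migrations and apply any pending ones.
sqlx::migrate!("../migrations")
    .run(&pool)
    .await?;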

Naming Convention

YYYYMMDDHHMMSS_description.up.sql
YYYYMMDDHHMMSS_description.down.sql

Example: 20260124000001_add_storage_permissions.up.sql

Best Practices

-- Always create both up and down migrations
-- Use TEXT for IDs (UUIDs stored as text)
-- Always add created_at with DEFAULT NOW()
-- Add indexes for foreign keys and common queries
-- Use ON DELETE CASCADE for user-owned data
-- Use double quotes for table/column names

CREATE TABLE "NewTable" (
id TEXT PRIMARY KEY DEFAULT gen_random_uuid()::TEXT,
user_id TEXT NOT NULL REFERENCES "User"(id) ON DELETE CASCADE,
name TEXT NOT NULL,
metadata JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_new_table_user ON "NewTable"(user_id);
CREATE INDEX idx_new_table_created ON "NewTable"(created_at DESC);

COMMENT ON TABLE "NewTable" IS 'Description of the table purpose';

SQLx Query Patterns

Basic Query

let user: Option<User> = sqlx::query_as(
    r#"SELECT * FROM "User" WHERE id = $1"#
)
.bind(&user_id)
.fetch_optional(&state.pool)
.await?;

Insert with Returning

let user: User = sqlx::query_as(
    r#"
    INSERT INTO "User" (id, username, created_at, updated_at)
    VALUES ($1, $2, NOW(), NOW())
    RETURNING *
    "#
)
.bind(&id)
.bind(&username)
.fetch_one(&state.pool)
.await?;

Transaction

let mut tx = state.pool.begin().await?;

sqlx::query(r#"UPDATE "User" SET username = $1 WHERE id = $2"#)
    .bind(&username)
    .bind(&user_id)
    .execute(&mut *tx)
    .await?;

sqlx::query(r#"INSERT INTO "UserRole" (user_id, role_id) VALUES ($1, $2)"#)
    .bind(&user_id)
    .bind(&role_id)
    .execute(&mut *tx)
    .await?;

tx.commit().await?;

Dynamic Query Building

// For complex filters, build queries dynamically with sqlx::QueryBuilder,
// which keeps the SQL and its bind parameters in sync.
let mut builder = sqlx::QueryBuilder::<sqlx::Postgres>::new(
    r#"SELECT * FROM "User" WHERE 1=1"#,
);

if let Some(ref username) = filter.username {
    builder.push(" AND username ILIKE ").push_bind(username);
}

if let Some(ref role) = filter.role {
    builder
        .push(r#" AND id IN (SELECT user_id FROM "UserRole" WHERE role_id = "#)
        .push_bind(role)
        .push(")");
}

builder.push(" ORDER BY created_at DESC LIMIT 100");

let users: Vec<User> = builder.build_query_as().fetch_all(&state.pool).await?;

TimescaleDB (Time-Series Database)

TimescaleDB extends PostgreSQL with time-series optimizations. We use it for audit events and file metadata.

Configuration

# config/default.toml
[tsdb]
url = "postgres://user:pass@localhost:5433/heimdall_ts"
max_connections = 5
min_connections = 1

Migrations

TSDB migrations are stored separately in platform/tsdb_migrations/:

# Run TSDB migrations
just tsdb-migrate

Hypertables

Hypertables are TimescaleDB's core feature - they automatically partition data by time.

AuditEvent Hypertable

CREATE TABLE "AuditEvent" (
id UUID NOT NULL DEFAULT gen_random_uuid(),
user_id TEXT,
event_type TEXT NOT NULL,
resource_type TEXT,
resource_id TEXT,
actor_id TEXT,
ip_address TEXT,
user_agent TEXT,
description TEXT,
metadata JSONB,
status TEXT DEFAULT 'success',
error_message TEXT,
-- GeoIP fields (auto-enriched)
country_code TEXT,
country_name TEXT,
city TEXT,
region TEXT,
source_service TEXT,
is_reported BOOLEAN DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (id, created_at)
);

-- Convert to hypertable with 7-day chunks
SELECT create_hypertable(
'"AuditEvent"',
by_range('created_at', INTERVAL '7 days')
);

-- Enable compression after 30 days
ALTER TABLE "AuditEvent" SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'user_id, event_type',
timescaledb.compress_orderby = 'created_at DESC, id'
);

SELECT add_compression_policy('"AuditEvent"', INTERVAL '30 days');

StorageFile Hypertable

CREATE TABLE "StorageFile" (
id UUID NOT NULL DEFAULT gen_random_uuid(),
user_id TEXT NOT NULL,
key TEXT NOT NULL,
original_filename TEXT NOT NULL,
content_type TEXT NOT NULL,
category TEXT NOT NULL,
size_bytes BIGINT NOT NULL,
etag TEXT,
bucket TEXT NOT NULL,
metadata JSONB,
ip_address TEXT,
user_agent TEXT,
source_service TEXT DEFAULT 'api',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (id, created_at)
);

-- Convert to hypertable with daily chunks
SELECT create_hypertable(
'"StorageFile"',
by_range('created_at', INTERVAL '1 day')
);

-- Enable compression after 7 days
SELECT add_compression_policy('"StorageFile"', INTERVAL '7 days');

GeoIP Enrichment

Audit events are automatically enriched with location data from IP addresses:

// platform/api/src/services/audit.rs
impl AuditLogger {
    pub fn with_geoip(tsdb: &PgPool, geoip: &Option<GeoIpReader>) -> Self {
        // GeoIP reader loaded from MaxMind database
    }

    async fn enrich_with_geoip(&self, event: &mut CreateAuditEvent) {
        if let Some(ref geoip) = self.geoip {
            if let Some(ip) = &event.ip_address {
                if let Ok(location) = geoip.lookup(ip) {
                    event.country_code = location.country_code;
                    event.country_name = location.country_name;
                    event.city = location.city;
                    event.region = location.region;
                }
            }
        }
    }
}

Querying TimescaleDB

// Always use state.tsdb for TimescaleDB tables
let events: Vec<AuditEvent> = sqlx::query_as(
    r#"
    SELECT * FROM "AuditEvent"
    WHERE user_id = $1
      AND created_at >= $2
      AND created_at <= $3
    ORDER BY created_at DESC
    LIMIT 100
    "#
)
.bind(&user_id)
.bind(&start_date)
.bind(&end_date)
.fetch_all(&state.tsdb) // <-- Use tsdb, not pool
.await?;

Time-Series Functions

TimescaleDB provides useful functions for time-series analysis:

-- Time bucket aggregation
SELECT
    time_bucket('1 hour', created_at) AS bucket,
    event_type,
    COUNT(*) AS count
FROM "AuditEvent"
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY bucket, event_type
ORDER BY bucket DESC;

-- First/last aggregates
SELECT
    user_id,
    first(event_type, created_at) AS first_event,
    last(event_type, created_at) AS last_event,
    COUNT(*) AS total_events
FROM "AuditEvent"
WHERE created_at > NOW() - INTERVAL '7 days'
GROUP BY user_id;
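
A hedged sketch of how the hourly bucket query above might be run from the API through state.tsdb; the EventBucket row struct is illustrative, not an existing type:

use chrono::{DateTime, Utc};

// Illustrative row type for the time_bucket aggregation.
#[derive(sqlx::FromRow)]
struct EventBucket {
    bucket: DateTime<Utc>,
    event_type: String,
    count: i64,
}

let buckets: Vec<EventBucket> = sqlx::query_as(
    r#"
    SELECT time_bucket('1 hour', created_at) AS bucket,
           event_type,
           COUNT(*) AS count
    FROM "AuditEvent"
    WHERE created_at > NOW() - INTERVAL '24 hours'
    GROUP BY bucket, event_type
    ORDER BY bucket DESC
    "#
)
.fetch_all(&state.tsdb)
.await?;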

Redis Cache

Redis is used for caching, rate limiting, session tracking, and pub/sub messaging.

Connection

// platform/api/src/utils/redis.rs
pub struct RedisClient {
    pool: deadpool_redis::Pool,
}

impl RedisClient {
    pub async fn new(config: &RedisConfig) -> Result<Self> {
        // Resilient connection with auto-reconnect
    }
}

Configuration

# config/default.toml
[redis]
url = "redis://localhost:6379"
pool_size = 10
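
A minimal sketch of how RedisClient::new might build the pool from this configuration with deadpool-redis; the RedisConfig field names and the startup PING are assumptions, not the actual implementation:

use deadpool_redis::{Config, PoolConfig, Runtime};

impl RedisClient {
    pub async fn new(config: &RedisConfig) -> anyhow::Result<Self> {
        // Build a managed connection pool from the [redis] settings above.
        let mut cfg = Config::from_url(config.url.clone());
        cfg.pool = Some(PoolConfig::new(config.pool_size));
        let pool = cfg.create_pool(Some(Runtime::Tokio1))?;

        // Fail fast if Redis is unreachable at startup.
        let mut conn = pool.get().await?;
        let _: String = redis::cmd("PING").query_async(&mut conn).await?;

        Ok(Self { pool })
    }
}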

Key Patterns

Session Caching (5 min TTL)

session:{session_token}     -> user_id
session_id:{session_token}  -> session_id (DB primary key)

User Session Tracking

user:{user_id}:sessions           -> SET of active session tokens
user:{user_id}:sessions:activity  -> ZSET (token -> timestamp for LRU)

Permission Caching (10 min TTL)

user:{user_id}:permissions        -> JSON array of permission strings
api_key:{api_key_id}:permissions  -> JSON array of permission strings

Rate Limiting

rate_limit:{endpoint}:{ip}       -> request count (with sliding window TTL)
rate_limit:{endpoint}:{user_id}  -> request count for authenticated users

Cache Methods

Session Methods

// Cache session (5 min TTL)
redis.cache_session(session_token, user_id).await?;
redis.get_cached_session(session_token).await?;
redis.invalidate_session(session_token).await?;

// Session ID (database ID for WebSocket targeting)
redis.cache_session_id(session_token, session_id).await?;
redis.get_cached_session_id(session_token).await?;

User Session Tracking

// Track user's active sessions
redis.add_user_session(user_id, session_token).await?;
redis.get_user_sessions(user_id).await?;
redis.remove_user_session(user_id, session_token).await?;
redis.clear_user_sessions(user_id).await?; // Logout all

// Session activity tracking (sorted by timestamp)
redis.update_session_activity(user_id, session_token).await?;
redis.get_sessions_by_activity(user_id).await?; // Most recent first

Permission Caching

// User permissions (10 min TTL)
redis.cache_user_permissions(user_id, &permissions).await?;
redis.get_cached_user_permissions(user_id).await?;
redis.invalidate_user_permissions(user_id).await?;

// API key permissions (10 min TTL)
redis.cache_api_key_permissions(api_key_id, &permissions).await?;
redis.get_cached_api_key_permissions(api_key_id).await?;
redis.invalidate_api_key_permissions(api_key_id).await?;

// Invalidate all user caches (after role change, etc.)
redis.invalidate_user_caches(user_id).await?;

Rate Limiting

// Check rate limit - returns true if within limit
// (arguments: key, max_requests = 10, window_secs = 60)
let allowed = redis
    .check_rate_limit("login:192.168.1.1", 10, 60)
    .await?;

if !allowed {
    return Err(Error::RateLimited);
}
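
Pub/Sub

Pub/sub (listed in the overview) goes through the same pool. A hedged sketch of publishing a notification; the method, channel name, and payload are illustrative rather than the platform's actual API:

impl RedisClient {
    // Hypothetical helper: broadcast that a user's sessions changed so other
    // API instances can react (e.g. close the targeted WebSocket connections).
    pub async fn publish_session_event(&self, user_id: &str) -> anyhow::Result<()> {
        let mut conn = self.get_connection().await?;
        let _: i64 = redis::cmd("PUBLISH")
            .arg("heimdall:sessions")
            .arg(user_id)
            .query_async(&mut conn)
            .await?;
        Ok(())
    }
}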

Cache Invalidation Strategy

| Event            | Invalidate                                      |
|------------------|-------------------------------------------------|
| User logout      | session:{token}, remove from user:{id}:sessions |
| User role change | user:{id}:permissions                           |
| API key update   | api_key:{id}:permissions                        |
| User ban         | All user caches                                 |
| Password change  | All user sessions                               |
| 2FA change       | All user sessions (force re-auth)               |
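
For example, a role-change handler might pair the database write with the corresponding invalidation from the table above (a sketch; the surrounding handler code is assumed):

// After assigning a role, drop the stale permission cache so the next
// request rebuilds it from PostgreSQL.
sqlx::query(r#"INSERT INTO "UserRole" (user_id, role_id) VALUES ($1, $2)"#)
    .bind(&user_id)
    .bind(&role_id)
    .execute(&state.pool)
    .await?;

state.redis.invalidate_user_permissions(&user_id).await?;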

TTL Strategy

| Cache Type  | TTL          | Reason                              |
|-------------|--------------|-------------------------------------|
| Sessions    | 5 minutes    | Frequent DB validation for security |
| Permissions | 10 minutes   | Max acceptable stale data for RBAC  |
| Rate limits | Window-based | 60s for auth, 3600s for exports     |
| 2FA tokens  | 5 minutes    | Short-lived for security            |

Common Patterns

Get User with Caching

pub async fn get_user_with_permissions(
    pool: &PgPool,
    redis: &RedisClient,
    user_id: &str,
) -> Result<(User, Vec<String>)> {
    // Try cache first
    if let Some(perms) = redis.get_cached_user_permissions(user_id).await? {
        let user = get_user_by_id(pool, user_id).await?;
        return Ok((user, perms));
    }

    // Query database
    let user = get_user_by_id(pool, user_id).await?;
    let perms = get_user_permissions(pool, user_id).await?;

    // Cache for next time
    redis.cache_user_permissions(user_id, &perms).await?;

    Ok((user, perms))
}

Audit Event Logging

Important

Always use AuditLogger::with_geoip() for audit events. Never write directly to the AuditEvent table.

// In REST handlers
let req_info = RequestInfo::from_request(&req);
let audit_logger = AuditLogger::with_geoip(&state.tsdb, &state.geoip)
    .with_source_opt(req_info.source());

audit_logger.log_login(
    &user_id,
    req_info.ip(),
    req_info.ua(),
    Some("discord"), // OAuth provider
).await;

// In GraphQL mutations
let req_info = ctx.data::<RequestInfo>().ok();
let audit_logger = AuditLogger::with_geoip(&state.tsdb, &state.geoip)
    .with_source_opt(req_info.and_then(|r| r.source()));

audit_logger.log_user_updated(
    &user_id,
    req_info.and_then(|r| r.ip()),
    req_info.and_then(|r| r.ua()),
    &["email", "username"], // Changed fields
).await;

Storage File Metadata

// After successful file upload
let source = req_info.source();
let create_file = CreateStorageFile {
    user_id: &user_id,
    key: &result.key,
    original_filename: &filename,
    content_type: &result.content_type,
    category: category_str,
    size_bytes: result.size_bytes as i64,
    etag: result.etag.as_deref(),
    bucket: &result.bucket,
    metadata: None,
    ip_address: req_info.ip(),
    user_agent: req_info.ua(),
    source_service: source.as_deref(),
};

storage_files::insert_storage_file(&state.tsdb, create_file).await?;

Health Checks

Database Health

pub async fn health_check(pool: &PgPool) -> anyhow::Result<()> {
    sqlx::query("SELECT 1")
        .fetch_one(pool)
        .await?;
    Ok(())
}

Redis Health

pub async fn health_check(&self) -> anyhow::Result<()> {
    let mut conn = self.get_connection().await?;
    let _: String = redis::cmd("PING")
        .query_async(&mut conn)
        .await?;
    Ok(())
}

Combined Health Endpoint

// GET /health
pub async fn health(state: web::Data<AppState>) -> impl Responder {
    let pg_ok = sqlx::query("SELECT 1")
        .fetch_one(&state.pool)
        .await
        .is_ok();

    let tsdb_ok = sqlx::query("SELECT 1")
        .fetch_one(&state.tsdb)
        .await
        .is_ok();

    let redis_ok = state.redis.health_check().await.is_ok();

    if pg_ok && tsdb_ok && redis_ok {
        HttpResponse::Ok().json(json!({
            "status": "healthy",
            "postgres": "ok",
            "timescaledb": "ok",
            "redis": "ok"
        }))
    } else {
        HttpResponse::ServiceUnavailable().json(json!({
            "status": "unhealthy",
            "postgres": if pg_ok { "ok" } else { "error" },
            "timescaledb": if tsdb_ok { "ok" } else { "error" },
            "redis": if redis_ok { "ok" } else { "error" }
        }))
    }
}

Environment Variables

# Primary Database (PostgreSQL)
DATABASE_URL=postgres://user:pass@localhost:5432/heimdall
DATABASE_MAX_CONNECTIONS=10
DATABASE_MIN_CONNECTIONS=1
DATABASE_CONNECT_TIMEOUT=30
DATABASE_IDLE_TIMEOUT=600
DATABASE_MAX_LIFETIME=1800

# Time-Series Database (TimescaleDB)
TSDB_URL=postgres://user:pass@localhost:5433/heimdall_ts
TSDB_MAX_CONNECTIONS=5
TSDB_MIN_CONNECTIONS=1

# Redis
REDIS_URL=redis://localhost:6379

# GeoIP (MaxMind)
GEOIP_DATABASE_PATH=/path/to/GeoLite2-City.mmdb

Local Development

Docker Compose

# dev-stack/docker-compose.yml
services:
  postgres:
    image: postgres:16
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: heimdall
      POSTGRES_PASSWORD: heimdall
      POSTGRES_DB: heimdall

  timescaledb:
    image: timescale/timescaledb:latest-pg16
    ports:
      - "5433:5432"
    environment:
      POSTGRES_USER: heimdall
      POSTGRES_PASSWORD: heimdall
      POSTGRES_DB: heimdall_ts

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

Just Commands

# Start development databases
just dev-stack

# Run PostgreSQL migrations
just migrate

# Run TimescaleDB migrations
just tsdb-migrate

# Connect to PostgreSQL
just db-shell

# Connect to TimescaleDB
just tsdb-shell

# Connect to Redis
just redis-shell

Performance Tips

  1. Use connection pooling - Always use the connection pool, never create new connections
  2. Index foreign keys - Always index columns used in JOINs and WHERE clauses
  3. Use RETURNING - Get inserted/updated data in one query instead of two
  4. Batch operations - Use bulk inserts for multiple records (see the sketch after this list)
  5. Cache aggressively - Cache permissions and session data in Redis
  6. Use appropriate TTLs - Balance freshness vs database load
  7. Monitor slow queries - Enable log_min_duration_statement in PostgreSQL
  8. Compress old data - Let TimescaleDB compress audit data automatically
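
Tying tip 4 to code, a hedged sketch of a bulk insert using sqlx's QueryBuilder::push_values; the target table, row tuple, and load_pending_rows helper are illustrative:

// Illustrative bulk insert: one round trip for many rows instead of a loop
// of single INSERTs. `rows` holds (user_id, name) pairs for "NewTable".
let rows: Vec<(String, String)> = load_pending_rows();

let mut builder = sqlx::QueryBuilder::<sqlx::Postgres>::new(
    r#"INSERT INTO "NewTable" (user_id, name) "#,
);
builder.push_values(rows.into_iter(), |mut b, (user_id, name)| {
    b.push_bind(user_id).push_bind(name);
});
builder.build().execute(&state.pool).await?;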

Next Steps