Databases & Caching
This guide covers the database architecture and caching patterns used in the Heimdall platform.
Overview
Heimdall uses a dual-database architecture with Redis caching:
| Database | Purpose | Driver |
|---|---|---|
| PostgreSQL | Core application data (users, sessions, API keys, roles) | SQLx |
| TimescaleDB | Time-series data (audit events, storage files) | SQLx |
| Redis | Caching, rate limiting, session tracking, pub/sub | redis-rs |
┌─────────────────────────────────────────────────────────────────┐
│ Rust API (Actix-web) │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ PostgreSQL │ │ TimescaleDB │ │ Redis │ │
│ │ (state.pool) │ │ (state.tsdb) │ │ (state.redis) │ │
│ │ │ │ │ │ │ │
│ │ Core Data │ │ Time-Series │ │ Caching │ │
│ │ - Users │ │ - AuditEvent │ │ - Sessions │ │
│ │ - Sessions │ │ - StorageFile │ │ - Permissions │ │
│ │ - API Keys │ │ - GpsData │ │ - Rate Limits │ │
│ │ - Roles │ │ │ │ - Pub/Sub │ │
│ │ - OAuth │ │ Hypertables │ │ │ │
│ │ - 2FA │ │ with auto- │ │ │ │
│ │ │ │ partitioning │ │ │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Why Dual-Database?
This separation provides:
- Optimized time-series queries - TimescaleDB provides automatic partitioning and compression for audit/metrics data
- Independent scaling - OLTP workloads (PostgreSQL) scale separately from analytics (TimescaleDB)
- GeoIP enrichment - Audit events automatically enriched with location data
- Automatic data retention - Configurable retention policies for compliance
- Performance isolation - Heavy audit queries don't impact user-facing operations
PostgreSQL (Primary Database)
Connection
// platform/api/src/state.rs
pub struct AppState {
    pub pool: PgPool,  // PostgreSQL connection pool
    pub tsdb: PgPool,  // TimescaleDB connection pool
    pub redis: RedisClient,
    // ...
}
Configuration
# config/default.toml
[database]
url = "postgres://user:pass@localhost:5432/heimdall"
max_connections = 10
min_connections = 1
connect_timeout = 30
idle_timeout = 600
max_lifetime = 1800
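These settings map directly onto SQLx pool options. A minimal sketch of how the pool might be built from them, assuming a DatabaseConfig struct that mirrors the [database] section (the struct and function names here are illustrative, not the actual ones):
use std::time::Duration;

use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;

// Illustrative config struct mirroring the [database] section above.
pub struct DatabaseConfig {
    pub url: String,
    pub max_connections: u32,
    pub min_connections: u32,
    pub connect_timeout: u64, // seconds
    pub idle_timeout: u64,    // seconds
    pub max_lifetime: u64,    // seconds
}

pub async fn build_pg_pool(cfg: &DatabaseConfig) -> anyhow::Result<PgPool> {
    let pool = PgPoolOptions::new()
        .max_connections(cfg.max_connections)
        .min_connections(cfg.min_connections)
        .acquire_timeout(Duration::from_secs(cfg.connect_timeout))
        .idle_timeout(Duration::from_secs(cfg.idle_timeout))
        .max_lifetime(Duration::from_secs(cfg.max_lifetime))
        .connect(&cfg.url)
        .await?;
    Ok(pool)
}
The tsdb pool is built the same way from the [tsdb] section; only the URL and connection limits differ.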
Core Tables
User Management
| Table | Description |
|---|---|
| User | Main user entity with profile, 2FA settings |
| Platform | Authentication providers (OAuth + email) |
| PlatformAccount | User credentials & linked OAuth accounts |
| Session | Active user sessions |
| UserBan | Ban records with audit trail |
| ScheduledDeletion | Account deletion grace periods |
Authentication & Authorization
| Table | Description |
|---|---|
| Role | Role definitions (Admin, Moderator, User, etc.) |
| Permission | Resource:action permissions |
| UserRole | User-role assignments (junction) |
| RolePermission | Role-permission assignments (junction) |
| ApiKey | API keys for programmatic access |
| ApiKeyRole | API key role assignments |
Two-Factor Authentication
| Table | Description |
|---|---|
| UserTwoFactor | 2FA settings per user |
| TwoFactorBackupCode | Backup/recovery codes |
| PendingTwoFactorSetup | Pending 2FA setup until verified |
OAuth 2.0 Provider
| Table | Description |
|---|---|
| OAuthClient | Registered OAuth applications |
| OAuthAuthorizationCode | Short-lived authorization codes |
| OAuthAccessToken | Access tokens (hashed) |
| OAuthRefreshToken | Refresh tokens |
| OAuthConsent | User consent records |
| OAuthPendingAuthorization | Pending OAuth requests |
Migrations
Migrations are stored in platform/migrations/ and run automatically on API startup.
# Run migrations manually
just migrate
# Create new migration
just migrate-create add_feature_name
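A rough sketch of the automatic run on startup, assuming SQLx's embedded migrator (the relative path is an assumption and depends on the crate layout):
// Embed migrations from platform/migrations/ at compile time and apply any
// pending ones before the API starts serving requests.
sqlx::migrate!("../migrations")
    .run(&pool)
    .await?;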
Naming Convention
YYYYMMDDHHMMSS_description.up.sql
YYYYMMDDHHMMSS_description.down.sql
Example: 20260124000001_add_storage_permissions.up.sql
Best Practices
-- Always create both up and down migrations
-- Use TEXT for IDs (UUIDs stored as text)
-- Always add created_at with DEFAULT NOW()
-- Add indexes for foreign keys and common queries
-- Use ON DELETE CASCADE for user-owned data
-- Use double quotes for table/column names
CREATE TABLE "NewTable" (
id TEXT PRIMARY KEY DEFAULT gen_random_uuid()::TEXT,
user_id TEXT NOT NULL REFERENCES "User"(id) ON DELETE CASCADE,
name TEXT NOT NULL,
metadata JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_new_table_user ON "NewTable"(user_id);
CREATE INDEX idx_new_table_created ON "NewTable"(created_at DESC);
COMMENT ON TABLE "NewTable" IS 'Description of the table purpose';
SQLx Query Patterns
Basic Query
let user: Option<User> = sqlx::query_as(
r#"SELECT * FROM "User" WHERE id = $1"#
)
.bind(&user_id)
.fetch_optional(&state.pool)
.await?;
Insert with Returning
let user: User = sqlx::query_as(
r#"
INSERT INTO "User" (id, username, created_at, updated_at)
VALUES ($1, $2, NOW(), NOW())
RETURNING *
"#
)
.bind(&id)
.bind(&username)
.fetch_one(&state.pool)
.await?;
Transaction
let mut tx = state.pool.begin().await?;
sqlx::query(r#"UPDATE "User" SET username = $1 WHERE id = $2"#)
.bind(&username)
.bind(&user_id)
.execute(&mut *tx)
.await?;
sqlx::query(r#"INSERT INTO "UserRole" (user_id, role_id) VALUES ($1, $2)"#)
.bind(&user_id)
.bind(&role_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Dynamic Query Building
// For complex filters, build the query dynamically with QueryBuilder,
// which keeps every value bound as a parameter.
let mut qb = sqlx::QueryBuilder::<sqlx::Postgres>::new(
    r#"SELECT * FROM "User" WHERE 1=1"#,
);

if let Some(ref username) = filter.username {
    qb.push(" AND username ILIKE ");
    qb.push_bind(username);
}
if let Some(ref role) = filter.role {
    qb.push(r#" AND id IN (SELECT user_id FROM "UserRole" WHERE role_id = "#);
    qb.push_bind(role);
    qb.push(")");
}

qb.push(" ORDER BY created_at DESC LIMIT 100");
let users: Vec<User> = qb.build_query_as().fetch_all(&state.pool).await?;
TimescaleDB (Time-Series Database)
TimescaleDB extends PostgreSQL with time-series optimizations. We use it for audit events and file metadata.
Configuration
# config/default.toml
[tsdb]
url = "postgres://user:pass@localhost:5433/heimdall_ts"
max_connections = 5
min_connections = 1
Migrations
TSDB migrations are stored separately in platform/tsdb_migrations/:
# Run TSDB migrations
just tsdb-migrate
Hypertables
Hypertables are TimescaleDB's core feature - they automatically partition data by time.
AuditEvent Hypertable
CREATE TABLE "AuditEvent" (
id UUID NOT NULL DEFAULT gen_random_uuid(),
user_id TEXT,
event_type TEXT NOT NULL,
resource_type TEXT,
resource_id TEXT,
actor_id TEXT,
ip_address TEXT,
user_agent TEXT,
description TEXT,
metadata JSONB,
status TEXT DEFAULT 'success',
error_message TEXT,
-- GeoIP fields (auto-enriched)
country_code TEXT,
country_name TEXT,
city TEXT,
region TEXT,
source_service TEXT,
is_reported BOOLEAN DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (id, created_at)
);
-- Convert to hypertable with 7-day chunks
SELECT create_hypertable(
'"AuditEvent"',
by_range('created_at', INTERVAL '7 days')
);
-- Enable compression after 30 days
ALTER TABLE "AuditEvent" SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'user_id, event_type',
timescaledb.compress_orderby = 'created_at DESC, id'
);
SELECT add_compression_policy('"AuditEvent"', INTERVAL '30 days');
StorageFile Hypertable
CREATE TABLE "StorageFile" (
id UUID NOT NULL DEFAULT gen_random_uuid(),
user_id TEXT NOT NULL,
key TEXT NOT NULL,
original_filename TEXT NOT NULL,
content_type TEXT NOT NULL,
category TEXT NOT NULL,
size_bytes BIGINT NOT NULL,
etag TEXT,
bucket TEXT NOT NULL,
metadata JSONB,
ip_address TEXT,
user_agent TEXT,
source_service TEXT DEFAULT 'api',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (id, created_at)
);
-- Convert to hypertable with daily chunks
SELECT create_hypertable(
'"StorageFile"',
by_range('created_at', INTERVAL '1 day')
);
-- Enable compression after 7 days
SELECT add_compression_policy('"StorageFile"', INTERVAL '7 days');
GeoIP Enrichment
Audit events are automatically enriched with location data from IP addresses:
// platform/api/src/services/audit.rs
impl AuditLogger {
    pub fn with_geoip(tsdb: &PgPool, geoip: &Option<GeoIpReader>) -> Self {
        // GeoIP reader loaded from MaxMind database
    }

    async fn enrich_with_geoip(&self, event: &mut CreateAuditEvent) {
        if let Some(ref geoip) = self.geoip {
            if let Some(ip) = &event.ip_address {
                if let Ok(location) = geoip.lookup(ip) {
                    event.country_code = location.country_code;
                    event.country_name = location.country_name;
                    event.city = location.city;
                    event.region = location.region;
                }
            }
        }
    }
}
Querying TimescaleDB
// Always use state.tsdb for TimescaleDB tables
let events: Vec<AuditEvent> = sqlx::query_as(
r#"
SELECT * FROM "AuditEvent"
WHERE user_id = $1
AND created_at >= $2
AND created_at <= $3
ORDER BY created_at DESC
LIMIT 100
"#
)
.bind(&user_id)
.bind(&start_date)
.bind(&end_date)
.fetch_all(&state.tsdb) // <-- Use tsdb, not pool
.await?;
Time-Series Functions
TimescaleDB provides useful functions for time-series analysis:
-- Time bucket aggregation
SELECT
time_bucket('1 hour', created_at) AS bucket,
event_type,
COUNT(*) as count
FROM "AuditEvent"
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY bucket, event_type
ORDER BY bucket DESC;
-- First/last aggregates
SELECT
user_id,
first(event_type, created_at) as first_event,
last(event_type, created_at) as last_event,
COUNT(*) as total_events
FROM "AuditEvent"
WHERE created_at > NOW() - INTERVAL '7 days'
GROUP BY user_id;
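These queries run against state.tsdb like any other SQLx query. A sketch of the hourly aggregation from Rust (the EventBucket row type is illustrative, not an existing model):
use chrono::{DateTime, Utc};
use sqlx::FromRow;

// Illustrative row type for the hourly aggregation.
#[derive(Debug, FromRow)]
struct EventBucket {
    bucket: DateTime<Utc>,
    event_type: String,
    count: i64,
}

let buckets: Vec<EventBucket> = sqlx::query_as(
    r#"
    SELECT time_bucket('1 hour', created_at) AS bucket,
           event_type,
           COUNT(*) AS count
    FROM "AuditEvent"
    WHERE created_at > NOW() - INTERVAL '24 hours'
    GROUP BY bucket, event_type
    ORDER BY bucket DESC
    "#
)
.fetch_all(&state.tsdb) // time-series queries always go to the tsdb pool
.await?;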
Redis Cache
Redis is used for caching, rate limiting, session tracking, and pub/sub messaging.
Connection
// platform/api/src/utils/redis.rs
pub struct RedisClient {
    pool: deadpool_redis::Pool,
}

impl RedisClient {
    pub async fn new(config: &RedisConfig) -> Result<Self> {
        // Resilient connection with auto-reconnect
    }
}
Configuration
# config/default.toml
[redis]
url = "redis://localhost:6379"
pool_size = 10
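A minimal sketch of what RedisClient::new might do with that config, assuming deadpool_redis (pool_size would map onto deadpool's pool settings; the PING is just a fail-fast startup check, and the exact body may differ):
use deadpool_redis::{Config as RedisPoolConfig, Runtime};

impl RedisClient {
    pub async fn new(config: &RedisConfig) -> anyhow::Result<Self> {
        let cfg = RedisPoolConfig::from_url(&config.url);
        let pool = cfg.create_pool(Some(Runtime::Tokio1))?;

        // Fail fast if Redis is unreachable at startup.
        let mut conn = pool.get().await?;
        let _: String = deadpool_redis::redis::cmd("PING")
            .query_async(&mut conn)
            .await?;

        Ok(Self { pool })
    }
}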
Key Patterns
Session Caching (5 min TTL)
session:{session_token} -> user_id
session_id:{session_token} -> session_id (DB primary key)
User Session Tracking
user:{user_id}:sessions -> SET of active session tokens
user:{user_id}:sessions:activity -> ZSET (token -> timestamp for LRU)
Permission Caching (10 min TTL)
user:{user_id}:permissions -> JSON array of permission strings
api_key:{api_key_id}:permissions -> JSON array of permission strings
Rate Limiting
rate_limit:{endpoint}:{ip} -> request count (with sliding window TTL)
rate_limit:{endpoint}:{user_id} -> request count for authenticated users
Cache Methods
Session Methods
// Cache session (5 min TTL)
redis.cache_session(session_token, user_id).await?;
redis.get_cached_session(session_token).await?;
redis.invalidate_session(session_token).await?;
// Session ID (database ID for WebSocket targeting)
redis.cache_session_id(session_token, session_id).await?;
redis.get_cached_session_id(session_token).await?;
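A hedged sketch of what the session caching helpers above might look like internally, assuming deadpool_redis commands and the 5-minute TTL from the key patterns (the actual bodies may differ):
use deadpool_redis::redis::AsyncCommands;

impl RedisClient {
    // session:{token} -> user_id, expiring after 5 minutes (300 s)
    pub async fn cache_session(&self, token: &str, user_id: &str) -> anyhow::Result<()> {
        let mut conn = self.pool.get().await?;
        conn.set_ex::<_, _, ()>(format!("session:{token}"), user_id, 300)
            .await?;
        Ok(())
    }

    pub async fn get_cached_session(&self, token: &str) -> anyhow::Result<Option<String>> {
        let mut conn = self.pool.get().await?;
        Ok(conn.get(format!("session:{token}")).await?)
    }
}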
User Session Tracking
// Track user's active sessions
redis.add_user_session(user_id, session_token).await?;
redis.get_user_sessions(user_id).await?;
redis.remove_user_session(user_id, session_token).await?;
redis.clear_user_sessions(user_id).await?; // Logout all
// Session activity tracking (sorted by timestamp)
redis.update_session_activity(user_id, session_token).await?;
redis.get_sessions_by_activity(user_id).await?; // Most recent first
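A rough sketch of how add_user_session might maintain both the SET and the activity ZSET from the key patterns above (illustrative, not the exact implementation):
use std::time::{SystemTime, UNIX_EPOCH};

use deadpool_redis::redis::AsyncCommands;

impl RedisClient {
    // Adds the token to user:{id}:sessions and scores it by "now" in the
    // activity ZSET so get_sessions_by_activity can return most-recent-first.
    pub async fn add_user_session(&self, user_id: &str, token: &str) -> anyhow::Result<()> {
        let mut conn = self.pool.get().await?;
        let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as f64;

        conn.sadd::<_, _, ()>(format!("user:{user_id}:sessions"), token).await?;
        conn.zadd::<_, _, _, ()>(format!("user:{user_id}:sessions:activity"), token, now)
            .await?;
        Ok(())
    }
}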
Permission Caching
// User permissions (10 min TTL)
redis.cache_user_permissions(user_id, &permissions).await?;
redis.get_cached_user_permissions(user_id).await?;
redis.invalidate_user_permissions(user_id).await?;
// API key permissions (10 min TTL)
redis.cache_api_key_permissions(api_key_id, &permissions).await?;
redis.get_cached_api_key_permissions(api_key_id).await?;
redis.invalidate_api_key_permissions(api_key_id).await?;
// Invalidate all user caches (after role change, etc.)
redis.invalidate_user_caches(user_id).await?;
Rate Limiting
// Check rate limit - returns true if within limit
let allowed = redis
    .check_rate_limit("login:192.168.1.1", 10, 60) // key, max_requests, window_secs
    .await?;

if !allowed {
    return Err(Error::RateLimited);
}
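Under the hood this is essentially a counter keyed by endpoint and caller with a window TTL. A simplified sketch (a fixed INCR/EXPIRE window, which approximates the sliding window described above):
use deadpool_redis::redis::AsyncCommands;

impl RedisClient {
    // Returns true while the caller is within max_requests for the current window.
    pub async fn check_rate_limit(
        &self,
        key: &str,
        max_requests: u64,
        window_secs: i64,
    ) -> anyhow::Result<bool> {
        let mut conn = self.pool.get().await?;
        let full_key = format!("rate_limit:{key}");

        let count: u64 = conn.incr(&full_key, 1u64).await?;
        if count == 1 {
            // First request in this window: start the TTL clock.
            conn.expire::<_, ()>(&full_key, window_secs).await?;
        }
        Ok(count <= max_requests)
    }
}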
Cache Invalidation Strategy
| Event | Invalidate |
|---|---|
| User logout | session:{token}, remove from user:{id}:sessions |
| User role change | user:{id}:permissions |
| API key update | api_key:{id}:permissions |
| User ban | All user caches |
| Password change | All user sessions |
| 2FA change | All user sessions (force re-auth) |
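For example, a role-change handler pairs the database write with the corresponding cache invalidation (illustrative wiring using the helpers above):
// Assign the role in PostgreSQL, then drop the cached permission set so the
// next permission check falls through to the database.
sqlx::query(r#"INSERT INTO "UserRole" (user_id, role_id) VALUES ($1, $2)"#)
    .bind(&user_id)
    .bind(&role_id)
    .execute(&state.pool)
    .await?;

state.redis.invalidate_user_permissions(&user_id).await?;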
TTL Strategy
| Cache Type | TTL | Reason |
|---|---|---|
| Sessions | 5 minutes | Frequent DB validation for security |
| Permissions | 10 minutes | Max acceptable stale data for RBAC |
| Rate limits | Window-based | 60s for auth, 3600s for exports |
| 2FA tokens | 5 minutes | Short-lived for security |
Common Patterns
Get User with Caching
pub async fn get_user_with_permissions(
    pool: &PgPool,
    redis: &RedisClient,
    user_id: &str,
) -> Result<(User, Vec<String>)> {
    // Try cache first
    if let Some(perms) = redis.get_cached_user_permissions(user_id).await? {
        let user = get_user_by_id(pool, user_id).await?;
        return Ok((user, perms));
    }

    // Query database
    let user = get_user_by_id(pool, user_id).await?;
    let perms = get_user_permissions(pool, user_id).await?;

    // Cache for next time
    redis.cache_user_permissions(user_id, &perms).await?;
    Ok((user, perms))
}
Audit Event Logging
Always use AuditLogger::with_geoip() for audit events. Never write directly to the AuditEvent table.
// In REST handlers
let req_info = RequestInfo::from_request(&req);
let audit_logger = AuditLogger::with_geoip(&state.tsdb, &state.geoip)
.with_source_opt(req_info.source());
audit_logger.log_login(
&user_id,
req_info.ip(),
req_info.ua(),
Some("discord"), // OAuth provider
).await;
// In GraphQL mutations
let req_info = ctx.data::<RequestInfo>().ok();
let audit_logger = AuditLogger::with_geoip(&state.tsdb, &state.geoip)
.with_source_opt(req_info.and_then(|r| r.source()));
audit_logger.log_user_updated(
&user_id,
req_info.and_then(|r| r.ip()),
req_info.and_then(|r| r.ua()),
&["email", "username"], // Changed fields
).await;
Storage File Metadata
// After successful file upload
let source = req_info.source();

let create_file = CreateStorageFile {
    user_id: &user_id,
    key: &result.key,
    original_filename: &filename,
    content_type: &result.content_type,
    category: category_str,
    size_bytes: result.size_bytes as i64,
    etag: result.etag.as_deref(),
    bucket: &result.bucket,
    metadata: None,
    ip_address: req_info.ip(),
    user_agent: req_info.ua(),
    source_service: source.as_deref(),
};

storage_files::insert_storage_file(&state.tsdb, create_file).await?;
Health Checks
Database Health
pub async fn health_check(pool: &PgPool) -> anyhow::Result<()> {
    sqlx::query("SELECT 1")
        .fetch_one(pool)
        .await?;
    Ok(())
}
Redis Health
pub async fn health_check(&self) -> anyhow::Result<()> {
    let mut conn = self.get_connection().await?;
    let _: String = redis::cmd("PING")
        .query_async(&mut conn)
        .await?;
    Ok(())
}
Combined Health Endpoint
// GET /health
pub async fn health(state: web::Data<AppState>) -> impl Responder {
    let pg_ok = sqlx::query("SELECT 1")
        .fetch_one(&state.pool)
        .await
        .is_ok();

    let tsdb_ok = sqlx::query("SELECT 1")
        .fetch_one(&state.tsdb)
        .await
        .is_ok();

    let redis_ok = state.redis.health_check().await.is_ok();

    if pg_ok && tsdb_ok && redis_ok {
        HttpResponse::Ok().json(json!({
            "status": "healthy",
            "postgres": "ok",
            "timescaledb": "ok",
            "redis": "ok"
        }))
    } else {
        HttpResponse::ServiceUnavailable().json(json!({
            "status": "unhealthy",
            "postgres": if pg_ok { "ok" } else { "error" },
            "timescaledb": if tsdb_ok { "ok" } else { "error" },
            "redis": if redis_ok { "ok" } else { "error" }
        }))
    }
}
Environment Variables
# Primary Database (PostgreSQL)
DATABASE_URL=postgres://user:pass@localhost:5432/heimdall
DATABASE_MAX_CONNECTIONS=10
DATABASE_MIN_CONNECTIONS=1
DATABASE_CONNECT_TIMEOUT=30
DATABASE_IDLE_TIMEOUT=600
DATABASE_MAX_LIFETIME=1800
# Time-Series Database (TimescaleDB)
TSDB_URL=postgres://user:pass@localhost:5433/heimdall_ts
TSDB_MAX_CONNECTIONS=5
TSDB_MIN_CONNECTIONS=1
# Redis
REDIS_URL=redis://localhost:6379
# GeoIP (MaxMind)
GEOIP_DATABASE_PATH=/path/to/GeoLite2-City.mmdb
Local Development
Docker Compose
# dev-stack/docker-compose.yml
services:
  postgres:
    image: postgres:16
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: heimdall
      POSTGRES_PASSWORD: heimdall
      POSTGRES_DB: heimdall

  timescaledb:
    image: timescale/timescaledb:latest-pg16
    ports:
      - "5433:5432"
    environment:
      POSTGRES_USER: heimdall
      POSTGRES_PASSWORD: heimdall
      POSTGRES_DB: heimdall_ts

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
Just Commands
# Start development databases
just dev-stack
# Run PostgreSQL migrations
just migrate
# Run TimescaleDB migrations
just tsdb-migrate
# Connect to PostgreSQL
just db-shell
# Connect to TimescaleDB
just tsdb-shell
# Connect to Redis
just redis-shell
Performance Tips
- Use connection pooling - Always use the connection pool, never create new connections
- Index foreign keys - Always index columns used in JOINs and WHERE clauses
- Use RETURNING - Get inserted/updated data in one query instead of two
- Batch operations - Use bulk inserts for multiple records (see the sketch after this list)
- Cache aggressively - Cache permissions and session data in Redis
- Use appropriate TTLs - Balance freshness vs database load
- Monitor slow queries - Enable log_min_duration_statement in PostgreSQL
- Compress old data - Let TimescaleDB compress audit data automatically
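A rough sketch of a bulk insert with SQLx's QueryBuilder::push_values, which sends one INSERT for many rows instead of N round trips (table and variable names are illustrative):
use sqlx::{Postgres, QueryBuilder};

// (user_id, role_id) pairs gathered elsewhere.
let assignments: Vec<(String, String)> = vec![/* (user_id, role_id) pairs */];

let mut qb: QueryBuilder<Postgres> =
    QueryBuilder::new(r#"INSERT INTO "UserRole" (user_id, role_id) "#);

qb.push_values(assignments.iter(), |mut row, (user_id, role_id)| {
    row.push_bind(user_id);
    row.push_bind(role_id);
});

qb.build().execute(&state.pool).await?;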
Next Steps
- Auth System - Authentication architecture
- API Overview - API documentation
- Storage - File storage with S3