Redis Patterns
Nexgent uses Redis for caching, distributed locking, idempotency, and token blacklisting. This page details the patterns and implementation strategies.
Architecture Overview
┌─────────────────────────────────────────────────────────────────┐
│ Application │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Balance │ │ Position │ │ Price │ │
│ │ Service │ │ Service │ │ Service │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Redis Services │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │RedisBalance │ │RedisPosition │ │ RedisPrice │ │ │
│ │ │Service │ │Service │ │ Service │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │IdempotencyService │ │ RedisToken │ │ RedisAgent│ │ │
│ │ │ │ │ Service │ │ Service │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Redis Client │ │
│ │ (ioredis) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────┐
│ Redis │
│ (6379) │
└───────────────────┘Key Naming Convention
All Redis keys follow a consistent pattern: {entity}:{identifier}:{subkey}
// src/shared/constants/redis-keys.ts
export const REDIS_KEYS = {
// Balance caching
BALANCE: (agentId: string, walletAddress: string, tokenAddress: string) =>
`balance:${agentId}:${walletAddress}:${tokenAddress}`,
// Position caching
POSITION: (agentId: string, walletAddress: string, tokenAddress: string) =>
`position:${agentId}:${walletAddress}:${tokenAddress}`,
// Price caching (short TTL)
PRICE: (tokenAddress: string) =>
`price:${tokenAddress.toLowerCase()}`,
// Agent state
AGENT: (agentId: string) =>
`agent:${agentId}`,
// Trading config
CONFIG: (agentId: string) =>
`config:${agentId}`,
// Distributed locks
LOCK: (resource: string) =>
`lock:${resource}`,
// Idempotency keys
IDEMPOTENCY: (key: string) =>
`idempotency:${key}`,
// Token blacklist (auth)
TOKEN_BLACKLIST_ACCESS: (jti: string) =>
`token:blacklist:access:${jti}`,
TOKEN_BLACKLIST_REFRESH: (jti: string) =>
`token:blacklist:refresh:${jti}`,
};TTL Configuration
export const REDIS_TTL = {
PRICE: 60, // 60 seconds - prices change frequently
AGENT: 300, // 5 minutes - agent data is stable
CONFIG: 300, // 5 minutes - config rarely changes
LOCK: 5, // 5 seconds - short lock for operations
IDEMPOTENCY: 60, // 60 seconds - prevent duplicate operations
// Balances/positions have no TTL (write-through pattern)
};Write-Through Caching
Critical data uses write-through caching where database is written first, then cache is updated.
Pattern
┌────────────────┐
│ Write Request │
└───────┬────────┘
│
▼
┌────────────────┐
│ Database Write │ ← Source of truth
│ (Prisma) │
└───────┬────────┘
│ commit
▼
┌────────────────┐
│ Cache Update │ ← Updated after DB commit
│ (Redis) │
└───────┬────────┘
│
▼
┌────────────────┐
│ WebSocket Event│ ← Notify clients
└────────────────┘Implementation
// Balance Service - Write-through pattern
async upsertBalance(
walletAddress: string,
agentId: string,
tokenAddress: string,
tokenSymbol: string,
delta: Decimal,
tx?: Prisma.TransactionClient
): Promise<Decimal> {
// 1. Database write (source of truth)
const dbBalance = await this.balanceRepo.update(
existingBalance.id,
{ balance: newBalance.toString() },
tx
);
// 2. Cache update (only if not in transaction)
// When inside a transaction, caller updates cache after commit
if (!tx) {
await redisBalanceService.setBalance({
id: dbBalance.id,
agentId: dbBalance.agentId,
walletAddress: dbBalance.walletAddress,
tokenAddress: dbBalance.tokenAddress,
tokenSymbol: dbBalance.tokenSymbol,
balance: dbBalance.balance,
lastUpdated: dbBalance.lastUpdated,
});
}
return newBalance;
}When inside a database transaction, Redis updates are deferred until after the transaction commits. This prevents cache inconsistency if the transaction rolls back.
Read Pattern
async getBalance(agentId: string, walletAddress: string, tokenAddress: string): Promise<Decimal> {
// 1. Check cache first
const cached = await redisBalanceService.getBalance(agentId, walletAddress, tokenAddress);
if (cached) {
return new Decimal(cached.balance);
}
// 2. Cache miss - read from database
const dbBalance = await this.balanceRepo.findByWalletAndToken(walletAddress, tokenAddress);
// 3. Populate cache for next read
if (dbBalance) {
await redisBalanceService.setBalance(dbBalance);
return new Decimal(dbBalance.balance);
}
return new Decimal(0);
}Distributed Locking
Redis provides distributed locks to prevent concurrent operations on the same resource.
Lock Implementation
// Acquire lock with automatic expiry
async acquireLock(key: string, ttlSeconds: number): Promise<string | null> {
const lockToken = randomUUID();
// SET key value EX ttl NX - only sets if key doesn't exist
const result = await this.client.set(key, lockToken, 'EX', ttlSeconds, 'NX');
return result === 'OK' ? lockToken : null;
}
// Release lock (only if we own it) - Lua script for atomicity
async releaseLock(key: string, lockToken: string): Promise<void> {
const luaScript = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;
await this.client.eval(luaScript, 1, key, lockToken);
}
// Extend lock if operation takes longer
async extendLock(key: string, lockToken: string, additionalTtlSeconds: number): Promise<boolean> {
const luaScript = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("expire", KEYS[1], ARGV[2])
else
return 0
end
`;
const result = await this.client.eval(luaScript, 1, key, lockToken, additionalTtlSeconds);
return result === 1;
}Usage: Stop Loss Evaluation
async evaluateStopLoss(position, currentPrice, config) {
const lockKey = REDIS_KEYS.LOCK(`stop-loss:${position.id}`);
const lockToken = await redisService.acquireLock(lockKey, REDIS_TTL.LOCK);
if (!lockToken) {
// Another evaluation in progress, skip to prevent duplicate sales
return { shouldTrigger: false, updated: false };
}
try {
// Safe to evaluate - we have exclusive access
const shouldTrigger = currentPrice <= stopLossPrice;
if (shouldTrigger) {
await this.triggerStopLoss(position);
}
return { shouldTrigger, updated: true };
} finally {
// Always release lock
await redisService.releaseLock(lockKey, lockToken);
}
}Idempotency Keys
Prevents duplicate operations like double-executing a trade.
Implementation
class IdempotencyService {
// Check if operation can proceed (returns true if first time)
async checkAndSet(key: string, ttlSeconds: number = 60): Promise<boolean> {
const fullKey = REDIS_KEYS.IDEMPOTENCY(key);
// SET NX only succeeds if key doesn't exist
const result = await redisService.getClient().set(
fullKey, '1', 'EX', ttlSeconds, 'NX'
);
return result === 'OK';
}
// Clear key (allow retry after failure)
async clear(key: string): Promise<void> {
await redisService.del(REDIS_KEYS.IDEMPOTENCY(key));
}
}Usage: Preventing Duplicate Sales
async executeSale(request: SaleRequest) {
// Create unique key for this sale operation
const saleKey = `sale:${request.positionId}`;
const canProceed = await idempotencyService.checkAndSet(saleKey, 60);
if (!canProceed) {
throw new TradingExecutorError(
'DUPLICATE_OPERATION',
'Position is already being sold'
);
}
try {
// Execute sale...
return result;
} catch (error) {
// Clear key on failure to allow retry
await idempotencyService.clear(saleKey);
throw error;
}
// Key auto-expires after 60 seconds
}Price Caching
Token prices are cached with short TTL since they change frequently.
Multi-Get with Pipeline
async getMultiplePrices(tokenAddresses: string[]): Promise<Map<string, CachedPrice | null>> {
const pipeline = redisService.getClient().pipeline();
const normalizedAddresses = tokenAddresses.map(a => a.toLowerCase());
// Batch all GET commands into one round-trip
for (const addr of normalizedAddresses) {
pipeline.get(REDIS_KEYS.PRICE(addr));
}
// Execute pipeline
const results = await pipeline.exec();
// Process results
const priceMap = new Map<string, CachedPrice | null>();
for (let i = 0; i < normalizedAddresses.length; i++) {
const [error, data] = results![i];
if (!error && data) {
priceMap.set(normalizedAddresses[i], JSON.parse(data as string));
} else {
priceMap.set(normalizedAddresses[i], null);
}
}
return priceMap;
}Local + Redis Cache
The Price Update Manager uses a two-tier cache:
class PriceUpdateManager {
// Tier 1: Local in-memory cache (sub-millisecond)
private priceCache: Map<string, CachedPrice> = new Map();
// Tier 2: Redis cache (millisecond)
async processPriceUpdate(price: TokenPrice): Promise<boolean> {
const normalizedAddress = price.tokenAddress.toLowerCase();
// Check local cache first
const cached = this.priceCache.get(normalizedAddress);
if (cached && cached.priceSol === price.priceSol) {
return false; // No change, skip broadcast
}
// Update local cache
this.priceCache.set(normalizedAddress, {
priceSol: price.priceSol,
priceUsd: price.priceUsd,
timestamp: new Date(),
});
// Update Redis cache (async, non-blocking)
redisPriceService.setPrice(normalizedAddress, {
priceSol: price.priceSol,
priceUsd: price.priceUsd,
lastUpdated: new Date(),
}).catch(err => logger.error('Redis price cache failed', err));
return true; // Price changed, broadcast
}
}Token Blacklisting (Auth)
JWT tokens are blacklisted by storing their jti (JWT ID) until expiry.
class RedisTokenService {
async blacklistAccessToken(jti: string, expiresAt: number): Promise<void> {
const ttl = expiresAt - Math.floor(Date.now() / 1000);
if (ttl > 0) {
await redisService.set(
REDIS_KEYS.TOKEN_BLACKLIST_ACCESS(jti),
'1',
ttl
);
}
}
async isAccessTokenBlacklisted(jti: string): Promise<boolean> {
return await redisService.exists(REDIS_KEYS.TOKEN_BLACKLIST_ACCESS(jti));
}
}Blacklisted tokens use TTL matching their natural expiration. Once the token would expire anyway, the blacklist entry is automatically cleaned up.
Cache Warming
On startup, critical data is preloaded into Redis:
class CacheWarmer {
async warmup(): Promise<void> {
logger.info('🔥 Warming up cache...');
const agents = await prisma.agent.findMany({ include: { wallets: true } });
for (const agent of agents) {
// Cache agent
await redisAgentService.setAgent(agent);
// Cache config with defaults merged
const config = this.mergeWithDefaults(agent.tradingConfig);
await redisConfigService.setAgentConfig(agent.id, config);
// Cache balances and positions for each wallet
for (const wallet of agent.wallets) {
const balances = await prisma.agentBalance.findMany({
where: { walletAddress: wallet.walletAddress },
});
for (const balance of balances) {
await redisBalanceService.setBalance(balance);
}
const positions = await prisma.agentPosition.findMany({
where: { agentId: agent.id, walletAddress: wallet.walletAddress },
});
for (const position of positions) {
await redisPositionService.setPosition(position);
}
}
}
logger.info('✅ Cache warmup complete');
}
}Configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0export const redisConfig = {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379', 10),
password: process.env.REDIS_PASSWORD || undefined,
db: parseInt(process.env.REDIS_DB || '0', 10),
keyPrefix: 'nexgent:', // All keys prefixed
maxRetries: 3,
};