OS Trading Engine
Technical Documentation
Caching & Performance

Performance Optimization

Nexgent is designed for ultra-low latency trading operations. This page covers the key optimization patterns used throughout the system.

Performance Targets

Operation                        Target     Actual
Signal processing (end-to-end)   < 500ms    ~300-450ms
Stop loss evaluation             < 1ms      ~0.5ms
Price update broadcast           < 10ms     ~5ms
WebSocket message latency        < 20ms     ~10ms

Signal Processing Pipeline

Signals flow through an optimized pipeline designed for parallel execution.

Architecture

┌─────────────┐
│   Signal    │
│   Created   │
└──────┬──────┘
       │
       ▼
┌──────────────────┐
│ Get Eligible     │  ← Single query, cached config
│ Agents           │
└────────┬─────────┘
         │
         ▼
┌──────────────────────────────────────────────┐
│              Parallel Execution              │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐       │
│  │ Agent 1 │  │ Agent 2 │  │ Agent N │       │
│  │ Execute │  │ Execute │  │ Execute │       │
│  └─────────┘  └─────────┘  └─────────┘       │
└──────────────────────────────────────────────┘
         │
         ▼
┌──────────────────┐
│ Record Metrics   │
└──────────────────┘

Parallel Agent Execution

async processSignal(signal: TradingSignal): Promise<void> {
  const startTime = Date.now();
  
  // 1. Get eligible agents (fast - uses cached configs)
  const eligibleAgentIds = await agentEligibilityService.getEligibleAgents(signal);
  
  if (eligibleAgentIds.length === 0) {
    signalProcessingCount.inc({ status: 'skipped' });
    return;
  }
  
  // 2. Execute for ALL agents in parallel
  // Promise.all ensures maximum concurrency
  await Promise.all(eligibleAgentIds.map(async (agentId) => {
    await this.executeForAgent(agentId, signal);
  }));
  
  // 3. Record metrics
  const duration = Date.now() - startTime;
  signalProcessingLatency.observe({ status: 'success' }, duration / 1000);
}

Parallel execution means a signal with 10 eligible agents takes roughly as long as a signal with one: total latency is bounded by the slowest agent's execution, not the sum of all of them.
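This max-not-sum behavior can be demonstrated with a minimal, self-contained sketch (illustrative timings, not engine code):

```typescript
// Illustrative sketch: Promise.all completes when the slowest task does,
// so total latency tracks the maximum duration, not the sum.
const delay = (ms: number) => new Promise<void>((res) => setTimeout(res, ms));

async function runParallel(durations: number[]): Promise<number> {
  const start = Date.now();
  await Promise.all(durations.map((d) => delay(d)));
  return Date.now() - start;
}

// Tasks of 50ms, 100ms, and 150ms finish together in roughly 150ms
runParallel([50, 100, 150]).then((elapsed) => console.log(`${elapsed}ms`));
```

One caveat of Promise.all: it rejects as soon as any single execution rejects. Promise.allSettled is the standard alternative when one agent's failure should not abort the others.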


Price Update Loop

The price update manager polls prices every 1.5 seconds and evaluates stop loss for all positions.

Optimizations

  1. Batch API calls - Fetch all token prices in one request
  2. Local + Redis cache - Two-tier caching for sub-ms reads
  3. Concurrent stop loss evaluation - Evaluate all positions in parallel
  4. Batched WebSocket broadcasts - Group updates per agent

class PriceUpdateManager {
  private readonly POLL_INTERVAL = 1500; // 1.5 seconds
  private isPolling = false;  // Guard against overlapping polls
  
  private async pollPrices(): Promise<void> {
    // Guard: Skip if previous poll still running
    if (this.isPolling) {
      return;
    }
    
    this.isPolling = true;
    
    try {
      // Get all tokens to poll (from tracked positions)
      const tokenAddresses = this.getTrackedTokenAddresses();
      
      // Batch fetch all prices in one API call
      const prices = await priceFeedService.getMultipleTokenPrices(tokenAddresses);
      
      // Collect updates per agent for batched broadcast
      const agentUpdates = new Map<string, PriceUpdate[]>();
      
      // Process each price (updates cache, evaluates stop loss)
      for (const price of prices) {
        await this.processPriceUpdate(price, agentUpdates);
      }
      
      // Batch broadcast to each agent (one message per agent)
      for (const [agentId, updates] of agentUpdates) {
        this.wsServer.broadcastPriceUpdates(agentId, updates);
      }
      
    } finally {
      this.isPolling = false;
    }
  }
}
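
The isPolling guard can be exercised in isolation with a condensed sketch (not engine code; the factory wrapper here is for illustration only):

```typescript
// Condensed re-entrancy guard: wrap an async work function so that a tick
// arriving while the previous poll is still running is skipped, not stacked.
function createPoller(work: () => Promise<void>) {
  let isPolling = false;
  return async (): Promise<boolean> => {
    if (isPolling) return false; // skipped: previous poll still running
    isPolling = true;
    try {
      await work();
      return true; // ran to completion
    } finally {
      isPolling = false;
    }
  };
}
```

Each timer tick calls the returned function; if a slow poll is still awaiting its work, overlapping ticks return immediately with `false`.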

Stop Loss Evaluation

Stop loss evaluation is optimized for sub-millisecond performance:

async evaluateStopLoss(position, currentPrice, config): Promise<StopLossResult> {
  // Fast path: Check if price is above stop loss threshold
  // Most evaluations exit here (no database/Redis needed)
  const priceChangePercent = ((currentPrice - position.purchasePrice) / position.purchasePrice) * 100;
  
  // Calculate stop loss using shared pure function
  const stopLossPercent = calculateStopLossPercentage(priceChangePercent, config.stopLoss);
  const stopLossPrice = position.peakPrice * (stopLossPercent / 100);
  
  // Quick check - most common case
  if (currentPrice > stopLossPrice) {
    // Update peak price if new high
    if (currentPrice > position.peakPrice) {
      await this.updatePeakPrice(position, currentPrice);
    }
    return { shouldTrigger: false };
  }
  
  // Acquire lock only when stop loss might trigger
  const lockToken = await redisService.acquireLock(`stop-loss:${position.id}`, 5);
  if (!lockToken) {
    return { shouldTrigger: false }; // Another evaluation in progress
  }
  
  try {
    // Double-check against fresh data before triggering (position might
    // have been sold); the re-validation itself is elided here
    return { shouldTrigger: true, stopLossPrice };
  } finally {
    await redisService.releaseLock(`stop-loss:${position.id}`, lockToken);
  }
}
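
A worked instance of the threshold math above (numbers are illustrative; the shared calculateStopLossPercentage function is not shown here, so 90 is an assumed output):

```typescript
// If the peak price is 2.0 SOL and the computed stop loss percentage is 90,
// the trigger threshold sits at 90% of the peak.
const peakPrice = 2.0;
const stopLossPercent = 90; // assumed result of calculateStopLossPercentage
const stopLossPrice = peakPrice * (stopLossPercent / 100); // 1.8 SOL

console.log(1.85 > stopLossPrice); // true  -> fast path, no lock taken
console.log(1.75 > stopLossPrice); // false -> acquire lock, may trigger
```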

WebSocket Optimization

Price Update Batching

Instead of sending individual price updates, updates are batched per agent:

// Backend: Collect updates during price poll
const agentUpdates = new Map<string, PriceUpdate[]>();
 
for (const price of prices) {
  for (const agentId of getAgentsWithPosition(price.tokenAddress)) {
    if (!agentUpdates.has(agentId)) {
      agentUpdates.set(agentId, []);
    }
    agentUpdates.get(agentId)!.push({
      tokenAddress: price.tokenAddress,
      price: price.priceSol,
      priceUsd: price.priceUsd,
    });
  }
}
 
// Send one message per agent with all updates
for (const [agentId, updates] of agentUpdates) {
  wsServer.broadcastPriceUpdates(agentId, updates);
}

Frontend: requestAnimationFrame Batching

// Frontend: Batch price updates into animation frames
const pendingUpdates = useRef<Map<string, PriceUpdate>>(new Map());
const updateTimeout = useRef<number | null>(null);
 
const queuePriceUpdate = (tokenAddress: string, price: number, priceUsd: number) => {
  pendingUpdates.current.set(tokenAddress, { price, priceUsd });
  
  if (updateTimeout.current) return;
  
  // Batch apply with requestAnimationFrame
  updateTimeout.current = requestAnimationFrame(() => {
    setPositions(prev => {
      const updated = [...prev];
      for (const [addr, update] of pendingUpdates.current) {
        const idx = updated.findIndex(p => p.tokenAddress === addr);
        if (idx !== -1) {
          updated[idx] = { ...updated[idx], currentPrice: update.price };
        }
      }
      return updated;
    });
    pendingUpdates.current.clear();
    updateTimeout.current = null;
  });
};

Connection Keepalive

// Server-side ping every 30 seconds
const PING_INTERVAL = 30000;
 
setInterval(() => {
  for (const [clientId, ws] of clients) {
    if (ws.readyState === WebSocket.OPEN) {
      ws.ping();
    }
  }
}, PING_INTERVAL);
 
// Client heartbeat tracking
ws.on('pong', () => {
  clientState.lastPong = Date.now();
});
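
A natural companion to pong tracking is reaping clients whose pongs have gone stale. The helper below is a hedged sketch (names and the 90-second window are assumptions, not the engine's API):

```typescript
// Find clients whose last pong is older than the allowed window; the
// caller would then terminate those sockets and drop their state.
type ClientState = { lastPong: number };

function findStaleClients(
  states: Map<string, ClientState>,
  now: number,
  staleAfterMs: number
): string[] {
  const stale: string[] = [];
  for (const [clientId, state] of states) {
    if (now - state.lastPong > staleAfterMs) stale.push(clientId);
  }
  return stale;
}

// With a 90s window (three missed 30s pings):
const states = new Map<string, ClientState>([
  ['alive', { lastPong: 100_000 }],
  ['dead', { lastPong: 0 }],
]);
console.log(findStaleClients(states, 100_000, 90_000)); // ['dead']
```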

Database Query Optimization

Indexed Queries

Frequently queried columns have indexes:

model AgentPosition {
  @@index([agentId])
  @@index([tokenAddress])
  @@index([agentId, walletAddress])
  @@index([purchaseTransactionId])
}
 
model AgentTransaction {
  @@index([agentId])
  @@index([agentId, transactionTime])
  @@index([agentId, transactionType])
}

Efficient Queries

// Good: Select only needed columns
const positions = await prisma.agentPosition.findMany({
  where: { tokenAddress: tokenAddress.toLowerCase() },
  select: {
    id: true,
    agentId: true,
    purchasePrice: true,
    peakPrice: true,
    currentStopLossPercentage: true,
  },
});
 
// Bad: Select all columns
const positions = await prisma.agentPosition.findMany({
  where: { tokenAddress },
});

Batch Operations

// Good: Batch insert with createMany
await prisma.agentTransaction.createMany({
  data: transactions,
  skipDuplicates: true,
});
 
// Good: Batch update with pipeline
const pipeline = redis.pipeline();
for (const balance of balances) {
  pipeline.set(getBalanceKey(balance), JSON.stringify(balance));
}
await pipeline.exec();
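
One caveat worth noting (the helper below is a generic sketch, not engine code): very large batches are usually split into chunks before being handed to createMany or a pipeline, so no single statement grows unbounded:

```typescript
// Split a large array into fixed-size chunks for batched writes.
function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

console.log(chunk([1, 2, 3, 4, 5], 2)); // [[1, 2], [3, 4], [5]]
```

Each chunk can then be written with its own createMany call or pipeline exec.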

Async Job Processing

Non-critical operations are offloaded to background queues.

BullMQ Worker

// Heavy operations run in background
const worker = new Worker('nexgent-queue', async (job) => {
  switch (job.name) {
    case 'record_historical_swap':
      // Database write for trade history
      await prisma.agentHistoricalSwap.create({ data: job.data });
      break;
      
    case 'snapshot_balance':
      // Periodic balance snapshot
      await balanceSnapshotService.createSnapshot(job.data.agentId);
      break;
  }
}, { connection: redisConnection });

Trade Execution Flow

┌─────────────────┐
│ Execute Trade   │ ← Critical path (fast)
└────────┬────────┘
         │
         ├─── Update balance (sync, cached)
         │
         ├─── Create position (sync, cached)
         │
         ├─── Emit WebSocket event (async)
         │
         └─── Queue historical swap write (async)
                        │
                        ▼
               ┌────────────────┐
               │ BullMQ Worker  │ ← Background (non-blocking)
               │ writes to DB   │
               └────────────────┘

Memory Optimization

Token Tracking

Only track tokens with active positions:

class PriceUpdateManager {
  private trackedTokens: Map<string, TokenTracking> = new Map();
  
  // Add token when position opens
  async addTokenTracking(tokenAddress: string, agentId: string) {
    const normalized = tokenAddress.toLowerCase();
    const tracking = this.trackedTokens.get(normalized);
    if (tracking) {
      tracking.agents.add(agentId);
    } else {
      this.trackedTokens.set(normalized, {
        tokenAddress: normalized,
        agents: new Set([agentId]),
        lastUpdate: null,
      });
    }
  }
  
  // Remove token when all positions close
  async removeTokenTracking(tokenAddress: string, agentId: string) {
    const normalized = tokenAddress.toLowerCase();
    const tracking = this.trackedTokens.get(normalized);
    if (!tracking) return;
    
    tracking.agents.delete(agentId);
    if (tracking.agents.size === 0) {
      this.trackedTokens.delete(normalized);
      this.priceCache.delete(normalized);  // Clean up cache too
    }
  }
}
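
The Set-based reference counting above can be exercised with a condensed, self-contained version of the same pattern, showing exactly when tracking is dropped:

```typescript
// Condensed refcount pattern: a token stays tracked while any agent
// holds a position in it, and is dropped when the last one closes.
const tracked = new Map<string, Set<string>>();

function add(token: string, agent: string) {
  let agents = tracked.get(token);
  if (!agents) {
    agents = new Set();
    tracked.set(token, agents);
  }
  agents.add(agent);
}

function remove(token: string, agent: string) {
  const agents = tracked.get(token);
  if (!agents) return;
  agents.delete(agent);
  if (agents.size === 0) tracked.delete(token); // last position closed
}

add('mint1', 'agentA');
add('mint1', 'agentB');
remove('mint1', 'agentA');
console.log(tracked.has('mint1')); // true: agentB still holds a position
remove('mint1', 'agentB');
console.log(tracked.has('mint1')); // false: no open positions remain
```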

Periodic Cleanup

// Refresh tracked tokens probabilistically (~7.5 seconds on average)
if (Math.random() < 0.2) {  // 20% chance per 1.5-second poll
  await this.refreshTrackedTokens();
}
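
The expected gap between refreshes follows from the geometric distribution: a trigger that fires with probability p on each poll fires on average every 1/p polls.

```typescript
// Mean interval between probabilistic refreshes: pollMs / p.
function expectedRefreshIntervalMs(pollMs: number, p: number): number {
  return pollMs / p;
}

console.log(expectedRefreshIntervalMs(1500, 0.2)); // 7500 ms, i.e. ~7.5s
```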

Metrics Collection

Prometheus metrics track performance:

// Signal processing latency histogram
const signalProcessingLatency = new Histogram({
  name: 'signal_processing_latency_seconds',
  help: 'Signal processing latency in seconds',
  labelNames: ['status'],
  buckets: [0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1],
});
 
// Stop loss evaluation latency
const stopLossEvaluationLatency = new Histogram({
  name: 'stop_loss_evaluation_latency_seconds',
  help: 'Stop loss evaluation latency in seconds',
  labelNames: ['agent_id', 'token_address'],
  buckets: [0.0001, 0.0005, 0.001, 0.005, 0.01],
});
 
// Price update latency
const priceUpdateLatency = new Histogram({
  name: 'price_update_latency_seconds',
  help: 'Price update latency in seconds',
  labelNames: ['source'],
  buckets: [0.1, 0.25, 0.5, 1, 2.5, 5],
});
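
These histograms are fed by a recurring pattern: time an operation and observe the elapsed seconds under a status label. A self-contained sketch of that pattern (`observe` here stands in for the histogram method; not engine code):

```typescript
// Run an async operation, recording elapsed seconds with a status label.
async function timed<T>(
  observe: (labels: { status: string }, seconds: number) => void,
  fn: () => Promise<T>
): Promise<T> {
  const start = Date.now();
  try {
    const result = await fn();
    observe({ status: 'success' }, (Date.now() - start) / 1000);
    return result;
  } catch (err) {
    observe({ status: 'error' }, (Date.now() - start) / 1000);
    throw err;
  }
}
```

With the histograms above, the recorder would be wired in as e.g. `(labels, s) => signalProcessingLatency.observe(labels, s)`.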

Best Practices Summary

Technique                   Benefit
Parallel execution          Process multiple agents simultaneously
Two-tier caching            Sub-ms reads with local + Redis
Write-through cache         Consistency without cache invalidation bugs
Distributed locks           Prevent duplicate operations
Batched broadcasts          Reduce WebSocket message count
requestAnimationFrame       Smooth UI updates
Background queues           Keep critical path fast
Selective column queries    Reduce data transfer
Index optimization          Fast database lookups