# Performance Optimization
Nexgent is designed for ultra-low latency trading operations. This page covers the key optimization patterns used throughout the system.
## Performance Targets
| Operation | Target | Actual |
|---|---|---|
| Signal processing (end-to-end) | < 500ms | ~300-450ms |
| Stop loss evaluation | < 1ms | ~0.5ms |
| Price update broadcast | < 10ms | ~5ms |
| WebSocket message latency | < 20ms | ~10ms |
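These targets can also be encoded programmatically, for example as alerting thresholds checked against observed latencies. The sketch below is purely illustrative (the budget names and the `overBudget` helper are hypothetical, not from the codebase); the values mirror the table above:

```typescript
// Hypothetical latency budgets in milliseconds, mirroring the targets table.
const latencyBudgetsMs: Record<string, number> = {
  signal_processing: 500,
  stop_loss_evaluation: 1,
  price_update_broadcast: 10,
  websocket_message: 20,
};

// Returns the names of operations whose observed latency exceeds its budget.
function overBudget(observedMs: Record<string, number>): string[] {
  return Object.entries(observedMs)
    .filter(([op, ms]) => ms > (latencyBudgetsMs[op] ?? Infinity))
    .map(([op]) => op);
}
```

For example, `overBudget({ signal_processing: 450, stop_loss_evaluation: 2 })` flags only `stop_loss_evaluation`.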
## Signal Processing Pipeline
Signals flow through an optimized pipeline designed for parallel execution.
### Architecture
```
┌─────────────┐
│   Signal    │
│   Created   │
└──────┬──────┘
       │
       ▼
┌──────────────────┐
│   Get Eligible   │ ← Single query, cached config
│      Agents      │
└────────┬─────────┘
         │
         ▼
┌──────────────────────────────────────────────┐
│              Parallel Execution              │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐     │
│  │ Agent 1 │   │ Agent 2 │   │ Agent N │     │
│  │ Execute │   │ Execute │   │ Execute │     │
│  └─────────┘   └─────────┘   └─────────┘     │
└──────────────────────────────────────────────┘
                      │
                      ▼
             ┌──────────────────┐
             │  Record Metrics  │
             └──────────────────┘
```

### Parallel Agent Execution
```typescript
async processSignal(signal: TradingSignal): Promise<void> {
  const startTime = Date.now();

  // 1. Get eligible agents (fast - uses cached configs)
  const eligibleAgentIds = await agentEligibilityService.getEligibleAgents(signal);
  if (eligibleAgentIds.length === 0) {
    signalProcessingCount.inc({ status: 'skipped' });
    return;
  }

  // 2. Execute for ALL agents in parallel
  // Promise.all ensures maximum concurrency
  await Promise.all(
    eligibleAgentIds.map((agentId) => this.executeForAgent(agentId, signal)),
  );

  // 3. Record metrics
  const duration = Date.now() - startTime;
  signalProcessingLatency.observe({ status: 'success' }, duration / 1000);
}
```

Parallel execution means a signal with 10 eligible agents takes roughly the same time as a signal with one: total latency is bounded by the slowest single execution, not the sum of all executions.
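One caveat with `Promise.all`: a single rejected agent execution rejects the combined promise. A common variant isolates per-agent failures with `Promise.allSettled`; the helper below is an illustrative sketch, not the actual pipeline code:

```typescript
// Sketch: run all agents concurrently, but isolate failures so one
// agent's error does not abort the others. The caller supplies the
// per-agent execution step (executeForAgent in the pipeline above).
async function executeAllIsolated(
  agentIds: string[],
  executeForAgent: (agentId: string) => Promise<void>,
): Promise<{ succeeded: string[]; failed: string[] }> {
  const results = await Promise.allSettled(
    agentIds.map((id) => executeForAgent(id)),
  );
  const succeeded: string[] = [];
  const failed: string[] = [];
  results.forEach((r, i) => {
    (r.status === 'fulfilled' ? succeeded : failed).push(agentIds[i]);
  });
  return { succeeded, failed };
}
```

The failed list can then feed metrics or retries without slowing the successful executions.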
## Price Update Loop
The price update manager polls prices every 1.5 seconds and evaluates stop loss for all positions.
### Optimizations
- **Batch API calls**: Fetch all token prices in one request
- **Local + Redis cache**: Two-tier caching for sub-ms reads
- **Concurrent stop loss evaluation**: Evaluate all positions in parallel
- **Batched WebSocket broadcasts**: Group updates per agent
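The local + Redis two-tier read path can be sketched as follows. This is a minimal illustration with a stubbed async store interface; the class and interface names are not the actual services:

```typescript
// Minimal two-tier cache sketch: an in-process Map fronting a slower
// shared store (Redis in production; any async get/set works here).
interface AsyncStore {
  get(key: string): Promise<string | null>;
  set(key: string, value: string): Promise<void>;
}

class TwoTierCache {
  private local = new Map<string, string>();
  constructor(private remote: AsyncStore) {}

  // Read path: local Map first (sub-ms), fall through to the remote tier.
  async get(key: string): Promise<string | null> {
    const hit = this.local.get(key);
    if (hit !== undefined) return hit;
    const value = await this.remote.get(key);
    if (value !== null) this.local.set(key, value); // warm the local tier
    return value;
  }

  // Write-through: update both tiers so subsequent reads stay consistent.
  async set(key: string, value: string): Promise<void> {
    this.local.set(key, value);
    await this.remote.set(key, value);
  }
}
```

Writing through both tiers on every update is what avoids explicit cache invalidation in the hot path.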
```typescript
class PriceUpdateManager {
  private readonly POLL_INTERVAL = 1500; // 1.5 seconds
  private isPolling = false; // Guard against overlapping polls

  private async pollPrices(): Promise<void> {
    // Guard: Skip if previous poll still running
    if (this.isPolling) {
      return;
    }
    this.isPolling = true;

    try {
      // Get all tokens to poll (from tracked positions)
      const tokenAddresses = this.getTrackedTokenAddresses();

      // Batch fetch all prices in one API call
      const prices = await priceFeedService.getMultipleTokenPrices(tokenAddresses);

      // Collect updates per agent for batched broadcast
      const agentUpdates = new Map<string, PriceUpdate[]>();

      // Process each price (updates cache, evaluates stop loss)
      for (const price of prices) {
        await this.processPriceUpdate(price, agentUpdates);
      }

      // Batch broadcast to each agent (one message per agent)
      for (const [agentId, updates] of agentUpdates) {
        this.wsServer.broadcastPriceUpdates(agentId, updates);
      }
    } finally {
      this.isPolling = false;
    }
  }
}
```

## Stop Loss Evaluation
Stop loss evaluation is optimized for sub-millisecond performance:
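The evaluation relies on a shared pure function, `calculateStopLossPercentage`, whose actual tiers are not shown on this page. One plausible shape for such a trailing-stop rule is sketched below; the `StopLossConfig` fields and tier values are hypothetical, chosen only to match how the result is used (a percent of peak price):

```typescript
interface StopLossConfig {
  // Stop level as a percent of peak price before any tier applies
  // (e.g. 85 means the stop sits at 85% of the peak).
  basePercentOfPeak: number;
  // Tiers that tighten the stop as the unrealized gain grows.
  tiers: { minGainPercent: number; percentOfPeak: number }[];
}

// Pure function: given the gain over purchase price (in percent),
// return the stop level as a percent of peak price. Being pure, it
// needs no I/O and keeps the fast path sub-millisecond.
function calculateStopLossPercentage(
  priceChangePercent: number,
  config: StopLossConfig,
): number {
  let percentOfPeak = config.basePercentOfPeak;
  for (const tier of config.tiers) {
    if (priceChangePercent >= tier.minGainPercent) {
      percentOfPeak = Math.max(percentOfPeak, tier.percentOfPeak);
    }
  }
  return percentOfPeak;
}
```

With a base of 85 and tiers at +20% gain → 90 and +50% gain → 95, a position up 60% would have its stop at 95% of peak.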
```typescript
async evaluateStopLoss(position, currentPrice, config): Promise<StopLossResult> {
  // Fast path: Check if price is above stop loss threshold
  // Most evaluations exit here (no database/Redis needed)
  const priceChangePercent =
    ((currentPrice - position.purchasePrice) / position.purchasePrice) * 100;

  // Calculate stop loss using shared pure function
  const stopLossPercent = calculateStopLossPercentage(priceChangePercent, config.stopLoss);
  const stopLossPrice = position.peakPrice * (stopLossPercent / 100);

  // Quick check - most common case
  if (currentPrice > stopLossPrice) {
    // Update peak price if new high
    if (currentPrice > position.peakPrice) {
      await this.updatePeakPrice(position, currentPrice);
    }
    return { shouldTrigger: false };
  }

  // Acquire lock only when stop loss might trigger
  const lockToken = await redisService.acquireLock(`stop-loss:${position.id}`, 5);
  if (!lockToken) {
    return { shouldTrigger: false }; // Another evaluation in progress
  }

  try {
    // Double-check with fresh data (position might have been sold)
    return { shouldTrigger: true, stopLossPrice };
  } finally {
    await redisService.releaseLock(`stop-loss:${position.id}`, lockToken);
  }
}
```

## WebSocket Optimization
### Price Update Batching
Instead of sending individual price updates, updates are batched per agent:
```typescript
// Backend: Collect updates during price poll
const agentUpdates = new Map<string, PriceUpdate[]>();
for (const price of prices) {
  for (const agentId of getAgentsWithPosition(price.tokenAddress)) {
    if (!agentUpdates.has(agentId)) {
      agentUpdates.set(agentId, []);
    }
    agentUpdates.get(agentId)!.push({
      tokenAddress: price.tokenAddress,
      price: price.priceSol,
      priceUsd: price.priceUsd,
    });
  }
}

// Send one message per agent with all updates
for (const [agentId, updates] of agentUpdates) {
  wsServer.broadcastPriceUpdates(agentId, updates);
}
```

### Frontend: requestAnimationFrame Batching
```typescript
// Frontend: Batch price updates into animation frames
const pendingUpdates = useRef<Map<string, { price: number; priceUsd: number }>>(new Map());
const updateTimeout = useRef<number | null>(null);

const queuePriceUpdate = (tokenAddress: string, price: number, priceUsd: number) => {
  pendingUpdates.current.set(tokenAddress, { price, priceUsd });
  if (updateTimeout.current !== null) return; // A flush is already scheduled

  // Batch apply with requestAnimationFrame
  updateTimeout.current = requestAnimationFrame(() => {
    setPositions(prev => {
      const updated = [...prev];
      for (const [addr, update] of pendingUpdates.current) {
        const idx = updated.findIndex(p => p.tokenAddress === addr);
        if (idx !== -1) {
          updated[idx] = { ...updated[idx], currentPrice: update.price };
        }
      }
      return updated;
    });
    pendingUpdates.current.clear();
    updateTimeout.current = null;
  });
};
```

### Connection Keepalive
```typescript
// Server-side ping every 30 seconds
const PING_INTERVAL = 30000;
setInterval(() => {
  for (const [clientId, ws] of clients) {
    if (ws.readyState === WebSocket.OPEN) {
      ws.ping();
    }
  }
}, PING_INTERVAL);

// Client heartbeat tracking
ws.on('pong', () => {
  clientState.lastPong = Date.now();
});
```

## Database Query Optimization
### Indexed Queries
Frequently queried columns have indexes:
```prisma
model AgentPosition {
  @@index([agentId])
  @@index([tokenAddress])
  @@index([agentId, walletAddress])
  @@index([purchaseTransactionId])
}

model AgentTransaction {
  @@index([agentId])
  @@index([agentId, transactionTime])
  @@index([agentId, transactionType])
}
```

### Efficient Queries
```typescript
// Good: Select only needed columns
const positions = await prisma.agentPosition.findMany({
  where: { tokenAddress: tokenAddress.toLowerCase() },
  select: {
    id: true,
    agentId: true,
    purchasePrice: true,
    peakPrice: true,
    currentStopLossPercentage: true,
  },
});

// Bad: Select all columns
const positions = await prisma.agentPosition.findMany({
  where: { tokenAddress },
});
```

### Batch Operations
```typescript
// Good: Batch insert with createMany
await prisma.agentTransaction.createMany({
  data: transactions,
  skipDuplicates: true,
});

// Good: Batch update with a Redis pipeline
const pipeline = redis.pipeline();
for (const balance of balances) {
  pipeline.set(getBalanceKey(balance), JSON.stringify(balance));
}
await pipeline.exec();
```

## Async Job Processing
Non-critical operations are offloaded to background queues.
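On the producer side, jobs like these are enqueued with BullMQ's `Queue#add`. The helpers below are an illustrative sketch against a minimal queue interface (the helper names and payload shapes are hypothetical; the job names match the worker shown in this section):

```typescript
// Minimal producer sketch. In production this would be a BullMQ Queue
// connected to Redis; here, any object with a compatible `add` method
// works, which keeps the routing logic testable without a broker.
interface QueueLike {
  add(name: string, data: unknown): Promise<void>;
}

// Hypothetical helper: move the trade-history write off the critical path.
async function queueHistoricalSwap(
  queue: QueueLike,
  swap: { agentId: string; txSignature: string },
): Promise<void> {
  await queue.add('record_historical_swap', swap);
}

// Hypothetical helper: request a balance snapshot for an agent.
async function queueBalanceSnapshot(queue: QueueLike, agentId: string): Promise<void> {
  await queue.add('snapshot_balance', { agentId });
}
```

Because the enqueue is just a Redis write, the trade path returns immediately while the worker below drains the queue.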
### BullMQ Worker
```typescript
// Heavy operations run in background
const worker = new Worker('nexgent-queue', async (job) => {
  switch (job.name) {
    case 'record_historical_swap':
      // Database write for trade history
      await prisma.agentHistoricalSwap.create({ data: job.data });
      break;
    case 'snapshot_balance':
      // Periodic balance snapshot
      await balanceSnapshotService.createSnapshot(job.data.agentId);
      break;
  }
}, { connection: redisConnection });
```

### Trade Execution Flow
```
┌─────────────────┐
│  Execute Trade  │ ← Critical path (fast)
└────────┬────────┘
         │
         ├─── Update balance (sync, cached)
         │
         ├─── Create position (sync, cached)
         │
         ├─── Emit WebSocket event (async)
         │
         └─── Queue historical swap write (async)
                │
                ▼
       ┌────────────────┐
       │ BullMQ Worker  │ ← Background (non-blocking)
       │ writes to DB   │
       └────────────────┘
```

## Memory Optimization
### Token Tracking
Only track tokens with active positions:
```typescript
class PriceUpdateManager {
  private trackedTokens: Map<string, TokenTracking> = new Map();

  // Add token when position opens
  async addTokenTracking(tokenAddress: string, agentId: string) {
    const normalized = tokenAddress.toLowerCase();
    const tracking = this.trackedTokens.get(normalized);
    if (tracking) {
      tracking.agents.add(agentId);
    } else {
      this.trackedTokens.set(normalized, {
        tokenAddress: normalized,
        agents: new Set([agentId]),
        lastUpdate: null,
      });
    }
  }

  // Remove token when all positions close
  async removeTokenTracking(tokenAddress: string, agentId: string) {
    const normalized = tokenAddress.toLowerCase();
    const tracking = this.trackedTokens.get(normalized);
    if (!tracking) return;
    tracking.agents.delete(agentId);
    if (tracking.agents.size === 0) {
      this.trackedTokens.delete(normalized);
      this.priceCache.delete(normalized); // Clean up cache too
    }
  }
}
```

### Periodic Cleanup
```typescript
// Refresh tracked tokens probabilistically: a 20% chance per 1.5-second
// poll works out to a refresh roughly every 7.5 seconds on average
if (Math.random() < 0.2) {
  await this.refreshTrackedTokens();
}
```

## Metrics Collection
Prometheus metrics track performance:
```typescript
// Signal processing latency histogram
const signalProcessingLatency = new Histogram({
  name: 'signal_processing_latency_seconds',
  help: 'Signal processing latency in seconds',
  labelNames: ['status'],
  buckets: [0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1],
});

// Stop loss evaluation latency
const stopLossEvaluationLatency = new Histogram({
  name: 'stop_loss_evaluation_latency_seconds',
  help: 'Stop loss evaluation latency in seconds',
  labelNames: ['agent_id', 'token_address'],
  buckets: [0.0001, 0.0005, 0.001, 0.005, 0.01],
});

// Price update latency
const priceUpdateLatency = new Histogram({
  name: 'price_update_latency_seconds',
  help: 'Price update latency in seconds',
  labelNames: ['source'],
  buckets: [0.1, 0.25, 0.5, 1, 2.5, 5],
});
```

## Best Practices Summary
| Technique | Benefit |
|---|---|
| Parallel execution | Process multiple agents simultaneously |
| Two-tier caching | Sub-ms reads with local + Redis |
| Write-through cache | Consistency without cache invalidation bugs |
| Distributed locks | Prevent duplicate operations |
| Batched broadcasts | Reduce WebSocket message count |
| requestAnimationFrame | Smooth UI updates |
| Background queues | Keep critical path fast |
| Selective column queries | Reduce data transfer |
| Index optimization | Fast database lookups |
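A common pattern for feeding the histograms above is a small timing wrapper around hot-path calls. The helper below is illustrative only (prom-client's `Histogram#startTimer` serves the same purpose); it measures an async operation and hands the duration in seconds to any recording callback:

```typescript
// Hypothetical timing wrapper: runs an async operation and reports its
// duration (in seconds) to a recording callback, e.g. histogram.observe.
// The finally block records even when the operation throws.
async function timed<T>(
  record: (seconds: number) => void,
  fn: () => Promise<T>,
): Promise<T> {
  const start = Date.now();
  try {
    return await fn();
  } finally {
    record((Date.now() - start) / 1000);
  }
}
```

Usage might look like `await timed((s) => signalProcessingLatency.observe({ status: 'success' }, s), () => processor.processSignal(signal));`.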