Data Flow
This page documents how data flows through the Nexgent trading engine, from signal creation to trade execution to real-time updates.
Signal Processing Flow
When a trading signal is created, it triggers a cascade of operations across multiple services.
βββββββββββββββββββ
β Signal Source β (API, Webhook, Manual)
ββββββββββ¬βββββββββ
β
βΌ
βββββββββββββββββββ
β POST /signals β Create signal in database
ββββββββββ¬βββββββββ
β
βΌ
βββββββββββββββββββ
β Event Emitter β Emit 'signal_created' event
ββββββββββ¬βββββββββ
β
βΌ
βββββββββββββββββββ
βSignal Processor β Listen for signal events
ββββββββββ¬βββββββββ
β
βΌ
βββββββββββββββββββ
β Agent Eligibilityβ Check which agents can trade
ββββββββββ¬βββββββββ
β
βΌ (parallel for each eligible agent)
βββββββββββββββββββ
βTrading Executor β Execute purchase for agent
ββββββββββ¬βββββββββ
β
βΌ
βββββββββββββββββββ
β Position Createdβ Emit 'position_created' event
ββββββββββ¬βββββββββ
β
βΌ
βββββββββββββββββββ
β WebSocket β Broadcast to connected clients
βββββββββββββββββββAgent Eligibility Check
Before executing a trade, the system validates:
- Agent Active - Agent exists and is not disabled
- Automated Trading Enabled - For the current trading mode (simulation/live)
- Wallet Available - Agent has a wallet assigned for the trading mode
- Sufficient Balance - Wallet has enough SOL for position size
- No Existing Position - No open position in this token
- Signal Not Already Processed - Idempotency check
Eligibility checks are performed against Redis cache first for speed, falling back to database only when cache misses occur.
Trade Execution Flow
The Trading Executor orchestrates the complete trade lifecycle.
Purchase Flow
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Trading Executor β
β β
β 1. Validate Trade β
β βββ Check agent config β
β βββ Verify wallet balance β
β βββ Calculate position size β
β β
β 2. Get Swap Quote (Jupiter API) β
β βββ Input: SOL amount β
β βββ Output: Token amount β
β βββ Check price impact β
β β
β 3. Execute Swap β
β βββ Simulation: Use quote values β
β βββ Live: Sign and submit transaction β
β β
β 4. Database Transaction (atomic) β
β βββ Create transaction record β
β βββ Update balances (SOL β, Token β) β
β βββ Create position record β
β β
β 5. Update Redis Cache β
β βββ Update balance cache β
β βββ Update position cache β
β β
β 6. Initialize Stop Loss β
β βββ Set initial stop loss percentage β
β β
β 7. Emit Events β
β βββ 'position_created' β WebSocket broadcast β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββSale Flow (Stop Loss / Manual)
1. Load position from cache/database
2. Validate position belongs to agent
3. Get swap quote (Token β SOL)
4. Execute swap
5. Database transaction (atomic)
βββ Create sale transaction
βββ Update balances (Token β, SOL β)
βββ Delete position
6. Queue historical swap record (async)
7. Update Redis cache
8. Emit 'position_closed' eventPrice Update Flow
The Price Update Manager continuously monitors prices and triggers automated actions.
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Price Update Manager β
β β
β Every 1.5 seconds: β
β β
β 1. Get Tracked Tokens β
β βββ All tokens with active positions β
β β
β 2. Fetch Prices (DexScreener/Jupiter) β
β βββ Batch request for all tracked tokens β
β β
β 3. For Each Price Update: β
β βββ Update local cache β
β βββ Update Redis cache β
β β β
β βββ Evaluate Stop Loss β
β β βββ Check if price dropped below stop loss level β
β β βββ If trailing stop enabled, check from peak price β
β β βββ Trigger sale if conditions met β
β β β
β βββ Evaluate Stale Trade (if stop loss didn't trigger) β
β β βββ Check position age > min hold time β
β β βββ Check profit in target range β
β β βββ Trigger sale if conditions met β
β β β
β βββ Evaluate DCA (if no sale triggered) β
β β βββ Check if price dropped to DCA level β
β β βββ Check if DCA attempts remaining β
β β βββ Trigger DCA buy if conditions met β
β β β
β βββ Collect WebSocket Updates β
β β
β 4. Batch Broadcast Price Updates β
β βββ Send batched updates per connected agent β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββStop loss evaluation happens on every price update (every 1.5s). The trailing stop loss tracks peak price and tightens as profit increases.
WebSocket Data Flow
Real-time updates flow from backend to frontend via WebSocket.
Connection Flow
Frontend Backend
β β
β Connect with JWT + agentId β
β βββββββββββββββββββββββββββββββββββββΊ β
β β Verify JWT
β β Validate agent ownership
β β
β 'connected' message β
β βββββββββββββββββββββββββββββββββββββ β
β β
β 'initial_data' (positions) β
β βββββββββββββββββββββββββββββββββββββ β
β β
β Periodic 'ping' β
β βββββββββββββββββββββββββββββββββββββ β
β 'pong' response β
β βββββββββββββββββββββββββββββββββββββΊ β
β βMessage Types
| Message Type | Direction | Payload |
|---|---|---|
connected | Server β Client | Agent ID, timestamp |
initial_data | Server β Client | Array of enriched positions |
position_update | Server β Client | Position created/updated/closed |
price_update_batch | Server β Client | Array of token prices |
ping | Server β Client | Timestamp |
pong | Client β Server | Timestamp |
error | Server β Client | Error message |
Frontend State Updates
WebSocket Message
β
βΌ
βββββββββββββββββββ
β useWebSocket β React hook
β Hook β
ββββββββββ¬βββββββββ
β
ββββ initial_data βββββββΊ Set positions state
β
ββββ position_update ββββΊ Update/add/remove position
β Invalidate React Query cache
β
ββββ price_update_batch ββΊ Batch queue updates
Apply via requestAnimationFrameCaching Strategy
Write-Through Pattern
All writes go through both database and cache atomically.
Write Request
β
βΌ
βββββββββββββββββββ
β DB Transaction β
β (Prisma) β
ββββββββββ¬βββββββββ
β commit
βΌ
βββββββββββββββββββ
β Redis Update β
ββββββββββ¬βββββββββ
β
βΌ
βββββββββββββββββββ
β WebSocket Event β
βββββββββββββββββββCache Key Patterns
| Key Pattern | Data | TTL |
|---|---|---|
balance:{agentId}:{wallet}:{token} | Balance record | None (write-through) |
position:{agentId}:{wallet}:{token} | Position record | None (write-through) |
price:{tokenAddress} | Price data | 60s |
agent:{agentId} | Agent config | 300s |
idempotency:{key} | Deduplication | 60s |
Database Transactions
Critical operations use atomic database transactions.
Purchase Transaction
await prisma.$transaction(async (tx) => {
// 1. Create transaction record
await tx.agentTransaction.create({ ... });
// 2. Update balances
await tx.agentBalance.upsert({ ... }); // SOL decrease
await tx.agentBalance.upsert({ ... }); // Token increase
// 3. Create position
await tx.agentPosition.create({ ... });
});
// After commit: Update Redis, emit eventsSale Transaction
await prisma.$transaction(async (tx) => {
// 1. Create sale transaction
await tx.agentTransaction.create({ ... });
// 2. Update balances
await tx.agentBalance.update({ ... }); // Token decrease
await tx.agentBalance.update({ ... }); // SOL increase
// 3. Delete position
await tx.agentPosition.delete({ ... });
});
// After commit: Queue historical swap, update Redis, emit eventsEvent Flow Summary
| Event | Emitter | Listeners |
|---|---|---|
signal_created | Signal API | Signal Processor |
position_created | Trading Executor | WebSocket Server, Price Update Manager |
position_updated | Position Service | WebSocket Server |
position_closed | Trading Executor | WebSocket Server, Price Update Manager |