Data Flow
This page documents how data flows through the Nexgent trading engine, from signal creation to trade execution to real-time updates.
Signal Processing Flow
When a trading signal is created, it triggers a cascade of operations across multiple services.
┌─────────────────┐
│ Signal Source │ (API, Webhook, Manual)
└────────┬────────┘
│
▼
┌─────────────────┐
│ POST /signals │ Create signal in database
└────────┬────────┘
│
▼
┌─────────────────┐
│ Event Emitter │ Emit 'signal_created' event
└────────┬────────┘
│
▼
┌─────────────────┐
│Signal Processor │ Listen for signal events
└────────┬────────┘
│
▼
┌─────────────────┐
│ Agent Eligibility│ Check which agents can trade
└────────┬────────┘
│
▼ (parallel for each eligible agent)
┌─────────────────┐
│Trading Executor │ Execute purchase for agent
└────────┬────────┘
│
▼
┌─────────────────┐
│ Position Created│ Emit 'position_created' event
└────────┬────────┘
│
▼
┌─────────────────┐
│ WebSocket │ Broadcast to connected clients
└─────────────────┘Agent Eligibility Check
Before executing a trade, the system validates:
- Agent Active - Agent exists and is not disabled
- Automated Trading Enabled - For the current trading mode (simulation/live)
- Wallet Available - Agent has a wallet assigned for the trading mode
- Sufficient Balance - Wallet has enough SOL for position size
- No Existing Position - No open position in this token
- Signal Not Already Processed - Idempotency check
Eligibility checks are performed against Redis cache first for speed, falling back to database only when cache misses occur.
Trade Execution Flow
The Trading Executor orchestrates the complete trade lifecycle.
Purchase Flow
┌─────────────────────────────────────────────────────────────────┐
│ Trading Executor │
│ │
│ 1. Validate Trade │
│ ├── Check agent config │
│ ├── Verify wallet balance │
│ └── Calculate position size │
│ │
│ 2. Get Swap Quote (Jupiter API) │
│ ├── Input: SOL amount │
│ ├── Output: Token amount │
│ └── Check price impact │
│ │
│ 3. Execute Swap │
│ ├── Simulation: Use quote values │
│ └── Live: Sign and submit transaction │
│ │
│ 4. Database Transaction (atomic) │
│ ├── Create transaction record │
│ ├── Update balances (SOL ↓, Token ↑) │
│ └── Create position record │
│ │
│ 5. Update Redis Cache │
│ ├── Update balance cache │
│ └── Update position cache │
│ │
│ 6. Initialize Stop Loss │
│ └── Set initial stop loss percentage │
│ │
│ 7. Emit Events │
│ └── 'position_created' → WebSocket broadcast │
└─────────────────────────────────────────────────────────────────┘Sale Flow (Stop Loss / Manual)
1. Load position from cache/database
2. Validate position belongs to agent
3. Get swap quote (Token → SOL)
4. Execute swap
5. Database transaction (atomic)
├── Create sale transaction
├── Update balances (Token ↓, SOL ↑)
└── Delete position
6. Queue historical swap record (async)
7. Update Redis cache
8. Emit 'position_closed' eventPrice Update Flow
The Price Update Manager continuously monitors prices and triggers automated actions.
┌──────────────────────────────────────────────────────────────────┐
│ Price Update Manager │
│ │
│ Every 1.5 seconds: │
│ │
│ 1. Get Tracked Tokens │
│ └── All tokens with active positions │
│ │
│ 2. Fetch Prices (DexScreener/Jupiter) │
│ └── Batch request for all tracked tokens │
│ │
│ 3. For Each Price Update: │
│ ├── Update local cache │
│ ├── Update Redis cache │
│ │ │
│ ├── Evaluate Stop Loss │
│ │ ├── Check if price dropped below stop loss level │
│ │ ├── If trailing stop enabled, check from peak price │
│ │ └── Trigger sale if conditions met │
│ │ │
│ ├── Evaluate Stale Trade (if stop loss didn't trigger) │
│ │ ├── Check position age > min hold time │
│ │ ├── Check profit in target range │
│ │ └── Trigger sale if conditions met │
│ │ │
│ ├── Evaluate DCA (if no sale triggered) │
│ │ ├── Check if price dropped to DCA level │
│ │ ├── Check if DCA attempts remaining │
│ │ └── Trigger DCA buy if conditions met │
│ │ │
│ └── Collect WebSocket Updates │
│ │
│ 4. Batch Broadcast Price Updates │
│ └── Send batched updates per connected agent │
└──────────────────────────────────────────────────────────────────┘Stop loss evaluation happens on every price update (every 1.5s). The trailing stop loss tracks peak price and tightens as profit increases.
WebSocket Data Flow
Real-time updates flow from backend to frontend via WebSocket.
Connection Flow
Frontend Backend
│ │
│ Connect with JWT + agentId │
│ ────────────────────────────────────► │
│ │ Verify JWT
│ │ Validate agent ownership
│ │
│ 'connected' message │
│ ◄──────────────────────────────────── │
│ │
│ 'initial_data' (positions) │
│ ◄──────────────────────────────────── │
│ │
│ Periodic 'ping' │
│ ◄──────────────────────────────────── │
│ 'pong' response │
│ ────────────────────────────────────► │
│ │Message Types
| Message Type | Direction | Payload |
|---|---|---|
connected | Server → Client | Agent ID, timestamp |
initial_data | Server → Client | Array of enriched positions |
position_update | Server → Client | Position created/updated/closed |
price_update_batch | Server → Client | Array of token prices |
ping | Server → Client | Timestamp |
pong | Client → Server | Timestamp |
error | Server → Client | Error message |
Frontend State Updates
WebSocket Message
│
▼
┌─────────────────┐
│ useWebSocket │ React hook
│ Hook │
└────────┬────────┘
│
├─── initial_data ──────► Set positions state
│
├─── position_update ───► Update/add/remove position
│ Invalidate React Query cache
│
└─── price_update_batch ─► Batch queue updates
Apply via requestAnimationFrameCaching Strategy
Write-Through Pattern
All writes go through both database and cache atomically.
Write Request
│
▼
┌─────────────────┐
│ DB Transaction │
│ (Prisma) │
└────────┬────────┘
│ commit
▼
┌─────────────────┐
│ Redis Update │
└────────┬────────┘
│
▼
┌─────────────────┐
│ WebSocket Event │
└─────────────────┘Cache Key Patterns
| Key Pattern | Data | TTL |
|---|---|---|
balance:{agentId}:{wallet}:{token} | Balance record | None (write-through) |
position:{agentId}:{wallet}:{token} | Position record | None (write-through) |
price:{tokenAddress} | Price data | 60s |
agent:{agentId} | Agent config | 300s |
idempotency:{key} | Deduplication | 60s |
Database Transactions
Critical operations use atomic database transactions.
Purchase Transaction
await prisma.$transaction(async (tx) => {
// 1. Create transaction record
await tx.agentTransaction.create({ ... });
// 2. Update balances
await tx.agentBalance.upsert({ ... }); // SOL decrease
await tx.agentBalance.upsert({ ... }); // Token increase
// 3. Create position
await tx.agentPosition.create({ ... });
});
// After commit: Update Redis, emit eventsSale Transaction
await prisma.$transaction(async (tx) => {
// 1. Create sale transaction
await tx.agentTransaction.create({ ... });
// 2. Update balances
await tx.agentBalance.update({ ... }); // Token decrease
await tx.agentBalance.update({ ... }); // SOL increase
// 3. Delete position
await tx.agentPosition.delete({ ... });
});
// After commit: Queue historical swap, update Redis, emit eventsEvent Flow Summary
| Event | Emitter | Listeners |
|---|---|---|
signal_created | Signal API | Signal Processor |
position_created | Trading Executor | WebSocket Server, Price Update Manager |
position_updated | Position Service | WebSocket Server |
position_closed | Trading Executor | WebSocket Server, Price Update Manager |