WebSocket Server

Nexgent uses WebSockets for real-time position updates and price broadcasts. The WebSocket server shares a port with the HTTP server and accepts connections through the HTTP upgrade handshake.

Architecture

┌─────────────────┐         ┌─────────────────┐
│    Frontend     │◄───────►│ WebSocket Server│
│   (Dashboard)   │   WS    │   (Backend)     │
└─────────────────┘         └────────┬────────┘
                                     │
                    ┌────────────────┼────────────────┐
                    │                │                │
                    ▼                ▼                ▼
            Position Events    Price Updates    Ping/Pong

Connection Flow

// Frontend connects with JWT token and agent ID
const wsUrl = `ws://localhost:4000/ws?token=${jwt}&agentId=${agentId}`;
const ws = new WebSocket(wsUrl);
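
Building the URL with `URLSearchParams` keeps the query string valid even if the token or agent ID contains reserved characters. A minimal sketch, assuming a hypothetical helper named `buildWsUrl` (not part of the Nexgent codebase) and the `/ws` path shown above:

```typescript
// Hypothetical helper: builds the connection URL with encoded query parameters.
function buildWsUrl(host: string, jwt: string, agentId: string, secure = false): string {
  const params = new URLSearchParams({ token: jwt, agentId });
  const scheme = secure ? 'wss' : 'ws'; // wss encrypts the token in transit
  return `${scheme}://${host}/ws?${params.toString()}`;
}

const exampleUrl = buildWsUrl('localhost:4000', 'header.payload.signature', 'agent-1');
```

Because the token travels in the URL, `wss://` is the safer choice outside local development.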

Server-Side Connection Handling

// src/infrastructure/websocket/server.ts
class WSServer {
  private connections: Map<string, WSConnection> = new Map(); // agentId -> connection
  
  async handleConnection(ws: WebSocket, req: IncomingMessage): Promise<void> {
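    // Extract params from URL (req.url is optional on IncomingMessage)
    const url = new URL(req.url ?? '', `http://${req.headers.host}`);
    const token = url.searchParams.get('token');
    const agentId = url.searchParams.get('agentId');
    
    // Reject connections that omit either parameter
    if (!token || !agentId) {
      ws.close(1008, 'Missing token or agentId');
      return;
    }
    
    // Verify JWT (assumes verifyToken throws on an invalid or expired token)
    let payload;
    try {
      payload = verifyToken(token);
    } catch {
      ws.close(1008, 'Invalid token');
      return;
    }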
    
    // Validate agent belongs to user
    const agent = await prisma.agent.findFirst({
      where: { id: agentId, userId: payload.userId },
    });
    
    if (!agent) {
      ws.close(1008, 'Agent not found');
      return;
    }
    
    // Close existing connection for this agent (one per agent)
    const existing = this.connections.get(agentId);
    if (existing) {
      existing.ws.close(1000, 'New connection established');
    }
    
    // Store new connection; lastPing seeds the keepalive staleness check
    this.connections.set(agentId, { ws, userId: payload.userId, agentId, lastPing: new Date() });
    
    // Send connection confirmation
    this.sendMessage(ws, { type: 'connected', data: { agentId } });
    
    // Send initial data (positions)
    await this.sendInitialData(ws, agentId);
  }
}

Each agent can have only one active WebSocket connection. New connections close existing ones for the same agent.
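
The one-connection-per-agent rule has a subtle edge: when the replaced socket finishes closing, its close handler must not remove the newer entry from the map. A minimal sketch of one way to guard against that, assuming a hypothetical `ConnectionRegistry` class and a reduced socket interface (neither is taken from the Nexgent source):

```typescript
// Minimal socket shape for illustration; the real server holds WebSocket objects.
interface ClosableSocket {
  close(code: number, reason: string): void;
}

// Hypothetical registry enforcing one connection per agent.
class ConnectionRegistry<S extends ClosableSocket> {
  private connections = new Map<string, S>();

  // Registers a socket, closing any previous socket for the same agent.
  register(agentId: string, socket: S): void {
    const existing = this.connections.get(agentId);
    if (existing && existing !== socket) {
      existing.close(1000, 'New connection established');
    }
    this.connections.set(agentId, socket);
  }

  // Call from the socket's close handler. Removes the entry only if it still
  // points at this socket, so a replaced socket cannot evict its successor.
  unregister(agentId: string, socket: S): boolean {
    if (this.connections.get(agentId) !== socket) return false;
    return this.connections.delete(agentId);
  }

  get(agentId: string): S | undefined {
    return this.connections.get(agentId);
  }
}
```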


Message Types

Server → Client

Type                Description           Payload
connected           Connection confirmed  { agentId, timestamp }
initial_data        Initial positions     { positions: Position[] }
position_update     Position changed      { eventType, position?, positionId? }
price_update_batch  Price updates         { updates: PriceUpdate[] }
ping                Keepalive ping        { timestamp }
error               Error message         { message }

Client → Server

Type       Description           Payload
pong       Keepalive response    { timestamp }
subscribe  Subscribe to updates  { agentId }
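
Inbound messages come from an untrusted client, so a server would typically validate them before acting on them. A minimal sketch, assuming the top-level field layout used by the client pong snippet in the Connection Keepalive section; the `subscribe` shape and the `parseClientMessage` name are assumptions:

```typescript
type ClientMessage =
  | { type: 'pong'; timestamp: string }
  | { type: 'subscribe'; agentId: string };

// Returns a typed message, or null when the input is not a recognized message.
function parseClientMessage(raw: string): ClientMessage | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null; // not valid JSON
  }
  if (typeof value !== 'object' || value === null) return null;

  const msg = value as Record<string, unknown>;
  if (msg.type === 'pong' && typeof msg.timestamp === 'string') {
    return { type: 'pong', timestamp: msg.timestamp };
  }
  if (msg.type === 'subscribe' && typeof msg.agentId === 'string') {
    return { type: 'subscribe', agentId: msg.agentId };
  }
  return null; // unknown type or missing fields
}
```

Returning `null` instead of throwing lets the caller decide whether to ignore the message or reply with an `error` message.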

Initial Data

When a client connects, the server sends enriched positions with current prices:

async sendInitialData(ws: WebSocket, agentId: string): Promise<void> {
  // Get all wallets for agent
  const wallets = await prisma.agentWallet.findMany({
    where: { agentId },
  });
  
  // Load positions from all wallets
  const allPositions = [];
  for (const wallet of wallets) {
    const positions = await positionService.loadPositions(agentId, wallet.walletAddress);
    allPositions.push(...positions);
  }
  
  // Get prices from Redis cache (fast), fallback to API for misses
  const tokenAddresses = [...new Set(allPositions.map(p => p.tokenAddress))];
  const cachedPrices = await redisPriceService.getMultiplePrices(tokenAddresses);
  
  // Enrich positions with current prices and P/L
  const enrichedPositions = allPositions.map(position => {
    const price = cachedPrices.get(position.tokenAddress);
    return {
      ...position,
      currentPrice: price?.priceSol ?? position.purchasePrice,
      currentPriceUsd: price?.priceUsd ?? 0,
      // Calculate P/L...
    };
  });
  
  this.sendMessage(ws, {
    type: 'initial_data',
    data: { positions: enrichedPositions },
  });
}
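
The enrichment step falls back to the purchase price when a token has no cached price, so the reported P/L becomes zero rather than undefined. A minimal sketch of that step in isolation; the `amount` field, the P/L formula, and the output names `profitLossSol` and `profitLossPercent` are assumptions for illustration, not the project's actual calculation:

```typescript
interface PositionLike {
  tokenAddress: string;
  purchasePrice: number; // price per token in SOL at entry
  amount: number;        // assumed field: token quantity held
}

interface CachedPrice {
  priceSol: number;
  priceUsd: number;
}

function enrichPosition(position: PositionLike, price?: CachedPrice) {
  // Same fallbacks as sendInitialData: purchase price for SOL, 0 for USD.
  const currentPrice = price?.priceSol ?? position.purchasePrice;
  const profitLossSol = (currentPrice - position.purchasePrice) * position.amount;
  const profitLossPercent =
    position.purchasePrice > 0
      ? ((currentPrice - position.purchasePrice) / position.purchasePrice) * 100
      : 0; // avoid dividing by zero for malformed entries
  return {
    ...position,
    currentPrice,
    currentPriceUsd: price?.priceUsd ?? 0,
    profitLossSol,
    profitLossPercent,
  };
}
```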

Position Events

The WebSocket server listens for position events and forwards each one to the affected agent's client, if that agent is connected:

setupPositionEventListeners(): void {
  // Position created
  positionEventEmitter.on('position_created', (event) => {
    if (this.isAgentConnected(event.agentId)) {
      this.broadcastToAgent(event.agentId, {
        type: 'position_update',
        data: { eventType: 'position_created', position: event.position },
      });
    }
  });
  
  // Position updated (stop loss change, DCA, etc.)
  positionEventEmitter.on('position_updated', (event) => {
    if (this.isAgentConnected(event.agentId)) {
      this.broadcastToAgent(event.agentId, {
        type: 'position_update',
        data: { eventType: 'position_updated', position: event.position },
      });
    }
  });
  
  // Position closed
  positionEventEmitter.on('position_closed', (event) => {
    if (this.isAgentConnected(event.agentId)) {
      this.broadcastToAgent(event.agentId, {
        type: 'position_update',
        data: { eventType: 'position_closed', positionId: event.positionId },
      });
    }
  });
}
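
The three listeners differ only in the event name and in whether the payload carries the full position or just its ID. A minimal sketch of that mapping as a pure function; `toPositionUpdateMessage`, the `PositionEvent` type, and the `id` field on a position are hypothetical, while the message shape follows the listeners above:

```typescript
type PositionEvent =
  | { name: 'position_created'; agentId: string; position: { id: string } }
  | { name: 'position_updated'; agentId: string; position: { id: string } }
  | { name: 'position_closed'; agentId: string; positionId: string };

interface PositionUpdateMessage {
  type: 'position_update';
  data: {
    eventType: PositionEvent['name'];
    position?: { id: string };
    positionId?: string;
  };
}

function toPositionUpdateMessage(event: PositionEvent): PositionUpdateMessage {
  if (event.name === 'position_closed') {
    // Closed positions are identified by ID only.
    return {
      type: 'position_update',
      data: { eventType: event.name, positionId: event.positionId },
    };
  }
  // Created and updated events carry the full position.
  return {
    type: 'position_update',
    data: { eventType: event.name, position: event.position },
  };
}
```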

Price Broadcasts

The Price Update Manager broadcasts price updates in batches:

// Called by Price Update Manager
broadcastPriceUpdates(
  agentId: string, 
  updates: Array<{ tokenAddress: string; price: number; priceUsd: number }>
): void {
  if (!this.isAgentConnected(agentId)) {
    return; // Skip if agent not connected
  }
  
  this.broadcastToAgent(agentId, {
    type: 'price_update_batch',
    data: {
      updates,
      timestamp: new Date().toISOString(),
    },
  });
}

Price updates are batched per agent to reduce WebSocket message overhead. Updates are only sent to agents with positions in the affected tokens.
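
One way to restrict updates to agents holding the affected tokens is to keep an index of token addresses per agent and filter each batch against it. A minimal sketch, assuming a hypothetical `tokensByAgent` index and `groupUpdatesByAgent` helper; the Price Update Manager's actual bookkeeping is not shown in this document:

```typescript
interface PriceUpdate {
  tokenAddress: string;
  price: number;
  priceUsd: number;
}

// Returns one batch per agent, containing only the tokens that agent holds.
function groupUpdatesByAgent(
  updates: PriceUpdate[],
  tokensByAgent: Map<string, Set<string>>,
): Map<string, PriceUpdate[]> {
  const batches = new Map<string, PriceUpdate[]>();
  tokensByAgent.forEach((tokens, agentId) => {
    const relevant = updates.filter(u => tokens.has(u.tokenAddress));
    if (relevant.length > 0) {
      batches.set(agentId, relevant); // agents with no affected tokens get no batch
    }
  });
  return batches;
}
```

Each resulting batch could then be passed to `broadcastPriceUpdates(agentId, batch)`, which skips agents that are not connected.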


Connection Keepalive

The server sends periodic pings to detect stale connections:

private readonly PING_INTERVAL = 30000; // 30 seconds
 
startPingInterval(): void {
  this.pingInterval = setInterval(() => {
    for (const [agentId, connection] of this.connections.entries()) {
      if (connection.ws.readyState === WebSocket.OPEN) {
        // Check for stale connection (no pong in 2 intervals).
        // Assumes lastPing is refreshed whenever a pong arrives.
        const timeSinceLastPong = Date.now() - connection.lastPing.getTime();
        if (timeSinceLastPong > this.PING_INTERVAL * 2) {
          connection.ws.close(1000, 'Connection timeout');
          this.connections.delete(agentId);
        } else {
          this.sendMessage(connection.ws, { type: 'ping', timestamp: new Date().toISOString() });
        }
      }
    }
  }, this.PING_INTERVAL);
}

Client Pong Response

// Frontend responds to ping
ws.onmessage = (event) => {
  const message = JSON.parse(event.data);
  if (message.type === 'ping') {
    ws.send(JSON.stringify({ type: 'pong', timestamp: new Date().toISOString() }));
  }
};
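
For the stale check to work, the server has to refresh a per-connection timestamp whenever a pong arrives. A minimal sketch of that bookkeeping, assuming a hypothetical `KeepaliveTracker` class and the two-interval timeout described above; the current time is passed in explicitly so the logic can be exercised without timers:

```typescript
class KeepaliveTracker {
  private lastPong = new Map<string, number>();
  private readonly pingIntervalMs: number;

  constructor(pingIntervalMs: number = 30000) {
    this.pingIntervalMs = pingIntervalMs;
  }

  // Call when a connection opens and whenever a pong is received.
  recordPong(agentId: string, nowMs: number): void {
    this.lastPong.set(agentId, nowMs);
  }

  // A connection is stale when no pong arrived within two ping intervals.
  isStale(agentId: string, nowMs: number): boolean {
    const last = this.lastPong.get(agentId);
    if (last === undefined) return true; // unknown connections count as stale
    return nowMs - last > this.pingIntervalMs * 2;
  }

  // Call when a connection is closed or removed.
  remove(agentId: string): void {
    this.lastPong.delete(agentId);
  }
}
```

In the ping loop, `isStale(agentId, Date.now())` would decide between closing the socket and sending the next ping.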

Health Check

getHealthStatus(): { status: 'healthy' | 'unhealthy'; connectionCount?: number } {
  if (!this.wss) {
    return { status: 'unhealthy' };
  }
  return {
    status: 'healthy',
    connectionCount: this.connections.size,
  };
}

Graceful Shutdown

shutdown(): void {
  // Stop ping interval
  if (this.pingInterval) {
    clearInterval(this.pingInterval);
  }
  
  // Close all connections
  for (const connection of this.connections.values()) {
    if (connection.ws.readyState === WebSocket.OPEN) {
      connection.ws.close(1000, 'Server shutdown');
    }
  }
  
  this.connections.clear();
  
  if (this.wss) {
    this.wss.close();
  }
}

Frontend Integration

The frontend uses a custom useWebSocket hook:

// packages/frontend/src/infrastructure/websocket/hooks/use-websocket.ts
export function useWebSocket(agentId: string | null) {
  const [positions, setPositions] = useState<LivePosition[]>([]);
  const [isConnected, setIsConnected] = useState(false);
  
  // Handle incoming messages
  const handleMessage = useCallback((event: MessageEvent) => {
    const message = JSON.parse(event.data);
    
    switch (message.type) {
      case 'initial_data':
        setPositions(message.data.positions);
        break;
        
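      case 'position_update':
        setPositions(prev => {
          // Update/add/remove position based on eventType
          // (assumes each position has a unique `id`)
          const { eventType, position, positionId } = message.data;
          if (eventType === 'position_closed') {
            return prev.filter(p => p.id !== positionId);
          }
          const exists = prev.some(p => p.id === position.id);
          return exists
            ? prev.map(p => (p.id === position.id ? position : p))
            : [...prev, position];
        });
        break;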
        
      case 'price_update_batch':
        // Queue updates for batched application
        message.data.updates.forEach(update => {
          queuePriceUpdate(update.tokenAddress, update.price, update.priceUsd);
        });
        break;
    }
  }, []);
  
  return { positions, isConnected, connect, disconnect };
}
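
Applying every price tick directly to React state can trigger a re-render per message, which is presumably why the hook queues updates instead. A minimal sketch of such a queue, assuming a hypothetical `PriceUpdateQueue` class that keeps only the latest price per token and hands the batch to a callback on flush; the real `queuePriceUpdate` implementation is not shown in this document:

```typescript
interface QueuedPrice {
  price: number;
  priceUsd: number;
}

type FlushHandler = (batch: Map<string, QueuedPrice>) => void;

class PriceUpdateQueue {
  private pending = new Map<string, QueuedPrice>();
  private readonly onFlush: FlushHandler;

  constructor(onFlush: FlushHandler) {
    this.onFlush = onFlush;
  }

  // Later updates for the same token overwrite earlier ones.
  queue(tokenAddress: string, price: number, priceUsd: number): void {
    this.pending.set(tokenAddress, { price, priceUsd });
  }

  // Delivers the pending batch (if any) and resets the queue.
  flush(): void {
    if (this.pending.size === 0) return;
    const batch = this.pending;
    this.pending = new Map();
    this.onFlush(batch);
  }
}
```

A caller could invoke `flush()` from a timer or an animation-frame callback so that many ticks result in a single state update.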

Configuration

The WebSocket server uses the same port as the HTTP server:

// In src/index.ts
server = app.listen(PORT, '0.0.0.0', () => {
  console.log(`Server running on port ${PORT}`);
});
 
// Initialize WebSocket on same server
wsServer.initialize(server);

Connection URL:

ws://localhost:4000/ws?token=<JWT>&agentId=<agent-id>