WebSocket Server
Nexgent uses WebSocket for real-time position updates and price broadcasts. The WebSocket server runs on the same port as the HTTP server using HTTP upgrade.
Architecture
┌─────────────────┐ ┌─────────────────┐
│ Frontend │◄───────►│ WebSocket Server│
│ (Dashboard) │ WS │ (Backend) │
└─────────────────┘ └────────┬────────┘
│
┌────────────────┼────────────────┐
│ │ │
▼ ▼ ▼
Position Events Price Updates Ping/PongConnection Flow
// Frontend connects with JWT token and agent ID
const wsUrl = `ws://localhost:4000/ws?token=${jwt}&agentId=${agentId}`;
const ws = new WebSocket(wsUrl);Server-Side Connection Handling
// src/infrastructure/websocket/server.ts
class WSServer {
private connections: Map<string, WSConnection> = new Map(); // agentId -> connection
async handleConnection(ws: WebSocket, req: IncomingMessage): Promise<void> {
// Extract params from URL
const url = new URL(req.url, `http://${req.headers.host}`);
const token = url.searchParams.get('token');
const agentId = url.searchParams.get('agentId');
// Verify JWT
const payload = verifyToken(token);
// Validate agent belongs to user
const agent = await prisma.agent.findFirst({
where: { id: agentId, userId: payload.userId },
});
if (!agent) {
ws.close(1008, 'Agent not found');
return;
}
// Close existing connection for this agent (one per agent)
const existing = this.connections.get(agentId);
if (existing) {
existing.ws.close(1000, 'New connection established');
}
// Store new connection
this.connections.set(agentId, { ws, userId: payload.userId, agentId });
// Send connection confirmation
this.sendMessage(ws, { type: 'connected', data: { agentId } });
// Send initial data (positions)
await this.sendInitialData(ws, agentId);
}
}Each agent can have only one active WebSocket connection. New connections close existing ones for the same agent.
Message Types
Server → Client
| Type | Description | Payload |
|---|---|---|
connected | Connection confirmed | { agentId, timestamp } |
initial_data | Initial positions | { positions: Position[] } |
position_update | Position changed | { eventType, position?, positionId? } |
price_update_batch | Price updates | { updates: PriceUpdate[] } |
ping | Keepalive ping | { timestamp } |
error | Error message | { message } |
Client → Server
| Type | Description | Payload |
|---|---|---|
pong | Keepalive response | { timestamp } |
subscribe | Subscribe to updates | { agentId } |
Initial Data
When a client connects, the server sends enriched positions with current prices:
async sendInitialData(ws: WebSocket, agentId: string): Promise<void> {
// Get all wallets for agent
const wallets = await prisma.agentWallet.findMany({
where: { agentId },
});
// Load positions from all wallets
const allPositions = [];
for (const wallet of wallets) {
const positions = await positionService.loadPositions(agentId, wallet.walletAddress);
allPositions.push(...positions);
}
// Get prices from Redis cache (fast), fallback to API for misses
const tokenAddresses = [...new Set(allPositions.map(p => p.tokenAddress))];
const cachedPrices = await redisPriceService.getMultiplePrices(tokenAddresses);
// Enrich positions with current prices and P/L
const enrichedPositions = allPositions.map(position => {
const price = cachedPrices.get(position.tokenAddress);
return {
...position,
currentPrice: price?.priceSol ?? position.purchasePrice,
currentPriceUsd: price?.priceUsd ?? 0,
// Calculate P/L...
};
});
this.sendMessage(ws, {
type: 'initial_data',
data: { positions: enrichedPositions },
});
}Position Events
The WebSocket server listens for position events and broadcasts to connected clients:
setupPositionEventListeners(): void {
// Position created
positionEventEmitter.on('position_created', (event) => {
if (this.isAgentConnected(event.agentId)) {
this.broadcastToAgent(event.agentId, {
type: 'position_update',
data: { eventType: 'position_created', position: event.position },
});
}
});
// Position updated (stop loss change, DCA, etc.)
positionEventEmitter.on('position_updated', (event) => {
if (this.isAgentConnected(event.agentId)) {
this.broadcastToAgent(event.agentId, {
type: 'position_update',
data: { eventType: 'position_updated', position: event.position },
});
}
});
// Position closed
positionEventEmitter.on('position_closed', (event) => {
if (this.isAgentConnected(event.agentId)) {
this.broadcastToAgent(event.agentId, {
type: 'position_update',
data: { eventType: 'position_closed', positionId: event.positionId },
});
}
});
}Price Broadcasts
The Price Update Manager broadcasts price updates in batches:
// Called by Price Update Manager
broadcastPriceUpdates(
agentId: string,
updates: Array<{ tokenAddress: string; price: number; priceUsd: number }>
): void {
if (!this.isAgentConnected(agentId)) {
return; // Skip if agent not connected
}
this.broadcastToAgent(agentId, {
type: 'price_update_batch',
data: {
updates,
timestamp: new Date().toISOString(),
},
});
}Price updates are batched per agent to reduce WebSocket message overhead. Updates are only sent to agents with positions in the affected tokens.
Connection Keepalive
The server sends periodic pings to detect stale connections:
private readonly PING_INTERVAL = 30000; // 30 seconds
startPingInterval(): void {
this.pingInterval = setInterval(() => {
for (const [agentId, connection] of this.connections.entries()) {
if (connection.ws.readyState === WebSocket.OPEN) {
// Check for stale connection (no pong in 2 intervals)
const timeSinceLastPong = Date.now() - connection.lastPing.getTime();
if (timeSinceLastPong > this.PING_INTERVAL * 2) {
connection.ws.close(1000, 'Connection timeout');
this.connections.delete(agentId);
} else {
this.sendMessage(connection.ws, { type: 'ping', timestamp: new Date().toISOString() });
}
}
}
}, this.PING_INTERVAL);
}Client Pong Response
// Frontend responds to ping
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
if (message.type === 'ping') {
ws.send(JSON.stringify({ type: 'pong', timestamp: new Date().toISOString() }));
}
};Health Check
getHealthStatus(): { status: 'healthy' | 'unhealthy'; connectionCount?: number } {
if (!this.wss) {
return { status: 'unhealthy' };
}
return {
status: 'healthy',
connectionCount: this.connections.size,
};
}Graceful Shutdown
shutdown(): void {
// Stop ping interval
if (this.pingInterval) {
clearInterval(this.pingInterval);
}
// Close all connections
for (const connection of this.connections.values()) {
if (connection.ws.readyState === WebSocket.OPEN) {
connection.ws.close(1000, 'Server shutdown');
}
}
this.connections.clear();
if (this.wss) {
this.wss.close();
}
}Frontend Integration
The frontend uses a custom useWebSocket hook:
// packages/frontend/src/infrastructure/websocket/hooks/use-websocket.ts
export function useWebSocket(agentId: string | null) {
const [positions, setPositions] = useState<LivePosition[]>([]);
const [isConnected, setIsConnected] = useState(false);
// Handle incoming messages
const handleMessage = useCallback((event: MessageEvent) => {
const message = JSON.parse(event.data);
switch (message.type) {
case 'initial_data':
setPositions(message.data.positions);
break;
case 'position_update':
setPositions(prev => {
// Update/add/remove position based on eventType
});
break;
case 'price_update_batch':
// Queue updates for batched application
message.data.updates.forEach(update => {
queuePriceUpdate(update.tokenAddress, update.price, update.priceUsd);
});
break;
}
}, []);
return { positions, isConnected, connect, disconnect };
}Configuration
WebSocket server uses the same port as HTTP:
// In src/index.ts
server = app.listen(PORT, '0.0.0.0', () => {
console.log(`Server running on port ${PORT}`);
});
// Initialize WebSocket on same server
wsServer.initialize(server);Connection URL:
ws://localhost:4000/ws?token=<JWT>&agentId=<agent-id>