OS Trading Engine
Technical Documentation
Backend
Infrastructure
Job Queues (BullMQ)

Job Queues (BullMQ)

Nexgent uses BullMQ for background job processing. This allows deferring non-critical work (like historical swap records) without blocking trade execution.

Queue Architecture

src/infrastructure/queue/
├── job-types.ts      # Job type definitions
├── queue-client.ts   # Queue producer (adds jobs)
└── queue-worker.ts   # Queue consumer (processes jobs)

Queue Names

QueuePurpose
database-writesAsync database operations
// src/infrastructure/queue/job-types.ts
export enum QueueName {
  DATABASE_WRITES = 'database-writes',
}

Job Types

export enum JobType {
  WRITE_TRANSACTION = 'write-transaction',
  WRITE_HISTORICAL_SWAP = 'write-historical-swap',
  CAPTURE_BALANCE_SNAPSHOT = 'capture-balance-snapshot',
}

Write Historical Swap

Records completed trades for performance tracking. Queued after position close to avoid blocking the sale.

interface WriteHistoricalSwapJob {
  type: JobType.WRITE_HISTORICAL_SWAP;
  data: {
    id: string;
    agentId: string;
    walletAddress: string;
    tokenAddress: string;
    tokenSymbol: string;
    amount: Decimal;
    purchasePrice: Decimal;
    salePrice: Decimal;
    changePercent: Decimal;
    profitLossUsd: Decimal;
    profitLossSol: Decimal;
    purchaseTime: Date;
    saleTime: Date;
    purchaseTransactionId: string | null;
    saleTransactionId: string;
    signalId: number | null;
    closeReason: string | null;
  };
}

Capture Balance Snapshot

Periodic snapshots of portfolio balance for historical charting.

interface CaptureBalanceSnapshotJob {
  type: JobType.CAPTURE_BALANCE_SNAPSHOT;
  agentId?: string;       // Optional: snapshot specific agent
  walletAddress?: string; // Optional: snapshot specific wallet
  // If both omitted, snapshots all agents
}

Queue Client (Producer)

The queue client adds jobs to queues:

// src/infrastructure/queue/queue-client.ts
class QueueClient {
  private queues: Map<QueueName, Queue> = new Map();
  
  getQueue(queueName: QueueName): Queue {
    if (!this.queues.has(queueName)) {
      this.queues.set(queueName, new Queue(queueName, {
        connection: redisConfig,
      }));
    }
    return this.queues.get(queueName)!;
  }
}
 
export const queueClient = QueueClient.getInstance();

Adding Jobs

// After position close, queue historical swap record
await queueClient.getQueue(QueueName.DATABASE_WRITES).add(
  JobType.WRITE_HISTORICAL_SWAP,
  {
    type: JobType.WRITE_HISTORICAL_SWAP,
    data: {
      id: historicalSwapId,
      agentId: request.agentId,
      tokenAddress: position.tokenAddress,
      purchasePrice: position.purchasePrice,
      salePrice: calculatedSalePrice,
      profitLossSol,
      // ... other fields
    }
  }
);

Queue Worker (Consumer)

The worker processes jobs from queues:

// src/infrastructure/queue/queue-worker.ts
class QueueWorker {
  private workers: Map<string, Worker> = new Map();
  
  initialize(): void {
    this.createWorker(QueueName.DATABASE_WRITES, this.processDatabaseWriteJob.bind(this));
    console.log('✅ Queue workers initialized');
  }
  
  private createWorker(queueName: QueueName, processor: (job: Job) => Promise<void>): void {
    const worker = new Worker(queueName, processor, {
      connection: redisConfig,
      concurrency: 5, // Process 5 jobs in parallel
    });
    
    worker.on('completed', (job) => {
      // Job completed successfully
    });
    
    worker.on('failed', (job, err) => {
      console.error(`Job ${job?.id} failed:`, err);
    });
    
    this.workers.set(queueName, worker);
  }
  
  private async processDatabaseWriteJob(job: Job<DatabaseWriteJob>): Promise<void> {
    const { type } = job.data;
    
    switch (type) {
      case JobType.WRITE_TRANSACTION:
        await prisma.agentTransaction.upsert({
          where: { id: job.data.data.id },
          update: {},
          create: job.data.data,
        });
        break;
        
      case JobType.WRITE_HISTORICAL_SWAP:
        await prisma.agentHistoricalSwap.create({ data: job.data.data });
        break;
        
      case JobType.CAPTURE_BALANCE_SNAPSHOT:
        if (job.data.agentId && job.data.walletAddress) {
          await balanceSnapshotService.captureSnapshot(
            job.data.agentId, 
            job.data.walletAddress
          );
        } else {
          await balanceSnapshotService.captureSnapshotsForAllAgents();
        }
        break;
    }
  }
}
 
export const queueWorker = QueueWorker.getInstance();

Balance Snapshot Scheduler

Schedules periodic balance snapshots:

// src/domain/balances/balance-snapshot-scheduler.ts
class BalanceSnapshotScheduler {
  private intervalId: NodeJS.Timeout | null = null;
  private readonly SNAPSHOT_INTERVAL = 60 * 60 * 1000; // 1 hour
  
  async start(): Promise<void> {
    // Schedule hourly snapshots
    this.intervalId = setInterval(async () => {
      await queueClient.getQueue(QueueName.DATABASE_WRITES).add(
        JobType.CAPTURE_BALANCE_SNAPSHOT,
        { type: JobType.CAPTURE_BALANCE_SNAPSHOT }
      );
    }, this.SNAPSHOT_INTERVAL);
    
    console.log('✅ Balance snapshot scheduler started');
  }
  
  stop(): void {
    if (this.intervalId) {
      clearInterval(this.intervalId);
      this.intervalId = null;
    }
  }
}

Job Retry Strategy

BullMQ automatically retries failed jobs. Default behavior:

  • Attempts: 3 retries
  • Backoff: Exponential backoff between retries
  • Dead Letter: After max retries, job moves to failed state
// Custom retry configuration per job
await queue.add(jobName, data, {
  attempts: 5,
  backoff: {
    type: 'exponential',
    delay: 1000, // Start with 1s delay
  },
});

Failed jobs are logged but don't block the main application flow. Historical swap records are non-critical—if creation fails, the trade itself still completed successfully.


Graceful Shutdown

Workers are closed gracefully on application shutdown:

// In shutdown handler
async function shutdown() {
  // Close queue workers
  await queueWorker.closeAll();
  
  // Close queue clients
  await queueClient.closeAll();
}

Monitoring

BullMQ provides job status through the queue API:

const queue = queueClient.getQueue(QueueName.DATABASE_WRITES);
 
// Get job counts
const counts = await queue.getJobCounts();
// { waiting: 0, active: 2, completed: 150, failed: 1, delayed: 0 }
 
// Get failed jobs
const failedJobs = await queue.getFailed();

Configuration

BullMQ uses the same Redis connection as caching:

const workerOptions = {
  connection: {
    host: redisConfig.host,
    port: redisConfig.port,
    password: redisConfig.password,
    db: redisConfig.db,
  },
  concurrency: 5,
};