Job Queues (BullMQ)
Nexgent uses BullMQ for background job processing. This defers non-critical work (such as writing historical swap records) so it never blocks trade execution.
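The code excerpts below omit their import statements. They assume the core BullMQ primitives, the Prisma client, and the shared Redis config are in scope, roughly like this (the paths are illustrative, not taken from the repository):
import { Queue, Worker, Job } from 'bullmq';
import { prisma } from '../database/prisma';          // illustrative path
import { redisConfig } from '../cache/redis-config';  // illustrative path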
Queue Architecture
src/infrastructure/queue/
├── job-types.ts # Job type definitions
├── queue-client.ts # Queue producer (adds jobs)
└── queue-worker.ts # Queue consumer (processes jobs)

Queue Names
| Queue | Purpose |
|---|---|
| database-writes | Async database operations |
// src/infrastructure/queue/job-types.ts
export enum QueueName {
  DATABASE_WRITES = 'database-writes',
}

Job Types
export enum JobType {
  WRITE_TRANSACTION = 'write-transaction',
  WRITE_HISTORICAL_SWAP = 'write-historical-swap',
  CAPTURE_BALANCE_SNAPSHOT = 'capture-balance-snapshot',
}

Write Historical Swap
Records a completed trade for performance tracking. The job is queued after a position closes so the write does not block the sale.
interface WriteHistoricalSwapJob {
  type: JobType.WRITE_HISTORICAL_SWAP;
  data: {
    id: string;
    agentId: string;
    walletAddress: string;
    tokenAddress: string;
    tokenSymbol: string;
    amount: Decimal;
    purchasePrice: Decimal;
    salePrice: Decimal;
    changePercent: Decimal;
    profitLossUsd: Decimal;
    profitLossSol: Decimal;
    purchaseTime: Date;
    saleTime: Date;
    purchaseTransactionId: string | null;
    saleTransactionId: string;
    signalId: number | null;
    closeReason: string | null;
  };
}

Capture Balance Snapshot
Periodic snapshots of portfolio balance for historical charting.
interface CaptureBalanceSnapshotJob {
  type: JobType.CAPTURE_BALANCE_SNAPSHOT;
  agentId?: string;        // Optional: snapshot specific agent
  walletAddress?: string;  // Optional: snapshot specific wallet
  // If both omitted, snapshots all agents
}

Queue Client (Producer)
The queue client adds jobs to queues:
// src/infrastructure/queue/queue-client.ts
class QueueClient {
  private queues: Map<QueueName, Queue> = new Map();

  getQueue(queueName: QueueName): Queue {
    if (!this.queues.has(queueName)) {
      this.queues.set(queueName, new Queue(queueName, {
        connection: redisConfig,
      }));
    }
    return this.queues.get(queueName)!;
  }
}

export const queueClient = QueueClient.getInstance();

Adding Jobs
// After position close, queue historical swap record
await queueClient.getQueue(QueueName.DATABASE_WRITES).add(
  JobType.WRITE_HISTORICAL_SWAP,
  {
    type: JobType.WRITE_HISTORICAL_SWAP,
    data: {
      id: historicalSwapId,
      agentId: request.agentId,
      tokenAddress: position.tokenAddress,
      purchasePrice: position.purchasePrice,
      salePrice: calculatedSalePrice,
      profitLossSol,
      // ... other fields
    },
  }
);
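The same producer is used for the other job types. For example, a targeted balance snapshot could be queued like this (the agent variable is illustrative):
// Snapshot a single agent/wallet; omit both fields to snapshot all agents
await queueClient.getQueue(QueueName.DATABASE_WRITES).add(
  JobType.CAPTURE_BALANCE_SNAPSHOT,
  {
    type: JobType.CAPTURE_BALANCE_SNAPSHOT,
    agentId: agent.id,                  // illustrative
    walletAddress: agent.walletAddress, // illustrative
  }
);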
Queue Worker (Consumer)

The worker processes jobs from queues:
// src/infrastructure/queue/queue-worker.ts
class QueueWorker {
  private workers: Map<string, Worker> = new Map();

  initialize(): void {
    this.createWorker(QueueName.DATABASE_WRITES, this.processDatabaseWriteJob.bind(this));
    console.log('✅ Queue workers initialized');
  }

  private createWorker(queueName: QueueName, processor: (job: Job) => Promise<void>): void {
    const worker = new Worker(queueName, processor, {
      connection: redisConfig,
      concurrency: 5, // Process 5 jobs in parallel
    });

    worker.on('completed', (job) => {
      // Job completed successfully
    });

    worker.on('failed', (job, err) => {
      console.error(`Job ${job?.id} failed:`, err);
    });

    this.workers.set(queueName, worker);
  }

  private async processDatabaseWriteJob(job: Job<DatabaseWriteJob>): Promise<void> {
    const { type } = job.data;

    switch (type) {
      case JobType.WRITE_TRANSACTION:
        await prisma.agentTransaction.upsert({
          where: { id: job.data.data.id },
          update: {},
          create: job.data.data,
        });
        break;

      case JobType.WRITE_HISTORICAL_SWAP:
        await prisma.agentHistoricalSwap.create({ data: job.data.data });
        break;

      case JobType.CAPTURE_BALANCE_SNAPSHOT:
        if (job.data.agentId && job.data.walletAddress) {
          await balanceSnapshotService.captureSnapshot(
            job.data.agentId,
            job.data.walletAddress
          );
        } else {
          await balanceSnapshotService.captureSnapshotsForAllAgents();
        }
        break;
    }
  }
}

export const queueWorker = QueueWorker.getInstance();
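The worker is started once at application boot; a minimal sketch (the exact bootstrap location is an assumption):
// During startup, before any trades are processed
queueWorker.initialize();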
Balance Snapshot Scheduler

Schedules periodic balance snapshots:
// src/domain/balances/balance-snapshot-scheduler.ts
class BalanceSnapshotScheduler {
  private intervalId: NodeJS.Timeout | null = null;
  private readonly SNAPSHOT_INTERVAL = 60 * 60 * 1000; // 1 hour

  async start(): Promise<void> {
    // Schedule hourly snapshots
    this.intervalId = setInterval(async () => {
      await queueClient.getQueue(QueueName.DATABASE_WRITES).add(
        JobType.CAPTURE_BALANCE_SNAPSHOT,
        { type: JobType.CAPTURE_BALANCE_SNAPSHOT }
      );
    }, this.SNAPSHOT_INTERVAL);

    console.log('✅ Balance snapshot scheduler started');
  }

  stop(): void {
    if (this.intervalId) {
      clearInterval(this.intervalId);
      this.intervalId = null;
    }
  }
}
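A sketch of wiring the scheduler into the application lifecycle (the repository may expose it differently, e.g. as a singleton):
const balanceSnapshotScheduler = new BalanceSnapshotScheduler();

// On startup
await balanceSnapshotScheduler.start();

// On shutdown
balanceSnapshotScheduler.stop();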
Job Retry Strategy

BullMQ retries failed jobs according to each job's options. The defaults used here:
- Attempts: 3 retries
- Backoff: Exponential backoff between retries
- Dead Letter: After max retries, job moves to failed state
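One place such defaults can live is the queue's defaultJobOptions (a sketch; the removeOnComplete flag is an assumption, not stated above):
const queue = new Queue(QueueName.DATABASE_WRITES, {
  connection: redisConfig,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: true, // assumption: keep Redis tidy once jobs succeed
  },
});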
// Custom retry configuration per job
await queue.add(jobName, data, {
  attempts: 5,
  backoff: {
    type: 'exponential',
    delay: 1000, // Start with 1s delay
  },
});

Failed jobs are logged but don't block the main application flow. Historical swap records are non-critical; if creation fails, the trade itself still completed successfully.
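When it matters whether a failure is final or will be retried, the failed handler can compare the attempt count against the configured maximum (a sketch, not part of the handler shown above):
worker.on('failed', (job, err) => {
  if (!job) return; // job data may be unavailable
  // attemptsMade counts attempts so far; opts.attempts is the configured maximum
  if (job.attemptsMade >= (job.opts.attempts ?? 1)) {
    console.error(`Job ${job.id} permanently failed:`, err);
  } else {
    console.warn(`Job ${job.id} failed, will retry:`, err);
  }
});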
Graceful Shutdown
Workers are closed gracefully on application shutdown:
// In shutdown handler
async function shutdown() {
  // Close queue workers
  await queueWorker.closeAll();

  // Close queue clients
  await queueClient.closeAll();
}
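The closeAll() helpers are not shown in the worker and client excerpts above; a minimal sketch, assuming they simply close every BullMQ instance held in the maps:
// Inside QueueWorker: wait for in-flight jobs, then disconnect each worker
async closeAll(): Promise<void> {
  await Promise.all(
    Array.from(this.workers.values()).map((worker) => worker.close())
  );
}

// Inside QueueClient: close each queue's Redis connection
async closeAll(): Promise<void> {
  await Promise.all(
    Array.from(this.queues.values()).map((queue) => queue.close())
  );
}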
Monitoring

BullMQ provides job status through the queue API:
const queue = queueClient.getQueue(QueueName.DATABASE_WRITES);
// Get job counts
const counts = await queue.getJobCounts();
// { waiting: 0, active: 2, completed: 150, failed: 1, delayed: 0 }
// Get failed jobs
const failedJobs = await queue.getFailed();
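Failed jobs stay in the failed set until they are removed or retried. A sketch of re-queuing them manually:
// Move each failed job back to the waiting state for another attempt
for (const job of failedJobs) {
  await job.retry();
}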
Configuration

BullMQ connects to the same Redis instance used for caching:
const workerOptions = {
  connection: {
    host: redisConfig.host,
    port: redisConfig.port,
    password: redisConfig.password,
    db: redisConfig.db,
  },
  concurrency: 5,
};
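redisConfig itself is not shown in this section; a minimal sketch, assuming it is built from environment variables:
// Hypothetical shape of redisConfig, read from the environment
const redisConfig = {
  host: process.env.REDIS_HOST ?? 'localhost',
  port: Number(process.env.REDIS_PORT ?? 6379),
  password: process.env.REDIS_PASSWORD,
  db: Number(process.env.REDIS_DB ?? 0),
  maxRetriesPerRequest: null, // required by BullMQ for worker connections
};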