Domain Services

The domain layer contains all business logic for the trading engine. Services are organized by domain concept and communicate via events.

Service Overview

| Service | Responsibility |
| --- | --- |
| Signal Processor | Orchestrates signal processing for all agents |
| Agent Eligibility | Determines which agents can trade a signal |
| Trading Executor | Executes purchases, sales, and DCA buys |
| Position Service | Manages open position state |
| Balance Service | Tracks token balances |
| Stop Loss Manager | Evaluates and triggers stop losses |
| DCA Manager | Evaluates and triggers DCA buys |
| Price Update Manager | Polls prices and triggers automated actions |
| Config Service | Loads and caches agent trading configurations |

Signal Processing

Signal Processor (signal-processor.service.ts)

Listens for signal_created events and orchestrates trade execution for eligible agents.

class SignalProcessor {
  async processSignal(signal: TradingSignal): Promise<void> {
    // 1. Get eligible agents
    const eligibleAgentIds = await agentEligibilityService.getEligibleAgents(signal);
    
    // 2. Execute for each agent in parallel
    await Promise.all(eligibleAgentIds.map(async (agentId) => {
      await this.executeForAgent(agentId, signal);
    }));
  }
}

Key behaviors:

  • Processes signals in parallel across agents for maximum throughput
  • Records execution status for each agent (success/failure)
  • Emits metrics for monitoring (latency, success rate)
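
The simplified `processSignal` above uses `Promise.all`, which rejects as soon as one agent fails. A minimal sketch of how per-agent status could be recorded instead is shown below. The `AgentOutcome` type and the `executeForAllAgents` helper are hypothetical names for illustration, not part of the engine.

```typescript
// Hypothetical outcome record; field names are illustrative only.
type AgentOutcome =
  | { agentId: string; status: "success" }
  | { agentId: string; status: "failure"; error: string };

// Runs `execute` for every agent in parallel. Each failure is caught and
// converted into an outcome, so one failing agent cannot reject the batch.
async function executeForAllAgents(
  agentIds: string[],
  execute: (agentId: string) => Promise<void>,
): Promise<AgentOutcome[]> {
  return Promise.all(
    agentIds.map(async (agentId): Promise<AgentOutcome> => {
      try {
        await execute(agentId);
        return { agentId, status: "success" };
      } catch (error) {
        const message = error instanceof Error ? error.message : String(error);
        return { agentId, status: "failure", error: message };
      }
    }),
  );
}
```

The returned array keeps the order of `agentIds`, which makes it straightforward to persist one status row per agent or to derive a success rate for metrics.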

Agent Eligibility (agent-eligibility.service.ts)

Determines which agents are eligible to trade a specific signal.

Eligibility criteria:

  1. Agent exists and is active
  2. Automated trading is enabled for the current mode (simulation/live)
  3. Agent has a wallet assigned for the trading mode
  4. Wallet has sufficient SOL balance for position size
  5. No existing open position in this token
  6. Signal hasn't already been processed (idempotency)

async getEligibleAgents(signal: TradingSignal): Promise<string[]> {
  // Load all agents from cache (warmed on startup)
  const agents = await redisAgentService.getAllAgents();
  
  // Filter by eligibility criteria
  const eligible = await Promise.all(
    agents.map(async (agent) => {
      const isEligible = await this.checkEligibility(agent, signal);
      return isEligible ? agent.id : null;
    })
  );
  
  // Drop ineligible agents (nulls) and narrow the result to string[]
  return eligible.filter((id): id is string => id !== null);
}

Eligibility checks use Redis cache first for speed, falling back to database only on cache miss.
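
The six eligibility criteria listed above can be read as a chain of guard clauses. The sketch below assumes a hypothetical `AgentSnapshot` shape that already holds the cached data each check needs. The real `checkEligibility` is not shown in this document and may be structured differently.

```typescript
type TradingMode = "simulation" | "live";

// Hypothetical snapshot of the data each criterion needs (assumed shape).
interface AgentSnapshot {
  isActive: boolean;
  automatedTradingEnabled: Record<TradingMode, boolean>;
  walletByMode: Partial<Record<TradingMode, string>>;
  solBalance: number;
  openPositionTokens: Set<string>;
  processedSignalIds: Set<string>;
}

interface SignalSnapshot {
  id: string;
  tokenAddress: string;
}

// Returns the first failed criterion, or null when the agent is eligible.
function findIneligibilityReason(
  agent: AgentSnapshot | undefined,
  signal: SignalSnapshot,
  mode: TradingMode,
  positionSizeSol: number,
): string | null {
  if (!agent || !agent.isActive) return "agent missing or inactive";
  if (!agent.automatedTradingEnabled[mode]) return "automated trading disabled";
  if (!agent.walletByMode[mode]) return "no wallet for trading mode";
  if (agent.solBalance < positionSizeSol) return "insufficient SOL balance";
  if (agent.openPositionTokens.has(signal.tokenAddress)) return "position already open";
  if (agent.processedSignalIds.has(signal.id)) return "signal already processed";
  return null;
}
```

Returning a reason string rather than a boolean is a design choice for this sketch: it makes it easier to log why an agent was skipped.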


Trade Execution

Trading Executor (trading-executor.service.ts)

The central service for executing all trade operations.

Purchase Flow

async executePurchase(request: TradeExecutionRequest): Promise<TradeExecutionResult> {
  // 1. Validate trade (agent, wallet, balance, config)
  const validation = await tradeValidator.validateTradeExecution(...);
  
  // 2. Get token metadata and swap quote in parallel
  const [tokenDecimals, swapQuote] = await Promise.all([
    tokenMetadataService.getTokenDecimals(tokenAddress),
    swapService.getQuote({ inputMint: SOL, outputMint: tokenAddress, amount }),
  ]);
  
  // 3. Check price impact, retry with reduced amount if needed
  // 4. Execute swap (simulation or live)
  const swapResult = await swapService.executeSwap({ quote: swapQuote, wallet, isSimulation });
  
  // 5. Database transaction (atomic)
  await prisma.$transaction(async (tx) => {
    await transactionRepo.create(transactionData, tx);
    await balanceService.updateBalancesFromTransaction(..., tx);
    await positionService.createPosition(..., tx);
  });
  
  // 6. Update Redis cache
  // 7. Initialize stop loss
  // 8. Emit position_created event
}
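
Step 3 of the purchase flow (check price impact, retry with a reduced amount) is only a comment in the outline above. One way it could look is sketched below. The `Quote` shape, the halving factor, and the retry limit are assumptions made for illustration, not the engine's actual values.

```typescript
// Assumed quote shape for this sketch.
interface Quote {
  amountSol: number;
  priceImpactPct: number;
}

// Requests quotes for progressively smaller amounts until the price impact
// is at or below maxImpactPct. Returns null if no attempt is acceptable.
async function quoteWithinImpact(
  getQuote: (amountSol: number) => Promise<Quote>,
  amountSol: number,
  maxImpactPct: number,
  reductionFactor = 0.5, // hypothetical: shrink the amount by half per retry
  maxRetries = 3, // hypothetical: up to 3 retries after the first attempt
): Promise<Quote | null> {
  let amount = amountSol;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    const quote = await getQuote(amount);
    if (quote.priceImpactPct <= maxImpactPct) {
      return quote;
    }
    amount *= reductionFactor;
  }
  return null;
}
```

A caller would treat a `null` result as "skip this trade" instead of executing a swap with excessive slippage.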

Sale Flow

async executeSale(request: SaleExecutionRequest): Promise<SaleExecutionResult> {
  // 1. Idempotency check (prevent duplicate sales)
  const canProceed = await idempotencyService.checkAndSet(saleKey);
  
  // 2. Load and validate position
  // 3. Get swap quote (Token → SOL)
  // 4. Execute swap
  
  // 5. Database transaction (atomic)
  await prisma.$transaction(async (tx) => {
    await transactionRepo.create(saleTransactionData, tx);
    await balanceService.updateBalancesFromTransaction(..., tx);
    await positionRepo.delete(positionId, tx);
  });
  
  // 6. Queue historical swap record (async)
  // 7. Update Redis cache
  // 8. Emit position_closed event
}
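
The idempotency check in step 1 relies on an atomic "check and set" operation. The sketch below is an in-memory stand-in that illustrates the contract only. A Redis-backed version would typically rely on `SET key value NX PX ttl` so that the check and the write happen atomically across processes. The class name, the TTL, and the `sale:<positionId>` key format are assumptions.

```typescript
// In-memory illustration of check-and-set with expiry (not safe across processes).
class InMemoryIdempotencyService {
  private readonly expiresAt = new Map<string, number>();
  private readonly ttlMs: number;
  private readonly now: () => number;

  constructor(ttlMs: number, now: () => number = () => Date.now()) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  // Returns true if the caller claimed the key, false if it is still held.
  async checkAndSet(key: string): Promise<boolean> {
    const currentTime = this.now();
    const expiry = this.expiresAt.get(key);
    if (expiry !== undefined && expiry > currentTime) {
      return false;
    }
    this.expiresAt.set(key, currentTime + this.ttlMs);
    return true;
  }
}
```

With this contract, a sale handler would call `checkAndSet("sale:" + positionId)` and return early without swapping when the result is `false`.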

DCA Buy Flow

async executeDCABuy(request: DCABuyRequest): Promise<DCABuyResult> {
  // 1. Idempotency check
  // 2. Load position, verify ownership
  // 3. Verify sufficient SOL balance
  // 4. Get swap quote and execute
  
  // 5. Database transaction
  await prisma.$transaction(async (tx) => {
    await transactionRepo.create({ ...transactionData, isDca: true }, tx);
    await balanceService.updateBalancesFromTransaction(..., tx);
  });
  
  // 6. Update position (new average price, total amount, DCA count)
  await positionService.updatePositionAfterDCA(positionId, {
    newAveragePurchasePrice,
    newTotalPurchaseAmount,
    newTotalInvestedSol,
    dcaTransactionId,
  });
}
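
The values passed to `updatePositionAfterDCA` have to be computed from the existing position and the new buy. The sketch below assumes the average purchase price is cost-weighted (total SOL invested divided by total tokens held). The document does not state the exact formula, so treat this as one plausible reading.

```typescript
// Assumed subset of position fields needed for the recalculation.
interface PositionTotals {
  totalPurchaseAmount: number; // tokens held
  totalInvestedSol: number; // SOL spent so far
}

interface DcaUpdate {
  newAveragePurchasePrice: number;
  newTotalPurchaseAmount: number;
  newTotalInvestedSol: number;
}

// Folds a DCA buy into the position totals and recomputes the average price.
function applyDcaBuy(position: PositionTotals, dcaTokens: number, dcaSol: number): DcaUpdate {
  const newTotalPurchaseAmount = position.totalPurchaseAmount + dcaTokens;
  const newTotalInvestedSol = position.totalInvestedSol + dcaSol;
  return {
    newAveragePurchasePrice: newTotalInvestedSol / newTotalPurchaseAmount,
    newTotalPurchaseAmount,
    newTotalInvestedSol,
  };
}
```

For example, a position of 1000 tokens bought for 1 SOL (0.001 SOL per token) followed by a DCA buy of 1000 tokens for 0.5 SOL ends up with 2000 tokens, 1.5 SOL invested, and an average price of 0.00075 SOL per token.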

Position Management

Position Service (position-service.ts)

Manages the lifecycle of open positions.

Key operations:

  • createPosition() - Creates new position with initial stop loss state
  • updatePosition() - Updates position fields (stop loss %, peak price)
  • updatePositionAfterDCA() - Recalculates averages after DCA buy
  • getPositionById() - Retrieves position from cache or database
  • getPositionsByToken() - Gets all positions for a token (for stop loss evaluation)
  • loadPositions() - Loads all positions for an agent/wallet

async createPosition(
  agentId: string,
  walletAddress: string,
  purchaseTransactionId: string,
  tokenAddress: string,
  tokenSymbol: string,
  purchasePrice: number,
  purchaseAmount: number,
  tx?: Prisma.TransactionClient
): Promise<OpenPosition> {
  // Create in database
  const position = await this.positionRepo.create({...}, tx);
  
  // Update Redis cache
  await redisPositionService.setPosition(position);
  
  // Emit event
  positionEventEmitter.emitPositionCreated({ agentId, position });
  
  return position;
}

Balance Management

Balance Service (balance-service.ts)

Handles automatic balance updates based on transactions.

Transaction types and effects:

| Type | Input Token | Output Token |
| --- | --- | --- |
| DEPOSIT | +amount | - |
| WITHDRAWAL | -amount | - |
| SWAP | -inputAmount | +outputAmount |
| BURN | -amount | - |

async updateBalancesFromTransaction(
  walletAddress: string,
  agentId: string,
  transactionType: TransactionType,
  inputMint: string,
  inputSymbol: string,
  inputAmount: Decimal,
  outputMint: string,
  outputSymbol: string,
  outputAmount: Decimal,
  tx?: Prisma.TransactionClient
): Promise<{ inputBalance: Decimal; outputBalance?: Decimal }> {
  // Calculate deltas based on transaction type
  const delta = this.calculateBalanceDelta(transactionType, ...);
  
  // Validate sufficient balance of the input token (with tolerance for floating-point)
  if (delta.inputToken.requiresValidation) {
    await this.validateSufficientBalance(walletAddress, inputMint, inputAmount, tx);
  }
  
  // Upsert balances (write-through to DB then Redis)
  const inputBalance = await this.upsertBalance(...);
  const outputBalance = delta.outputToken ? await this.upsertBalance(...) : undefined;
  
  return { inputBalance, outputBalance };
}
Note: Balance validation includes a tolerance (1e-8) for floating-point precision issues that can occur with very small token amounts.
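
The transaction-type table and the tolerance rule above can be illustrated with two small functions. This sketch uses plain `number` values to stay dependency-free, whereas the service itself works with `Decimal`. The `BalanceDelta` shape and both function bodies are assumptions.

```typescript
type TransactionType = "DEPOSIT" | "WITHDRAWAL" | "SWAP" | "BURN";

// Signed change per token; output is null when the type has no output token.
interface BalanceDelta {
  input: number;
  output: number | null;
}

// Maps each transaction type to its balance effect, following the table above.
function calculateBalanceDelta(
  type: TransactionType,
  inputAmount: number,
  outputAmount = 0,
): BalanceDelta {
  switch (type) {
    case "DEPOSIT":
      return { input: inputAmount, output: null };
    case "WITHDRAWAL":
    case "BURN":
      return { input: -inputAmount, output: null };
    case "SWAP":
      return { input: -inputAmount, output: outputAmount };
  }
}

const BALANCE_TOLERANCE = 1e-8;

// Accepts a balance that falls short of the requirement by at most the tolerance.
function hasSufficientBalance(balance: number, required: number): boolean {
  return balance + BALANCE_TOLERANCE >= required;
}
```

The tolerance matters for cases such as a stored balance of `0.3` and a required amount computed as `0.1 + 0.2`, which is marginally larger than `0.3` in floating-point arithmetic and would fail a strict comparison.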


Stop Loss Management

Stop Loss Manager (stop-loss-manager.service.ts)

Evaluates trailing stop loss conditions on every price update.

Stop loss modes:

  • fixed - Fixed percentage steps based on profit level
  • exponential - Exponential decay as profit increases
  • zones - Configurable profit zones with specific stop loss levels
  • custom - User-defined configuration

async evaluateStopLoss(
  position: OpenPosition,
  currentPrice: number,
  config?: AgentTradingConfig
): Promise<StopLossEvaluationResult> {
  // Acquire distributed lock (prevent concurrent evaluations)
  const lockToken = await redisService.acquireLock(lockKey, REDIS_TTL.LOCK);
  
  try {
    // Update peak price if current price is higher
    const purchasePrice = position.purchasePrice;
    const previousPeak = position.peakPrice ?? purchasePrice;
    const peakPrice = Math.max(previousPeak, currentPrice);
    const peakChanged = peakPrice > previousPeak;
    
    // Calculate the peak gain relative to the purchase price
    const peakChangePercent = ((peakPrice - purchasePrice) / purchasePrice) * 100;
    
    // Calculate target stop loss based on peak performance
    const targetStopLoss = calculateStopLossPercentage(peakChangePercent, config.stopLoss);
    
    // Ensure monotonic tightening (stop loss only increases)
    // currentStopLoss is the stop loss percentage currently stored on the position
    const newStopLoss = Math.max(targetStopLoss, currentStopLoss);
    const stopLossChanged = newStopLoss !== currentStopLoss;
    
    // Calculate stop loss price (percentage is relative to the purchase price)
    const stopLossPrice = purchasePrice * (1 + newStopLoss / 100);
    
    // Should trigger?
    const shouldTrigger = currentPrice <= stopLossPrice;
    
    // Update position if state changed
    const updated = stopLossChanged || peakChanged;
    if (updated) {
      await positionService.updatePosition(position.id, { ... });
    }
    
    return { shouldTrigger, currentStopLossPercentage: newStopLoss, stopLossPrice, updated };
  } finally {
    await redisService.releaseLock(lockKey, lockToken);
  }
}
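
The trailing logic above can be isolated as a pure function, which makes the monotonic tightening easier to see. The sketch below assumes a step table in the spirit of the `fixed` mode. The `StopLossStep` shape and the thresholds in the usage note are invented for illustration and are not the engine's configuration.

```typescript
// Hypothetical step: once the peak gain reaches minPeakGainPct, the stop loss
// moves to stopLossPct (both relative to the purchase price).
interface StopLossStep {
  minPeakGainPct: number;
  stopLossPct: number;
}

interface TrailingState {
  purchasePrice: number;
  peakPrice: number;
  stopLossPct: number;
}

interface TrailingResult extends TrailingState {
  stopLossPrice: number;
  shouldTrigger: boolean;
}

// Highest stop loss among the steps already reached by the peak gain.
function targetStopLossPct(peakGainPct: number, steps: StopLossStep[]): number {
  let target = Number.NEGATIVE_INFINITY; // no step reached yet
  for (const step of steps) {
    if (peakGainPct >= step.minPeakGainPct) {
      target = Math.max(target, step.stopLossPct);
    }
  }
  return target;
}

function evaluateTrailingStop(
  state: TrailingState,
  currentPrice: number,
  steps: StopLossStep[],
): TrailingResult {
  const peakPrice = Math.max(state.peakPrice, currentPrice);
  const peakGainPct = ((peakPrice - state.purchasePrice) / state.purchasePrice) * 100;
  // The stop loss never loosens: keep the previous value if the target is lower.
  const stopLossPct = Math.max(state.stopLossPct, targetStopLossPct(peakGainPct, steps));
  const stopLossPrice = state.purchasePrice * (1 + stopLossPct / 100);
  return {
    purchasePrice: state.purchasePrice,
    peakPrice,
    stopLossPct,
    stopLossPrice,
    shouldTrigger: currentPrice <= stopLossPrice,
  };
}
```

With steps of (0% gain, -30%), (50% gain, +10%), and (100% gain, +50%), a position bought at 1.0 that peaks at 1.6 has its stop loss raised to +10%, which corresponds to a stop price of about 1.1. A later drop to 1.05 then triggers the stop even though the price is still above the purchase price.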

DCA Management

DCA Manager (dca-manager.service.ts)

Evaluates Dollar Cost Averaging conditions when prices drop.

DCA logic:

  • Triggered when price drops to configured levels (e.g., -10%, -20%, -30%)
  • Each level specifies how much additional SOL to invest (as % of original position)
  • Limited by maxDcaAttempts configuration
  • Tracks dcaCount, lastDcaTime, and lowestPrice on position

async evaluateDCA(
  position: OpenPosition,
  currentPrice: number,
  config: AgentTradingConfig
): Promise<DCAEvaluationResult> {
  if (!config.dca?.enabled) {
    return { shouldTrigger: false, reason: 'DCA disabled' };
  }
  
  // Check if max DCA attempts reached
  if (position.dcaCount >= config.dca.maxAttempts) {
    return { shouldTrigger: false, reason: 'Max DCA attempts reached' };
  }
  
  // Calculate price change from average purchase price (negative when the price has dropped)
  const priceChangePercent = ((currentPrice - position.purchasePrice) / position.purchasePrice) * 100;
  
  // Find the first configured DCA level that has been reached and not yet triggered
  const triggerLevel = config.dca.levels.find(level => 
    priceChangePercent <= -level.dropPercent && 
    !this.levelAlreadyTriggered(position, level)
  );
  );
  
  if (triggerLevel) {
    const buyAmountSol = position.totalInvestedSol * (triggerLevel.buyPercent / 100);
    return { shouldTrigger: true, triggerLevel, buyAmountSol };
  }
  
  return { shouldTrigger: false, reason: 'No DCA level triggered' };
}
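
The level selection above depends on `levelAlreadyTriggered`, which is not shown in this document. The sketch below replaces it with a set of already-triggered drop percentages, which is an assumption, and walks through the selection and buy-amount arithmetic with concrete numbers.

```typescript
interface DcaLevel {
  dropPercent: number; // e.g. 10 means "price is 10% below the average"
  buyPercent: number; // additional buy as % of the SOL invested so far
}

interface DcaDecision {
  level: DcaLevel;
  buyAmountSol: number;
}

// Returns the first level (in configuration order) that has been reached and
// not yet triggered, together with the SOL amount to buy, or null.
function selectDcaLevel(
  averagePrice: number,
  currentPrice: number,
  totalInvestedSol: number,
  levels: DcaLevel[],
  triggeredDrops: Set<number>, // hypothetical stand-in for levelAlreadyTriggered
): DcaDecision | null {
  const priceChangePercent = ((currentPrice - averagePrice) / averagePrice) * 100;
  const level = levels.find(
    (candidate) =>
      priceChangePercent <= -candidate.dropPercent &&
      !triggeredDrops.has(candidate.dropPercent),
  );
  if (!level) {
    return null;
  }
  return { level, buyAmountSol: totalInvestedSol * (level.buyPercent / 100) };
}
```

With levels at 10%, 20%, and 30% and a price 25% below the average, the first call selects the 10% level. Once that level is recorded as triggered, the next evaluation selects the 20% level, so a sharp drop works through the levels one evaluation at a time.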

Price Updates

Price Update Manager (price-update-manager.ts)

Continuously polls prices for tokens with active positions and triggers automated actions.

Polling cycle (every 1.5 seconds):

async pollPrices(): Promise<void> {
  // 1. Get tracked tokens (all tokens with active positions)
  const tokenAddresses = Array.from(this.trackedTokens.values())
    .map(t => t.originalTokenAddress);
  
  // 2. Fetch prices from DexScreener/Jupiter
  const prices = await priceFeedService.getMultipleTokenPrices(tokenAddresses);
  
  // 3. For each price update:
  for (const price of prices) {
    // Update local cache
    this.priceCache.set(tokenAddress, price);
    
    // Update Redis cache
    await redisPriceService.setPrice(tokenAddress, price);
    
    // Evaluate stop loss for all positions
    await this.evaluateStopLossForToken(tokenAddress, price.priceSol);
    
    // Collect WebSocket updates
    agentUpdates.get(agentId).push({ tokenAddress, price: price.priceSol, priceUsd });
  }
  
  // 4. Batch broadcast to connected clients
  for (const [agentId, updates] of agentUpdates) {
    wsServer.broadcastPriceUpdates(agentId, updates);
  }
}

Token tracking:

  • Tokens are added when positions are created
  • Tokens are removed when all positions in that token are closed
  • Tracking is refreshed periodically from database

Event System

Signal Events (signal-events.ts)

const signalEventEmitter = new EventEmitter();
 
// Events
signalEventEmitter.emit('signal_created', { signal });

Position Events (position-events.ts)

const positionEventEmitter = new EventEmitter();
 
// Events
positionEventEmitter.emit('position_created', { agentId, position });
positionEventEmitter.emit('position_updated', { agentId, position });
positionEventEmitter.emit('position_closed', { agentId, positionId, tokenAddress });

Event listeners:

  • WebSocket Server - broadcasts updates to connected clients
  • Price Update Manager - tracks tokens for price polling
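
To show how a listener might consume these events, the sketch below uses a small typed emitter as a dependency-free stand-in for Node's `EventEmitter`. The payload shapes are simplified assumptions based on the `emit` calls above, and `TypedEmitter` is not part of the engine.

```typescript
// Assumed, simplified payload shapes for two of the position events.
interface PositionEvents {
  position_created: { agentId: string; position: { id: string; tokenAddress: string } };
  position_closed: { agentId: string; positionId: string; tokenAddress: string };
}

// Minimal emitter whose event names and payloads are checked at compile time.
class TypedEmitter<Events> {
  // `any` is confined to internal storage; the public methods stay fully typed.
  private readonly listeners = new Map<keyof Events, Array<(payload: any) => void>>();

  on<K extends keyof Events>(event: K, listener: (payload: Events[K]) => void): void {
    const list = this.listeners.get(event) ?? [];
    list.push(listener);
    this.listeners.set(event, list);
  }

  emit<K extends keyof Events>(event: K, payload: Events[K]): void {
    for (const listener of this.listeners.get(event) ?? []) {
      listener(payload);
    }
  }
}

// Example listener: keep a set of tokens that need price polling.
// Simplified: a real tracker would only drop a token once no positions remain.
function trackTokens(emitter: TypedEmitter<PositionEvents>, tracked: Set<string>): void {
  emitter.on("position_created", ({ position }) => {
    tracked.add(position.tokenAddress);
  });
  emitter.on("position_closed", ({ tokenAddress }) => {
    tracked.delete(tokenAddress);
  });
}
```

Typing the event map means that a misspelled event name or a payload with missing fields is rejected by the compiler instead of failing silently at runtime.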

Configuration Service

Config Service (config-service.ts)

Loads and caches agent trading configurations.

async loadAgentConfig(agentId: string): Promise<AgentTradingConfig> {
  // 1. Try Redis cache first
  const cached = await redisConfigService.getAgentConfig(agentId);
  if (cached) return cached;
  
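  // 2. Load from database (findUnique returns null when no agent matches)
  const agent = await prisma.agent.findUnique({ where: { id: agentId } });
  if (!agent) {
    throw new Error(`Agent not found: ${agentId}`);
  }
  
  // 3. Merge with defaults
  const config = this.mergeWithDefaults(agent.tradingConfig);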
  
  // 4. Cache in Redis
  await redisConfigService.setAgentConfig(agentId, config);
  
  return config;
}
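
The `mergeWithDefaults` step is not shown in this document. The sketch below illustrates one plausible shape for it, assuming a stored configuration in which any field may be missing. The config fields and default values are invented for illustration and do not reflect the engine's real defaults.

```typescript
// Hypothetical, reduced config shape for this sketch.
interface DcaConfig {
  enabled: boolean;
  maxAttempts: number;
}

interface StopLossConfig {
  mode: "fixed" | "exponential" | "zones" | "custom";
}

interface AgentTradingConfig {
  positionSizeSol: number;
  dca: DcaConfig;
  stopLoss: StopLossConfig;
}

// Stored configs may omit any field, including fields inside nested sections.
interface StoredTradingConfig {
  positionSizeSol?: number;
  dca?: Partial<DcaConfig>;
  stopLoss?: Partial<StopLossConfig>;
}

// Illustrative defaults only.
const DEFAULT_CONFIG: AgentTradingConfig = {
  positionSizeSol: 0.1,
  dca: { enabled: false, maxAttempts: 3 },
  stopLoss: { mode: "fixed" },
};

// Merges section by section so a partially specified section keeps the
// defaults for the fields it does not set.
function mergeWithDefaults(stored: StoredTradingConfig | null): AgentTradingConfig {
  return {
    positionSizeSol: stored?.positionSizeSol ?? DEFAULT_CONFIG.positionSizeSol,
    dca: { ...DEFAULT_CONFIG.dca, ...stored?.dca },
    stopLoss: { ...DEFAULT_CONFIG.stopLoss, ...stored?.stopLoss },
  };
}
```

A single top-level spread would replace the whole `dca` section whenever the stored config sets only `dca.enabled`, which is why the sketch merges each nested section separately.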