Domain Services
The domain layer contains all business logic for the trading engine. Services are organized by domain concept and communicate via events.
Service Overview
| Service | Responsibility |
|---|---|
| Signal Processor | Orchestrates signal processing for all agents |
| Agent Eligibility | Determines which agents can trade a signal |
| Trading Executor | Executes purchases, sales, and DCA buys |
| Position Service | Manages open position state |
| Balance Service | Tracks token balances |
| Stop Loss Manager | Evaluates and triggers stop losses |
| DCA Manager | Evaluates and triggers DCA buys |
| Price Update Manager | Polls prices and triggers automated actions |
| Config Service | Loads and caches agent trading configurations |
Signal Processing
Signal Processor (signal-processor.service.ts)
Listens for signal_created events and orchestrates trade execution for eligible agents.
class SignalProcessor {
  async processSignal(signal: TradingSignal): Promise<void> {
    // 1. Get eligible agents
    const eligibleAgentIds = await agentEligibilityService.getEligibleAgents(signal);
    // 2. Execute for each agent in parallel
    await Promise.all(eligibleAgentIds.map(async (agentId) => {
      await this.executeForAgent(agentId, signal);
    }));
  }
}
Key behaviors:
- Processes signals in parallel across agents for maximum throughput
- Records execution status for each agent (success/failure)
- Emits metrics for monitoring (latency, success rate)
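Note that `Promise.all` rejects as soon as one agent's execution throws, so recording a status for every agent requires either catching errors inside `executeForAgent` or collecting all outcomes. The following is a minimal sketch of the second option using `Promise.allSettled`. The `AgentExecutionStatus` shape and the `executeForAllAgents` helper are hypothetical names chosen for illustration, not part of the actual service.

```typescript
// Hypothetical result shape; the real service may record more detail.
interface AgentExecutionStatus {
  agentId: string;
  success: boolean;
  error?: string;
}

// Runs one execution per agent in parallel and collects every outcome,
// so a failure for one agent does not hide the results of the others.
async function executeForAllAgents(
  agentIds: string[],
  executeForAgent: (agentId: string) => Promise<void>
): Promise<AgentExecutionStatus[]> {
  const results = await Promise.allSettled(
    agentIds.map((agentId) => executeForAgent(agentId))
  );
  return results.map((result, index) => {
    const agentId = agentIds[index];
    if (result.status === "fulfilled") {
      return { agentId, success: true };
    }
    const reason: unknown = result.reason;
    const error = reason instanceof Error ? reason.message : String(reason);
    return { agentId, success: false, error };
  });
}
```

The returned array keeps the same order as `agentIds`, which makes it straightforward to persist one status row per agent.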
Agent Eligibility (agent-eligibility.service.ts)
Determines which agents are eligible to trade a specific signal.
Eligibility criteria:
- Agent exists and is active
- Automated trading is enabled for the current mode (simulation/live)
- Agent has a wallet assigned for the trading mode
- Wallet has sufficient SOL balance for position size
- No existing open position in this token
- Signal hasn't already been processed (idempotency)
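The criteria above can be sketched as a single predicate that returns the first failed check. This is an illustration only: the `AgentSnapshot` and `SignalSnapshot` shapes, their field names, and `findIneligibilityReason` are assumptions, and the real service reads this data from Redis and the database rather than from one in-memory object.

```typescript
// Hypothetical shapes for illustration; field names are assumptions.
type TradingMode = "simulation" | "live";

interface AgentSnapshot {
  id: string;
  isActive: boolean;
  automatedTradingEnabled: Record<TradingMode, boolean>;
  walletByMode: Partial<Record<TradingMode, string>>;
  solBalance: number;
  positionSizeSol: number;
  openPositionTokens: Set<string>;
  processedSignalIds: Set<string>;
}

interface SignalSnapshot {
  id: string;
  tokenAddress: string;
}

// Returns the first failed criterion, or null when the agent is eligible.
function findIneligibilityReason(
  agent: AgentSnapshot | undefined,
  signal: SignalSnapshot,
  mode: TradingMode
): string | null {
  if (!agent) return "agent not found";
  if (!agent.isActive) return "agent inactive";
  if (!agent.automatedTradingEnabled[mode]) return "automated trading disabled";
  if (!agent.walletByMode[mode]) return "no wallet for mode";
  if (agent.solBalance < agent.positionSizeSol) return "insufficient SOL balance";
  if (agent.openPositionTokens.has(signal.tokenAddress)) return "position already open";
  if (agent.processedSignalIds.has(signal.id)) return "signal already processed";
  return null;
}
```

Returning a reason instead of a bare boolean makes it easier to log why an agent was skipped for a given signal.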
async getEligibleAgents(signal: TradingSignal): Promise<string[]> {
  // Load all agents from cache (warmed on startup)
  const agents = await redisAgentService.getAllAgents();
  // Check every agent in parallel; ineligible agents map to null
  const eligible = await Promise.all(
    agents.map(async (agent) => {
      const isEligible = await this.checkEligibility(agent, signal);
      return isEligible ? agent.id : null;
    })
  );
  // Drop the nulls; the type guard narrows (string | null)[] to string[]
  return eligible.filter((id): id is string => id !== null);
}
Eligibility checks read from the Redis cache first for speed and fall back to the database only on a cache miss.
Trade Execution
Trading Executor (trading-executor.service.ts)
The central service for executing all trade operations.
Purchase Flow
async executePurchase(request: TradeExecutionRequest): Promise<TradeExecutionResult> {
  // 1. Validate trade (agent, wallet, balance, config)
  const validation = await tradeValidator.validateTradeExecution(...);
  // 2. Get token metadata and swap quote in parallel
  const [tokenDecimals, swapQuote] = await Promise.all([
    tokenMetadataService.getTokenDecimals(tokenAddress),
    swapService.getQuote({ inputMint: SOL, outputMint: token, amount }),
  ]);
  // 3. Check price impact, retry with reduced amount if needed
  // 4. Execute swap (simulation or live)
  const swapResult = await swapService.executeSwap({ quote, wallet, isSimulation });
  // 5. Database transaction (atomic)
  await prisma.$transaction(async (tx) => {
    await transactionRepo.create(transactionData, tx);
    await balanceService.updateBalancesFromTransaction(..., tx);
    await positionService.createPosition(..., tx);
  });
  // 6. Update Redis cache
  // 7. Initialize stop loss
  // 8. Emit position_created event
}
Sale Flow
async executeSale(request: SaleExecutionRequest): Promise<SaleExecutionResult> {
  // 1. Idempotency check (prevent duplicate sales)
  const canProceed = await idempotencyService.checkAndSet(saleKey);
  // 2. Load and validate position
  // 3. Get swap quote (Token → SOL)
  // 4. Execute swap
  // 5. Database transaction (atomic)
  await prisma.$transaction(async (tx) => {
    await transactionRepo.create(saleTransactionData, tx);
    await balanceService.updateBalancesFromTransaction(..., tx);
    await positionRepo.delete(positionId, tx);
  });
  // 6. Queue historical swap record (async)
  // 7. Update Redis cache
  // 8. Emit position_closed event
}
DCA Buy Flow
async executeDCABuy(request: DCABuyRequest): Promise<DCABuyResult> {
  // 1. Idempotency check
  // 2. Load position, verify ownership
  // 3. Verify sufficient SOL balance
  // 4. Get swap quote and execute
  // 5. Database transaction
  await prisma.$transaction(async (tx) => {
    await transactionRepo.create({ ...transactionData, isDca: true }, tx);
    await balanceService.updateBalancesFromTransaction(..., tx);
  });
  // 6. Update position (new average price, total amount, DCA count)
  await positionService.updatePositionAfterDCA(positionId, {
    newAveragePurchasePrice,
    newTotalPurchaseAmount,
    newTotalInvestedSol,
    dcaTransactionId,
  });
}
Position Management
Position Service (position-service.ts)
Manages the lifecycle of open positions.
Key operations:
- createPosition() - Creates a new position with initial stop loss state
- updatePosition() - Updates position fields (stop loss %, peak price)
- updatePositionAfterDCA() - Recalculates averages after a DCA buy
- getPositionById() - Retrieves a position from cache or database
- getPositionsByToken() - Gets all positions for a token (for stop loss evaluation)
- loadPositions() - Loads all positions for an agent/wallet
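To make the recalculation behind updatePositionAfterDCA() concrete, here is a minimal sketch. It assumes the average purchase price is cost-weighted (total SOL invested divided by total tokens held), which the source does not spell out; `PositionTotals`, `DcaFill`, and `applyDcaBuy` are hypothetical names.

```typescript
// Hypothetical shapes; the real position record has more fields.
interface PositionTotals {
  totalAmount: number;       // tokens held
  totalInvestedSol: number;  // SOL spent so far
  averagePrice: number;      // SOL per token
}

interface DcaFill {
  tokenAmount: number; // tokens received from the DCA swap
  solSpent: number;    // SOL paid for them
}

// Adds the DCA fill to the running totals and derives the new
// cost-weighted average price (assumption: average = invested / amount).
function applyDcaBuy(current: PositionTotals, fill: DcaFill): PositionTotals {
  if (fill.tokenAmount <= 0 || fill.solSpent <= 0) {
    throw new Error("DCA fill must have positive token and SOL amounts");
  }
  const totalAmount = current.totalAmount + fill.tokenAmount;
  const totalInvestedSol = current.totalInvestedSol + fill.solSpent;
  return {
    totalAmount,
    totalInvestedSol,
    averagePrice: totalInvestedSol / totalAmount,
  };
}
```

For example, a position of 100 tokens bought for 1 SOL (average 0.01) that adds 100 tokens for 0.5 SOL ends up with 200 tokens, 1.5 SOL invested, and an average of 0.0075 SOL per token.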
async createPosition(
  agentId: string,
  walletAddress: string,
  purchaseTransactionId: string,
  tokenAddress: string,
  tokenSymbol: string,
  purchasePrice: number,
  purchaseAmount: number,
  tx?: Prisma.TransactionClient
): Promise<OpenPosition> {
  // Create in database
  const position = await this.positionRepo.create({...}, tx);
  // Update Redis cache
  await redisPositionService.setPosition(position);
  // Emit event
  positionEventEmitter.emitPositionCreated({ agentId, position });
  return position;
}
Balance Management
Balance Service (balance-service.ts)
Handles automatic balance updates based on transactions.
Transaction types and effects:
| Type | Input Token | Output Token |
|---|---|---|
| DEPOSIT | +amount | - |
| WITHDRAWAL | -amount | - |
| SWAP | -inputAmount | +outputAmount |
| BURN | -amount | - |
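The table maps directly onto a small switch. The sketch below is illustrative only: it uses plain numbers instead of the `Decimal` type the service works with, and `BalanceDelta` and `calculateDelta` are hypothetical names rather than the service's own `calculateBalanceDelta`.

```typescript
type TransactionType = "DEPOSIT" | "WITHDRAWAL" | "SWAP" | "BURN";

// Signed change to apply to each balance; outputDelta is null when the
// transaction type does not touch an output token.
interface BalanceDelta {
  inputDelta: number;
  outputDelta: number | null;
}

function calculateDelta(
  type: TransactionType,
  inputAmount: number,
  outputAmount: number = 0
): BalanceDelta {
  switch (type) {
    case "DEPOSIT":
      return { inputDelta: inputAmount, outputDelta: null };
    case "WITHDRAWAL":
    case "BURN":
      return { inputDelta: -inputAmount, outputDelta: null };
    case "SWAP":
      return { inputDelta: -inputAmount, outputDelta: outputAmount };
  }
}
```

Only the types that produce a negative input delta need a sufficient-balance check before the write.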
async updateBalancesFromTransaction(
  walletAddress: string,
  agentId: string,
  transactionType: TransactionType,
  inputMint: string,
  inputSymbol: string,
  inputAmount: Decimal,
  outputMint: string,
  outputSymbol: string,
  outputAmount: Decimal,
  tx?: Prisma.TransactionClient
): Promise<{ inputBalance: Decimal; outputBalance?: Decimal }> {
  // Calculate deltas based on transaction type
  const delta = this.calculateBalanceDelta(transactionType, ...);
  // Validate sufficient balance (with tolerance for floating-point)
  if (delta.inputToken.requiresValidation) {
    await this.validateSufficientBalance(walletAddress, tokenAddress, requiredAmount, tx);
  }
  // Upsert balances (write-through to DB then Redis)
  const inputBalance = await this.upsertBalance(...);
  const outputBalance = delta.outputToken ? await this.upsertBalance(...) : undefined;
  return { inputBalance, outputBalance };
}
Balance validation includes a tolerance (1e-8) for floating-point precision issues that can occur with very small token amounts.
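A tolerance check of this kind can be sketched as follows. The example uses plain numbers for brevity, whereas the service signature uses `Decimal`; `BALANCE_TOLERANCE` and `hasSufficientBalance` are hypothetical names.

```typescript
// Tolerance from the text above; the constant name is an assumption.
const BALANCE_TOLERANCE = 1e-8;

// Treats the balance as sufficient when it falls short of the required
// amount by no more than the tolerance.
function hasSufficientBalance(
  available: number,
  required: number,
  tolerance: number = BALANCE_TOLERANCE
): boolean {
  return available + tolerance >= required;
}

// 1.0 - 0.9 evaluates to slightly less than 0.1 in binary floating point,
// so a strict comparison would reject a balance that is effectively equal.
const strictCheck = 1.0 - 0.9 >= 0.1;                       // false
const tolerantCheck = hasSufficientBalance(1.0 - 0.9, 0.1); // true
```

The tolerance should stay well below the smallest amount that matters for the token's decimals, otherwise it can mask a real shortfall.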
Stop Loss Management
Stop Loss Manager (stop-loss-manager.service.ts)
Evaluates trailing stop loss conditions on every price update.
Stop loss modes:
- fixed - Fixed percentage steps based on profit level
- exponential - Exponential decay as profit increases
- zones - Configurable profit zones with specific stop loss levels
- custom - User-defined configuration
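As a rough illustration of how a step-based mode such as fixed might behave, the sketch below picks a stop loss percentage from the peak profit, never loosens it, and converts it to a price with the same `purchasePrice * (1 + stopLoss / 100)` formula that evaluateStopLoss uses. The step thresholds, the `StopLossStep` shape, and all function names are invented for this example and do not come from the actual configuration.

```typescript
// Hypothetical step definition: once peak profit reaches minProfitPercent,
// the stop loss moves to stopLossPercent (relative to purchase price).
interface StopLossStep {
  minProfitPercent: number;
  stopLossPercent: number;
}

// Made-up example thresholds, not real defaults.
const EXAMPLE_STEPS: StopLossStep[] = [
  { minProfitPercent: 20, stopLossPercent: 0 },
  { minProfitPercent: 50, stopLossPercent: 25 },
  { minProfitPercent: 100, stopLossPercent: 60 },
];

// Picks the stop loss of the highest step reached by the peak profit,
// or the initial stop loss when no step has been reached yet.
function fixedStepStopLoss(
  peakChangePercent: number,
  steps: StopLossStep[],
  initialStopLossPercent: number
): number {
  const ordered = [...steps].sort((a, b) => a.minProfitPercent - b.minProfitPercent);
  let result = initialStopLossPercent;
  for (const step of ordered) {
    if (peakChangePercent >= step.minProfitPercent) {
      result = step.stopLossPercent;
    }
  }
  return result;
}

// Monotonic tightening: the stop loss never moves back down.
function tightenStopLoss(currentPercent: number, targetPercent: number): number {
  return Math.max(currentPercent, targetPercent);
}

// Converts a stop loss percentage into an absolute trigger price.
function stopLossPriceFor(purchasePrice: number, stopLossPercent: number): number {
  return purchasePrice * (1 + stopLossPercent / 100);
}
```

With these example steps, a position bought at 2 SOL whose peak reached +60% gets a 25% stop loss, which corresponds to a trigger price of 2.5 SOL.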
async evaluateStopLoss(
  position: OpenPosition,
  currentPrice: number,
  config: AgentTradingConfig
): Promise<StopLossEvaluationResult> {
  const { purchasePrice } = position;
  // Acquire distributed lock (prevent concurrent evaluations)
  const lockToken = await redisService.acquireLock(lockKey, REDIS_TTL.LOCK);
  try {
    // Update peak price if current price is higher
    let peakPrice = position.peakPrice ?? purchasePrice;
    if (currentPrice > peakPrice) {
      peakPrice = currentPrice;
    }
    // Calculate peak price change from purchase, in percent
    const peakChangePercent = ((peakPrice - purchasePrice) / purchasePrice) * 100;
    // Calculate target stop loss based on peak performance
    const targetStopLoss = calculateStopLossPercentage(peakChangePercent, config.stopLoss);
    // Ensure monotonic tightening (stop loss only increases)
    const newStopLoss = Math.max(targetStopLoss, currentStopLoss);
    // Calculate stop loss price
    const stopLossPrice = purchasePrice * (1 + newStopLoss / 100);
    // Should trigger?
    const shouldTrigger = currentPrice <= stopLossPrice;
    // Update position if state changed
    if (stopLossChanged || peakChanged) {
      await positionService.updatePosition(position.id, { ... });
    }
    return { shouldTrigger, currentStopLossPercentage, stopLossPrice, updated };
  } finally {
    // Always release the lock, even if evaluation throws
    await redisService.releaseLock(lockKey, lockToken);
  }
}
DCA Management
DCA Manager (dca-manager.service.ts)
Evaluates Dollar Cost Averaging conditions when prices drop.
DCA logic:
- Triggered when price drops to configured levels (e.g., -10%, -20%, -30%)
- Each level specifies how much additional SOL to invest (as % of original position)
- Limited by the maxDcaAttempts configuration
- Tracks dcaCount, lastDcaTime, and lowestPrice on the position
async evaluateDCA(
  position: OpenPosition,
  currentPrice: number,
  config: AgentTradingConfig
): Promise<DCAEvaluationResult> {
  if (!config.dca?.enabled) {
    return { shouldTrigger: false, reason: 'DCA disabled' };
  }
  // Check if max DCA attempts reached
  if (position.dcaCount >= config.dca.maxAttempts) {
    return { shouldTrigger: false, reason: 'Max DCA attempts reached' };
  }
  // Calculate price drop from average purchase price
  const dropPercent = ((currentPrice - position.purchasePrice) / position.purchasePrice) * 100;
  // Find applicable DCA level
  const triggerLevel = config.dca.levels.find(level =>
    dropPercent <= -level.dropPercent &&
    !this.levelAlreadyTriggered(position, level)
  );
  if (triggerLevel) {
    const buyAmountSol = position.totalInvestedSol * (triggerLevel.buyPercent / 100);
    return { shouldTrigger: true, triggerLevel, buyAmountSol };
  }
  return { shouldTrigger: false, reason: 'No DCA level triggered' };
}
Price Updates
Price Update Manager (price-update-manager.ts)
Continuously polls prices for tokens with active positions and triggers automated actions.
Polling cycle (every 1.5 seconds):
async pollPrices(): Promise<void> {
  // 1. Get tracked tokens (all tokens with active positions)
  const tokenAddresses = Array.from(this.trackedTokens.values())
    .map(t => t.originalTokenAddress);
  // 2. Fetch prices from DexScreener/Jupiter
  const prices = await priceFeedService.getMultipleTokenPrices(tokenAddresses);
  // 3. For each price update:
  for (const price of prices) {
    // Update local cache
    this.priceCache.set(tokenAddress, price);
    // Update Redis cache
    await redisPriceService.setPrice(tokenAddress, price);
    // Evaluate stop loss for all positions
    await this.evaluateStopLossForToken(tokenAddress, price.priceSol);
    // Collect WebSocket updates
    agentUpdates.get(agentId).push({ tokenAddress, price: price.priceSol, priceUsd });
  }
  // 4. Batch broadcast to connected clients
  for (const [agentId, updates] of agentUpdates) {
    wsServer.broadcastPriceUpdates(agentId, updates);
  }
}
Token tracking:
- Tokens are added when positions are created
- Tokens are removed when all positions in that token are closed
- Tracking is refreshed periodically from database
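The add/remove rules above amount to reference counting per token. A minimal in-memory sketch follows, under the assumption that one count per token address is enough; `TokenTracker` and its method names are hypothetical, and the periodic refresh from the database is left out.

```typescript
// Counts open positions per token so a token stays tracked until the
// last position in it is closed.
class TokenTracker {
  private readonly openPositionCounts = new Map<string, number>();

  // Called when a position is created for the token.
  onPositionCreated(tokenAddress: string): void {
    const current = this.openPositionCounts.get(tokenAddress) ?? 0;
    this.openPositionCounts.set(tokenAddress, current + 1);
  }

  // Called when a position is closed; stops tracking at zero.
  onPositionClosed(tokenAddress: string): void {
    const remaining = (this.openPositionCounts.get(tokenAddress) ?? 0) - 1;
    if (remaining > 0) {
      this.openPositionCounts.set(tokenAddress, remaining);
    } else {
      this.openPositionCounts.delete(tokenAddress);
    }
  }

  // Token addresses that should be included in the next polling cycle.
  trackedTokens(): string[] {
    return Array.from(this.openPositionCounts.keys());
  }
}
```

Because the counts live in memory, the periodic refresh from the database remains necessary to correct any drift after a restart or a missed event.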
Event System
Signal Events (signal-events.ts)
const signalEventEmitter = new EventEmitter();
// Events
signalEventEmitter.emit('signal_created', { signal });
Position Events (position-events.ts)
const positionEventEmitter = new EventEmitter();
// Events
positionEventEmitter.emit('position_created', { agentId, position });
positionEventEmitter.emit('position_updated', { agentId, position });
positionEventEmitter.emit('position_closed', { agentId, positionId, tokenAddress });
Event listeners:
- WebSocket Server - broadcasts updates to connected clients
- Price Update Manager - tracks tokens for price polling
Configuration Service
Config Service (config-service.ts)
Loads and caches agent trading configurations.
async loadAgentConfig(agentId: string): Promise<AgentTradingConfig> {
  // 1. Try Redis cache first
  const cached = await redisConfigService.getAgentConfig(agentId);
  if (cached) return cached;
  // 2. Load from database
  const agent = await prisma.agent.findUnique({ where: { id: agentId } });
  // findUnique resolves to null when no row matches
  if (!agent) {
    throw new Error(`Agent not found: ${agentId}`);
  }
  // 3. Merge with defaults
  const config = this.mergeWithDefaults(agent.tradingConfig);
  // 4. Cache in Redis
  await redisConfigService.setAgentConfig(agentId, config);
  return config;
}
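The merge step could look roughly like the sketch below. The `stopLoss` and `dca` sections and the `enabled` and `maxAttempts` fields mirror names used elsewhere in this document; the `mode` and `initialPercent` fields, the default values, and the per-section shallow merge are assumptions made for illustration.

```typescript
// Hypothetical, trimmed-down config shape.
interface TradingConfigSketch {
  stopLoss: { mode: string; initialPercent: number };
  dca: { enabled: boolean; maxAttempts: number };
}

// Stored configs may omit whole sections or individual fields.
type StoredConfigSketch = {
  stopLoss?: Partial<TradingConfigSketch["stopLoss"]>;
  dca?: Partial<TradingConfigSketch["dca"]>;
};

// Made-up defaults, not the real ones.
const DEFAULT_CONFIG: TradingConfigSketch = {
  stopLoss: { mode: "fixed", initialPercent: -10 },
  dca: { enabled: false, maxAttempts: 3 },
};

// Merges section by section so a partially specified section keeps the
// defaults for the fields it omits.
function mergeWithDefaults(
  stored: StoredConfigSketch | null | undefined
): TradingConfigSketch {
  return {
    stopLoss: { ...DEFAULT_CONFIG.stopLoss, ...stored?.stopLoss },
    dca: { ...DEFAULT_CONFIG.dca, ...stored?.dca },
  };
}
```

Merging per section rather than with a single top-level spread avoids a stored `{ dca: { enabled: true } }` silently dropping the default `maxAttempts`.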