Notification Producer
The Notification Producer is a background service that monitors blockchain events and generates push notifications for users subscribed to CoW Protocol updates. It tracks trades, expired orders, and CMS notifications across multiple chains.
Purpose and Responsibilities
The Notification Producer performs the following tasks:
- Trade Notifications: Monitors executed trades and generates notifications for users
- Expired Order Notifications: Detects expired orders and notifies affected users
- CMS Notifications: Fetches and distributes content management system notifications
- Multi-Chain Indexing: Tracks events across all supported CoW Protocol chains
- State Persistence: Maintains indexer state to resume from the last processed block after restarts
How to Run
Development
# Start required dependencies
docker-compose up -d queue # RabbitMQ
docker-compose up -d db # PostgreSQL
# Optional: Start Redis for caching
docker-compose up -d redis
# Run database migrations
yarn migration:run
# Start the notification producer
yarn producer
The producer will start monitoring all supported chains by default.
Run for Specific Chains
You can limit the producer to specific chains using the NOTIFICATIONS_PRODUCER_CHAINS environment variable:
# Run producer only for Mainnet (1) and Gnosis Chain (100)
NOTIFICATIONS_PRODUCER_CHAINS=1,100 yarn producer
Production
# Build the application
yarn build notification-producer
# Start the producer
node dist/apps/notification-producer/main.js
Docker
# Build the Docker image
docker build -f apps/notification-producer/Dockerfile . -t notification-producer
# Run the container
docker run \
-e QUEUE_HOST=rabbitmq \
-e QUEUE_USER=rabbit \
-e QUEUE_PASSWORD=password \
-e DATABASE_HOST=postgres \
-e RPC_URL_1=https://mainnet.infura.io/v3/key \
notification-producer
Key Configuration Options
Chain Selection
| Variable | Description | Default |
|---|
NOTIFICATIONS_PRODUCER_CHAINS | Comma-separated chain IDs to monitor | All supported chains |
Example: NOTIFICATIONS_PRODUCER_CHAINS=1,100,137
RPC Configuration
The producer requires RPC endpoints for each chain being monitored:
RPC_URL_1=https://mainnet.infura.io/v3/your-key
RPC_URL_100=https://rpc.gnosischain.com
RPC_URL_137=https://polygon-rpc.com
RPC_URL_8453=https://base-rpc.com
RPC_URL_42161=https://arbitrum-rpc.com
Database Configuration
DATABASE_HOST=localhost
DATABASE_PORT=5432
DATABASE_USERNAME=bff-db-user
DATABASE_PASSWORD=bff-db-password
DATABASE_NAME=bff-db
DATABASE_ENABLED=true
RabbitMQ Configuration
QUEUE_HOST=localhost
QUEUE_PORT=5672
QUEUE_USER=rabbit
QUEUE_PASSWORD=my-rabbit-password
Optional: Redis Cache
REDIS_ENABLED=true
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_USER=cow_redis
REDIS_PASSWORD=cow_password
Producer Components
The service runs three types of notification producers concurrently:
1. Trade Notification Producer
Purpose: Generates notifications when users’ orders are executed.
How it works:
- Polls blockchain for new blocks
- Queries CoW Protocol settlement events
- Matches trades to subscribed user accounts
- Enriches notifications with token information and USD values
- Sends notifications to RabbitMQ queue
Key Features:
- Batch processing (up to 5000 blocks per batch)
- State persistence for crash recovery
- Handles blockchain reorgs gracefully
2. Expired Orders Notification Producer
Purpose: Notifies users when their orders expire without being filled.
How it works:
- Monitors orders approaching expiration
- Checks if orders were filled or cancelled
- Generates notifications for truly expired orders
- Sends to notification queue
3. CMS Notification Producer
Purpose: Fetches and distributes notifications from the content management system.
How it works:
- Polls CMS for new announcements
- Distributes to all subscribed users
- Handles general protocol updates and announcements
Main Functionality
Block Indexing
The trade notification producer implements a robust block indexing mechanism:
// 1. Get last indexed block from database
const stateRegistry = await indexerStateRepository.get(
'trade_notification_producer',
chainId
);
// 2. Get current block from RPC
const lastBlock = await client.getBlock();
// 3. Process blocks in batches
while (fromBlock <= toBlock) {
await processBlocks(fromBlock, toBlock);
// Update state after each batch
}
Notification Queue
Notifications are sent to RabbitMQ for consumption by delivery services (e.g., Telegram bot):
// Notification structure
{
id: string,
account: string,
title: string,
message: string,
url: string,
context: {
chainId: number,
orderUid: string,
// ... additional metadata
}
}
Dependencies
Required Services
-
PostgreSQL Database - Stores indexer state and subscription information. Required tables:
indexer_state, push_subscriptions, on_chain_placed_orders, expired_orders
-
RabbitMQ - Message queue for distributing notifications to consumers. Queue:
notifications
-
Blockchain RPC Nodes - HTTP or WebSocket endpoints for each monitored chain
Optional Services
- Redis - Caching layer for token metadata and frequently accessed data. Improves performance by reducing redundant API calls and blockchain queries.
Database Schema
The indexer state table stores progress for each producer:
CREATE TABLE indexer_state (
key TEXT NOT NULL,
chain_id INTEGER,
state JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (key, chain_id)
);
State Structure:
{
"lastBlock": "18500000",
"lastBlockTimestamp": "1698765432",
"lastBlockHash": "0xabc123..."
}
Graceful Shutdown
The producer implements graceful shutdown with a 30-second timeout:
// Signal handlers
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
// All producers are commanded to stop
// Wait up to 30 seconds for clean shutdown
// Force exit if timeout is reached
This ensures:
- In-progress block processing completes
- Database state is saved
- No notifications are lost
- Clean disconnection from RabbitMQ and database
Testing Notifications
Quick test to send a notification to your subscribed account:
# Replace with your Ethereum address
POST_TO_QUEUE_ACCOUNT=0x79063d9173C09887d536924E2F6eADbaBAc099f5 \
nx test notification-producer \
--testFile=src/sendPush.test.ts \
--skip-nx-cache
This will:
- Create a test notification
- Send it to the RabbitMQ queue
- Be delivered by connected consumers (e.g., Telegram bot)
Nx Commands
# Development server
nx start notification-producer
# Build for production
nx build notification-producer
# Run tests
nx test notification-producer
# Lint
nx lint notification-producer
# Docker build
nx docker-build notification-producer
Monitoring and Logs
The producer outputs structured logs with prefixes:
[notification-producer:main] Start notification producer for networks: 1, 100
[TradeNotificationProducer:1] Indexing from block 18500000 to 18500100: 101 blocks
[TradeNotificationProducer:1] Sending 5 notifications
Log Levels:
trace - No new blocks to index
debug - Block processing details
info - Notifications sent, producer lifecycle
warn - RPC lag, temporary issues
error - Critical errors, retries
Last modified on March 4, 2026