
Notification Producer

The Notification Producer is a background service that monitors blockchain events and generates push notifications for users subscribed to CoW Protocol updates. It tracks trades, expired orders, and CMS notifications across multiple chains.

Purpose and Responsibilities

The Notification Producer performs the following tasks:
  • Trade Notifications: Monitors executed trades and generates notifications for users
  • Expired Order Notifications: Detects expired orders and notifies affected users
  • CMS Notifications: Fetches and distributes content management system notifications
  • Multi-Chain Indexing: Tracks events across all supported CoW Protocol chains
  • State Persistence: Maintains indexer state to resume from the last processed block after restarts

How to Run

Development

# Start required dependencies
docker-compose up -d queue  # RabbitMQ
docker-compose up -d db     # PostgreSQL

# Optional: Start Redis for caching
docker-compose up -d redis

# Run database migrations
yarn migration:run

# Start the notification producer
yarn producer
The producer will start monitoring all supported chains by default.

Run for Specific Chains

You can limit the producer to specific chains using the NOTIFICATIONS_PRODUCER_CHAINS environment variable:
# Run producer only for Mainnet (1) and Gnosis Chain (100)
NOTIFICATIONS_PRODUCER_CHAINS=1,100 yarn producer

Production

# Build the application
yarn build notification-producer

# Start the producer
node dist/apps/notification-producer/main.js

Docker

# Build the Docker image
docker build -f apps/notification-producer/Dockerfile . -t notification-producer

# Run the container
docker run \
  -e QUEUE_HOST=rabbitmq \
  -e QUEUE_USER=rabbit \
  -e QUEUE_PASSWORD=password \
  -e DATABASE_HOST=postgres \
  -e RPC_URL_1=https://mainnet.infura.io/v3/key \
  notification-producer

Key Configuration Options

Chain Selection

Variable | Description | Default
--- | --- | ---
NOTIFICATIONS_PRODUCER_CHAINS | Comma-separated chain IDs to monitor | All supported chains
Example: NOTIFICATIONS_PRODUCER_CHAINS=1,100,137
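
As an illustration of how such a comma-separated value might be interpreted, the sketch below parses it into numeric chain IDs. The helper name `parseChainIds` and its error message are assumptions made for this example, not the producer's actual code. An unset or blank value maps to `undefined`, so a caller can fall back to all supported chains.

```typescript
// Hypothetical helper (not taken from the producer's source).
// Parses a value such as "1,100,137" into a list of numeric chain IDs.
// Returns undefined when the variable is unset or blank so the caller can
// fall back to "all supported chains".
function parseChainIds(raw: string | undefined): number[] | undefined {
  if (raw === undefined || raw.trim() === '') {
    return undefined;
  }
  const ids = raw.split(',').map((part) => {
    const trimmed = part.trim();
    if (!/^\d+$/.test(trimmed)) {
      throw new Error(`Invalid chain ID "${part}" in NOTIFICATIONS_PRODUCER_CHAINS`);
    }
    return Number(trimmed);
  });
  // Drop duplicates while keeping first-seen order.
  return ids.filter((id, index) => ids.indexOf(id) === index);
}
```

In Node.js this could be called as `parseChainIds(process.env.NOTIFICATIONS_PRODUCER_CHAINS)`. Rejecting malformed entries early avoids silently monitoring the wrong set of chains.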

RPC Configuration

The producer requires RPC endpoints for each chain being monitored:
RPC_URL_1=https://mainnet.infura.io/v3/your-key
RPC_URL_100=https://rpc.gnosischain.com
RPC_URL_137=https://polygon-rpc.com
RPC_URL_8453=https://base-rpc.com
RPC_URL_42161=https://arbitrum-rpc.com
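
The `RPC_URL_<chainId>` naming convention lends itself to a startup check. The sketch below looks up one endpoint per monitored chain and reports all missing variables at once. The helper `resolveRpcUrls` is hypothetical, and whether the producer itself fails fast in this way is an assumption.

```typescript
// Environment shape; in Node.js, process.env can be passed in directly.
type Env = Record<string, string | undefined>;

// Hypothetical helper: look up RPC_URL_<chainId> for every monitored chain
// and fail fast, listing all missing variables in a single error.
function resolveRpcUrls(chainIds: number[], env: Env): Record<number, string> {
  const urls: Record<number, string> = {};
  const missing: string[] = [];
  for (const chainId of chainIds) {
    const name = `RPC_URL_${chainId}`;
    const value = env[name];
    if (value === undefined || value.trim() === '') {
      missing.push(name);
    } else {
      urls[chainId] = value.trim();
    }
  }
  if (missing.length > 0) {
    throw new Error(`Missing RPC endpoint(s): ${missing.join(', ')}`);
  }
  return urls;
}
```

For example, `resolveRpcUrls([1, 100], process.env)` would return the Mainnet and Gnosis Chain endpoints, or throw an error naming whichever variable is not set.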

Database Configuration

DATABASE_HOST=localhost
DATABASE_PORT=5432
DATABASE_USERNAME=bff-db-user
DATABASE_PASSWORD=bff-db-password
DATABASE_NAME=bff-db
DATABASE_ENABLED=true

RabbitMQ Configuration

QUEUE_HOST=localhost
QUEUE_PORT=5672
QUEUE_USER=rabbit
QUEUE_PASSWORD=my-rabbit-password

Optional: Redis Cache

REDIS_ENABLED=true
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_USER=cow_redis
REDIS_PASSWORD=cow_password

Producer Components

The service runs three types of notification producers concurrently:

1. Trade Notification Producer

Purpose: Generates notifications when users’ orders are executed.
How it works:
  • Polls blockchain for new blocks
  • Queries CoW Protocol settlement events
  • Matches trades to subscribed user accounts
  • Enriches notifications with token information and USD values
  • Sends notifications to RabbitMQ queue
Key Features:
  • Batch processing (up to 5000 blocks per batch)
  • State persistence for crash recovery
  • Handles blockchain reorgs gracefully
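
The step of matching trades to subscribed accounts can be pictured as a simple filter. This is a minimal sketch under stated assumptions: the `Trade` shape and the function `matchTradesToSubscribers` are hypothetical, and the real event and subscription types may differ.

```typescript
// Hypothetical shape for illustration; the real trade event type in the
// codebase may carry more fields and different names.
interface Trade {
  owner: string; // account that placed the order
  orderUid: string;
}

// Keep only trades whose owner has a push subscription. Addresses are
// compared case-insensitively because hex addresses may be checksummed.
function matchTradesToSubscribers(
  trades: Trade[],
  subscribedAccounts: string[],
): Trade[] {
  const subscribed: Record<string, true> = {};
  for (const account of subscribedAccounts) {
    subscribed[account.toLowerCase()] = true;
  }
  return trades.filter(
    (trade) => subscribed[trade.owner.toLowerCase()] === true,
  );
}
```

Building the lookup once per batch keeps the filter linear in the number of trades, which matters when a batch spans thousands of blocks.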

2. Expired Orders Notification Producer

Purpose: Notifies users when their orders expire without being filled.
How it works:
  • Monitors orders approaching expiration
  • Checks if orders were filled or cancelled
  • Generates notifications for truly expired orders
  • Sends to notification queue
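
The distinction between an order that has truly expired and one that was filled or cancelled can be sketched as follows. The `TrackedOrder` shape and `findExpiredOrders` are assumptions for illustration; only `validTo` (a Unix timestamp in seconds) mirrors a real CoW Protocol order field.

```typescript
// Hypothetical order record; the real schema may differ.
interface TrackedOrder {
  uid: string;
  validTo: number; // expiry as a Unix timestamp in seconds
  filled: boolean;
  cancelled: boolean;
}

// An order counts as truly expired only when its validity window has
// passed and it was neither filled nor cancelled in the meantime.
function findExpiredOrders(
  orders: TrackedOrder[],
  nowSeconds: number,
): TrackedOrder[] {
  return orders.filter(
    (order) => order.validTo < nowSeconds && !order.filled && !order.cancelled,
  );
}
```

An order whose `validTo` equals the current time is still treated as valid here, so it is not reported until the following second.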

3. CMS Notification Producer

Purpose: Fetches and distributes notifications from the content management system.
How it works:
  • Polls CMS for new announcements
  • Distributes to all subscribed users
  • Handles general protocol updates and announcements

Main Functionality

Block Indexing

The trade notification producer resumes from the last indexed block stored in the database and processes new blocks in batches:
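// 1. Get last indexed block from database
const stateRegistry = await indexerStateRepository.get(
  'trade_notification_producer',
  chainId
);

// 2. Get current block from RPC
const lastBlock = await client.getBlock();

// 3. Process blocks in batches of up to 5000 blocks
//    (simplified: assumes a stored state exists and lastBlock.number is set)
const head = BigInt(lastBlock.number);
let fromBlock = BigInt(stateRegistry.state.lastBlock) + 1n;
while (fromBlock <= head) {
  const batchEnd = fromBlock + 4999n;
  const toBlock = batchEnd < head ? batchEnd : head;
  await processBlocks(fromBlock, toBlock);
  // Update state after each batch, then move on to the next range
  fromBlock = toBlock + 1n;
}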
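
To show why persisting state after each batch enables crash recovery, here is a self-contained sketch of the same loop against an in-memory stand-in for the state table. All names (`InMemoryStateRepository`, `indexToHead`, `ProcessBlocks`) are illustrative assumptions, and plain numbers are used for block heights for brevity, whereas real code may use `bigint`.

```typescript
// All names here are illustrative stand-ins, not the producer's real classes.
interface IndexerState {
  lastBlock: string; // stored as a string, matching the persisted JSON state
}

// In-memory stand-in for the PostgreSQL-backed indexer state table.
class InMemoryStateRepository {
  private states: Record<string, IndexerState> = {};

  get(key: string, chainId: number): IndexerState | undefined {
    return this.states[`${key}:${chainId}`];
  }

  upsert(key: string, chainId: number, state: IndexerState): void {
    this.states[`${key}:${chainId}`] = state;
  }
}

type ProcessBlocks = (fromBlock: number, toBlock: number) => Promise<void>;

// Index from the block after the last persisted one up to chainHead,
// persisting progress after every successful batch.
async function indexToHead(
  repo: InMemoryStateRepository,
  key: string,
  chainId: number,
  chainHead: number,
  processBlocks: ProcessBlocks,
  batchSize = 5000,
): Promise<void> {
  const state = repo.get(key, chainId);
  // Assumption: with no stored state, indexing starts at the current head.
  let fromBlock = state ? Number(state.lastBlock) + 1 : chainHead;
  while (fromBlock <= chainHead) {
    const toBlock = Math.min(fromBlock + batchSize - 1, chainHead);
    await processBlocks(fromBlock, toBlock);
    repo.upsert(key, chainId, { lastBlock: String(toBlock) });
    fromBlock = toBlock + 1;
  }
}
```

If `processBlocks` throws partway through, the stored `lastBlock` still points at the last fully processed batch, so the next call to `indexToHead` resumes from there instead of starting over.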

Notification Queue

Notifications are sent to RabbitMQ for consumption by delivery services (e.g., Telegram bot):
// Notification structure
{
  id: string,
  account: string,
  title: string,
  message: string,
  url: string,
  context: {
    chainId: number,
    orderUid: string,
    // ... additional metadata
  }
}
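
The structure shown above can be expressed as TypeScript types with a small encode and decode pair for queue payloads. This is a sketch only: the type names, the use of JSON as the wire format, and the validation rules are assumptions, and some notification kinds may not carry every `context` field.

```typescript
// Field names follow the notification structure in this section; the type
// names and the JSON encoding are assumptions made for this sketch.
interface NotificationContext {
  chainId: number;
  orderUid: string;
  [extra: string]: unknown; // additional metadata
}

interface PushNotification {
  id: string;
  account: string;
  title: string;
  message: string;
  url: string;
  context: NotificationContext;
}

// Serialize a notification for publishing to a queue.
function encodeNotification(notification: PushNotification): string {
  return JSON.stringify(notification);
}

// Parse a queue payload and check that the required fields are present.
function decodeNotification(payload: string): PushNotification {
  const parsed = JSON.parse(payload) as Partial<PushNotification>;
  const required: Array<keyof PushNotification> = [
    'id',
    'account',
    'title',
    'message',
    'url',
  ];
  for (const field of required) {
    if (typeof parsed[field] !== 'string') {
      throw new Error(`Notification is missing string field "${field}"`);
    }
  }
  if (typeof parsed.context !== 'object' || parsed.context === null) {
    throw new Error('Notification is missing "context"');
  }
  return parsed as PushNotification;
}
```

Validating on the consumer side keeps a malformed message from reaching a delivery service such as the Telegram bot.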

Dependencies

Required Services

  • PostgreSQL Database - Stores indexer state and subscription information. Required tables: indexer_state, push_subscriptions, on_chain_placed_orders, expired_orders
  • RabbitMQ - Message queue for distributing notifications to consumers. Queue: notifications
  • Blockchain RPC Nodes - HTTP or WebSocket endpoints for each monitored chain

Optional Services

  • Redis - Caching layer for token metadata and frequently accessed data. Improves performance by reducing redundant API calls and blockchain queries.

Database Schema

The indexer state table stores progress for each producer:
CREATE TABLE indexer_state (
  key TEXT NOT NULL,
  chain_id INTEGER,
  state JSONB,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  UNIQUE (key, chain_id)
);
State Structure:
{
  "lastBlock": "18500000",
  "lastBlockTimestamp": "1698765432",
  "lastBlockHash": "0xabc123..."
}
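
One plausible use of `lastBlockHash` is reorg detection: if the chain's current hash at the stored height no longer matches, the indexed data may belong to an abandoned fork. The sketch below illustrates that idea. `hasReorged` and `toTypedState` are hypothetical helpers, and both the reorg strategy and the interpretation of `lastBlockTimestamp` as Unix seconds are assumptions, not confirmed behavior of the producer.

```typescript
// Persisted state as stored in the JSONB column (all values are strings).
interface PersistedIndexerState {
  lastBlock: string;
  lastBlockTimestamp: string;
  lastBlockHash: string;
}

// Hypothetical check: a reorg past the last indexed block is suspected when
// the chain's current hash at that height differs from the stored hash.
function hasReorged(
  state: PersistedIndexerState,
  canonicalHashAtLastBlock: string,
): boolean {
  return (
    state.lastBlockHash.toLowerCase() !== canonicalHashAtLastBlock.toLowerCase()
  );
}

// Convert the string fields into a number and a Date for easier use.
// Assumption: lastBlockTimestamp is a Unix timestamp in seconds.
function toTypedState(state: PersistedIndexerState): {
  lastBlock: number;
  lastBlockTime: Date;
} {
  return {
    lastBlock: Number(state.lastBlock),
    lastBlockTime: new Date(Number(state.lastBlockTimestamp) * 1000),
  };
}
```

When a mismatch is detected, an indexer would typically step back to an earlier block and re-index from there. How far the producer rewinds is not specified here.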

Graceful Shutdown

The producer implements graceful shutdown with a 30-second timeout:
// Signal handlers
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

// All producers are commanded to stop
// Wait up to 30 seconds for clean shutdown
// Force exit if timeout is reached
When shutdown completes within the timeout, this ensures:
  • In-progress block processing completes
  • Database state is saved
  • No notifications are lost
  • Clean disconnection from RabbitMQ and database
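
A shutdown routine with a timeout could look roughly like the sketch below. It is not the producer's implementation: the `Stoppable` interface, the function names, and the exit codes (0 for a clean stop, 1 on timeout) are assumptions. The `exit` callback is injected so the logic can be exercised without terminating the process.

```typescript
// Hypothetical shape: each producer exposes an async stop() method.
interface Stoppable {
  stop(): Promise<void>;
}

type ShutdownResult = 'clean' | 'timeout';

// Ask every producer to stop and wait until all have finished or the
// timeout elapses, whichever comes first.
function stopAll(
  producers: Stoppable[],
  timeoutMs: number,
): Promise<ShutdownResult> {
  return new Promise<ShutdownResult>((resolve) => {
    const timer = setTimeout(() => resolve('timeout'), timeoutMs);
    const stops = producers.map((producer) =>
      Promise.resolve()
        .then(() => producer.stop())
        .catch(() => undefined), // a failing producer must not block the others
    );
    Promise.all(stops).then(() => {
      clearTimeout(timer);
      resolve('clean');
    });
  });
}

// Build a signal handler that exits 0 on a clean stop and 1 on timeout.
function makeShutdownHandler(
  producers: Stoppable[],
  exit: (code: number) => void,
  timeoutMs = 30000,
): () => Promise<void> {
  let shuttingDown = false;
  return async (): Promise<void> => {
    if (shuttingDown) {
      return; // ignore repeated signals
    }
    shuttingDown = true;
    const result = await stopAll(producers, timeoutMs);
    exit(result === 'clean' ? 0 : 1);
  };
}

// Possible wiring in Node.js (producer variables are placeholders):
// const shutdown = makeShutdownHandler(producers, (code) => process.exit(code));
// process.on('SIGTERM', shutdown);
// process.on('SIGINT', shutdown);
```

Guarding against repeated signals matters because orchestrators and terminals can deliver SIGTERM or SIGINT more than once during a slow shutdown.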

Testing Notifications

To quickly send a test notification to your subscribed account, run:
# Replace with your Ethereum address
POST_TO_QUEUE_ACCOUNT=0x79063d9173C09887d536924E2F6eADbaBAc099f5 \
  nx test notification-producer \
  --testFile=src/sendPush.test.ts \
  --skip-nx-cache
This will:
  1. Create a test notification
  2. Send it to the RabbitMQ queue
  3. Let connected consumers (e.g., Telegram bot) deliver it

Nx Commands

# Development server
nx start notification-producer

# Build for production
nx build notification-producer

# Run tests
nx test notification-producer

# Lint
nx lint notification-producer

# Docker build
nx docker-build notification-producer

Monitoring and Logs

The producer outputs structured logs with prefixes:
[notification-producer:main] Start notification producer for networks: 1, 100
[TradeNotificationProducer:1] Indexing from block 18500000 to 18500100: 101 blocks
[TradeNotificationProducer:1] Sending 5 notifications
Log Levels:
  • trace - No new blocks to index
  • debug - Block processing details
  • info - Notifications sent, producer lifecycle
  • warn - RPC lag, temporary issues
  • error - Critical errors, retries
Last modified on March 4, 2026