🌱 seedling
How We Built a Distributed Pipeline to Process 1 Crore Records Daily
Planted November 3, 2025
nestjs · bullmq · distributed-systems · data-pipeline · producer-worker · typescript
The Challenge
How do you process millions of real estate records every day, transforming data from messy spreadsheets into a production-ready data warehouse, while ensuring zero data loss and maximum reliability?
We started with manual batch scripts, but faced:
- No visibility into failures
- Scaling bottlenecks with growing data
- No retry mechanism - lost records forever
- Required manual intervention
The Solution Architecture
Producer-Worker Pattern
┌──────────────┐     ┌───────────┐     ┌──────────────┐
│   Producer   │────>│   Queue   │────>│    Worker    │
│ (1 replica)  │     │  (Redis)  │     │(200 replicas)│
└──────────────┘     └───────────┘     └──────────────┘
Why this pattern:
- Separation of concerns: orchestration vs execution
- Horizontal scaling: add workers = more throughput
- Fault tolerance: worker failures isolated
- Automatic load balancing
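The pattern can be sketched end to end. In production the queue is BullMQ on Redis; the in-memory queue below is a hypothetical stand-in that keeps the sketch self-contained while showing why extra worker replicas add throughput with no coordination code:

```typescript
// Producer-worker sketch. In production the queue lives in Redis (BullMQ);
// here an in-memory queue keeps the example runnable on its own.

type Job = { name: string; rows: number[] };

class InMemoryQueue {
  private jobs: Job[] = [];
  add(job: Job) { this.jobs.push(job); }
  take(): Job | undefined { return this.jobs.shift(); }
  get depth() { return this.jobs.length; }
}

// Producer (1 replica): only orchestrates — slices data into batch jobs.
function produce(queue: InMemoryQueue, rows: number[], batchSize: number) {
  for (let i = 0; i < rows.length; i += batchSize) {
    queue.add({ name: "process-batch", rows: rows.slice(i, i + batchSize) });
  }
}

// Worker: pulls jobs until the queue is empty. Each replica runs this
// loop independently, so adding replicas adds throughput.
async function runWorker(queue: InMemoryQueue, processed: number[]) {
  let job: Job | undefined;
  while ((job = queue.take()) !== undefined) {
    for (const row of job.rows) processed.push(row); // transform + load here
    await Promise.resolve(); // yield so replicas can interleave
  }
}

async function demo() {
  const queue = new InMemoryQueue();
  produce(queue, Array.from({ length: 10 }, (_, i) => i), 3);
  const processed: number[] = [];
  // "Scale" to 4 worker replicas sharing one queue.
  await Promise.all([1, 2, 3, 4].map(() => runWorker(queue, processed)));
  return processed.length; // 10 — every row handled exactly once
}
```

A worker failure here only loses the jobs that replica was holding; the queue and the other replicas keep running, which is the fault-isolation property called out above.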
Key Technical Decisions
Memory-Efficient Streaming
Using async generators instead of loading entire tables:
async *streamBatches(table: string, batchSize: number) {
  let offset = 0;
  const chunkSize = 10000; // rows fetched per query
  while (true) {
    // NOTE: `table` must come from a trusted allowlist — identifiers can't
    // be parameterized. In practice the query also needs a stable ORDER BY
    // so OFFSET pagination doesn't skip or repeat rows between chunks.
    const rows = await this.db.query(
      `SELECT * FROM ${table} LIMIT ${chunkSize} OFFSET ${offset}`
    );
    if (rows.length === 0) break;
    for (let i = 0; i < rows.length; i += batchSize) {
      yield rows.slice(i, i + batchSize);
    }
    offset += chunkSize;
  }
}
Result: 8x memory reduction (4 GB → 500 MB)
Row-Level Error Isolation
Failed rows don't kill entire batch:
for (const row of rows) {
  try {
    await processRow(row);
  } catch (error) {
    // Park the bad row in the dead-letter queue and continue with the next
    await dlq.add("failed-row", { row, error });
  }
}
Result: 99% success rate even with bad data
Parallel Queue Writes with Memory Guards
Control parallelism to prevent overflow:
const q = async.queue(async (task) => {
  await this.etlQueue.add(task.name, task.data);
}, MAX_PARALLEL);

for await (const batch of stream) {
  q.push(createJob(batch));
  if (q.length() > MEMORY_GUARD) {
    await q.drain(); // Pause producing until in-flight writes catch up
  }
}
Result: 3.5x faster Redis writes
Automatic Worker Scaling
Dynamic scaling based on queue depth:
// Reuse one Queue instance — creating a new one on every tick leaks
// Redis connections (the same class of bug described below)
const queue = new Queue("etl", { connection: redis });

// Monitor queue metrics every 30 seconds
setInterval(async () => {
  const waiting = await queue.getWaitingCount();
  const active = await queue.getActiveCount();
  const queueDepth = waiting + active;
  const currentWorkers = await getWorkerCount();

  // Scale up: add workers when the queue is backing up
  if (queueDepth > 10000 && currentWorkers < 200) {
    const targetWorkers = Math.min(Math.ceil(queueDepth / 100), 200);
    await scaleWorkers(targetWorkers);
  }

  // Scale down: shrink to the floor when the queue is nearly empty
  if (queueDepth < 100 && currentWorkers > 10) {
    await scaleWorkers(10); // Keep a minimum of 10 workers warm
  }
}, 30000);
Scaling Strategy:
- Scale up: 1 worker per 100 pending jobs (max 200)
- Scale down: Drop to 10 workers when queue < 100 jobs
- Cost optimization: Only pay for workers when needed
- Response time: 30-second polling interval
Result: 60% cost reduction during off-peak hours
Production Metrics
- 10M+ records processed daily
- 30-45 minutes total pipeline time
- 99.9% success rate with retry + DLQ
- 200 concurrent workers at peak
- Zero data loss (DLQ catches all failures)
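The retry + DLQ numbers above come from giving every job a bounded retry budget and dead-lettering whatever exhausts it. A minimal sketch, assuming BullMQ's real `attempts`/`backoff` job options; the `isFinalFailure` helper and the `etl-dlq` queue name are illustrative:

```typescript
// Default options applied to every ETL job: 3 attempts with exponential
// backoff (1s, 2s, 4s) before the job is declared failed for good.
const defaultJobOptions = {
  attempts: 3,
  backoff: { type: "exponential" as const, delay: 1000 },
  removeOnComplete: true,
};

// A job is a final failure once it has consumed its whole retry budget;
// attemptsMade counts the attempts already made.
function isFinalFailure(attemptsMade: number): boolean {
  return attemptsMade >= defaultJobOptions.attempts;
}

// In the worker process, a 'failed' listener dead-letters exhausted jobs
// so nothing is silently lost (illustrative wiring):
//
//   worker.on("failed", async (job, err) => {
//     if (job && isFinalFailure(job.attemptsMade)) {
//       await dlqQueue.add("dead-letter", { data: job.data, error: err.message });
//     }
//   });
```

Transient failures (deadlocks, timeouts) are absorbed by the retries; only persistently bad rows reach the DLQ, where they can be inspected and replayed.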
Design Patterns Used
- Factory Pattern: Create appropriate producer based on ETL type
- Strategy Pattern: Select database repository at runtime
- Template Method: Define algorithm skeleton in base class
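How the patterns fit together can be shown in a compressed sketch (class names and the trivial `transform`/`load` steps are illustrative, not the production code):

```typescript
// Template Method: the base class fixes the pipeline skeleton;
// subclasses supply only the source-specific transform step.
abstract class EtlProducer {
  // The template method — the algorithm's skeleton lives here.
  run(rows: string[]): string[] {
    return rows.map((r) => this.load(this.transform(r)));
  }
  protected abstract transform(row: string): string;
  protected load(row: string): string {
    return `loaded:${row}`;
  }
}

class SpreadsheetProducer extends EtlProducer {
  protected transform(row: string) { return row.trim().toLowerCase(); }
}

class ApiProducer extends EtlProducer {
  protected transform(row: string) { return row.toUpperCase(); }
}

// Factory: select the concrete producer from the ETL type at runtime.
function createProducer(etlType: "spreadsheet" | "api"): EtlProducer {
  const producers = {
    spreadsheet: () => new SpreadsheetProducer(),
    api: () => new ApiProducer(),
  };
  return producers[etlType]();
}
```

The same shape applies to the Strategy pattern for repositories: swap `EtlProducer` for a repository interface and pick the MySQL or MongoDB implementation in the factory.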
Critical Bug We Found
QueueEvents resource leak:
// ❌ BEFORE: Memory leak
const queueEvents = new QueueEvents('etl', {...});
queueEvents.on('drained', callback);
// Never closed! 14 tables = 14 hanging connections

// ✅ AFTER: Proper cleanup
try {
  await new Promise((resolve) => {
    queueEvents.on('drained', resolve);
  });
} finally {
  queueEvents.removeAllListeners();
  await queueEvents.close(); // Critical!
}
Next Steps to Develop This Note
- Add complete code examples for producer service
- Document worker configuration and scaling strategy
- Add monitoring and observability setup
- Create architecture diagrams
- Document retry strategy and DLQ handling
- Add performance benchmarks and optimization tips
- Create open-source boilerplate repository
Technology Stack
| Component | Technology | Purpose |
|---|---|---|
| Framework | NestJS | DI, modularity |
| Queue | BullMQ + Redis | Job distribution, retry |
| Databases | MySQL + MongoDB | Source and destination |
| Scaling | Docker Compose | 200 worker replicas |
| Monitoring | Pino Logger | Structured logging |