Worker Layer
The Worker Layer handles background tasks using createWorker(). It attaches to the Application Layer’s DI container, giving your workers access to all registered services while providing scheduling, lifecycle management, and error handling.
Overview
The Worker Layer provides:
- ⏰ Scheduling - Interval-based and cron-based execution
- 🔄 Lifecycle - Init, run, and shutdown hooks
- 🔌 Service Access - Full DI container integration
- 📝 Logging - Built-in structured logging
- 🛡️ Error Handling - Graceful failure and retry logic
- 🎯 Manual Triggers - On-demand execution
Creating a Worker
Basic Worker
import { createApp, Injectable } from '@bunty/common';
import { createWorker } from '@bunty/worker';
@Injectable()
class CleanupWorker {
async run() {
console.log('Running cleanup task...');
// Task logic here
}
}
const app = createApp({
name: 'worker-app',
providers: [CleanupWorker, DatabaseService],
});
const worker = createWorker({
app,
interval: '5m', // Run every 5 minutes
tasks: [CleanupWorker],
});
await app.start();
await worker.start();
Terminal output:
With Lifecycle Hooks
import { Injectable, Logger } from '@bunty/common';
@Injectable()
class DataSyncWorker {
private logger = new Logger('DataSyncWorker');
constructor(
private source: DataSourceService,
private storage: StorageService
) {}
async onInit() {
this.logger.info('Worker initializing...');
// Initialize connections, load config
}
async run() {
this.logger.info('Starting data sync...');
const records = await this.source.fetchRecords();
this.logger.debug('Fetched records:', records.length);
await this.storage.saveBatch(records);
this.logger.success('Sync completed');
}
async onShutdown() {
this.logger.warn('Worker shutting down...');
// Close connections, cleanup
}
}
const worker = createWorker({
app,
interval: '15m',
tasks: [DataSyncWorker],
});
Scheduling Options
Interval-Based
Run tasks at regular intervals:
// Every 5 minutes
createWorker({
app,
interval: '5m',
tasks: [MyWorker],
});
// Every hour
createWorker({
app,
interval: '1h',
tasks: [HourlyWorker],
});
// Every day
createWorker({
app,
interval: '1d',
tasks: [DailyWorker],
});
// Using milliseconds
createWorker({
app,
interval: 30000, // 30 seconds
tasks: [FastWorker],
});
Cron-Based
Use cron expressions for precise timing:
// Every hour at minute 0
createWorker({
app,
cron: '0 * * * *',
tasks: [HourlyReportWorker],
});
// Every day at midnight
createWorker({
app,
cron: '0 0 * * *',
tasks: [DailyBackupWorker],
});
// Every Monday at 9 AM
createWorker({
app,
cron: '0 9 * * 1',
tasks: [WeeklyReportWorker],
});
// Every 15 minutes
createWorker({
app,
cron: '*/15 * * * *',
tasks: [FrequentSyncWorker],
});
Manual Execution
Run workers on-demand:
const worker = createWorker({
app,
tasks: [ManualWorker],
});
await worker.start();
// Trigger manually
await worker.runOnce();
// Trigger from HTTP endpoint
http.post('/admin/trigger-sync', async (req, res) => {
await worker.runOnce();
return res.json({ status: 'triggered' });
});
Worker Patterns
Data Synchronization
Sync data from external sources:
import { Injectable, Logger } from '@bunty/common';
@Injectable()
class ShopifySyncWorker {
private logger = new Logger('ShopifySync');
constructor(
private shopify: ShopifyAPI,
private products: ProductService
) {}
async run() {
this.logger.info('Starting Shopify sync...');
try {
// Fetch products from Shopify
const shopifyProducts = await this.shopify.getProducts();
this.logger.debug('Fetched products:', shopifyProducts.length);
// Sync to database
for (const product of shopifyProducts) {
await this.products.upsert({
externalId: product.id,
title: product.title,
price: product.price,
inventory: product.inventory_quantity,
});
}
this.logger.success('Sync completed:', shopifyProducts.length);
} catch (error) {
this.logger.error('Sync failed:', error);
throw error;
}
}
}
createWorker({
app,
interval: '30m', // Sync every 30 minutes
tasks: [ShopifySyncWorker],
}).start();
Terminal output:
Queue Processing
Process background job queues:
@Injectable()
class EmailQueueWorker {
private logger = new Logger('EmailQueue');
constructor(
private queue: QueueService,
private email: EmailService
) {}
async run() {
this.logger.info('Processing email queue...');
// Get pending emails (batch of 50)
const jobs = await this.queue.getPending('emails', 50);
this.logger.debug('Found pending emails:', jobs.length);
for (const job of jobs) {
try {
await this.email.send(job.data);
await this.queue.markComplete(job.id);
this.logger.success('Email sent:', job.id);
} catch (error) {
this.logger.error('Email failed:', job.id, error);
await this.queue.markFailed(job.id, error.message);
}
}
this.logger.info('Queue processing complete');
}
}
createWorker({
app,
interval: '1m', // Process every minute
tasks: [EmailQueueWorker],
}).start();
Data Cleanup
Clean up old or stale data:
@Injectable()
class DataCleanupWorker {
private logger = new Logger('Cleanup');
constructor(private db: DatabaseService) {}
async run() {
this.logger.info('Starting cleanup...');
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
// Delete old sessions
const deletedSessions = await this.db
.createQueryBuilder(sessionsTable)
.where('expiresAt', '<', thirtyDaysAgo)
.delete();
this.logger.success('Deleted sessions:', deletedSessions);
// Archive old orders
const oldOrders = await this.db
.createQueryBuilder(ordersTable)
.where('createdAt', '<', thirtyDaysAgo)
.where('status', '=', 'completed')
.findMany();
for (const order of oldOrders) {
await this.archiveOrder(order);
}
this.logger.success('Archived orders:', oldOrders.length);
}
private async archiveOrder(order: any) {
// Archive logic
}
}
createWorker({
app,
cron: '0 2 * * *', // Run at 2 AM daily
tasks: [DataCleanupWorker],
}).start();
Report Generation
Generate and send reports:
@Injectable()
class DailyReportWorker {
private logger = new Logger('DailyReport');
constructor(
private analytics: AnalyticsService,
private email: EmailService
) {}
async run() {
this.logger.info('Generating daily report...');
const report = await this.analytics.generateDailyReport({
date: new Date(),
metrics: ['sales', 'users', 'revenue'],
});
this.logger.debug('Report data:', report);
await this.email.send({
to: 'team@company.com',
subject: `Daily Report - ${this.formatDate(new Date())}`,
html: this.renderReport(report),
});
this.logger.success('Report sent successfully');
}
private formatDate(date: Date): string {
return date.toISOString().split('T')[0];
}
private renderReport(report: any): string {
return `<h1>Daily Report</h1>...`;
}
}
createWorker({
app,
cron: '0 8 * * *', // 8 AM daily
tasks: [DailyReportWorker],
}).start();
Cache Warming
Pre-load data into cache:
@Injectable()
class CacheWarmingWorker {
private logger = new Logger('CacheWarming');
constructor(
private cache: CacheService,
private products: ProductService
) {}
async run() {
this.logger.info('Warming cache...');
// Load popular products
const popular = await this.products.getPopular(100);
for (const product of popular) {
await this.cache.set(
`product:${product.id}`,
product,
{ ttl: 3600 } // 1 hour
);
}
this.logger.success('Cache warmed with products:', popular.length);
}
}
createWorker({
app,
interval: '5m', // Refresh every 5 minutes
tasks: [CacheWarmingWorker],
}).start();
Using Services
Workers have full access to the DI container:
@Injectable()
class OrderProcessorWorker {
constructor(
private orders: OrderService,
private payment: PaymentService,
private email: EmailService,
private logger: Logger
) {
this.logger.setName('OrderProcessor');
}
async run() {
this.logger.info('Processing pending orders...');
// Get pending orders
const pending = await this.orders.findPending();
this.logger.debug('Found pending orders:', pending.length);
for (const order of pending) {
try {
// Process payment
await this.payment.charge(order);
// Update order status
await this.orders.update(order.id, { status: 'paid' });
// Send confirmation
await this.email.sendOrderConfirmation(order);
this.logger.success('Order processed:', order.id);
} catch (error) {
this.logger.error('Order failed:', order.id, error);
await this.orders.update(order.id, {
status: 'failed',
error: error.message,
});
}
}
}
}
Error Handling
Handle errors gracefully in workers:
Try-Catch Pattern
@Injectable()
class ResilientWorker {
private logger = new Logger('ResilientWorker');
async run() {
try {
await this.performTask();
this.logger.success('Task completed successfully');
} catch (error) {
this.logger.error('Task failed:', error);
// Don't throw - let worker continue
// Or re-throw to stop worker
// throw error;
}
}
private async performTask() {
// Task implementation
}
}
Retry Logic
@Injectable()
class RetryWorker {
private logger = new Logger('RetryWorker');
private retries = 0;
private maxRetries = 3;
async run() {
try {
await this.performTask();
this.retries = 0; // Reset on success
} catch (error) {
this.retries++;
this.logger.error(`Task failed (attempt ${this.retries}):`, error);
if (this.retries >= this.maxRetries) {
this.logger.error('Max retries reached, alerting team');
await this.sendAlert(error);
this.retries = 0; // Reset for next cycle
}
}
}
private async sendAlert(error: Error) {
const alertService = inject(AlertService);
await alertService.send({
severity: 'critical',
message: `Worker failed after ${this.maxRetries} attempts`,
error: error.message,
});
}
}
Circuit Breaker
@Injectable()
class CircuitBreakerWorker {
private logger = new Logger('CircuitBreaker');
private failures = 0;
private isOpen = false;
async run() {
if (this.isOpen) {
this.logger.warn('Circuit breaker is open, skipping run');
return;
}
try {
await this.task();
this.failures = 0; // Reset on success
} catch (error) {
this.failures++;
this.logger.error('Task failed:', error);
if (this.failures >= 5) {
this.isOpen = true;
this.logger.error('Circuit breaker opened');
// Reset after 1 minute
setTimeout(() => {
this.isOpen = false;
this.failures = 0;
this.logger.info('Circuit breaker reset');
}, 60000);
}
throw error;
}
}
}
Multiple Workers
Run multiple workers in one application:
import { createApp } from '@bunty/common';
import { createWorker } from '@bunty/worker';
const app = createApp({
name: 'multi-worker-app',
providers: [
// Services
DatabaseService,
CacheService,
EmailService,
// Workers
DataSyncWorker,
EmailQueueWorker,
CleanupWorker,
ReportWorker,
],
});
// Data sync - every 15 minutes
const dataSyncWorker = createWorker({
app,
interval: '15m',
tasks: [DataSyncWorker],
});
// Email queue - every minute
const emailWorker = createWorker({
app,
interval: '1m',
tasks: [EmailQueueWorker],
});
// Cleanup - daily at 2 AM
const cleanupWorker = createWorker({
app,
cron: '0 2 * * *',
tasks: [CleanupWorker],
});
// Reports - daily at 8 AM
const reportWorker = createWorker({
app,
cron: '0 8 * * *',
tasks: [ReportWorker],
});
// Start all
await app.start();
await dataSyncWorker.start();
await emailWorker.start();
await cleanupWorker.start();
await reportWorker.start();
Terminal output:
Graceful Shutdown
Handle shutdown signals:
const worker = createWorker({
app,
interval: '5m',
tasks: [MyWorker],
});
await app.start();
await worker.start();
// Handle shutdown
process.on('SIGINT', async () => {
console.log('Received SIGINT, shutting down...');
await worker.stop();
await app.shutdown();
process.exit(0);
});
process.on('SIGTERM', async () => {
console.log('Received SIGTERM, shutting down...');
await worker.stop();
await app.shutdown();
process.exit(0);
});
Best Practices
✅ Do
- Use lifecycle hooks for setup/teardown
- Implement error handling and retries
- Log all worker execution
- Keep workers focused on one task
- Make workers idempotent (safe to run multiple times)
- Handle shutdown signals gracefully
- Use appropriate scheduling intervals
- Monitor worker health and failures
❌ Don’t
- Store state between runs (use database/cache)
- Block the event loop with long operations
- Forget to handle errors
- Run CPU-intensive tasks without consideration
- Skip logging and monitoring
- Ignore worker failures silently
- Create circular dependencies
- Mix HTTP concerns with worker logic
⚙️ Worker Layer Summary
The Worker Layer handles background tasks using createWorker(). It provides scheduled execution (interval or cron), lifecycle hooks, and full access to the Application Layer’s DI container. Build data sync jobs, queue processors, cleanup tasks, report generators, or any background operation with the same services and configuration as your HTTP layer.
Next Steps
- Explore the HTTP Layer for building APIs
- Learn about the Application Layer foundation
- Understand Application Architecture overview
- See Workers for more scheduling patterns
- Check out Logging for monitoring workers