📦
Bunty

Worker Layer

The Worker Layer handles background tasks using createWorker(). It attaches to the Application Layer’s DI container, giving your workers access to all registered services while providing scheduling, lifecycle management, and error handling.

Overview

The Worker Layer provides:

  • Scheduling - Interval-based and cron-based execution
  • 🔄 Lifecycle - Init, run, and shutdown hooks
  • 🔌 Service Access - Full DI container integration
  • 📝 Logging - Built-in structured logging
  • 🛡️ Error Handling - Graceful failure and retry logic
  • 🎯 Manual Triggers - On-demand execution

Creating a Worker

Basic Worker

import { createApp, Injectable } from '@bunty/common';
import { createWorker } from '@bunty/worker';

@Injectable()
class CleanupWorker {
    async run() {
        console.log('Running cleanup task...');
        // Task logic here
    }
}

const app = createApp({
    name: 'worker-app',
    providers: [CleanupWorker, DatabaseService],
});

const worker = createWorker({
    app,
    interval: '5m',  // Run every 5 minutes
    tasks: [CleanupWorker],
});

await app.start();
await worker.start();

Terminal output:

[2025/10/30 19:53:14] [INFO] [Worker] Worker starting with interval: 5m
[2025/10/30 19:53:15] [SUCCESS] [Worker] Worker started successfully
[2025/10/30 19:53:15] [INFO] [CleanupWorker] Running cleanup task…

With Lifecycle Hooks

import { Injectable, Logger } from '@bunty/common';

@Injectable()
class DataSyncWorker {
    private logger = new Logger('DataSyncWorker');
    
    constructor(
        private source: DataSourceService,
        private storage: StorageService
    ) {}
    
    async onInit() {
        this.logger.info('Worker initializing...');
        // Initialize connections, load config
    }
    
    async run() {
        this.logger.info('Starting data sync...');
        
        const records = await this.source.fetchRecords();
        this.logger.debug('Fetched records:', records.length);
        
        await this.storage.saveBatch(records);
        this.logger.success('Sync completed');
    }
    
    async onShutdown() {
        this.logger.warn('Worker shutting down...');
        // Close connections, cleanup
    }
}

const worker = createWorker({
    app,
    interval: '15m',
    tasks: [DataSyncWorker],
});

Scheduling Options

Interval-Based

Run tasks at regular intervals:

// Every 5 minutes
createWorker({
    app,
    interval: '5m',
    tasks: [MyWorker],
});

// Every hour
createWorker({
    app,
    interval: '1h',
    tasks: [HourlyWorker],
});

// Every day
createWorker({
    app,
    interval: '1d',
    tasks: [DailyWorker],
});

// Using milliseconds
createWorker({
    app,
    interval: 30000,  // 30 seconds
    tasks: [FastWorker],
});

Cron-Based

Use cron expressions for precise timing:

// Every hour at minute 0
createWorker({
    app,
    cron: '0 * * * *',
    tasks: [HourlyReportWorker],
});

// Every day at midnight
createWorker({
    app,
    cron: '0 0 * * *',
    tasks: [DailyBackupWorker],
});

// Every Monday at 9 AM
createWorker({
    app,
    cron: '0 9 * * 1',
    tasks: [WeeklyReportWorker],
});

// Every 15 minutes
createWorker({
    app,
    cron: '*/15 * * * *',
    tasks: [FrequentSyncWorker],
});

Manual Execution

Run workers on-demand:

const worker = createWorker({
    app,
    tasks: [ManualWorker],
});

await worker.start();

// Trigger manually
await worker.runOnce();

// Trigger from HTTP endpoint
http.post('/admin/trigger-sync', async (req, res) => {
    await worker.runOnce();
    return res.json({ status: 'triggered' });
});

Worker Patterns

Data Synchronization

Sync data from external sources:

import { Injectable, Logger } from '@bunty/common';

@Injectable()
class ShopifySyncWorker {
    private logger = new Logger('ShopifySync');
    
    constructor(
        private shopify: ShopifyAPI,
        private products: ProductService
    ) {}
    
    async run() {
        this.logger.info('Starting Shopify sync...');
        
        try {
            // Fetch products from Shopify
            const shopifyProducts = await this.shopify.getProducts();
            this.logger.debug('Fetched products:', shopifyProducts.length);
            
            // Sync to database
            for (const product of shopifyProducts) {
                await this.products.upsert({
                    externalId: product.id,
                    title: product.title,
                    price: product.price,
                    inventory: product.inventory_quantity,
                });
            }
            
            this.logger.success('Sync completed:', shopifyProducts.length);
        } catch (error) {
            this.logger.error('Sync failed:', error);
            throw error;
        }
    }
}

createWorker({
    app,
    interval: '30m',  // Sync every 30 minutes
    tasks: [ShopifySyncWorker],
}).start();

Terminal output:

[2025/10/30 19:53:14] [INFO] [ShopifySync] Starting Shopify sync…
[2025/10/30 19:53:15] [DEBUG] [ShopifySync] Fetched products: 1247
[2025/10/30 19:53:22] [SUCCESS] [ShopifySync] Sync completed: 1247

Queue Processing

Process background job queues:

@Injectable()
class EmailQueueWorker {
    private logger = new Logger('EmailQueue');
    
    constructor(
        private queue: QueueService,
        private email: EmailService
    ) {}
    
    async run() {
        this.logger.info('Processing email queue...');
        
        // Get pending emails (batch of 50)
        const jobs = await this.queue.getPending('emails', 50);
        this.logger.debug('Found pending emails:', jobs.length);
        
        for (const job of jobs) {
            try {
                await this.email.send(job.data);
                await this.queue.markComplete(job.id);
                this.logger.success('Email sent:', job.id);
            } catch (error) {
                this.logger.error('Email failed:', job.id, error);
                await this.queue.markFailed(job.id, error.message);
            }
        }
        
        this.logger.info('Queue processing complete');
    }
}

createWorker({
    app,
    interval: '1m',  // Process every minute
    tasks: [EmailQueueWorker],
}).start();

Data Cleanup

Clean up old or stale data:

@Injectable()
class DataCleanupWorker {
    private logger = new Logger('Cleanup');
    
    constructor(private db: DatabaseService) {}
    
    async run() {
        this.logger.info('Starting cleanup...');
        
        const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
        
        // Delete old sessions
        const deletedSessions = await this.db
            .createQueryBuilder(sessionsTable)
            .where('expiresAt', '<', thirtyDaysAgo)
            .delete();
        
        this.logger.success('Deleted sessions:', deletedSessions);
        
        // Archive old orders
        const oldOrders = await this.db
            .createQueryBuilder(ordersTable)
            .where('createdAt', '<', thirtyDaysAgo)
            .where('status', '=', 'completed')
            .findMany();
        
        for (const order of oldOrders) {
            await this.archiveOrder(order);
        }
        
        this.logger.success('Archived orders:', oldOrders.length);
    }
    
    private async archiveOrder(order: any) {
        // Archive logic
    }
}

createWorker({
    app,
    cron: '0 2 * * *',  // Run at 2 AM daily
    tasks: [DataCleanupWorker],
}).start();

Report Generation

Generate and send reports:

@Injectable()
class DailyReportWorker {
    private logger = new Logger('DailyReport');
    
    constructor(
        private analytics: AnalyticsService,
        private email: EmailService
    ) {}
    
    async run() {
        this.logger.info('Generating daily report...');
        
        const report = await this.analytics.generateDailyReport({
            date: new Date(),
            metrics: ['sales', 'users', 'revenue'],
        });
        
        this.logger.debug('Report data:', report);
        
        await this.email.send({
            to: 'team@company.com',
            subject: `Daily Report - ${this.formatDate(new Date())}`,
            html: this.renderReport(report),
        });
        
        this.logger.success('Report sent successfully');
    }
    
    private formatDate(date: Date): string {
        return date.toISOString().split('T')[0];
    }
    
    private renderReport(report: any): string {
        return `<h1>Daily Report</h1>...`;
    }
}

createWorker({
    app,
    cron: '0 8 * * *',  // 8 AM daily
    tasks: [DailyReportWorker],
}).start();

Cache Warming

Pre-load data into cache:

@Injectable()
class CacheWarmingWorker {
    private logger = new Logger('CacheWarming');
    
    constructor(
        private cache: CacheService,
        private products: ProductService
    ) {}
    
    async run() {
        this.logger.info('Warming cache...');
        
        // Load popular products
        const popular = await this.products.getPopular(100);
        
        for (const product of popular) {
            await this.cache.set(
                `product:${product.id}`,
                product,
                { ttl: 3600 }  // 1 hour
            );
        }
        
        this.logger.success('Cache warmed with products:', popular.length);
    }
}

createWorker({
    app,
    interval: '5m',  // Refresh every 5 minutes
    tasks: [CacheWarmingWorker],
}).start();

Using Services

Workers have full access to the DI container:

@Injectable()
class OrderProcessorWorker {
    constructor(
        private orders: OrderService,
        private payment: PaymentService,
        private email: EmailService,
        private logger: Logger
    ) {
        this.logger.setName('OrderProcessor');
    }
    
    async run() {
        this.logger.info('Processing pending orders...');
        
        // Get pending orders
        const pending = await this.orders.findPending();
        this.logger.debug('Found pending orders:', pending.length);
        
        for (const order of pending) {
            try {
                // Process payment
                await this.payment.charge(order);
                
                // Update order status
                await this.orders.update(order.id, { status: 'paid' });
                
                // Send confirmation
                await this.email.sendOrderConfirmation(order);
                
                this.logger.success('Order processed:', order.id);
            } catch (error) {
                this.logger.error('Order failed:', order.id, error);
                await this.orders.update(order.id, { 
                    status: 'failed',
                    error: error.message,
                });
            }
        }
    }
}

Error Handling

Handle errors gracefully in workers:

Try-Catch Pattern

@Injectable()
class ResilientWorker {
    private logger = new Logger('ResilientWorker');
    
    async run() {
        try {
            await this.performTask();
            this.logger.success('Task completed successfully');
        } catch (error) {
            this.logger.error('Task failed:', error);
            
            // Don't throw - let worker continue
            // Or re-throw to stop worker
            // throw error;
        }
    }
    
    private async performTask() {
        // Task implementation
    }
}

Retry Logic

@Injectable()
class RetryWorker {
    private logger = new Logger('RetryWorker');
    private retries = 0;
    private maxRetries = 3;
    
    async run() {
        try {
            await this.performTask();
            this.retries = 0;  // Reset on success
        } catch (error) {
            this.retries++;
            this.logger.error(`Task failed (attempt ${this.retries}):`, error);
            
            if (this.retries >= this.maxRetries) {
                this.logger.error('Max retries reached, alerting team');
                await this.sendAlert(error);
                this.retries = 0;  // Reset for next cycle
            }
        }
    }
    
    private async sendAlert(error: Error) {
        const alertService = inject(AlertService);
        await alertService.send({
            severity: 'critical',
            message: `Worker failed after ${this.maxRetries} attempts`,
            error: error.message,
        });
    }
}

Circuit Breaker

@Injectable()
class CircuitBreakerWorker {
    private logger = new Logger('CircuitBreaker');
    private failures = 0;
    private isOpen = false;
    
    async run() {
        if (this.isOpen) {
            this.logger.warn('Circuit breaker is open, skipping run');
            return;
        }
        
        try {
            await this.task();
            this.failures = 0;  // Reset on success
        } catch (error) {
            this.failures++;
            this.logger.error('Task failed:', error);
            
            if (this.failures >= 5) {
                this.isOpen = true;
                this.logger.error('Circuit breaker opened');
                
                // Reset after 1 minute
                setTimeout(() => {
                    this.isOpen = false;
                    this.failures = 0;
                    this.logger.info('Circuit breaker reset');
                }, 60000);
            }
            
            throw error;
        }
    }
}

Multiple Workers

Run multiple workers in one application:

import { createApp } from '@bunty/common';
import { createWorker } from '@bunty/worker';

const app = createApp({
    name: 'multi-worker-app',
    providers: [
        // Services
        DatabaseService,
        CacheService,
        EmailService,
        
        // Workers
        DataSyncWorker,
        EmailQueueWorker,
        CleanupWorker,
        ReportWorker,
    ],
});

// Data sync - every 15 minutes
const dataSyncWorker = createWorker({
    app,
    interval: '15m',
    tasks: [DataSyncWorker],
});

// Email queue - every minute
const emailWorker = createWorker({
    app,
    interval: '1m',
    tasks: [EmailQueueWorker],
});

// Cleanup - daily at 2 AM
const cleanupWorker = createWorker({
    app,
    cron: '0 2 * * *',
    tasks: [CleanupWorker],
});

// Reports - daily at 8 AM
const reportWorker = createWorker({
    app,
    cron: '0 8 * * *',
    tasks: [ReportWorker],
});

// Start all
await app.start();
await dataSyncWorker.start();
await emailWorker.start();
await cleanupWorker.start();
await reportWorker.start();

Terminal output:

[2025/10/30 19:53:14] [INFO] [App] Application starting…
[2025/10/30 19:53:15] [SUCCESS] [Worker] DataSyncWorker started (interval: 15m)
[2025/10/30 19:53:15] [SUCCESS] [Worker] EmailQueueWorker started (interval: 1m)
[2025/10/30 19:53:15] [SUCCESS] [Worker] CleanupWorker started (cron: 0 2 * * *)
[2025/10/30 19:53:15] [SUCCESS] [Worker] ReportWorker started (cron: 0 8 * * *)

Graceful Shutdown

Handle shutdown signals:

const worker = createWorker({
    app,
    interval: '5m',
    tasks: [MyWorker],
});

await app.start();
await worker.start();

// Handle shutdown
process.on('SIGINT', async () => {
    console.log('Received SIGINT, shutting down...');
    await worker.stop();
    await app.shutdown();
    process.exit(0);
});

process.on('SIGTERM', async () => {
    console.log('Received SIGTERM, shutting down...');
    await worker.stop();
    await app.shutdown();
    process.exit(0);
});

Best Practices

✅ Do

  • Use lifecycle hooks for setup/teardown
  • Implement error handling and retries
  • Log all worker execution
  • Keep workers focused on one task
  • Make workers idempotent (safe to run multiple times)
  • Handle shutdown signals gracefully
  • Use appropriate scheduling intervals
  • Monitor worker health and failures

❌ Don’t

  • Store state between runs (use database/cache)
  • Block the event loop with long operations
  • Forget to handle errors
  • Run CPU-intensive tasks without consideration
  • Skip logging and monitoring
  • Ignore worker failures silently
  • Create circular dependencies
  • Mix HTTP concerns with worker logic

⚙️ Worker Layer Summary

The Worker Layer handles background tasks using createWorker(). It provides scheduled execution (interval or cron), lifecycle hooks, and full access to the Application Layer’s DI container. Build data sync jobs, queue processors, cleanup tasks, report generators, or any background operation with the same services and configuration as your HTTP layer.

Next Steps

Have questions? Join our Discord community
Found an issue? Edit this page on GitHub