πŸ“¦
Bunty

Workers

Bunty workers extend the same dependency injection and configuration systems beyond the HTTP layer. Build background tasks, scheduled jobs, and headless services using the same service architecture as your HTTP APIs.

Concept

Workers are stand-alone application contexts created with createWorker() instead of createApp(). They share the same dependency graph, config, and lifecycle primitives.

No router. No HTTP server. Just DI, timers, queues, and business logic.

import { createWorker } from '@bunty/worker';

const worker = createWorker({
    name: 'my-background-worker',
    interval: '5m',
    tasks: [MyWorkerTask]
});

worker.start();

Creating a Worker

Basic Worker

import { createWorker, Injectable } from '@bunty/common';

@Injectable()
class EmailCleanupWorker {
    constructor(
        private db: Database,
        private logger: LoggerService
    ) {}

    async run() {
        this.logger.info('Starting email cleanup...');
        
        // Delete emails older than 30 days
        const deleted = await this.db
            .createQueryBuilder(emailsTable)
            .where('createdAt', '<', thirtyDaysAgo())
            .delete();
        
        this.logger.info(`Cleaned up ${deleted} old emails`);
    }
}

// Create and start worker
const worker = createWorker({
    name: 'email-cleanup',
    interval: '1d', // Run daily
    tasks: [EmailCleanupWorker]
});

worker.start();

Amazon Inventory Sync Worker

Real-world example: Syncing inventory from Amazon MWS:

import { createWorker, Injectable } from '@bunty/common';
import { AmazonAPI } from '@bunty/integrations';

@Injectable()
class AmazonInventoryWorker {
    constructor(
        private amazon: AmazonAPI,
        private inventory: InventoryService,
        private logger: LoggerService
    ) {}

    async run() {
        try {
            this.logger.info('Fetching Amazon inventory...');
            
            // Fetch products from Amazon MWS
            const products = await this.amazon.fetchInventory();
            
            this.logger.info(`Found ${products.length} products`);
            
            // Sync each product to our database
            for (const product of products) {
                await this.inventory.syncProduct({
                    sku: product.SellerSKU,
                    quantity: product.Quantity,
                    price: product.Price,
                    title: product.Title,
                    asin: product.ASIN,
                });
            }
            
            this.logger.info('Amazon inventory sync completed');
        } catch (error) {
            this.logger.error('Amazon sync failed', error);
            throw error;
        }
    }
}

// Worker entry
const worker = createWorker({
    name: 'amazon-inventory',
    interval: '15m', // Sync every 15 minutes
    tasks: [AmazonInventoryWorker],
});

worker.start();

Worker Lifecycle

Each worker supports lifecycle hooks for initialization, execution, and cleanup:

Lifecycle Hooks

@Injectable()
class DatabaseMaintenanceWorker {
    private connection: DatabaseConnection;

    // Called once before the first run
    async onInit() {
        this.logger.info('Initializing worker...');
        this.connection = await this.db.connect();
        await this.loadConfig();
    }

    // Main task - called on schedule
    async run() {
        this.logger.info('Running database maintenance...');
        
        await this.vacuumTables();
        await this.analyzeQueries();
        await this.rebuildIndexes();
        
        this.logger.info('Maintenance complete');
    }

    // Called on graceful shutdown
    async onShutdown() {
        this.logger.info('Shutting down worker...');
        await this.connection.close();
        await this.flushLogs();
    }

    private async vacuumTables() { /* ... */ }
    private async analyzeQueries() { /* ... */ }
    private async rebuildIndexes() { /* ... */ }
}

Lifecycle Flow

1. Worker Created
   ↓
2. onInit() called
   ↓
3. run() called (on schedule)
   ↓
4. run() called (repeat)
   ↓
5. Shutdown signal received
   ↓
6. onShutdown() called
   ↓
7. Worker stopped

Scheduling Options

Workers support multiple scheduling strategies:

Interval-Based

Run every N minutes/hours/days:

// Every 5 minutes
createWorker({
    name: 'cache-refresh',
    interval: '5m',
    tasks: [CacheRefreshWorker]
});

// Every hour
createWorker({
    name: 'report-generator',
    interval: '1h',
    tasks: [ReportWorker]
});

// Every day
createWorker({
    name: 'daily-summary',
    interval: '1d',
    tasks: [DailySummaryWorker]
});

// Using milliseconds
createWorker({
    name: 'fast-poll',
    interval: 30000, // 30 seconds
    tasks: [PollWorker]
});

Cron-Based

Use cron expressions for precise scheduling:

// Every hour at minute 0
createWorker({
    name: 'hourly-task',
    cron: '0 * * * *',
    tasks: [HourlyWorker]
});

// Every day at midnight
createWorker({
    name: 'nightly-backup',
    cron: '0 0 * * *',
    tasks: [BackupWorker]
});

// Every Monday at 9 AM
createWorker({
    name: 'weekly-report',
    cron: '0 9 * * 1',
    tasks: [WeeklyReportWorker]
});

// Every 15 minutes
createWorker({
    name: 'frequent-sync',
    cron: '*/15 * * * *',
    tasks: [SyncWorker]
});

Manual Trigger

Run workers on-demand:

const worker = createWorker({
    name: 'manual-worker',
    tasks: [ManualTaskWorker]
});

// Start the worker but don't schedule
worker.start();

// Trigger manually when needed
await worker.runOnce();

// Trigger from HTTP endpoint
app.post('/admin/trigger-sync', async (req, res) => {
    await worker.runOnce();
    return res.json({ status: 'triggered' });
});

Integration with Services

Workers reuse your application’s service layer seamlessly:

Shared Container

import { createApp, createWorker } from '@bunty/common';

// Create HTTP app
const app = createApp();

// Register shared services
app.container.register(Database);
app.container.register(CacheService);
app.container.register(EmailService);

// Create worker with shared container
const worker = createWorker({
    name: 'background-jobs',
    interval: '10m',
    container: app.container, // Share DI container
    tasks: [BackgroundJobWorker]
});

// Both use the same Database, Cache, and Email services
app.listen(3000);
worker.start();

Consistent Business Logic

// Service used by both HTTP and workers
@Injectable()
class OrderService {
    constructor(
        private db: Database,
        private email: EmailService
    ) {}

    async processOrder(orderId: number) {
        const order = await this.db.getOrder(orderId);
        await this.validateOrder(order);
        await this.chargeCustomer(order);
        await this.email.sendConfirmation(order);
        return order;
    }
}

// HTTP endpoint uses OrderService
app.post('/api/orders', async (req, res) => {
    const orderService = inject(OrderService);
    const order = await orderService.processOrder(req.body);
    return res.json(order);
});

// Worker uses same OrderService
@Injectable()
class FailedOrderRetryWorker {
    constructor(private orderService: OrderService) {}

    async run() {
        const failedOrders = await this.getFailedOrders();
        for (const order of failedOrders) {
            await this.orderService.processOrder(order.id);
        }
    }
}

Common Use Cases

1. External Data Sync

@Injectable()
class ShopifySyncWorker {
    constructor(
        private shopify: ShopifyAPI,
        private products: ProductService
    ) {}

    async run() {
        // Sync products from Shopify
        const products = await this.shopify.getProducts();
        
        for (const product of products) {
            await this.products.upsert({
                externalId: product.id,
                title: product.title,
                price: product.price,
                inventory: product.inventory_quantity
            });
        }
    }
}

createWorker({
    name: 'shopify-sync',
    interval: '30m',
    tasks: [ShopifySyncWorker]
}).start();

2. Queue Processing

@Injectable()
class EmailQueueWorker {
    constructor(
        private queue: QueueService,
        private email: EmailService
    ) {}

    async run() {
        // Process pending emails
        const jobs = await this.queue.getPending('emails', 50);
        
        for (const job of jobs) {
            try {
                await this.email.send(job.data);
                await this.queue.markComplete(job.id);
            } catch (error) {
                await this.queue.markFailed(job.id, error);
            }
        }
    }
}

createWorker({
    name: 'email-queue',
    interval: '1m', // Process every minute
    tasks: [EmailQueueWorker]
}).start();

3. Data Cleanup

@Injectable()
class DataCleanupWorker {
    constructor(private db: Database) {}

    async run() {
        const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
        
        // Delete old sessions
        await this.db
            .createQueryBuilder(sessionsTable)
            .where('expiresAt', '<', thirtyDaysAgo)
            .delete();
        
        // Archive old orders
        const oldOrders = await this.db
            .createQueryBuilder(ordersTable)
            .where('createdAt', '<', thirtyDaysAgo)
            .where('status', '=', 'completed')
            .findMany();
        
        for (const order of oldOrders) {
            await this.archiveOrder(order);
        }
    }
}

createWorker({
    name: 'cleanup',
    cron: '0 2 * * *', // Run at 2 AM daily
    tasks: [DataCleanupWorker]
}).start();

4. Report Generation

@Injectable()
class DailyReportWorker {
    constructor(
        private analytics: AnalyticsService,
        private email: EmailService
    ) {}

    async run() {
        // Generate daily report
        const report = await this.analytics.generateDailyReport({
            date: new Date(),
            metrics: ['sales', 'users', 'revenue']
        });
        
        // Send to stakeholders
        await this.email.send({
            to: 'team@company.com',
            subject: `Daily Report - ${formatDate(new Date())}`,
            html: this.renderReport(report)
        });
    }
}

createWorker({
    name: 'daily-report',
    cron: '0 8 * * *', // 8 AM daily
    tasks: [DailyReportWorker]
}).start();

5. Cache Warming

@Injectable()
class CacheWarmingWorker {
    constructor(
        private cache: CacheService,
        private products: ProductService
    ) {}

    async run() {
        // Pre-load popular products into cache
        const popular = await this.products.getPopular(100);
        
        for (const product of popular) {
            await this.cache.set(
                `product:${product.id}`,
                product,
                { ttl: 3600 } // 1 hour
            );
        }
    }
}

createWorker({
    name: 'cache-warming',
    interval: '5m',
    tasks: [CacheWarmingWorker]
}).start();

Multiple Workers

Run multiple workers in the same process:

import { createApp, createWorker } from '@bunty/common';

const app = createApp();

// Start HTTP server
app.listen(3000);

// Background job processor
createWorker({
    name: 'job-processor',
    interval: '1m',
    container: app.container,
    tasks: [JobProcessorWorker]
}).start();

// External sync
createWorker({
    name: 'data-sync',
    interval: '15m',
    container: app.container,
    tasks: [DataSyncWorker]
}).start();

// Daily reports
createWorker({
    name: 'reports',
    cron: '0 9 * * *',
    container: app.container,
    tasks: [ReportWorker]
}).start();

// Cleanup
createWorker({
    name: 'cleanup',
    cron: '0 2 * * *',
    container: app.container,
    tasks: [CleanupWorker]
}).start();

Error Handling

Handle errors gracefully in workers:

@Injectable()
class ResilientWorker {
    private retries = 0;
    private maxRetries = 3;

    constructor(
        private logger: LoggerService,
        private alerts: AlertService
    ) {}

    async run() {
        try {
            await this.performTask();
            this.retries = 0; // Reset on success
        } catch (error) {
            this.retries++;
            this.logger.error(`Worker failed (attempt ${this.retries})`, error);
            
            if (this.retries >= this.maxRetries) {
                await this.alerts.send({
                    severity: 'critical',
                    message: `Worker failed after ${this.maxRetries} attempts`,
                    error
                });
                this.retries = 0; // Reset for next cycle
            }
        }
    }

    private async performTask() {
        // Task implementation
    }
}

Worker Patterns

Circuit Breaker

@Injectable()
class CircuitBreakerWorker {
    private failures = 0;
    private isOpen = false;

    async run() {
        if (this.isOpen) {
            this.logger.warn('Circuit breaker is open, skipping run');
            return;
        }

        try {
            await this.task();
            this.failures = 0;
        } catch (error) {
            this.failures++;
            
            if (this.failures >= 5) {
                this.isOpen = true;
                setTimeout(() => {
                    this.isOpen = false;
                    this.failures = 0;
                }, 60000); // Reset after 1 minute
            }
            
            throw error;
        }
    }
}

Batch Processing

@Injectable()
class BatchProcessorWorker {
    async run() {
        const batchSize = 100;
        let offset = 0;
        let hasMore = true;

        while (hasMore) {
            const items = await this.db
                .createQueryBuilder(itemsTable)
                .limit(batchSize)
                .offset(offset)
                .findMany();

            if (items.length === 0) {
                hasMore = false;
                break;
            }

            await this.processBatch(items);
            offset += batchSize;
        }
    }

    private async processBatch(items: Item[]) {
        await Promise.all(items.map(item => this.processItem(item)));
    }
}

Best Practices

βœ… Do

  • Share containers between app and workers for consistency
  • Use lifecycle hooks for proper initialization/cleanup
  • Implement error handling and retries
  • Log worker execution for monitoring
  • Use cron for precise scheduling
  • Keep workers focused on one task
  • Make workers idempotent (safe to run multiple times)

❌ Don’t

  • Store state between runs (use database/cache)
  • Block the main thread with long-running sync operations
  • Forget to handle errors
  • Run CPU-intensive tasks without consideration
  • Skip logging and monitoring
  • Create circular dependencies with HTTP layer
  • Ignore worker failures silently

πŸ”„ Workers Summary

Bunty workers extend the same dependency injection and configuration systems beyond the HTTP layer. A worker runs as an independent application context that executes recurring or one-off background tasks. Workers share the same service container, enabling full reuse of existing business logic without an HTTP server. This design supports modular monoliths with long-running background processes and cloud-ready job scheduling.

Next Steps

Have questions? Join our Discord community
Found an issue? Edit this page on GitHub