Workers
Bunty workers extend the same dependency injection and configuration systems beyond the HTTP layer. Build background tasks, scheduled jobs, and headless services using the same service architecture as your HTTP APIs.
Concept
Workers are stand-alone application contexts created with createWorker() instead of createApp(). They share the same dependency graph, config, and lifecycle primitives.
No router. No HTTP server. Just DI, timers, queues, and business logic.
import { createWorker } from '@bunty/worker';
const worker = createWorker({
name: 'my-background-worker',
interval: '5m',
tasks: [MyWorkerTask]
});
worker.start();
Creating a Worker
Basic Worker
import { createWorker, Injectable } from '@bunty/common';
@Injectable()
class EmailCleanupWorker {
constructor(
private db: Database,
private logger: LoggerService
) {}
async run() {
this.logger.info('Starting email cleanup...');
// Delete emails older than 30 days
const deleted = await this.db
.createQueryBuilder(emailsTable)
.where('createdAt', '<', thirtyDaysAgo())
.delete();
this.logger.info(`Cleaned up ${deleted} old emails`);
}
}
// Create and start worker
const worker = createWorker({
name: 'email-cleanup',
interval: '1d', // Run daily
tasks: [EmailCleanupWorker]
});
worker.start();
Amazon Inventory Sync Worker
Real-world example: Syncing inventory from Amazon MWS:
import { createWorker, Injectable } from '@bunty/common';
import { AmazonAPI } from '@bunty/integrations';
@Injectable()
class AmazonInventoryWorker {
constructor(
private amazon: AmazonAPI,
private inventory: InventoryService,
private logger: LoggerService
) {}
async run() {
try {
this.logger.info('Fetching Amazon inventory...');
// Fetch products from Amazon MWS
const products = await this.amazon.fetchInventory();
this.logger.info(`Found ${products.length} products`);
// Sync each product to our database
for (const product of products) {
await this.inventory.syncProduct({
sku: product.SellerSKU,
quantity: product.Quantity,
price: product.Price,
title: product.Title,
asin: product.ASIN,
});
}
this.logger.info('Amazon inventory sync completed');
} catch (error) {
this.logger.error('Amazon sync failed', error);
throw error;
}
}
}
// Worker entry
const worker = createWorker({
name: 'amazon-inventory',
interval: '15m', // Sync every 15 minutes
tasks: [AmazonInventoryWorker],
});
worker.start();
Worker Lifecycle
Each worker supports lifecycle hooks for initialization, execution, and cleanup:
Lifecycle Hooks
@Injectable()
class DatabaseMaintenanceWorker {
private connection: DatabaseConnection;
// Called once before the first run
async onInit() {
this.logger.info('Initializing worker...');
this.connection = await this.db.connect();
await this.loadConfig();
}
// Main task - called on schedule
async run() {
this.logger.info('Running database maintenance...');
await this.vacuumTables();
await this.analyzeQueries();
await this.rebuildIndexes();
this.logger.info('Maintenance complete');
}
// Called on graceful shutdown
async onShutdown() {
this.logger.info('Shutting down worker...');
await this.connection.close();
await this.flushLogs();
}
private async vacuumTables() { /* ... */ }
private async analyzeQueries() { /* ... */ }
private async rebuildIndexes() { /* ... */ }
}
Lifecycle Flow
1. Worker Created
β
2. onInit() called
β
3. run() called (on schedule)
β
4. run() called (repeat)
β
5. Shutdown signal received
β
6. onShutdown() called
β
7. Worker stopped
Scheduling Options
Workers support multiple scheduling strategies:
Interval-Based
Run every N minutes/hours/days:
// Every 5 minutes
createWorker({
name: 'cache-refresh',
interval: '5m',
tasks: [CacheRefreshWorker]
});
// Every hour
createWorker({
name: 'report-generator',
interval: '1h',
tasks: [ReportWorker]
});
// Every day
createWorker({
name: 'daily-summary',
interval: '1d',
tasks: [DailySummaryWorker]
});
// Using milliseconds
createWorker({
name: 'fast-poll',
interval: 30000, // 30 seconds
tasks: [PollWorker]
});
Cron-Based
Use cron expressions for precise scheduling:
// Every hour at minute 0
createWorker({
name: 'hourly-task',
cron: '0 * * * *',
tasks: [HourlyWorker]
});
// Every day at midnight
createWorker({
name: 'nightly-backup',
cron: '0 0 * * *',
tasks: [BackupWorker]
});
// Every Monday at 9 AM
createWorker({
name: 'weekly-report',
cron: '0 9 * * 1',
tasks: [WeeklyReportWorker]
});
// Every 15 minutes
createWorker({
name: 'frequent-sync',
cron: '*/15 * * * *',
tasks: [SyncWorker]
});
Manual Trigger
Run workers on-demand:
const worker = createWorker({
name: 'manual-worker',
tasks: [ManualTaskWorker]
});
// Start the worker but don't schedule
worker.start();
// Trigger manually when needed
await worker.runOnce();
// Trigger from HTTP endpoint
app.post('/admin/trigger-sync', async (req, res) => {
await worker.runOnce();
return res.json({ status: 'triggered' });
});
Integration with Services
Workers reuse your applicationβs service layer seamlessly:
Shared Container
import { createApp, createWorker } from '@bunty/common';
// Create HTTP app
const app = createApp();
// Register shared services
app.container.register(Database);
app.container.register(CacheService);
app.container.register(EmailService);
// Create worker with shared container
const worker = createWorker({
name: 'background-jobs',
interval: '10m',
container: app.container, // Share DI container
tasks: [BackgroundJobWorker]
});
// Both use the same Database, Cache, and Email services
app.listen(3000);
worker.start();
Consistent Business Logic
// Service used by both HTTP and workers
@Injectable()
class OrderService {
constructor(
private db: Database,
private email: EmailService
) {}
async processOrder(orderId: number) {
const order = await this.db.getOrder(orderId);
await this.validateOrder(order);
await this.chargeCustomer(order);
await this.email.sendConfirmation(order);
return order;
}
}
// HTTP endpoint uses OrderService
app.post('/api/orders', async (req, res) => {
const orderService = inject(OrderService);
const order = await orderService.processOrder(req.body);
return res.json(order);
});
// Worker uses same OrderService
@Injectable()
class FailedOrderRetryWorker {
constructor(private orderService: OrderService) {}
async run() {
const failedOrders = await this.getFailedOrders();
for (const order of failedOrders) {
await this.orderService.processOrder(order.id);
}
}
}
Common Use Cases
1. External Data Sync
@Injectable()
class ShopifySyncWorker {
constructor(
private shopify: ShopifyAPI,
private products: ProductService
) {}
async run() {
// Sync products from Shopify
const products = await this.shopify.getProducts();
for (const product of products) {
await this.products.upsert({
externalId: product.id,
title: product.title,
price: product.price,
inventory: product.inventory_quantity
});
}
}
}
createWorker({
name: 'shopify-sync',
interval: '30m',
tasks: [ShopifySyncWorker]
}).start();
2. Queue Processing
@Injectable()
class EmailQueueWorker {
constructor(
private queue: QueueService,
private email: EmailService
) {}
async run() {
// Process pending emails
const jobs = await this.queue.getPending('emails', 50);
for (const job of jobs) {
try {
await this.email.send(job.data);
await this.queue.markComplete(job.id);
} catch (error) {
await this.queue.markFailed(job.id, error);
}
}
}
}
createWorker({
name: 'email-queue',
interval: '1m', // Process every minute
tasks: [EmailQueueWorker]
}).start();
3. Data Cleanup
@Injectable()
class DataCleanupWorker {
constructor(private db: Database) {}
async run() {
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
// Delete old sessions
await this.db
.createQueryBuilder(sessionsTable)
.where('expiresAt', '<', thirtyDaysAgo)
.delete();
// Archive old orders
const oldOrders = await this.db
.createQueryBuilder(ordersTable)
.where('createdAt', '<', thirtyDaysAgo)
.where('status', '=', 'completed')
.findMany();
for (const order of oldOrders) {
await this.archiveOrder(order);
}
}
}
createWorker({
name: 'cleanup',
cron: '0 2 * * *', // Run at 2 AM daily
tasks: [DataCleanupWorker]
}).start();
4. Report Generation
@Injectable()
class DailyReportWorker {
constructor(
private analytics: AnalyticsService,
private email: EmailService
) {}
async run() {
// Generate daily report
const report = await this.analytics.generateDailyReport({
date: new Date(),
metrics: ['sales', 'users', 'revenue']
});
// Send to stakeholders
await this.email.send({
to: 'team@company.com',
subject: `Daily Report - ${formatDate(new Date())}`,
html: this.renderReport(report)
});
}
}
createWorker({
name: 'daily-report',
cron: '0 8 * * *', // 8 AM daily
tasks: [DailyReportWorker]
}).start();
5. Cache Warming
@Injectable()
class CacheWarmingWorker {
constructor(
private cache: CacheService,
private products: ProductService
) {}
async run() {
// Pre-load popular products into cache
const popular = await this.products.getPopular(100);
for (const product of popular) {
await this.cache.set(
`product:${product.id}`,
product,
{ ttl: 3600 } // 1 hour
);
}
}
}
createWorker({
name: 'cache-warming',
interval: '5m',
tasks: [CacheWarmingWorker]
}).start();
Multiple Workers
Run multiple workers in the same process:
import { createApp, createWorker } from '@bunty/common';
const app = createApp();
// Start HTTP server
app.listen(3000);
// Background job processor
createWorker({
name: 'job-processor',
interval: '1m',
container: app.container,
tasks: [JobProcessorWorker]
}).start();
// External sync
createWorker({
name: 'data-sync',
interval: '15m',
container: app.container,
tasks: [DataSyncWorker]
}).start();
// Daily reports
createWorker({
name: 'reports',
cron: '0 9 * * *',
container: app.container,
tasks: [ReportWorker]
}).start();
// Cleanup
createWorker({
name: 'cleanup',
cron: '0 2 * * *',
container: app.container,
tasks: [CleanupWorker]
}).start();
Error Handling
Handle errors gracefully in workers:
@Injectable()
class ResilientWorker {
private retries = 0;
private maxRetries = 3;
constructor(
private logger: LoggerService,
private alerts: AlertService
) {}
async run() {
try {
await this.performTask();
this.retries = 0; // Reset on success
} catch (error) {
this.retries++;
this.logger.error(`Worker failed (attempt ${this.retries})`, error);
if (this.retries >= this.maxRetries) {
await this.alerts.send({
severity: 'critical',
message: `Worker failed after ${this.maxRetries} attempts`,
error
});
this.retries = 0; // Reset for next cycle
}
}
}
private async performTask() {
// Task implementation
}
}
Worker Patterns
Circuit Breaker
@Injectable()
class CircuitBreakerWorker {
private failures = 0;
private isOpen = false;
async run() {
if (this.isOpen) {
this.logger.warn('Circuit breaker is open, skipping run');
return;
}
try {
await this.task();
this.failures = 0;
} catch (error) {
this.failures++;
if (this.failures >= 5) {
this.isOpen = true;
setTimeout(() => {
this.isOpen = false;
this.failures = 0;
}, 60000); // Reset after 1 minute
}
throw error;
}
}
}
Batch Processing
@Injectable()
class BatchProcessorWorker {
async run() {
const batchSize = 100;
let offset = 0;
let hasMore = true;
while (hasMore) {
const items = await this.db
.createQueryBuilder(itemsTable)
.limit(batchSize)
.offset(offset)
.findMany();
if (items.length === 0) {
hasMore = false;
break;
}
await this.processBatch(items);
offset += batchSize;
}
}
private async processBatch(items: Item[]) {
await Promise.all(items.map(item => this.processItem(item)));
}
}
Best Practices
β Do
- Share containers between app and workers for consistency
- Use lifecycle hooks for proper initialization/cleanup
- Implement error handling and retries
- Log worker execution for monitoring
- Use cron for precise scheduling
- Keep workers focused on one task
- Make workers idempotent (safe to run multiple times)
β Donβt
- Store state between runs (use database/cache)
- Block the main thread with long-running sync operations
- Forget to handle errors
- Run CPU-intensive tasks without consideration
- Skip logging and monitoring
- Create circular dependencies with HTTP layer
- Ignore worker failures silently
π Workers Summary
Bunty workers extend the same dependency injection and configuration systems beyond the HTTP layer. A worker runs as an independent application context that executes recurring or one-off background tasks. Workers share the same service container, enabling full reuse of existing business logic without an HTTP server. This design supports modular monoliths with long-running background processes and cloud-ready job scheduling.
Next Steps
- Learn about Dependency Injection for service architecture
- Build HTTP APIs alongside workers
- Understand Core Concepts and design patterns