Building a Robust AWS SQS Message Processing System in TypeScript

Building a Robust AWS SQS Message Processing System in TypeScript

AWS
SQS
TypeScript
2024-01-02

Introduction

I absolutely love building event-driven architectures, and AWS SQS is one of my favorite tools for orchestrating asynchronous workflows in TypeScript. In large or complex systems, reliable message processing can make the difference between a system that gracefully scales and one that collapses under load.

The goal of this post is to provide an approach for building a robust SQS message processing system. I’ll break down how to separate concerns—receiving messages, hydrating them, and handling business logic—and integrate error handling in a way that’s easy to maintain. Let’s dive in and explore an architecture that can scale from small apps to massive enterprise setups.

Background

Many systems rely on message buses or queues to pass events between services. AWS SQS is a simple, cost-effective choice that can handle thousands of messages per second. But as messages get more varied and your codebase expands, message processing can become tangled if the data transformation and business logic aren't clearly separated.

With the help of TypeScript’s static typing, we can ensure that our message handling logic is consistent, reliable, and easier to refactor. This post builds on a pattern I often use: abstract classes and well-defined interfaces that cleanly split out each important segment of the SQS pipeline.

Problem Statement

So why put all this effort into a robust pattern? If you have multiple services publishing data to SQS, messages may arrive in various shapes. Some could come from SNS, others from a custom event bus—potentially leading to malformed JSON, empty payloads, or partial data that can cause unexpected runtime errors.

Moreover, you need a consistent way to handle logging, error retries, dead-letter queues, and concurrency. Without a solid structure, your code can turn into a web of conditionals and scattered try/catch blocks—which is no fun to debug.

Detailed Discussion

To avoid a cluttered or untestable codebase, I break the flow into four key steps:

  1. IReceiveMessage: Receives each raw SQS message from the queue.
  2. IHydrateMessage: Normalizes and parses the raw data into a strongly typed object or null if the data doesn’t conform.
  3. IHandleMessage: Performs the actual business logic or database updates on your typed data.
  4. BaseHandler: A single abstract layer that orchestrates hydration, handling, logging, and robust error processing.

Below is a code snippet showcasing how I typically implement these components. It references aws-sdk, winston for logging, and serialize-error for safer error serialization.

import { Message } from 'aws-sdk/clients/sqs'; import { AxiosError } from 'axios'; import { serializeError } from 'serialize-error'; import { Logger } from 'winston'; import { normalizeRawMessageDelivery } from './utils'; export const EMPTY_MESSAGE_WARNING = 'message body is empty; skipping'; export const PROCESSING_ERROR = 'processing_error'; export abstract class IReceiveMessage { abstract onMessage(message: AWS.SQS.Message): Promise<void>; } export abstract class IHydrateMessage<T> extends IReceiveMessage { abstract hydrate(message: AWS.SQS.Message): T | null; } export abstract class IHandleMessage<T> extends IHydrateMessage<T> { abstract handle(input: T): Promise<void>; } export abstract class BaseHandler<T> extends IHandleMessage<T> implements IHydrateMessage<T> { constructor(protected logger: Logger) { super(); } hydrate(input: AWS.SQS.Message): T | null { const logger = this.logger.child({ method: this.hydrate.name }); try { logger.info('hydrating message', { input }); const result = (normalizeRawMessageDelivery(input?.Body) as T) ?? null; logger.info('message hydrated', { result }); return result; } catch (error) { logger.error('failed to hydrate message', { cause: serializeError(error), }); return null; } } async _onMessage(message: Message): Promise<void> { const logger = this.logger.child({ method: this._onMessage.name, messageId: message?.MessageId }); try { const input = this.hydrate(message); if (!input) { logger.warn(EMPTY_MESSAGE_WARNING, { message }); return; } logger.info('handling message', { input }); const result = await this.handle(input); logger.info('message handled', { result }); return result; } catch (error) { logger.error('failed to handle message', { cause: serializeError(error) }); throw error; } } async _onError(err: Error | AxiosError, message: AWS.SQS.Message) { const logger = this.logger.child({ method: this._onError.name }); try { const input = this.hydrate(message); const companyId = input && typeof input === 'object' && 'companyId' in input ? input.companyId : undefined; logger.error(`${PROCESSING_ERROR}`, { sns: message, input, companyId, cause: serializeError(err), }); } catch (error) { logger.error('failed to handle error', { cause: serializeError(error) }); throw error; } } } export function normalizeRawMessageDelivery(body: string | undefined | null) { if (!body) return null; if (typeof body !== 'string') return body; const obj = JSON.parse(body); if (!obj.Message) return obj; return JSON.parse(obj.Message); }

In this snippet, notice how the BaseHandler orchestrates hydration in _onMessage, then calls this.handle(input) for business logic. If an error occurs, the message is passed to _onError for deeper logging and potential clean-up or custom notification. Each sub-step is testable on its own, and you can expand them as your application grows.

Let’s see a straightforward example of how you might wire everything together using the AWS SDK, Winston, and a custom message handler:

import AWS from 'aws-sdk'; import winston from 'winston'; import { BaseHandler } from './BaseHandler'; class MyMessageHandler extends BaseHandler<{ companyId: string; someValue: number }> { constructor(logger: winston.Logger) { super(logger); } async handle(input: { companyId: string; someValue: number }): Promise<void> { // Process the message data this.logger.info('Processing message data', { companyId: input.companyId, value: input.someValue }); // Your domain logic goes here, e.g. DB calls, external APIs, etc. } async onMessage(message: AWS.SQS.Message) { try { await this._onMessage(message); } catch (err) { await this._onError(err, message); } } } // Quick demonstration (async () => { const sqs = new AWS.SQS({ region: 'us-east-1' }); const logger = winston.createLogger({ transports: [new winston.transports.Console()], }); const handler = new MyMessageHandler(logger); // Fetch messages from SQS const result = await sqs.receiveMessage({ QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/test-queue', MaxNumberOfMessages: 1 }).promise(); if (result.Messages && result.Messages.length > 0) { await handler.onMessage(result.Messages[0]); } })();

In more advanced setups, or in organizations that rely on a NestJS environment, it’s easy to plug these abstractions into decorators like @SqsMessageHandler and @SqsConsumerEventHandler. This example shows how you might implement the same pattern using NestJS, with an injected logger and custom error tracing:

import { Trace, TracePrefix } from '@/lib/tracer'; import { BaseHandler, PROCESSING_ERROR } from '@/modules/aws/base.handler'; import { SqsConsumerEventHandler, SqsMessageHandler } from '@ssut/nestjs-sqs'; import { SqsQueues } from '@/modules/aws/sqs.config'; import { Inject, Injectable } from '@nestjs/common'; import { Message } from 'aws-sdk/clients/sqs'; import { AxiosError } from 'axios'; import { WINSTON_MODULE_PROVIDER } from 'nest-winston'; import { serializeError } from 'serialize-error'; import { Logger } from 'winston'; export interface GoalCompletedMessage { // Your message payload (e.g., { companyId: string; goalId: string; } ) } @Injectable() export class GoalCompletedMessageHandler extends BaseHandler<GoalCompletedMessage> { constructor(@Inject(WINSTON_MODULE_PROVIDER) readonly logger: Logger) { super(logger); this.logger = logger.child({ class: GoalCompletedMessageHandler.name }); this.logger.info(`${GoalCompletedMessageHandler.name} initialized`); } @SqsConsumerEventHandler(SqsQueues.CORE_GOAL_COMPLETED, PROCESSING_ERROR) @Trace(`${TracePrefix.SQS}.${SqsQueues.CORE_GOAL_COMPLETED.toLowerCase()}.${PROCESSING_ERROR}`) onError(err: Error | AxiosError, message: AWS.SQS.Message) { return this._onError(err, message); } @SqsMessageHandler(SqsQueues.CORE_GOAL_COMPLETED) @Trace(`${TracePrefix.SQS}.${SqsQueues.CORE_GOAL_COMPLETED.toLowerCase()}`) onMessage(message: Message): Promise<void> { return this._onMessage(message); } async handle(input: GoalCompletedMessage): Promise<void> { const logger = this.logger.child({ method: this.handle.name, input }); try { logger.info('handling message'); // Business logic placeholder logger.warn('no op; skipping for now;'); logger.info('message handled'); } catch (error) { logger.error('error handling message', { cause: serializeError(error) }); throw error; } } }

NestJS decorators make it simple to hook into your queues for both normal message handling (@SqsMessageHandler) and error handling (@SqsConsumerEventHandler). This structure retains the core logic in your abstract classes while leveraging NestJS for orchestration, dependency injection, and improved traceability.

Best Practices

Over time, consistently applying a well-defined pattern will significantly reduce headaches. Here are a few final tips to keep your AWS SQS system healthy:

  • Use Dead Letter Queues (DLQs): Configure a DLQ for any queue to prevent messages that repeatedly fail from blocking processing.
  • Mind Your Concurrency: If processing large message volumes, tune concurrency and visibility timeouts to avoid duplication or timeouts.
  • Serialize Errors: Libraries like serialize-error help you consistently log structured error objects.
  • Practice Minimal Logging: Log essential fields, but avoid massive logs that can clog your monitoring systems.
  • Stay Flexible: Doubling down on typed interfaces and abstract classes doesn’t mean you have to freeze your architecture. You can always expand or swap out pieces as the requirements evolve.

Conclusion

By separating your message processing flow into distinct components that each have a clear purpose, you empower yourself (and your team) to move fast without breaking things. Clearing away the clutter of ephemeral errors and repetitive code patterns reveals the real business logic beneath.

The result? A modern, scalable, and testable message processing system that you can easily extend for new events and features down the road. Give these patterns a try in your own projects, and I’m certain you’ll see how they streamline development and reduce confusion across your codebase.

Thanks for joining me on this exploration of AWS SQS message processing in TypeScript, and see you next time!

Further Reading

For more on building event-driven applications and handling messages: