Middleware

Middleware can be added to run after a message is read from a transport and before it is dispatched to handlers.

Uses in telemetry

Telemetry providers such as AWS X-Ray, New Relic, and Datadog can use middleware to profile and report message processing times.

This is useful for troubleshooting low message processing rates and for identifying the message types that take the longest to process and would benefit from performance tuning.

The following shows how to integrate AWS X-Ray to profile how messages are handled.

import AWSXRay from 'aws-xray-sdk'
import { Bus } from '@node-ts/bus-core'
import { Message } from '@node-ts/bus-messages'

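const bus = await Bus.configure()
  .withMessageReadMiddleware(async (context, next) => {
    const messageName = (context.domainMessage as Message).$name
    // One segment per message, with a subsegment named after the message type
    const segment = new AWSXRay.Segment('my-service')
    const subsegment = segment.addNewSubsegment(messageName)

    try {
      await next()
    } finally {
      // Close even when a handler throws so the trace is still reported
      subsegment.close()
      segment.close()
    }
  })
  .initialize()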
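
The same middleware shape can also collect timings without a vendor SDK. The following is a minimal sketch and not part of @node-ts/bus: `ReadContext`, `recordDuration`, `timingMiddleware`, and `slowestMessages` are hypothetical names, the context type is reduced to the `domainMessage.$name` field used above, and the in-memory map is only an illustration of where a real reporter would go.

```typescript
// Assumed middleware shape, mirroring the (context, next) signature used above
type Next = () => Promise<void>
interface ReadContext {
  domainMessage: { $name: string }
}

// Hypothetical in-memory store of handling durations in ms, keyed by message name
const durations = new Map<string, number[]>()

const recordDuration = (messageName: string, elapsedMs: number): void => {
  const samples = durations.get(messageName) ?? []
  samples.push(elapsedMs)
  durations.set(messageName, samples)
}

// Times the rest of the pipeline, recording the result even if a handler throws
const timingMiddleware = async (context: ReadContext, next: Next): Promise<void> => {
  const startedAt = Date.now()
  try {
    await next()
  } finally {
    recordDuration(context.domainMessage.$name, Date.now() - startedAt)
  }
}

// Returns [messageName, meanMs] pairs ordered by mean handling time, slowest first
const slowestMessages = (): Array<[string, number]> =>
  [...durations.entries()]
    .map(([name, samples]): [string, number] => [
      name,
      samples.reduce((total, ms) => total + ms, 0) / samples.length
    ])
    .sort((a, b) => b[1] - a[1])
```

Under these assumptions, `timingMiddleware` could be passed to `withMessageReadMiddleware` in place of the inline function, and `slowestMessages()` inspected periodically to find the message types that are candidates for tuning.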

Uses in logging context

Middleware is also useful for adding context from the message handling scope to each log produced during the handling cycle.

An example of this is appending a message's correlationId to the metadata of each log. This can be done with async_hooks, attaching data to the asynchronous execution scope so that any logging call made inside that scope can read it.

import { Bus } from '@node-ts/bus-core'
import * as asyncHooks from 'async_hooks'

type CorrelationId = string
// Requests to log() would look up the executionId in this map and attach the correlationId
export const handlingContext = new Map<number, CorrelationId>()

const bus = await Bus.configure()
  .withMessageReadMiddleware(async (context, next) => {
    const correlationId = context.attributes.correlationId
    const executionId = asyncHooks.executionAsyncId()

    handlingContext.set(executionId, correlationId)
    try {
      await next()
    } finally {
      // Remove the entry even when a handler throws so the map does not grow
      handlingContext.delete(executionId)
    }
  })
  .initialize()
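
The execution id returned by `executionAsyncId()` can change after an `await`, so a lookup keyed only on that id may miss inside a handler. One alternative, sketched here as an assumption rather than as the library's recommended approach, is `AsyncLocalStorage` from the same `async_hooks` module, which carries a store across asynchronous calls made inside `run()`. The names `HandlingContext`, `handlingStore`, `withCorrelationId`, and `log` are hypothetical.

```typescript
import { AsyncLocalStorage } from 'async_hooks'

// Hypothetical per-message context; only correlationId comes from the example above
interface HandlingContext {
  correlationId: string
}

const handlingStore = new AsyncLocalStorage<HandlingContext>()

// Runs `next` with the correlationId available to everything it calls
const withCorrelationId = <T>(correlationId: string, next: () => T): T =>
  handlingStore.run({ correlationId }, next)

// Hypothetical logger that prefixes the correlationId when one is in scope
const log = (message: string): string => {
  const correlationId = handlingStore.getStore()?.correlationId
  const line = correlationId ? `[${correlationId}] ${message}` : message
  console.log(line)
  return line
}
```

In a read middleware this would amount to returning `withCorrelationId(context.attributes.correlationId, next)`, assuming the attribute is always set; no explicit cleanup is needed because the store is only visible inside that call.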
