Custom transports

If you want to use a message queue that's not officially supported by the transport list, you can provide an adapter layer by implementing the Transport interface from @node-ts/bus-core.

The following interface definition lists the functionality that must be implemented in order for the transport to be compatible.

/**
 * A transport adapter interface that enables the service bus to use a messaging technology.
 */
export interface Transport<TransportMessageType = {}> {
  /**
   * Publishes an event to the underlying transport. This is generally done to a topic or some other
   * mechanism that consumers can subscribe themselves to
   * @param event A domain event to be published
   * @param messageOptions Options that control the behaviour around how the message is sent and
   * additional information that travels with it.
   */
  publish<TEvent extends Event> (event: TEvent, messageOptions?: MessageAttributes): Promise<void>

  /**
   * Sends a command to the underlying transport. This is generally done to a topic or some other
   * mechanism that consumers can subscribe themselves to
   * @param command A domain command to be sent
   * @param messageOptions Options that control the behaviour around how the message is sent and
   * additional information that travels with it.
   */
  send<TCommand extends Command> (command: TCommand, messageOptions?: MessageAttributes): Promise<void>

  /**
   * Forwards @param transportMessage to the dead letter queue. The message must have been read in from the
   * queue and have a receipt handle.
   */
  fail (transportMessage: TransportMessage<unknown>): Promise<void>

  /**
   * Forwards @param transportMessage to the dead letter queue. The message must have been read in from the
   * queue and have a receipt handle.
   */
  fail (transportMessage: TransportMessage<unknown>): Promise<void>

  /**
   * Fetch the next message from the underlying queue. If there are no messages, then `undefined`
   * should be returned.
   *
   * @returns The message construct from the underlying transport, that inclues both the raw message envelope
   * plus the contents or body that contains the `@node-ts/bus-messages` message.
   */
  readNextMessage (): Promise<TransportMessage<TransportMessageType> | undefined>

  /**
   * Removes a message from the underlying transport. This will be called once a message has been
   * successfully handled by any of the message handling functions.
   * @param message The message to be removed from the transport
   */
  deleteMessage (message: TransportMessage<TransportMessageType>): Promise<void>

  /**
   * Returns a message to the queue for retry. This will be called if an error was thrown when
   * trying to process a message.
   * @param message The message to be returned to the queue for reprocessing
   */
  returnMessage (message: TransportMessage<TransportMessageType>): Promise<void>

  /**
   * An optional function that is called before startup that will provide core dependencies
   * to the transport. This can be used to fetch loggers, registries etc that are used
   * in initialization steps
   * @param coreDependencies
   */
  prepare (coreDependencies: CoreDependencies): void

  /**
   * An optional function that will be called on startup. This gives a chance for the transport
   * to establish any connections to the underlying infrastructure.
   */
  connect? (): Promise<void>

  /**
   * An optional function that will be called on shutdown. This gives a chance for the transport
   * to close any connections to the underlying infrastructure.
   */
  disconnect? (): Promise<void>

  /**
   * An optional function that will be called when the service bus is starting. This is an
   * opportunity for the transport to see what messages need to be handled so that subscriptions
   * to the topics can be created.
   * @param handlerRegistry The list of messages being handled by the bus that the transport needs to subscribe to.
   */
  initialize? (handlerRegistry: HandlerRegistry): Promise<void>

  /**
   * An optional function that will be called when the service bus is shutting down. This is an
   * opportunity for the transport to close out any open requests to fetch messages etc.
   */
  dispose? (): Promise<void>
}

Once your transport has implemented this interface, it can be provided to the bus on configuration

import { Bus } from '@node-ts/bus-core'
import { MyTransport } from './my-transport'

const myTransport = new MyTransport()
await Bus
  .configure()
  .withTransport(myTransport)
  .intialize()

Last updated