📘
@node-test/bus
  • @node-ts/bus
  • Getting started
    • Installation
    • Handling messages
    • Shutting down cleanly
  • Reference
    • Bus
    • BusConfiguration
    • BusInstance
  • Getting help
  • Guide
    • Messages
      • Events
      • Commands
      • System messages
    • Message attributes
      • Correlation id
      • Attributes
      • Sticky attributes
    • Workflows
      • Creating a workflow
      • Starting
      • Handling
      • State
      • Completing
      • Example
    • Transports
      • RabbitMQ
      • Amazon SQS
      • Redis
      • Custom transports
    • Persistence
      • Postgres
      • MongoDB
      • Creating a persistence
    • Serializers
      • Class serializer
    • Loggers
      • Custom loggers
    • Middleware
    • Lifecycle hooks
    • Retry Strategies
    • Dependency injection
    • Long running processes
Powered by GitBook
On this page
  • Methods
  • withHandler(classHandler)
  • withHandler(functionHandler)
  • withCustomHandler(messageHandler, customResolver)
  • withWorkflow(workflow)
  • withTransport(transport)
  • withLogger(loggerFactory)
  • withSerializer(serializer)
  • withPersistence(persistence)
  • withConcurrency(concurrency)
  • withContainer(containerAdapter)
  • withMessageReadMiddleware(middleware)
  • withRetryStrategy(retryStrategy)
  • initialize([options])

Was this helpful?

  1. Reference

BusConfiguration

PreviousBusNextBusInstance

Last updated 3 years ago

Was this helpful?

Creates a configuration in order to initialize a new .

Methods

withHandler(classHandler)

Registers a class handler that receives a message and performs a unit of work. When Bus is initialized it will configure the transport to subscribe to the type of message handled by the handler and upon receipt will forward the message through to the handle() function.

Arguments

Argument
Description
Default

classHandler

A class responsible for handling messages that implements Handler

None

Example

import { Bus } from '@node-ts/bus-core'
import { TestHandler } from './test-handler'

Bus.configure().withHandler(TestHandler)

See also , .

withHandler(functionHandler)

Registers a function handler that receives a message and performs a unit of work. When Bus is initialized it will configure the transport to subscribe to the type of message handled by the function handler and upon receipt will forward the message to the function.

Arguments

Argument
Description
Default

functionHandler

A functional handler mapping initialized using handlerFor

None

Example

import { Bus, handlerFor } from '@node-ts/bus-core'
import { TestEvent } from './test-event'

Bus.configure().withHandler(handlerFor(TestEvent, event => {}))

withCustomHandler(messageHandler, customResolver)

Registers a custom handler that receives messages from external systems, or messages that don't implement the Message interface from @node-ts/bus-messages.

Arguments

Argument
Description
Default

messageHandler

A handler that receives the custom message

None

customResolver

A discriminator that determines if an incoming message should be mapped to this handler

None

Example

import { Bus } from '@node-ts/bus-core'
import { S3Event } from 'aws-sdk'

Bus.configure()
  .withCustomHandler(
    async (event: S3Event) => console.log('Received S3 event', { event }),
    {
      resolveWith: event => event.Records
        && event.Records.length
    }
  )   

withWorkflow(workflow)

Registers a workflow definition so that all of the messages it depends on will be subscribed to and forwarded to the handlers inside the workflow.

Arguments

Argument
Description
Default

workflow

Workflow definition to register

None

Example

import { Bus } from '@node-ts/bus-core'
import { TestWorkflow } from './test-workflow'

Bus.configure().withWorkflow(TestWorkflow) 

withTransport(transport)

Configures Bus to use a different transport than the default MemoryQueue.

Arguments

Argument
Description
Default

transport

A configured transport to use

None

Example

import { Bus } from '@node-ts/bus-core'
import { SqsTransport, SqsTransportConfiguration } from '@node-ts/bus-sqs'

const sqsConfiguration: SqsTransportConfiguration = {
  // ...
}
const sqsTransport = new SqsTransport(sqsConfiguration)
Bus.configure().withTransport(sqsTransport) 

withLogger(loggerFactory)

Configures Bus to use a different logging provider than the default consoler logger.

Arguments

Argument
Description
Default

loggerFactory

A factory that creates a new logger

None

Example

import { Bus } from '@node-ts/bus-core'
import { CustomLogger } from './custom-logger'

Bus.configure().withLogger((target: string) => new CustomLogger(target))

withSerializer(serializer)

Configures Bus to use a different serialization provider. The provider is responsible for transforming messages to/from a serialized representation, as well as ensuring all object properties are a strong type.

Arguments

Argument
Description
Default

serializer

Serializer to use

None

Example

import { Bus } from '@node-ts/bus-core'
import { ClassSerializer } from '@node-ts/bus-class-serializer'

Bus.configure().withSerializer(new ClassSerializer())

withPersistence(persistence)

Configures Bus to use a different persistence provider than the default InMemoryPersistence provider. This is used to persist workflow data and is unused if not using workflows.

Arguments

Argument
Description
Default

persistence

Persistence provider to use

None

Example

import { Bus } from '@node-ts/bus-core'
import { PostgresPersistence, PostgresConfiguration } from '@node-ts/bus-postgres'

const postgresConfiguration: PostgresConfiguration = {
  connection: {
    connectionString: 'postgres://postgres:password@localhost:5432/postgres'
  },
  schemaName: 'workflows'
}
const postgresPersistence = new PostgresPersistence(postgresConfiguration)
Bus.configure().withPersistence(postgresPersistence)

withConcurrency(concurrency)

Sets the message handling concurrency beyond the default value of 1, which will increase the number of messages handled in parallel.concurrency

Arguments

Argument
Description
Default

concurrency

The number of messages that can be handled in parallel

None

Example

import { Bus } from '@node-ts/bus-core'

Bus.configure().withConcurrency(5)

withContainer(containerAdapter)

withContainer({
      get <T>(type: ClassConstructor<T>) {
        return container.get<T>(type)
      }
    })

Use a local dependency injection/IoC container to resolve handlers and workflows.

Configures Bus to use a different persistence provider than the default InMemoryPersistence provider. This is used to persist workflow data and is unused if not using workflows.

Arguments

Argument
Description
Default

containerAdapter

An adapter that allows Bus to resolve class instances from the underlying IoC container

None

Example

import { Bus } from '@node-ts/bus-core'
import { Container } from 'inversify'

const container = new Container()
Bus.configure().withContainer({
      get <T>(type: ClassConstructor<T>) {
        return container.get<T>(type)
      }
})   

withMessageReadMiddleware(middleware)

  withMessageReadMiddleware<TransportMessageType = unknown> (
    messageReadMiddleware: Middleware<TransportMessage<TransportMessageType>>
  )

Run custom middleware before/after the point a message is read from the transport and then dispatched to handlers and workflow handlers.

Arguments

Argument
Description
Default

middleware

A middleware function that will be executed after a message is read from the transport and before it is dispatched to handlers.

None

Example

import { Bus, Middleware, Next, TransportMessage } from '@node-ts/bus-core'

const messageTimingMiddleware = async (
  context: TransportMessage<unknown>,
  next: Next
) => {
  const start = Date.now()
  await next()
  const end = Date.now()
  const durationMs = end - start
  console.log(
    'Message handled',
    { messageName: context.domainMessage.$name, durationMs }
  )
}
const bus = await Bus.configure()
  .withMessageReadMiddleware(messageTimingMiddleware)

withRetryStrategy(retryStrategy)

  withRetryStrategy({
    calculateRetryDelay (currentAttempt: number): number
  })

Configure the bus to use a different retry strategy instead of the default.

Arguments

Argument
Description
Default

retryStrategy

An implementation of RetryStrategy that calculates the delay between retrying failed messages.

DefaultRetryStrategy

Example

const bus = await Bus.configure()
  .withRetryStrategy({
    calculateRetryDelay (currentAttempt: number) { return Math.pow(2, currentAttempt) }
  }) 
  .initialize()

initialize([options])

Initialize a configured BusInstance. This should be called after all options have been provided for the configuration.

See also , .

See also .

See also .

See also .

See also .

See also .

See also .

See also

See also .

See also .

BusInstance
events
commands
events
commands
System messages
Workflows
Transports
Loggers
Serializer
Persistence
Dependency injection.
Middleware
Retry Strategies