BusConfiguration
Registers a class handler that receives a message and performs a unit of work. When Bus is initialized, it configures the transport to subscribe to the message type handled by the handler and, on receipt, forwards each message to the handler's handle() function.
Argument | Description | Default |
---|---|---|
classHandler | A class responsible for handling messages that implements Handler | None |
import { Bus } from '@node-ts/bus-core'
import { TestHandler } from './test-handler'
Bus.configure().withHandler(TestHandler)
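The example above imports TestHandler without showing it. The following is a minimal sketch of what such a class could look like. The `Handler` interface, `ClassConstructor` type and `TestCommand` message are local stand-ins written for illustration; in a real project the types would come from @node-ts/bus-core and your own message definitions, so treat the exact shape as an assumption.

```typescript
// Local stand-ins for illustration only; the real types come from @node-ts/bus-core
type ClassConstructor<T> = new (...args: any[]) => T

interface Handler<TMessage> {
  // The message type this handler subscribes to
  messageType: ClassConstructor<TMessage>
  // Performs the unit of work for a received message
  handle (message: TMessage): void | Promise<void>
}

// Hypothetical message definition
class TestCommand {
  readonly $name = 'my-app/test-command'
  readonly $version = 1
  constructor (readonly orderId: string) {}
}

class TestHandler implements Handler<TestCommand> {
  messageType = TestCommand
  // Records handled order ids so the effect of handle() is observable
  readonly handled: string[] = []

  handle (message: TestCommand): void {
    this.handled.push(message.orderId)
  }
}
```

Keeping the message type on the class is what would let a bus work out which subscription to create for the handler.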
Registers a function handler that receives a message and performs a unit of work. When Bus is initialized it will configure the transport to subscribe to the type of message handled by the function handler and upon receipt will forward the message to the function.
Argument | Description | Default |
---|---|---|
functionHandler | A functional handler mapping initialized using handlerFor | None |
import { Bus, handlerFor } from '@node-ts/bus-core'
import { TestEvent } from './test-event'
Bus.configure().withHandler(handlerFor(TestEvent, event => {}))
Registers a custom handler that receives messages from external systems, or messages that don't implement the Message interface from @node-ts/bus-messages.
Argument | Description | Default |
---|---|---|
messageHandler | A handler that receives the custom message | None |
customResolver | A discriminator that determines if an incoming message should be mapped to this handler | None |
import { Bus } from '@node-ts/bus-core'
import { S3Event } from 'aws-lambda' // event typings are published in @types/aws-lambda
Bus.configure()
  .withCustomHandler(
    async (event: S3Event) => console.log('Received S3 event', { event }),
    {
      // Route a message to this handler only when it carries at least one S3 record
      resolveWith: event => Array.isArray(event.Records) && event.Records.length > 0
    }
  )
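How a resolver selects a handler can be pictured as a list of resolver and handler pairs that are checked against each incoming payload. The sketch below is a simplified local model of that idea, not the library's implementation; `CustomRegistration`, `register`, `dispatch` and `isS3Event` are hypothetical names.

```typescript
// Simplified local model of resolver-based routing; not the bus-core implementation
interface CustomRegistration {
  resolveWith: (message: unknown) => boolean
  handler: (message: unknown) => void
}

const registrations: CustomRegistration[] = []

function register (registration: CustomRegistration): void {
  registrations.push(registration)
}

// Forwards the message to every handler whose resolver accepts it
// and returns the number of handlers that matched
function dispatch (message: unknown): number {
  const matches = registrations.filter(r => r.resolveWith(message))
  matches.forEach(r => r.handler(message))
  return matches.length
}

// Discriminator for S3-style notifications: a non-empty Records array
const isS3Event = (message: unknown): boolean => {
  const records = (message as { Records?: unknown } | null)?.Records
  return Array.isArray(records) && records.length > 0
}

const received: unknown[] = []
register({ resolveWith: isS3Event, handler: message => received.push(message) })
```

A resolver that returns a strict boolean keeps the routing decision unambiguous for payloads that have an empty or missing Records property.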
Registers a workflow definition so that all of the messages it depends on will be subscribed to and forwarded to the handlers inside the workflow.
Argument | Description | Default |
---|---|---|
workflow | Workflow definition to register | None |
import { Bus } from '@node-ts/bus-core'
import { TestWorkflow } from './test-workflow'
Bus.configure().withWorkflow(TestWorkflow)
Configures Bus to use a different transport than the default MemoryQueue.
Argument | Description | Default |
---|---|---|
transport | A configured transport to use | None |
import { Bus } from '@node-ts/bus-core'
import { SqsTransport, SqsTransportConfiguration } from '@node-ts/bus-sqs'
const sqsConfiguration: SqsTransportConfiguration = {
  // ...
}
const sqsTransport = new SqsTransport(sqsConfiguration)
Bus.configure().withTransport(sqsTransport)
Configures Bus to use a different logging provider than the default console logger.
Argument | Description | Default |
---|---|---|
loggerFactory | A factory that creates a new logger | None |
import { Bus } from '@node-ts/bus-core'
import { CustomLogger } from './custom-logger'
Bus.configure().withLogger((target: string) => new CustomLogger(target))
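The CustomLogger imported above is not shown. One possible implementation is sketched below, assuming a logger exposes level methods that take a message and optional metadata. The `Logger` interface here is a local stand-in and the real interface in @node-ts/bus-core may differ; `Sink` and `loggerFactory` are hypothetical names.

```typescript
// Local stand-in; check the Logger interface exported by @node-ts/bus-core for the real shape
interface Logger {
  debug (message: string, meta?: object): void
  info (message: string, meta?: object): void
  warn (message: string, meta?: object): void
  error (message: string, meta?: object): void
}

// Destination for formatted log lines, injectable so output can be redirected
type Sink = (line: string) => void

class CustomLogger implements Logger {
  constructor (
    private readonly target: string,
    private readonly sink: Sink = line => console.log(line)
  ) {}

  debug (message: string, meta?: object): void { this.write('debug', message, meta) }
  info (message: string, meta?: object): void { this.write('info', message, meta) }
  warn (message: string, meta?: object): void { this.write('warn', message, meta) }
  error (message: string, meta?: object): void { this.write('error', message, meta) }

  // Emits one JSON line per entry, tagged with the target the logger was created for
  private write (level: string, message: string, meta?: object): void {
    this.sink(JSON.stringify({ level, target: this.target, message, ...meta }))
  }
}

// Factory in the shape passed to withLogger in the example above
const loggerFactory = (target: string): Logger => new CustomLogger(target)
```

Because the factory receives the target name, every log line can be attributed to the component that produced it.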
Configures Bus to use a different serialization provider. The provider is responsible for transforming messages to and from their serialized representation, and for ensuring that all object properties are restored as strongly typed values.
Argument | Description | Default |
---|---|---|
serializer | Serializer to use | None |
import { Bus } from '@node-ts/bus-core'
import { ClassSerializer } from '@node-ts/bus-class-serializer'
Bus.configure().withSerializer(new ClassSerializer())
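To illustrate why a serializer has to restore strong types, consider that a plain JSON round trip turns a Date into a string. The sketch below is independent of @node-ts/bus and is not how ClassSerializer is implemented; `OrderShipped`, `serialize`, `deserializePlain` and `deserializeOrderShipped` are hypothetical names.

```typescript
// Hypothetical message with a non-primitive property
class OrderShipped {
  constructor (
    readonly orderId: string,
    readonly shippedAt: Date
  ) {}
}

const serialize = (message: OrderShipped): string => JSON.stringify(message)

// Naive deserialization: shippedAt comes back as an ISO string, not a Date
const deserializePlain = (raw: string): unknown => JSON.parse(raw)

// Type-aware deserialization: rebuilds the class instance and its Date property
const deserializeOrderShipped = (raw: string): OrderShipped => {
  const plain = JSON.parse(raw) as { orderId: string, shippedAt: string }
  return new OrderShipped(plain.orderId, new Date(plain.shippedAt))
}
```

A handler that calls a Date method on shippedAt would fail at runtime with the naive version, which is the class of problem a type-aware serializer is meant to prevent.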
Configures Bus to use a different persistence provider than the default InMemoryPersistence provider. This is used to persist workflow data and is unused if not using workflows.
Argument | Description | Default |
---|---|---|
persistence | Persistence provider to use | None |
import { Bus } from '@node-ts/bus-core'
import { PostgresPersistence, PostgresConfiguration } from '@node-ts/bus-postgres'
const postgresConfiguration: PostgresConfiguration = {
  connection: {
    connectionString: 'postgres://postgres:password@localhost:5432/postgres'
  },
  schemaName: 'workflows'
}
const postgresPersistence = new PostgresPersistence(postgresConfiguration)
Bus.configure().withPersistence(postgresPersistence)
Sets the message handling concurrency above the default value of 1, which increases the number of messages handled in parallel.
Argument | Description | Default |
---|---|---|
concurrency | The number of messages that can be handled in parallel | None |
import { Bus } from '@node-ts/bus-core'
Bus.configure().withConcurrency(5)
withContainer({
  get <T>(type: ClassConstructor<T>) {
    return container.get<T>(type)
  }
})
Use a local dependency injection/IoC container to resolve handlers and workflows.
Argument | Description | Default |
---|---|---|
containerAdapter | An adapter that allows Bus to resolve class instances from the underlying IoC container | None |
import { Bus, ClassConstructor } from '@node-ts/bus-core'
import { Container } from 'inversify'
const container = new Container()
Bus.configure().withContainer({
  get <T>(type: ClassConstructor<T>) {
    return container.get<T>(type)
  }
})
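The adapter only needs a `get` method that returns an instance for a given class. The sketch below shows that contract against a tiny hand-rolled container instead of inversify. `SimpleContainer`, `ContainerAdapter` and `GreetingService` are hypothetical stand-ins, and `ClassConstructor` is redeclared locally so the snippet is self-contained.

```typescript
// Redeclared locally for a self-contained example
type ClassConstructor<T> = new (...args: any[]) => T

// Shape of the adapter, taken from the withContainer example above
interface ContainerAdapter {
  get <T>(type: ClassConstructor<T>): T
}

// Hypothetical minimal container: one lazily created singleton per class
class SimpleContainer {
  private readonly instances = new Map<ClassConstructor<unknown>, unknown>()

  resolve <T>(type: ClassConstructor<T>): T {
    if (!this.instances.has(type)) {
      this.instances.set(type, new type())
    }
    return this.instances.get(type) as T
  }
}

class GreetingService {
  greet (name: string): string {
    return `Hello, ${name}`
  }
}

const simpleContainer = new SimpleContainer()

// The adapter delegates every lookup to the underlying container
const adapter: ContainerAdapter = {
  get <T>(type: ClassConstructor<T>) {
    return simpleContainer.resolve(type)
  }
}
```

Keeping the adapter this thin means the choice of IoC library stays outside the bus configuration.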
withMessageReadMiddleware<TransportMessageType = unknown> (
  messageReadMiddleware: Middleware<TransportMessage<TransportMessageType>>
)
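Runs custom middleware around message dispatch. The middleware is invoked after a message is read from the transport, and calling next() dispatches the message to handlers and workflow handlers, so code can run both before and after the message is handled.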
Argument | Description | Default |
---|---|---|
messageReadMiddleware | A middleware function that is executed after a message is read from the transport and before it is dispatched to handlers. | None |
import { Bus, Middleware, Next, TransportMessage } from '@node-ts/bus-core'
const messageTimingMiddleware = async (
  context: TransportMessage<unknown>,
  next: Next
) => {
  const start = Date.now()
  // Hand over to the next middleware and, ultimately, the message handlers
  await next()
  const end = Date.now()
  const durationMs = end - start
  console.log(
    'Message handled',
    { messageName: context.domainMessage.$name, durationMs }
  )
}
const bus = await Bus.configure()
  .withMessageReadMiddleware(messageTimingMiddleware)
  .initialize()
withRetryStrategy({
  calculateRetryDelay (currentAttempt: number): number
})
Configure the bus to use a different retry strategy instead of the default.
Argument | Description | Default |
---|---|---|
retryStrategy | An implementation of RetryStrategy that calculates the delay between retrying failed messages. | DefaultRetryStrategy |
import { Bus } from '@node-ts/bus-core'
const bus = await Bus.configure()
  .withRetryStrategy({
    // The delay doubles with every attempt
    calculateRetryDelay (currentAttempt: number) { return Math.pow(2, currentAttempt) }
  })
  .initialize()
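A common alternative to the bare power of two above is exponential backoff with an upper bound. The sketch below assumes the delay is expressed in milliseconds and that attempts are counted from 1; both are assumptions, as are the `cappedExponentialBackoff` helper and its `baseDelayMs` and `maxDelayMs` parameters. The `RetryStrategy` interface is redeclared locally from the signature shown above.

```typescript
// Redeclared locally from the signature shown above
interface RetryStrategy {
  calculateRetryDelay (currentAttempt: number): number
}

// Hypothetical helper: baseDelayMs * 2^(attempt - 1), never exceeding maxDelayMs
const cappedExponentialBackoff = (baseDelayMs: number, maxDelayMs: number): RetryStrategy => ({
  calculateRetryDelay (currentAttempt: number): number {
    // Guard against zero, negative or fractional attempt numbers
    const attempt = Math.max(1, Math.floor(currentAttempt))
    return Math.min(maxDelayMs, baseDelayMs * Math.pow(2, attempt - 1))
  }
})

const retryStrategy = cappedExponentialBackoff(100, 30000)
// Attempts 1 to 4 give 100, 200, 400 and 800; attempt 10 is capped at 30000
```

The cap stops a long-failing message from being delayed indefinitely, while the doubling still eases pressure on a struggling downstream dependency.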
Initialize a configured BusInstance. This should be called after all options have been provided for the configuration.