Custom transports
If you want to use a message queue that isn't covered by one of the officially supported transports, you can write an adapter for it by implementing the Transport interface from @node-ts/bus-core.
The following interface definition lists the methods a transport must implement to be compatible with the bus. Members marked with ? are optional.
    /**
     * A transport adapter interface that enables the service bus to use a messaging technology.
     */
    export interface Transport<TransportMessageType = {}> {
      /**
       * Publishes an event to the underlying transport. This is generally done to a topic or some other
       * mechanism that consumers can subscribe themselves to
       * @param event A domain event to be published
       * @param messageOptions Options that control the behaviour around how the message is sent and
       * additional information that travels with it.
       */
      publish<TEvent extends Event> (event: TEvent, messageOptions?: MessageAttributes): Promise<void>

      /**
       * Sends a command to the underlying transport. This is generally done to a topic or some other
       * mechanism that consumers can subscribe themselves to
       * @param command A domain command to be sent
       * @param messageOptions Options that control the behaviour around how the message is sent and
       * additional information that travels with it.
       */
      send<TCommand extends Command> (command: TCommand, messageOptions?: MessageAttributes): Promise<void>

      /**
       * Forwards @param transportMessage to the dead letter queue. The message must have been read in from the
       * queue and have a receipt handle.
       */
      fail (transportMessage: TransportMessage<unknown>): Promise<void>

      /**
       * Fetch the next message from the underlying queue. If there are no messages, then `undefined`
       * should be returned.
       *
       * @returns The message construct from the underlying transport, that includes both the raw message envelope
       * plus the contents or body that contains the `@node-ts/bus-messages` message.
       */
      readNextMessage (): Promise<TransportMessage<TransportMessageType> | undefined>

      /**
       * Removes a message from the underlying transport. This will be called once a message has been
       * successfully handled by any of the message handling functions.
       * @param message The message to be removed from the transport
       */
      deleteMessage (message: TransportMessage<TransportMessageType>): Promise<void>

      /**
       * Returns a message to the queue for retry. This will be called if an error was thrown when
       * trying to process a message.
       * @param message The message to be returned to the queue for reprocessing
       */
      returnMessage (message: TransportMessage<TransportMessageType>): Promise<void>

      /**
       * An optional function that is called before startup that will provide core dependencies
       * to the transport. This can be used to fetch loggers, registries etc that are used
       * in initialization steps
       * @param coreDependencies
       */
      prepare (coreDependencies: CoreDependencies): void

      /**
       * An optional function that will be called on startup. This gives a chance for the transport
       * to establish any connections to the underlying infrastructure.
       */
      connect? (): Promise<void>

      /**
       * An optional function that will be called on shutdown. This gives a chance for the transport
       * to close any connections to the underlying infrastructure.
       */
      disconnect? (): Promise<void>

      /**
       * An optional function that will be called when the service bus is starting. This is an
       * opportunity for the transport to see what messages need to be handled so that subscriptions
       * to the topics can be created.
       * @param handlerRegistry The list of messages being handled by the bus that the transport needs to subscribe to.
       */
      initialize? (handlerRegistry: HandlerRegistry): Promise<void>

      /**
       * An optional function that will be called when the service bus is shutting down. This is an
       * opportunity for the transport to close out any open requests to fetch messages etc.
       */
      dispose? (): Promise<void>
    }
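To make the contract more concrete, here is a minimal sketch of an adapter backed by an in-memory array. It is illustrative only and not part of the library. The class name InMemoryQueueTransport, the InMemoryEnvelope type and the simplified Message, MessageAttributes and TransportMessage shapes are assumptions declared locally so the example compiles on its own. A real adapter would import those types from @node-ts/bus-core and @node-ts/bus-messages and declare that it implements Transport.

```typescript
// Simplified local stand-ins so this sketch is self-contained.
// Their shapes are assumptions; a real adapter would import the library's own types.
interface Message { $name: string }
interface MessageAttributes { correlationId?: string }
interface TransportMessage<TRaw> {
  id: string | undefined
  domainMessage: Message
  attributes: MessageAttributes
  raw: TRaw
}

// Hypothetical raw envelope: tracks how many times a message has been read.
interface InMemoryEnvelope { seenCount: number }

class InMemoryQueueTransport {
  private queue: TransportMessage<InMemoryEnvelope>[] = []
  private inFlight = new Set<TransportMessage<unknown>>()
  private nextId = 0
  // Exposed so callers can inspect messages that were failed.
  readonly deadLetterQueue: TransportMessage<unknown>[] = []

  async publish (event: Message, messageOptions: MessageAttributes = {}): Promise<void> {
    this.enqueue(event, messageOptions)
  }

  async send (command: Message, messageOptions: MessageAttributes = {}): Promise<void> {
    this.enqueue(command, messageOptions)
  }

  // Moves a previously read message to the dead letter queue.
  async fail (transportMessage: TransportMessage<unknown>): Promise<void> {
    this.inFlight.delete(transportMessage)
    this.deadLetterQueue.push(transportMessage)
  }

  // Returns the oldest queued message, or undefined when the queue is empty.
  async readNextMessage (): Promise<TransportMessage<InMemoryEnvelope> | undefined> {
    const message = this.queue.shift()
    if (message === undefined) {
      return undefined
    }
    message.raw.seenCount++
    this.inFlight.add(message)
    return message
  }

  // Called after successful handling: the message is gone for good.
  async deleteMessage (message: TransportMessage<InMemoryEnvelope>): Promise<void> {
    this.inFlight.delete(message)
  }

  // Called after a handler error: the message goes back on the queue for retry.
  async returnMessage (message: TransportMessage<InMemoryEnvelope>): Promise<void> {
    this.inFlight.delete(message)
    this.queue.push(message)
  }

  // Nothing to prepare for an in-memory queue.
  prepare (_coreDependencies: unknown): void {}

  private enqueue (message: Message, attributes: MessageAttributes): void {
    this.queue.push({
      id: String(this.nextId++),
      domainMessage: message,
      attributes,
      raw: { seenCount: 0 }
    })
  }
}
```

The optional connect, disconnect, initialize and dispose members are left out because an in-memory queue has no connections or subscriptions to manage. An adapter for a real broker would typically use them to open connections and create topic subscriptions.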
Once your transport implements this interface, pass it to the bus during configuration:
    import { Bus } from '@node-ts/bus-core'
    import { MyTransport } from './my-transport'

    const myTransport = new MyTransport()
    await Bus
      .configure()
      .withTransport(myTransport)
      .initialize()
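The interface comments describe when the bus calls readNextMessage, deleteMessage and returnMessage. The sketch below mirrors that contract for a single message so the expected behaviour of an adapter is easier to reason about. It is not the library's actual processing loop. The MinimalTransport and QueuedMessage types and the processNext helper are hypothetical names introduced for this example.

```typescript
// Minimal structural types for this sketch only; not the library's definitions.
interface QueuedMessage { body: string }
interface MinimalTransport {
  readNextMessage (): Promise<QueuedMessage | undefined>
  deleteMessage (message: QueuedMessage): Promise<void>
  returnMessage (message: QueuedMessage): Promise<void>
}

type Outcome = 'empty' | 'handled' | 'returned'

// Hypothetical helper: processes at most one message and reports what happened.
async function processNext (
  transport: MinimalTransport,
  handle: (message: QueuedMessage) => Promise<void>
): Promise<Outcome> {
  const message = await transport.readNextMessage()
  if (message === undefined) {
    // Nothing on the queue right now.
    return 'empty'
  }
  try {
    await handle(message)
  } catch {
    // The handler threw, so the message goes back to the queue for retry.
    await transport.returnMessage(message)
    return 'returned'
  }
  // The handler succeeded, so the message is removed from the transport.
  await transport.deleteMessage(message)
  return 'handled'
}
```

A custom transport can be exercised against a helper like this in a unit test before it is wired into the bus, by checking that a failing handler leaves the message available for a later read.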