Handling
Once a workflow has started, it will often wait for a new message to arrive from the queue before performing the next step in its process. This is done by configuring a handler for that message and defining a discriminator that determines which workflow instance should be activated for it.
Handlers are created by declaring a function on the workflow and then providing a .when() mapping on the WorkflowMapper.
    import { Workflow, WorkflowMapper } from '@node-ts/bus-core'

    export class FulfilmentWorkflow extends Workflow<FulfilmentWorkflowState> {
      configureWorkflow (
        mapper: WorkflowMapper<FulfilmentWorkflowState, FulfilmentWorkflow>
      ): void {
        mapper
          .withState(FulfilmentWorkflowState)
          // ...
          // When the item is shipped, email the customer their receipt
          .when(ItemShipped, 'emailReceipt')
      }

      // Handles an `ItemShipped` event
      async emailReceipt (event: ItemShipped) {
        // ...
      }
    }
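To make the idea of a handler mapping concrete, the following is a minimal, self-contained sketch of a registry that associates a message class with the name of a method on a workflow. It is a toy model and not the @node-ts/bus-core implementation: `ToyMapper`, `HandlerName`, `handlerFor` and `ToyFulfilmentWorkflow` are hypothetical names introduced only for illustration.

```typescript
// Toy model of handler registration. NOT the @node-ts/bus-core implementation;
// every name here is hypothetical.
type MessageClass<M> = new (...args: any[]) => M

// Only keys of W whose values are functions are accepted as handler names
type HandlerName<W> = {
  [K in keyof W]: W[K] extends (...args: any[]) => unknown ? K : never
}[keyof W]

class ItemShipped {
  constructor (readonly itemId: string) {}
}

class ToyMapper<W> {
  // Message class -> name of the workflow method that handles it
  private readonly handlers = new Map<MessageClass<unknown>, HandlerName<W>>()

  when<M> (message: MessageClass<M>, handlerName: HandlerName<W>): this {
    this.handlers.set(message, handlerName)
    return this
  }

  // Find the handler name registered for the class of the given message
  handlerFor (message: object): HandlerName<W> | undefined {
    return this.handlers.get(message.constructor as MessageClass<unknown>)
  }
}

class ToyFulfilmentWorkflow {
  readonly emailed: string[] = []

  async emailReceipt (shipped: ItemShipped): Promise<void> {
    this.emailed.push(shipped.itemId)
  }
}

const toyMapper = new ToyMapper<ToyFulfilmentWorkflow>()
  .when(ItemShipped, 'emailReceipt')

const resolvedHandler = toyMapper.handlerFor(new ItemShipped('item-1'))
const unmappedHandler = toyMapper.handlerFor({})
```

In this sketch, passing `'emailed'` to `when()` would fail to compile because it is not a method, which is one way a string handler name can stay type-safe.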

Default mapping

When a workflow is started it is assigned a $workflowId that is persisted against the state and cannot be changed. This $workflowId is attached to the sticky attributes of every message sent from within the workflow, and sticky attributes propagate to any subsequent messages sent while handling them. When such a message is handled by a workflow, the workflow state is located using the value of $workflowId.
This example uses the default mapping in the .when() handler:

    import { Workflow, WorkflowMapper, BusInstance } from '@node-ts/bus-core'

    export class FulfilmentWorkflow extends Workflow<FulfilmentWorkflowState> {

      // Keep a reference to the bus so that handlers can send messages
      constructor (private readonly bus: BusInstance) {
        super()
      }

      configureWorkflow (
        mapper: WorkflowMapper<FulfilmentWorkflowState, FulfilmentWorkflow>
      ): void {
        mapper
          .withState(FulfilmentWorkflowState)
          .startedBy(ItemPurchased, 'shipItem')
          // When the item is shipped, email the customer their receipt
          .when(ItemShipped, 'emailReceipt')
      }

      async shipItem (event: ItemPurchased) {
        await this.bus.send(new ShipItem(event.itemId))
      }

      // Handles an `ItemShipped` event
      async emailReceipt (event: ItemShipped) {
        // ...
      }
    }
What happens is:
  1. A new workflow is started on receipt of an ItemPurchased event, with a new $workflowId value on the state
  2. The shipItem handler sends a new ShipItem command. The workflow automatically attaches the $workflowId to the sticky attributes of the outgoing message
  3. The command handler for ShipItem processes the request and publishes an ItemShipped event. Because $workflowId is present on the sticky attributes of the incoming command, it is also attached to the sticky attributes of the outgoing event
  4. The ItemShipped event is received and, since this workflow uses a default handler for the message, the value of $workflowId on the incoming sticky attributes is used to look up the workflow state instance
  5. The ItemShipped event is then routed to the emailReceipt handler for the workflow instance
Default mappings are a simple way to map workflow handlers when the next step is based on the outcome of a command sent by the workflow.
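The propagation of $workflowId through sticky attributes can be sketched with a small in-memory model. This is an illustration under stated assumptions, not the library's code: `ToyAttributes`, `stateStore`, `startWorkflow`, `sendFromWorkflow`, `propagate` and `findByDefaultMapping` are hypothetical names, and a plain `Map` stands in for workflow persistence.

```typescript
// Toy model of default mapping. All names are hypothetical, not library API.
interface ToyAttributes {
  stickyAttributes: Record<string, string | undefined>
}

interface ToyState {
  $workflowId: string
  itemId: string
}

// In-memory stand-in for workflow persistence
const stateStore = new Map<string, ToyState>()

// Step 1: starting a workflow creates state with a new $workflowId
function startWorkflow (workflowId: string, itemId: string): ToyState {
  const state: ToyState = { $workflowId: workflowId, itemId }
  stateStore.set(workflowId, state)
  return state
}

// Step 2: messages sent from the workflow carry $workflowId as a sticky attribute
function sendFromWorkflow (state: ToyState): ToyAttributes {
  return { stickyAttributes: { $workflowId: state.$workflowId } }
}

// Step 3: a downstream handler copies incoming sticky attributes onto outgoing messages
function propagate (incoming: ToyAttributes): ToyAttributes {
  return { stickyAttributes: { ...incoming.stickyAttributes } }
}

// Step 4: default mapping locates the state using the sticky $workflowId
function findByDefaultMapping (incoming: ToyAttributes): ToyState | undefined {
  const workflowId = incoming.stickyAttributes['$workflowId']
  return workflowId === undefined ? undefined : stateStore.get(workflowId)
}

const startedState = startWorkflow('wf-1', 'item-1')
const shipItemAttributes = sendFromWorkflow(startedState)
const itemShippedAttributes = propagate(shipItemAttributes)
const locatedState = findByDefaultMapping(itemShippedAttributes)
const notLocated = findByDefaultMapping({ stickyAttributes: {} })
```

A message that did not originate from the workflow carries no $workflowId, so in this model the default mapping finds nothing for it. That case is what the explicit mappings in the following sections address.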

Mapping via message properties

Messages can be mapped to workflow handlers by matching the value of a property of a message to a property on the workflow state.
This is done by providing a lookup and mapsTo configuration for the message on the WorkflowMapper:

    import { Workflow, WorkflowMapper, BusInstance, WorkflowState } from '@node-ts/bus-core'

    type uuid = string

    class FulfilmentWorkflowState extends WorkflowState {
      $name = 'FulfilmentWorkflowState'

      itemId: uuid
    }

    export class FulfilmentWorkflow extends Workflow<FulfilmentWorkflowState> {

      // Keep a reference to the bus so that handlers can send messages
      constructor (private readonly bus: BusInstance) {
        super()
      }

      configureWorkflow (
        mapper: WorkflowMapper<FulfilmentWorkflowState, FulfilmentWorkflow>
      ): void {
        mapper
          .withState(FulfilmentWorkflowState)
          .startedBy(ItemPurchased, 'shipItem')
          // When the item is shipped, email the customer their receipt
          .when(
            ItemShipped,
            'emailReceipt',
            {
              // When an `ItemShipped` event is received, get the value of `itemId`...
              lookup: itemShippedEvent => itemShippedEvent.itemId,
              // ...and grab the workflow with a matching 'itemId' value on the FulfilmentWorkflowState
              mapsTo: 'itemId'
            }
          )
      }

      async shipItem (event: ItemPurchased) {
        await this.bus.send(new ShipItem(event.itemId))
        // Persist the updated `itemId` value on the workflow state
        return {
          itemId: event.itemId
        }
      }

      // Handles an `ItemShipped` event
      async emailReceipt (event: ItemShipped) {
        // ...
      }
    }
In this example, the workflow is started when an ItemPurchased event is received. The workflow sends a ShipItem command and persists the itemId in the workflow state.
When an ItemShipped event is eventually received, the lookup function reads the value of itemId from the event, and mapsTo tells the bus to load the workflow whose state has a matching itemId value.
Mapping via message properties is useful when the message being handled is a result of an action not triggered by the workflow.
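The way lookup and mapsTo work together can be illustrated with a short standalone sketch. It assumes a simple in-memory array of states and is not how the library queries persistence: `ToyMapping`, `findMatchingStates` and the `Toy*` types are hypothetical names used only here.

```typescript
// Toy model of property-based mapping. Not the library's implementation.
interface ToyFulfilmentState {
  $workflowId: string
  itemId: string
}

interface ToyItemShipped {
  itemId: string
}

interface ToyMapping<M> {
  // Extracts the value to search for from the incoming message
  lookup: (message: M) => string | undefined
  // Names the workflow state field that must equal the looked-up value
  mapsTo: keyof ToyFulfilmentState
}

function findMatchingStates<M> (
  message: M,
  mapping: ToyMapping<M>,
  states: ToyFulfilmentState[]
): ToyFulfilmentState[] {
  const value = mapping.lookup(message)
  if (value === undefined) {
    return []
  }
  return states.filter(state => state[mapping.mapsTo] === value)
}

const persistedStates: ToyFulfilmentState[] = [
  { $workflowId: 'wf-1', itemId: 'item-1' },
  { $workflowId: 'wf-2', itemId: 'item-2' }
]

const itemShippedMapping: ToyMapping<ToyItemShipped> = {
  lookup: shipped => shipped.itemId,
  mapsTo: 'itemId'
}

const incomingShipped: ToyItemShipped = { itemId: 'item-2' }
const matchedStates = findMatchingStates(incomingShipped, itemShippedMapping, persistedStates)
const matchedIds = matchedStates.map(state => state.$workflowId).join(',')
```

Because the match depends only on data in the message and the state, this style of mapping works even when the message was published by a process that knows nothing about the workflow.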

Mapping via message attributes

Messages can also be mapped to workflow handlers by matching values held in the message attributes against values in the workflow state.
This is done by providing values for lookup and mapsTo in the handler mapping:

    import { Workflow, WorkflowMapper, BusInstance } from '@node-ts/bus-core'

    export class FulfilmentWorkflow extends Workflow<FulfilmentWorkflowState> {

      // Keep a reference to the bus so that handlers can send messages
      constructor (private readonly bus: BusInstance) {
        super()
      }

      configureWorkflow (
        mapper: WorkflowMapper<FulfilmentWorkflowState, FulfilmentWorkflow>
      ): void {
        mapper
          .withState(FulfilmentWorkflowState)
          .startedBy(ItemPurchased, 'shipItem')
          // When the item is shipped, email the customer their receipt
          .when(
            ItemShipped,
            'emailReceipt',
            {
              // When an `ItemShipped` event is received, get the itemId from the message attributes...
              lookup: (_, { attributes }) => attributes.itemId,
              // ...and grab the workflow with a matching 'itemId' value on the workflow state
              mapsTo: 'itemId'
            }
          )
      }

      async shipItem ({ itemId }: ItemPurchased) {
        await this.bus.send(
          new ShipItem(itemId),
          { attributes: { itemId } }
        )
        // Persist the updated `itemId` value on the workflow state
        return { itemId }
      }

      // Handles an `ItemShipped` event
      async emailReceipt (event: ItemShipped) {
        // ...
      }
    }
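The difference from property-based mapping is that the lookup reads from the attributes that travel alongside the message rather than from the message body. The sketch below models this with an event whose body carries no itemId at all. It is illustrative only: `ToyMessageAttributes`, `ToyAttributeMapping`, `resolveWorkflowId` and the other `Toy*` names are hypothetical, and the attribute shape is an assumption based on the destructuring used in the example above.

```typescript
// Toy model of attribute-based mapping. Types and names are hypothetical.
interface ToyMessageAttributes {
  attributes: Record<string, string | undefined>
  stickyAttributes: Record<string, string | undefined>
}

interface ToyShipmentState {
  $workflowId: string
  itemId: string
}

// The message body deliberately carries no itemId
interface ToyItemShippedEvent {
  trackingCode: string
}

interface ToyAttributeMapping<M> {
  lookup: (message: M, messageAttributes: ToyMessageAttributes) => string | undefined
  mapsTo: keyof ToyShipmentState
}

const attributeMapping: ToyAttributeMapping<ToyItemShippedEvent> = {
  // Ignore the body and read the itemId from the message attributes
  lookup: (_, { attributes }) => attributes['itemId'],
  mapsTo: 'itemId'
}

function resolveWorkflowId<M> (
  message: M,
  messageAttributes: ToyMessageAttributes,
  mapping: ToyAttributeMapping<M>,
  states: ToyShipmentState[]
): string | undefined {
  const value = mapping.lookup(message, messageAttributes)
  if (value === undefined) {
    return undefined
  }
  return states.find(state => state[mapping.mapsTo] === value)?.$workflowId
}

const shipmentStates: ToyShipmentState[] = [
  { $workflowId: 'wf-7', itemId: 'item-7' },
  { $workflowId: 'wf-8', itemId: 'item-8' }
]
const shippedEvent: ToyItemShippedEvent = { trackingCode: 'TRK-1' }

const resolvedWithAttribute = resolveWorkflowId(
  shippedEvent,
  { attributes: { itemId: 'item-7' }, stickyAttributes: {} },
  attributeMapping,
  shipmentStates
)
const resolvedWithoutAttribute = resolveWorkflowId(
  shippedEvent,
  { attributes: {}, stickyAttributes: {} },
  attributeMapping,
  shipmentStates
)
```

In this model, a message that arrives without the expected attribute resolves to no workflow, so the sender must set the attribute for the mapping to work.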