Handling
After a workflow has started it will often wait until a new message is read from the queue so that it can perform the next step in its process. This is done by configuring a handler for that message, and defining a discriminator that will determine which workflow instance should be activated for that message.
Handlers are created by declaring a function on the workflow and then providing a
.when()
mapping on the WorkflowMapper.import { Workflow } from '@node-ts/bus-core'
export class FulfilmentWorkflow extends Workflow<FulfilmentWorkflowState> {
configureWorkflow (
mapper: WorkflowMapper<FulfilmentWorkflowState, FulfilmentWorkflow>
): void {
mapper
.withState(FulfilmentWorkflowState)
// ...
// When the item is shipped, email the customer their receipt
.when(ItemShipped, 'emailReceipt')
}
// Handles an `ItemShipped` event
async emailReceipt (event: ItemShipped) {
// ...
}
}
When a workflow is started it is assigned a $workflowId that is persisted against the state and cannot be changed. This $workflowId is attached to all messages sent from within the workflow in sticky attributes, which propagates as subsequent messages are sent. Handling these messages from within a workflow will locate the workflow state based on the value of $workflowId.
This example uses the default mapping in the
.when()
handlerimport { Workflow, BusInstance } from '@node-ts/bus-core'
export class FulfilmentWorkflow extends Workflow<FulfilmentWorkflowState> {
constructor (bus: BusInstance): {}
configureWorkflow (
mapper: WorkflowMapper<FulfilmentWorkflowState, FulfilmentWorkflow>
): void {
mapper
.withState(FulfilmentWorkflowState)
.startedBy(ItemPurchased, 'shipItem')
// When the item is shipped, email the customer their receipt
.when(ItemShipped, 'emailReceipt')
}
async shipItem (event: ItemPurchased): {
await this.bus.send(new ShipItem(event.itemId))
}
// Handles an `ItemShipped` event
async emailReceipt (event: ItemShipped) {
// ...
}
}
What happens is:
- 1.A new workflow is started on receipt of an ItemPurchased event, with a new $workflowId value on the state
- 2.The shipItem handler will send a new ShipItem. The workflow will automatically attach the $workflowId into the sticky attributes of the outgoing message
- 3.The command handler for ShipItem will process the request and publish an ItemShipped event. Because $workflowId is present on the sticky attributes of the incoming command, it will also be attached to the sticky attributes of the outgoing event
- 4.The ItemShipped event is received and since this workflow uses a default handler for the message, the value of $workflowId on the incoming sticky attributes will be used to lookup the workflow state instance
- 5.The ItemShipped event is then routed to the emailReceipt handler for the workflow instance
Default mappings are a simple way to map workflow handlers when the next step is based on the outcome of a command sent by the workflow.
Messages can be mapped to workflow handlers by matching the value of a property of a message to a property on the workflow state.
This is done by providing a
lookup
and mapsTo
configuration for the message on the WorkflowMapperimport { Workflow, BusInstance, WorkflowState } from '@node-ts/bus-core'
type uuid = string
class FulfilmentWorkflowState extends WorkflowState {
$name = 'FulfilmentWorkflowState'
itemId: uuid
}
export class FulfilmentWorkflow extends Workflow<FulfilmentWorkflowState> {
constructor (bus: BusInstance): {}
configureWorkflow (
mapper: WorkflowMapper<FulfilmentWorkflowState, FulfilmentWorkflow>
): void {
mapper
.withState(FulfilmentWorkflowState)
.startedBy(ItemPurchased, 'shipItem')
// When the item is shipped, email the customer their receipt
.when(
ItemShipped,
'emailReceipt',
{
// When an `ItemShipped` event is received, get the value of `itemId`...
lookup: itemShippedEvent => itemShippedEvent.itemId,
// ...and grab the workflow with a matching 'itemId' value on the FulfilmentWorkflowState
mapsTo: 'itemId'
}
)
}
async shipItem (event: ItemPurchased): {
await this.bus.send(new ShipItem(event.itemId))
// Persist the updated `itemId` value on the workflow sstate
return {
itemId: event.itemId
}
}
// Handles an `ItemShipped` event
async emailReceipt (event: ItemShipped) {
// ...
}
}
In this example, the workflow is started when an
ItemPurchased
event is received. The workflow sends a command to ShipItem
and persists the itemId
in the workflow state.Eventually when an
ItemShipped
event is received, the lookup
function grabs the value of itemId
and tells the bus to get the workflow that has a matching value for itemId
in the workflow state by using mapsTo
. Mapping via message properties is useful when the message being handled is a result of an action not triggered by the workflow.
Messages can also be mapped to workflow handlers using values held in the message attributes matched to values in the workflow state.
This is done by providing values for
lookup
and mapsTo
in the handler mappingimport { Workflow, BusInstance } from '@node-ts/bus-core'
export class FulfilmentWorkflow extends Workflow<FulfilmentWorkflowState> {
constructor (bus: BusInstance): {}
configureWorkflow (
mapper: WorkflowMapper<FulfilmentWorkflowState, FulfilmentWorkflow>
): void {
mapper
.withState(FulfilmentWorkflowState)
.startedBy(ItemPurchased, 'shipItem')
// When the item is shipped, email the customer their receipt
.when(
ItemShipped,
'emailReceipt',
{
// When an `ItemShipped` event is received, get the itemId from the message attributes...
lookup: (_, { attributes }) => attributes.itemId,
// ...and grab the workflow with a matching 'itemId' value on the workflow state
mapsTo: 'itemId'
}
)
}
async shipItem ({ itemId }: ItemPurchased): {
await this.bus.send(
new ShipItem(itemId),
{ attributes: { itemId } }
)
// Persist the updated `itemId` value on the workflow sstate
return { itemId }
}
// Handles an `ItemShipped` event
async emailReceipt (event: ItemShipped) {
// ...
}
}
Last modified 1yr ago