Handling

After a workflow has started it will often wait until a new message is read from the queue so that it can perform the next step in its process. This is done by configuring a handler for that message, and defining a discriminator that will determine which workflow instance should be activated for that message.

Handlers are created by declaring a function on the workflow and then providing a .when() mapping on the WorkflowMapper.

import { Workflow } from '@node-ts/bus-core'

export class FulfilmentWorkflow extends Workflow<FulfilmentWorkflowState> {
  configureWorkflow (
    mapper: WorkflowMapper<FulfilmentWorkflowState, FulfilmentWorkflow>
  ): void {
    mapper
      .withState(FulfilmentWorkflowState)
      // ...
      // When the item is shipped, email the customer their receipt
      .when(ItemShipped, 'emailReceipt')
  }
  
  // Handles an `ItemShipped` event
  async emailReceipt (event: ItemShipped) {
    // ...
  }
}

Default mapping

When a workflow is started it is assigned a $workflowId that is persisted against the state and cannot be changed. This $workflowId is attached to all messages sent from within the workflow in sticky attributes, which propagates as subsequent messages are sent. Handling these messages from within a workflow will locate the workflow state based on the value of $workflowId.

This example uses the default mapping in the .when() handler

import { Workflow, BusInstance } from '@node-ts/bus-core'

export class FulfilmentWorkflow extends Workflow<FulfilmentWorkflowState> {
 
  constructor (bus: BusInstance): {}

  configureWorkflow (
    mapper: WorkflowMapper<FulfilmentWorkflowState, FulfilmentWorkflow>
  ): void {
    mapper
      .withState(FulfilmentWorkflowState)
      .startedBy(ItemPurchased, 'shipItem')
      // When the item is shipped, email the customer their receipt
      .when(ItemShipped, 'emailReceipt')
  }
  
  async shipItem (event: ItemPurchased): {
    await this.bus.send(new ShipItem(event.itemId))
  }
  
  // Handles an `ItemShipped` event
  async emailReceipt (event: ItemShipped) {
    // ...
  }
}

What happens is:

  1. A new workflow is started on receipt of an ItemPurchased event, with a new $workflowId value on the state

  2. The shipItem handler will send a new ShipItem. The workflow will automatically attach the $workflowId into the sticky attributes of the outgoing message

  3. The command handler for ShipItem will process the request and publish an ItemShipped event. Because $workflowId is present on the sticky attributes of the incoming command, it will also be attached to the sticky attributes of the outgoing event

  4. The ItemShipped event is received and since this workflow uses a default handler for the message, the value of $workflowId on the incoming sticky attributes will be used to lookup the workflow state instance

  5. The ItemShipped event is then routed to the emailReceipt handler for the workflow instance

Default mappings are a simple way to map workflow handlers when the next step is based on the outcome of a command sent by the workflow.

Mapping via message properties

Messages can be mapped to workflow handlers by matching the value of a property of a message to a property on the workflow state.

This is done by providing a lookup and mapsTo configuration for the message on the WorkflowMapper

import { Workflow, BusInstance, WorkflowState } from '@node-ts/bus-core'

type uuid = string

class FulfilmentWorkflowState extends WorkflowState {
  $name = 'FulfilmentWorkflowState'
  
  itemId: uuid
}

export class FulfilmentWorkflow extends Workflow<FulfilmentWorkflowState> {
 
  constructor (bus: BusInstance): {}

  configureWorkflow (
    mapper: WorkflowMapper<FulfilmentWorkflowState, FulfilmentWorkflow>
  ): void {
    mapper
      .withState(FulfilmentWorkflowState)
      .startedBy(ItemPurchased, 'shipItem')
      // When the item is shipped, email the customer their receipt
      .when(
        ItemShipped,
        'emailReceipt',
        {
          // When an `ItemShipped` event is received, get the value of `itemId`...
          lookup: itemShippedEvent => itemShippedEvent.itemId,
          // ...and grab the workflow with a matching 'itemId' value on the FulfilmentWorkflowState
          mapsTo: 'itemId'
        }
      )
  }
  
  async shipItem (event: ItemPurchased): {
    await this.bus.send(new ShipItem(event.itemId))
    // Persist the updated `itemId` value on the workflow sstate
    return {
      itemId: event.itemId
    }
  }
  
  // Handles an `ItemShipped` event
  async emailReceipt (event: ItemShipped) {
    // ...
  }
}

In this example, the workflow is started when an ItemPurchased event is received. The workflow sends a command to ShipItem and persists the itemId in the workflow state.

Eventually when an ItemShipped event is received, the lookup function grabs the value of itemId and tells the bus to get the workflow that has a matching value for itemId in the workflow state by using mapsTo.

Mapping via message properties is useful when the message being handled is a result of an action not triggered by the workflow.

Mapping via message attributes

Messages can also be mapped to workflow handlers using values held in the message attributes matched to values in the workflow state.

This is done by providing values for lookup and mapsTo in the handler mapping

import { Workflow, BusInstance } from '@node-ts/bus-core'

export class FulfilmentWorkflow extends Workflow<FulfilmentWorkflowState> {
 
  constructor (bus: BusInstance): {}

  configureWorkflow (
    mapper: WorkflowMapper<FulfilmentWorkflowState, FulfilmentWorkflow>
  ): void {
    mapper
      .withState(FulfilmentWorkflowState)
      .startedBy(ItemPurchased, 'shipItem')
      // When the item is shipped, email the customer their receipt
      .when(
        ItemShipped,
        'emailReceipt',
        {
          // When an `ItemShipped` event is received, get the itemId from the message attributes...
          lookup: (_, { attributes }) => attributes.itemId,
          // ...and grab the workflow with a matching 'itemId' value on the workflow state
          mapsTo: 'itemId'
        }
      )
  }
  
  async shipItem ({ itemId }: ItemPurchased): {
    await this.bus.send(
      new ShipItem(itemId),
      { attributes: { itemId } }
    )
    // Persist the updated `itemId` value on the workflow sstate
    return { itemId }
  }
  
  // Handles an `ItemShipped` event
  async emailReceipt (event: ItemShipped) {
    // ...
  }
}

Last updated