Transactional Outbox パターンをSpring Bootで実装する

Transactional Outboxという、Databaseへの保存とメッセージの送信をアトミックにすることができる実装パターンがあります。

参照

例えば決済処理において、決済完了したことをDBに保存する処理と、決済完了メッセージを送信する処理がアトミックではない場合、下のように困ったことが起きてしまいます。

  • 保存はできたがメッセージの送信に失敗した場合、後続の処理が進まない

  • 保存に失敗したのに、決済完了メッセージだけ送信してしまった場合、後続の処理が誤って進んでしまう

特に、サーバサイドのアプリケーションがマイクロサービス化されていて、サービス間のメッセージ送受信の確実性がシステムの信頼性に直結する場合、 Transactional Outboxパターンはよく使われるパターンかと思います。

実装

このパターンをSpring Boot で実装する方法を紹介します。

言語は Kotlin を使っています。

受注完了時に送信するメッセージを OrderEvent とします。

class OrderEvent(
   val orderId: String,
   ....
)

OrderEventのpublish

受注完了時にOrderEventorg.springframework.context.ApplicationEventPublisher を使って publishします。

@Service
class OrderService(
  private val publisher: ApplicationEventPublisher
) {
  
   ....

  @Transactional
  fun order() {
    ....
    
    val event: OrderEvent = ...

    publisher.publish(event)
  }
}

OrderEventの永続化

publishしたOrderEventorg.springframework.context.event.EventListener アノテーションをつけたメソッドで受けてDBへ永続化します。このときのDBトランザクションは、上記の OrderService.order メソッドで開始しているDBトランザクションと同じものになります。

@Component
class OrderEventPersistentSubscriber {

    @EventListener
    fun processSubscriptionEvent(event: OrderEvent) {
       ... // 永続化処理
    }
}

OrderEventの送信

受注処理が完了してDBトランザクションがCOMMITされた後、OrderEvent を外部のメッセージブローカーに渡します。このときorg.springframework.scheduling.annotation.Async をつけて非同期に送信するとOrderService.order メソッドの処理をブロックしないので効率的です。

@Component
class OrderEventRelay {

    @Async
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    fun relayOrderEventEvent(event: OrderEvent) {
        .... // OrderEventを外部のメッセージブローカーへ送信する処理
    }
}

OrderEventの再送

メッセージブローカーがダウンしているなどの理由で上記の送信処理が失敗することがあります。 それに備えて、送信されていない OrderEvent を定期的に見つけて再送する必要があります。

例えば、org.springframework.scheduling.annotation.Scheduled を使って、永続化した OrderEvent のうち送信完了していないものを定期的に選択して再送します。

@Component
class  OrderEventRecoveryTask {

    @Scheduled(...)
    fun recovery() {
      .... // 再送処理
    }
}

参考

上記で利用したSpringのクラスの設定方法や利用方法は、下記を参考にして下さい。

ApplicationEventPublisher, @EventListener

Spring bootを使っていればデフォルトで利用可能だったと思います。使い方は下記を参照してください。

Spring Framework - ApplicationEventPublisher Examples

@Async

@EnableAsync で有効にしつつ、Executorの設定が必要です。下記を参照してください。

@Async アノテーションで非同期メソッドの作成

@Scheduled

@EnableScheduling で有効にする必要があります。下記を参照してください。

Scheduling Tasks