Step Functions で「コールバックを待つ」機能を試してみた

aws

※プロモーションページが含まれる場合があります

Step Functions で「コールバックを待つ」機能を試してみた

AWS Step Functionsは、複雑なワークフローを簡単に構築し、各ステップでのサービス間の連携を自動化できる強力なツールです。その中でも、外部システムや人間の承認を待つ必要がある場合に便利なのが「コールバックを待つ」機能です。この記事では、Step Functionsでこの機能を実際に使ってみた経験をシェアします。

コールバック機能

Step Functionsのコールバック機能は、外部のプロセスが完了するまでワークフローの進行を一時停止し、その後、外部から通知を受け取って再開する仕組みです。例えば、以下のような場面で利用できます。

外部APIの処理結果を待つ
ユーザーからの承認を待つ
別のシステムからの完了通知を待つ
通常、こういったプロセスは不確実な時間を要するため、Step Functionsでの自動リトライやタイムアウト処理と組み合わせることで、柔軟かつ堅牢なワークフローを構築することができます。

使ってみたユースケース

今回試してみたユースケースは、ユーザーのメールアドレスを更新し、該当ユーザーにメールを送信するプロセスです。このワークフローでは、メール更新処理を行うlambdaがあり、更新処理が終わるとeventBridgeの中にあるlambdaで変更した旨のメールを送るというものになります。

背景

この時にeventBridgeの中にあるlambdaがエラーを吐いた際にstep Functions内でエラーをキャッチできなかったのでキャッチしたかった。
後続のeventBridge PutEventsで「コールバックを待つ」の機能を使いエラーのキャッチができたので備忘録として記載いたします。

構成はこんな感じ

step Functions の構成

  • メールアドレス更新のlambdaで更新処理を行います。
  • 更新処理が終わると後続のeventBridge PutEventsでメール送信を行います。

eventBridge PutEventsの構成

コールバックを待つ

ここの機能のことです

チェックをすると説明がでてきます

"MyTaskToken.$": "$$.Task.Token"

とあるようにMyTaskToken にtokenが排出されるようです。これをeventBridgeの中のlambdaに渡すようにします。

stepfucionsのAPIが使えるので以下のように

  • 成功した時 SendTaskSuccess
  • 失敗した時  SendTaskFailure

をそれぞれ成功した時の処理と失敗した時の処理で組み込んであげればstep Functions で成功or失敗の受け取りができそうです。

SendTaskSuccess - AWS Step Functions
Used by activity workers, Task states using the callback pattern, and optionally Task states using the job run pattern to report that the task identified by the...
SendTaskFailure - AWS Step Functions
Used by activity workers, Task states using the callback pattern, and optionally Task states using the job run pattern to report that the task identified by the...

検証

eventBridgeで送る以下のようなAPIパラメータを設定します。

{
  "Entries": [
    {
      "Detail": {
        "MyTaskToken.$": "$$.Task.Token",
        "media": ["EMAIL"],
        "title": "ログインID(メールアドレス)変更通知",
        "message.$": "メール更新したよー",
        "notification_type": "UPDATE_EMAIL",
        "workspace_id.$": "$.workspaceId",
        "user_id.$": "$.userId",
        "target": {
          "email": {
            "to.$": "$.currentEmail"
          }
        }
      },
      "DetailType": "Notify",
      "EventBusName": "hogeEventBusName",
      "Resources": [],
      "Source": "hoge"
    }
  ]
}

lambda(メール送信処理)

import json
import boto3

sfn = boto3.client("stepfunctions")


def lambda_handler(event, context):
    token = event.get("Mytoken")
    records = event.get("Records", [])
    for record in records:
        arg = {
            "email": detail["target"]["email"]["to"],
            "workspace_id": detail["workspace_id"],
            "user_id": detail["user_id"],
            "notification_type": detail["notification_type"],
            "target": detail["target"]["email"]["to"],
            "title": detail["title"],
            "body": detail["message"],
        }
        res = send_email(arg)
        if res.result == "success":
            # 成功
            sfn.send_task_success(
                taskToken=token, output=json.dumps({"result": "success"})
            )
        else:
            # 失敗
            sfn.send_task_failure(
                taskToken=token, error=json.dumps({"result": "failure"})
            )

    return

権限も追加しましょう

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "states:SendTaskSuccess",
      "Resource": "arn:aws:states:us-west-2:466642126643:stateMachine:ima-ft1-stepfunction-email-bulk-update-workflow"
    }
  ]
}

まとめ

AWS Step Functionsの「コールバックを待つ」機能は、外部システムの結果を待つ際に非常に便利です。この機能を使うことで、メールアドレスの更新や通知メールの送信といったプロセスを柔軟に構築でき、外部システムでエラーが発生した際も、Step Functions内でエラーハンドリングを行えます。

今回は、EventBridgeとLambdaを組み合わせて、このコールバック機能を活用し、メール送信処理の成功や失敗をトークンを使って管理する方法を紹介しました。今後、さらに複雑なシステム連携でも、この機能を使ってエラーハンドリングやプロセスの一時停止を簡単に行うことができるでしょう

コメント

タイトルとURLをコピーしました