僕はStep FunctionsでECSの出力をしたい!

はじめに

先日、AWS Step Functionsを使ってバッチを作成していたのですが、

Amazon ECSから出力した値を次のステートに渡したい!…けど、どうやってやるんだ?」

と疑問に思いました。

ということで今回は、AWS Step FunctionsでAmazon ECSの出力をして値を受け渡してみます。


AWS Step Functionsって何なの…?

AWS Step Functionsは、AWS Lambda関数と他のAWS のサービスを組み合わせてビジネスクリティカルなアプリケーションを構築できるサーバーレスのオーケストレーションサービスです。
AWS Step Functions とは? - AWS Step Functions より引用*1

AWS Step Functionsで複数のタスクを組み合わせることができるので、複雑なワークフローを設計することができます。

特に、Amazon ECSついては、実行するコマンドや環境変数などのコンテナの上書きもAWS Step Functionsで設定できます。


AWS Step Functionsの詳細については、こちらの公式ドキュメントをご覧ください。
aws.amazon.com


処理の概要

今回は配送を例にAWS Step Functionsでワークフローを作成してみます。

受付と発送の2つのAmazon ECS(AWS Fargateタイプ)で処理をすることにします。

受付

Input:荷物名、荷物分類(冷凍・精密機器・普通の3つ)

処理:荷物分類に応じて、発送業者へのメッセージを出力します。

  • 冷凍    →  「冷凍庫に入れて配送してください。」
  • 精密機器  →  「精密機器です。衝撃に注意して配送してください。」
  • 普通    →  「お客様の大切な荷物です。丁重に配送してください。」

Output:荷物名、荷物分類に対応した発送業者へのメッセージ

発送

Input:荷物名、荷物分類に対応した発送業者へのメッセージ

処理:荷物名と発送業者へのメッセージの文字列を結合します。

Output:荷物名と荷物分類に対応した発送業者へのメッセージを結合した文字列


受付のOutputが発送のInputになっていますので、Amazon ECS間で値の受け渡しが必要になります。


単純にreturnしてみる()

まずは、単純にreturnしてみたいと思います。

AWS Lambdaを用いる場合、戻り値を設定すれば出力できるので、同じくコンピューティングサービスのAmazon ECSも同様にできるのではないかという希望的観測です。

ソースコード

(言語はPythonです)

受付

reception.py

import sys
import json

# コマンドライン引数から荷物名と分類を取得
args = sys.argv
baggage = args[1]
category = args[2]

# 分類に応じたメッセージを格納
if category == '冷凍':
    message = '冷凍庫に入れて配送してください。'
elif category == '精密機器':
    message = '精密機器です。衝撃に注意して配送してください。'
else:
    message = 'お客様の大切な荷物です。丁重に配送してください。'

# JSON形式で出力
return json.dumps(
        {
            'baggage' : baggage, 
            'message' : message
        }
    )
発送

sending.py

import sys
import json

# コマンドライン引数から荷物名とメッセージを取得
args = sys.argv
baggage = args[1]
message = args[2]

# 荷物名とメッセージを結合
message = f'{baggage}は{message}'

# JSON形式で出力
return json.dumps(
        { 'result' : message }
    )

AWS Step Functions

ステートマシン定義

{
  "Comment": "A description of my state machine",
  "StartAt": "reception",
  "States": {
    "reception": {
      "Type": "Task",
      "Resource": "arn:aws:states:::ecs:runTask.sync",
      "Parameters": {
        "LaunchType": "FARGATE",
        "Cluster": "{クラスターのARN}",
        "TaskDefinition": "{タスク定義のARN}",
        "NetworkConfiguration": {
          "AwsvpcConfiguration": {
            "AssignPublicIp": "ENABLED",
            "SecurityGroups": [
              "{セキュリティグループ}"
            ],
            "Subnets": [
              "{サブネット}"
            ]
          }
        },
        "Overrides": {
          "ContainerOverrides": [
            {
              "Name": "{コンテナ名}",
              "Command.$": "States.Array('python','/home/reception.py', $.baggagename, $.category)"
            }
          ]
        }
      },
      "Next": "sending"
    },
    "sending": {
      "Type": "Task",
      "Resource": "arn:aws:states:::ecs:runTask.sync",
      "Parameters": {
        "LaunchType": "FARGATE",
        "Cluster": "{クラスターのARN}",
        "TaskDefinition": "{タスク定義のARN}",
        "NetworkConfiguration": {
          "AwsvpcConfiguration": {
            "AssignPublicIp": "ENABLED",
            "SecurityGroups": [
              "{セキュリティグループ}"
            ],
            "Subnets": [
              "{サブネット}"
            ]
          }
        },
        "Overrides": {
          "ContainerOverrides": [
            {
              "Name": "{コンテナ名}",
              "Command.$": "States.Array('python','/home/sending.py', $.baggage, $.message)"
            }
          ]
        }
      },
      "End": true,
      "ResultPath": "$"
    }
  }
}

結果

以下の入力に対して、

{
  "baggagename": "お茶500mL×24",
  "category": "普通"
}


タスクがエラーとなって失敗しました…

以下、CloudWatchより、該当タスクのログです。

File "/home/reception.py", line 21
return json.dumps(
^^^^^^^^^^^^^^^^^^
SyntaxError: 'return' outside function


まあ、そもそも関数を設定していないのにも関わらずreturnが使えるわけがありません。


Send Task Successを用いる

(お急ぎの方、こんにちは。頭から呼んでくださっている方、お待たせしております。)

戻り値を設定するだけではうまくいかなかったので、他の方法を考えます。

調べてみたところ、Send Task Successメソッドを使うことでAmazon ECSからの出力が実現できそうです。
docs.aws.amazon.com

そして、AWS Step Functions上のAmazon ECSでSend Task Successを用いる際には、サービス統合パターンを「タスクトークンのコールバックまで待機する」にする必要があります。
(今回はタスクトークンについての説明は省略します)
docs.aws.amazon.com


※Send Task Successを用いると、Amazon ECSからAWS Step Functionsに対しての権限が必要になります。

安定稼働のための各種設定

エラーハンドリング

サービス統合パターンが「ジョブの実行 (.sync)」の場合、プログラム上でエラーが発生した場合、タスクが失敗となって、AWS Step Functionsが停止します。

しかし、今回は、サービス統合パターンを「タスクトークンのコールバックまで待機する」に変更したので、プログラム上でエラーが発生しても、コールバックが起きず、タスクが失敗となりません。

ですので、プログラム上でエラーが発生する場合をcatchして、Send Task Failureを設定しておく必要があります。

無限待機に備える

Send Task Success/FailureはあくまでAPIのリクエストに過ぎないため、極めて低い確率ではありますが、不具合が起きる可能性があります。

その他プログラム上の処理でも不具合が起きる可能性があります。

ですので、Send Task Success/Failureを利用する際、タスクトークンを送信することに失敗して無限に待機することを回避するためにタスクのタイムアウトを設定するのが良いでしょう。


求めていたのはこれだ!!!

(結論だけ見たい方、こんにちは。お急ぎの方、お待たせしました。)

ソースコード

(言語はPythonです)
※記載していませんが、Dockerfileでboto3をインストールしています。

受付

reception.py

import sys
import json
import os
import boto3

# Step Functionsのクライアントを作成
sfn = boto3.client('stepfunctions')

# コマンドライン引数から荷物名と分類を取得
args = sys.argv
if len(args) > 1:
    baggage = args[1]
    category = args[2]
    
    # 分類に応じたメッセージを格納
    if category == '冷凍':
        message = '冷凍庫に入れて配送してください。'
    elif category == '精密機器':
        message = '精密機器です。衝撃に注意して配送してください。'
    else:
        message = 'お客様の大切な荷物です。丁重に配送してください。'
    
    # JSON形式で出力
    sfn.send_task_success(
        taskToken = os.getenv("TASKTOKEN"),
        output=json.dumps(
            {
                'baggage' : baggage, 
                'message' : message
            }
        )
    )
else:
    # JSON形式で出力
    sfn.send_task_failure(
        taskToken = os.getenv("TASKTOKEN"),
        error = 'E001',
        cause = 'コマンドライン引数が不足しています'
    )
発送

sending.py

import sys
import json
import os
import boto3

# Step Functionsのクライアントを作成
sfn = boto3.client('stepfunctions')

# コマンドライン引数から荷物名とメッセージを取得
args = sys.argv
if len(args) > 1:
    baggage = args[1]
    message = args[2]
    
    # 荷物名とメッセージを結合
    message = f'{baggage}は{message}'
    
    # JSON形式で出力
    # JSON形式で出力
    sfn = boto3.client('stepfunctions')
    sfn.send_task_success(
        taskToken=os.getenv("TASKTOKEN"),
        output=json.dumps(
            { 'message' : message }
        )
    )
else:
    # JSON形式で出力
    sfn.send_task_failure(
        taskToken = os.getenv("TASKTOKEN"),
        error = 'E001',
        cause = 'コマンドライン引数が不足しています'
    )

IAMポリシーの追記

Amazon ECSのタスク定義にて、実行ロールおよび、タスクロールのポリシーに以下の権限を追記

{
    "Effect": "Allow",
    "Action": [
        "states:SendTaskSuccess",
        "states:SendTaskFailure"
    ],
    "Resource": "*"
}

AWS Step Functions

ステートマシン定義

{
  "Comment": "A description of my state machine",
  "StartAt": "reception",
  "States": {
    "reception": {
      "Type": "Task",
      "Resource": "arn:aws:states:::ecs:runTask.waitForTaskToken",
      "Parameters": {
        "LaunchType": "FARGATE",
        "Cluster": "{クラスターのARN}",
        "TaskDefinition": "{タスク定義のARN}",
        "NetworkConfiguration": {
          "AwsvpcConfiguration": {
            "AssignPublicIp": "ENABLED",
            "SecurityGroups": [
              "{セキュリティグループ}"
            ],
            "Subnets": [
              "{サブネット}"
            ]
          }
        },
        "Overrides": {
          "ContainerOverrides": [
            {
              "Name": "{コンテナ名}",
              "Command.$": "States.Array('python','/home/reception.py', $.baggagename, $.category)",
              "Environment": [
                {
                  "Name": "TASKTOKEN",
                  "Value.$": "$$.Task.Token"
                }
              ]
            }
          ]
        }
      },
      "Next": "sending",
      "TimeoutSeconds": 300
    },
    "sending": {
      "Type": "Task",
      "Resource": "arn:aws:states:::ecs:runTask.waitForTaskToken",
      "Parameters": {
        "LaunchType": "FARGATE",
        "Cluster": "{クラスターのARN}",
        "TaskDefinition": "{タスク定義のARN}",
        "NetworkConfiguration": {
          "AwsvpcConfiguration": {
            "AssignPublicIp": "ENABLED",
            "SecurityGroups": [
              "{セキュリティグループ}"
            ],
            "Subnets": [
              "{サブネット}"
            ]
          }
        },
        "Overrides": {
          "ContainerOverrides": [
            {
              "Name": "{コンテナ名}",
              "Command.$": "States.Array('python','/home/sending.py', $.baggage, $.message)",
              "Environment": [
                {
                  "Name": "TASKTOKEN",
                  "Value.$": "$$.Task.Token"
                }
              ]
            }
          ]
        }
      },
      "End": true,
      "ResultPath": "$",
      "TimeoutSeconds": 300
    }
  }
}

結果

以下の入力に対して、

{
  "baggagename": "お茶500mL×24",
  "category": "普通"
}

receptionのタスクから出力できています。


また、sendingのタスクの入力はreceptionの出力から受け取れています。


最終出力は以下の通りです。

"output": {
    "message": "お茶500mL×24はお客様の大切な荷物です。丁重に配送してください。"
  }


ということでAmazon ECSからの出力を実現できました。



ちなみに

元も子もないですが、
Amazon ECS間で値を受け渡すならS3にファイルをアップロードしてもいいんですけどね笑

*1:最終閲覧日:2023年4月20日