はじめに
先日、AWS Step Functionsを使ってバッチを作成していたのですが、
「Amazon ECSから出力した値を次のステートに渡したい!…けど、どうやってやるんだ?」
と疑問に思いました。
目次
お急ぎの方は、Send Task Successを用いるからご覧ください。
結論だけ見たい方は、求めていたのはこれだ!!!をご覧ください。
AWS Step Functionsって何なの…?
AWS Step Functionsは、AWS Lambda関数と他のAWS のサービスを組み合わせてビジネスクリティカルなアプリケーションを構築できるサーバーレスのオーケストレーションサービスです。
(AWS Step Functions とは? - AWS Step Functions より引用*1)
AWS Step Functionsで複数のタスクを組み合わせることができるので、複雑なワークフローを設計することができます。
特に、Amazon ECSついては、実行するコマンドや環境変数などのコンテナの上書きもAWS Step Functionsで設定できます。
AWS Step Functionsの詳細については、こちらの公式ドキュメントをご覧ください。
aws.amazon.com
処理の概要
今回は配送を例にAWS Step Functionsでワークフローを作成してみます。
受付と発送の2つのAmazon ECS(AWS Fargateタイプ)で処理をすることにします。
受付
Input:荷物名、荷物分類(冷凍・精密機器・普通の3つ)
処理:荷物分類に応じて、発送業者へのメッセージを出力します。
- 冷凍 → 「冷凍庫に入れて配送してください。」
- 精密機器 → 「精密機器です。衝撃に注意して配送してください。」
- 普通 → 「お客様の大切な荷物です。丁重に配送してください。」
Output:荷物名、荷物分類に対応した発送業者へのメッセージ
発送
Input:荷物名、荷物分類に対応した発送業者へのメッセージ
処理:荷物名と発送業者へのメッセージの文字列を結合します。
Output:荷物名と荷物分類に対応した発送業者へのメッセージを結合した文字列
受付のOutputが発送のInputになっていますので、Amazon ECS間で値の受け渡しが必要になります。
単純にreturnしてみる()
まずは、単純にreturnしてみたいと思います。
AWS Lambdaを用いる場合、戻り値を設定すれば出力できるので、同じくコンピューティングサービスのAmazon ECSも同様にできるのではないかという希望的観測です。
ソースコード
(言語はPythonです)
受付
reception.py
import sys import json # コマンドライン引数から荷物名と分類を取得 args = sys.argv baggage = args[1] category = args[2] # 分類に応じたメッセージを格納 if category == '冷凍': message = '冷凍庫に入れて配送してください。' elif category == '精密機器': message = '精密機器です。衝撃に注意して配送してください。' else: message = 'お客様の大切な荷物です。丁重に配送してください。' # JSON形式で出力 return json.dumps( { 'baggage' : baggage, 'message' : message } )
発送
sending.py
import sys import json # コマンドライン引数から荷物名とメッセージを取得 args = sys.argv baggage = args[1] message = args[2] # 荷物名とメッセージを結合 message = f'{baggage}は{message}' # JSON形式で出力 return json.dumps( { 'result' : message } )
AWS Step Functions
ステートマシン定義
{ "Comment": "A description of my state machine", "StartAt": "reception", "States": { "reception": { "Type": "Task", "Resource": "arn:aws:states:::ecs:runTask.sync", "Parameters": { "LaunchType": "FARGATE", "Cluster": "{クラスターのARN}", "TaskDefinition": "{タスク定義のARN}", "NetworkConfiguration": { "AwsvpcConfiguration": { "AssignPublicIp": "ENABLED", "SecurityGroups": [ "{セキュリティグループ}" ], "Subnets": [ "{サブネット}" ] } }, "Overrides": { "ContainerOverrides": [ { "Name": "{コンテナ名}", "Command.$": "States.Array('python','/home/reception.py', $.baggagename, $.category)" } ] } }, "Next": "sending" }, "sending": { "Type": "Task", "Resource": "arn:aws:states:::ecs:runTask.sync", "Parameters": { "LaunchType": "FARGATE", "Cluster": "{クラスターのARN}", "TaskDefinition": "{タスク定義のARN}", "NetworkConfiguration": { "AwsvpcConfiguration": { "AssignPublicIp": "ENABLED", "SecurityGroups": [ "{セキュリティグループ}" ], "Subnets": [ "{サブネット}" ] } }, "Overrides": { "ContainerOverrides": [ { "Name": "{コンテナ名}", "Command.$": "States.Array('python','/home/sending.py', $.baggage, $.message)" } ] } }, "End": true, "ResultPath": "$" } } }
結果
以下の入力に対して、
{ "baggagename": "お茶500mL×24", "category": "普通" }
タスクがエラーとなって失敗しました…
以下、CloudWatchより、該当タスクのログです。
File "/home/reception.py", line 21 return json.dumps( ^^^^^^^^^^^^^^^^^^ SyntaxError: 'return' outside function
まあ、そもそも関数を設定していないのにも関わらずreturnが使えるわけがありません。
Send Task Successを用いる
(お急ぎの方、こんにちは。頭から呼んでくださっている方、お待たせしております。)
戻り値を設定するだけではうまくいかなかったので、他の方法を考えます。
調べてみたところ、Send Task Successメソッドを使うことでAmazon ECSからの出力が実現できそうです。
docs.aws.amazon.com
そして、AWS Step Functions上のAmazon ECSでSend Task Successを用いる際には、サービス統合パターンを「タスクトークンのコールバックまで待機する」にする必要があります。
(今回はタスクトークンについての説明は省略します)
docs.aws.amazon.com
※Send Task Successを用いると、Amazon ECSからAWS Step Functionsに対しての権限が必要になります。
求めていたのはこれだ!!!
(結論だけ見たい方、こんにちは。お急ぎの方、お待たせしました。)
ソースコード
(言語はPythonです)
※記載していませんが、Dockerfileでboto3をインストールしています。
受付
reception.py
import sys import json import os import boto3 # Step Functionsのクライアントを作成 sfn = boto3.client('stepfunctions') # コマンドライン引数から荷物名と分類を取得 args = sys.argv if len(args) > 1: baggage = args[1] category = args[2] # 分類に応じたメッセージを格納 if category == '冷凍': message = '冷凍庫に入れて配送してください。' elif category == '精密機器': message = '精密機器です。衝撃に注意して配送してください。' else: message = 'お客様の大切な荷物です。丁重に配送してください。' # JSON形式で出力 sfn.send_task_success( taskToken = os.getenv("TASKTOKEN"), output=json.dumps( { 'baggage' : baggage, 'message' : message } ) ) else: # JSON形式で出力 sfn.send_task_failure( taskToken = os.getenv("TASKTOKEN"), error = 'E001', cause = 'コマンドライン引数が不足しています' )
発送
sending.py
import sys import json import os import boto3 # Step Functionsのクライアントを作成 sfn = boto3.client('stepfunctions') # コマンドライン引数から荷物名とメッセージを取得 args = sys.argv if len(args) > 1: baggage = args[1] message = args[2] # 荷物名とメッセージを結合 message = f'{baggage}は{message}' # JSON形式で出力 # JSON形式で出力 sfn = boto3.client('stepfunctions') sfn.send_task_success( taskToken=os.getenv("TASKTOKEN"), output=json.dumps( { 'message' : message } ) ) else: # JSON形式で出力 sfn.send_task_failure( taskToken = os.getenv("TASKTOKEN"), error = 'E001', cause = 'コマンドライン引数が不足しています' )
IAMポリシーの追記
Amazon ECSのタスク定義にて、実行ロールおよび、タスクロールのポリシーに以下の権限を追記
{ "Effect": "Allow", "Action": [ "states:SendTaskSuccess", "states:SendTaskFailure" ], "Resource": "*" }
AWS Step Functions
ステートマシン定義
{ "Comment": "A description of my state machine", "StartAt": "reception", "States": { "reception": { "Type": "Task", "Resource": "arn:aws:states:::ecs:runTask.waitForTaskToken", "Parameters": { "LaunchType": "FARGATE", "Cluster": "{クラスターのARN}", "TaskDefinition": "{タスク定義のARN}", "NetworkConfiguration": { "AwsvpcConfiguration": { "AssignPublicIp": "ENABLED", "SecurityGroups": [ "{セキュリティグループ}" ], "Subnets": [ "{サブネット}" ] } }, "Overrides": { "ContainerOverrides": [ { "Name": "{コンテナ名}", "Command.$": "States.Array('python','/home/reception.py', $.baggagename, $.category)", "Environment": [ { "Name": "TASKTOKEN", "Value.$": "$$.Task.Token" } ] } ] } }, "Next": "sending", "TimeoutSeconds": 300 }, "sending": { "Type": "Task", "Resource": "arn:aws:states:::ecs:runTask.waitForTaskToken", "Parameters": { "LaunchType": "FARGATE", "Cluster": "{クラスターのARN}", "TaskDefinition": "{タスク定義のARN}", "NetworkConfiguration": { "AwsvpcConfiguration": { "AssignPublicIp": "ENABLED", "SecurityGroups": [ "{セキュリティグループ}" ], "Subnets": [ "{サブネット}" ] } }, "Overrides": { "ContainerOverrides": [ { "Name": "{コンテナ名}", "Command.$": "States.Array('python','/home/sending.py', $.baggage, $.message)", "Environment": [ { "Name": "TASKTOKEN", "Value.$": "$$.Task.Token" } ] } ] } }, "End": true, "ResultPath": "$", "TimeoutSeconds": 300 } } }
結果
以下の入力に対して、
{ "baggagename": "お茶500mL×24", "category": "普通" }
receptionのタスクから出力できています。
また、sendingのタスクの入力はreceptionの出力から受け取れています。
最終出力は以下の通りです。
"output": { "message": "お茶500mL×24はお客様の大切な荷物です。丁重に配送してください。" }
ということでAmazon ECSからの出力を実現できました。
ちなみに
元も子もないですが、
Amazon ECS間で値を受け渡すならS3にファイルをアップロードしてもいいんですけどね笑