こんにちは。katoです。
前回の続きとなります。
S3の使用状況を拡張子別に取得してみた~Step Functions編➀~
手順
前回までで並列処理用のLambda関数の用意が完了したので、実際にステートマシンを作成していきます。
並列処理を行うステートマシンを作成するステートマシン(図の上部分)を作成して行くことになるので、必要となるLambda関数を作成します。
必要となるLambda関数は以下の3種類となります。
・create-state-machine → ステートマシンの作成と実行用 ・describe-execution → 進捗状況(ステータス)の確認用 ・delete-state-machine → ステートマシンの削除用
今回実装するような並列処理部分の実行数が変動するような場合にはステートマシン自体を実行の度に作成するなどの対応が必要となります。
ステートマシンの更新も可能ですが、更新内容のチェックやユニークな実行名の指定など管理が面倒になるので、実行時にステートマシンを作成する方法を今回は採用しました。
create-state-machine
ステートマシン作成用のLambda関数が今回の中で最も重要な関数となりますが、行っている内容としては単純です。
S3のバケットリストを取得し、リストの数だけ並列処理部分のステートマシン定義を繰り返しているのみとなります。
ステートマシンの定義は文字列指定が可能なので、定義用の文字列を結合していくだけで完了します。
ステートマシンの作成後はバケットのリストをInputとして指定し、ステートマシンを実行しています。
import json import boto3 s3 = boto3.client('s3') sf = boto3.client('stepfunctions') def lambda_handler(event, context): buckets = {} listbucket = s3.list_buckets() for i in range(len(listbucket['Buckets'])): buckets[i] = listbucket['Buckets'][i]['Name'] ##Start State definition = '{ "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Parallel" },'\ '"Parallel": { "Type": "Parallel", "Next": "CreateMessage", "Branches": [ ' ##Parallel State for j in range(len(buckets)): statename = "ListObject" + str(j) nextstate = "PutDynamoDB" + str(j) input = "$." + str(j) definition = definition + '{ "StartAt": "%s", "States": { "%s": { "Type": "Task",'\ '"Resource": "arn:aws:lambda:ap-northeast-1:123456789000:function:list-objects", "InputPath": "%s", "Next": "%s" }, '\ '"%s": { "Type": "Task", "Resource": "arn:aws:lambda:ap-northeast-1:123456789000:function:put-dynamodb", "End": true }} },' % (statename, statename, input, nextstate, nextstate) definition = definition[:-1] ##End State definition = definition + '] }, "CreateMessage": { "Type": "Task", "Resource": "arn:aws:lambda:ap-northeast-1:123456789000:function:create-message",'\ '"Next": "Publish"}, "Publish": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish",'\ '"Parameters": { "TopicArn": "arn:aws:sns:ap-northeast-1:123456789000:xxxxxxxxxx", "Message.$": "$.message" }, "End": true } } }' create = sf.create_state_machine( name="blog_stepfunctions", definition=definition, roleArn="arn:aws:iam::123456789000:role/xxxxxxxxxx" ) start = sf.start_execution( stateMachineArn=create['stateMachineArn'], name="start", input=json.dumps(buckets) ) return {'exeArn': start['executionArn']}
1点注意していただきたいのが、ステートマシンの作成時に指定しているroleArnに関してです。
権限としてはLambdaの実行とSNS Publishのポリシーを付与すればいいのですが、信頼関係を以下のように定義する必要があります。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "", "Effect": "Allow", "Principal": { "Service": "states.ap-northeast-1.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
AWSコンソール上からStep Functions用ロールとしてIAMロールを作成すると、Service部分が「states.amazonaws.com」のように指定されます。
これではStep Functionsの実行時にエラーとなるため、上記の様に信頼関係を編集する必要が御座います。
describe-execution
import json import boto3 sf = boto3.client('stepfunctions') def lambda_handler(event, context): execution = event['exeArn'] desc = sf.describe_execution( executionArn=execution ) return {'exeArn': desc['executionArn'], 'status': desc['status'], 'stateArn': desc['stateMachineArn']}
delete-state-machine
import json import boto3 sf = boto3.client('stepfunctions') def lambda_handler(event, context): stateArn = event['stateArn'] delete = sf.delete_state_machine( stateMachineArn=stateArn ) print (delete)
Lambda関数の用意が完了したら実際にステートマシンの作成を行います。
{ "StartAt": "CreateStateMachine", "States": { "CreateStateMachine": { "Type": "Task", "Resource": "arn:aws:lambda:ap-northeast-1:123456789000:function:create-state-machine", "Next": "DescribeExecution" }, "DescribeExecution": { "Type": "Task", "Resource": "arn:aws:lambda:ap-northeast-1:123456789000:function:describe-execution", "Next": "Choice" }, "Choice": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "SUCCEEDED", "Next": "DeleteStateMachine" }, { "Variable": "$.status", "StringEquals": "RUNNING", "Next": "Wait" } ], "Default": "Error" }, "DeleteStateMachine": { "Type": "Task", "Resource": "arn:aws:lambda:ap-northeast-1:123456789000:function:delete-state-machine", "End": true }, "Wait": { "Type": "Wait", "Seconds": 30, "Next": "DescribeExecution" }, "Error": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "arn:aws:sns:ap-northeast-1:123456789000:xxxxxxxxxx", "Message": "Error" }, "End": true } } }
下図のようなステートマシンの定義が表示されるはずです。
定義の内容と致しましては、choiceを利用しステータスに応じて処理を分岐しています。
「SUCCEEDED」が得られた場合にはステートマシンを削除して終了。
「RUNNING」が得られた場合には30秒の待機時間をおいて再度ステータスチェックを実施。
上記以外のステータスが得られた場合にはエラーとして処理を終了。
なお、実行が正常に完了した場合には並列処理用のステートマシンは削除されますが、処理がエラーとなった場合にはステートマシンの削除は実施されません。
エラー内容の確認後、ステートマシンを手動にて削除します。
動作確認
作成したステートマシンを手動にて実行してみます。
入力オプションはデフォルトのままで問題ありません。
正常に実行が完了すると下図のようなビジュアルワークフローが表示されます。
また、動的に並列処理用のステートマシンが作成されることも確認できます。
処理の完了後にSNS経由でメールが届けば正常な動作となります。
まとめ
今回はStep Functionsの並列処理を動的に定義する方法をご紹介致しました。
Step Functionsの並列処理はとても便利な機能となっておりますが、動的リソースへの対応が難しかったりして、利用を見送るケースなども見受けられます。
今回ご紹介したように別のステートマシンやLambda関数を利用して、並列処理用のステートマシンを定義することで、動的リソースへの対応も可能となります。
準備やステートマシン定義の難しさは御座いますが、それに見合っただけのメリットが得られるはずです。
便利な機能となっておりますので一度お試しになってみてはいかがでしょうか。
.