Step Functions~Call Amazon SNS and DynamoDB APIs~

こんにちは。katoです。

今回は先日のre:Invent 2018にて発表された、Step FunctionsのSNSとDynamoDBとの連携の方法についてご紹介させていただきます。

 

概要

先日のre:Invent 2018にて、Step Functionsが新しく以下の8つのAWSサービスと連携可能となることが発表されました。

・DynamoDB
AWS Batch
Amazon ECS
AWS Fargate
Amazon SNS
Amazon SQS
AWS Glue
Amazon SageMaker

今回はこれらのAWSサービスの中でも利用頻度の高い、Amazon SNSとDynamoDBをStep Functionsと連携して、簡単なシステムを構築していきたいと思います。

今回構築するシステムは下図のような簡単なものとなります。

Step Functions 構築するシステム

 

全体の流れと致しましては、以下のようになります。

➀LambdaにてS3のオブジェクト(Billing Data)を取得/集計
②DynamoDB TableにputItem
③DynamoDB TableからgetItem
④Lambdaにて通知用メッセージの作成
Amazon SNSにてPublish

上記5つの処理をそれぞれ個別のタスクとしてStep Functionsにて分散処理を実現します。

なお今回データソースとして利用するS3オブジェクトは、aws-cost-allocationのCSVファイルとなります。

billing情報をDailyでDynamoDBに格納、SNS通知するようなシステムとなっております。

 

手順

それでは実際に設定を行っていきましょう。

各種サービスの利用方法に関しましては、ここでは省略させていただきます。

まず初めに最初のタスクとなるオブジェクトの取得/集計を行うLambda関数を作成します。

Lambda関数の作成に関しましては、returnを忘れずに行うことだけ注意して下さい。
そのほかに関しましては、基本的なLambda関数の作成と同じとなります。

 

import json
import boto3
import datetime

s3 = boto3.client('s3', region_name='ap-northeast-1')

def lambda_handler(event, context):
    nowdate = datetime.datetime.now()
    yesterday = nowdate - datetime.timedelta(days=1)
    strmonth = nowdate.strftime('%Y-%m')
    strdate = nowdate.strftime('%Y%m%d')
    stryesterday = yesterday.strftime('%Y%m%d')
    filename = "000123456789-aws-cost-allocation-" + strmonth + ".csv"
    getobj = s3.get_object(
        Bucket='xxxxxxxxxx',
        Key=filename
    )
    body = getobj['Body'].read()
    decodebody = body.decode('utf-8')
    datas = decodebody.split("\n")
    services = []
    billings = []
    for num in range(2, len(datas)):
        item = datas[num].split(",")
        if len(item) > 30:
            if item[12] != "\"\"":
                btax = item[-7].strip('\"')
                if item[12] in services:
                    serviceindex = services.index(item[12])
                    billings[serviceindex] = billings[serviceindex] + float(btax)
                else:
                    services.append(item[12])
                    billings.append(float(btax))
    totalcost = 0
    for i in range(len(services)):
        print (services[i])
        print (billings[i])
        totalcost = totalcost + billings[i]
    print (totalcost)
    return {"date": strdate,"totalcost": str(totalcost), "yesterday": stryesterday}

上記Lambda関数では、AWSサービスごとの利用料を取得していますが、サービスの数が不確定であることなどを考慮し、今回はTotalCostのみを利用しています。

次に、もう一つのメッセージ作成用のLambda関数を作成します。

こちらに関しましては、Step Functions側にてメッセージの作成/成型を行いたかったのですが、Step Functionsにてうまく変数を結合して、メッセージを作成することができなかったので、断念してLambdaを挟むことに致しました。

なので、作成するLamdba関数も下記の様にメッセージの作成と計算処理を行うのみのとても簡単なものとなっております。

import json

def lambda_handler(event, context):
    print (event)
    increases = float(event['totalcost']) - float(event['getdynamodb']['Item']['value']['S'])
    message = "Current cost: $" + event['totalcost'] +  "\nTodays increment: +$" + str(increases)
    return {'message': message}

2つのLambda関数の作成が完了したら、Step Functionsのステートマシンを作成していきます。

なお、SNSトピックやDynamoDBをまだ作成していない場合には事前に作成しておいてください。

ステートマシンの定義は以下の通りとなります。
詳細や注意点に関しましては追ってご説明いたします。

 

"StartAt": "GetBilling",
  "States": {
    "GetBilling": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:000123456789:function:xxxxxxxxxx",
      "Next": "PutDynamoDB"
    },
    "PutDynamoDB": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:putItem",
      "Parameters": {
        "TableName": "xxxxxxxxxx",
        "Item": {
          "date": {"S.$": "$.date"},
          "value": {"S.$": "$.totalcost"}
        }
      },
      "InputPath": "$",
      "ResultPath": "$.putdynamodb",
      "OutputPath": "$", 
      "Next": "GetDynamodb"
    },
    "GetDynamodb": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:getItem",
      "Parameters": {
        "TableName": "xxxxxxxxxx",
        "Key": {
          "date": {"S.$": "$.yesterday"}
        }
      },
      "InputPath": "$",
      "ResultPath": "$.getdynamodb",
      "OutputPath": "$",
      "Next": "CreateMessage"
    },
    "CreateMessage": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:000123456789:function:xxxxxxxxxx",
      "Next": "SNSPublish"
    },
    "SNSPublish": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:ap-northeast-1:000123456789:xxxxxxxxxx",
        "Message.$": "$.message"
      },
      "End": true
    }
  }
}

DynamoDBもSNSもリソースの指定が必要となります。上記のようにそれぞれのAPIに固定のリソースが存在していますが、ステートマシンの定義にて補完される選択肢の中には上記のリソースは存在していないのでご注意ください。

DynamoDBはKeyやItemの指定、SNSはArnの指定と基本的な使い方に関しては、ほかのAPIと大差ないので、詳細は省かせていただきます。

ここからはStep Functionsならではの癖のようなものを簡単にご紹介いたします。

・入力値の受け渡しに関して
Step FunctionsにはInputとOutputがそれぞれの処理毎に存在していて、InputPath、OutputPath、ResultPathの設定を適切に設定しないと複数の処理を跨いで値の参照ができません。

例えば、上記の処理にてPutDynamoDBのInputPath、OutputPath、ResultPathの記載を削除した場合、次のGetDynamodbのタスクに渡されるOutput値は空の{}となります。

今回の様に特定の結果をステートマシン全体を通して利用する場合には、InputPath、OutputPath、ResultPathの設定を適宜行ってください。

・変数の呼び出しに関して
Step Functionsでは「$」を使うことで、Inputされたパラメータなどにアクセスすることが可能です。

しかし、利用する場所によって変数の指定の仕方が異なってくるので注意が必要です。

例えば、InputPathなどでは「"InputPath": "$"」といった形で変数を参照することが可能ですが、Parameters内で変数を呼び出す場合などには、「"date": {"S.$": "$.yesterday"}」といったような形で記載する必要があります。

一見すると「"date": {"S": "$.yesterday"}」といった記載でも問題なさそうですが、これでは「$.yesterday」がそのまま文字列として扱われてしまいますので注意が必要です。

SNSのメッセージに関して
SNSのメッセージは文字列(String型)である必要があります。

今回の様に動的にメッセージを指定する場合には、メッセージの文字列を一つのJsonオブジェクトとして格納する必要があります。

今回はLambda関数を挟むことで変数をメッセージとしてまとめています。

 

動作確認

それでは実際に動作を確認していきます。

正常にステートマシンの定義が完了している場合には、以下のようなフローが確認できます。

Step Functions 動作確認

 

ステートマシンを実行すると、DynamoDBへのデータ挿入、およびSNS通知が行われます。

ただし、一回目の実行に関しては、前日のデータが存在しないため、GetDynamodbの段階で処理が失敗するかと思います。事前に適当なデータを登録しておくか、翌日再度実行してみてください。

実行が正常に完了するとDynamoDBには下記のような形式でデータが登録されていきます。

DynamoDB データ

 

現在の利用料と前日からの増加額がSNSにて通知されます。

 

DynamoDB SNSにて通知

 

まとめ

今回はStep Functionsの新機能である、Amazon SNSとDynamoDBとの連携をご紹介いたしました。

今回構築した程度のシステムであれば、Lambdaのみでも簡単に実現できますが、Step Functionsの特徴の一つでもある分散処理というのはAWSアーキテクチャにおいて重要なものであり、例外処理への対応や管理コストなどを考えると、Step Functionsでシステムを構築するメリットが多くあります。

これまでは、Step Functionsの中で複数のLambdaを連携させるような使い方が主流となっていましたが、今回のアップデートにより、Lambdaを挟むことなく、ほかのAWSサービスと簡単に連携することが可能となりました。

今まで以上に便利なサービスとなりましたので、是非お試しになってみてはいかがでしょうか。

 

 

 

このブログの著者