Step Functionsを利用したループ処理とエラー回避

こんにちは。katoです。

今回はStep Functionsを利用したLambdaの繰り返し実行とエラー回避の方法をご紹介したいと思います。

分散処理の用途で利用されることの多いStep Functionsですが、大規模アプリケーション以外でも便利な使い方ができるのでご紹介させていただきます。

 

概要

今回Step Functionsで行うのは、CloudWatch Logsのエクスポート処理になります。
以下の構成の青枠で囲われた部分をStep Functionsで実行します。

 

Step Functionsで実行 構成

 

CloudWatch Logsのエクスポート処理は、実行中のエクスポートが存在するとエラーとなってしまうため、Lambdaのみで処理を自動化しようとしたら、エラー時の判定やsleep等を考慮しなければならないため、面倒な部分が多くなります。

Step Functionsでは、エラー時に処理を繰り返すことが可能で、Lambdaの様にタイムアウト値の制限もないため、実行中のエクスポート処理が完了するのを待ってから、再度エクスポートを実行するといったことが可能です。

 

Lambda関数の作成

まず初めに、Step Functionsで実行する2つのLambda関数を作成していきます。

bucket check

S3バケットのチェック用関数になります。
CloudWatch Logsでは、エクスポートの実行後、指定のS3バケットにログが圧縮されて保存されます。
このバケットのコンテンツをチェックし、取得するログのパラメータをStep Functionsに返します。

 

#!/usr/bin/python

import boto3
import json
import time
import datetime
from dateutil.relativedelta import relativedelta

def lambda_handler(event, context):
  today = datetime.date.today()
##ここのrangeで指定した値がチェック対象期間(6を指定したら先月~6ヶ月前までチェック)
  for num in range(3):
    from_day = today - relativedelta(months=num+1)
    from_msec = from_day.strftime('%s')
    to_day = today - relativedelta(months=num)
    to_msec = int(to_day.strftime('%s')) - 1
    check_prefix = str(from_day) + "/"

##s3のフォルダ存在確認とコンテンツサイズの確認(27byte以下のオブジェクトしかない場合はエラーとみなして再export)
##コンテンツサイズの指定は環境に応じて変更する必要あり(無限ループの危険性あり)
    client = boto3.client('s3', region_name='ap-northeast-1')
    response = client.list_objects(
      Bucket='s3-bucket-name',
      Prefix=check_prefix
    )

    count = 0
    contents_len = 0
    if "Contents" in response.keys():
      contents_len = len(response['Contents'])
      for num in range(len(response['Contents'])):
        if response['Contents'][num]['Size'] > 27:
          count += 1
    else:
      pass

##export対象が見つかった時点でパラメータを返し処理終了
    if count == 0  and contents_len != 1:
      client = boto3.client('sns')
      response = client.publish(
        TopicArn='arn:aws:sns:ap-northeast-1:123456789012:sns-topic-name',
        Message='export month: ' + from_day.strftime("%Y-%m-%d"),
        Subject='CloudWatchLogsExport'
      )
      return { 'state': "step", 'Month': from_day.strftime("%Y-%m-%d"), 'From_Msec': from_msec, 'To_Msec': to_msec }
    else:
      pass
##for文が終了した場合(export対象が存在しない時)は終了ステータスを返す
  return {'state': "fin"}

 

上記スクリプト中に記載している27byteという値は、「aws-logs-write-test」というCloudWatch Logsのエクスポート時にテストで配置されるコンテンツのサイズになります。
エクスポート対象のログが存在しない場合は、バケットにこのコンテンツのみ配置されるので、処理が無限ループしないようコンテンツサイズやコンテンツ数をチェックし、エクスポート対象の有無を判定しております。

知らない間に処理が無限ループし、AWSの請求額が高額となってしまった。。。ということを防ぐためにも、Step Functionsの利用時には、ループの判定やエラー処理は注意して行ってください。

上記スクリプトの様にメール通知を挟むと、異常なループを早期発見できるようになるので、Step Functionsの処理に通知機能を挟むこともお勧めです。

 

log export

CloudWatch LogsをS3バケットにエクスポートする関数になります。
Step Functionsから受け取ったパラメータを元にエクスポートの対象を決定します。

 

import boto3
import json
import datetime
 
def lambda_handler(event, context):
  from_day = event['Month']
  from_msec = event['From_Msec']
  to_msec = event['To_Msec']
  awslog_to_s3(from_day, from_msec, to_msec)
 
def awslog_to_s3(from_day, from_msec, to_msec):
  client = boto3.client('logs')
 
  response = client.create_export_task(
    taskName='export-task-name',
    logGroupName='log-group-name',
    fromTime=int(from_msec) * 1000,
    to=int(to_msec) * 1000,
    destination='s3-bucket-name',
    destinationPrefix=from_day
  )
  print response

 

上記Lambda関数で指定したS3バケットは、CloudWatchからのエクスポートを許可するようにバケットポリシーを設定する必要があるので、下記のポリシーをバケットに設定します。

 

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "logs.ap-northeast-1.amazonaws.com"
            },
            "Action": "s3:GetBucketAcl",
            "Resource": "arn:aws:s3:::s3-bucket-name"
        },
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "logs.ap-northeast-1.amazonaws.com"
            },
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::s3-bucket-name/*",
            "Condition": {
                "StringEquals": {
                    "s3:x-amz-acl": "bucket-owner-full-control"
                }
            }
        }
    ]
}

 

ステートマシンの作成

Lambda関数の作成が完了したら、Step Functionsのステートマシンを作成していきます。

ステートマシンの作成画面で以下のコードを入力します。

 

{
  "StartAt": "BucketCheck",
  "States": {
    "BucketCheck": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:1234567891012:function:bucketcheck-lambda-name",
      "Next": "ResultCheck"
    },
    "ResultCheck": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.state",
          "StringEquals": "fin",
          "Next": "EndStep"
        }
      ],
      "Default": "LogsExport"
    },
    "LogsExport": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:export-lambda-name",
      "Retry": [{
        "ErrorEquals": ["LimitExceededException"],
        "IntervalSeconds": 60,
        "MaxAttempts": 5,
        "BackoffRate": 10
      }],
      "Next": "BucketCheck"
    },
    "EndStep": {
      "Type": "Succeed"
    }
  }
}

 

コードの入力後、ビジュアルワークフローを更新すると、以下の様なフローが表示されると思います。

 

ビジュアルワークフロー

 

エクスポート対象の有無を確認し、その結果に応じてエクスポートを実行するか、処理を終了するか決定します。
エクスポートの実行後は、再度、エクスポート対象の有無を確認し、他にもエクスポートの対象がないか確認しています。
同時実行制限によりエクスポートの処理に失敗した場合には、インターバルを設けて再度エクスポートを行っています。

上記フローにおいて、Endがつながっていないのは、EndStepにおいてSucceedのTypeを利用しているためです。

 

動作確認

ステートマシンの作成が完了したら、実際に動かしてみて動作を確認してみましょう。

適当なタスク名を付けて実行を行います。

正常に動作している場合、下記のようなフロー結果が表示されると思います。

 

Step Functionsを利用したループ処理とエラー回避 動作確認

 

S3のバケットを確認してみると、複数のエクスポートが完了していることがわかります。
各フォルダの中に圧縮されたログが保存されております。

 

S3のバケットを確認

 

次にもう一度ステートマシンを実行します。
1回目の実行でエクスポートがすべて完了しているので、先ほどとは違うフローが表示されるはずです。

下記の様にエクスポートを実行せずに終了すれば正常な動作となります。

 

ステートマシンを実行

 

まとめ

Step Functionsは分散処理の用途で利用されることが多いサービスですが、他の用途で利用しても便利なサービスとなっております。

Lambdaとの組み合わせ次第で色々できそうなので、面白そうなものがあったらまたご紹介させていただきます。