こんにちは。katoです。
今回は前回ご紹介したS3の使用状況を拡張子別に取得する処理をStep Functionsにて実現していきたいと思います。
少しボリュームが増えてしまったので2回に分けてご説明していきます。
概要
Step Functionsを利用することで、並列処理を実現することができます。
時間の掛かる処理をStep Functionsで並列実行することで、処理時間の短縮やLambdaの実行時間制限を超えた処理の実現など多くの恩恵を受けることができます。
しかしながら、並列実行する処理の数が変動するような場合には、ステートマシンの定義の管理が難しく、少し扱いづらいものとなってしまいます。
そこで、今回は並列処理を行うStep Functionsを動的に作成し、管理コストを削減する方法をご紹介いたします。
全体の構成は下図のような形となります。
今回は並列処理を行う方のステートマシン(図の下部分)にて必要となるLambda関数の準備を進めていきます。
なお、今回実装する内容は前回の記事にてご紹介したS3の利用状況を拡張子別に取得するものとなります。
処理の内容やベースは前回の記事と基本的に同じものとなっておりますので、詳細はそちらをご覧ください。
手順
それでは実際に作業を進めていきます。
今回実装するS3の利用状況取得にて利用するLambda関数は以下の3種類となっております。
・list-objects → バケット内オブジェクトの取得と集計用 ・put-dynamodb → 集計結果のDynamoDB登録用 ・create-message → 通知用メッセージの作成とDBテーブルクリア用
DynamoDBへのアイテムの登録はステートマシン側の定義でも可能となりましたが、今回実装する内容ではLambdaからの返り値や登録数などが不確定な内容となりますので、Lambda関数にてDynamoDBにアイテムを登録します。
list-objects
オブジェクトの取得と集計用のLambda関数に関しては前回の記事とほぼ同じ内容となっております。
変更点としてはreturn部分の整形程度となりますので説明は省略致します。
import json import boto3 import re s3 = boto3.client('s3') def lambda_handler(event, context): print (event) bucket = event size = 0 count = 0 extensions = [] counts = [] sizes = [] usage = {} listobj = s3.list_objects_v2( Bucket=bucket ) if "Contents" in listobj: for num in range(len(listobj['Contents'])): key = listobj['Contents'][num]['Key'] prefix = key.rsplit("/", 1) if len(prefix) != 1: keys = prefix[1].rsplit(".", 1) if len(keys) != 1: extension = keys[1] else: extension = "other" else: keys = prefix[0].rsplit(".", 1) if len(keys) != 1: extension = keys[1] else: extension ="other" if extension in extensions: indexnum = extensions.index(extension) sizes[indexnum] = sizes[indexnum] + listobj['Contents'][num]['Size'] counts[indexnum] += 1 else: extensions.append(extension) sizes.append(listobj['Contents'][num]['Size']) counts.append(1) size = size + listobj['Contents'][num]['Size'] count += 1 objlen = len(listobj['Contents']) while (objlen % 1000 == 0): startlen = objlen -1 lastkey = listobj['Contents'][startlen]['Key'] listobj = s3.list_objects_v2( Bucket=bucket, StartAfter=lastkey ) for num in range(len(listobj['Contents'])): key = listobj['Contents'][num]['Key'] prefix = key.rsplit("/", 1) if len(prefix) != 1: keys = prefix[1].rsplit(".", 1) if len(keys) != 1: extension = keys[1] else: extension = "other" else: keys = prefix[0].rsplit(".", 1) if len(keys) != 1: extension = keys[1] else: extension ="other" if extension in extensions: indexnum = extensions.index(extension) sizes[indexnum] = sizes[indexnum] + listobj['Contents'][num]['Size'] counts[indexnum] += 1 else: extensions.append(extension) sizes.append(listobj['Contents'][num]['Size']) counts.append(1) size = size + listobj['Contents'][num]['Size'] count += 1 objlen = len(listobj['Contents']) usage['Bucket'] = bucket usage['TotalSize'] = str(size) usage['TotalCount'] = str(count) rankcount = 0 while (len(extensions) != 0 and rankcount < 5 ): maxindex = sizes.index(max(sizes)) usage[rankcount] = {'extension': extensions[maxindex], 'size': str(sizes[maxindex]), 'count': str(counts[maxindex])} del extensions[maxindex] del sizes[maxindex] del counts[maxindex] rankcount += 1 else: usage['Bucket'] = bucket usage['TotalSize'] = str(0) return {"usage": usage}
put-dynamodb
DynamoDBへのアイテム登録用のLambda関数は、前段のLambda関数から渡されたusageの値をDynamoDBテーブルに登録する処理を行っています。
DynamoDBのテーブルはパーティションキーを「buckets」、ソートキーを「extensions」として作成しております。
import json import boto3 dynamodb = boto3.client('dynamodb') def lambda_handler(event, context): if (len(event['usage']) <= 2): put = dynamodb.put_item( TableName='xxxxxxxxxx', Item={ 'buckets': { 'S': event['usage']['Bucket'] }, 'extensions': { 'S': "-" }, 'size': { 'S': event['usage']['TotalSize'] } } ) else: put = dynamodb.put_item( TableName='xxxxxxxxxx', Item={ 'buckets': { 'S': event['usage']['Bucket'] }, 'extensions': { 'S': "-" }, 'size': { 'S': event['usage']['TotalSize'] }, 'count': { 'S': event['usage']['TotalCount'] } } ) for num in range(0, len(event['usage']) - 3): put = dynamodb.put_item( TableName='xxxxxxxxxx', Item={ 'buckets': { 'S': event['usage']['Bucket'] }, 'extensions': { 'S': event['usage']['%s' % (num)]['extension'] }, 'size': { 'S': event['usage']['%s' % (num)]['size'] }, 'count': { 'S': event['usage']['%s' % (num)]['count'] } } )
create-message
メッセージ作成用のLambda関数はDynamoDBの登録情報をscanしてメッセージを作成します。
なお、今回はメッセージの作成後にDynamoDBのアイテムを全て削除していますが、統計用途などでデータを残したい場合には、最後のdelete_item部分の処理は省いてください。
import json import boto3 dynamodb = boto3.client('dynamodb') def lambda_handler(event, context): scan = dynamodb.scan( TableName="xxxxxxxxxx", ScanFilter={ 'extensions': { 'AttributeValueList': [ { "S": "-" } ], 'ComparisonOperator': 'EQ' } } ) messages = "" for num in range(len(scan['Items'])): total = "" detail = "" scanbucket = dynamodb.scan( TableName="xxxxxxxxxx", ScanFilter={ 'buckets': { 'AttributeValueList': [ { "S": scan['Items'][num]['buckets']['S'] } ], 'ComparisonOperator': 'EQ' } } ) for i in range(len(scanbucket['Items'])): if scanbucket['Items'][i]['extensions']['S'] == "-": total = "backet name: " + scan['Items'][num]['buckets']['S'] + "\ntotal size: " + scanbucket['Items'][i]['size']['S'] + "\ntotal count: " + scanbucket['Items'][i]['count']['S'] else: detail = detail + "\n\nextension: " + scanbucket['Items'][i]['extensions']['S'] + "\nsize: " + scanbucket['Items'][i]['size']['S'] + "\ncount: " + scanbucket['Items'][i]['count']['S'] message = total + detail + "\n\n===============\n\n" messages = messages + message print (messages) delscan = dynamodb.scan( TableName="xxxxxxxxxx" ) for num in range(len(delscan['Items'])): delete = dynamodb.delete_item( TableName="xxxxxxxxxxx", Key={ 'buckets': { 'S': delscan['Items'][num]['buckets']['S'] }, 'extensions': { 'S': delscan['Items'][num]['extensions']['S'] } } ) return {"message": messages}
以上で並列処理部分のLambda関数の用意が完了しました。
次回は実際にステートマシンを作成して並列処理の実装を行っていきます。
S3の使用状況を拡張子別に取得してみた~Step Functions編②~