こんにちは。katoです。
今回は前回ご紹介したAmazon Connectのデータストリーミング機能にLambdaの処理を組み込んでいきたいと思います。
概要
今回は、前回作成したAmazon Connect + Kinesis Firehose + Amazon Elasticsearchの仕組みに、Lambdaを組み込んでいきます。
全体の構成としては下記の様になります。
なお、今回はLambdaの組み込みの部分以外は省略させていただきますので、Amazon Connectのデータストリーミング機能をまだ有効化されていない方は、前回の記事をご参照ください。
Amazon Connect + Kinesis Firehose + Amazon Elasticsearch
手順
Kinesis FirehoseへのLambda組み込みで必要となるのは、Lambda関数のみです。
Kinesis Firehoseの設定画面にlambda組み込み用のブループリントが用意されていますが、Node.jsのものしかなかったので、今回はPythonで一からLambda関数を作成しました。
今回組み込むLamnda関数は、Amazon Connectから送られてきた顧客の電話番号を基に、DyanamoDBから顧客の会社名を取得するものになります。
DynamoDBのテーブルに関しましては、プライマリパーティションキーに電話番号(number)を指定しているだけの単純なものになりますので、作成手順は省略させていただきます。
import boto3 import json import base64 def lambda_handler(event, context): dynamodb = boto3.client('dynamodb') output = [] for num in range(len(event['records'])): decodedata = base64.b64decode(event['records'][num]['data']) decodejson = json.loads(decodedata) phonenum = decodejson['CustomerEndpoint']['Address'] response = dynamodb.get_item( TableName='dynamodb_table_name', Key={ 'number': { 'S': phonenum } } ) ##DynamoDBテーブルの取得するフィールド名を指定します corp = response['Item']['契約者会社名']['S'] decodejson['CustomerEndpoint']['Address'] = corp strjson = json.dumps(decodejson) encodedata = base64.b64encode(strjson.encode()) output_json = { 'recordId': event['records'][num]['recordId'], 'result': 'Ok', 'data': encodedata.decode(), } output.append(output_json) return { 'records': output }
スクリプト自体はかなり短いものとなっておりますが、作成にあたって少し躓いた箇所をご説明していきます。
・受信データに関して Lambdaに送られるAmazon Connectのデータは下記の通りとなっております。
invocationId deliveryStreamArn region records(recordId、approximateArrivalTimestamp、data)
Amazon Connectのデータストリーミングは、一定期間内のコンタクトイベントをまとめてKinesis Firehoseに送るのでfor文でレコードをそれぞれ処理しております。
また、Kinesis FirehoseからLambdaに送られるデータはbase64エンコードされておりますのでデコード処理が必要となります。
今回の受信データはjson形式であるため、デコードと合わせてjson変換の処理も行っております。
・送信データに関して LambdaからKinesis Firehoseに変換したデータを返す際には、以下のパラメータを含む必要があります。
recordId result deta
recordIdはLambdaがKinesis Firehoseから受け取った際のrecordIdとそろえる必要があります。 異なるrecordIdを指定した場合には、データ変換の失敗として扱われAmazon Elasticsearchにはデータが送られなくなります。
resultのvalueは「Ok」、「Dropped」、「ProcessingFailed」のいづれかを指定する必要があります。 今回は全てOkとしていますが、変換処理の成功時はOk、変換に失敗した場合はProcessingFailedまたはDroppedで削除のような処理にすると良いでしょう。
detaに関しましては、Kinesis Firehoseに返す際にもbase64エンコードを行う必要があります。 「json → str → encode」の流れになるのですが、byte型のままだと返信用のjson格納時にエラーとなってしまいます。 そのため、json格納時にdecodeした状態でdetaを指定する必要があります。
Lambda関数の作成が完了したら、Kinesis Firehoseに作成したLambda関数を組み込んでいきます。
Kinesis Firehoseの管理画面から作成済みのstreamを選択し、Detailsタブの「Edit」をクリックします。 「Transform source records with AWS Lambda」の「Source record tranformation」を有効化し、先ほど作成したLambda関数を指定します。
上記の設定を行うと、IAMロールの警告が表示されるので、Lambda権限を持ったロールに更新します。 更新は割当て済みのロールに手動で権限を追加してもいいですが、「Create new or update」から新規ロールを作成することで、Lambda権限を付与されたIAMロールを作成することができます。
以上でKinesis FirehoseへのLambda組み込みは完了となります。
動作確認
Lambda組み込み前のKibanaでのグラフの表示は下記の様に電話番号の表示となっておりました。
今回作成したLambda関数でデータを変更することにより、下記の様に電話番号に基づいた顧客情報に更新されます。
なお、今回は受信データを直接編集しているので、Lambda関数の組み込み前に利用していた電話番号はAmazon Elasticsearchには送られなくなります。 電話番号でのグラフ化も行うような場合には、今回のようなjsonデータの上書きではなく、キーの追加にてデータを変更する必要があります。
まとめ
前回ご紹介したAmazon Connectのデータストリーミング機能にLambda処理を組み込む方法をご紹介しました。
Amazon ConnectからKinesis Firehoseに送られるデータはそのままでも十分活用できるものとなっておりますが、問合せデータの解析にあたって、顧客やエージェントの情報は必要不可欠となってきます。
Lambdaを経由することにより、今回のようなDynamoDB以外にも様々なAWSサービスと連携することが可能となるので、利用シーンにあった解析ツールを構築することが可能となります。
簡単に組み込むことができる機能となっておりますので、Amazon Connectのデータストリーミングを有効化する際には、Lambda連携もお考えになってみてはいかがでしょうか。