こんにちは。katoです。
今回はDynamoDBに保存したデータをElasticSearchで可視化する方法をご紹介したいと思います。
概要
今回ご紹介する方法は、DynamoDB Streamsをトリガーとし、Lambda経由でElasticSearchにPOSTするという流れになります。
DynamoDB Streamsを利用することで、DynamoDBへの書き込みデータを即座にElasticSearchに反映させることが可能になります。
DynamoDB Streamsの有効化
まず初めに該当のDynamoDBテーブルにて、Streamsの有効化を行います。 DynamoDBテーブルの概要タブにて「ストリームの管理」を選択します。
表示タイプは、Lambda関数にeventとして渡すデータにも関係してくるので、用途やLambda関数に応じて適宜選択してください。 今回は「新旧イメージ」を選択し、有効化します。
Lambda関数の作成
次にDynamoDB Streamsからデータを受け取り、ElasticSearchにPOSTするLambda関数を作成していきます。
今回は、requestsモジュール等を利用するので、任意のサーバにてzipファイルを作成していきます。
以下の項目は適宜変更してください。
・DynamoDBのキー名(user、time、task等) ・アクセスキー&シークレットアクセスキー ・ElasticSearchエンドポイント ・リージョン ・インデックス名
# mkdir /dynamodb_to_es # cd /dynamodb_to_es # pip install requests aws_requests_auth -t . # vi lambda_function.py #!/usr/bin/python # *-# -*- coding: utf-8 -*- import requests import json from aws_requests_auth.aws_auth import AWSRequestsAuth import random import string import datetime from urlparse import parse_qs print('Loading function') def lambda_handler(event, context): for record in event['Records']: struser = (record['dynamodb']['NewImage']['user']['S']) strtime = (record['dynamodb']['NewImage']['time']['S']) strtask = (record['dynamodb']['NewImage']['task']['S']) user = struser.replace("\"", "") time = strtime.replace("\"", "") task = strtask.replace("\"", "") now = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S%z") auth = AWSRequestsAuth(aws_access_key='xxxxxxxxxxxxxxxxxxxx', aws_secret_access_key='xxxxxxxxxxxxxxxxxxxx', aws_host='search-xxxxxxxxxx-1234567890.ap-northeast-1.es.amazonaws.com', aws_region='ap-northeast-1', aws_service='es') n = 5 random_str = ''.join([random.choice(string.ascii_letters + string.digits) for i in range(n)]) url = ("https://search-xxxxxxxxxx-1234567890.ap-northeast-1.es.amazonaws.com/index_name/number/"+random_str) res = requests.post(url, data=json.dumps({"@timestamp": now, "user": user, "task": task}), auth=auth) return (res.content) # zip -r dynamodb_to_es.zip *
上記の関数は以前ご紹介した、Slack(Slash Commands) + Lambdaで工数管理と連携した関数になります。
SlackからDynamoDBにタスクが登録された際に、ElasticSearchにもデータが送られる仕組みになります。
zipファイルを作成したら、lambdaの設定を行っていきます。
python 2.7にて関数を作成し、zipファイルをアップロードします。
次に、トリガーの設定を行います。
「トリガーの追加」からDynamoDBを選択します。
トリガーの設定にて対象のDynamoDBテーブルを選択し、トリガーを追加します。
以上で設定は完了になります。
以下の様なテストイベントを実行すると、ElasticSearchにデータが投入されることがわかります。
{ "Records": [ { "eventID": "1", "eventName": "INSERT", "eventVersion": "1.0", "eventSource": "aws:dynamodb", "awsRegion": "ap-northeast-1", "dynamodb": { "NewImage": { "user": { "S": "test" }, "time": { "S": "2018-1-1 01:01:01" }, "task": { "S": "test" } } } } ] }
DynamoDBにデータを登録した際に、ElasticSearchにもデータが登録されれば成功です。
あとはKibana側で描写の設定を行うだけです。
投入するデータに応じて、DynamoDBのデータを自由に可視化することが可能となります。
下記の例はリアルタイムのタスク実施状況とデイリーの作業比率を可視化したものになります。
まとめ
DynamoDBはデータベースとしてさまざまな用途で利用されておりますが、ElasticSearchと連携することでその登録データを可視化することが可能となります。
今回ご紹介したのはDynamoDB Streamsをトリガーとした連携方法ですが、下記の様に、スケジュールや別のAWSイベントをトリガーとして利用することも可能です。
DynamoDBのデータを可視化すれば、統計情報や遷移状況など様々なデータを簡単に管理することが可能になります。 運用を行っている方などは、Kibanaコンソールから一括管理が可能となるので管理の手間が削減されます。
DynamoDB単体でもとても優れたサービスですが、ElasticSearchと連携することでさらに便利なサービスとなります。
ご興味のある方は是非お試しください。