DynamoDB to Amazon Elastic Search

こんにちは。katoです。

今回はDynamoDBに保存したデータをElasticSearchで可視化する方法をご紹介したいと思います。

 

概要

今回ご紹介する方法は、DynamoDB Streamsをトリガーとし、Lambda経由でElasticSearchにPOSTするという流れになります。

 

 

DynamoDB Streamsを利用することで、DynamoDBへの書き込みデータを即座にElasticSearchに反映させることが可能になります。

 

DynamoDB Streamsの有効化

まず初めに該当のDynamoDBテーブルにて、Streamsの有効化を行います。
DynamoDBテーブルの概要タブにて「ストリームの管理」を選択します。

 

 

表示タイプは、Lambda関数にeventとして渡すデータにも関係してくるので、用途やLambda関数に応じて適宜選択してください。
今回は「新旧イメージ」を選択し、有効化します。

 

 

Lambda関数の作成

次にDynamoDB Streamsからデータを受け取り、ElasticSearchにPOSTするLambda関数を作成していきます。

今回は、requestsモジュール等を利用するので、任意のサーバにてzipファイルを作成していきます。

以下の項目は適宜変更してください。

・DynamoDBのキー名(user、time、task等)
・アクセスキー&シークレットアクセスキー
・ElasticSearchエンドポイント
・リージョン
・インデックス名

 

# mkdir /dynamodb_to_es
# cd /dynamodb_to_es
# pip install requests aws_requests_auth -t .

# vi lambda_function.py

#!/usr/bin/python
# *-# -*- coding: utf-8 -*-

import requests
import json
from aws_requests_auth.aws_auth import AWSRequestsAuth
import random
import string
import datetime
from urlparse import parse_qs

print('Loading function')

def lambda_handler(event, context):
    for record in event['Records']:
        struser = (record['dynamodb']['NewImage']['user']['S'])
        strtime = (record['dynamodb']['NewImage']['time']['S'])
        strtask = (record['dynamodb']['NewImage']['task']['S'])
        user = struser.replace("\"", "")
        time = strtime.replace("\"", "")
        task = strtask.replace("\"", "")
        now = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S%z")

    auth = AWSRequestsAuth(aws_access_key='xxxxxxxxxxxxxxxxxxxx',
                           aws_secret_access_key='xxxxxxxxxxxxxxxxxxxx',
                           aws_host='search-xxxxxxxxxx-1234567890.ap-northeast-1.es.amazonaws.com',
                           aws_region='ap-northeast-1',
                           aws_service='es')
    n = 5
    random_str = ''.join([random.choice(string.ascii_letters + string.digits) for i in range(n)])
    url = ("https://search-xxxxxxxxxx-1234567890.ap-northeast-1.es.amazonaws.com/index_name/number/"+random_str)
    res = requests.post(url, data=json.dumps({"@timestamp": now, "user": user, "task": task}), auth=auth)
    return (res.content)

# zip -r dynamodb_to_es.zip *

 

上記の関数は以前ご紹介した、Slack(Slash Commands) + Lambdaで工数管理と連携した関数になります。

xp-cloud.jp

SlackからDynamoDBにタスクが登録された際に、ElasticSearchにもデータが送られる仕組みになります。

zipファイルを作成したら、lambdaの設定を行っていきます。

python 2.7にて関数を作成し、zipファイルをアップロードします。

次に、トリガーの設定を行います。

「トリガーの追加」からDynamoDBを選択します。

 

 

トリガーの設定にて対象のDynamoDBテーブルを選択し、トリガーを追加します。

 

 

以上で設定は完了になります。

以下の様なテストイベントを実行すると、ElasticSearchにデータが投入されることがわかります。

 

{
 "Records": [
    {
      "eventID": "1",
      "eventName": "INSERT",
      "eventVersion": "1.0",
      "eventSource": "aws:dynamodb",
      "awsRegion": "ap-northeast-1",
      "dynamodb": {
        "NewImage": {
          "user": {
            "S": "test"
          },
          "time": {
            "S": "2018-1-1 01:01:01"
          },
          "task": {
            "S": "test"
          }
        }
      }
    }
  ]
}

 

DynamoDBにデータを登録した際に、ElasticSearchにもデータが登録されれば成功です。

あとはKibana側で描写の設定を行うだけです。

投入するデータに応じて、DynamoDBのデータを自由に可視化することが可能となります。

下記の例はリアルタイムのタスク実施状況とデイリーの作業比率を可視化したものになります。

 

 

 

まとめ

DynamoDBはデータベースとしてさまざまな用途で利用されておりますが、ElasticSearchと連携することでその登録データを可視化することが可能となります。

今回ご紹介したのはDynamoDB Streamsをトリガーとした連携方法ですが、下記の様に、スケジュールや別のAWSイベントをトリガーとして利用することも可能です。

 

 

DynamoDBのデータを可視化すれば、統計情報や遷移状況など様々なデータを簡単に管理することが可能になります。
運用を行っている方などは、Kibanaコンソールから一括管理が可能となるので管理の手間が削減されます。

DynamoDB単体でもとても優れたサービスですが、ElasticSearchと連携することでさらに便利なサービスとなります。

ご興味のある方は是非お試しください。