DynamoDBへのJSON投入用のLambda

前回にJSON作成用のEXCEL作成してDataPipeLineでのデータ投入を試したけれど
全然うまくいかないのでムキーという状態になったのですが、発想を変えてLambdaで
S3に保存されたファイルをトリガーに自動実行させてSNSメールすればいいじゃん。
と考えて、試してみました。
S3に保存されたファイルをDynamoDBに取込むところまでは何とか準備できた。
んでこれをどう加工するかですな。SNS側も何とかできました。
が、S3保存トリガーだとなぜかメールが2通飛ぶんだよなぁ。LambdaのWebエディタからだと1回なのに、、S3トリガーだと2通飛ぶ、これ何でだろうなぁ。
誰か親切な方、教えていただけると幸いです、

そして相変わらずコードベタ貼り、SAMとか使っていないからGit管理できていないのが
その理由です。(苦笑

import logging
import datetime
import logging
import json
import os
import urllib

import boto3

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
dynamodb = boto3.resource('dynamodb')
s3_res = boto3.resource('s3')
s3_cl = boto3.client('s3')
client = boto3.client('lambda')

# DynamoDBに書込み
def put_item(writeitems,writetable):
    try:
        table = dynamodb.Table(writetable)
        with table.batch_writer() as batch:
            for i in range(len(writeitems)):
                batch.put_item(
                    Item=writeitems[i]
                )
        LOGGER.info("Completed registration")
        return "end"
    except Exception as e:
        LOGGER.error(e)
        raise e

# S3で読込んだJSONを辞書型配列に変換
def translateJson(res):
    try:
        s=json.loads(res.decode('utf-8-sig'))
        return s
    except Exception as e:
        LOGGER.error(e)
        raise e

# SNS
def sendmessage(subject,msg):
    try:
        sns_arn = os.getenv('SNS_ARN')
        client = boto3.client('sns')
        request = {
                     'TopicArn': sns_arn,
                     'Message': msg,
                     'Subject': subject
                  }
        r = client.publish(**request)
        return r
    except Exception as e:
        LOGGER.error(e)
        raise e

# main
def lambda_handler(event, context):
    try:
        st = datetime.datetime.now()
        bucket_name = os.getenv('BUCKET_NAME')
        rep = s3_res.Bucket(bucket_name).objects.all()
        mg = u'◆targetTable:\n'
        objTables =[]
        icnt=0
        for all_object in rep:
            file_name = all_object.key
            table_name=('.').join(file_name.split('.')[:-1])
            objTables.append(table_name)
            response = s3_cl.get_object(Bucket=bucket_name, Key=file_name)
            tabledata = response['Body'].read()
            objItem = translateJson(tabledata)
            stat = put_item(objItem,table_name)
            LOGGER.info("Completed registration")
            mg= mg + table_name + '\n'
            icnt+=1
        sb=u'処理終了'
        mg= mg + '\n' + u'count' + str(icnt)
        end = datetime.datetime.now()
        mg= mg + '\n' + u'START:' + str(st)
        mg= mg + '\n' + u'END:' + str(end)
        sendmessage(sb,mg)
        return "end"
    except Exception as e:
        LOGGER.error(e)
        raise e


相変わらずpythonの型変換につまずきながらも何とか完成です。
とりあえずEXCELからデータ変換してWinSCPでデータアップロードしてDynamoDBに取込という一連の流れは何とか目途が立ちました。
さて、ここからは細かい部分とか修正だな。


コメント

このブログの人気の投稿

GASでGoogleDriveのサブフォルダとファイル一覧を出力する

証券外務員1種勉強(計算式暗記用メモ)

マクロ経済学(IS-LM分析)