今までのお仕事履歴 & まったりお仕事募集中♪

【ブラックレーベル】謎解き特集!【SCRAPゲームブック】

NISA口座の出口戦略を考え始める

【転職・就活】独立系? メーカー系? エンジニア(ソフトウェア開発)はどっちが良い?【ユーザー系もちょこっと】

【1、2本のちょろり白髪に】三十代向けの若白髪隠し対策についてまとめてみた【おすすめ】

【AWS】LambdaでJSON/JSONL(line)形式を取りこみ、CSV形式に変換する

AWS
スポンサーリンク

AWSのLambdaで、JSON(もしくはJSONL)をCSVに変換する記事がなかったので書いてみた。

JSONL(JSONライン形式)で地味にハマっている人が多そうだ。。。

AWS(+python)に触れて一年未満の人が書いているので、間違い等あれば優しく教えて頂けると嬉しい。

スポンサーリンク

想定している構成図

S3にJSONファイルがアップロードされたら、CSVファイルに変換してS3に置き直すイメージである。

分かりやすくするためにシンプルに記載したが、IoTCoreで収集したデータやKinesisDataFirehose(配信ストリーム)で収集したデータが入ってくる想定が多いかと思う。

実装編

大部分はこちらのサイトを参考にさせていただいた

今回のJSON⇒CSV変換のソースを作るにあたり、大部分は以下のQiitaを参考にさせて頂いた。

大変ありがとうございます。

Qiita  【Python】S3にアップロードされたCSVファイルをAWS LambdaでJSONファイルに変換する

  • Lambdaに割り当てるIAMロールの作成
  • S3にファイルを置かれたら、Labmdaが実行されるトリガ作成

などなどは、上記のQiitaを参考にして頂ければと思う。

そんなワケであまり深く考えずにS3トリガで実装してしまったが、EventBridgeの方がいいのかもしれない(EventBridgeで実装できるかは未検証)

EventBridgeでやる場合のトリガ方法はここに記載しない。また、Lambda引数の中身が変わってしまうため、後のソースコードはそのまま使えなくなる。各自読み替えて欲しい。

JSONの場合

ファイル形式

IoTCoreで収集したデータを直接収集して、S3に上げている人はこちらに該当する人が多いだろう。
こんな感じのデータである。

[
    {"type":"device1","date":"2022/07/01 00:00:01","value1":100.001,"value2":29.509},
    {"type":"device1","date":"2022/07/01 00:00:02","value1":105.002,"value2":30.001}
]

上記データはキーなしだけど、キーありでもいけるんじゃないかな(適当)

Lambdaソース
import json
import csv
import boto3
import os
from datetime import datetime, timezone, timedelta

fieldnames = ['type', 'date', 'value1', 'value2']       # header情報
s3 = boto3.client('s3')

def lambda_handler(event, context):

    json_data = []

    # TZを日本に変更
    JST = timezone(timedelta(hours=+9), 'JST')
    timestamp = datetime.now(JST).strftime('%Y%m%d%H%M%S')

    # 一時的な読み書き用ファイル
    tmp_json = '/tmp/test_{ts}.json'.format(ts=timestamp)

    for record in event['Records']:
        bucket_name = record['s3']['bucket']['name']
        key_name = record['s3']['object']['key']

    s3_object = s3.get_object(Bucket=bucket_name, Key=key_name)
    data = s3_object['Body'].read()
    contents = data.decode('utf-8')
    
    try:
        # 受信したJSON内容を一時ファイルに書き込み
        with open(tmp_json, 'w') as json_data:
            json.dump(contents, json_data)
        
        # 上記で作成したJSONファイルロード
        with open(tmp_json, 'r') as f:
            json_dict = json.load(f)
            #辞書型に変換
            j = json.loads(json_dict)


        # CSVファイル形式の書込
        with open(tmp_csv, 'w') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames, 
                                doublequote=True, 
                                quoting=csv.QUOTE_ALL)
            # ヘッダ出力する場合はコメントアウトを外す
            # writer.writeheader()
            writer.writerows(j)

        # S3に保存しなおす
        with open(tmp_csv, 'r') as csv_file_contents:
            response = s3.put_object(Bucket=bucket_name, Key=outputted_csv, Body=csv_file_contents.read())

        os.remove(tmp_csv)
        os.remove(tmp_json)

    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e
     
    return

JSONL(JSON line形式)

前提条件

JSONL形式を取りこむ際は、外部ライブラリの「pandas」が必要である。

こちらも下記のQiitaを参考に取り込んで頂ければと思う。

本当に、Qiita様々である。ありがとうございます。

Qiita  AWS Lambdaでpandasを使いたい!

ファイル形式

JSONライン形式である。
KinesisDataFirehose(配信ストリーム)を介すると、なぜかこの形式で吐き出される。

Athenaで読み込むことを想定しているのかな?

{"type":"device1","date":"2022/07/01 00:00:01","value1":100.001,"value2":29.509}
{"type":"device1","date":"2022/07/01 00:00:02","value1":105.002,"value2":30.001}

JSONと似ているが、[]がなかったり、カンマはない。単体のデータを改行で区切ったデータとなる。

ソース
import pandas as pd
import json
import csv
import boto3
import os
from datetime import datetime, timezone, timedelta

s3 = boto3.client('s3')

def lambda_handler(event, context):

    json_data = []

    # TZを日本に変更
    JST = timezone(timedelta(hours=+9), 'JST')
    timestamp = datetime.now(JST).strftime('%Y%m%d%H%M%S')

    # 欲しいデータ部分のみ取り出す
    for record in event['Records']:
        bucket_name = record['s3']['bucket']['name']
        key_name = record['s3']['object']['key']

    s3_object = s3.get_object(Bucket=bucket_name, Key=key_name)
    data = s3_object['Body'].read()
    contents = data.decode('utf-8')
    # print('contents:', contents)
    
    # データフレーム形式に変換
    df = pd.read_json(contents, lines=True)
    
    # ファイル名形式を整える
    tmp_csv = '/tmp/test_{ts}.csv'.format(ts=timestamp)
    outputted_csv = 'output/test_{ts}.csv'.format(ts=timestamp)
    
    # CSVファイル形式で一時ファイルに保存
    df.to_csv(tmp_csv, index=False)

    try:
        # S3に保存
        with open(tmp_csv, 'r') as csv_file_contents:
            # print('csv_file:', csv_file_contents)
            response = s3.put_object(Bucket=bucket_name, Key=outputted_csv, Body=csv_file_contents.read())
    
        os.remove(tmp_csv)
        
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e

    return

動作確認

トリガを設定したS3箇所に、適当なJSONファイルをアップロードすれば良い。

上手く動いている場合は、同じS3のoutputフォルダにCSVファイルが出来上がるハズである。

出来なかった場合は、適宜printを入れるなりして、CloudWatchで調査してみて欲しい。

思うところなど

Python自体が分かっても、Lambdaが理解できない

AWSに実際に触れて、「Lambdaはまんま関数なんでしょ?」って感覚でやる人が多いだろうが、引数に何が入ってくるか分からん! でつまづく人が多いんじゃないかと思っている。

私も上記で紹介したQiitaの記事がなければ一生、分からなかったと思う……。

IoT EventがJSON形式対応

そもそもなんでJSON形式で取り込むの? は、IoT EventがJSON形式にしか対応してないのでが回答となる。

だけど、JSON→CSVは見かけなかったということは……何か根本的に理解してないのか、私?

もっと効率的なインフラ構成があれば是非、ご教授いただけると嬉しい。

GlueでもJSONL→CSV変換は可能

が、少々コストが高い。一日一回ぐらいの頻度じゃないと辛い。

参考までに私が少々やらかした分を参考においておく。

5分ごとにGlue起動×24時間で、$15~$20ぐらいかかったと思う。

Lambdaだと同じ頻度でも10分の1以下になると思う。それぐらい差がある。

Glueでやる場合の記事も見かけなかったので、後で記事にできればしておきたい。

QuickSightはCSV対応

QuickSightはBIツールである。要はデータの見えるかだ。QuickSightのことを考えるとJSONLではなく、CSVの方が扱いやすいかなと思っている。

JSONLのまま、Athenaで一度、取り込めばいいのか? と思わなくもないが……。が、やはり何かと手軽なのはCSVだよなとは思う。何かあってもS3から落とせるワケだし。

QuickSightをやってみると分かるが、そこそこ頻繁にデータセット更新したい人は、S3にCSVファイルを置いた時点で、また一工夫が必要だったりする。

興味ある人は以下のQiitaをどうぞ(ほんと、ありがとうございます)

Qiita  運用が楽なQuickSightダッシュボードを作る_5(データセット自動更新編)

まとめ:AWS沼は深い


が、知れば知るほど、AWSのサービスってかみ合っているんだが、かみ合っていないんだがたまによく分からない時がある。

同じことをやりたくても、方法も幾通りもあるし……どれがベストなのか分からん。

ベストプラクティスは、システム思想にも左右されるのだろうが。むつかしい。

AWSは歴史がそこそこあるからインターネット資料は豊富かと思ってたが、やってみると意外とない。

各言語と違って、google先生に聞いても解決しないことが結構多いなと感じ、筆をとってみた次第である。

これからもちょくちょく書けたらかきたい。

コメント