今までのお仕事履歴 & まったりお仕事募集中♪

【質の良い無料コンテンツ紹介】

【ブラックレーベル】謎解き特集!【SCRAPゲームブック】

【通信制大学】産業能率大学に三年次編入しました

【悲報】ジュニアNISAで、買い付け口座を間違えていた話【NISA預りと特定預りは違うのよ】

当サイトではアフィリエイト広告を利用して商品を紹介しています。

【AWS】LambdaでJSON/JSONL(line)形式を取りこみ、CSV形式に変換する

AWS

AWSのLambdaで、JSON(もしくはJSONL)をCSVに変換する記事がなかったので書いてみた。

JSONL(JSONライン形式)で地味にハマっている人が多そうだ。。。

AWS(+python)に触れて一年未満の人が書いているので、間違い等あれば優しく教えて頂けると嬉しい。

想定している構成図

S3にJSONファイルがアップロードされたら、CSVファイルに変換してS3に置き直すイメージである。

分かりやすくするためにシンプルに記載したが、IoTCoreで収集したデータやKinesisDataFirehose(配信ストリーム)で収集したデータが入ってくる想定が多いかと思う。

実装編

大部分はこちらのサイトを参考にさせていただいた

今回のJSON⇒CSV変換のソースを作るにあたり、大部分は以下のQiitaを参考にさせて頂いた。

大変ありがとうございます。

Qiita  【Python】S3にアップロードされたCSVファイルをAWS LambdaでJSONファイルに変換する

  • Lambdaに割り当てるIAMロールの作成
  • S3にファイルを置かれたら、Labmdaが実行されるトリガ作成

などなどは、上記のQiitaを参考にして頂ければと思う。

そんなワケであまり深く考えずにS3トリガで実装してしまったが、EventBridgeの方がいいのかもしれない(EventBridgeで実装できるかは未検証)

EventBridgeでやる場合のトリガ方法はここに記載しない。また、Lambda引数の中身が変わってしまうため、後のソースコードはそのまま使えなくなる。各自読み替えて欲しい。

JSONの場合

ファイル形式

IoTCoreで収集したデータを直接収集して、S3に上げている人はこちらに該当する人が多いだろう。
こんな感じのデータである。

[
    {"type":"device1","date":"2022/07/01 00:00:01","value1":100.001,"value2":29.509},
    {"type":"device1","date":"2022/07/01 00:00:02","value1":105.002,"value2":30.001}
]

上記データはキーなしだけど、キーありでもいけるんじゃないかな(適当)

Lambdaソース
import json
import csv
import boto3
import os
from datetime import datetime, timezone, timedelta

fieldnames = ['type', 'date', 'value1', 'value2']       # header情報
s3 = boto3.client('s3')

def lambda_handler(event, context):

    json_data = []

    # TZを日本に変更
    JST = timezone(timedelta(hours=+9), 'JST')
    timestamp = datetime.now(JST).strftime('%Y%m%d%H%M%S')

    # 一時的な読み書き用ファイル
    tmp_json = '/tmp/test_{ts}.json'.format(ts=timestamp)

    for record in event['Records']:
        bucket_name = record['s3']['bucket']['name']
        key_name = record['s3']['object']['key']

    s3_object = s3.get_object(Bucket=bucket_name, Key=key_name)
    data = s3_object['Body'].read()
    contents = data.decode('utf-8')
    
    try:
        # 受信したJSON内容を一時ファイルに書き込み
        with open(tmp_json, 'w') as json_data:
            json.dump(contents, json_data)
        
        # 上記で作成したJSONファイルロード
        with open(tmp_json, 'r') as f:
            json_dict = json.load(f)
            #辞書型に変換
            j = json.loads(json_dict)


        # CSVファイル形式の書込
        with open(tmp_csv, 'w') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames, 
                                doublequote=True, 
                                quoting=csv.QUOTE_ALL)
            # ヘッダ出力する場合はコメントアウトを外す
            # writer.writeheader()
            writer.writerows(j)

        # S3に保存しなおす
        with open(tmp_csv, 'r') as csv_file_contents:
            response = s3.put_object(Bucket=bucket_name, Key=outputted_csv, Body=csv_file_contents.read())

        os.remove(tmp_csv)
        os.remove(tmp_json)

    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e
     
    return

JSONL(JSON line形式)

前提条件

JSONL形式を取りこむ際は、外部ライブラリの「pandas」が必要である。

こちらも下記のQiitaを参考に取り込んで頂ければと思う。

本当に、Qiita様々である。ありがとうございます。

Qiita  AWS Lambdaでpandasを使いたい!

ファイル形式

JSONライン形式である。
KinesisDataFirehose(配信ストリーム)を介すると、なぜかこの形式で吐き出される。

Athenaで読み込むことを想定しているのかな?

{"type":"device1","date":"2022/07/01 00:00:01","value1":100.001,"value2":29.509}
{"type":"device1","date":"2022/07/01 00:00:02","value1":105.002,"value2":30.001}

JSONと似ているが、[]がなかったり、カンマはない。単体のデータを改行で区切ったデータとなる。

ソース
import pandas as pd
import json
import csv
import boto3
import os
from datetime import datetime, timezone, timedelta

s3 = boto3.client('s3')

def lambda_handler(event, context):

    json_data = []

    # TZを日本に変更
    JST = timezone(timedelta(hours=+9), 'JST')
    timestamp = datetime.now(JST).strftime('%Y%m%d%H%M%S')

    # 欲しいデータ部分のみ取り出す
    for record in event['Records']:
        bucket_name = record['s3']['bucket']['name']
        key_name = record['s3']['object']['key']

    s3_object = s3.get_object(Bucket=bucket_name, Key=key_name)
    data = s3_object['Body'].read()
    contents = data.decode('utf-8')
    # print('contents:', contents)
    
    # データフレーム形式に変換
    df = pd.read_json(contents, lines=True)
    
    # ファイル名形式を整える
    tmp_csv = '/tmp/test_{ts}.csv'.format(ts=timestamp)
    outputted_csv = 'output/test_{ts}.csv'.format(ts=timestamp)
    
    # CSVファイル形式で一時ファイルに保存
    df.to_csv(tmp_csv, index=False)

    try:
        # S3に保存
        with open(tmp_csv, 'r') as csv_file_contents:
            # print('csv_file:', csv_file_contents)
            response = s3.put_object(Bucket=bucket_name, Key=outputted_csv, Body=csv_file_contents.read())
    
        os.remove(tmp_csv)
        
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e

    return

動作確認

トリガを設定したS3箇所に、適当なJSONファイルをアップロードすれば良い。

上手く動いている場合は、同じS3のoutputフォルダにCSVファイルが出来上がるハズである。

出来なかった場合は、適宜printを入れるなりして、CloudWatchで調査してみて欲しい。

思うところなど

Python自体が分かっても、Lambdaが理解できない

AWSに実際に触れて、「Lambdaはまんま関数なんでしょ?」って感覚でやる人が多いだろうが、引数に何が入ってくるか分からん! でつまづく人が多いんじゃないかと思っている。

私も上記で紹介したQiitaの記事がなければ一生、分からなかったと思う……。

IoT EventがJSON形式対応

そもそもなんでJSON形式で取り込むの? は、IoT EventがJSON形式にしか対応してないのでが回答となる。

だけど、JSON→CSVは見かけなかったということは……何か根本的に理解してないのか、私?

もっと効率的なインフラ構成があれば是非、ご教授いただけると嬉しい。

GlueでもJSONL→CSV変換は可能

が、少々コストが高い。一日一回ぐらいの頻度じゃないと辛い。

参考までに私が少々やらかした分を参考においておく。

5分ごとにGlue起動×24時間で、$15~$20ぐらいかかったと思う。

Lambdaだと同じ頻度でも10分の1以下になると思う。それぐらい差がある。

Glueでやる場合の記事も見かけなかったので、後で記事にできればしておきたい。

QuickSightはCSV対応

QuickSightはBIツールである。要はデータの見えるかだ。QuickSightのことを考えるとJSONLではなく、CSVの方が扱いやすいかなと思っている。

JSONLのまま、Athenaで一度、取り込めばいいのか? と思わなくもないが……。が、やはり何かと手軽なのはCSVだよなとは思う。何かあってもS3から落とせるワケだし。

QuickSightをやってみると分かるが、そこそこ頻繁にデータセット更新したい人は、S3にCSVファイルを置いた時点で、また一工夫が必要だったりする。

興味ある人は以下のQiitaをどうぞ(ほんと、ありがとうございます)

Qiita  運用が楽なQuickSightダッシュボードを作る_5(データセット自動更新編)

まとめ:AWS沼は深い


が、知れば知るほど、AWSのサービスってかみ合っているんだが、かみ合っていないんだがたまによく分からない時がある。

同じことをやりたくても、方法も幾通りもあるし……どれがベストなのか分からん。

ベストプラクティスは、システム思想にも左右されるのだろうが。むつかしい。

AWSは歴史がそこそこあるからインターネット資料は豊富かと思ってたが、やってみると意外とない。

各言語と違って、google先生に聞いても解決しないことが結構多いなと感じ、筆をとってみた次第である。

これからもちょくちょく書けたらかきたい。

コメント