今までのお仕事履歴 & まったりお仕事募集中♪

【質の良い無料コンテンツ紹介】

【ブラックレーベル】謎解き特集!【SCRAPゲームブック】

【通信制大学】産業能率大学に三年次編入しました

【悲報】ジュニアNISAで、買い付け口座を間違えていた話【NISA預りと特定預りは違うのよ】

当サイトではアフィリエイト広告を利用して商品を紹介しています。

【AWS Lambda】Athenaクエリの実行結果を、S3に保存する

AWS

Athenaクエリを実行し、その実行結果をS3に保存するLambda(Python3.9)である。

前提条件

  • Athenaクエリを実行するためのデータカタログは作成済みである
  • 私の都合でS3への保存は、JSONに変換している。CSVのまま保存したい人は、適当にソースぶっこ抜いてくれ。
  • 動作すればいいやぐらいの加減で作成しているので、ご自身で使う際はご自身の責任で、足りない部分を真面目に補完して頂ければと思う

Lambda(プログラム)

import boto3
import json
import csv
import time
import os
from datetime import datetime, timezone, timedelta

def lambda_handler(event, context):
    # 汎用変数
    json_data = []
    
    # TZを日本に変更
    JST = timezone(timedelta(hours=+9), 'JST')
    timestamp = datetime.now(JST).strftime('%Y%m%d%H%M%S')
    
    # Athenaクライアントの作成
    athena_client = boto3.client('athena')

  # Athenaクエリ実行元のデータベース名
    database = "your-database-name"
  # Athenaクエリ実行元のテーブル名
    tablename = "your-table-name"
  # 結果を保存するS3バケットのパス
    output_bucket = "s3://your-backetname/output/"

    # クエリの実行設定
    query_string = f"SELECT * FROM {tablename};"

    # クエリの実行
    response = athena_client.start_query_execution(
        QueryString=query_string,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': output_bucket}
    )

    # クエリ実行IDの取得
    query_execution_id = response['QueryExecutionId']

    # クエリが完了するまで待機
    while True:
        response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        status = response['QueryExecution']['Status']['State']
        if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            break
        time.sleep(1)

    # クエリ結果のS3パスを取得
    result_path = output_bucket + query_execution_id + '.csv'

    # 諸変数の用意
    bucket_name = result_path.split('/')[2]
    object_key = '/'.join(result_path.split('/')[3:])
    local_path = '/tmp/test.csv'

    # 一時的な読み書き用ファイル(後で消す)
    tmp_json = '/tmp/test_{ts}.json'.format(ts=timestamp)
    # 最終的な出力ファイル
    outputted_json = 'json/test_{ts}.json'.format(ts=timestamp)

    # S3クライアントの作成
    s3_client = boto3.client('s3')

    # 結果ファイルのダウンロード
    s3_client.download_file(bucket_name, object_key, local_path)

    # 結果ファイルをJSON形式で読み込み
    with open(local_path, 'r') as file:
        csv_reader = csv.DictReader(file)
        for csv_row in csv_reader:
                json_data.append(csv_row)

        # JSON形式にて一時ファイル保存
        with open(tmp_json, 'w') as json_file:
            json_file.write(json.dumps(json_data))

        # S3に保存
        with open(tmp_json, 'r') as json_file_contents:
            response = s3_client.put_object(Bucket=bucket_name, Key=outputted_json, Body=json_file_contents.read())

        # 一時ファイルの削除
        os.remove(tmp_json)

    return {
        'statusCode': 200,
        'body': "success"
    }

後半の辺りは、違う記事でも大変お世話になったQiita記事を参考にさせて頂いている。

Qitta 【Python】S3にアップロードされたCSVファイルをAWS LambdaでJSONファイルに変換する

Athenaクエリ実行はそこそこ時間がかかるので、Lambdaのタイムアウト値を変更しよう。

私は面倒だったのでやらなかったが、非同期にするなどは自身でアレンジして貰えればと思う。

また、Athenaクエリ結果がCSVで勝手に保存されている分をLambda内で削除していないのは、S3のライフサイクル管理を利用して削除しているからである。

ライフサイクル管理なら、タイムラグ削除することでしばらくクエリの生結果が残せるので、何かトラブルが起こった際に使えるかな~という想定だ。

S3のライフサイクル設定は、以下のサイトがとても参考になった。ありがたい。

参考 S3ライフサイクルルールで古いオブジェクトを自動削除する

IAM権限も少々厄介である(次章に載せる)

IAM設定(ポリシー設定)

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "AthenaPermissions",
			"Effect": "Allow",
			"Action": [
				"athena:StartQueryExecution",
				"athena:GetQueryExecution",
				"athena:GetQueryResults"
			],
			"Resource": "*"
		},
		{
			"Sid": "GluePermissions",
			"Effect": "Allow",
			"Action": [
				"glue:GetTable",
				"glue:GetDatabase",
				"glue:GetPartitions"
			],
			"Resource": "*"
		},
		{
			"Sid": "S3Permissions",
			"Effect": "Allow",
			"Action": [
				"s3:ListBucket*",
				"s3:ListBucketMultipartUploads",
				"s3:GetObject*",
				"s3:GetBucketLocation",
				"s3:ListMultipartUploadParts",
				"s3:AbortMultipartUpload",
				"s3:PutObject*",
				"s3:PutObjectAcl",
				"s3:PutObjectVersionAcl",
				"s3:DeleteObject"
			],
			"Resource": [
				"arn:aws:s3:::your-backetname",
				"arn:aws:s3:::your-backetname/*"
			]
		}
	]
}

S3周囲の権限、いまだにどれを付与すればいいのか、よく分からん……。

コメント