Athenaクエリを実行し、その実行結果をS3に保存するLambda(Python3.9)である。
前提条件
Lambda(プログラム)
import boto3
import json
import csv
import time
import os
from datetime import datetime, timezone, timedelta
def lambda_handler(event, context):
# 汎用変数
json_data = []
# TZを日本に変更
JST = timezone(timedelta(hours=+9), 'JST')
timestamp = datetime.now(JST).strftime('%Y%m%d%H%M%S')
# Athenaクライアントの作成
athena_client = boto3.client('athena')
# Athenaクエリ実行元のデータベース名
database = "your-database-name"
# Athenaクエリ実行元のテーブル名
tablename = "your-table-name"
# 結果を保存するS3バケットのパス
output_bucket = "s3://your-backetname/output/"
# クエリの実行設定
query_string = f"SELECT * FROM {tablename};"
# クエリの実行
response = athena_client.start_query_execution(
QueryString=query_string,
QueryExecutionContext={'Database': database},
ResultConfiguration={'OutputLocation': output_bucket}
)
# クエリ実行IDの取得
query_execution_id = response['QueryExecutionId']
# クエリが完了するまで待機
while True:
response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
status = response['QueryExecution']['Status']['State']
if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
break
time.sleep(1)
# クエリ結果のS3パスを取得
result_path = output_bucket + query_execution_id + '.csv'
# 諸変数の用意
bucket_name = result_path.split('/')[2]
object_key = '/'.join(result_path.split('/')[3:])
local_path = '/tmp/test.csv'
# 一時的な読み書き用ファイル(後で消す)
tmp_json = '/tmp/test_{ts}.json'.format(ts=timestamp)
# 最終的な出力ファイル
outputted_json = 'json/test_{ts}.json'.format(ts=timestamp)
# S3クライアントの作成
s3_client = boto3.client('s3')
# 結果ファイルのダウンロード
s3_client.download_file(bucket_name, object_key, local_path)
# 結果ファイルをJSON形式で読み込み
with open(local_path, 'r') as file:
csv_reader = csv.DictReader(file)
for csv_row in csv_reader:
json_data.append(csv_row)
# JSON形式にて一時ファイル保存
with open(tmp_json, 'w') as json_file:
json_file.write(json.dumps(json_data))
# S3に保存
with open(tmp_json, 'r') as json_file_contents:
response = s3_client.put_object(Bucket=bucket_name, Key=outputted_json, Body=json_file_contents.read())
# 一時ファイルの削除
os.remove(tmp_json)
return {
'statusCode': 200,
'body': "success"
}
後半の辺りは、違う記事でも大変お世話になったQiita記事を参考にさせて頂いている。
Qitta 【Python】S3にアップロードされたCSVファイルをAWS LambdaでJSONファイルに変換する
Athenaクエリ実行はそこそこ時間がかかるので、Lambdaのタイムアウト値を変更しよう。
私は面倒だったのでやらなかったが、非同期にするなどは自身でアレンジして貰えればと思う。
また、Athenaクエリ結果がCSVで勝手に保存されている分をLambda内で削除していないのは、S3のライフサイクル管理を利用して削除しているからである。
ライフサイクル管理なら、タイムラグ削除することでしばらくクエリの生結果が残せるので、何かトラブルが起こった際に使えるかな~という想定だ。
S3のライフサイクル設定は、以下のサイトがとても参考になった。ありがたい。
参考 S3ライフサイクルルールで古いオブジェクトを自動削除する
IAM権限も少々厄介である(次章に載せる)
IAM設定(ポリシー設定)
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AthenaPermissions",
"Effect": "Allow",
"Action": [
"athena:StartQueryExecution",
"athena:GetQueryExecution",
"athena:GetQueryResults"
],
"Resource": "*"
},
{
"Sid": "GluePermissions",
"Effect": "Allow",
"Action": [
"glue:GetTable",
"glue:GetDatabase",
"glue:GetPartitions"
],
"Resource": "*"
},
{
"Sid": "S3Permissions",
"Effect": "Allow",
"Action": [
"s3:ListBucket*",
"s3:ListBucketMultipartUploads",
"s3:GetObject*",
"s3:GetBucketLocation",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:PutObject*",
"s3:PutObjectAcl",
"s3:PutObjectVersionAcl",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::your-backetname",
"arn:aws:s3:::your-backetname/*"
]
}
]
}
S3周囲の権限、いまだにどれを付与すればいいのか、よく分からん……。
コメント