S3上にある複数ファイルを、ひとつのファイルにマージするLambda(Python3.9)である。
前提条件
Lambda(プログラム)
import boto3
from dataclasses import dataclass
from typing import Optional
import os
from pathlib import Path
import json
@dataclass
class S3Manager:
source_bucket: str
source_prefix: str
profile: Optional[str] = None
def _session(self):
s = boto3.session.Session(
profile_name=self.profile
)
return s
def _list_source(self, *, accumulated=None, next_token=None, func=None):
s3client = self._session().client('s3')
if next_token:
response = s3client.list_objects_v2(
Bucket=self.source_bucket,
Prefix=self.source_prefix,
ContinuationToken=next_token,
)
else:
response = s3client.list_objects_v2(
Bucket=self.source_bucket,
Prefix=self.source_prefix,
)
if 'Contents' in response:
keys = [i['Key'] for i in response['Contents']]
else:
keys = []
if 'NextContinuationToken' in response:
next_token = response['NextContinuationToken']
else:
next_token = None
if func:
return func(response=response, keys=keys, func=func, next_token=next_token, accumulated=accumulated)
def _accumulate(self, *, response, keys, func, next_token, accumulated):
got_keys = (accumulated or []) + keys
if next_token:
print(f'searching... current fetch keys are :{len(got_keys)}')
return self._list_source(accumulated=got_keys, next_token=next_token, func=func)
else:
return got_keys
def list_all(self) -> list:
return self._list_source(func=self._accumulate)
def _delete(self, *, response, keys, func, next_token, accumulated):
if keys:
print(f'deleting: {self.source_bucket}/{self.source_prefix}')
s3client = boto3.Session().client('s3')
s3client.delete_objects(
Bucket=self.source_bucket,
Delete={
'Objects': [{'Key': key} for key in keys],
'Quiet': False
},
)
if next_token:
return self._list_source(next_token=next_token, func=func)
def delete_all(self) -> None:
self._list_source(func=self._delete)
def list_all_test(self):
s3_resource = self._session().resource('s3')
a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix)
b = [k.key for k in a]
print(len(b))
# 対象リストでファイル統合
def merge_files(self, target_bucket, target_key):
s3_client = self._session().client('s3')
keys = self.list_all()
combined_data = []
for key in keys:
response = s3_client.get_object(Bucket=self.source_bucket, Key=key)
file_content = json.loads(response['Body'].read().decode('utf-8'))
combined_data.extend(file_content)
# マージされたデータを新しいファイルに出力
s3_client.put_object(Bucket=target_bucket, Key=target_key, Body=json.dumps(combined_data))
# メイン
def lambda_handler(event, context):
#対象S3バケット名
source_bucket = "your-source-bucket-name"
#マージ対象となるファイル名の絞り込み対象
source_prefix = "merge_input/20230819"
#マージ対象のバケット名
target_bucket = "your-target-bucket-name"
#マージ後のファイル名
target_key = 'merge_output/merge.json'
# S3対象リスト取得
s3_manager = S3Manager(source_bucket=source_bucket, source_prefix=source_prefix)
all_keys = s3_manager.list_all()
# マージ
s3_manager.merge_files(target_bucket=target_bucket, target_key=target_key)
return {
'statusCode': 200,
'body': all_keys
}
S3のリスト出力部分は、有志サイトのプログラムを利用させて頂いてもらっている。
Qitta Boto3でS3のリスト出力をするときは、list_objects_v2ではなくBucket().objects.filterを使おう
source_prefixで対象となるファイルを絞り込んでいる。
上記例だと、s3://your-backetname/merge_input/にある20230819部分はファイル冒頭部分となる。
なので、20230819_01.jsonや20230819_02.jsonが対象、20230820_01.jsonは対象ではないという感じだ。
merge_input以下の全ファイルを対象にしたいのならスラッシュ以下を消せば良いし、もっと汎用的にしたいのなら20230819部分は、テキストフォーマット辺りで本日の日付になるように、変更すると良い。
あとはコメントを見て、適当に類推してくれればと思う(THE・適当)
IAM(ポリシー設定)
対象となるS3バケットの権限を付与してけれ。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "S3Permissions",
"Effect": "Allow",
"Action": [
"s3:ListBucket*",
"s3:ListBucketMultipartUploads",
"s3:GetObject*",
"s3:GetBucketLocation",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:PutObject*",
"s3:PutObjectAcl",
"s3:PutObjectVersionAcl",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::your-backetname",
"arn:aws:s3:::your-backetname/*"
]
}
]
}
コメント