今までのお仕事履歴 & まったりお仕事募集中♪

【質の良い無料コンテンツ紹介】

【ブラックレーベル】謎解き特集!【SCRAPゲームブック】

【通信制大学】産業能率大学に三年次編入しました

【悲報】ジュニアNISAで、買い付け口座を間違えていた話【NISA預りと特定預りは違うのよ】

当サイトではアフィリエイト広告を利用して商品を紹介しています。

【AWS Lambda】S3上にある複数ファイルを、ひとつのファイルにマージする

AWS

S3上にある複数ファイルを、ひとつのファイルにマージするLambda(Python3.9)である。

前提条件

  • S3上に複数ファイルある前提
  • ファイル種別はJSONなので、他でやりたい場合は適当に読みかえて
  • Lambdaのタイムアウト値も適当に変更して
  • (私が作成した部分は)動作すればいいやぐらいの加減で作成しているので、エラー処理は各自おねしゃす

Lambda(プログラム)

import boto3
from dataclasses import dataclass
from typing import Optional
import os
from pathlib import Path
import json

@dataclass
class S3Manager:
    source_bucket: str
    source_prefix: str
    profile: Optional[str] = None

    def _session(self):
        s = boto3.session.Session(
            profile_name=self.profile
        )
        return s

    def _list_source(self, *, accumulated=None, next_token=None, func=None):
        s3client = self._session().client('s3')
        if next_token:
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
                ContinuationToken=next_token,
            )
        else:
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
            )

        if 'Contents' in response:
            keys = [i['Key'] for i in response['Contents']]
        else:
            keys = []

        if 'NextContinuationToken' in response:
            next_token = response['NextContinuationToken']
        else:
            next_token = None

        if func:
            return func(response=response, keys=keys, func=func, next_token=next_token, accumulated=accumulated)

    def _accumulate(self, *, response, keys, func, next_token, accumulated):
        got_keys = (accumulated or []) + keys
        if next_token:
            print(f'searching... current fetch keys are :{len(got_keys)}')
            return self._list_source(accumulated=got_keys, next_token=next_token, func=func)
        else:
            return got_keys

    def list_all(self) -> list:
        return self._list_source(func=self._accumulate)

    def _delete(self, *, response, keys, func, next_token, accumulated):
        if keys:
            print(f'deleting: {self.source_bucket}/{self.source_prefix}')
            s3client = boto3.Session().client('s3')
            s3client.delete_objects(
                Bucket=self.source_bucket,
                Delete={
                    'Objects': [{'Key': key} for key in keys],
                    'Quiet': False
                },
            )

        if next_token:
            return self._list_source(next_token=next_token, func=func)

    def delete_all(self) -> None:
        self._list_source(func=self._delete)

    def list_all_test(self):
        s3_resource = self._session().resource('s3')
        a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix)
        b = [k.key for k in a]
        print(len(b))
        
    # 対象リストでファイル統合
    def merge_files(self, target_bucket, target_key):
        s3_client = self._session().client('s3')
        keys = self.list_all()

        combined_data = []
        for key in keys:
            response = s3_client.get_object(Bucket=self.source_bucket, Key=key)
            file_content = json.loads(response['Body'].read().decode('utf-8'))
            combined_data.extend(file_content)

        # マージされたデータを新しいファイルに出力
        s3_client.put_object(Bucket=target_bucket, Key=target_key, Body=json.dumps(combined_data))

# メイン
def lambda_handler(event, context):
    #対象S3バケット名
  source_bucket = "your-source-bucket-name"
  #マージ対象となるファイル名の絞り込み対象
    source_prefix = "merge_input/20230819"
  #マージ対象のバケット名
    target_bucket = "your-target-bucket-name"
  #マージ後のファイル名
    target_key = 'merge_output/merge.json'

    # S3対象リスト取得
    s3_manager = S3Manager(source_bucket=source_bucket, source_prefix=source_prefix)
    all_keys = s3_manager.list_all()

    # マージ
    s3_manager.merge_files(target_bucket=target_bucket, target_key=target_key)

    return {
        'statusCode': 200,
        'body': all_keys
    }

S3のリスト出力部分は、有志サイトのプログラムを利用させて頂いてもらっている。

Qitta Boto3でS3のリスト出力をするときは、list_objects_v2ではなくBucket().objects.filterを使おう

source_prefixで対象となるファイルを絞り込んでいる。

上記例だと、s3://your-backetname/merge_input/にある20230819部分はファイル冒頭部分となる。

なので、20230819_01.jsonや20230819_02.jsonが対象、20230820_01.jsonは対象ではないという感じだ。

merge_input以下の全ファイルを対象にしたいのならスラッシュ以下を消せば良いし、もっと汎用的にしたいのなら20230819部分は、テキストフォーマット辺りで本日の日付になるように、変更すると良い。

確か、日付のテキストフォーマットはこれでやっていた

あとはコメントを見て、適当に類推してくれればと思う(THE・適当)

IAM(ポリシー設定)

対象となるS3バケットの権限を付与してけれ。

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "S3Permissions",
			"Effect": "Allow",
			"Action": [
				"s3:ListBucket*",
				"s3:ListBucketMultipartUploads",
				"s3:GetObject*",
				"s3:GetBucketLocation",
				"s3:ListMultipartUploadParts",
				"s3:AbortMultipartUpload",
				"s3:PutObject*",
				"s3:PutObjectAcl",
				"s3:PutObjectVersionAcl",
				"s3:DeleteObject"
			],
			"Resource": [
				"arn:aws:s3:::your-backetname",
				"arn:aws:s3:::your-backetname/*"
			]
		}
	]
}

コメント