KAKEHASHI Tech Blog

カケハシのEngineer Teamによるブログです。

【Node.js】S3からS3へのアップロードをnode:stream/promisesのpipelineやaws-sdk/lib-storageでやる

はじめに

こちらの記事は Advent Calendar 2024 の 6日目の記事になります。

カケハシでソフトウェアエンジニアをしている加藤です。 この記事では、S3からS3へのファイルのアップロードをNode.js, TypeScript, Stream APIを利用して行う方法について説明します。

背景

S3にあるファイルを取得して後続処理で利用しやすい形にフォーマットを変換して別のS3へ保存したいということはよくあることかと思います。

例えば、CSVファイルを取得してJSONに変換してS3に保存するといった処理です。

まず思いつくのは

  1. S3からファイルをダウンロード
  2. ローカルに保存してフォーマット
  3. S3にアップロード

という方法です。

この方法は非常にシンプルで手っ取り早く実装できるためファイルサイズが小さい場合は問題ありませんが、ファイルサイズが大きい場合はダウンロード、アップロードに時間がかかりますし、ローカルに保存するためのストレージが必要になります。

ちなみに私は何も考えずこの方法でとりあえず実装してしまいました。 今回はS3からStreamで取得して変換してS3にアップロードする方法を説明します。

調査してみると変換や加工(transform)をしたりnode:stream/promisespipelineを利用して処理を行う方法の紹介があまりなかったので、実際に試してみました。

Streamを利用した方法

まず最初にS3からファイルをそのまま取得して変換などをせずアップロードする方法を説明します。

ファイルのダウンロードは@aws-sdk/client-s3GetObjectCommandを利用します。 responseのBodyにはReadableStreamが格納されていますので、これをそのまま利用します。

アップロードはMultipart Uploadが@aws-sdk/client-s3では対応していないためaws-sdk/lib-storageUploadを利用します。

import { S3, GetObjectCommand } from '@aws-sdk/client-s3';
import { Upload } from 'aws-sdk/lib-storage';

const main = async () => {
    const s3 = new S3Client();

    const command = new GetObjectCommand({
        Bucket: bucketName,
        Key: filePath,
    });
    const response = await s3.send(command);

    // マルチパートアップロードをする 
    const upload = new Upload({
        client: s3,
        params: {
            Bucket: bucketName,
            Key: filePath,
            Body: response.Body,
        },
    });

    await upload.done();
};

main();

変換せずそのままアップロードする場合は上記のコードで問題ありませんが、実際には変換や加工を行うことが多いと思います。

node:stream/promisespipelineを利用してtransform処理を追加していくのが可読性も高いし良いと思っています。

例えば、gzip圧縮されたファイルを解凍して、さらにファイルのフォーマットを変換をしてアップロードする場合を考えてみます。 gzip圧縮されたファイルの解凍は、zlibcreateGunzipを利用します。 変換処理は独自の処理を実装する必要があるとします。ここではjsonl形式のファイルをjson形式に変換するとします。

import { GetObjectCommand, S3Client } from '@aws-sdk/client-s3';
import { createInterface } from 'node:readline/promises';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { Upload } from '@aws-sdk/lib-storage'
import { createGunzip } from 'node:zlib';


const main = async () => {
    const s3 = new S3Client();

    const command = new GetObjectCommand({
        Bucket: bucketName,
        Key: filePath,
    });
    const response = await s3.send(command);

    const gunzipStream = createGunzip();

    let firstLine = true;

    // S3のファイルを解凍してアップロード
    await pipeline(
        response.Body as Readable, // S3から取得したデータ(ストリーム)
        gunzipStream,              // GZIPを解凍するStream
        createInterface,           // 1行ずつ読み込むInterfaceを作成
        // JSONL形式をJSON形式に変換
        async function* (source: AsyncIterable<string>) {
            yield '[';
            for await (const line of source) {
                if (!firstLine) {
                    yield ',';
                }
                yield line;
                firstLine = false;
            }
            yield ']';
        },
        // S3にアップロード
        async function* (source) {
            const upload = new Upload({
                client: s3,
                params: {
                    Bucket: bucketName,
                    Key: filePath.replace(/\.gz$/, ""),
                    Body: Readable.from(source), 
                },
            });
            await upload.done();
        }
    );
};


main();

流れとしては

  1. S3からJsonlファイルを取得
  2. GZIP解凍
  3. 1行ずつ読み込む
  4. JSON形式に変換
  5. S3にアップロード

しています。

上記の処理をnode:stream/promisespipelineを利用してまとめることで、簡単にStreamを利用した処理を実装することができます。 pipelineは複数のStreamを繋げて処理を行うためのユーティリティ関数です。 pipelineの第一引数にS3からGetObjectしたStreamを指定し、第二引数以降には続けて変換処理を行うStreamを指定していき、最後に先ほど実装したaws-sdk/lib-storageUploadを指定します。

変換処理で実装しているジェネレータ関数は、非同期的な反復処理を簡潔に記述する方法を提供します。for await ... of を使って、非同期的にストリームからデータを取得し、変換することができます。

これを書きながら1行ずつ読み込んでくれるcreateInterfaceのpromise版が生えてることに気づきました。 これもそうですしnode:stream/promisespipelineもそうですし、Node.jsのStreamは非常に便利なので、ぜひ使いこなしていきたいですね。

動かして検証してみたところストレージにダウンロードして処理する場合に比べて、メモリの使用量は1/10程度になっていました。

まとめ

S3からファイルを全て読み込んで集計するなどには向きませんが、1行ずつの簡単な変換や加工であればNode.jsのStream APIやnode:stream/promisespipelineaws-sdk/lib-storageを利用して処理を行うことができて便利ですのでぜひ利用してみてください。