Safie Engineers' Blog!

Safieのエンジニアが書くブログです

FastAPI StreamingResponse の使い方

サーバサイドエンジニアの松木 (@tatsuma_matsuki) です。 Safieでは、FastAPIを利用していくつかのサービスを開発しています。Safieのサービスの性質上、APIサーバで画像ファイルなどのオブジェクトを扱うことが多いです。

大きいサイズのオブジェクトをクライアントにダウンロードさせるAPIなどでは、FastAPIの StreamingResponse を使うのが便利ですが、このStreamingResponseの使い方を扱った良いリファレンスがネット上であまり見つからなかったので、この記事で実際にコードなどを示しながら実装例を共有していきたいと思います!

StreamingResponse

FastAPIのStreamingResponseは、HTTPのbodyとなるコンテンツをいくつかのチャンクに分割してストリームで返すために利用されるレスポンスクラスです。まずは、このStreamingResponseクラスのコンストラクタ引数についてまとめてみます。詳細は以下のリンクを参照ください。 https://github.com/encode/starlette/blob/0.21.0/starlette/responses.py#L224

引数 説明
content Iterator or AsyncIterable メインとなるオブジェクトのデータ(必須)
status_code int ステータスコードを指定(デフォルトは200、省略可)
headers Mapping レスポンスヘッダーを指定(省略可)
media_type str 指定した値は、Content-Typeヘッダーの値として利用されます。headersでもContent-Typeを指定した場合は、headersの値が優先されます。(省略可)
background BackgroundTask BackgroundTask を指定(省略可)

メインとなるオブジェクトのデータはcontentの引数に渡します。content引数は、イテレータ、非同期イテレータ(もしくは、ジェネレータ、非同期ジェネレータ)を指定できますので、この仕様に合わせてcontentを渡してあげる必要があります。

S3からオブジェクトをダウンロードして返す

オブジェクトを管理するオブジェクトストレージのサービスとしてまず思い浮かぶのはおそらくS3でしょう!APIサーバ上でS3からオブジェクトをダウンロードして、そのオブジェクトをクライアントに返すというような処理に、StreamingResponseが利用できます。

S3からオブジェクトをダウンロードして、それをStreamingResponseで返す処理は、例えば以下のような実装になります。S3からのオブジェクト取得には、 aiobotocore を利用しています。aiobotocoreは、botocoreとaiohttpによる 非同期の クライアント実装となっています。

StreamingResponseは、非同期イテレータと通常の同期イテレータの両方が使用可能なので、非同期ではないbotocoreを利用した場合とのダウンロード時間を比較した結果も後で紹介します。

...
async def s3_client(
    settings: Settings = fastapi.Depends(get_settings),
) -> AsyncGenerator[aiobotocore.client.AioBaseClient, None]:
    session = aiobotocore.session.AioSession()
    async with session.create_client(
        "s3",
        endpoint_url=settings.aws_endpoint_url,
        aws_access_key_id=settings.aws_access_key_id,
        aws_secret_access_key=settings.aws_secret_access_key,
        verify=settings.aws_tls_verify,
    ) as client:
        yield client

def s3_client_sync(
    settings: Settings = fastapi.Depends(get_settings),
) -> botocore.client.BaseClient:
    session = botocore.session.Session()
    return session.create_client(
        "s3",
        endpoint_url=settings.aws_endpoint_url,
        aws_access_key_id=settings.aws_access_key_id,
        aws_secret_access_key=settings.aws_secret_access_key,
        verify=settings.aws_tls_verify,
    )

@router.get(
    "/tasks/{task_id}/image",
    summary="Download Image",
    response_class=StreamingResponse,
)
async def get_image(
    task_id: int = Path(..., title="Task ID"),
    settings: Settings = fastapi.Depends(get_settings),
    s3_client: aiobotocore.client.AioBaseClient = fastapi.Depends(s3_client),
    # s3_client_sync: botocore.client.BaseClient = fastapi.Depends(s3_client_sync),
):

    s3_bucket = settings.s3_image_bucket
    s3_key = f"test/{task_id}/image.jpg"
    try:
        image = await s3_client.get_object(Bucket=s3_bucket, Key=s3_key)
        # image = s3_client_sync.get_object(Bucket=s3_bucket, Key=s3_key)
    except Exception:
        raise HTTPException(status_code=500, detail="Failed to get the image")

    return StreamingResponse(
        content=image["Body"],  # 非同期イテレータ
        media_type="image/jpeg",
        headers={"content-length": str(image["ContentLength"]), "etag": image["ETag"]},
    )

botocoreのget_object()のレスポンスはDict型で、"Body"キーの値(image["Body"])にオブジェクトのデータが返ってきます。

image["Body"]の型は aiobotocore.response.StreamingBody ですが、これは非同期イテレータとなっており、StreamingResponseのcontentにそのまま渡すことができます。

同期・非同期イテレータでのダウンロード時間比較

StreamingResponseは、通常の同期イテレータと非同期イテレータの両方に対応していますが、それぞれを利用した場合にどの程度オブジェクトのダウンロード時間が変わるのかを少し見てみたいと思います。

S3(今回はlocalstackを利用)上に10MBのオブジェクトをアップロードして、それを上記で実装したAPIでダウンロードします。

まずは、通常の同期イテレータ(botocore)を用いた場合の結果です。

$ time curl -sv http://localhost:30080/tasks/1/image -o image.jpg
*   Trying 127.0.0.1:30080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 30080 (#0)
> GET /tasks/1/image HTTP/1.1
> Host: localhost:30080
> User-Agent: curl/7.68.0
> Accept: */*
> 
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< date: Wed, 28 Sep 2022 04:26:28 GMT
< server: uvicorn
< content-length: 10485760
< etag: "46f40f95feefb6ab49b2b3679b5ec0a2"
< content-type: image/jpeg
< 
{ [1024 bytes data]
* Connection #0 to host localhost left intact

real    0m19.873s
user    0m0.115s
sys     0m0.300s

約20秒 ほどかかりました。次に、非同期イテレータ(aiobotocore)を用いた場合です。

$ time curl -sv http://localhost:30080/tasks/1/image -o image.jpg
*   Trying 127.0.0.1:30080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 30080 (#0)
> GET /tasks/1/image HTTP/1.1
> Host: localhost:30080
> User-Agent: curl/7.68.0
> Accept: */*
> 
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< date: Wed, 28 Sep 2022 04:28:05 GMT
< server: uvicorn
< content-length: 10485760
< etag: "46f40f95feefb6ab49b2b3679b5ec0a2"
< content-type: image/jpeg
< 
{ [3072 bytes data]
* Connection #0 to host localhost left intact

real    0m3.825s
user    0m0.039s
sys     0m0.198s

こちらは 4秒弱 という結果でした!非同期イテレータをcontentに指定することでダウンロード時間がかなり早くなっています。

個人的には思ったよりも違いが出たので、API実装する上では、StreamingResponseに非同期のイテレータを渡すことが性能面でかなり重要になってきそうです。

他サービスAPIからオブジェクトをダウンロードして返す

S3以外の実装例も紹介したいと思います。複数のサービス連携を実装する中では別サービスのAPIからオブジェクトをダウンロードして、それをStreamingResponseで返すようなシーンもありますので、その実装例も書いてみます。

...
async def get_exit_stack() -> AsyncGenerator[contextlib.AsyncExitStack, None]:
    async with contextlib.AsyncExitStack() as stack:
        yield stack

async def get_aiohttp_session() -> AsyncGenerator[aiohttp.ClientSession, None]:
    async with aiohttp.ClientSession() as session:
        yield session

@router.get(
    "/tasks/{task_id}/bytes/{size}",
    summary="Download bytes",
    response_class=StreamingResponse,
)
async def get_bytes(
    task_id: int = Path(..., title="Task ID"),
    size: int = Path(..., title="The number of Bytes to download"),
    exit_stack: contextlib.AsyncExitStack = Depends(get_exit_stack),
    session: aiohttp.ClientSession = Depends(get_aiohttp_session),
):

    res = await exit_stack.enter_async_context(
        session.get(url=f"http://httpbin.org/stream-bytes/{size}")
    )
    if res.status != 200:
        raise HTTPException(status_code=500, detail="Failed to download bytes")

    return StreamingResponse(
        content=res.content.iter_chunked(1 * 1024),
        media_type=res.headers["content-type"],
    )

HTTPクライアントの 非同期実装 である aiohttp を用いて、他サービスのAPIからオブジェクトを非同期でダウンロードする実装にしています。他サービスとして、今回は httpbin.org を利用しています。

aiohttp.ClientSessionでgetした際のレスポンス(ClientResponse)のcontentは、aiohttp.streams.StreamReader 型となっています。これ自体非同期イテレータとして扱えるため、そのままStreamingResponseのcontentに渡してもOKですが、iter_chunked(size: int) のメソッドを使って各イテレーションで扱うデータサイズを指定することもできます。

ちなみに、aiohttp.ClientSessionのレスポンスは、コンテキストマネージャーとして実装されており、そのままwith分を書くとwithブロックを抜けるタイミングでクローズされてしまい、contentをすべて読み込めないため、contextlib.AsyncExitStack を用いて、レスポンスがクローズされるタイミングを遅らせています。

単体テストの実装

最後にmockを用いた単体テストの例も書いてみます。aiohttp.ClientSessionのメソッドをmockすることで他サービスからのResponseをmockしてテストを実装します。

...
class MockByteGenerator:
    async def iter_chunked(self, size: int) -> AsyncGenerator[bytes, None]:
        for _ in range(5):
            yield b"x" * size


class MockStreamResponse:
    def __init__(self, content: Any, status: int, headers: Dict[str, str]):
        self.content = content
        self.status = status
        self.headers = headers

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        pass


@pytest.mark.asyncio
async def test_get_bytes(test_client):

    with mock.patch("aiohttp.ClientSession.get") as mock_get:
        mock_get.return_value = MockStreamResponse(
            content=MockByteGenerator(),
            status=200,
            headers={"content-type": "application/octet-stream"},
        )
        res = await test_client.get(f"/tasks/1/bytes/1")
        assert res.status_code == 200
        assert res.content == b"x" * 1024 * 5

aiohttpのClientResponseをmockするclassであるMockStreamResponseは、ClientResponseと同じくコンテキストマネージャーとして実装する必要があり、__aenter____aexit__ のメソッドを追加しています。また、インスタンス変数として、contentを用意して、iter_chunked()のメソッドが非同期ジェネレータを返すように実装しています。

まとめ

FastAPIでStreamingResponseを使って、オブジェクトをダウンロードするAPIの実装例を二つ書いてみました!StreamingResponseは、同期イテレータと非同期イテレータの両方をサポートしていますが、 非同期イテレータを渡してあげることで応答時間を改善することが可能です。S3クライアント、HTTPクライアント共に非同期の実装がすでに存在するので、それらを利用するだけで比較的容易に実装できるかと思います。これから、FastAPIを用いてAPI等を開発される方の参考になりますと幸いです!

© Safie Inc.