Safie Engineers' Blog!

Safieのエンジニアが書くブログです

gRPC AI推論サーバーの実装比較:Python(Sync/Async) vs C++ による劇的なコスト差について

この記事はSafie Engineers' Blog! Advent Calendar 16日目の記事です。

みなさま初めまして、AI開発部の菅井です。 昨今、AI開発の現場では、単に精度の高いモデルを作るだけでなく、「いかにしてAIをプロダクトとして使える形にするか」というエンジニアリング力が強く求められていると感じます。 我々AI開発部においても例外ではありません。 素晴らしいモデルができても、それを動かすためのサーバー費用が莫大であれば、ビジネスとして成立しないからです。特にカメラ映像を扱う場合、データ量は膨大で、処理の遅延は許されません。 「いかに推論の実行速度を上げるか」 「どうすればクラウド(EC2)のインスタンスコストを極限まで安くできるか」 これらは、モデルのパラメータチューニングと同じくらい、私たちにとって重要な「試行錯誤」のテーマです。 今回は、gRPCを使ったAI推論サーバーにおいて実装方法によりどれくらいコスト差が生じるのかについて紹介します。

1. なぜ、gRPCの実装方法にこだわるのか?

AI推論サーバーのコスト構造を考えた時、最も高価なリソースは「GPU」や「CPU」です。 しかし、一般的なWebサーバーの実装(同期処理)では、通信待ちやI/O待ちの間、この高価なリソースが「待ちぼうけ」をしてしまいます。

  • 同期処理の無駄: 「データ受信待ち」の間、CPUは遊んでいる(のに課金される)。
  • Pythonの限界: 非同期にしても、GIL(Global Interpreter Lock)の制約で、マルチコアCPUを積んだ高価なインスタンスを使っても、1コアしか使い切れない。

これでは、いくら高性能なEC2インスタンスを契約しても、スループット(処理できるカメラ台数)は伸びず、コストばかりが膨れ上がってしまいます。

「高価なコンピュートリソースを、1ミリ秒たりとも遊ばせず、極限まで使い倒すにはどうすればいいか?」

その答えを見つけるために、今回は①同期Python②非同期Python③非同期C++の3パターンを実装し、それぞれのパフォーマンスとコスト効率を徹底的に比較しました。

なぜREST APIやWebSocketではなく、gRPCなのか? 今回の「リアルタイム映像解析」において、gRPCを採用した理由は主に2点あります。

  1. Bidirectional Streaming(双方向ストリーミング): HTTP/1.1(REST)のような「1リクエスト1レスポンス」では、30FPSの映像を送り続ける際のオーバーヘッドが大きすぎます。gRPCなら、接続を維持したまま双方向にデータを流し続けられるため、低遅延な推論が可能です。
  2. Schema Driven(スキーマ駆動): PythonとC++という異なる言語間で高速に連携するためには、厳密な型定義(Protobuf)によるコード自動生成が不可欠でした。これにより、手動でのデータパース実装によるバグや性能劣化を防げます。

(※gRPC自体の基礎については、@S4nTo様から素敵な記事が投稿されておりますので、そちらも併せてご参照ください。 https://qiita.com/S4nTo/items/0ff0445542538ef49a05

よくある「カメラ映像を渡すから、あとの処理はAIエンジニアに任せた!」というパスに対して、我々がサーバーサイドの技術選定(エンジニアリング)でどこまで応えられるか。それはサービスの運用コストを数分の一、あるいは十分の一に削減できる可能性を秘めているのです。

2. 想定ケース

今回の記事で想定するシステム構成は以下の通りです。

  • クライアント: 多数のネットワークカメラ(またはゲートウェイ)。映像フレームを絶え間なく送信し続ける。
  • サーバー: AI推論サーバー(EC2インスタンス)。受け取った映像に対して物体検出を行い、BBOX(座標)などのメタデータを返す。
  • 通信要件:
    • プロトコル: gRPC (Bidirectional Streaming)
    • 同時接続数: 1台のサーバーで可能な限り多く
    • 遅延: リアルタイム性が求められるため、キュー詰まりによる遅延はNG。

この構成において、「Pythonの同期サーバー」「Pythonの非同期サーバー」、そして「C++の非同期サーバー」で実装した場合に、パフォーマンスにどれだけの差が出るのかを見ていきます。

3. 実装

共通:Proto定義

それぞれの実装パターンで用いるProtocolBufferの定義になります。

ObjectDetectionというサービスがDetectStreamというRPCメソッドを持つというものです。実際の画像ストリームを意識して書いていますが、今回の実装では処理自体はダミーのTaskを実行するため中身はほとんど使われません。

syntax = "proto3";

package camera_ai;

// サービス定義
service ObjectDetection {
  // 双方向ストリーミング: 
  // クライアントは映像フレームを送り続け、サーバーは推論結果を返し続ける
  rpc DetectStream (stream DetectRequest) returns (stream DetectResponse) {}
}

// リクエスト: カメラからの1フレーム分のデータ
message DetectRequest {
  // 非同期処理ではリクエストとレスポンスの順序が保証されない場合や、
  // クライアント側で結果をマッチングさせるためにIDが必須です。
  uint64 frame_id = 1; 

  // 画像データ (JPEG/PNG等のエンコード済みデータ、またはRawデータ)
  bytes image_data = 2;

  // 必要に応じて画像サイズなどを入れることもありますが、
  // シンプルにするため今回は省いてもOKです。
  // int32 width = 3;
  // int32 height = 4;
}

// レスポンス: 1フレームに対する推論結果
message DetectResponse {
  // リクエストの frame_id をそのまま返すことで、
  // クライアントはどの画像の推論結果かを特定できます。
  uint64 frame_id = 1;

  // 1枚の画像に複数の物体が写っているため repeated にします
  repeated Object objects = 2;
}

// 検出された個々の物体
message Object {
  string label = 1;      // クラス名 (例: "person", "car")
  float confidence = 2;  // 確信度 (0.0 - 1.0)
  BoundingBox bbox = 3;  // 座標情報
}

// バウンディングボックス (OpenCVのcv::Rectに合わせておくと楽です)
message BoundingBox {
  int32 x = 1;
  int32 y = 2;
  int32 width = 3;
  int32 height = 4;
}

共通:擬似的な推論負荷 (Heavy Task)

GPU推論は非同期ですが、実際には画像の前処理(デコード・リサイズ)や後処理(NMS等)でPythonのCPUリソースが大量に消費されます。この「隠れたボトルネック」を再現するため、sleep ではなく while ループでCPUを回します。

def heavy_inference_simulation(target_ms=30):
    start = time.perf_counter()
    # 指定時間、CPUを空回しして負荷をかける
    while (time.perf_counter() - start) < (target_ms / 1000.0):
        pass
    return "Result BBOX"

共通:Clientの実装

まずはサーバーにいじめ抜くほどの負荷をかけるクライアントです。 Pythonの asyncio を使い、1台のマシンから複数のカメラ接続を擬似的に生成します。

async def main():
    print(f"Starting Load Test with {NUM_CLIENTS} clients for {TEST_DURATION} seconds...")
    
    # クライアントタスクを全て起動
    tasks = []
    for i in range(NUM_CLIENTS):
        tasks.append(run_single_client(i))
        # 全員同時にアクセスするとエラーになりやすいので、わずかに開始時間をずらす
        if i % 10 == 0:
            await asyncio.sleep(0.05)

    # 全タスクの完了を待つ
    await asyncio.gather(*tasks)

① 同期Pythonの実装

まずは比較対象となる、標準的な同期実装です。

protobufで定義されたObjectDetectionServicer.DetectStreamにストリームを受けとった後の処理を書けばいいだけのシンプルな実装になります。 concurrent.futures.ThreadPoolExecutor を使い、1接続につき1スレッドを割り当てます。

  • 特徴: 実装は一番簡単。
  • 弱点: max_workers(スレッド数)が接続上限になる。それ以上の接続はキュー待ちになり、遅延が激増する。
class ObjectDetectionServicer(pb_grpc.ObjectDetectionServicer):
    def DetectStream(self, request_iterator, context):
        """
        双方向ストリーミングの実装。
        request_iterator: クライアントから送られてくるリクエストのイテレータ
        """
        print(f"[{time.ctime()}] New connection established.")

        # クライアントから stream で送られてくるデータを1つずつ処理
        for request in request_iterator:
            frame_id = request.frame_id
            img_data = request.image_data # ここに画像データ(bytes)が入る
            
            # --- ここで本来は推論を行う ---
            # 今回はダミーの推論時間を設ける(例: 30ms)
            heavy_inference_simulation(target_ms=30)

            # ダミーの結果を作成
            print(f"Sync processing frame_id: {frame_id}")
            
            # レスポンスを作成
            response = pb.DetectResponse(
                frame_id=frame_id,
                objects=[
                    pb.Object(
                        label="person",
                        confidence=random.uniform(0.8, 0.99),
                        bbox=pb.BoundingBox(x=100, y=100, width=50, height=200)
                    )
                ]
            )
            
            # 結果を返す
            yield response

② 非同期Pythonの実装

次に、asyncio 版です。非同期版もPythonでは簡単に実装することができます。

ObjectDetectionServicer.DetectStreamにasyncをつけて非同期に対応させる形になります。 推論部分を run_in_executor でスレッドプールに逃がし、イベントループを止めないように工夫します。

  • 特徴: 接続自体は大量にさばける(C10k対応)。
  • 現実: Pythonの GIL (Global Interpreter Lock) の制約により、複数のスレッドで計算しようとしても、同時に動けるCPUコアは実質1つだけ。CPUバウンドな処理(前処理等)が重なると詰まる。
class ObjectDetectionServicer(object_detection_pb2_grpc.ObjectDetectionServicer):
    async def DetectStream(self, request_iterator, context):
        loop = asyncio.get_running_loop()
        
        async for request in request_iterator:
            # 【工夫】重い処理を別スレッドに逃がして、イベントループを守る
            # しかし、Pythonコードである以上、GILの呪縛からは逃れられない...
            await loop.run_in_executor(None, heavy_inference_simulation, 30)
            
            yield object_detection_pb2.DetectResponse(...)

③ 非同期C++の実装

最後に、今回の主役であるC++実装です。 Pythonとは異なり、CallDataやCompletionQueueについて 理解する必要があるため、実装方法は少し複雑になります。

CallDataクラス

Clientとの1つ接続ごとの状態とデータをを管理するクラスになります。

非同期処理では、処理がいったん中断されてスレッドが別の仕事に移るため、「関数のローカル変数」にデータを残しておくことができません。そのため、以下の情報をヒープメモリ上のオブジェクト(CallData)として保持し続けます

  • コンテキスト: ServerContextRequestResponse などのデータ実体。
  • ステート(状態): 現在その接続が「接続待ち」なのか、「受信中」なのか、「送信中」なのかを示す状態遷移(ステートマシン)。
  • ロジック: イベントを受け取った際に実行すべき処理内容(Proceedメソッド)。

CompletionQueueクラス

CompletionQueue は、gRPCライブラリ内部(カーネルやネットワーク層)と、アプリケーション(ユーザーコード)をつなぐ 連絡通路のようなものです。

非同期で行われた操作(データの読み込み、書き込み、接続確立など)が完了した際、その通知(イベント)がすべてこのキューに格納されます。

完全にスレッドセーフであり、複数のスレッドから同時にアクセスしても安全に動作するように設計されています。

ワーカースレッドは、このキューに対して Next() メソッドを呼び出し、「何か完了した仕事はないか?」と問い合わせます。イベントがあれば即座に取り出し、なければイベントが来るまで待機(ブロック)します。

CallDataクラスの実装例

class CallData {
public:
    CallData(ObjectDetection::AsyncService* service, ServerCompletionQueue* cq)
        : service_(service), cq_(cq), stream_(&ctx_), status_(CREATE) {
        // インスタンス生成と同時に「接続待ち」状態に入る
        Proceed();
    }

    void Proceed() {
        if (status_ == CREATE) {
            // 1. CREATE状態:
            // gRPCシステムに対して「次の接続要求が来たら、このCallDataを使ってね」と依頼する。
            // タグとして `this` (自分自身のポインタ) を渡すのが定石。
            status_ = PROCESS;
            service_->RequestDetectStream(&ctx_, &stream_, cq_, cq_, this);
        }
        else if (status_ == PROCESS) {
            // 2. PROCESS状態 (初期接続完了):
            // 新しい接続が確立された瞬間にここに来る。
            
            // 重要: 次のクライアントのために、新しい CallData を即座に作って待機させる(Factoryパターン)
            new CallData(service_, cq_);

            // 最初の読み込みを開始する
            // 読み込みが終わったら、また `this` (自分) が呼ばれるようにタグ付けする
            status_ = READING; 
            stream_.Read(&request_, this);
        }
        else if (status_ == READING) {
            // 3. READING状態 (Read完了):
            // クライアントから request_ にデータが入った状態でここに来る。

            // --- 推論処理 (Mock) ---
            // request_.image_data() を cv::Mat に変換したりする場所
            HeavyInferenceSimulation(5); // 5msの重い処理をシミュレート
            
            response_.set_frame_id(request_.frame_id());
            auto* obj = response_.add_objects();
            // objに結果をセットする
            
            // 結果を書き込む
            status_ = WRITING;
            stream_.Write(response_, this);
        }
        else if (status_ == WRITING) {
            // 4. WRITING状態 (Write完了):
            // クライアントへの送信完了後にここに来る。

            std::cout << "Processed frame_id: " << response_.frame_id() << std::endl;
            
            // レスポンスをクリアして次のデータを読む
            response_.Clear();
            status_ = READING;
            stream_.Read(&request_, this);
        }
        else {
            // FINISH状態
            assert(status_ == FINISH);            
            delete this; // 自分の役目は終わったので自決する
        }
    }

    // クライアント切断時などの後処理用
    void Finish() {
        status_ = FINISH;
        stream_.Finish(Status::OK, this);
    }

    // 外部からアクセスするタグ用のアクセサ
    // 処理が成功したか(ok)によって挙動を変える制御が必要
    void OnEvent(bool ok) {
        if (!ok) {
            std::cout << "Client disconnected or finished reading/writing." << std::endl;
            Finish();
            return;
        }
        Proceed();
    }

ワーカースレッドの立ち上げとCompletionQueueの呼び出し例

// サーバーのメインクラス
class ServerImpl {
public:
    ~ServerImpl() {
        server_->Shutdown();
        cq_->Shutdown();
    }

    void Run() {
        std::string server_address("0.0.0.0:50051");
        ServerBuilder builder;
        
        builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
        builder.RegisterService(&service_);
        
        // CompletionQueue (イベントキュー) の作成
        cq_ = builder.AddCompletionQueue();
        server_ = builder.BuildAndStart();
        std::cout << "Server listening on " << server_address << std::endl;

        // 最初の1つ目の CallData を投入して待ち受け開始
        new CallData(&service_, cq_.get());

        // イベントループ
        void* tag;  // イベントに紐付いたタグ (CallDataのポインタが入る)
        bool ok;

        std::vector<std::thread> threads;
        int thread_count = 15; // サーバーのCPUコア数に合わせるのが一般的        
        for (int i = 0; i < thread_count; i++) {
            threads.emplace_back([this, i](){
                std::cout << "Worker Thread " << i << " started." << std::endl;
                
                void* tag;
                bool ok;
                
                // 複数のスレッドが、同じ cq_ (キュー) を同時に監視する。
                // gRPCのCompletionQueueはスレッドセーフなので、競合は内部で解決してくれる。
                while (cq_->Next(&tag, &ok)) {
                    // 空いているスレッドがイベントを拾って処理を実行
                    static_cast<CallData*>(tag)->OnEvent(ok);
                }
            });
        }

        // 全スレッドの終了を待つ(実際はCtrl+Cされるまで帰ってこない)
        for (auto& t : threads) {
            if (t.joinable()) t.join();
        }        
    }

今回の実装のイメージを以下に図示しています。

C++実装のイメージ

ClientがDetectStreamを呼び出すとComletionQueueにイベント通知がされ、空いているWorker ThreadがCallData を呼び出します。CallDataProceedにより自身の状態をみながら処理を進めます。画像を渡されているのであれば、推論を実施して結果をClinetに返します。

重要なポイントはCompletion Queueがスレッドセーフであり、各スレッドをまたいで使われている点であり、この仕組みが非同期実行を実現しているといえます。

各Worker Threadは空いている順からイベント通知のあったCallDataを処理していきます。そのため、CallData 1Worker Thread 1で呼ばれることがあれば、Worker Thread Nで呼ばれることもあります。同期処理と異なるのは、このようにスレッドが特定のClientのストリームを占有せずに、各Worker Threadが協力して多くのClientのストリームを処理することにあります。

4. ベンチマーク結果

実際にそれぞれの実装方法でどれくらいの差が出るのかを確認していきます。

測定環境

項目 内容
マシン Intel(R) Core(TM) Ultra 7 155H (1.40 GHz) 16コア
Python 3.10.18
CXX 17
推論負荷 5ms間のループ処理
クライアント 30台
ストリーム時間 10秒
ストリームFPS 10

測定結果サマリ

各々がRequestを受け取りResponseを返すまでの時間を計測した結果が以下になります。

実装パターン 平均時間 (Avg) 最速 (Min) 最遅 (Max) 評価
① 同期 Python 63823.44ms 25.03ms 127814.19ms ❌ 破綻
② 非同期 Python 45678.62ms 15.19ms 90461.10ms ❌ 破綻
③ 非同期 C++ 13.157ms 5.611ms 71.566ms 🏆 成立

10FPSを遅延なく処理するためには最遅Maxが100ms以下である必要があります。

③非同期 C++は上記の条件を満たすので本マシンで30台のクライアントをさばくことができることがわかります。

Client数のみを変えていき、最遅Maxを観測した結果を以下のグラフにしました。

処理時間(Max)の比較

①同期Python②非同期Python100msを下回るClient数は2までなので、本マシンで2台のクライアントしかさばけないことがわかります。

実装方法によってコストに15倍の差がでるという結果になりました。

考察

① 同期Python vs ② 非同期Python

同じPython、同じGILの制約下にありながら、同期版だけがMax 127秒(非同期版より30秒以上悪化)という遅延を記録しました。 その決定的な差は、「スレッドを占有する時間の長さ」にあります。

1. 20個の椅子と30人の客

今回のサーバー(16コア)では、Pythonのデフォルト設定により約20個のスレッドが稼働していました。 ここに30台のカメラが同時に接続すると、何が起きるでしょうか。

2. 同期版の敗因:接続=占有

同期実装では、「ストリーミング中の10秒間、スレッドをずっと占有」します。

  • 開始直後: 20台がつながった瞬間、全スレッドが埋まります。
  • 待ちぼうけ: 残り10台は、前の人の配信(10秒)が終わるまで、接続することすら許されず待機させられます。
  • 結果: 「待ち時間10秒」+「処理時間」が確定し、遅延が跳ね上がりました。

3. 非同期版の勝因:計算=占有

非同期実装では、スレッドを占有するのは「推論している一瞬(5ms)」だけです。

  • 高回転: 5msで席を立つため、20個のスレッドで30台を次々とさばくことができます。
  • 結果: 「入り口での門前払い」が起きず、少なくとも接続とデータ受信はスムーズに行われました。

②同期Python vs ③非同期C++

続いて、同じ「非同期アーキテクチャ」を採用している両者の比較です。結果は、C++がPythonを1200倍以上の速度差で圧倒しました。

両者とも「スレッドプール」を使って計算を並列化しようとしています。しかし、実際のCPUコアの使われ方は対照的です。

  • Pythonの限界 (GIL): コード上でスレッドを15個立てても、PythonのGIL (Global Interpreter Lock) の制約により、同時にPythonコードを実行できるのは常に1スレッドだけです。 15車線の高速道路があるのに、工事規制で「たった1車線」しか通行できない状態と同じです。30台のカメラから来るデータが、この細い1車線に殺到したため、大渋滞(90秒の遅延)が発生しました。
  • C++の真価 : GILが存在しません。15個のスレッドが、15個のCPUコアすべてを同時に使用して計算します。 15車線をフルに使って車がビュンビュン流れるため、30台程度の交通量では渋滞など起きようがありません。

まとめ

本記事では、同期・非同期のアーキテクチャの違い、そしてPython・C++という言語特性の違いが、システムのパフォーマンスにどれほど差をもたらすかを検証しました。

実験の結果、同じハードウェアを使っていても、実装方法を変えるだけで「1台のサーバーに収容できるカメラ台数」が15倍以上変わり得ることが確認できました。これは、クラウドインフラ費用が1/15になる可能性を示唆しており、エンジニアリングの工夫が事業の収益性に直結することを証明しています。

残された課題と今後の展望

今回は「非同期C++の可能性」にフォーカスしましたが、検証しきれていないテーマも残されています。

  • Python MultiProcessing: PythonのGILという障害は、マルチプロセス化によって回避する手段が存在します。メモリ消費量とのトレードオフにはなりますが、Pythonの生産性を維持しつつ性能を上げるアプローチとして検証の価値があります。
  • 同期C++との比較: 「C++だから速い」のか「非同期だから速い」のかをより厳密に切り分けるため、同期C++実装との比較も興味深いテーマです。

これらを含めたさらなる検証は、次回の記事でお届けできればと思います。

PoCからプロダクトへ

実際のプロダクト開発は、今回のようなCPUループ処理よりも遥かに複雑です。 映像のデコード、リサイズなどの前処理、そしてGPUを用いた推論処理。これらを遅延なくパイプライン化するために、NVIDIA DeepStream のような特化したツールキットを活用して、さらなる効率化を目指す道もあります。

しかし、どのような手段を選ぶにせよ、本質は変わりません。 AI開発者がモデルの精度だけでなく、「実行速度」や「運用コスト」を意識した設計を行うこと。 そのエンジニアリングへの意識こそが、AIを単なるPoC(概念実証)で終わらせず、持続可能なビジネス価値を生む「プロダクト」へと昇華させる鍵になるのです。

© Safie Inc.