PyTorchの重いCUDA処理を非同期化したらメモリリークした話と、その解決策

PyTorchの重いCUDA処理を非同期化したらメモリリークした話と、その解決策

こんにちは!Qualitegプロダクト開発部です!

今回は同期メソッドを非同期メソッド(async)化しただけなのに、思わぬメモリリーク※に見舞われたお話です。

深層学習モデルを使った動画処理システムを開発していた時のことです。
「処理の進捗をリアルタイムでWebSocketで通知したい」という要件があり、「単にasync/awaitを使えばいいだけでしょ?」と軽く考えていたら、思わぬ落とし穴にはまりました。

プロ仕様のGPUを使っていたにも関わらず、メモリ不足でクラッシュしてしまいました。

この記事では、その原因と解決策、そして学んだ教訓を詳しく共有したいと思います。同じような問題に直面している方の参考になれば幸いです。

※ 厳密には「メモリリーク」ではなく「メモリの解放遅延」ですが、 実用上の影響は同じなので、この記事では便宜上「メモリリーク」と表現します。

背景:なぜ進捗通知は非同期である必要があるのか

モダンなWebアプリケーションの要求

最近のWebアプリケーション開発では、ユーザー体験を向上させるため、長時間かかる処理の進捗をリアルタイムで表示することが当たり前になってきました。

特に生成AIの普及により、数分から数十分かかる処理も珍しくありません。当社のMotionVoxも長めの動画を生成時には長時間処理となります。「今どこまで進んでいるのか」「あとどれくらいかかるのか」を知るために進捗通知が必要となるわけです。

進捗表示があると便利ですよね

非同期progress_listenerが必要な理由を深く理解する

最初は「なぜ進捗通知が非同期でなければならないのか」を深く考えていませんでしたが実装を進めるうちに、その必要性が明確になってきました。

まず、WebSocketでの通信は本質的に非同期です。たとえば、FastAPIでWebSocketを扱う場合、以下のようなコードになります

from fastapi import FastAPI, WebSocket
from datetime import datetime
import asyncio

app = FastAPI()

@app.websocket("/ws/{task_id}")
async def websocket_endpoint(websocket: WebSocket, task_id: str):
    """WebSocketエンドポイント - クライアントとの双方向通信を確立"""
    
    await websocket.accept()  # 接続を受け入れる(非同期)
    
    # 進捗通知用の非同期関数を定義
    async def send_progress(percent: float, message: str, details: dict = None):
        """進捗情報をWebSocketで送信する非同期関数"""
        
        payload = {
            "type": "progress",
            "task_id": task_id,
            "percent": percent,
            "message": message,
            "timestamp": datetime.now().isoformat(),
            "details": details or {}
        }
        
        # WebSocketでJSONを送信(これは非同期操作)
        await websocket.send_json(payload)
    
    try:
        # 重い処理を実行(ここが今回の問題の核心)
        result = await process_heavy_video_task(
            task_id=task_id,
            progress_listener=send_progress  # 非同期関数を渡す
        )
        
        # 処理完了を通知
        await websocket.send_json({
            "type": "complete",
            "task_id": task_id,
            "result": result,
            "timestamp": datetime.now().isoformat()
        })
        
    except Exception as e:
        # エラーを通知
        await websocket.send_json({
            "type": "error",
            "task_id": task_id,
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        })
    
    finally:
        await websocket.close()

さらに、実際のプロダクション環境では、進捗通知は単にWebSocketで送信するだけでなく、複数の非同期操作を同時に行う必要があります

最初の実装:素朴なアプローチとその思考過程

ステップ1:元々の同期的な処理

最初に実装した動画処理システムは、PyTorchのチュートリアルでよく見かける、標準的な同期処理でした。これは問題なく動作していました

import torch
import torchvision.transforms as transforms
from typing import List, Any
import numpy as np

def process_video_frames(
    model: torch.nn.Module,
    frames: List[np.ndarray],
    device: str = "cuda"
) -> List[torch.Tensor]:
    """
    動画フレームを処理する同期関数
    
    Args:
        model: PyTorchの深層学習モデル
        frames: 動画フレームのリスト(numpy配列)
        device: 実行デバイス("cuda" or "cpu")
    
    Returns:
        処理済みフレームのリスト
    """
    
    # 前処理の定義(画像の正規化など)
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((512, 512)),  # モデルの入力サイズに合わせる
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )
    ])
    
    processed_frames = []
    total_frames = len(frames)
    
    print(f"Starting processing of {total_frames} frames...")
    
    # 各フレームを順次処理
    for i, frame in enumerate(frames):
        # 前処理:numpy配列をテンソルに変換
        input_tensor = transform(frame)
        input_tensor = input_tensor.unsqueeze(0)  # バッチ次元を追加
        
        # GPUに転送
        input_tensor = input_tensor.to(device)
        
        # 推論実行(勾配計算は不要)
        with torch.no_grad():
            output = model(input_tensor)
        
        # 後処理:結果をCPUに戻す
        result = output.cpu()
        processed_frames.append(result)
        
        # 簡単な進捗表示(100フレームごと)
        if (i + 1) % 100 == 0:
            memory_allocated = torch.cuda.memory_allocated() / 1024**3
            print(f"Processed {i+1}/{total_frames} frames, "
                  f"GPU Memory: {memory_allocated:.2f}GB")
    
    print(f"Processing complete! Total frames: {len(processed_frames)}")
    return processed_frames

この実装はシンプルで分かりやすく、小規模な動画では問題なく動作していました。

「簡単に解決できる」と思った瞬間

「処理の進捗をリアルタイムでWebページに表示したい」となると、さきほどのように非同期での通知処理が必要となります。

もともとは動画生成処理は同期処理として実装していたので、「進捗通知を行うprogress_listenerが非同期関数として渡されるのであれば、それを呼び出す処理関数も非同期にすればいいだけではないか」と。これは一見、とても論理的で単純な解決策に思えます。

Pythonにはasync/await構文があるので非同期関数から非同期関数を呼び出すのは、awaitキーワードを使うだけで簡単に実現できます。

つまり同期処理をしていた動画生成処理に async をつけるだけ。その中で 非同期な progress_listener をコールバックすれば解決、、、のはずでした

そして自信を持って、以下のようなコードを書いてみると、、

async def process_video_frames_async(
    model: torch.nn.Module,
    frames: List[np.ndarray],
    progress_listener: Any,  # 非同期のコールバック関数
    device: str = "cuda"
) -> List[torch.Tensor]:
    """
    動画フレームを処理する非同期関数(問題のあるバージョン)
    
    この実装には重大な問題があります!
    """
    
    # 前処理の定義(同期版と同じ)
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((512, 512)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )
    ])
    
    processed_frames = []
    total_frames = len(frames)
    
    print(f"Starting async processing of {total_frames} frames...")
    start_time = asyncio.get_event_loop().time()
    
    for i, frame in enumerate(frames):
        loop_start = asyncio.get_event_loop().time()
        
        # === ここから同期的な処理 ===
        
        # 前処理:numpy配列をテンソルに変換
        input_tensor = transform(frame)
        input_tensor = input_tensor.unsqueeze(0)
        
        # GPUに転送
        input_tensor = input_tensor.to(device)
        
        # 推論実行
        with torch.no_grad():
            output = model(input_tensor)
        
        # 後処理:結果をCPUに戻す
        result = output.cpu()
        processed_frames.append(result)
        
        # === ここまで同期的な処理 ===
        
        # 非同期で進捗通知!これで解決!(と思っていた)
        current_progress = (i + 1) / total_frames * 100
        elapsed_time = asyncio.get_event_loop().time() - start_time
        estimated_total = elapsed_time / (i + 1) * total_frames
        remaining_time = estimated_total - elapsed_time
        
        await progress_listener(
            percent=current_progress,
            message=f"Processing frame {i+1}/{total_frames}",
            details={
                "current_frame": i + 1,
                "total_frames": total_frames,
                "elapsed_seconds": round(elapsed_time, 2),
                "remaining_seconds": round(remaining_time, 2),
                "fps": round((i + 1) / elapsed_time, 2) if elapsed_time > 0 else 0,
                "memory_gb": torch.cuda.memory_allocated() / 1024**3
            }
        )
        
        # 10フレームごとに詳細ログ出力
        if (i + 1) % 10 == 0:
            loop_time = asyncio.get_event_loop().time() - loop_start
            memory_used = torch.cuda.memory_allocated() / 1024**3
            print(f"Frame {i+1}: Loop time: {loop_time:.3f}s, "
                  f"GPU Memory: {memory_used:.2f}GB")
    
    return processed_frames

コードはシンプルで型チェッカーも警告を出さず、構文エラーもありません。

「はい、おしまい」

のはずが。。

予期せぬ結果:メモリが爆発的に増加

テスト環境での初回実行

開発環境でテスト動画(30秒、900フレーム)を処理したところ、問題なく動作しました。「やはり簡単だった」と安心していました。

しかし、本番相当の動画(10分、18,000フレーム)で実行してみると、恐ろしい事態が発生しました

Starting async processing of 18000 frames...
Frame 10: Loop time: 0.125s, GPU Memory: 1.82GB
Frame 100: Loop time: 0.156s, GPU Memory: 3.45GB
Frame 500: Loop time: 0.203s, GPU Memory: 8.72GB
Frame 1000: Loop time: 0.298s, GPU Memory: 16.34GB
Frame 1500: Loop time: 0.412s, GPU Memory: 24.81GB
Frame 2000: Loop time: 0.589s, GPU Memory: 33.15GB
Frame 2500: Loop time: 0.834s, GPU Memory: 41.62GB
Frame 2800: Loop time: 1.205s, GPU Memory: 47.23GB

RuntimeError: CUDA out of memory. Tried to allocate 512.00 MiB 
(GPU 0; 47.54 GiB total capacity; 46.89 GiB already allocated; 
324.00 MiB free; 47.12 GiB reserved in total by PyTorch)

観察された異常な挙動

メモリ使用量の増加だけでなく、他にも奇妙な現象が観察されました

処理速度の劇的な低下

・最初のフレーム: 0.125秒

・2800番目のフレーム: 1.205秒(約10倍遅い)

CPUメモリも増加

・システムモニターを見ると、RAMの使用量も着実に増加

プロセスが応答しなくなる

・一定時間後、Ctrl+Cでも停止できない状態に・・・

原因分析:なぜメモリリーク(厳密には解放遅延)が発生したのか?

原因1: 全結果をメモリに蓄積する設計の問題

最初の、そして最も明白な問題は、処理結果を全てリストに蓄積するというイケてない実装をやらかしていたことです。最新の論文とその実証コードを読んでいると本当に「あるある」なんですが、プロダクションでこういうことをやってはいけないのに、やっちまいました。
(言い訳をすると、まずは動確のためにこういう荒っぽい実装をしておいて、後でちゃんとバッチ化や分割をやろうとおもっていたけど、なんとなく動いているうちに修正を忘れていました。。。Issueにはあげておいたのに見逃していました)

ただし、これだけでは説明がつきませんでした。

なぜなら、同期版でも同じ実装であり、同期版のほうでは正常に動作していたからです。

processed_frames = []  # ここに全フレームの結果を保持
for frame in frames:    # 18,000フレームの場合...
    output = model(frame)  # 各出力が約50MBだとすると...
    processed_frames.append(output)  # 合計900GB必要!?

実際の計算をしてみましょう

  • 1フレームの入力: 512×512×3×4バイト = 約3MB
  • モデルの中間層出力: 約20MB
  • 最終出力: 約30MB
  • 合計: 約50MB/フレーム
  • 18,000フレーム × 50MB = 900GB

これは明らかに問題ですが、同期版でも同じはずです。

同期版のときは動いてたじゃん!

では、なぜ非同期版だけがクラッシュしたのでしょうか?

原因2: async関数におけるメモリ管理の特殊性+PyTorch CUDAキャッシュの影響

ここが今回の最大の落とし穴でした。
Pythonのasync関数は、通常の関数とは異なるメモリ管理を行います。
awaitを挟むと、その時点でコルーチンの「現在のローカル変数状態」がフレームに保存され、後続コードから参照される可能性のある変数は解放されません。
このため、GPU上の大きなテンソルや推論結果が次のイテレーションに入ってもメモリに残りやすくなります。

さらにPyTorchは、GPUメモリ(VRAM)を使い終わってもすぐにOSに返さず内部キャッシュとして保持する設計です。同期版では変数が速やかに寿命を迎えるためキャッシュが上書きされて大きな増加になりにくいのですが、非同期版では変数の寿命が延びるためキャッシュ領域が膨らみ、結果として急激なVRAM増加につながります。

# 同期関数の場合のメモリ管理
def sync_process():
    for i in range(1000):
        # 大きなテンソルを作成(4MB)
        big_tensor = torch.randn(1000, 1000).cuda()
        
        # モデルで処理(さらに4MB)
        result = model(big_tensor)
        
        # ここで重要:ループの終わりで big_tensor は次のイテレーションで
        # 上書きされ、古いものは即座にガベージコレクションの対象になる
        
        # Pythonの参照カウントが0になり、メモリが解放される

しかし、async関数では状況が異なるんです

# 非同期関数の場合のメモリ管理
async def async_process():
    for i in range(1000):
        # 大きなテンソルを作成(4MB)
        big_tensor = torch.randn(1000, 1000).cuda()
        
        # モデルで処理(さらに4MB)
        result = model(big_tensor)
        
        # ここが問題の核心!
        await something()  # awaitでコルーチンの実行が中断される
        
        # この時点で、Pythonはコルーチンの状態を保存する必要があるため、big_tensorやresultへの参照が保持されます。
同期版ではループ終了と同時に参照が切れるため、PyTorchのキャッシュ領域はすぐに再利用されますが、非同期版では寿命が延びた変数がキャッシュ領域を占有し続けます。
その結果、変数寿命の延び+PyTorchのVRAMキャッシュの組み合わせで、GPUメモリが雪だるま式に増えていきます。

コルーチンの状態保存について、もう少し捕捉すると、↓のような感じです

async def detailed_async_process():
    # これらの変数は全てコルーチンのフレームに保存される
    local_var_1 = create_large_object()  # 100MB
    local_var_2 = create_another_object()  # 50MB
    
    for i in range(100):
        loop_var = create_loop_object()  # 10MB
        
        # awaitの時点で、以下が保存される:
        # - local_var_1, local_var_2
        # - loop_var
        # - ループカウンタ i
        # - その他の全てのローカル変数
        await async_operation()
        
        # 問題:loop_varが上書きされても、
        # 前のloop_varがまだ参照されている可能性がある

原因3: PyTorchのCUDA操作とasyncioの相性問題

PyTorchのCUDA操作は、本質的に同期的なブロッキング操作です。

これらの操作は、ほんとはCUDAストリームを使って内部的に非同期化されていますが、Pythonレベルでは同期的に見えます。

# PyTorchのCUDA操作の内部動作
tensor_gpu = tensor.cuda()  # この時点でCUDAストリームにエンキュー
output = model(tensor_gpu)  # GPUで実行(Pythonは待機)
result = output.cpu()  # GPU→CPUの転送(同期ポイント)

async関数内でこれらのブロッキング操作を行うと、以下の問題が発生します

メモリ解放タイミングの遅延

async def memory_leak():
    for i in range(1000):
        # GPUメモリを確保
        gpu_tensor = create_gpu_tensor()
        
        # ブロッキング処理
        process_gpu_tensor(gpu_tensor)
        
        # ここでメモリが解放されるべきだが...
        await notify_progress()
        
        # 実際にはまだ解放されていない可能性

イベントループのブロック

async def problematic():
    # この重い処理がイベントループをブロック
    result = heavy_gpu_operation()  # 他のコルーチンが実行できない
    await something()  # ようやく他のコルーチンに制御が移る

原因4: ガベージコレクションのタイミング

Pythonのガベージコレクション(GC)は、通常は参照カウントが0になると即座に動作しますが、循環参照がある場合は定期的なGCを待つ必要があります。

非同期関数では、コルーチンオブジェクト自体が変数への参照を保持するため、GCのタイミングが予測しづらくなります

import gc
import sys

async def gc_timing_issue():
    for i in range(1000):
        obj = LargeObject()
        
        # 参照カウントを確認
        print(f"Reference count: {sys.getrefcount(obj)}")
        # 通常は2(obj変数 + getrefcount内の一時参照)
        
        await something()
        
        # awaitの後では参照カウントが増えている可能性
        print(f"Reference count after await: {sys.getrefcount(obj)}")
        # 3以上になっていることがある(コルーチンフレームの参照)
    
    # 手動でGCを実行しないとメモリが解放されない
    gc.collect()

ということで、asyncにすると、普段意識していなかったコルーチン特有のメモリ管理の複雑さに遭遇するんです。単に「async」キーワードを付与しただけでもだいぶ内部動作変わるんだぞ、と覚えておきます。

解決策の実装:様々なアプローチの詳細

ということで、原因がみえてきたところで、解決策について考えていきたいとおもいます。

解決策1: ストリーミング処理による根本的解決

まず、同期非同期を語る前に、メモリに全データを保持するトンデモ実装は直さなければいけません。処理したデータを即座にディスクに書き出すストリーミング処理に変更しました。案外これが最も効果的な解決策でした。メモリを大量にスタックしながらぐるぐるまわすのはそもそもあまり良くないんです。コルーチンの場合はとくに。

import os
import json
import gc
import tempfile
from pathlib import Path
from datetime import datetime
from typing import List, Optional, Callable

import numpy as np
import torch


def get_transform():
    import torchvision.transforms as T
    return T.Compose([
        T.ToPILImage(),
        T.Resize((512, 512)),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

async def process_video_frames_streaming(
    model: torch.nn.Module,
    frames: List[np.ndarray],
    output_dir: Optional[str] = None,
    progress_listener: Optional[Callable] = None,
    batch_size: int = 32,
    device: str = "cuda",
    use_amp: bool = False,
) -> str:

    # 出力ディレクトリの準備
    if output_dir is None:
        output_dir = tempfile.mkdtemp(prefix="video_frames_")
    else:
        os.makedirs(output_dir, exist_ok=True)

    output_path = Path(output_dir)

    # メタデータの保存
    metadata = {
        "total_frames": len(frames),
        "batch_size": batch_size,
        "model_name": model.__class__.__name__,
        "device": device,
        "processed_at": datetime.now().isoformat(),
    }
    with open(output_path / "metadata.json", "w", encoding="utf-8") as f:
        json.dump(metadata, f, ensure_ascii=False, indent=2)

    total_frames = len(frames)

    # 重要: transformはループ外で一度だけ生成
    transform = get_transform()

    # モデルは推論モードに
    model.eval()
    if device == "cuda":
        model.to(device)

    print(f"Starting streaming processing of {total_frames} frames")
    print(f"Output directory: {output_dir}")
    print(f"Batch size: {batch_size}")

    # autocastのdtypeは環境に合わせて(Ampere以降ならfloat16/TF32等)
    autocast_dtype = torch.float16 if use_amp else None

    # バッチごとに処理
    for batch_start in range(0, total_frames, batch_size):
        batch_end = min(batch_start + batch_size, total_frames)
        batch = frames[batch_start:batch_end]

        # ---- バッチ処理開始 ----
        # 推論時はinference_modeが最も軽量(no_gradより更に最適化)
        with torch.inference_mode():
            for local_idx, frame in enumerate(batch):
                global_idx = batch_start + local_idx

                # 前処理(CPU)
                input_tensor = transform(frame)  # CxHxW, float32 on CPU
                # ピン止めするとnon_blocking転送が効く(オプション)
                input_tensor = input_tensor.pin_memory() if device == "cuda" else input_tensor
                input_tensor = input_tensor.unsqueeze(0)  # NxCxHxW

                # GPUへ転送(non_blocking=Trueで待ちを減らす)
                if device == "cuda":
                    input_tensor = input_tensor.to(device, non_blocking=True)

                # 推論(必要なら混合精度)
                if use_amp and device == "cuda":
                    with torch.autocast(device_type="cuda", dtype=autocast_dtype):
                        output = model(input_tensor)
                else:
                    output = model(input_tensor)

                # CPUに戻して保存(即座にメモリから解放)
                result_cpu = output.detach().to("cpu", copy=True)

                frame_filename = output_path / f"frame_{global_idx:06d}.pt"
                # 必要に応じて軽量化(例: 半精度): result_cpu = result_cpu.half()
                torch.save(
                    {
                        "frame_index": global_idx,
                        "output": result_cpu,
                        "input_shape": tuple(frame.shape),
                        "processing_time": datetime.now().isoformat(),
                    },
                    frame_filename,
                )

                # 明示的に開放
                del input_tensor, output, result_cpu

        # バッチ処理後、GPUメモリ/CPUガベコレを整理(頻度は用途に応じて)
        if device == "cuda":
            torch.cuda.empty_cache()
            # torch.cuda.ipc_collect() は新しいPyTorchでは通常不要
        gc.collect()

        # メモリ状況ログ
        memory_stats = None
        if device == "cuda":
            allocated = torch.cuda.memory_allocated() / 1024**3
            reserved = torch.cuda.memory_reserved() / 1024**3
            free = torch.cuda.mem_get_info()[0] / 1024**3
            memory_stats = {"allocated": allocated, "reserved": reserved, "free": free}
            print(
                f"Batch {batch_start}-{batch_end}: "
                f"GPU Memory - Allocated: {allocated:.2f}GB, "
                f"Reserved: {reserved:.2f}GB, Free: {free:.2f}GB"
            )

        # 非同期で進捗通知(バッチ単位)
        if progress_listener:
            progress = batch_end / total_frames * 100.0
            await progress_listener(
                percent=progress,
                message=f"Processed {batch_end}/{total_frames} frames",
                details={
                    "batch_start": batch_start,
                    "batch_end": batch_end,
                    "batch_size": len(batch),
                    "output_dir": str(output_dir),
                    "memory_stats": memory_stats,
                },
            )

        # イベントループに制御を返す(重要)
        await asyncio.sleep(0)

    print(f"Processing complete! Results saved to: {output_dir}")

    # 完了通知
    if progress_listener:
        await progress_listener(
            percent=100.0,
            message="Processing complete",
            details={"total_frames": total_frames, "output_dir": str(output_dir), "success": True},
        )

    return str(output_dir)

解決策2: Thread Poolを使った適切な非同期化

あとは、PyTorchの処理は同期的に保ち、run_in_executorを使って別スレッドで実行しつつ、非同期コールバックでの進捗通知を実現する方法です。

問題:別スレッドから非同期関数を呼べない

ただしこの方法は別スレッドにPyTorchの同期処理を逃がすのは良いものの、別スレッドで実行される同期関数から、非同期のprogress_listenerを呼べないというところでつまづきます

# これはできない
def pytorch_process_sync(frames, progress_listener):
    """別スレッドで実行される同期関数"""
    for i, frame in enumerate(frames):
        result = model(frame)
        # エラー!同期関数から非同期関数は呼べない
        await progress_listener(i)  # SyntaxError

解決:バッチ単位で処理して非同期通知

そこで、ちょっとトリッキーですがバッチ処理の区切りでメインスレッドに戻り、そこで非同期コールバックを呼ぶ設計をためしました

import asyncio
from concurrent.futures import ThreadPoolExecutor

class VideoProcessor:
    """
    Thread Poolを使いつつ、非同期コールバックで進捗通知
    """
    
    def __init__(self, model: torch.nn.Module, device: str = "cuda"):
        self.model = model
        self.device = device
        self.executor = ThreadPoolExecutor(max_workers=1)
    
    def _process_batch_sync(
        self, 
        batch: List[np.ndarray], 
        start_idx: int
    ) -> dict:
        """
        バッチを同期的に処理(別スレッドで実行)
        
        進捗通知はせず、処理結果とメタデータを返すだけ
        """
        results = []
        processing_times = []
        
        for local_idx, frame in enumerate(batch):
            start_time = time.time()
            
            with torch.no_grad():
                # PyTorchの処理
                tensor = self.transform(frame).unsqueeze(0).to(self.device)
                output = self.model(tensor)
                result = output.cpu()
                
                # 保存
                global_idx = start_idx + local_idx
                output_path = f"output/frame_{global_idx:06d}.pt"
                torch.save(result, output_path)
                results.append(output_path)
                
                # メモリクリア
                del tensor, output, result
            
            processing_times.append(time.time() - start_time)
        
        # GPUメモリクリア
        if self.device == "cuda":
            torch.cuda.empty_cache()
        
        # バッチ処理の結果を返す
        return {
            "saved_paths": results,
            "processing_times": processing_times,
            "memory_used": torch.cuda.memory_allocated() / 1024**3 if self.device == "cuda" else 0,
            "thread_id": threading.current_thread().ident
        }
    
    async def process_frames_async(
        self,
        frames: List[np.ndarray],
        progress_listener: Callable,  # 非同期コールバック
        batch_size: int = 32
    ) -> List[str]:
        """
        メインの非同期インターフェース
        
        バッチごとに処理し、各バッチ完了後に非同期で進捗通知
        """
        
        loop = asyncio.get_event_loop()
        total_frames = len(frames)
        all_saved_paths = []
        total_processing_time = 0
        
        for batch_start in range(0, total_frames, batch_size):
            batch_end = min(batch_start + batch_size, total_frames)
            batch = frames[batch_start:batch_end]
            
            # バッチを別スレッドで処理
            batch_result = await loop.run_in_executor(
                self.executor,
                self._process_batch_sync,
                batch,
                batch_start
            )
            
            # 結果を蓄積
            all_saved_paths.extend(batch_result["saved_paths"])
            total_processing_time += sum(batch_result["processing_times"])
            
            # ここでメインスレッドに戻っているので、
            # 非同期コールバックを呼べる!
            progress = batch_end / total_frames * 100
            
            await progress_listener(
                percent=progress,
                message=f"Processed batch {batch_start}-{batch_end}/{total_frames}",
                details={
                    "current_batch": batch_start // batch_size + 1,
                    "total_batches": (total_frames + batch_size - 1) // batch_size,
                    "batch_processing_time": sum(batch_result["processing_times"]),
                    "average_frame_time": sum(batch_result["processing_times"]) / len(batch),
                    "memory_gb": batch_result["memory_used"],
                    "fps": len(all_saved_paths) / total_processing_time if total_processing_time > 0 else 0,
                    "thread_id": batch_result["thread_id"]
                }
            )
        
        # 完了通知
        await progress_listener(
            percent=100,
            message="Processing complete",
            details={
                "total_frames": total_frames,
                "total_time": total_processing_time,
                "average_fps": total_frames / total_processing_time
            }
        )
        
        return all_saved_paths

この方法のメリット

この方法ですと、イベントループがブロックされないという重要な利点があります。
PyTorchの重い処理は別スレッドで実行されるため、メインスレッドは他のWebSocketリクエストやAPIコールを処理し続けることができます。これにより、一人のユーザーが重い動画処理を実行している間も、他のユーザーへのレスポンスは素早く返すことができ、システム全体のレスポンシビリティが保たれます。

非同期処理の中に重たい同期処理が混ざる 実装を避けることができるというわけです

ちなみに、

PyTorchでGPU処理を別スレッドに逃がす場合、スレッド間でCUDAコンテキストを共有しますが、

  • マルチスレッド環境でのCUDA呼び出しは基本的にスレッドセーフではない(公式ドキュメントで注意ありです)
  • 同時に複数スレッドでGPUを叩くとパフォーマンス低下や予期せぬエラーになる可能性があります

ので留意が必要です

使用例

# FastAPIでの使用例
@app.post("/process-video")
async def process_video_endpoint(video_id: str):
    processor = VideoProcessor(model)
    
    # WebSocketで進捗を送信する非同期コールバック
    async def send_progress(percent, message, details):
        await websocket.send_json({
            "type": "progress",
            "video_id": video_id,
            "percent": percent,
            "message": message,
            "details": details,
            "timestamp": datetime.now().isoformat()
        })
        
        # データベースにも記録(非同期)
        await db.execute(
            "UPDATE video_tasks SET progress = ? WHERE id = ?",
            (percent, video_id)
        )
    
    # 処理実行(非同期コールバック付き)
    results = await processor.process_frames_async(
        frames=load_video_frames(video_id),
        progress_listener=send_progress,
        batch_size=32
    )
    
    return {"status": "complete", "results": results}

解決策3: より細かい進捗通知が必要な場合

もしバッチ単位ではなく、どうしてもフレーム単位での進捗通知が必要な場合は、asyncio.run_coroutine_threadsafeを使う方法もあります

class DetailedProgressProcessor:
    """
    フレーム単位の詳細な進捗通知を実現
    """
    
    def _process_with_detailed_progress(
        self,
        frames: List[np.ndarray],
        loop: asyncio.AbstractEventLoop,
        async_progress_listener: Callable
    ) -> List[str]:
        """
        別スレッドで実行され、フレームごとに進捗通知
        """
        results = []
        total = len(frames)
        
        for i, frame in enumerate(frames):
            # PyTorchの処理
            with torch.no_grad():
                tensor = self.transform(frame).to(self.device)
                output = self.model(tensor)
                result = output.cpu()
            
            # 保存
            path = f"output/frame_{i:06d}.pt"
            torch.save(result, path)
            results.append(path)
            
            # 別スレッドから非同期関数を呼ぶ魔法!
            future = asyncio.run_coroutine_threadsafe(
                async_progress_listener(
                    percent=(i + 1) / total * 100,
                    message=f"Processing frame {i+1}/{total}",
                    details={"frame_index": i}
                ),
                loop  # メインスレッドのイベントループ
            )
            
            # 必要に応じて結果を待つ(オプション)
            try:
                future.result(timeout=0.1)  # 100ms以内に完了を期待
            except TimeoutError:
                # 進捗通知が遅れても処理は継続
                pass
            
            # 定期的にメモリクリア
            if i % 10 == 0 and self.device == "cuda":
                torch.cuda.empty_cache()
        
        return results
    
    async def process_with_detailed_progress(
        self,
        frames: List[np.ndarray],
        progress_listener: Callable
    ) -> List[str]:
        """
        詳細な進捗通知付きの処理
        """
        loop = asyncio.get_event_loop()
        
        # 別スレッドで処理を実行
        results = await loop.run_in_executor(
            None,
            self._process_with_detailed_progress,
            frames,
            loop,  # 現在のイベントループを渡す
            progress_listener
        )
        
        return results

「torch.cuda.empty_cache()」呼びだしの注意点

torch.cuda.empty_cacheで留意すべき点は、これはコード内に「定期的にメモリクリア」と書いていますが、これは物理的にメモリをOSに返すわけではなく、PyTorchのキャッシュ領域を空にするだけです。
ですので、空き容量を強制的に増やす場合に有効ですが、呼びすぎると逆に再確保のコストが増える可能性もあります。

パフォーマンス比較

実際のテスト結果(非同期コールバックを使用):

実装方法 GPU メモリ 進捗通知の頻度 実装の複雑さ
元の非同期実装(問題あり) 増加→OOM フレーム単位 ★☆☆☆
ストリーミング(解決策1) 3.2GB一定 バッチ単位 ★★☆☆
Thread Pool(解決策2) 3.5GB一定 バッチ単位 ★★★☆

まとめ

非同期プログラミングとGPU処理の落とし穴

今回の経験から得た最も重要な学びは、同期的な処理を安易にasyncにすると、コルーチン特有のメモリ管理の罠にはまるということです。通常の同期関数なら、ループごとに変数が上書きされて自然にメモリが解放されますが、async関数では、awaitを挟むと、その時点のローカル変数のうち、後続コードから参照されうるものはコルーチンオブジェクトに保持されるため想定外のメモリ保持が発生します。これは「非同期にすれば進捗通知できる」という単純な発想では見落としがちな、しかし致命的な問題でした。

また、GPUメモリを継続的に確保し続けると、ガベージコレクションのタイミングとCUDAメモリ管理の相性問題でメモリリークのような状態になることも重要なきづきでした。PyTorchのCUDA操作は本質的に同期的であり、これを無理にasync関数内で実行すると、メモリ解放のタイミングが狂い、大容量のVRAMでさえ枯渇してしまいました。

実践的な解決アプローチ

この問題に直面したら、まずはストリーミング処理でメモリを分割することから始めるのが良いとかんjました。全フレームを一度に処理するのではなく、バッチ単位で処理して即座にディスクに保存することで、メモリの遅延解放が危険なレベルに達することを防げます。今回の要件の場合、この対処だけでUT,負荷テストもすべて通り問題は解決しました。おかげで既存のコードへの変更も最小限で済みます。

万が一それでも要件を満たせない場合は、Thread Poolとrun_in_executorを使った処理の分離を検討すると良いでしょう。PyTorchの処理を別スレッドに分離することで、イベントループのブロッキングを防ぎつつ、非同期コールバックも正しく動作させることができます。実装はやや複雑になりますが、より柔軟な制御が可能になるとおもいます

最後に、明示的なメモリ管理を忘れずに実装することが重要です。

一見冗長に見えても、安定した動作のために必要な「保険」だと考えdel文での変数削除とtorch.cuda.empty_cache()の定期的な実行をやりましょう

これらの対策を段階的に適用することで、「今風な非同期処理」と「重いGPU処理」を安全に共存させることができます。単に「async/awaitを使えばいい」という表面的な理解では解決できない問題ですが、適切なアプローチを選べば今回の事例のように必ず解決できるとおもいます。

それでは、今回も最後までお読みいただきありがとうございました!また次回お会いしましょう!

Read more

ゼロトラスト時代のLLMセキュリティ完全ガイド:ガーディアンエージェントへの進化を見据えて

ゼロトラスト時代のLLMセキュリティ完全ガイド:ガーディアンエージェントへの進化を見据えて

こんにちは! 今日はセキュリティの新たな考え方「ゼロトラスト」とLLMを中心としたAIセキュリティについて解説いたします! はじめに 3つのパラダイムシフトが同時に起きている いま、企業のIT環境では3つの大きな変革が起ころうとしています。 1つ目は「境界防御からゼロトラストへ」というセキュリティモデルの転換。 2つ目は「LLMの爆発的普及」による新たなリスクの出現。 そして3つ目は「AIエージェント時代の到来」とそれに伴う「ガーディアンエージェント」という新概念の登場です。 これらは別々の出来事のように見えて、実は密接に関連しています。本記事では、この3つの変革がどのように結びつき、企業がどのような対策を取るべきかを解説いたします 目次 1. はじめに:3つのパラダイムシフトが同時に起きている 2. 第1の変革:ゼロトラストという新しいセキュリティ思想 3. 第2の変革:LLM時代の到来とその影響 4. 第3の変革:AIエージェントとガーディアンエージェント 5. 3つの変革を統合する:実践的なアプローチ 6. 実装のベストプラクティス 7. 日本

By Qualiteg コンサルティング
発話音声からリアルなリップシンクを生成する技術 第4回:LSTMの学習と限界、そしてTransformerへ

発話音声からリアルなリップシンクを生成する技術 第4回:LSTMの学習と限界、そしてTransformerへ

1. 位置損失 (L_position) - 口の形の正確さ 時間 口の開き 正解 予測 L_position = Σᵢ wᵢ × ||y_pred - y_true||² 各時点での予測値と正解値の差を計算。重要なパラメータ(顎の開き、口の開き)には大きな重みを付けます。 jaw_open: ×2.0 mouth_open: ×2.0 その他: ×1.0 2. 速度損失 (L_velocity) - 動きの速さ 時間 速度 t→t+1 v = y[t] -

By Qualiteg 研究部, Qualiteg コンサルティング
大企業のAIセキュリティを支える基盤技術 - 今こそ理解するActive Directory 第1回 基本概念の理解

大企業のAIセキュリティを支える基盤技術 - 今こそ理解するActive Directory 第1回 基本概念の理解

こんにちは! 今回から数回にわたり Active Directory について解説してまいります。 Active Directory(AD:アクティブディレクトリー)は、Microsoft が開発したディレクトリサービスであり、今日の大企業における IT インフラストラクチャーにおいて、もはやデファクトスタンダードと言っても過言ではない存在となっており、組織内のユーザー、コンピューター、その他のリソースを一元的に管理するための基盤として広く採用されています。 AIセキュリティの現実:単独では機能しない ChatGPTやClaudeなどの生成AIが企業に急速に普及する中、「AIセキュリティ」という言葉が注目を集めています。情報漏洩の防止、不適切な利用の検知、コンプライアンスの確保など、企業が取り組むべき課題は山積みです。 しかし、ここで注意しなければいけない事実があります。それは、 AIセキュリティソリューションは、それ単体では企業環境で限定的な効果しか期待できない ということです。 企業が直面する本質的な課題 AIセキュリティツールを導入する際、企業のIT部門

By Qualiteg コンサルティング
自治体総合フェア2025に出展いたしました

自治体総合フェア2025に出展いたしました

こんにちは! 先週開催された自治体総合フェア2025に出展いたしましたので、写真で様子をふりかえりながら簡単にレポートいたします! 自治体総合フェア2025 開催概要 自治体総合フェアは公民連携の総合展示会で今年はは2025/7/16~18まで東京ビッグサイトにて開催されました。 株式会社 Qualiteg の出展内容 当社からは4名体制でAIアバター動画生成サービス「MotionVox™」をはじめ、LLMセキュリティソリューション「LLM-Audit™」、企業・自治体向けセキュアチャットサービス「Bestllam🄬」の展示をさせていただきました。 デモ内容 当日のご紹介内容の一部をご紹介いたします MotionVox™ MotionVox は、まるで、本物の人間のようなフォトリアリスティックなアバター動画を生成するサービスです。 これまでから機能を大幅拡張した MotionVox 2.0 をお披露目いたしました。 MotionVox 2.0では、以下のようなフィーチャーを追加いたしました! * まるで人間! リアリティをさらに向上したアバター *

By Qualiteg ビジネス開発本部 | マーケティング部