Model Context Protocol完全実装ガイド 2025- 仕様変遷から最新Streamable HTTPまでの全て

Model Context Protocol完全実装ガイド 2025- 仕様変遷から最新Streamable HTTPまでの全て

こんにちは!
現在、LLM業界で破竹の勢いでひろまっているMCPについて、本日はとくに実装面について解説していきたいとおもいます。

MCP、MCPとひとくちにいっていますが、実は短期間でけっこう「標準」とよばれる仕様が変化しておりますので、仕様のバリエーションを順を追って解説しつつ、実際に実装をしていきたいとおもいます。


さて、MCPですが、2024年後半、Anthropicが発表したModel Context Protocol(MCP)は、AI分野における重要な転換点となりました。

従来、各AIベンダーが独自に実装していたツール呼び出し機能(tool useと呼びます)を標準化し、AIモデルと外部システムの連携を統一的に扱える仕組みを提供しました

本記事で、MCPの誕生から現在に至るまでの技術的変遷を詳細に追いながら、2025年時点での最適な実装方法を完全なソースコードと共に解説します。特に、仕様の変化に振り回されがちな実装者の視点から、なぜ現在の形に収束したのか、そして今後どのような実装アプローチを取るべきかを明確にしていきます。

第1章 MCPが解決しようとした問題

AIツール呼び出しの混沌とした状況

2024年までのAI開発現場では、各モデルプロバイダーが独自のツール呼び出し仕様を採用していました。OpenAIはFunction Calling、AnthropicやMetaはTool Use、GoogleはFunction Declarationsという具合に、名称も実装方法もバラバラでした。

開発者は同じ「現在時刻を取得する」という単純な機能でも、LLMプロバイダーごとに異なる実装を用意する必要がありました。

まあ、この新しい技術がではじめたとき、相互運用性はあとまわしっていうのはソフトウェア業界では「あるある」なんですが、

こうした状況は単に面倒なだけでなく、AIエージェントの可能性を制限していていました。複数のAIモデルを組み合わせたシステムや、モデルを切り替え可能なアプリケーションの開発は困難で、ツールの再利用性も著しく低い状態でした。

MCPという解答

MCPは、この混沌とした状況に対する明確な解答でした。JSON-RPC 2.0を基盤とし、ツール定義からセッション管理、ストリーミング通信まで、AIとツールの連携に必要なすべてを標準化しました。これにより、一度MCPサーバーを実装すれば、Claude Desktop、VS Code、その他のMCP対応クライアントから同じツールを利用できるようになりました。

第2章 仕様の変遷と技術的背景

stdio時代 - ローカル実行における理想的な設計

初期のMCPは、標準入出力(stdio)を使用した通信を前提としていました。これはプロセス間通信の最も基本的な形であり、ローカル環境では効率的でした。

# 初期のstdio実装の概念
import sys
import json

while True:
    line = sys.stdin.readline()
    request = json.loads(line)
    # リクエスト処理
    response = handle_request(request)
    sys.stdout.write(json.dumps(response) + "\n")
    sys.stdout.flush()

しかし、この設計には根本的な制約があります。

そうですstdioはプロセスの親子関係を前提とした通信方式であり、ネットワークを介した通信には本質的に適していません。

そもそも stdio を通信方式だっておもってる昨今のエンジニアってどれくらいいるんでしょうか。私みたいに黒いコンソールしかなかった時代にコード書いていた人やいまでもC言語現役ですっていう人以外は stdio とかあえて意識していないきもするのですが、

まあ、開発者がVS CodeやローカルのClaude Desktopから使う分にはとくに問題ありませんでした。

ただ、複数のユーザーが同時にアクセスするWebサービスや、分散システムの一部として動作させることは(なんで?ってかんじですが)設計上想定されていませんでした。

HTTP+SSE時代 - ネットワーク対応への第一歩

次に登場したのが、HTTP POSTとServer-Sent Events(SSE)を組み合わせた方式でした。クライアントからサーバーへはHTTP POST、サーバーからクライアントへはSSEという非対称な通信モデルです。

この方式には一定の合理性がありました。SSEはブラウザでネイティブサポートされており、リアルタイムな進捗通知も可能でした。しかし、実装の複雑さは増大しました。セッション管理、エンドポイントの分離、接続の同期など、開発者が考慮すべき点が多すぎました。
これはSSEならではのめんどくささです。

Streamable HTTP時代 - 成熟した統一仕様

そして、2025年現在、MCPはStreamable HTTPという形に収束しています。単一のHTTPエンドポイントで、通常のリクエスト/レスポンスとストリーミングの両方を処理できる洗練された設計です。

# Streamable HTTPの基本概念
@app.post("/mcp")
async def mcp_endpoint(request: Request):
    if "text/event-stream" in request.headers.get("accept", ""):
        # ストリーミングレスポンス
        return StreamingResponse(generate_sse())
    else:
        # 通常のJSONレスポンス
        return JSONResponse(handle_request(await request.json()))

この統一により、実装の複雑さが大幅に軽減され、同時にスケーラビリティも向上しました。各トランスポート方式の特性を整理すると、次のような変遷が見えてきます。

MCPトランスポート方式の変遷

技術要素 ローカルに閉じた実装 リモート化
stdio HTTP+SSE Streamable HTTP
登場時期 初期(2024年後半) 過渡期(2024年末) 現在(2025年)
通信方式 標準入出力
プロセス間直接通信
HTTP POST(C→S)
SSE(S→C)
非対称な2方向通信
単一HTTPエンドポイント
Acceptヘッダーで自動切替
プロトコル JSON-RPC 2.0
改行区切り
JSON-RPC 2.0
POST/SSE分離
JSON-RPC 2.0
統合型
実装複雑度 低(シンプルループ) 高(2エンドポイント管理) 中(SDKで簡素化)
ストリーミング 改行ベース SSEイベント SSE/chunked自動選択
セッション管理 プロセスライフサイクル URLパラメータ(手動管理) Mcp-Session-Idヘッダー(標準化)
スケーラビリティ N/A(ローカルのみ) △ 制限あり(SSE接続管理が課題) ◎ 優秀(ステートレス可能)
主な用途 VS Code拡張
CLI ツール
ローカルIDE
実験的実装
小規模デモ
プロトタイプ
本番Webサービス
エンタープライズAPI
SaaS製品
メリット 最高速
実装が単純
セキュア
ネットワーク対応
ブラウザ互換
既存技術の組み合わせ
統一的実装
クラウドネイティブ
運用ツール対応
デメリット リモート不可
Webサービス化困難
実装が複雑
デバッグ困難
スケーリング課題
HTTPオーバーヘッド
ローカルでは過剰

この表から明確なのは、ローカル実行に特化したstdioから始まり、リモート化の要求に応えるためにHTTP+SSEという過渡的な実装を経て、最終的にStreamable HTTPという統一的なソリューションに収束したということです。

現在では、用途に応じて「ローカル専用ならstdio、リモート化の可能性があるならStreamable HTTP」という明確な使い分けが確立されました。

ストリーミングの本質的な意義

さて、MCPでは、そもそもなんでストリーミングがうれしいんでしょうか。

ストリーミング形式で返信を返すことの意義はなんでしょか。

まずはそこを解説していきます。

「ストリーミング形式」、これは単なる通信方式の選択にとどまらない深い意味を持っています。

まず、従来の同期的な通信モデルと、ストリーミングによる非同期モデルの違いを図でみてみましょう。

まず同期モデルはこんなかんじでしょうか。

【従来の同期モデル】

User ──────> Web UI ──────> LLM ──────> MCP Server
                               │             │
                               │             ▼
                               │         [処理実行]
                               │           (長時間)
                               │             │
                               ◄─────────────┘
                               │         (待機中...)
                               ▼
                           [応答生成]
                               │
User ◄────── Web UI ◄──────────┘

問題点:LLMは処理が完了するまで何も知らない状態で待機

次はストリーミングモデルです

【ストリーミングモデル】

User ──────> Web UI ──────> LLM ──────> MCP Server
     │          │             │             │
     │          │             │             ▼
     │          │             │         [処理開始]
     │          │             │             │
     │          │             ◄─ stream ────┤ "処理開始しました"
     │          │             │             │
     │          │         [中間応答]        ▼
     │          ◄─ stream ────┤         [処理中 30%]
     │      "処理中..."       │             │
     ◄──────────┤             ◄─ stream ────┤ "30%完了"
                │             │             │
                │         [戦略調整]        ▼
                │             │         [処理中 60%]
                │             ◄─ stream ────┤ "60%完了"
     │          ◄─ stream ────┤             │
     │      "もうすぐ..."     │             ▼
     ◄──────────┤             │         [処理完了]
                │             ◄─ stream ────┤ "結果: XXX"
                │             │             │
                │         [最終応答]        │
                ◄─ stream ────┤             │
     │      "完了しました"    │             │
     ◄──────────┘             │             │

利点:全ての参加者がリアルタイムで状況を共有

AIモデルを中心に据えたアプリケーション開発において、モデルが外部ツールと対話しながら段階的に処理を進めていくためには、途中経過を逐次返す仕組みが不可欠です。

AIは「いま何が起きているのか」「どれだけ処理が進んでいるのか」を把握しながら応答生成を続ける必要があり、結果だけがまとまって返ってくる同期型のリクエスト方式では、多くの場合その要件を満たせません。

【ストリーミングによる適応的処理】

LLM ──────> MCP Server (データベース検索)
 │              │
 │              ▼ stream: "1000件中100件検索済み..."
 │◄─────────────┤
 │              │
 ├─判断: 想定より時間がかかっている
 │       
 ├──────────> MCP Server (検索条件を絞り込み)
 │              │
 │              ▼ stream: "条件変更: 50件に絞り込み"
 │◄─────────────┤
 │              │
 │              ▼ stream: "検索完了"
 │◄─────────────┤
 │
 ▼
[ユーザーへの応答生成]
"検索範囲を最適化して、最も関連性の高い
 50件のデータから結果を取得しました"

ストリーミングが存在する最大の意味は、モデル側が外部処理を"待ち続ける"のではなく、"処理の進行状況を観察しながら応答生成を継続できる"という点にあります。AIモデルは厳密な意味での非同期処理を持ちません。外部ツールの実行が長時間に及ぶ場合でも、モデルはレスポンスをブロックしたまま停止するしかありませんでした。しかし、ストリーミングが導入されることで、外部のMCPサーバーが部分的な進捗や中間データを都度AIに返し、AIはそれを受け取るたびに次の判断を行えるようになりました。

【ユーザー体験の改善】

従来:
User ──> Web UI ──> [ローディング表示...] ──> (不安な待機時間) ──> 結果

ストリーミング:
User ──> Web UI ┬─> "データベースに接続中..."
                ├─> "1000件のレコードを検索中..."
                ├─> "関連データを分析中..."
                ├─> "結果を整形中..."
                └─> "完了: [詳細な結果]"

ユーザーは各ステップで何が起きているか把握でき、
処理の透明性と信頼性が大幅に向上

また、ストリーミング返信はアプリケーションにとっても決定的に重要です。通常のサーバー実装では、長い処理時間を必要とするタスクを扱う際、ユーザーは「レスポンスが返ってくるまで待つ」という不透明な状態に置かれていました。AIを中心に据えたUIでは、これが体験の質に直結します。MCPサーバーがストリーミングで段階的に結果やログを返すことで、アプリケーションはそれを逐次UIに反映でき、「AIが何をしているのか」が可視化されます。

たとえば、外部ツールで動画をつくってるとき、動画がどのくらい完成しているのか進捗67%、みたいな情報をWebUIに返してやるとUXが向上しますよね。

さらに、この仕組みはAIエージェントの高度化にも直結しています。外部処理の結果がストリームとして流れ込むということは、AIモデルが「部分結果を見て戦略を変える」ことを初めて可能にします。これがなんともAI時代っていう感じですよね。

【エラーからの回復】

LLM ──────> MCP Server (API呼び出し)
 │              │
 │              ▼ stream: "APIに接続中..."
 │◄─────────────┤
 │              │
 │              ▼ stream: "エラー: レート制限"
 │◄─────────────┤
 │              
 ├─判断: 代替手段を使用
 │       
 ├──────────> MCP Server (キャッシュから取得)
 │              │
 │              ▼ stream: "キャッシュから取得中..."
 │◄─────────────┤
 │              │
 │              ▼ stream: "成功: データ取得完了"
 │◄─────────────┤
 │
 ▼
[ユーザーへの応答]
"最新データの取得に失敗したため、
 1時間前のキャッシュデータを使用しました"

長い処理の途中で異常が検出された場合にAIが即座に代替手段を提案したり、処理開始後に追加の指示が必要な場面を判断したりすることが可能になります。同期的に一度に返ってくるレスポンスでは、こうした逐次判断は本質的に不可能でした。

総じて、MCPサーバーがストリーミング形式の返信を返す意義は、単に効率を高めるためだけの技術ではありません。AIモデル、MCPサーバー、そしてアプリケーションの三者が同じ時間軸を共有し、逐次的な情報のやり取りを通じて「対話しながら仕事を進める」という新しい実行モデルを成立させるための基盤です。そして、Streamable HTTPはこのストリーミングの意義を最大限に活かしながら、実装の複雑さを最小限に抑えることに成功した、MCPの成熟形と言えるでしょう。

第3章 完全実装 - FastMCPによる標準準拠サーバー

なぜFastMCPを使うべきか

MCPの仕様は一見シンプルですが、実際に仕様準拠のサーバーを実装しようとすると、考慮すべき点が膨大になります。バッチリクエスト処理、通知専用メッセージの204レスポンス、Mcp-Session-Idヘッダーの管理など、細かい仕様が多数存在します。

FastMCPは、これらすべてを吸収し、開発者がビジネスロジックに集中できる環境を提供してくれます。

完全なMCPサーバー実装

以下が、FastMCPを使用した本番環境対応のMCPサーバー実装です。

#!/usr/bin/env python3
"""
Production-Ready MCP DateTime Server
本番環境で使用可能な完全なMCPサーバー実装
"""
import asyncio
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
import pytz
import logging
from mcp.server.fastmcp import FastMCP

# ロギング設定
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s'
)
logger = logging.getLogger("mcp-datetime-server")

# タイムゾーン設定
JST = pytz.timezone('Asia/Tokyo')

# FastMCPインスタンス作成
mcp = FastMCP(
    name="datetime-server-production",
    version="1.0.0"
)

# サーバーメタデータ
mcp.metadata = {
    "description": "Production-ready datetime server with JST support",
    "author": "Your Organization",
    "timezone": "Asia/Tokyo",
    "supported_languages": ["ja", "en"]
}


@mcp.tool(
    description="日本標準時(JST)で現在の時刻を取得します"
)
async def get_current_time(format: str = "standard") -> str:
    """
    現在時刻を指定された形式で返す
    
    Args:
        format: 出力形式 (standard, iso, unix)
    """
    now = datetime.now(JST)
    
    if format == "iso":
        return now.isoformat()
    elif format == "unix":
        return str(int(now.timestamp()))
    else:
        return f"現在の時刻は {now.strftime('%H:%M:%S')} です"


@mcp.tool(
    description="本日の日付を日本語形式で取得します"
)
async def get_current_date(
    include_era: bool = False
) -> str:
    """
    現在の日付を返す
    
    Args:
        include_era: 和暦を含めるか
    """
    now = datetime.now(JST)
    date_str = now.strftime("%Y年%m月%d日")
    day_of_week = ["月", "火", "水", "木", "金", "土", "日"][now.weekday()]
    
    result = f"今日は {date_str} ({day_of_week}曜日) です"
    
    if include_era:
        # 令和の計算(2019年5月1日から)
        reiwa_start = datetime(2019, 5, 1, tzinfo=JST)
        if now >= reiwa_start:
            reiwa_year = now.year - 2018
            result += f" (令和{reiwa_year}年)"
    
    return result


@mcp.tool(
    description="指定された日数後の日付を計算します"
)
async def calculate_future_date(
    days: int,
    from_date: Optional[str] = None
) -> str:
    """
    未来または過去の日付を計算
    
    Args:
        days: 日数(負の値で過去)
        from_date: 起点日(ISO形式、省略時は今日)
    """
    if from_date:
        base_date = datetime.fromisoformat(from_date).replace(tzinfo=JST)
    else:
        base_date = datetime.now(JST)
    
    target_date = base_date + timedelta(days=days)
    date_str = target_date.strftime("%Y年%m月%d日")
    day_of_week = ["月", "火", "水", "木", "金", "土", "日"][target_date.weekday()]
    
    if days > 0:
        return f"{days}日後は {date_str} ({day_of_week}曜日) です"
    elif days < 0:
        return f"{abs(days)}日前は {date_str} ({day_of_week}曜日) でした"
    else:
        return f"指定日は {date_str} ({day_of_week}曜日) です"


@mcp.tool(
    description="2つの日付間の日数を計算します"
)
async def calculate_days_between(
    date1: str,
    date2: str
) -> str:
    """
    日付間の日数差を計算
    
    Args:
        date1: 最初の日付(ISO形式)
        date2: 2番目の日付(ISO形式)
    """
    d1 = datetime.fromisoformat(date1).replace(tzinfo=JST)
    d2 = datetime.fromisoformat(date2).replace(tzinfo=JST)
    
    diff = abs((d2 - d1).days)
    
    if d1 < d2:
        return f"{date1} から {date2} まで {diff}日間です"
    elif d1 > d2:
        return f"{date2} から {date1} まで {diff}日間です"
    else:
        return "同じ日付です"


@mcp.resource(
    uri="timezone://current",
    name="Current Timezone Information",
    description="現在のタイムゾーン詳細情報",
    mime_type="application/json"
)
async def get_timezone_resource() -> Dict[str, Any]:
    """タイムゾーン情報をリソースとして提供"""
    now = datetime.now(JST)
    utc_now = datetime.now(pytz.UTC)
    
    return {
        "timezone": "Asia/Tokyo",
        "abbreviation": "JST",
        "offset": "+09:00",
        "offset_seconds": 32400,
        "current_jst": now.isoformat(),
        "current_utc": utc_now.isoformat(),
        "is_dst": False
    }


@mcp.on_initialize
async def on_initialize(params: Dict[str, Any]) -> None:
    """サーバー初期化時の処理"""
    client_info = params.get("clientInfo", {})
    logger.info(f"MCP Server initialized by {client_info.get('name', 'unknown')}")
    logger.info(f"Protocol version: {params.get('protocolVersion')}")


@mcp.on_error
async def on_error(error: Exception) -> None:
    """エラー発生時の処理"""
    logger.error(f"MCP Server error: {error}", exc_info=True)


def main():
    """メインエントリーポイント"""
    import argparse
    
    parser = argparse.ArgumentParser(
        description="Production MCP DateTime Server"
    )
    parser.add_argument(
        "--port", type=int, default=8080,
        help="Server port (default: 8080)"
    )
    parser.add_argument(
        "--host", type=str, default="0.0.0.0",
        help="Server host (default: 0.0.0.0)"
    )
    
    args = parser.parse_args()
    
    print(f"Starting MCP Server on {args.host}:{args.port}")
    print(f"Endpoint: http://{args.host}:{args.port}/mcp")
    
    # FastMCPがすべての処理を自動化
    mcp.run(
        transport="streamable-http",
        host=args.host,
        port=args.port
    )


if __name__ == "__main__":
    main()

このコードの重要な点は、FastMCPのデコレータを使用することで、仕様準拠が自動的に保証されることです。開発者はツールのロジックに集中でき、プロトコルの詳細を意識する必要がありません。

第4章 クライアント実装 - 各LLMプロバイダーとの統合

MCPサーバーが完成したら、次はクライアント側の実装です。各LLMプロバイダーのAPIとMCPを統合する必要があります。

Claude(Anthropic)クライアント実装

Claudeは、MCPの生みの親であるAnthropicのモデルだけあって、最も自然にMCPと統合できます。

import asyncio
from typing import List, Dict, Any
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client
from anthropic import AsyncAnthropic

class ClaudeMCPIntegration:
    def __init__(self, mcp_url: str, api_key: str):
        self.mcp_url = mcp_url
        self.anthropic = AsyncAnthropic(api_key=api_key)
        self.session = None
        
    async def connect(self):
        """MCPサーバーに接続"""
        transport = await sse_client(self.mcp_url)
        self.session = ClientSession(
            transport.read_stream,
            transport.write_stream
        )
        await self.session.__aenter__()
        await self.session.initialize()
        
    async def ask_claude_with_tools(self, prompt: str) -> str:
        """MCPツールを使用してClaudeに質問"""
        # MCPツール一覧を取得
        tools_result = await self.session.list_tools()
        
        # Claude API形式に変換
        claude_tools = []
        for tool in tools_result.tools:
            claude_tools.append({
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.inputSchema
            })
        
        # Claudeに問い合わせ
        response = await self.anthropic.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            tools=claude_tools,
            messages=[{"role": "user", "content": prompt}]
        )
        
        # ツール呼び出しの処理
        for content in response.content:
            if content.type == "tool_use":
                # MCPツールを実行
                result = await self.session.call_tool(
                    content.name,
                    content.input or {}
                )
                
                # 結果をClaudeに返して最終回答を生成
                final_response = await self.anthropic.messages.create(
                    model="claude-3-5-sonnet-20241022",
                    max_tokens=1024,
                    messages=[
                        {"role": "user", "content": prompt},
                        {"role": "assistant", "content": response.content},
                        {
                            "role": "user",
                            "content": [{
                                "type": "tool_result",
                                "tool_use_id": content.id,
                                "content": result.content[0].text
                            }]
                        }
                    ]
                )
                
                return final_response.content[0].text
        
        return response.content[0].text

OpenAI GPTクライアント実装

OpenAIのFunction Callingも、MCPツールとシームレスに統合できます。

from openai import AsyncOpenAI
import json

class OpenAIMCPIntegration:
    def __init__(self, mcp_url: str, api_key: str):
        self.mcp_url = mcp_url
        self.openai = AsyncOpenAI(api_key=api_key)
        self.session = None
    
    async def connect(self):
        """MCPサーバーに接続"""
        transport = await sse_client(self.mcp_url)
        self.session = ClientSession(
            transport.read_stream,
            transport.write_stream
        )
        await self.session.__aenter__()
        await self.session.initialize()
    
    async def ask_gpt_with_tools(self, prompt: str) -> str:
        """MCPツールを使用してGPTに質問"""
        # MCPツール一覧を取得
        tools_result = await self.session.list_tools()
        
        # OpenAI形式に変換
        openai_tools = []
        for tool in tools_result.tools:
            openai_tools.append({
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.inputSchema
                }
            })
        
        # GPTに問い合わせ
        response = await self.openai.chat.completions.create(
            model="gpt-4-turbo-preview",
            tools=openai_tools,
            messages=[{"role": "user", "content": prompt}]
        )
        
        message = response.choices[0].message
        
        # ツール呼び出しの処理
        if message.tool_calls:
            tool_results = []
            for tool_call in message.tool_calls:
                # MCPツールを実行
                result = await self.session.call_tool(
                    tool_call.function.name,
                    json.loads(tool_call.function.arguments)
                )
                
                tool_results.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": result.content[0].text
                })
            
            # 最終回答を生成
            final_response = await self.openai.chat.completions.create(
                model="gpt-4-turbo-preview",
                messages=[
                    {"role": "user", "content": prompt},
                    message,
                    *tool_results
                ]
            )
            
            return final_response.choices[0].message.content
        
        return message.content

Google Geminiクライアント実装

Geminiも同様のパターンで実装できます。

import google.generativeai as genai

class GeminiMCPIntegration:
    def __init__(self, mcp_url: str, api_key: str):
        self.mcp_url = mcp_url
        genai.configure(api_key=api_key)
        self.session = None
        self.model = None
    
    async def connect(self):
        """MCPサーバーに接続"""
        transport = await sse_client(self.mcp_url)
        self.session = ClientSession(
            transport.read_stream,
            transport.write_stream
        )
        await self.session.__aenter__()
        await self.session.initialize()
        
        # ツール定義を取得してモデルを初期化
        tools_result = await self.session.list_tools()
        
        functions = []
        for tool in tools_result.tools:
            functions.append({
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.inputSchema
            })
        
        self.model = genai.GenerativeModel(
            model_name="gemini-1.5-pro",
            tools=functions
        )
    
    async def ask_gemini_with_tools(self, prompt: str) -> str:
        """MCPツールを使用してGeminiに質問"""
        response = self.model.generate_content(prompt)
        
        # 関数呼び出しの処理
        for part in response.candidates[0].content.parts:
            if hasattr(part, 'function_call'):
                fc = part.function_call
                
                # MCPツールを実行
                result = await self.session.call_tool(
                    fc.name,
                    dict(fc.args)
                )
                
                # 結果を返して最終回答を生成
                function_response = genai.protos.FunctionResponse(
                    name=fc.name,
                    response={"result": result.content[0].text}
                )
                
                final_response = self.model.generate_content(
                    contents=[
                        {"role": "user", "parts": [{"text": prompt}]},
                        {"role": "model", "parts": response.candidates[0].content.parts},
                        {"role": "user", "parts": [{"function_response": function_response}]}
                    ]
                )
                
                return final_response.candidates[0].content.parts[0].text
        
        return response.candidates[0].content.parts[0].text

第5章 実装上の重要な考慮点

エラーハンドリングとリトライ

本番環境では、ネットワークの一時的な問題やサーバーの過負荷に対処する必要があります。

import asyncio
from typing import TypeVar, Callable
import random

T = TypeVar('T')

async def retry_with_exponential_backoff(
    func: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0
) -> T:
    """指数バックオフによるリトライ実装"""
    for attempt in range(max_retries):
        try:
            return await func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            logger.warning(
                f"Attempt {attempt + 1} failed: {e}. "
                f"Retrying in {delay:.2f} seconds..."
            )
            await asyncio.sleep(delay)

セッション管理とコネクションプーリング

長時間稼働するサービスでは、適切なセッション管理が重要です。

class MCPConnectionPool:
    """MCPサーバーへの接続プール管理"""
    
    def __init__(self, mcp_url: str, max_connections: int = 10):
        self.mcp_url = mcp_url
        self.max_connections = max_connections
        self.connections = asyncio.Queue(maxsize=max_connections)
        self.lock = asyncio.Lock()
    
    async def acquire(self) -> ClientSession:
        """接続を取得"""
        try:
            return self.connections.get_nowait()
        except asyncio.QueueEmpty:
            async with self.lock:
                if self.connections.qsize() < self.max_connections:
                    # 新しい接続を作成
                    transport = await sse_client(self.mcp_url)
                    session = ClientSession(
                        transport.read_stream,
                        transport.write_stream
                    )
                    await session.__aenter__()
                    await session.initialize()
                    return session
                else:
                    # 接続が空くまで待機
                    return await self.connections.get()
    
    async def release(self, session: ClientSession):
        """接続を解放"""
        await self.connections.put(session)

パフォーマンス最適化

大規模なデプロイメントでは、パフォーマンスの最適化が重要です。

import functools
from typing import Any, Dict
import hashlib
import pickle

class MCPResponseCache:
    """MCPレスポンスのキャッシュ実装"""
    
    def __init__(self, ttl_seconds: int = 300):
        self.cache: Dict[str, tuple[Any, float]] = {}
        self.ttl_seconds = ttl_seconds
    
    def _generate_key(self, tool_name: str, arguments: Dict[str, Any]) -> str:
        """キャッシュキーの生成"""
        data = f"{tool_name}:{pickle.dumps(arguments, protocol=pickle.HIGHEST_PROTOCOL)}"
        return hashlib.sha256(data.encode()).hexdigest()
    
    async def get_or_fetch(
        self,
        tool_name: str,
        arguments: Dict[str, Any],
        fetch_func: Callable
    ) -> Any:
        """キャッシュから取得、なければフェッチ"""
        key = self._generate_key(tool_name, arguments)
        
        # キャッシュチェック
        if key in self.cache:
            result, timestamp = self.cache[key]
            if time.time() - timestamp < self.ttl_seconds:
                return result
        
        # フェッチして結果をキャッシュ
        result = await fetch_func()
        self.cache[key] = (result, time.time())
        return result

第6章 運用とモニタリング

こちらは、半分おまけですが、MCPサーバーをつくるときに忘れないようにするため、追加しておきます。

ヘルスチェックの実装

本番環境では、サービスの健全性を常に監視する必要がありますので、最低限でもヘルスチェックエンドポイントはいれておきまほう

@mcp.tool(
    description="サーバーの健全性をチェックします"
)
async def health_check() -> str:
    """ヘルスチェック用ツール"""
    checks = {
        "server_status": "healthy",
        "timezone_check": datetime.now(JST).isoformat(),
        "memory_usage": get_memory_usage(),
        "uptime": get_uptime()
    }
    
    return json.dumps(checks, ensure_ascii=False, indent=2)

def get_memory_usage() -> str:
    """メモリ使用量を取得"""
    import psutil
    process = psutil.Process()
    memory_mb = process.memory_info().rss / 1024 / 1024
    return f"{memory_mb:.2f} MB"

def get_uptime() -> str:
    """稼働時間を取得"""
    global server_start_time
    if not server_start_time:
        server_start_time = datetime.now()
    
    uptime = datetime.now() - server_start_time
    return str(uptime)

ロギングとメトリクス

適切なロギングは、問題の早期発見と解決に不可欠ですのでしっかりいれておきましょう

import structlog
from prometheus_client import Counter, Histogram, generate_latest

# 構造化ログの設定
logger = structlog.get_logger()

# Prometheusメトリクス
tool_calls_total = Counter(
    'mcp_tool_calls_total',
    'Total number of tool calls',
    ['tool_name']
)
tool_duration_seconds = Histogram(
    'mcp_tool_duration_seconds',
    'Tool execution duration',
    ['tool_name']
)

@mcp.tool()
async def monitored_tool(param: str) -> str:
    """監視機能付きツール"""
    start_time = time.time()
    
    try:
        # ツール実行カウンタを増やす
        tool_calls_total.labels(tool_name='monitored_tool').inc()
        
        # 実際の処理
        result = await process_something(param)
        
        # 成功をログ
        logger.info(
            "tool_executed",
            tool_name="monitored_tool",
            param=param,
            duration=time.time() - start_time
        )
        
        return result
        
    except Exception as e:
        # エラーをログ
        logger.error(
            "tool_failed",
            tool_name="monitored_tool",
            param=param,
            error=str(e),
            duration=time.time() - start_time
        )
        raise
    
    finally:
        # 実行時間を記録
        tool_duration_seconds.labels(
            tool_name='monitored_tool'
        ).observe(time.time() - start_time)

第7章 今後の展望と結論

MCPの未来

MCPは、AIエージェントのエコシステムにおいて中心的な役割を果たし続けるとおもいます。万が一べつの仕様がでてもMCPのようなものは必須の技術となるとおもいます。

さて、本日は、stdio時代からStreamable HTTPまでをざざっと解説しましたが、現在からは以下のような発展をしていくとおもいます。

まず、ツールの相互運用性がさらに向上します。異なるベンダーのAIモデル間でツールを共有し、組み合わせて使用することが一般的になるでしょう。また、セキュリティ機能の強化も進むと考えられます。認証、認可、監査ログなど、エンタープライズ向けの機能が標準化される可能性が高いです。

さらに、リアルタイム性の向上も期待されます。WebSocketsやWebTransportなど、より効率的な双方向通信プロトコルのサポートが追加される可能性があります。

現時点でMCP実装者への提言

当社エンジニアを含め、2025年時点でMCPサーバーを実装する開発者には、以下のアプローチがオススメです

第一に、FastMCPのような(半)公式SDKを積極的に活用しましょう。

実は私は自前実装を最初していたのですが、案の定相互運用性に問題があり、後悔しました・・・

仕様の詳細に振り回されることなく、ビジネスロジックに集中できます。これはほんとにだいじです

第二に、李モーターMCPサーバーにはStreamable HTTPトランスポートを標準として採用しましょう。

これが最も成熟し、実用的な選択肢です。

第三に、最初から本番環境を意識した実装を心がけることです。エラーハンドリング、ロギング、モニタリングは後から追加するのではなく、最初から組み込むべきです。あとインターネット上に公開する場合は認証・認可機構も必要になるますね。

結論

本日はMCPの仕様変遷と実装方法について解説してまいりました。

MCPは、AIと外部システムの連携における重要な標準として確立されました。初期のstdioベースの実装から、HTTP+SSEを経て、現在のStreamable HTTPに至るまでの変遷は、実際の使用場面からのフィードバックを反映した自然な進化でした。

また、FastMCPを使用したサーバー実装、各LLMプロバイダーとの統合、そして運用面での考慮点まで、実践的な内容を網羅しました。

さらに、MCPサーバーがストリーミング形式で返信を返すことの意義についても触れましたが、これは単なる通信効率の問題ではなく、AIモデル、MCPサーバー、そしてアプリケーションの三者が同じ時間軸を共有し、逐次的な情報のやり取りを通じて「対話しながら仕事を進める」という新しい実行モデル、新しいUXを生み出すポテンシャルを秘めているな感じています。例えるなら、2000年代前半にAjaxが登場したときのようなワクワク感でしょうか。

ということで、ある程度仕様が安定してきたいまこそMCPを活用した革新的なAIアプリケーションの開発に取り組む絶好の機会じゃないでしょうか?

それでは、本日もお読みいただきありがとうございました!

次回またお会いしましょう!

Read more

【出展報告】ASCII STARTUP TechDay 2025

【出展報告】ASCII STARTUP TechDay 2025

こんにちは! 本日、「ASCII STARTUP TechDay 2025」に出展してまいりましたのでレポートさせていただきます! ASCII STARTUP TechDay 2025 ASCII STARTUP TechDay 2025は、2025年11月17日(月)に東京・浅草橋ヒューリックホール&カンファレンスで開催された、ディープテック・スタートアップのエコシステム構築をテーマにした展示交流・カンファレンスイベントです。 秋の展示会は本当にいいですね 本日はとてもよいお天気で、涼しくて、展示会にはピッタリの気候で朝からルンルンでした。しかも午後からの展示会ということで、気持ちに余裕をもって朝の業務をこなしていたところ、けっこうすぐに昼前になり、あわてて現場へ。 浅草橋は当社からもわりと近いという立地の良さを甘く見ておりましたが💦、なんとか予定時刻前に到着しました。やっぱり、都心開催は本当にありがたいですね。 会場へ急いでいると、おなかが「ぐ~」と鳴り 「そういえば、朝食まだだったわ」 とおもったところに、なんと私の大好きなエッセンさん🍞のトラックがあるで

By Qualiteg ビジネス開発本部 | マーケティング部
サブスクビジネス完全攻略 第1回~『アープがさぁ...』『チャーンがさぁ...』にもう困らない完全ガイド

サブスクビジネス完全攻略 第1回~『アープがさぁ...』『チャーンがさぁ...』にもう困らない完全ガイド

なぜサブスクリプションモデルが世界を変えているのか、でもAI台頭でSaaSは終わってしまうの? こんにちは! Qualitegコンサルティングです! 新規事業戦略コンサルタントとして日々クライアントと向き合う中で、ここ最近特に増えているのがSaaSビジネスに関する相談です。興味深いのは、その背景にある動機の多様性です。純粋に収益モデルを改善したい企業もあれば、 「SaaS化を通じて、うちもデジタルネイティブ企業として見られたい」 という願望を持つ伝統的な大企業も少なくありません。 SaaSという言葉が日本のビジネスシーンに本格的に浸透し始めたのは2010年代前半。それから約15年が経ち、今やSaaSは「先進的な企業の証」のように扱われています。 まず SaaSは「サーズ」と読みます。 (「サース」でも間違ではありません、どっちもアリです) ほかにも、 MRR、ARR、アープ、チャーンレート、NRR、Rule of 40…… こうした横文字が飛び交う経営会議に、戸惑いながらも「乗り遅れてはいけない」と焦る新規事業担当者の姿をよく目にします。 しかし一方で、2024

By Qualiteg コンサルティング
ASCII STARTUP TechDay 2025に出展します!

ASCII STARTUP TechDay 2025に出展します!

株式会社Qualitegは、2025年11月17日(月)に東京・浅草橋ヒューリックホール&カンファレンスで開催される「ASCII STARTUP TechDay 2025」に出展いたします。 イベント概要 「ASCII STARTUP TechDay 2025」は、日本のディープテックエコシステムを次のレベルへ押し上げ、新産業を創出するイノベーションカンファレンスです。ディープテック・スタートアップの成長を支えるエコシステムの構築、そして成長・発展を目的に、学術、産業、行政の垣根を越えて知を結集する場として開催されます。 開催情報 * 日時:2025年11月17日(月)13:00~18:00 * 会場:東京・浅草橋ヒューリックホール&カンファレンス * 住所:〒111-0053 東京都台東区浅草橋1-22-16ヒューリック浅草橋ビル * アクセス:JR総武線「浅草橋駅(西口)」より徒歩1分 出展内容 当社ブースでは、以下の3つの主要サービスをご紹介いたします。 1.

By Qualiteg ニュース
大企業のAIセキュリティを支える基盤技術 - 今こそ理解するActive Directory 第4回 プロキシサーバーと統合Windows認証

大企業のAIセキュリティを支える基盤技術 - 今こそ理解するActive Directory 第4回 プロキシサーバーと統合Windows認証

11月に入り、朝晩の冷え込みが本格的になってきましたね。オフィスでも暖房を入れ始めた方も多いのではないでしょうか。 温かいコーヒーを片手に、シリーズ第4回「プロキシサーバーと統合Windows認証」をお届けします。 さて、前回(第3回)は、クライアントPCやサーバーをドメインに参加させる際の「信頼関係」の確立について深掘りしました。コンピューターアカウントが120文字のパスワードで自動認証される仕組みを理解いただけたことで、今回のプロキシサーバーの話もスムーズに入っていけるはずです。 ChatGPTやClaudeへのアクセスを監視する中間プロキシを構築する際、最も重要なのが「確実なユーザー特定」です。せっかくHTTPS通信をインターセプトして入出力内容を記録できても、アクセス元が「tanaka_t」なのか「yamada_h」なのかが分からなければ、監査ログとしての価値は半減してしまいます。 今回は、プロキシサーバー自体をドメインメンバーとして動作させることで、Kerberosチケットの検証を可能にし、透過的なユーザー認証を実現する方法を詳しく解説します。Windows版Squid

By Qualiteg AIセキュリティチーム