Model Context Protocol完全実装ガイド 2025- 仕様変遷から最新Streamable HTTPまでの全て
こんにちは!
現在、LLM業界で破竹の勢いでひろまっているMCPについて、本日はとくに実装面について解説していきたいとおもいます。
MCP、MCPとひとくちにいっていますが、実は短期間でけっこう「標準」とよばれる仕様が変化しておりますので、仕様のバリエーションを順を追って解説しつつ、実際に実装をしていきたいとおもいます。
さて、MCPですが、2024年後半、Anthropicが発表したModel Context Protocol(MCP)は、AI分野における重要な転換点となりました。
従来、各AIベンダーが独自に実装していたツール呼び出し機能(tool useと呼びます)を標準化し、AIモデルと外部システムの連携を統一的に扱える仕組みを提供しました
本記事で、MCPの誕生から現在に至るまでの技術的変遷を詳細に追いながら、2025年時点での最適な実装方法を完全なソースコードと共に解説します。特に、仕様の変化に振り回されがちな実装者の視点から、なぜ現在の形に収束したのか、そして今後どのような実装アプローチを取るべきかを明確にしていきます。
第1章 MCPが解決しようとした問題
AIツール呼び出しの混沌とした状況
2024年までのAI開発現場では、各モデルプロバイダーが独自のツール呼び出し仕様を採用していました。OpenAIはFunction Calling、AnthropicやMetaはTool Use、GoogleはFunction Declarationsという具合に、名称も実装方法もバラバラでした。
開発者は同じ「現在時刻を取得する」という単純な機能でも、LLMプロバイダーごとに異なる実装を用意する必要がありました。
まあ、この新しい技術がではじめたとき、相互運用性はあとまわしっていうのはソフトウェア業界では「あるある」なんですが、
こうした状況は単に面倒なだけでなく、AIエージェントの可能性を制限していていました。複数のAIモデルを組み合わせたシステムや、モデルを切り替え可能なアプリケーションの開発は困難で、ツールの再利用性も著しく低い状態でした。
MCPという解答
MCPは、この混沌とした状況に対する明確な解答でした。JSON-RPC 2.0を基盤とし、ツール定義からセッション管理、ストリーミング通信まで、AIとツールの連携に必要なすべてを標準化しました。これにより、一度MCPサーバーを実装すれば、Claude Desktop、VS Code、その他のMCP対応クライアントから同じツールを利用できるようになりました。
第2章 仕様の変遷と技術的背景
stdio時代 - ローカル実行における理想的な設計
初期のMCPは、標準入出力(stdio)を使用した通信を前提としていました。これはプロセス間通信の最も基本的な形であり、ローカル環境では効率的でした。
# 初期のstdio実装の概念
import sys
import json
while True:
line = sys.stdin.readline()
request = json.loads(line)
# リクエスト処理
response = handle_request(request)
sys.stdout.write(json.dumps(response) + "\n")
sys.stdout.flush()
しかし、この設計には根本的な制約があります。
そうですstdioはプロセスの親子関係を前提とした通信方式であり、ネットワークを介した通信には本質的に適していません。
そもそも stdio を通信方式だっておもってる昨今のエンジニアってどれくらいいるんでしょうか。私みたいに黒いコンソールしかなかった時代にコード書いていた人やいまでもC言語現役ですっていう人以外は stdio とかあえて意識していないきもするのですが、
まあ、開発者がVS CodeやローカルのClaude Desktopから使う分にはとくに問題ありませんでした。
ただ、複数のユーザーが同時にアクセスするWebサービスや、分散システムの一部として動作させることは(なんで?ってかんじですが)設計上想定されていませんでした。
HTTP+SSE時代 - ネットワーク対応への第一歩
次に登場したのが、HTTP POSTとServer-Sent Events(SSE)を組み合わせた方式でした。クライアントからサーバーへはHTTP POST、サーバーからクライアントへはSSEという非対称な通信モデルです。
この方式には一定の合理性がありました。SSEはブラウザでネイティブサポートされており、リアルタイムな進捗通知も可能でした。しかし、実装の複雑さは増大しました。セッション管理、エンドポイントの分離、接続の同期など、開発者が考慮すべき点が多すぎました。
これはSSEならではのめんどくささです。
Streamable HTTP時代 - 成熟した統一仕様
そして、2025年現在、MCPはStreamable HTTPという形に収束しています。単一のHTTPエンドポイントで、通常のリクエスト/レスポンスとストリーミングの両方を処理できる洗練された設計です。
# Streamable HTTPの基本概念
@app.post("/mcp")
async def mcp_endpoint(request: Request):
if "text/event-stream" in request.headers.get("accept", ""):
# ストリーミングレスポンス
return StreamingResponse(generate_sse())
else:
# 通常のJSONレスポンス
return JSONResponse(handle_request(await request.json()))
この統一により、実装の複雑さが大幅に軽減され、同時にスケーラビリティも向上しました。各トランスポート方式の特性を整理すると、次のような変遷が見えてきます。
MCPトランスポート方式の変遷
| 技術要素 | ローカルに閉じた実装 | リモート化 | |
|---|---|---|---|
| stdio | HTTP+SSE | Streamable HTTP | |
| 登場時期 | 初期(2024年後半) | 過渡期(2024年末) | 現在(2025年) |
| 通信方式 | 標準入出力 プロセス間直接通信 | HTTP POST(C→S) SSE(S→C) 非対称な2方向通信 | 単一HTTPエンドポイント Acceptヘッダーで自動切替 |
| プロトコル | JSON-RPC 2.0 改行区切り | JSON-RPC 2.0 POST/SSE分離 | JSON-RPC 2.0 統合型 |
| 実装複雑度 | 低(シンプルループ) | 高(2エンドポイント管理) | 中(SDKで簡素化) |
| ストリーミング | 改行ベース | SSEイベント | SSE/chunked自動選択 |
| セッション管理 | プロセスライフサイクル | URLパラメータ(手動管理) | Mcp-Session-Idヘッダー(標準化) |
| スケーラビリティ | N/A(ローカルのみ) | △ 制限あり(SSE接続管理が課題) | ◎ 優秀(ステートレス可能) |
| 主な用途 | VS Code拡張 CLI ツール ローカルIDE | 実験的実装 小規模デモ プロトタイプ | 本番Webサービス エンタープライズAPI SaaS製品 |
| メリット | 最高速 実装が単純 セキュア | ネットワーク対応 ブラウザ互換 既存技術の組み合わせ | 統一的実装 クラウドネイティブ 運用ツール対応 |
| デメリット | リモート不可 Webサービス化困難 | 実装が複雑 デバッグ困難 スケーリング課題 | HTTPオーバーヘッド ローカルでは過剰 |
この表から明確なのは、ローカル実行に特化したstdioから始まり、リモート化の要求に応えるためにHTTP+SSEという過渡的な実装を経て、最終的にStreamable HTTPという統一的なソリューションに収束したということです。
現在では、用途に応じて「ローカル専用ならstdio、リモート化の可能性があるならStreamable HTTP」という明確な使い分けが確立されました。
ストリーミングの本質的な意義
さて、MCPでは、そもそもなんでストリーミングがうれしいんでしょうか。
ストリーミング形式で返信を返すことの意義はなんでしょか。
まずはそこを解説していきます。
「ストリーミング形式」、これは単なる通信方式の選択にとどまらない深い意味を持っています。
まず、従来の同期的な通信モデルと、ストリーミングによる非同期モデルの違いを図でみてみましょう。
まず同期モデルはこんなかんじでしょうか。
【従来の同期モデル】
User ──────> Web UI ──────> LLM ──────> MCP Server
│ │
│ ▼
│ [処理実行]
│ (長時間)
│ │
◄─────────────┘
│ (待機中...)
▼
[応答生成]
│
User ◄────── Web UI ◄──────────┘
問題点:LLMは処理が完了するまで何も知らない状態で待機
次はストリーミングモデルです
【ストリーミングモデル】
User ──────> Web UI ──────> LLM ──────> MCP Server
│ │ │ │
│ │ │ ▼
│ │ │ [処理開始]
│ │ │ │
│ │ ◄─ stream ────┤ "処理開始しました"
│ │ │ │
│ │ [中間応答] ▼
│ ◄─ stream ────┤ [処理中 30%]
│ "処理中..." │ │
◄──────────┤ ◄─ stream ────┤ "30%完了"
│ │ │
│ [戦略調整] ▼
│ │ [処理中 60%]
│ ◄─ stream ────┤ "60%完了"
│ ◄─ stream ────┤ │
│ "もうすぐ..." │ ▼
◄──────────┤ │ [処理完了]
│ ◄─ stream ────┤ "結果: XXX"
│ │ │
│ [最終応答] │
◄─ stream ────┤ │
│ "完了しました" │ │
◄──────────┘ │ │
利点:全ての参加者がリアルタイムで状況を共有
AIモデルを中心に据えたアプリケーション開発において、モデルが外部ツールと対話しながら段階的に処理を進めていくためには、途中経過を逐次返す仕組みが不可欠です。
AIは「いま何が起きているのか」「どれだけ処理が進んでいるのか」を把握しながら応答生成を続ける必要があり、結果だけがまとまって返ってくる同期型のリクエスト方式では、多くの場合その要件を満たせません。
【ストリーミングによる適応的処理】
LLM ──────> MCP Server (データベース検索)
│ │
│ ▼ stream: "1000件中100件検索済み..."
│◄─────────────┤
│ │
├─判断: 想定より時間がかかっている
│
├──────────> MCP Server (検索条件を絞り込み)
│ │
│ ▼ stream: "条件変更: 50件に絞り込み"
│◄─────────────┤
│ │
│ ▼ stream: "検索完了"
│◄─────────────┤
│
▼
[ユーザーへの応答生成]
"検索範囲を最適化して、最も関連性の高い
50件のデータから結果を取得しました"
ストリーミングが存在する最大の意味は、モデル側が外部処理を"待ち続ける"のではなく、"処理の進行状況を観察しながら応答生成を継続できる"という点にあります。AIモデルは厳密な意味での非同期処理を持ちません。外部ツールの実行が長時間に及ぶ場合でも、モデルはレスポンスをブロックしたまま停止するしかありませんでした。しかし、ストリーミングが導入されることで、外部のMCPサーバーが部分的な進捗や中間データを都度AIに返し、AIはそれを受け取るたびに次の判断を行えるようになりました。
【ユーザー体験の改善】
従来:
User ──> Web UI ──> [ローディング表示...] ──> (不安な待機時間) ──> 結果
ストリーミング:
User ──> Web UI ┬─> "データベースに接続中..."
├─> "1000件のレコードを検索中..."
├─> "関連データを分析中..."
├─> "結果を整形中..."
└─> "完了: [詳細な結果]"
ユーザーは各ステップで何が起きているか把握でき、
処理の透明性と信頼性が大幅に向上
また、ストリーミング返信はアプリケーションにとっても決定的に重要です。通常のサーバー実装では、長い処理時間を必要とするタスクを扱う際、ユーザーは「レスポンスが返ってくるまで待つ」という不透明な状態に置かれていました。AIを中心に据えたUIでは、これが体験の質に直結します。MCPサーバーがストリーミングで段階的に結果やログを返すことで、アプリケーションはそれを逐次UIに反映でき、「AIが何をしているのか」が可視化されます。
たとえば、外部ツールで動画をつくってるとき、動画がどのくらい完成しているのか進捗67%、みたいな情報をWebUIに返してやるとUXが向上しますよね。
さらに、この仕組みはAIエージェントの高度化にも直結しています。外部処理の結果がストリームとして流れ込むということは、AIモデルが「部分結果を見て戦略を変える」ことを初めて可能にします。これがなんともAI時代っていう感じですよね。
【エラーからの回復】
LLM ──────> MCP Server (API呼び出し)
│ │
│ ▼ stream: "APIに接続中..."
│◄─────────────┤
│ │
│ ▼ stream: "エラー: レート制限"
│◄─────────────┤
│
├─判断: 代替手段を使用
│
├──────────> MCP Server (キャッシュから取得)
│ │
│ ▼ stream: "キャッシュから取得中..."
│◄─────────────┤
│ │
│ ▼ stream: "成功: データ取得完了"
│◄─────────────┤
│
▼
[ユーザーへの応答]
"最新データの取得に失敗したため、
1時間前のキャッシュデータを使用しました"
長い処理の途中で異常が検出された場合にAIが即座に代替手段を提案したり、処理開始後に追加の指示が必要な場面を判断したりすることが可能になります。同期的に一度に返ってくるレスポンスでは、こうした逐次判断は本質的に不可能でした。
総じて、MCPサーバーがストリーミング形式の返信を返す意義は、単に効率を高めるためだけの技術ではありません。AIモデル、MCPサーバー、そしてアプリケーションの三者が同じ時間軸を共有し、逐次的な情報のやり取りを通じて「対話しながら仕事を進める」という新しい実行モデルを成立させるための基盤です。そして、Streamable HTTPはこのストリーミングの意義を最大限に活かしながら、実装の複雑さを最小限に抑えることに成功した、MCPの成熟形と言えるでしょう。
第3章 完全実装 - FastMCPによる標準準拠サーバー
なぜFastMCPを使うべきか
MCPの仕様は一見シンプルですが、実際に仕様準拠のサーバーを実装しようとすると、考慮すべき点が膨大になります。バッチリクエスト処理、通知専用メッセージの204レスポンス、Mcp-Session-Idヘッダーの管理など、細かい仕様が多数存在します。
FastMCPは、これらすべてを吸収し、開発者がビジネスロジックに集中できる環境を提供してくれます。
完全なMCPサーバー実装
以下が、FastMCPを使用した本番環境対応のMCPサーバー実装です。
#!/usr/bin/env python3
"""
Production-Ready MCP DateTime Server
本番環境で使用可能な完全なMCPサーバー実装
"""
import asyncio
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
import pytz
import logging
from mcp.server.fastmcp import FastMCP
# ロギング設定
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s'
)
logger = logging.getLogger("mcp-datetime-server")
# タイムゾーン設定
JST = pytz.timezone('Asia/Tokyo')
# FastMCPインスタンス作成
mcp = FastMCP(
name="datetime-server-production",
version="1.0.0"
)
# サーバーメタデータ
mcp.metadata = {
"description": "Production-ready datetime server with JST support",
"author": "Your Organization",
"timezone": "Asia/Tokyo",
"supported_languages": ["ja", "en"]
}
@mcp.tool(
description="日本標準時(JST)で現在の時刻を取得します"
)
async def get_current_time(format: str = "standard") -> str:
"""
現在時刻を指定された形式で返す
Args:
format: 出力形式 (standard, iso, unix)
"""
now = datetime.now(JST)
if format == "iso":
return now.isoformat()
elif format == "unix":
return str(int(now.timestamp()))
else:
return f"現在の時刻は {now.strftime('%H:%M:%S')} です"
@mcp.tool(
description="本日の日付を日本語形式で取得します"
)
async def get_current_date(
include_era: bool = False
) -> str:
"""
現在の日付を返す
Args:
include_era: 和暦を含めるか
"""
now = datetime.now(JST)
date_str = now.strftime("%Y年%m月%d日")
day_of_week = ["月", "火", "水", "木", "金", "土", "日"][now.weekday()]
result = f"今日は {date_str} ({day_of_week}曜日) です"
if include_era:
# 令和の計算(2019年5月1日から)
reiwa_start = datetime(2019, 5, 1, tzinfo=JST)
if now >= reiwa_start:
reiwa_year = now.year - 2018
result += f" (令和{reiwa_year}年)"
return result
@mcp.tool(
description="指定された日数後の日付を計算します"
)
async def calculate_future_date(
days: int,
from_date: Optional[str] = None
) -> str:
"""
未来または過去の日付を計算
Args:
days: 日数(負の値で過去)
from_date: 起点日(ISO形式、省略時は今日)
"""
if from_date:
base_date = datetime.fromisoformat(from_date).replace(tzinfo=JST)
else:
base_date = datetime.now(JST)
target_date = base_date + timedelta(days=days)
date_str = target_date.strftime("%Y年%m月%d日")
day_of_week = ["月", "火", "水", "木", "金", "土", "日"][target_date.weekday()]
if days > 0:
return f"{days}日後は {date_str} ({day_of_week}曜日) です"
elif days < 0:
return f"{abs(days)}日前は {date_str} ({day_of_week}曜日) でした"
else:
return f"指定日は {date_str} ({day_of_week}曜日) です"
@mcp.tool(
description="2つの日付間の日数を計算します"
)
async def calculate_days_between(
date1: str,
date2: str
) -> str:
"""
日付間の日数差を計算
Args:
date1: 最初の日付(ISO形式)
date2: 2番目の日付(ISO形式)
"""
d1 = datetime.fromisoformat(date1).replace(tzinfo=JST)
d2 = datetime.fromisoformat(date2).replace(tzinfo=JST)
diff = abs((d2 - d1).days)
if d1 < d2:
return f"{date1} から {date2} まで {diff}日間です"
elif d1 > d2:
return f"{date2} から {date1} まで {diff}日間です"
else:
return "同じ日付です"
@mcp.resource(
uri="timezone://current",
name="Current Timezone Information",
description="現在のタイムゾーン詳細情報",
mime_type="application/json"
)
async def get_timezone_resource() -> Dict[str, Any]:
"""タイムゾーン情報をリソースとして提供"""
now = datetime.now(JST)
utc_now = datetime.now(pytz.UTC)
return {
"timezone": "Asia/Tokyo",
"abbreviation": "JST",
"offset": "+09:00",
"offset_seconds": 32400,
"current_jst": now.isoformat(),
"current_utc": utc_now.isoformat(),
"is_dst": False
}
@mcp.on_initialize
async def on_initialize(params: Dict[str, Any]) -> None:
"""サーバー初期化時の処理"""
client_info = params.get("clientInfo", {})
logger.info(f"MCP Server initialized by {client_info.get('name', 'unknown')}")
logger.info(f"Protocol version: {params.get('protocolVersion')}")
@mcp.on_error
async def on_error(error: Exception) -> None:
"""エラー発生時の処理"""
logger.error(f"MCP Server error: {error}", exc_info=True)
def main():
"""メインエントリーポイント"""
import argparse
parser = argparse.ArgumentParser(
description="Production MCP DateTime Server"
)
parser.add_argument(
"--port", type=int, default=8080,
help="Server port (default: 8080)"
)
parser.add_argument(
"--host", type=str, default="0.0.0.0",
help="Server host (default: 0.0.0.0)"
)
args = parser.parse_args()
print(f"Starting MCP Server on {args.host}:{args.port}")
print(f"Endpoint: http://{args.host}:{args.port}/mcp")
# FastMCPがすべての処理を自動化
mcp.run(
transport="streamable-http",
host=args.host,
port=args.port
)
if __name__ == "__main__":
main()
このコードの重要な点は、FastMCPのデコレータを使用することで、仕様準拠が自動的に保証されることです。開発者はツールのロジックに集中でき、プロトコルの詳細を意識する必要がありません。
第4章 クライアント実装 - 各LLMプロバイダーとの統合
MCPサーバーが完成したら、次はクライアント側の実装です。各LLMプロバイダーのAPIとMCPを統合する必要があります。
Claude(Anthropic)クライアント実装
Claudeは、MCPの生みの親であるAnthropicのモデルだけあって、最も自然にMCPと統合できます。
import asyncio
from typing import List, Dict, Any
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client
from anthropic import AsyncAnthropic
class ClaudeMCPIntegration:
def __init__(self, mcp_url: str, api_key: str):
self.mcp_url = mcp_url
self.anthropic = AsyncAnthropic(api_key=api_key)
self.session = None
async def connect(self):
"""MCPサーバーに接続"""
transport = await sse_client(self.mcp_url)
self.session = ClientSession(
transport.read_stream,
transport.write_stream
)
await self.session.__aenter__()
await self.session.initialize()
async def ask_claude_with_tools(self, prompt: str) -> str:
"""MCPツールを使用してClaudeに質問"""
# MCPツール一覧を取得
tools_result = await self.session.list_tools()
# Claude API形式に変換
claude_tools = []
for tool in tools_result.tools:
claude_tools.append({
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema
})
# Claudeに問い合わせ
response = await self.anthropic.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
tools=claude_tools,
messages=[{"role": "user", "content": prompt}]
)
# ツール呼び出しの処理
for content in response.content:
if content.type == "tool_use":
# MCPツールを実行
result = await self.session.call_tool(
content.name,
content.input or {}
)
# 結果をClaudeに返して最終回答を生成
final_response = await self.anthropic.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[
{"role": "user", "content": prompt},
{"role": "assistant", "content": response.content},
{
"role": "user",
"content": [{
"type": "tool_result",
"tool_use_id": content.id,
"content": result.content[0].text
}]
}
]
)
return final_response.content[0].text
return response.content[0].text
OpenAI GPTクライアント実装
OpenAIのFunction Callingも、MCPツールとシームレスに統合できます。
from openai import AsyncOpenAI
import json
class OpenAIMCPIntegration:
def __init__(self, mcp_url: str, api_key: str):
self.mcp_url = mcp_url
self.openai = AsyncOpenAI(api_key=api_key)
self.session = None
async def connect(self):
"""MCPサーバーに接続"""
transport = await sse_client(self.mcp_url)
self.session = ClientSession(
transport.read_stream,
transport.write_stream
)
await self.session.__aenter__()
await self.session.initialize()
async def ask_gpt_with_tools(self, prompt: str) -> str:
"""MCPツールを使用してGPTに質問"""
# MCPツール一覧を取得
tools_result = await self.session.list_tools()
# OpenAI形式に変換
openai_tools = []
for tool in tools_result.tools:
openai_tools.append({
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema
}
})
# GPTに問い合わせ
response = await self.openai.chat.completions.create(
model="gpt-4-turbo-preview",
tools=openai_tools,
messages=[{"role": "user", "content": prompt}]
)
message = response.choices[0].message
# ツール呼び出しの処理
if message.tool_calls:
tool_results = []
for tool_call in message.tool_calls:
# MCPツールを実行
result = await self.session.call_tool(
tool_call.function.name,
json.loads(tool_call.function.arguments)
)
tool_results.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": result.content[0].text
})
# 最終回答を生成
final_response = await self.openai.chat.completions.create(
model="gpt-4-turbo-preview",
messages=[
{"role": "user", "content": prompt},
message,
*tool_results
]
)
return final_response.choices[0].message.content
return message.content
Google Geminiクライアント実装
Geminiも同様のパターンで実装できます。
import google.generativeai as genai
class GeminiMCPIntegration:
def __init__(self, mcp_url: str, api_key: str):
self.mcp_url = mcp_url
genai.configure(api_key=api_key)
self.session = None
self.model = None
async def connect(self):
"""MCPサーバーに接続"""
transport = await sse_client(self.mcp_url)
self.session = ClientSession(
transport.read_stream,
transport.write_stream
)
await self.session.__aenter__()
await self.session.initialize()
# ツール定義を取得してモデルを初期化
tools_result = await self.session.list_tools()
functions = []
for tool in tools_result.tools:
functions.append({
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema
})
self.model = genai.GenerativeModel(
model_name="gemini-1.5-pro",
tools=functions
)
async def ask_gemini_with_tools(self, prompt: str) -> str:
"""MCPツールを使用してGeminiに質問"""
response = self.model.generate_content(prompt)
# 関数呼び出しの処理
for part in response.candidates[0].content.parts:
if hasattr(part, 'function_call'):
fc = part.function_call
# MCPツールを実行
result = await self.session.call_tool(
fc.name,
dict(fc.args)
)
# 結果を返して最終回答を生成
function_response = genai.protos.FunctionResponse(
name=fc.name,
response={"result": result.content[0].text}
)
final_response = self.model.generate_content(
contents=[
{"role": "user", "parts": [{"text": prompt}]},
{"role": "model", "parts": response.candidates[0].content.parts},
{"role": "user", "parts": [{"function_response": function_response}]}
]
)
return final_response.candidates[0].content.parts[0].text
return response.candidates[0].content.parts[0].text
第5章 実装上の重要な考慮点
エラーハンドリングとリトライ
本番環境では、ネットワークの一時的な問題やサーバーの過負荷に対処する必要があります。
import asyncio
from typing import TypeVar, Callable
import random
T = TypeVar('T')
async def retry_with_exponential_backoff(
func: Callable[[], T],
max_retries: int = 3,
base_delay: float = 1.0
) -> T:
"""指数バックオフによるリトライ実装"""
for attempt in range(max_retries):
try:
return await func()
except Exception as e:
if attempt == max_retries - 1:
raise
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
logger.warning(
f"Attempt {attempt + 1} failed: {e}. "
f"Retrying in {delay:.2f} seconds..."
)
await asyncio.sleep(delay)
セッション管理とコネクションプーリング
長時間稼働するサービスでは、適切なセッション管理が重要です。
class MCPConnectionPool:
"""MCPサーバーへの接続プール管理"""
def __init__(self, mcp_url: str, max_connections: int = 10):
self.mcp_url = mcp_url
self.max_connections = max_connections
self.connections = asyncio.Queue(maxsize=max_connections)
self.lock = asyncio.Lock()
async def acquire(self) -> ClientSession:
"""接続を取得"""
try:
return self.connections.get_nowait()
except asyncio.QueueEmpty:
async with self.lock:
if self.connections.qsize() < self.max_connections:
# 新しい接続を作成
transport = await sse_client(self.mcp_url)
session = ClientSession(
transport.read_stream,
transport.write_stream
)
await session.__aenter__()
await session.initialize()
return session
else:
# 接続が空くまで待機
return await self.connections.get()
async def release(self, session: ClientSession):
"""接続を解放"""
await self.connections.put(session)
パフォーマンス最適化
大規模なデプロイメントでは、パフォーマンスの最適化が重要です。
import functools
from typing import Any, Dict
import hashlib
import pickle
class MCPResponseCache:
"""MCPレスポンスのキャッシュ実装"""
def __init__(self, ttl_seconds: int = 300):
self.cache: Dict[str, tuple[Any, float]] = {}
self.ttl_seconds = ttl_seconds
def _generate_key(self, tool_name: str, arguments: Dict[str, Any]) -> str:
"""キャッシュキーの生成"""
data = f"{tool_name}:{pickle.dumps(arguments, protocol=pickle.HIGHEST_PROTOCOL)}"
return hashlib.sha256(data.encode()).hexdigest()
async def get_or_fetch(
self,
tool_name: str,
arguments: Dict[str, Any],
fetch_func: Callable
) -> Any:
"""キャッシュから取得、なければフェッチ"""
key = self._generate_key(tool_name, arguments)
# キャッシュチェック
if key in self.cache:
result, timestamp = self.cache[key]
if time.time() - timestamp < self.ttl_seconds:
return result
# フェッチして結果をキャッシュ
result = await fetch_func()
self.cache[key] = (result, time.time())
return result
第6章 運用とモニタリング
こちらは、半分おまけですが、MCPサーバーをつくるときに忘れないようにするため、追加しておきます。
ヘルスチェックの実装
本番環境では、サービスの健全性を常に監視する必要がありますので、最低限でもヘルスチェックエンドポイントはいれておきまほう
@mcp.tool(
description="サーバーの健全性をチェックします"
)
async def health_check() -> str:
"""ヘルスチェック用ツール"""
checks = {
"server_status": "healthy",
"timezone_check": datetime.now(JST).isoformat(),
"memory_usage": get_memory_usage(),
"uptime": get_uptime()
}
return json.dumps(checks, ensure_ascii=False, indent=2)
def get_memory_usage() -> str:
"""メモリ使用量を取得"""
import psutil
process = psutil.Process()
memory_mb = process.memory_info().rss / 1024 / 1024
return f"{memory_mb:.2f} MB"
def get_uptime() -> str:
"""稼働時間を取得"""
global server_start_time
if not server_start_time:
server_start_time = datetime.now()
uptime = datetime.now() - server_start_time
return str(uptime)
ロギングとメトリクス
適切なロギングは、問題の早期発見と解決に不可欠ですのでしっかりいれておきましょう
import structlog
from prometheus_client import Counter, Histogram, generate_latest
# 構造化ログの設定
logger = structlog.get_logger()
# Prometheusメトリクス
tool_calls_total = Counter(
'mcp_tool_calls_total',
'Total number of tool calls',
['tool_name']
)
tool_duration_seconds = Histogram(
'mcp_tool_duration_seconds',
'Tool execution duration',
['tool_name']
)
@mcp.tool()
async def monitored_tool(param: str) -> str:
"""監視機能付きツール"""
start_time = time.time()
try:
# ツール実行カウンタを増やす
tool_calls_total.labels(tool_name='monitored_tool').inc()
# 実際の処理
result = await process_something(param)
# 成功をログ
logger.info(
"tool_executed",
tool_name="monitored_tool",
param=param,
duration=time.time() - start_time
)
return result
except Exception as e:
# エラーをログ
logger.error(
"tool_failed",
tool_name="monitored_tool",
param=param,
error=str(e),
duration=time.time() - start_time
)
raise
finally:
# 実行時間を記録
tool_duration_seconds.labels(
tool_name='monitored_tool'
).observe(time.time() - start_time)
第7章 今後の展望と結論
MCPの未来
MCPは、AIエージェントのエコシステムにおいて中心的な役割を果たし続けるとおもいます。万が一べつの仕様がでてもMCPのようなものは必須の技術となるとおもいます。
さて、本日は、stdio時代からStreamable HTTPまでをざざっと解説しましたが、現在からは以下のような発展をしていくとおもいます。
まず、ツールの相互運用性がさらに向上します。異なるベンダーのAIモデル間でツールを共有し、組み合わせて使用することが一般的になるでしょう。また、セキュリティ機能の強化も進むと考えられます。認証、認可、監査ログなど、エンタープライズ向けの機能が標準化される可能性が高いです。
さらに、リアルタイム性の向上も期待されます。WebSocketsやWebTransportなど、より効率的な双方向通信プロトコルのサポートが追加される可能性があります。
現時点でMCP実装者への提言
当社エンジニアを含め、2025年時点でMCPサーバーを実装する開発者には、以下のアプローチがオススメです
第一に、FastMCPのような(半)公式SDKを積極的に活用しましょう。
実は私は自前実装を最初していたのですが、案の定相互運用性に問題があり、後悔しました・・・
仕様の詳細に振り回されることなく、ビジネスロジックに集中できます。これはほんとにだいじです
第二に、李モーターMCPサーバーにはStreamable HTTPトランスポートを標準として採用しましょう。
これが最も成熟し、実用的な選択肢です。
第三に、最初から本番環境を意識した実装を心がけることです。エラーハンドリング、ロギング、モニタリングは後から追加するのではなく、最初から組み込むべきです。あとインターネット上に公開する場合は認証・認可機構も必要になるますね。
結論
本日はMCPの仕様変遷と実装方法について解説してまいりました。
MCPは、AIと外部システムの連携における重要な標準として確立されました。初期のstdioベースの実装から、HTTP+SSEを経て、現在のStreamable HTTPに至るまでの変遷は、実際の使用場面からのフィードバックを反映した自然な進化でした。
また、FastMCPを使用したサーバー実装、各LLMプロバイダーとの統合、そして運用面での考慮点まで、実践的な内容を網羅しました。
さらに、MCPサーバーがストリーミング形式で返信を返すことの意義についても触れましたが、これは単なる通信効率の問題ではなく、AIモデル、MCPサーバー、そしてアプリケーションの三者が同じ時間軸を共有し、逐次的な情報のやり取りを通じて「対話しながら仕事を進める」という新しい実行モデル、新しいUXを生み出すポテンシャルを秘めているな感じています。例えるなら、2000年代前半にAjaxが登場したときのようなワクワク感でしょうか。
ということで、ある程度仕様が安定してきたいまこそMCPを活用した革新的なAIアプリケーションの開発に取り組む絶好の機会じゃないでしょうか?
それでは、本日もお読みいただきありがとうございました!
次回またお会いしましょう!