[ChatStream] Web サーバー(ASGI server) の起動

[ChatStream] Web サーバー(ASGI server) の起動

こんにちは! (株)Qualiteg プロダクト開発部 です!

本稿では、 ChatStream 搭載した Webサーバーの起動方法について説明いたします!

uvicorn(内部起動)

ChatStreamは FastAPI/Starlette に対応しているため、 ASGI サーバーで動作させることができます。

uvicorn をコード内で定義するには以下のように実装します

def start_server():
    uvicorn.run(app, host='localhost', port=9999)


def main():
    start_server()


if __name__ == "__main__":
    main()

ソースコード全体

import torch
import uvicorn
from fastapi import FastAPI, Request
from fastersession import FasterSessionMiddleware, MemoryStore
from transformers import AutoTokenizer, AutoModelForCausalLM

from chatstream import ChatStream, ChatPromptTogetherRedPajamaINCITEChat as ChatPrompt

model_path = "togethercomputer/RedPajama-INCITE-Chat-3B-v1"
device = "cuda"  # "cuda" / "cpu"

tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.float16)
model.to(device)

chat_stream = ChatStream(
    num_of_concurrent_executions=2,
    max_queue_size=5,
    model=model,
    tokenizer=tokenizer,
    device=device,
    chat_prompt_clazz=ChatPrompt,
)

app = FastAPI()

app.add_middleware(FasterSessionMiddleware,
                   secret_key="your-session-secret-key",  # Key for cookie signature
                   store=MemoryStore(),  # Store for session saving
                   http_only=True,  # True: Cookie cannot be accessed from client-side scripts such as JavaScript
                   secure=True,  # False: For local development env. True: For production. Requires Https
                   )


@app.post("/chat_stream")
async def stream_api(request: Request):
    # handling FastAPI/Starlette's Request
    response = await chat_stream.handle_chat_stream_request(request)
    return response


@app.on_event("startup")
async def startup():
    # start request queueing system
    await chat_stream.start_queue_worker()


def start_server():
    uvicorn.run(app, host='localhost', port=9999)


def main():
    start_server()


if __name__ == "__main__":
    main()

uvicorn(外部起動)

次に uvicorn 外部起動 のパターンについてみていきましょう。

./example にある example_server_redpajama_simple.py をサーバーとして起動する場合

uvicorn example.web_server_redpajama_simple.py:app --host 0.0.0.0 --port 3000

のようになります。こちらのほうがサーバーとアプリの分離ができていて、より本番に近いアプローチとなります。

uvicornの起動オプション

https://www.uvicorn.org/settings/

ソースコード

example_server_redpajama_simple.py

import torch
import uvicorn
from fastapi import FastAPI, Request
from fastersession import FasterSessionMiddleware, MemoryStore
from transformers import AutoTokenizer, AutoModelForCausalLM

from chatstream import ChatStream, ChatPromptTogetherRedPajamaINCITEChat as ChatPrompt

model_path = "togethercomputer/RedPajama-INCITE-Chat-3B-v1"
device = "cuda"  # "cuda" / "cpu"

tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.float16)
model.to(device)

chat_stream = ChatStream(
    num_of_concurrent_executions=2,
    max_queue_size=5,
    model=model,
    tokenizer=tokenizer,
    device=device,
    chat_prompt_clazz=ChatPrompt,
)

app = FastAPI()

app.add_middleware(FasterSessionMiddleware,
                   secret_key="your-session-secret-key",  # Key for cookie signature
                   store=MemoryStore(),  # Store for session saving
                   http_only=True,  # True: Cookie cannot be accessed from client-side scripts such as JavaScript
                   secure=True,  # False: For local development env. True: For production. Requires Https
                   )


@app.post("/chat_stream")
async def stream_api(request: Request):
    # handling FastAPI/Starlette's Request
    response = await chat_stream.handle_chat_stream_request(request)
    return response


@app.on_event("startup")
async def startup():
    # start request queueing system
    await chat_stream.start_queue_worker()

gunicorn

次は gunicorn を使用する方法についてです。こちらは、本番のAPIサーバーとして、もっとも一般的なアプローチとなります。ここで起動した gunicorn を APIサーバーとして Nginx などの Webサーバーをリバースプロキシとして組み合わせることで、本番システムとして稼働させることができます。

./example にある example_server_redpajama_simple.py をサーバーとして起動する場合

gunicorn example.web_server_redpajama_simple.py:app --workers 4 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:3000

(注意:Windows 環境では動作しません)

ソースコード

example_server_redpajama_simple.py

import torch

from fastapi import FastAPI, Request
from fastersession import FasterSessionMiddleware, MemoryStore
from transformers import AutoTokenizer, AutoModelForCausalLM

from chatstream import ChatStream, ChatPromptTogetherRedPajamaINCITEChat as ChatPrompt

model_path = "togethercomputer/RedPajama-INCITE-Chat-3B-v1"
device = "cuda"  # "cuda" / "cpu"

tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.float16)
model.to(device)

chat_stream = ChatStream(
    num_of_concurrent_executions=2,
    max_queue_size=5,
    model=model,
    tokenizer=tokenizer,
    device=device,
    chat_prompt_clazz=ChatPrompt,
)

app = FastAPI()

app.add_middleware(FasterSessionMiddleware,
                   secret_key="your-session-secret-key",  # Key for cookie signature
                   store=MemoryStore(),  # Store for session saving
                   http_only=True,  # True: Cookie cannot be accessed from client-side scripts such as JavaScript
                   secure=True,  # False: For local development env. True: For production. Requires Https
                   )


@app.post("/chat_stream")
async def stream_api(request: Request):
    # handling FastAPI/Starlette's Request
    response = await chat_stream.handle_chat_stream_request(request)
    return response


@app.on_event("startup")
async def startup():
    # start request queueing system
    await chat_stream.start_queue_worker()

いかがでしたでしょうか。ChatStreamが FastAPI/Starlette ベースで実装されているので本番に向けては標準的なアプローチでサーバー構築することが可能です。

ただし、商用環境を考慮しますと、通常、1つの ChatStream APIサーバーだけで運用することは稀で、1つの ChatStream API サーバーを ChatStreamノードと呼び、複数のChatStreamノードでリージョンごとのクラスターを作ります。Qualiteg では、このようなスケールアウト型の負荷対策をさせるシステム構成を推奨しております。

Read more

楽観的ロック vs 悲観的ロック:実際のトラブルから学ぶ排他制御

楽観的ロック vs 悲観的ロック:実際のトラブルから学ぶ排他制御

こんにちは! Qualitegプロダクト開発部です! 「楽観的ロックを実装したのに、まだ競合エラーが出るんですけど...」 これは私たちが実際に経験したことです。 本記事では、楽観的ロックと悲観的ロックの違いを、実際に発生したトラブルを通じて解説します。 抽象的な説明ではなく、 「なぜそれが必要なのか」「どんな問題を解決できるのか」 を実感できる内容を目指します。 目次 1. 問題の背景:並列処理で謎のエラー 2. ロックなしの世界:なぜ競合が起きるのか 3. 楽観的ロックの導入:期待と現実 4. 楽観的ロックの限界:解決できなかった問題 5. 悲観的ロックによる解決 6. 実装時のハマりポイント 7. どちらを選ぶべきか:判断基準 8. まとめ 1. 問題の背景:並列処理で謎のエラー 1.1 システムの概要 私たちが開発していたのは、 複数のワークスペースを切り替えて使用するAPIサーバー でした。 当社AI関係のプロダクトの一部だったのですが、結合テスト兼負荷テストを実行すると、まれに発生してしまっていました。 ユーザーは複数のワーキン

By Qualiteg プロダクト開発部
企業セキュリティはなぜ複雑になったのか? 〜AD+Proxyの時代から現代のクラウド対応まで〜

企業セキュリティはなぜ複雑になったのか? 〜AD+Proxyの時代から現代のクラウド対応まで〜

こんにちは! ChatGPTやClaudeといった生成AIサービスが業務に浸透し始めた今、 「AIに機密情報を送ってしまうリスク」 が新たなセキュリティ課題として浮上しています。 この課題に向き合う中で、私たちは改めて「企業のセキュリティアーキテクチャはどう変遷してきたのか」を振り返る機会がありました。 すると、ある疑問が浮かんできます。 「なんでこんなに複雑になってるんだっけ?」 企業のセキュリティ担当者なら、一度は思ったことがあるのではないでしょうか。 アルファベット3〜4文字の製品が乱立し、それぞれが微妙に重複した機能を持ち、設定は複雑化し、コストは膨らみ続けています。 当社ではAIセキュリティ関連プロダクトをご提供しておりますが、AI時代のセキュリティを考える上でも、この歴史を理解することは重要ではないかと考えました。 本記事では、企業ネットワークセキュリティの変遷を振り返りながら、「なぜこうなったのか」を整理してみたいと思います。 第1章:観測点を集約できた時代 ― オンプレAD + Proxy(〜2010年代前半) 統制しやすかったモデル かつ

By Qualiteg コンサルティング, Qualiteg AIセキュリティチーム
【IT温故知新】WS-* の栄光と黄昏:エンタープライズITはいかにして「実装」に敗北したか

【IT温故知新】WS-* の栄光と黄昏:エンタープライズITはいかにして「実装」に敗北したか

こんにちは。 —— 2003年のSOAから、2026年のAIへ —— この記事は、過去の技術動向を振り返り、そこから学べる教訓について考察してみたものです。 歴史は常に、後から見れば明らかなことが、当時は見えなかったという教訓を与えてくれます。 そして、今私たちが「正しい」と信じていることもまた、20年後には違う評価を受けているかもしれません。 だからこそ、振り返ることには意味があるとおもいます。同じ轍を踏まないために。 はじめに:20年前の熱狂を覚えていますか 2000年代初頭。 私はSOA(サービス指向アーキテクチャ)に本気で取り組んでいました。 当時、SOAは「次世代のエンタープライズアーキテクチャ」として、業界全体が熱狂していました。 カンファレンスに行けば満員御礼、ベンダーのブースには人だかり、書店にも関連の書籍がちらほらと。 SOAP、SOAP with attachments、JAX-RPC、WS-Security、WS-ReliableMessaging、WS-AtomicTransaction... 仕様書の山と格闘する日々でした。 あれから

By Qualiteg コンサルティング
DockerビルドでPythonをソースからビルドするとGCCがSegmentation faultする話

DockerビルドでPythonをソースからビルドするとGCCがSegmentation faultする話

こんにちは!Qualitegプロダクト開発部です! 本日は Docker環境でPythonをソースからビルドした際に発生した、GCCの内部コンパイラエラー(Segmentation fault) について共有します。 一見すると「リソース不足」や「Docker特有の問題」に見えますが、実際には PGO(Profile Guided Optimization)とLTO(Link Time Optimization)を同時に有効にした場合に、GCC自身がクラッシュするケースでした。 ただ、今回はDockerによって問題が隠れやすいという点もきづいたので、あえてDockerを織り交ぜた構成でのPythonソースビルドとGCCクラッシュについて実際に発生した題材をもとに共有させていただこうとおもいます 同様の構成でビルドしている方の参考になれば幸いです TL;DR * Docker内でPythonを --enable-optimizations --with-lto 付きでソースビルドすると GCCが internal compiler error(Segmentati

By Qualiteg プロダクト開発部