逐次生成されるトークンのバッファリング

逐次生成されるトークンのバッファリング

こんにちは! (株)Qualiteg プロダクト開発部 です!

今日は、推論シーンでよくある、トークン細切れ問題に対処する方法をご紹介します。

ストリーミングチャットで使用する逐次生成のとき、文章は1トークンずつ生成されますが、1トークンは”単語単位”でもなければ”1文字”単位でもなく、学習時使われていたトークナイザーの処理に依存します。

一般的には 形態素解析→サブワード→語彙リスト構築 を行いますが、このとき、後で文章生成するときに重要なタグ、たとえば "<NL>" というタグが重要な意味をもつにもかかわらず、細切れにされてトークナイズされてしまうことがあります。たとえば、 "<" "N" "L>" のように粉砕されてしまうようなパターンです。

(これを避ける方法はあるのですが、今回は、学習済のモデルをあからじめ与えられた状態でどうするか、を考えます)

こういうパターンが発生してしまったとき逐次生成で "<NL>" を認識しユーザーにチャットUIでレスポンスするときに "<" "N" という生成過程を見せないためのテクニックのご紹介となります。

最終的に以下のような文章が生成されるとき

"Hello there!<NL>my name is tokflow."

逐次生成だと以下のようなるとします
(実際はここまでひどいトークナイズはありえ無いですが)


"He"
"Hello"
"Hello "
"Hello t"
"Hello th"
"Hello there"
"Hello there!<"
"Hello there!<N"
"Hello there!<NL>m"
"Hello there!<NL>my "
"Hello there!<NL>my nam"
"Hello there!<NL>my name"
"Hello there!<NL>my name "
"Hello there!<NL>my name is"
"Hello there!<NL>my name is tokfl"
"Hello there!<NL>my name is tokflow."

ここで注目すべきポイントは "<NL>" が "<" "N" "L>m" と分割されてしまっている点です。

ユーザー側に生成結果をみせるとき "<NL>" は改行を意味する特殊タグだとすると、 "<" や "N" の時点では、まだ特殊タグなのか文章本体なのか見分けがつきません。そのため、ゆるい処理をしているストリーミングチャットUIだと、そのまま"<" や "N" を表示してしまいます。

これを避けるためには、 "<" や "N" の登場時点ではまだUIに出力せず、 "<NL>" まで確定したあと、それを改行として出力する、という処理が必要になります。
これを私たちはトークンのバッファリングと呼んでいます。

Qualiteg ではこの トークンのバッファリング 処理をしてくれる機能を Python ライブラリとしてリリースしていますので、ここでその使い方をご説明いたします。

当ライブラリは、 TokFlow と呼び、大規模言語モデルにより逐次的に生成されるトークンをバッファリングし、必要な置換処理を行いながら出力するユーティリティとなります。

下のように小さな かけら のようになった文章の断片=トークンを逐次入力したとき、置換が必要な文字列を順次置換しながら出力することができます。

このとき、トークンのバッファリングして遅延出力を行うことで、置換前のトークンは出力しないようにできます。

["He","llo"," ","t","h","ere","!<","N","L>m","y ","nam","e"," ","is"," tokfl","ow.","<","N","L>N","ice"," to ","me","et you."]

↑の例は、 <NL> の出現を検知し \n に置換しながら出力している様子です。

当ライブラリは、

  • 置換条件として、好きな文字列を置換対象に指定可能
  • 複数の置換条件を指定可能

です

TokFlow のインストール方法

pip install tokflow

使い方/サンプルコード

import time
from tokflow import TokFlow

TOKEN_GENERATOR_MOCK = ["He", "llo", " ", "t", "h", "ere", "!<", "N", "L>m", "y ", "nam", "e", " ", "is", " tokfl", "ow.",
                  "<", "N", "L>N", "ice", " to ", "me", "et you."]

# "<NL>" を "\n" に置換する。 "<NL>" は検索対象文字列。 "n" は置換先文字列
# 置換条件は複数指定可能。
tokf = TokFlow([("<NL>", "\n")])

for input_token in TOKEN_GENERATOR_MOCK: 
    # トークン(文章のかけらとなる1,2文字程度の文字列)を順次入力していく。トークンは内部でバッファリングされる。
    output_token = tokf.put(input_token)

    # output_token に今出力可能な出力トークンが出力される。
    # トークンのバッファリング中に検索対象文字列が出現する可能性がある場合は
    # 出力トーンは空文字となる。
    print(f"{output_token}", end="", flush=True)

    # 逐次生成されることを目視するため、ウェイトをはさむ
    time.sleep(0.3)


# トークンの入力が終わったら、最後に flush して残っているバッファを出力し切る
print(f"{tokf.flush()}", end="", flush=True)

生成オプション

put メソッドには put(text,opts) のように オプションパラメータ opts を指定することが可能です

opts は {"in_type":"spot","out_type:"spot" } のように入力の形式と出力の形式を指定することが可能です。

以下のように挙動します。

in_type out_type Description
spot spot トークンを put メソッドに逐次送り、生成分のみ都度出力するモード。
spot full トークンを put メソッドに逐次送り、フルセンテンスを出力するモード。
full spot フルセンテンスを一度に put メソッドに送り、生成分のみ都度出力するモード。
full full フルセンテンスを一度に put メソッドに送り、フルセンテンスを出力するモード。

注意点:

  • flush メソッドを呼び出す前に全ての文字列を put メソッドに送る必要があります。特に full モードでは、全ての入力文字列を一度に送ります。
  • 出力のタイプ (out_type) が full の場合、最終的な結果を取得するためには flush メソッドを呼び出す必要があります。
  • それぞれのモードで一貫性を保つためには、put メソッドの呼び出しパターンと flush メソッドの使用を適切に組み合わせることが重要です。

コード例

condition = {"in_type": "full", "out_type": "full"} のようにルールを指定し、 put および flush の引数に condition を指定します。

    tokf = TokFlow([("<NL>", "\n")])

    condition = {"in_type": "full", "out_type": "full"}
    prev_len = 0
    for input_token_base in get_example_texts():
        output_sentence = tokf.put(input_token_base, condition)

        print(f"output_sentence:{output_sentence}")

        if prev_len > len(output_sentence):
            raise ValueError("Length error")

        if "<NL>" in output_sentence:
            raise Exception("Failure Must be converted str found.")

        prev_len = len(output_sentence)

    output_sentence = tokf.flush(condition)

停止文字列を検出して文章生成を停止する SentenceStopクラス

SentenceStopクラスは、特定のキーワードを検出し、そのキーワードが見つかった時点でテキスト生成を停止するためのクラスです。テキストは1文字ずつ入力されるシチュエーションを想定しています。

主な機能

  • 特定のキーワードの検出: 文字列内の特定のキーワードを検出します。検出したキーワードは停止文字列として扱われます。
  • テキスト生成の停止: 検出した停止文字列の位置でテキスト生成を停止します。具体的には、停止文字列が検出された時点でのテキストを返します。
  • リアルタイム処理: 文字列が1文字ずつ入力されるシチュエーションを想定しており、リアルタイムでの処理が可能です。

使用方法

初期化時には停止するためのキーワードを指定します。その後、putメソッドで1文字ずつ入力を行い、停止文字列が見つかった場合にはその時点でのテキストを返します。全ての入力が終わった場合には、flushメソッドを用いて残りのテキストを出力します。

put メソッドには put(text,opts) のように オプションパラメータ opts を指定することが可能です

opts は {"in_type":"spot","out_type:"spot","skip_existing_stop_str":True } の形式をとります。

in_type と out_type について

以下のように挙動します。

in_type out_type Description
spot spot トークンを put メソッドに逐次送り、生成分のみ都度出力するモード。
spot full トークンを put メソッドに逐次送り、フルセンテンスを出力するモード。
full spot フルセンテンスを一度に put メソッドに送り、生成分のみ都度出力するモード。
full full フルセンテンスを一度に put メソッドに送り、フルセンテンスを出力するモード。

skip_existing_stop_str について

skip_existing_stop_str:True にした場合、
初回の put 時に指定した text に、停止文字列が含まれていた場合でも、そこで停止処理は発生させない。

サンプルその1

import sys

sys.path.append('../')

import time
from tokflow import SentenceStop

"""
ストリーミングされるセンテンスを指定した停止文字列 "<NL>" を検出した段階で停止させる
"""

FULL_STREAM_TEXTS = texts = [
    'は',  #
    'はい',  #
    'はい、',  #
    'はい、こちら',  #
    'はい、こちらを',  #
    'はい、こちらをお',  #
    'はい、こちらをお勧め',  #
    'はい、こちらをお勧めします',  #
    'はい、こちらをお勧めします。',  #
    'はい、こちらをお勧めします。<',  #
    'はい、こちらをお勧めします。<N',  #
    'はい、こちらをお勧めします。<NL',  #
    'はい、こちらをお勧めします。<NL>',  #
    'はい、こちらをお勧めします。<NL><',  #
    'はい、こちらをお勧めします。<NL><N',  #
    'はい、こちらをお勧めします。<NL><NL',  #
    'はい、こちらをお勧めします。<NL><NL>',  #
    'はい、こちらをお勧めします。<NL><NL>「',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像」',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像」は',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像」は、',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像」は、最も有名な',  #
]

sens = SentenceStop(["<NL>"])

condition = {"in_type": "full", "out_type": "full"}

for input_token_base in FULL_STREAM_TEXTS:

    out = sens.put(input_token_base, condition)

    text = out.get("text")  # 出力すべきテキスト
    stop_str_found = out.get("stop_str_found")  # 停止文字列が検出された か否か
    possible = out.get("possible")  # 停止文字列を検出しかかっている か否か
    stop_str = out.get("stop_str")  # 停止文字列(複数の停止文字列を指定していた場合、どの停止文字列が検出されたのか)

    print(f"text:'{text}' possible:{possible} stop_str_found:{stop_str_found} stop_str:{stop_str}")
    if stop_str_found:
        # 停止文字列が検出された場合、処理を停止する
        break
    time.sleep(0.01)

if not stop_str_found:
    # 最後までいっても、停止文字列が検出されなかった場合
    # ペンディング中のテキストを出力しきる
    # (停止文字列が検出された場合は、停止文字列の直前までの文字列が出力されるためflushは不要)
    print(f"flush:{sens.flush(condition)}", end="", flush=True)

<NL> を検出した時点で、stop_str_found フラグが True となり、 はい、こちらをお勧めします。 で文章生成を停止することができる。

サンプルその2 停止文字列を含む場合

以下のように condition = {"in_type": "full", "out_type": "full", "skip_existing_stop_str": True}
"skip_existing_stop_str": True} を指定している場合、初回に入力するテキスト はい、<NL>こちらを には停止文字列
<NL> を含んでいるが、初回テキストにある<NL> は停止文字列と扱わずスキップする。

import sys

sys.path.append('../')

import time
from tokflow import SentenceStop

"""
既に停止文字列 "<NL>"が存在しているパートから開始された場合、
既存分はスキップして、次以降でストリーミングされるセンテンスを指定した停止文字列 "<NL>" を検出した段階で停止させる
"""

FULL_STREAM_TEXTS = texts = [
    'はい、<NL>こちらを',  #
    'はい、<NL>こちらをお',  #
    'はい、<NL>こちらをお勧',  #
    'はい、<NL>こちらをお勧め',  #
    'はい、<NL>こちらをお勧めし',  #
    'はい、<NL>こちらをお勧めしま',  #
    'はい、<NL>こちらをお勧めします',  #
    'はい、<NL>こちらをお勧めします。',  #
    'はい、<NL>こちらをお勧めします。<',  #
    'はい、<NL>こちらをお勧めします。<N',  #
    'はい、<NL>こちらをお勧めします。<NL',  #
    'はい、<NL>こちらをお勧めします。<NL>',  #
    'はい、<NL>こちらをお勧めします。<NL>「',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像」',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像」は',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像」は、',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像」は、最も有名な',  #
]

sens = SentenceStop(["<NL>"])

condition = {"in_type": "full", "out_type": "full", "skip_existing_stop_str": True}

for input_token_base in FULL_STREAM_TEXTS:

    out = sens.put(input_token_base, condition)

    text = out.get("text")  # 出力すべきテキスト
    stop_str_found = out.get("stop_str_found")  # 停止文字列が検出された か否か
    possible = out.get("possible")  # 停止文字列を検出しかかっている か否か
    stop_str = out.get("stop_str")  # 停止文字列(複数の停止文字列を指定していた場合、どの停止文字列が検出されたのか)

    print(f"text:'{text}' possible:{possible} stop_str_found:{stop_str_found} stop_str:{stop_str}")
    if stop_str_found:
        # 停止文字列が検出された場合、処理を停止する
        break
    time.sleep(0.01)

if not stop_str_found:
    # 最後までいっても、停止文字列が検出されなかった場合
    # ペンディング中のテキストを出力しきる
    # (停止文字列が検出された場合は、停止文字列の直前までの文字列が出力されるためflushは不要)
    print(f"flush:{sens.flush(condition)}", end="", flush=True)

ストリーム置換処理について

逐次的に出現するトークン(文字列の断片)を順次読み込み、
読み込んだトークンは、これまで読み込んだトークンと結合します。

結合されたトークンのことをトークンバッファと呼びます。

この処理が順次行われる際に、トークンバッファ内にあらかじめ指定された文字列(以降、「検索対象文字列」と呼びます)が出現した場合、その文字列を別の文字列(以降、「置換先文字列」と呼びます)に置換します。

トークンは逐次的に読み込まれるため、途中で検索対象文字列とは無関係の文字列や検索対象文字列の一部がトークンバッファに蓄積されていきます。検索対象文字列になり得ない順序でトークンバッファが構成されたと判断された瞬間に、トークンバッファはメソッドの戻り値として返却されます。

一方で、検索対象文字列になり得る順序でトークンバッファが構成されている場合は、検索対象文字列が出現するか、検索対象文字列になりえないと判断されるまで、戻り値は空文字となります。

このような方法により、検索対象文字列が出現するまでバッファリングを行い、逐次トークンのほとんどをそのまま表示させることができます。必要に応じて置換を行い、表示を遅らせることが可能です。これによって、ストリーム処理を効率的に行うことができます。


Read more

【NPM】クラシックトークンが2025年12月9日に完全廃止されたことに伴うパッケージのインストールエラー(403)と対処法

【NPM】クラシックトークンが2025年12月9日に完全廃止されたことに伴うパッケージのインストールエラー(403)と対処法

こんにちは! 本日は2025年12月9日に行われた npm に関する重要なアップデートについて解説いたします! 2025年12月9日、npmがセキュリティ強化のためclassic tokenを完全に無効化しました。 この影響で、プライベートパッケージを使用しているプロジェクトで突然npm installが失敗するケースが発生しています。(パブリックパッケージの使用には影響はありません) 本記事では、実際に遭遇したエラーと解決方法についてみていきたいと思います。 発生した問題 症状 プライベートパッケージ(@your-org/package-name形式)を含むプロジェクトで npm install を実行すると、以下のようなエラーが発生 パターン1: 404エラー npm ERR! code E404 npm ERR! 404 Not Found - GET https://registry.npmjs.org/@your-org/package-name/... npm ERR! 404 '@your-org/package-name@x.x.

By Qualiteg プロダクト開発部
Anthropic Python SDKのcount_tokens機能が0.75.0~正式版に変わりました:移行ガイド

Anthropic Python SDKのcount_tokens機能が0.75.0~正式版に変わりました:移行ガイド

こんにちは! 本日は Anthropic Claude API を使用するのに便利な Anthropic Python SDK に関する話題です! 2週間ほど前にわりと大きな変更がありましたので、解説いたします。 はじめに 「あれ、client.count_tokens() が動かない...」 Anthropic Python SDKをアップデートしたら、今まで動いていたトークンカウントのコードがエラーになった。そんな経験をされたLLMエンジニアの方も多いのではないでしょうか。 当社のBestllamのように、LLM統合サービスを開発していると、実際にユーザーがどれほどのトークンを使用しているのかを正確に把握することは非常に重要になります。利用料金の計算、コンテキストウィンドウの管理、そしてユーザーへの使用量の可視化など、トークンカウント機能はサービスの根幹を支える機能です。そのため、この機能が突然動かなくなると影響は小さくありません。 ゆえに本番サービスを提供している場合、pip install で気軽にSDKバージョンを上げてはいけません。 さて、Anthropi

By Qualiteg プロダクト開発部
ログを ちょこっと grep するツール "ちょこぐれっぷ" つくりました

ログを ちょこっと grep するツール "ちょこぐれっぷ" つくりました

こんにちは! 今日はちょこっとしたツールをつくりました。 ログをちょこっとgrepするツールです。もちろん無料。 chocoGrep - ちょこっとgrep!ログフィルタツールちょこっとgrepするならchocoGrep!「error or warning」と書くだけの簡単or/and検索。AIエージェントに渡す前にログを最適化。正規表現不要、インストール不要。chocoGrepQualiteg Inc. Cursor、Devin、Claude Code、ChatGPT——AIコーディングエージェントにエラーログを渡してデバッグを手伝ってもらう。もう日常ですよね。 でも、 * ログを全部貼り付けたら、AIの応答がやたら遅い * 「トークン制限を超えました」と怒られる * 大量のログの中から、AIが的外れな部分に注目してしまう そこで、つくったちょこっとgrepするためのツールです 名付けて ちょこぐれっぷ!chogoGrep! chocoGrepって何? ブラウザで動く、ゆるいgrepツールです。 ログを貼り付けて、検索ワードを入れるだけ。インストール不要

By Qualiteg プロダクト開発部
GPUを使った分散処理で見落としがちなCPUボトルネックとtasksetによる解決法

GPUを使った分散処理で見落としがちなCPUボトルネックとtasksetによる解決法

こんにちは! 複数枚のGPUをつかった並列処理システムを設計しているときCPUについてはあまり考えないでシステムを設計してしまうことがあります。 「機械学習システムの主役はGPUなんだから、CPUなんて、あんまり気にしなくてよいのでは」 いいえ、そうでもないんです。 推論中のあるタイミングに急に動作が遅くなったりするときCPUが原因であることがけっこうあります。 概要(5分で分かる要点) 先日GPUを使った並列処理システムで、予期しないCPUボトルネックが発生し、パフォーマンスが大幅に低下する問題に遭遇しました。 複数のプロセスが異なるGPUを使用しているにも関わらず、処理が極端に遅くなる現象の原因は、処理パイプラインの一部に含まれるCPU集約的な計算処理でした。 問題の症状 * 単一プロセス実行時:正常な速度 * 複数プロセス並列実行時:処理時間が数倍に増加 * GPUリソースに競合なし(nvidia-smiで確認済み) 根本原因 処理パイプラインにGPUに適さないCPU集約的な計算(データ前処理、統計変換など)が含まれており、複数プロセスが同じCP

By Qualiteg プロダクト開発部