逐次生成されるトークンのバッファリング

逐次生成されるトークンのバッファリング

こんにちは! (株)Qualiteg プロダクト開発部 です!

今日は、推論シーンでよくある、トークン細切れ問題に対処する方法をご紹介します。

ストリーミングチャットで使用する逐次生成のとき、文章は1トークンずつ生成されますが、1トークンは”単語単位”でもなければ”1文字”単位でもなく、学習時使われていたトークナイザーの処理に依存します。

一般的には 形態素解析→サブワード→語彙リスト構築 を行いますが、このとき、後で文章生成するときに重要なタグ、たとえば "<NL>" というタグが重要な意味をもつにもかかわらず、細切れにされてトークナイズされてしまうことがあります。たとえば、 "<" "N" "L>" のように粉砕されてしまうようなパターンです。

(これを避ける方法はあるのですが、今回は、学習済のモデルをあからじめ与えられた状態でどうするか、を考えます)

こういうパターンが発生してしまったとき逐次生成で "<NL>" を認識しユーザーにチャットUIでレスポンスするときに "<" "N" という生成過程を見せないためのテクニックのご紹介となります。

最終的に以下のような文章が生成されるとき

"Hello there!<NL>my name is tokflow."

逐次生成だと以下のようなるとします
(実際はここまでひどいトークナイズはありえ無いですが)


"He"
"Hello"
"Hello "
"Hello t"
"Hello th"
"Hello there"
"Hello there!<"
"Hello there!<N"
"Hello there!<NL>m"
"Hello there!<NL>my "
"Hello there!<NL>my nam"
"Hello there!<NL>my name"
"Hello there!<NL>my name "
"Hello there!<NL>my name is"
"Hello there!<NL>my name is tokfl"
"Hello there!<NL>my name is tokflow."

ここで注目すべきポイントは "<NL>" が "<" "N" "L>m" と分割されてしまっている点です。

ユーザー側に生成結果をみせるとき "<NL>" は改行を意味する特殊タグだとすると、 "<" や "N" の時点では、まだ特殊タグなのか文章本体なのか見分けがつきません。そのため、ゆるい処理をしているストリーミングチャットUIだと、そのまま"<" や "N" を表示してしまいます。

これを避けるためには、 "<" や "N" の登場時点ではまだUIに出力せず、 "<NL>" まで確定したあと、それを改行として出力する、という処理が必要になります。
これを私たちはトークンのバッファリングと呼んでいます。

Qualiteg ではこの トークンのバッファリング 処理をしてくれる機能を Python ライブラリとしてリリースしていますので、ここでその使い方をご説明いたします。

当ライブラリは、 TokFlow と呼び、大規模言語モデルにより逐次的に生成されるトークンをバッファリングし、必要な置換処理を行いながら出力するユーティリティとなります。

下のように小さな かけら のようになった文章の断片=トークンを逐次入力したとき、置換が必要な文字列を順次置換しながら出力することができます。

このとき、トークンのバッファリングして遅延出力を行うことで、置換前のトークンは出力しないようにできます。

["He","llo"," ","t","h","ere","!<","N","L>m","y ","nam","e"," ","is"," tokfl","ow.","<","N","L>N","ice"," to ","me","et you."]

↑の例は、 <NL> の出現を検知し \n に置換しながら出力している様子です。

当ライブラリは、

  • 置換条件として、好きな文字列を置換対象に指定可能
  • 複数の置換条件を指定可能

です

TokFlow のインストール方法

pip install tokflow

使い方/サンプルコード

import time
from tokflow import TokFlow

TOKEN_GENERATOR_MOCK = ["He", "llo", " ", "t", "h", "ere", "!<", "N", "L>m", "y ", "nam", "e", " ", "is", " tokfl", "ow.",
                  "<", "N", "L>N", "ice", " to ", "me", "et you."]

# "<NL>" を "\n" に置換する。 "<NL>" は検索対象文字列。 "n" は置換先文字列
# 置換条件は複数指定可能。
tokf = TokFlow([("<NL>", "\n")])

for input_token in TOKEN_GENERATOR_MOCK: 
    # トークン(文章のかけらとなる1,2文字程度の文字列)を順次入力していく。トークンは内部でバッファリングされる。
    output_token = tokf.put(input_token)

    # output_token に今出力可能な出力トークンが出力される。
    # トークンのバッファリング中に検索対象文字列が出現する可能性がある場合は
    # 出力トーンは空文字となる。
    print(f"{output_token}", end="", flush=True)

    # 逐次生成されることを目視するため、ウェイトをはさむ
    time.sleep(0.3)


# トークンの入力が終わったら、最後に flush して残っているバッファを出力し切る
print(f"{tokf.flush()}", end="", flush=True)

生成オプション

put メソッドには put(text,opts) のように オプションパラメータ opts を指定することが可能です

opts は {"in_type":"spot","out_type:"spot" } のように入力の形式と出力の形式を指定することが可能です。

以下のように挙動します。

in_type out_type Description
spot spot トークンを put メソッドに逐次送り、生成分のみ都度出力するモード。
spot full トークンを put メソッドに逐次送り、フルセンテンスを出力するモード。
full spot フルセンテンスを一度に put メソッドに送り、生成分のみ都度出力するモード。
full full フルセンテンスを一度に put メソッドに送り、フルセンテンスを出力するモード。

注意点:

  • flush メソッドを呼び出す前に全ての文字列を put メソッドに送る必要があります。特に full モードでは、全ての入力文字列を一度に送ります。
  • 出力のタイプ (out_type) が full の場合、最終的な結果を取得するためには flush メソッドを呼び出す必要があります。
  • それぞれのモードで一貫性を保つためには、put メソッドの呼び出しパターンと flush メソッドの使用を適切に組み合わせることが重要です。

コード例

condition = {"in_type": "full", "out_type": "full"} のようにルールを指定し、 put および flush の引数に condition を指定します。

    tokf = TokFlow([("<NL>", "\n")])

    condition = {"in_type": "full", "out_type": "full"}
    prev_len = 0
    for input_token_base in get_example_texts():
        output_sentence = tokf.put(input_token_base, condition)

        print(f"output_sentence:{output_sentence}")

        if prev_len > len(output_sentence):
            raise ValueError("Length error")

        if "<NL>" in output_sentence:
            raise Exception("Failure Must be converted str found.")

        prev_len = len(output_sentence)

    output_sentence = tokf.flush(condition)

停止文字列を検出して文章生成を停止する SentenceStopクラス

SentenceStopクラスは、特定のキーワードを検出し、そのキーワードが見つかった時点でテキスト生成を停止するためのクラスです。テキストは1文字ずつ入力されるシチュエーションを想定しています。

主な機能

  • 特定のキーワードの検出: 文字列内の特定のキーワードを検出します。検出したキーワードは停止文字列として扱われます。
  • テキスト生成の停止: 検出した停止文字列の位置でテキスト生成を停止します。具体的には、停止文字列が検出された時点でのテキストを返します。
  • リアルタイム処理: 文字列が1文字ずつ入力されるシチュエーションを想定しており、リアルタイムでの処理が可能です。

使用方法

初期化時には停止するためのキーワードを指定します。その後、putメソッドで1文字ずつ入力を行い、停止文字列が見つかった場合にはその時点でのテキストを返します。全ての入力が終わった場合には、flushメソッドを用いて残りのテキストを出力します。

put メソッドには put(text,opts) のように オプションパラメータ opts を指定することが可能です

opts は {"in_type":"spot","out_type:"spot","skip_existing_stop_str":True } の形式をとります。

in_type と out_type について

以下のように挙動します。

in_type out_type Description
spot spot トークンを put メソッドに逐次送り、生成分のみ都度出力するモード。
spot full トークンを put メソッドに逐次送り、フルセンテンスを出力するモード。
full spot フルセンテンスを一度に put メソッドに送り、生成分のみ都度出力するモード。
full full フルセンテンスを一度に put メソッドに送り、フルセンテンスを出力するモード。

skip_existing_stop_str について

skip_existing_stop_str:True にした場合、
初回の put 時に指定した text に、停止文字列が含まれていた場合でも、そこで停止処理は発生させない。

サンプルその1

import sys

sys.path.append('../')

import time
from tokflow import SentenceStop

"""
ストリーミングされるセンテンスを指定した停止文字列 "<NL>" を検出した段階で停止させる
"""

FULL_STREAM_TEXTS = texts = [
    'は',  #
    'はい',  #
    'はい、',  #
    'はい、こちら',  #
    'はい、こちらを',  #
    'はい、こちらをお',  #
    'はい、こちらをお勧め',  #
    'はい、こちらをお勧めします',  #
    'はい、こちらをお勧めします。',  #
    'はい、こちらをお勧めします。<',  #
    'はい、こちらをお勧めします。<N',  #
    'はい、こちらをお勧めします。<NL',  #
    'はい、こちらをお勧めします。<NL>',  #
    'はい、こちらをお勧めします。<NL><',  #
    'はい、こちらをお勧めします。<NL><N',  #
    'はい、こちらをお勧めします。<NL><NL',  #
    'はい、こちらをお勧めします。<NL><NL>',  #
    'はい、こちらをお勧めします。<NL><NL>「',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像」',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像」は',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像」は、',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像」は、最も有名な',  #
]

sens = SentenceStop(["<NL>"])

condition = {"in_type": "full", "out_type": "full"}

for input_token_base in FULL_STREAM_TEXTS:

    out = sens.put(input_token_base, condition)

    text = out.get("text")  # 出力すべきテキスト
    stop_str_found = out.get("stop_str_found")  # 停止文字列が検出された か否か
    possible = out.get("possible")  # 停止文字列を検出しかかっている か否か
    stop_str = out.get("stop_str")  # 停止文字列(複数の停止文字列を指定していた場合、どの停止文字列が検出されたのか)

    print(f"text:'{text}' possible:{possible} stop_str_found:{stop_str_found} stop_str:{stop_str}")
    if stop_str_found:
        # 停止文字列が検出された場合、処理を停止する
        break
    time.sleep(0.01)

if not stop_str_found:
    # 最後までいっても、停止文字列が検出されなかった場合
    # ペンディング中のテキストを出力しきる
    # (停止文字列が検出された場合は、停止文字列の直前までの文字列が出力されるためflushは不要)
    print(f"flush:{sens.flush(condition)}", end="", flush=True)

<NL> を検出した時点で、stop_str_found フラグが True となり、 はい、こちらをお勧めします。 で文章生成を停止することができる。

サンプルその2 停止文字列を含む場合

以下のように condition = {"in_type": "full", "out_type": "full", "skip_existing_stop_str": True}
"skip_existing_stop_str": True} を指定している場合、初回に入力するテキスト はい、<NL>こちらを には停止文字列
<NL> を含んでいるが、初回テキストにある<NL> は停止文字列と扱わずスキップする。

import sys

sys.path.append('../')

import time
from tokflow import SentenceStop

"""
既に停止文字列 "<NL>"が存在しているパートから開始された場合、
既存分はスキップして、次以降でストリーミングされるセンテンスを指定した停止文字列 "<NL>" を検出した段階で停止させる
"""

FULL_STREAM_TEXTS = texts = [
    'はい、<NL>こちらを',  #
    'はい、<NL>こちらをお',  #
    'はい、<NL>こちらをお勧',  #
    'はい、<NL>こちらをお勧め',  #
    'はい、<NL>こちらをお勧めし',  #
    'はい、<NL>こちらをお勧めしま',  #
    'はい、<NL>こちらをお勧めします',  #
    'はい、<NL>こちらをお勧めします。',  #
    'はい、<NL>こちらをお勧めします。<',  #
    'はい、<NL>こちらをお勧めします。<N',  #
    'はい、<NL>こちらをお勧めします。<NL',  #
    'はい、<NL>こちらをお勧めします。<NL>',  #
    'はい、<NL>こちらをお勧めします。<NL>「',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像」',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像」は',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像」は、',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像」は、最も有名な',  #
]

sens = SentenceStop(["<NL>"])

condition = {"in_type": "full", "out_type": "full", "skip_existing_stop_str": True}

for input_token_base in FULL_STREAM_TEXTS:

    out = sens.put(input_token_base, condition)

    text = out.get("text")  # 出力すべきテキスト
    stop_str_found = out.get("stop_str_found")  # 停止文字列が検出された か否か
    possible = out.get("possible")  # 停止文字列を検出しかかっている か否か
    stop_str = out.get("stop_str")  # 停止文字列(複数の停止文字列を指定していた場合、どの停止文字列が検出されたのか)

    print(f"text:'{text}' possible:{possible} stop_str_found:{stop_str_found} stop_str:{stop_str}")
    if stop_str_found:
        # 停止文字列が検出された場合、処理を停止する
        break
    time.sleep(0.01)

if not stop_str_found:
    # 最後までいっても、停止文字列が検出されなかった場合
    # ペンディング中のテキストを出力しきる
    # (停止文字列が検出された場合は、停止文字列の直前までの文字列が出力されるためflushは不要)
    print(f"flush:{sens.flush(condition)}", end="", flush=True)

ストリーム置換処理について

逐次的に出現するトークン(文字列の断片)を順次読み込み、
読み込んだトークンは、これまで読み込んだトークンと結合します。

結合されたトークンのことをトークンバッファと呼びます。

この処理が順次行われる際に、トークンバッファ内にあらかじめ指定された文字列(以降、「検索対象文字列」と呼びます)が出現した場合、その文字列を別の文字列(以降、「置換先文字列」と呼びます)に置換します。

トークンは逐次的に読み込まれるため、途中で検索対象文字列とは無関係の文字列や検索対象文字列の一部がトークンバッファに蓄積されていきます。検索対象文字列になり得ない順序でトークンバッファが構成されたと判断された瞬間に、トークンバッファはメソッドの戻り値として返却されます。

一方で、検索対象文字列になり得る順序でトークンバッファが構成されている場合は、検索対象文字列が出現するか、検索対象文字列になりえないと判断されるまで、戻り値は空文字となります。

このような方法により、検索対象文字列が出現するまでバッファリングを行い、逐次トークンのほとんどをそのまま表示させることができます。必要に応じて置換を行い、表示を遅らせることが可能です。これによって、ストリーム処理を効率的に行うことができます。


Read more

【解説】Tekken トークナイザーとは何か? 〜 Mistral が採用する新世代トークナイザーの特徴

【解説】Tekken トークナイザーとは何か? 〜 Mistral が採用する新世代トークナイザーの特徴

こんにちは! 本日は、Tekkenについて解説いたします! 皆さま Tekken と聞いて何を思い浮かべますか? 格ゲーの鉄拳でしょうか? 私は、昔プレイした Age of Empires に登場する鉄剣戦士を思い浮かべました🤗 ちょっと古いかもしれませんが、名作です! さてつかみはこのくらいにして、、 LLMはご存じのとおり驚異的なスピードで進化しています。そんな中でひそかに注目されているのが、トークナイザーの改善です。 たとえば、Meta の Llama 系モデルのトークナイザーは Sentence Piece から BPE系へ進化するなど、LLM業界では従来よりも高効率なトークナイズ(テキスト分割)の方法を導入し始めています。 そして Mistral AI もまた、新たに「Tekken トークナイザー」という仕組みを採用し、大規模言語モデルの性能を底上げしています。 本記事では、Tekken トークナイザーの登場背景や技術的特徴、他のトークナイザーとの違い、さらには Mistral との関係などをわかりやすく解説していきます。 1. Tekken トーク

By Qualiteg プロダクト開発部
[AI新規事業創出]Qualitegオリジナル、アイディア評価、事業アイディア選定方法

[AI新規事業創出]Qualitegオリジナル、アイディア評価、事業アイディア選定方法

Qualiteg blogを訪問してくださった皆様、こんにちは。Micheleです。AIを活用した新規事業やマーケティングを手がけている私には、クライアントからよく寄せられる質問があります。AIを用いた事業展開を検討されている方々が共通して直面するであろう課題に対して、このブログを通じて私なりの解答をご提供したいと思います。 はじめに AI技術の急速な発展は、スタートアップから大企業まで、あらゆるビジネスに新たな可能性をもたらしています。クライアントとの会話の中でも、AIを活用した革新的な事業アイディアに関する相談が増えています。 しかし、多くの企業が「素晴らしいアイディアを思いついた!」と興奮しながらも、そのアイディアを具体化し、成功に導くための方法論に悩んでいるのも事実です。特にAIを用いた事業展開においては、従来のビジネスモデルとは異なる視点が必要となるため、その難しさはさらに増します。 本記事では、Qualitegオリジナルのアイディア評価、事業アイディア選定方法について解説します。特に、AIを用いた事業展開を検討されている方々が共通して直面するであろう課題に対して、

By Join us, Michele on Qualiteg's adventure to innovation
日本語対応!Mistral Small v3 解説

日本語対応!Mistral Small v3 解説

こんにちは! Mistral AIは2025年1月30日、新しい言語モデル「Mistral Small v3」を発表しました。このモデルは、24Bという比較的小規模なパラメータ数ながら、70B以上の大規模モデルに匹敵する性能を実現しています。また日本語対応も謳われており期待の高い小型モデルです! https://huggingface.co/mistralai/Mistral-Small-24B-Instruct-2501 動画 こちら本ブログの解説動画もご覧いただけます😊 きわだってるのは、レイテンシー最適化 Mistral Small 3のめだった特徴は、その処理性能とレイテンシーの絶妙なバランスではないでしょうか。 公開されている以下の性能評価のグラフによると、トークンあたり約11ミリ秒という業界最速レベルのレイテンシーを達成しています。これは、Qwen-2.5 32Bの約15ミリ秒やGemma-2 27Bの約14ミリ秒と比較して、明確な優位性を示しています。さらに注目すべきは、GPT-4o Miniと比較しても、より低いレイテンシーで同等以上の性能を実現し

By Qualiteg プロダクト開発部
[vLLM] To use CUDA with multiprocessing, you must use the 'spawn' start method の対処法

[vLLM] To use CUDA with multiprocessing, you must use the 'spawn' start method の対処法

WSLで vLLM を使用するとき、 tensor parallel を使って複数枚のGPUで1つのLLMをサーブしようとしたとき以下のようなエラーが発生しがちです RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method 遭遇するシーンとしてはvLLMの起動オプションに以下のようにテンソル並列化オプションを指定したときです。 --tensor-parallel-size 2 つまり、マルチプロセッシングでCUDA使うときは、 "fork"じゃなくて"spawn" 使ってね、というエラーです。 これを vLLM に教えるために、以下の2行目のように環境変数を設定してあげるとvLLMが "spawn" を使ってくれるようになります。 export

By Qualiteg プロダクト開発部