逐次生成されるトークンのバッファリング

逐次生成されるトークンのバッファリング

こんにちは! (株)Qualiteg プロダクト開発部 です!

今日は、推論シーンでよくある、トークン細切れ問題に対処する方法をご紹介します。

ストリーミングチャットで使用する逐次生成のとき、文章は1トークンずつ生成されますが、1トークンは”単語単位”でもなければ”1文字”単位でもなく、学習時使われていたトークナイザーの処理に依存します。

一般的には 形態素解析→サブワード→語彙リスト構築 を行いますが、このとき、後で文章生成するときに重要なタグ、たとえば "<NL>" というタグが重要な意味をもつにもかかわらず、細切れにされてトークナイズされてしまうことがあります。たとえば、 "<" "N" "L>" のように粉砕されてしまうようなパターンです。

(これを避ける方法はあるのですが、今回は、学習済のモデルをあからじめ与えられた状態でどうするか、を考えます)

こういうパターンが発生してしまったとき逐次生成で "<NL>" を認識しユーザーにチャットUIでレスポンスするときに "<" "N" という生成過程を見せないためのテクニックのご紹介となります。

最終的に以下のような文章が生成されるとき

"Hello there!<NL>my name is tokflow."

逐次生成だと以下のようなるとします
(実際はここまでひどいトークナイズはありえ無いですが)


"He"
"Hello"
"Hello "
"Hello t"
"Hello th"
"Hello there"
"Hello there!<"
"Hello there!<N"
"Hello there!<NL>m"
"Hello there!<NL>my "
"Hello there!<NL>my nam"
"Hello there!<NL>my name"
"Hello there!<NL>my name "
"Hello there!<NL>my name is"
"Hello there!<NL>my name is tokfl"
"Hello there!<NL>my name is tokflow."

ここで注目すべきポイントは "<NL>" が "<" "N" "L>m" と分割されてしまっている点です。

ユーザー側に生成結果をみせるとき "<NL>" は改行を意味する特殊タグだとすると、 "<" や "N" の時点では、まだ特殊タグなのか文章本体なのか見分けがつきません。そのため、ゆるい処理をしているストリーミングチャットUIだと、そのまま"<" や "N" を表示してしまいます。

これを避けるためには、 "<" や "N" の登場時点ではまだUIに出力せず、 "<NL>" まで確定したあと、それを改行として出力する、という処理が必要になります。
これを私たちはトークンのバッファリングと呼んでいます。

Qualiteg ではこの トークンのバッファリング 処理をしてくれる機能を Python ライブラリとしてリリースしていますので、ここでその使い方をご説明いたします。

当ライブラリは、 TokFlow と呼び、大規模言語モデルにより逐次的に生成されるトークンをバッファリングし、必要な置換処理を行いながら出力するユーティリティとなります。

下のように小さな かけら のようになった文章の断片=トークンを逐次入力したとき、置換が必要な文字列を順次置換しながら出力することができます。

このとき、トークンのバッファリングして遅延出力を行うことで、置換前のトークンは出力しないようにできます。

["He","llo"," ","t","h","ere","!<","N","L>m","y ","nam","e"," ","is"," tokfl","ow.","<","N","L>N","ice"," to ","me","et you."]

↑の例は、 <NL> の出現を検知し \n に置換しながら出力している様子です。

当ライブラリは、

  • 置換条件として、好きな文字列を置換対象に指定可能
  • 複数の置換条件を指定可能

です

TokFlow のインストール方法

pip install tokflow

使い方/サンプルコード

import time
from tokflow import TokFlow

TOKEN_GENERATOR_MOCK = ["He", "llo", " ", "t", "h", "ere", "!<", "N", "L>m", "y ", "nam", "e", " ", "is", " tokfl", "ow.",
                  "<", "N", "L>N", "ice", " to ", "me", "et you."]

# "<NL>" を "\n" に置換する。 "<NL>" は検索対象文字列。 "n" は置換先文字列
# 置換条件は複数指定可能。
tokf = TokFlow([("<NL>", "\n")])

for input_token in TOKEN_GENERATOR_MOCK: 
    # トークン(文章のかけらとなる1,2文字程度の文字列)を順次入力していく。トークンは内部でバッファリングされる。
    output_token = tokf.put(input_token)

    # output_token に今出力可能な出力トークンが出力される。
    # トークンのバッファリング中に検索対象文字列が出現する可能性がある場合は
    # 出力トーンは空文字となる。
    print(f"{output_token}", end="", flush=True)

    # 逐次生成されることを目視するため、ウェイトをはさむ
    time.sleep(0.3)


# トークンの入力が終わったら、最後に flush して残っているバッファを出力し切る
print(f"{tokf.flush()}", end="", flush=True)

生成オプション

put メソッドには put(text,opts) のように オプションパラメータ opts を指定することが可能です

opts は {"in_type":"spot","out_type:"spot" } のように入力の形式と出力の形式を指定することが可能です。

以下のように挙動します。

in_type out_type Description
spot spot トークンを put メソッドに逐次送り、生成分のみ都度出力するモード。
spot full トークンを put メソッドに逐次送り、フルセンテンスを出力するモード。
full spot フルセンテンスを一度に put メソッドに送り、生成分のみ都度出力するモード。
full full フルセンテンスを一度に put メソッドに送り、フルセンテンスを出力するモード。

注意点:

  • flush メソッドを呼び出す前に全ての文字列を put メソッドに送る必要があります。特に full モードでは、全ての入力文字列を一度に送ります。
  • 出力のタイプ (out_type) が full の場合、最終的な結果を取得するためには flush メソッドを呼び出す必要があります。
  • それぞれのモードで一貫性を保つためには、put メソッドの呼び出しパターンと flush メソッドの使用を適切に組み合わせることが重要です。

コード例

condition = {"in_type": "full", "out_type": "full"} のようにルールを指定し、 put および flush の引数に condition を指定します。

    tokf = TokFlow([("<NL>", "\n")])

    condition = {"in_type": "full", "out_type": "full"}
    prev_len = 0
    for input_token_base in get_example_texts():
        output_sentence = tokf.put(input_token_base, condition)

        print(f"output_sentence:{output_sentence}")

        if prev_len > len(output_sentence):
            raise ValueError("Length error")

        if "<NL>" in output_sentence:
            raise Exception("Failure Must be converted str found.")

        prev_len = len(output_sentence)

    output_sentence = tokf.flush(condition)

停止文字列を検出して文章生成を停止する SentenceStopクラス

SentenceStopクラスは、特定のキーワードを検出し、そのキーワードが見つかった時点でテキスト生成を停止するためのクラスです。テキストは1文字ずつ入力されるシチュエーションを想定しています。

主な機能

  • 特定のキーワードの検出: 文字列内の特定のキーワードを検出します。検出したキーワードは停止文字列として扱われます。
  • テキスト生成の停止: 検出した停止文字列の位置でテキスト生成を停止します。具体的には、停止文字列が検出された時点でのテキストを返します。
  • リアルタイム処理: 文字列が1文字ずつ入力されるシチュエーションを想定しており、リアルタイムでの処理が可能です。

使用方法

初期化時には停止するためのキーワードを指定します。その後、putメソッドで1文字ずつ入力を行い、停止文字列が見つかった場合にはその時点でのテキストを返します。全ての入力が終わった場合には、flushメソッドを用いて残りのテキストを出力します。

put メソッドには put(text,opts) のように オプションパラメータ opts を指定することが可能です

opts は {"in_type":"spot","out_type:"spot","skip_existing_stop_str":True } の形式をとります。

in_type と out_type について

以下のように挙動します。

in_type out_type Description
spot spot トークンを put メソッドに逐次送り、生成分のみ都度出力するモード。
spot full トークンを put メソッドに逐次送り、フルセンテンスを出力するモード。
full spot フルセンテンスを一度に put メソッドに送り、生成分のみ都度出力するモード。
full full フルセンテンスを一度に put メソッドに送り、フルセンテンスを出力するモード。

skip_existing_stop_str について

skip_existing_stop_str:True にした場合、
初回の put 時に指定した text に、停止文字列が含まれていた場合でも、そこで停止処理は発生させない。

サンプルその1

import sys

sys.path.append('../')

import time
from tokflow import SentenceStop

"""
ストリーミングされるセンテンスを指定した停止文字列 "<NL>" を検出した段階で停止させる
"""

FULL_STREAM_TEXTS = texts = [
    'は',  #
    'はい',  #
    'はい、',  #
    'はい、こちら',  #
    'はい、こちらを',  #
    'はい、こちらをお',  #
    'はい、こちらをお勧め',  #
    'はい、こちらをお勧めします',  #
    'はい、こちらをお勧めします。',  #
    'はい、こちらをお勧めします。<',  #
    'はい、こちらをお勧めします。<N',  #
    'はい、こちらをお勧めします。<NL',  #
    'はい、こちらをお勧めします。<NL>',  #
    'はい、こちらをお勧めします。<NL><',  #
    'はい、こちらをお勧めします。<NL><N',  #
    'はい、こちらをお勧めします。<NL><NL',  #
    'はい、こちらをお勧めします。<NL><NL>',  #
    'はい、こちらをお勧めします。<NL><NL>「',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像」',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像」は',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像」は、',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像」は、最も有名な',  #
]

sens = SentenceStop(["<NL>"])

condition = {"in_type": "full", "out_type": "full"}

for input_token_base in FULL_STREAM_TEXTS:

    out = sens.put(input_token_base, condition)

    text = out.get("text")  # 出力すべきテキスト
    stop_str_found = out.get("stop_str_found")  # 停止文字列が検出された か否か
    possible = out.get("possible")  # 停止文字列を検出しかかっている か否か
    stop_str = out.get("stop_str")  # 停止文字列(複数の停止文字列を指定していた場合、どの停止文字列が検出されたのか)

    print(f"text:'{text}' possible:{possible} stop_str_found:{stop_str_found} stop_str:{stop_str}")
    if stop_str_found:
        # 停止文字列が検出された場合、処理を停止する
        break
    time.sleep(0.01)

if not stop_str_found:
    # 最後までいっても、停止文字列が検出されなかった場合
    # ペンディング中のテキストを出力しきる
    # (停止文字列が検出された場合は、停止文字列の直前までの文字列が出力されるためflushは不要)
    print(f"flush:{sens.flush(condition)}", end="", flush=True)

<NL> を検出した時点で、stop_str_found フラグが True となり、 はい、こちらをお勧めします。 で文章生成を停止することができる。

サンプルその2 停止文字列を含む場合

以下のように condition = {"in_type": "full", "out_type": "full", "skip_existing_stop_str": True}
"skip_existing_stop_str": True} を指定している場合、初回に入力するテキスト はい、<NL>こちらを には停止文字列
<NL> を含んでいるが、初回テキストにある<NL> は停止文字列と扱わずスキップする。

import sys

sys.path.append('../')

import time
from tokflow import SentenceStop

"""
既に停止文字列 "<NL>"が存在しているパートから開始された場合、
既存分はスキップして、次以降でストリーミングされるセンテンスを指定した停止文字列 "<NL>" を検出した段階で停止させる
"""

FULL_STREAM_TEXTS = texts = [
    'はい、<NL>こちらを',  #
    'はい、<NL>こちらをお',  #
    'はい、<NL>こちらをお勧',  #
    'はい、<NL>こちらをお勧め',  #
    'はい、<NL>こちらをお勧めし',  #
    'はい、<NL>こちらをお勧めしま',  #
    'はい、<NL>こちらをお勧めします',  #
    'はい、<NL>こちらをお勧めします。',  #
    'はい、<NL>こちらをお勧めします。<',  #
    'はい、<NL>こちらをお勧めします。<N',  #
    'はい、<NL>こちらをお勧めします。<NL',  #
    'はい、<NL>こちらをお勧めします。<NL>',  #
    'はい、<NL>こちらをお勧めします。<NL>「',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像」',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像」は',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像」は、',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像」は、最も有名な',  #
]

sens = SentenceStop(["<NL>"])

condition = {"in_type": "full", "out_type": "full", "skip_existing_stop_str": True}

for input_token_base in FULL_STREAM_TEXTS:

    out = sens.put(input_token_base, condition)

    text = out.get("text")  # 出力すべきテキスト
    stop_str_found = out.get("stop_str_found")  # 停止文字列が検出された か否か
    possible = out.get("possible")  # 停止文字列を検出しかかっている か否か
    stop_str = out.get("stop_str")  # 停止文字列(複数の停止文字列を指定していた場合、どの停止文字列が検出されたのか)

    print(f"text:'{text}' possible:{possible} stop_str_found:{stop_str_found} stop_str:{stop_str}")
    if stop_str_found:
        # 停止文字列が検出された場合、処理を停止する
        break
    time.sleep(0.01)

if not stop_str_found:
    # 最後までいっても、停止文字列が検出されなかった場合
    # ペンディング中のテキストを出力しきる
    # (停止文字列が検出された場合は、停止文字列の直前までの文字列が出力されるためflushは不要)
    print(f"flush:{sens.flush(condition)}", end="", flush=True)

ストリーム置換処理について

逐次的に出現するトークン(文字列の断片)を順次読み込み、
読み込んだトークンは、これまで読み込んだトークンと結合します。

結合されたトークンのことをトークンバッファと呼びます。

この処理が順次行われる際に、トークンバッファ内にあらかじめ指定された文字列(以降、「検索対象文字列」と呼びます)が出現した場合、その文字列を別の文字列(以降、「置換先文字列」と呼びます)に置換します。

トークンは逐次的に読み込まれるため、途中で検索対象文字列とは無関係の文字列や検索対象文字列の一部がトークンバッファに蓄積されていきます。検索対象文字列になり得ない順序でトークンバッファが構成されたと判断された瞬間に、トークンバッファはメソッドの戻り値として返却されます。

一方で、検索対象文字列になり得る順序でトークンバッファが構成されている場合は、検索対象文字列が出現するか、検索対象文字列になりえないと判断されるまで、戻り値は空文字となります。

このような方法により、検索対象文字列が出現するまでバッファリングを行い、逐次トークンのほとんどをそのまま表示させることができます。必要に応じて置換を行い、表示を遅らせることが可能です。これによって、ストリーム処理を効率的に行うことができます。


Read more

Node.jsで大容量ファイルを扱う:AIモデルのような大きなデータ保存はストリーム処理使いましょう

Node.jsで大容量ファイルを扱う:AIモデルのような大きなデータ保存はストリーム処理使いましょう

こんにちは!今日はAIシステムのフロントサーバーとしてもよく使用するNode.jsについてのお話です。 AIモデルの普及に伴い、大容量のデータファイルを扱う機会が急増しています。LLMなどのモデルファイルやトレーニングデータセットは数GB、場合によっては数十、数百GBにも達することがあります。 一方、Node.jsはWebアプリケーションのフロントサーバーとして広く採用されており、データマネジメントやPythonで書かれたAIバックエンドとの橋渡し役としてもかなりお役立ちな存在です。 本記事では、Node.js v20LTSで5GB程度のファイルを処理しようとして遭遇した問題と、その解決方法について解説します。 Node.jsのバッファサイズ制限の変遷 Node.jsのバッファサイズ制限は、バージョンによって大きく変化してきました Node.jsバージョン サポート終了日 バッファサイズ上限 備考 Node.js 0.12.x 2016年12月31日 ~1GB 初期のバッファサイズ制限(smalloc.kMaxLength使用) Node.js 4.

By Qualiteg プロダクト開発部
AGI時代に向けたプログラマーの未来:役割変化とキャリア戦略

AGI時代に向けたプログラマーの未来:役割変化とキャリア戦略

はじめに 私がはじめてコードを書いたのは1989年です。 当時NECのPC88というパソコンを中古でかってもらい N-88 Basic というBASIC言語のコードをみようみまねで書いて動かしたあの日から何年経つのでしょうか。 当時、電波新聞社のマイコンBASICマガジンという雑誌があり、ベーマガにはいろんなパソコン向けのプログラムコードが掲載されていました。 そんなわけでもう35年以上趣味や仕事でプログラミングに従事していますが、開発環境、情報流通の仕組みには革命といっていいほどの変化、進化がおこりました。 しかしながら、そんな中でも、あくまでコードを書くのは「私」という生身の人間でした。 そうしたある種の古き良き時代は、いよいよ本格的に終わりを告げようとしています。 2023年ごろからのLLM技術の飛躍的進歩により、プログラミング業界は大きな転換期を迎えています。 特に、OpenAI o3,o1やClaude 3.5、Gemini2.0などの大規模言語モデル(LLM)の進化や、その先にある将来的な汎用人工知能(AGI)の出現は、プログラマーやAIエンジニアの役割に根

By Tomonori Misawa / CEO
PythonとWSL開発のトラブルシューティング: PyCharmとCondaの環境不一致問題

PythonとWSL開発のトラブルシューティング: PyCharmとCondaの環境不一致問題

こんにちは! 今回は、WSL上のConda環境をPyCharmから利用する際に発生した「同じ環境なのにパッケージリストが一致しない」という問題に遭遇したため、その原因と対策について書いてみたいとおもいます 問題の状況 開発の流れは以下のようなものでした 1. WSL環境でConda仮想環境を作成 2. その環境をPyCharmのプロジェクトインタプリタとして設定 3. 開発を進める中で奇妙な現象に気づく 具体的には、次のような不一致が発生していました * PyCharmのプロジェクト設定で表示されるpipパッケージのリスト * WSLでConda環境をアクティベートした後にpip listコマンドで表示されるパッケージのリスト これらが一致せず、「WSL側のシェルから直接インストールしたパッケージがPyCharmで認識されない」という問題が生じていました。 この手の問題でよくある原因は、PyCharm側がWSL側の更新を得るのに少し時間がかかったり、 Indexing が遅れているなどなのですが、今回はそれが原因ではありませんでした。 危険な「静かな

By Qualiteg プロダクト開発部
人気ゲーム「ヒット&ブロー」で学ぶ情報理論

人気ゲーム「ヒット&ブロー」で学ぶ情報理論

こんにちは! Qualiteg研究部です! 今日はAIにおいても非常に重要な情報理論について、Nintendo Switchの人気ゲーム「世界のアソビ大全51」にも収録されている「ヒット&ブロー」というゲームを題材に解説いたします! はじめに 論理的思考力を鍛える定番パズルゲームとして長年親しまれている「ヒット&ブロー」(海外では「Mastermind」として知られています)。 このゲームは一見シンプルながらも、その攻略には深い論理的アプローチが必要とされております。 本稿では、このゲームについて情報理論という数学的概念を用いてゲームの素性を分析する方法について掘り下げてみたいとおもいます。 さらに、この情報理論が現代の人工知能(AI)技術においてどのように活用されているかについても触れていきます。 ヒット&ブローのルール説明 ヒット&ブローは、相手が秘密に設定した色や数字の組み合わせを推測するゲームです。日本では主に数字を使った「数当てゲーム」として親しまれていますが、本記事では色を使ったバージョン(マスターマインド)に焦点を当てます。 Nintendo Sw

By Qualiteg 研究部