逐次生成されるトークンのバッファリング

逐次生成されるトークンのバッファリング

こんにちは! (株)Qualiteg プロダクト開発部 です!

今日は、推論シーンでよくある、トークン細切れ問題に対処する方法をご紹介します。

ストリーミングチャットで使用する逐次生成のとき、文章は1トークンずつ生成されますが、1トークンは”単語単位”でもなければ”1文字”単位でもなく、学習時使われていたトークナイザーの処理に依存します。

一般的には 形態素解析→サブワード→語彙リスト構築 を行いますが、このとき、後で文章生成するときに重要なタグ、たとえば "<NL>" というタグが重要な意味をもつにもかかわらず、細切れにされてトークナイズされてしまうことがあります。たとえば、 "<" "N" "L>" のように粉砕されてしまうようなパターンです。

(これを避ける方法はあるのですが、今回は、学習済のモデルをあからじめ与えられた状態でどうするか、を考えます)

こういうパターンが発生してしまったとき逐次生成で "<NL>" を認識しユーザーにチャットUIでレスポンスするときに "<" "N" という生成過程を見せないためのテクニックのご紹介となります。

最終的に以下のような文章が生成されるとき

"Hello there!<NL>my name is tokflow."

逐次生成だと以下のようなるとします
(実際はここまでひどいトークナイズはありえ無いですが)


"He"
"Hello"
"Hello "
"Hello t"
"Hello th"
"Hello there"
"Hello there!<"
"Hello there!<N"
"Hello there!<NL>m"
"Hello there!<NL>my "
"Hello there!<NL>my nam"
"Hello there!<NL>my name"
"Hello there!<NL>my name "
"Hello there!<NL>my name is"
"Hello there!<NL>my name is tokfl"
"Hello there!<NL>my name is tokflow."

ここで注目すべきポイントは "<NL>" が "<" "N" "L>m" と分割されてしまっている点です。

ユーザー側に生成結果をみせるとき "<NL>" は改行を意味する特殊タグだとすると、 "<" や "N" の時点では、まだ特殊タグなのか文章本体なのか見分けがつきません。そのため、ゆるい処理をしているストリーミングチャットUIだと、そのまま"<" や "N" を表示してしまいます。

これを避けるためには、 "<" や "N" の登場時点ではまだUIに出力せず、 "<NL>" まで確定したあと、それを改行として出力する、という処理が必要になります。
これを私たちはトークンのバッファリングと呼んでいます。

Qualiteg ではこの トークンのバッファリング 処理をしてくれる機能を Python ライブラリとしてリリースしていますので、ここでその使い方をご説明いたします。

当ライブラリは、 TokFlow と呼び、大規模言語モデルにより逐次的に生成されるトークンをバッファリングし、必要な置換処理を行いながら出力するユーティリティとなります。

下のように小さな かけら のようになった文章の断片=トークンを逐次入力したとき、置換が必要な文字列を順次置換しながら出力することができます。

このとき、トークンのバッファリングして遅延出力を行うことで、置換前のトークンは出力しないようにできます。

["He","llo"," ","t","h","ere","!<","N","L>m","y ","nam","e"," ","is"," tokfl","ow.","<","N","L>N","ice"," to ","me","et you."]

↑の例は、 <NL> の出現を検知し \n に置換しながら出力している様子です。

当ライブラリは、

  • 置換条件として、好きな文字列を置換対象に指定可能
  • 複数の置換条件を指定可能

です

TokFlow のインストール方法

pip install tokflow

使い方/サンプルコード

import time
from tokflow import TokFlow

TOKEN_GENERATOR_MOCK = ["He", "llo", " ", "t", "h", "ere", "!<", "N", "L>m", "y ", "nam", "e", " ", "is", " tokfl", "ow.",
                  "<", "N", "L>N", "ice", " to ", "me", "et you."]

# "<NL>" を "\n" に置換する。 "<NL>" は検索対象文字列。 "n" は置換先文字列
# 置換条件は複数指定可能。
tokf = TokFlow([("<NL>", "\n")])

for input_token in TOKEN_GENERATOR_MOCK: 
    # トークン(文章のかけらとなる1,2文字程度の文字列)を順次入力していく。トークンは内部でバッファリングされる。
    output_token = tokf.put(input_token)

    # output_token に今出力可能な出力トークンが出力される。
    # トークンのバッファリング中に検索対象文字列が出現する可能性がある場合は
    # 出力トーンは空文字となる。
    print(f"{output_token}", end="", flush=True)

    # 逐次生成されることを目視するため、ウェイトをはさむ
    time.sleep(0.3)


# トークンの入力が終わったら、最後に flush して残っているバッファを出力し切る
print(f"{tokf.flush()}", end="", flush=True)

生成オプション

put メソッドには put(text,opts) のように オプションパラメータ opts を指定することが可能です

opts は {"in_type":"spot","out_type:"spot" } のように入力の形式と出力の形式を指定することが可能です。

以下のように挙動します。

in_type out_type Description
spot spot トークンを put メソッドに逐次送り、生成分のみ都度出力するモード。
spot full トークンを put メソッドに逐次送り、フルセンテンスを出力するモード。
full spot フルセンテンスを一度に put メソッドに送り、生成分のみ都度出力するモード。
full full フルセンテンスを一度に put メソッドに送り、フルセンテンスを出力するモード。

注意点:

  • flush メソッドを呼び出す前に全ての文字列を put メソッドに送る必要があります。特に full モードでは、全ての入力文字列を一度に送ります。
  • 出力のタイプ (out_type) が full の場合、最終的な結果を取得するためには flush メソッドを呼び出す必要があります。
  • それぞれのモードで一貫性を保つためには、put メソッドの呼び出しパターンと flush メソッドの使用を適切に組み合わせることが重要です。

コード例

condition = {"in_type": "full", "out_type": "full"} のようにルールを指定し、 put および flush の引数に condition を指定します。

    tokf = TokFlow([("<NL>", "\n")])

    condition = {"in_type": "full", "out_type": "full"}
    prev_len = 0
    for input_token_base in get_example_texts():
        output_sentence = tokf.put(input_token_base, condition)

        print(f"output_sentence:{output_sentence}")

        if prev_len > len(output_sentence):
            raise ValueError("Length error")

        if "<NL>" in output_sentence:
            raise Exception("Failure Must be converted str found.")

        prev_len = len(output_sentence)

    output_sentence = tokf.flush(condition)

停止文字列を検出して文章生成を停止する SentenceStopクラス

SentenceStopクラスは、特定のキーワードを検出し、そのキーワードが見つかった時点でテキスト生成を停止するためのクラスです。テキストは1文字ずつ入力されるシチュエーションを想定しています。

主な機能

  • 特定のキーワードの検出: 文字列内の特定のキーワードを検出します。検出したキーワードは停止文字列として扱われます。
  • テキスト生成の停止: 検出した停止文字列の位置でテキスト生成を停止します。具体的には、停止文字列が検出された時点でのテキストを返します。
  • リアルタイム処理: 文字列が1文字ずつ入力されるシチュエーションを想定しており、リアルタイムでの処理が可能です。

使用方法

初期化時には停止するためのキーワードを指定します。その後、putメソッドで1文字ずつ入力を行い、停止文字列が見つかった場合にはその時点でのテキストを返します。全ての入力が終わった場合には、flushメソッドを用いて残りのテキストを出力します。

put メソッドには put(text,opts) のように オプションパラメータ opts を指定することが可能です

opts は {"in_type":"spot","out_type:"spot","skip_existing_stop_str":True } の形式をとります。

in_type と out_type について

以下のように挙動します。

in_type out_type Description
spot spot トークンを put メソッドに逐次送り、生成分のみ都度出力するモード。
spot full トークンを put メソッドに逐次送り、フルセンテンスを出力するモード。
full spot フルセンテンスを一度に put メソッドに送り、生成分のみ都度出力するモード。
full full フルセンテンスを一度に put メソッドに送り、フルセンテンスを出力するモード。

skip_existing_stop_str について

skip_existing_stop_str:True にした場合、
初回の put 時に指定した text に、停止文字列が含まれていた場合でも、そこで停止処理は発生させない。

サンプルその1

import sys

sys.path.append('../')

import time
from tokflow import SentenceStop

"""
ストリーミングされるセンテンスを指定した停止文字列 "<NL>" を検出した段階で停止させる
"""

FULL_STREAM_TEXTS = texts = [
    'は',  #
    'はい',  #
    'はい、',  #
    'はい、こちら',  #
    'はい、こちらを',  #
    'はい、こちらをお',  #
    'はい、こちらをお勧め',  #
    'はい、こちらをお勧めします',  #
    'はい、こちらをお勧めします。',  #
    'はい、こちらをお勧めします。<',  #
    'はい、こちらをお勧めします。<N',  #
    'はい、こちらをお勧めします。<NL',  #
    'はい、こちらをお勧めします。<NL>',  #
    'はい、こちらをお勧めします。<NL><',  #
    'はい、こちらをお勧めします。<NL><N',  #
    'はい、こちらをお勧めします。<NL><NL',  #
    'はい、こちらをお勧めします。<NL><NL>',  #
    'はい、こちらをお勧めします。<NL><NL>「',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像」',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像」は',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像」は、',  #
    'はい、こちらをお勧めします。<NL><NL>「ハチ公像」は、最も有名な',  #
]

sens = SentenceStop(["<NL>"])

condition = {"in_type": "full", "out_type": "full"}

for input_token_base in FULL_STREAM_TEXTS:

    out = sens.put(input_token_base, condition)

    text = out.get("text")  # 出力すべきテキスト
    stop_str_found = out.get("stop_str_found")  # 停止文字列が検出された か否か
    possible = out.get("possible")  # 停止文字列を検出しかかっている か否か
    stop_str = out.get("stop_str")  # 停止文字列(複数の停止文字列を指定していた場合、どの停止文字列が検出されたのか)

    print(f"text:'{text}' possible:{possible} stop_str_found:{stop_str_found} stop_str:{stop_str}")
    if stop_str_found:
        # 停止文字列が検出された場合、処理を停止する
        break
    time.sleep(0.01)

if not stop_str_found:
    # 最後までいっても、停止文字列が検出されなかった場合
    # ペンディング中のテキストを出力しきる
    # (停止文字列が検出された場合は、停止文字列の直前までの文字列が出力されるためflushは不要)
    print(f"flush:{sens.flush(condition)}", end="", flush=True)

<NL> を検出した時点で、stop_str_found フラグが True となり、 はい、こちらをお勧めします。 で文章生成を停止することができる。

サンプルその2 停止文字列を含む場合

以下のように condition = {"in_type": "full", "out_type": "full", "skip_existing_stop_str": True}
"skip_existing_stop_str": True} を指定している場合、初回に入力するテキスト はい、<NL>こちらを には停止文字列
<NL> を含んでいるが、初回テキストにある<NL> は停止文字列と扱わずスキップする。

import sys

sys.path.append('../')

import time
from tokflow import SentenceStop

"""
既に停止文字列 "<NL>"が存在しているパートから開始された場合、
既存分はスキップして、次以降でストリーミングされるセンテンスを指定した停止文字列 "<NL>" を検出した段階で停止させる
"""

FULL_STREAM_TEXTS = texts = [
    'はい、<NL>こちらを',  #
    'はい、<NL>こちらをお',  #
    'はい、<NL>こちらをお勧',  #
    'はい、<NL>こちらをお勧め',  #
    'はい、<NL>こちらをお勧めし',  #
    'はい、<NL>こちらをお勧めしま',  #
    'はい、<NL>こちらをお勧めします',  #
    'はい、<NL>こちらをお勧めします。',  #
    'はい、<NL>こちらをお勧めします。<',  #
    'はい、<NL>こちらをお勧めします。<N',  #
    'はい、<NL>こちらをお勧めします。<NL',  #
    'はい、<NL>こちらをお勧めします。<NL>',  #
    'はい、<NL>こちらをお勧めします。<NL>「',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像」',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像」は',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像」は、',  #
    'はい、<NL>こちらをお勧めします。<NL>「ハチ公像」は、最も有名な',  #
]

sens = SentenceStop(["<NL>"])

condition = {"in_type": "full", "out_type": "full", "skip_existing_stop_str": True}

for input_token_base in FULL_STREAM_TEXTS:

    out = sens.put(input_token_base, condition)

    text = out.get("text")  # 出力すべきテキスト
    stop_str_found = out.get("stop_str_found")  # 停止文字列が検出された か否か
    possible = out.get("possible")  # 停止文字列を検出しかかっている か否か
    stop_str = out.get("stop_str")  # 停止文字列(複数の停止文字列を指定していた場合、どの停止文字列が検出されたのか)

    print(f"text:'{text}' possible:{possible} stop_str_found:{stop_str_found} stop_str:{stop_str}")
    if stop_str_found:
        # 停止文字列が検出された場合、処理を停止する
        break
    time.sleep(0.01)

if not stop_str_found:
    # 最後までいっても、停止文字列が検出されなかった場合
    # ペンディング中のテキストを出力しきる
    # (停止文字列が検出された場合は、停止文字列の直前までの文字列が出力されるためflushは不要)
    print(f"flush:{sens.flush(condition)}", end="", flush=True)

ストリーム置換処理について

逐次的に出現するトークン(文字列の断片)を順次読み込み、
読み込んだトークンは、これまで読み込んだトークンと結合します。

結合されたトークンのことをトークンバッファと呼びます。

この処理が順次行われる際に、トークンバッファ内にあらかじめ指定された文字列(以降、「検索対象文字列」と呼びます)が出現した場合、その文字列を別の文字列(以降、「置換先文字列」と呼びます)に置換します。

トークンは逐次的に読み込まれるため、途中で検索対象文字列とは無関係の文字列や検索対象文字列の一部がトークンバッファに蓄積されていきます。検索対象文字列になり得ない順序でトークンバッファが構成されたと判断された瞬間に、トークンバッファはメソッドの戻り値として返却されます。

一方で、検索対象文字列になり得る順序でトークンバッファが構成されている場合は、検索対象文字列が出現するか、検索対象文字列になりえないと判断されるまで、戻り値は空文字となります。

このような方法により、検索対象文字列が出現するまでバッファリングを行い、逐次トークンのほとんどをそのまま表示させることができます。必要に応じて置換を行い、表示を遅らせることが可能です。これによって、ストリーム処理を効率的に行うことができます。


Read more

大企業のAIセキュリティを支える基盤技術 - 今こそ理解するActive Directory 第2回 ドメイン環境の構築

大企業のAIセキュリティを支える基盤技術 - 今こそ理解するActive Directory 第2回 ドメイン環境の構築

こんにちは、今回はシリーズ第2回ドメイン環境の構築 - 検証環境の構築手順について解説いたします! 連載の構成 第1章:基本概念の理解 - Active DirectoryとKerberos/NTLM認証の基礎 【★今回です★】第2章:ドメイン環境の構築 - 検証環境の構築手順 第3章:クライアントとサーバーのドメイン参加 - ドメイン参加の詳細手順 第4章:プロキシサーバーと統合Windows認証 第5章:ブラウザ設定と認証 - 各ブラウザでの設定方法 第6章:トラブルシューティング - よくある問題と解決方法 第7章:セキュリティとベストプラクティス - 本番環境での考慮事項 第8章:実践的な構成例 - AIセキュリティツールとの統合事例 第2章:ドメイン環境の構築 2.1 ドメイン名の設計 2.1.1 ドメイン名の命名規則 Active Directoryを構築する際、

By Qualiteg コンサルティング
AIがよく間違える「クロージャ問題」の本質と対策

AIがよく間違える「クロージャ問題」の本質と対策

こんにちは! 本日は「クロージャ問題」に関する話題となります。 Pythonでループ内に関数を定義したことはありますか? もしあるなら、あれれ?な挙動に遭遇したことがあるかもしれません。 本稿では、Pythonプログラマーなら一度は経験する「クロージャ問題」について、初心者にもわかりやすく解説してみたいとおもいます クロージャとは何か? そもそも ”クロージャ” とは何でしょうか。 クロージャ(closure)とは、関数が自分の定義されたスコープの変数を覚えて持ち運ぶ仕組み のことです。 もう少し分解すると、次の2つがポイントとなります 1. 内側の関数が、外側の関数の変数を使える 2. 外側の関数が終了しても、その変数は生き続ける 普通の関数とクロージャ―を使った関数を比較してみましょう 普通の関数との比較 まずは普通の関数から、 def add(x, y): return x + y print(add(3, 5)) # 8 print(add(3, 7)

By Qualiteg プロダクト開発部
フリーランスHub様にQualiteg Blogをご紹介いただきました

フリーランスHub様にQualiteg Blogをご紹介いただきました

この度、フリーランス向け案件検索サービス「フリーランスHub」様の特集記事「トレンドをキャッチアップ!AIに関する情報が得られるメディア・ブログまとめ」にて、弊社が運営する「Qualiteg Blog」をご紹介いただきました。 掲載記事について フリーランスHub様の記事では、AI技術の最前線で活躍するエンジニアや開発者の方々に向けて、価値ある情報源となるメディア・ブログが厳選して紹介されています。 その中で、Qualiteg Blogを「AI技術の専門知識を実践的なビジネス活用につなげる貴重な情報源」として取り上げていただきました。 特に以下の点を評価いただいております * 実践的なビジネス活用事例の提供 AI新規事業創出や事業選定方法など、経営者やビジネスリーダーが直面する課題への具体的な解決策 * 技術的な深掘りコンテンツ リップシンク技術など、実際のサービスで使用されている技術の開発現場目線での詳細な解説 * 多様な情報発信 代表執筆記事、AIトピックス、講演会動画など、幅広いフォーマットでの情報提供 今後も価値ある情報発

By Qualiteg ニュース
PyTorchの重いCUDA処理を非同期化したらメモリリークした話と、その解決策

PyTorchの重いCUDA処理を非同期化したらメモリリークした話と、その解決策

こんにちは!Qualitegプロダクト開発部です! 今回は同期メソッドを非同期メソッド(async)化しただけなのに、思わぬメモリリーク※に見舞われたお話です。 深層学習モデルを使った動画処理システムを開発していた時のことです。 「処理の進捗をリアルタイムでWebSocketで通知したい」という要件があり、「単にasync/awaitを使えばいいだけでしょ?」と軽く考えていたら、思わぬ落とし穴にはまりました。 プロ仕様のGPUを使っていたにも関わらず、メモリ不足でクラッシュしてしまいました。 この記事では、その原因と解決策、そして学んだ教訓を詳しく共有したいと思います。同じような問題に直面している方の参考になれば幸いです。 ※ 厳密には「メモリリーク」ではなく「メモリの解放遅延」ですが、 実用上の影響は同じなので、この記事では便宜上「メモリリーク」と表現します。 背景:なぜ進捗通知は非同期である必要があるのか モダンなWebアプリケーションの要求 最近のWebアプリケーション開発では、ユーザー体験を向上させるため、長時間かかる処理の進捗をリアルタイムで表示することが

By Qualiteg プロダクト開発部