python - Pythonがサブプロセスと通信する

python streaming subprocess producer-consumer

私は常にデータを生成しているサブプロセスを持っていますが、ほとんどのデータは関心がありません。しかし、時々、ランダムなタイミングで、出力のサンプルを取得する必要があります。境界。たとえば、プロセスが毎秒100バイトの定数を生成し、有用な情報が100バイトのチャンクで提供されると仮定します。それが4秒間実行された後、100バイトの出力を表示するように依頼します。その後、400バイトから499バイトまでのバイトに関心があります。しかし、4.1秒で要求した場合、インターセプトしてバイト410-509を取得したくないので、待機してバイト500-599を確認する必要があります。それ以外の場合、プロセスは出力を/dev/nullにうまくストリーミングしているはずであり、出力ストリームをブロックしたくありません。友人のfredも、たとえば4.6秒で100バイトを要求する可能性があるので、そのデータをオフにして、複数のユーザーがデータを読み取れるようにする必要もあります。

この種の既存のデザインパターンはありますか? Pythonサブプロセスで実装して、サブプロセスとの通信が非ブロッキングであることを確認するにはどうすればよいですか?
答え
指定されたstdoutから常に100バイトのチャンクで読み取る必要がある場合があります。次に、コンシューマのリストがあります。これは、文字列/バイト文字列を取る関数として実装されている場合があります(2.xまたは3.xを使用しているかどうかによって異なります)。各チャンクはすべてのコンシューマに送信されてから破棄されます。

このようなもの:

def f_a(s): pass
def f_b(s): pass

consumers = [f_a, f_b]

while True:
    chunk = process.stdout.read(100)
    if chunk == '': break # or something like that
    for c in consumers: c(chunk)


これをスレッドで実行すると、必要に応じて、または必要に応じてコンシューマを変更できます。

ただし、コンシューマがブロックしないように注意する必要があります。そうしないと、ループがブロックします。 OSがサブプロセスとあなたの間に非常に大きなバッファを提供するので、それがあまり長く続かなくても問題ありません。しかし、それは無限ではありません。したがって、コンシューマごとまたはループ内でバッファリングを追加する必要があるかもしれません。
関連記事

python - ソラリス上のpython numpy、遅い、またはリンクされていませんか?

python - Google App EngineのSSLError(ローカル開発サーバー)

python - Django:STATIC_URLはappnameをURLに追加します

python - flickAPI:ユーザーアカウントのセットを一覧表示するにはどうすればよいですか?

python - Pythonの他の属性定義内からクラス属性にアクセスする

python - CherryPyでのユーザー管理

python - GStreamerとTheora

python - DjangoカスタムChangeUserForm設定is_active、is_staff、is_superuserをFalseに設定

python - StringToSignの作成

python - 'コマンドラインインテリセンス'(TAB TAB)をPythonスクリプトで動作させる方法は?