python - Pythonマルチプロセッシングmap_asyncがハングする

原文 python multiprocessing pool

パーサーでプロセスのプールを閉じるときに、[おそらく]問題が発生します。すべてのタスクが完了すると、ハングして何も実行されません。CPU使用率は約1%です。

profiles_pool = multiprocessing.Pool(processes=4)
pages_pool = multiprocessing.Pool(processes=4)

m = multiprocessing.Manager()
pages = m.list(['URL'])
pages_done = m.list()

while True:
    # grab all links
    res = pages_pool.imap_unordered(deco_process_people, pages, chunksize=1)

    pages_done += pages
    pages = []

    for new_users,new_pages in res:
        users.update(new_users)
        profile_tasks = [ (new_users[i]['link'],i) for i in new_users ]
        # enqueue grabbed links for parsing
        profiles_pool.map_async(deco_process_profiles, 
                                profile_tasks, chunksize=2, 
                                callback=profile_update_callback)
        # i dont need a result of map_async actually
        # callback will apply parsed data to users dict
        # users dict is an instance of Manager.dict()

        for p in new_pages:
            if p not in pages_done and p not in pages:
                pages.append(p)

    # we need more than 900 pages to be parsed for bug occurrence
    #if len(pages) == 0:
    if len(pages_done) > 900:
        break


# 
# closing other pools
#

# ---- the last printed string: 
print 'Closing profiles pool',
sys.stdout.flush()
profiles_pool.close()
profiles_pool.join()
print 'closed'


問題はプールキューでの誤ったオープンタスク計算にあると思いますが、私は自信がないのでこれをチェックできません-idkタスクキューの長さを取得する方法。

それは何で、どこを最初に見るべきですか?
答え
最も即座に明らかな問題は、pages_doneが同期されたManager.listオブジェクトであるため(各プロセスがそれにアトミックにアクセスできるため)、pagesがそのように開始されても、すぐに通常の(マルチ)処理されないリストになります。 :

pages_done += pages
pages = []


2番目に引用された行は、pagesを新しい空の通常リストに再バインドします。

(再バインド割り当てを行うのではなく)2行目のpagesのすべての要素を削除した場合でも、(たとえば)pagesにA、B、Cが含まれているレースに遭遇する可能性があります。 1行目は+=ですが、2行目でA、B、C、Dになりました。

簡単な修正は、アイテムを一度に1つずつpagesから取り出して、1つずつpages_doneに入れることです(あまり効率的ではありません)。ただし、これらをまったく共有データ構造にしない方がよい場合があります。引用符で囲まれたコードでは、必要なようには見えません(引用符で囲まれていないコードがそれに依存していると思います。それ以外の場合、pagesの再バインドはとにかく赤いニシンです!)。
関連記事

python - 圧縮された大きなcsvファイルから何百万ものレコードをmongoDBに効率的に挿入するにはどうすればよいですか?

python - Python Tkinterを使用してボタンに特定の.csvファイルを開く方法は?

python - Django:多対多をフォームに保存する

if-statement - チェックとポーリングのメカニズムのPython的な方法

python - このWhileステートメントの最後のインクリメンターが起動しないのはなぜですか

python - Numpy配列の割り当て

python - WAFを使用したQTプロジェクトの構築

c++ - qextserialportはWindowsでデータをドロップします—それに対して何ができますか?

python - PyQt4:QListWidget間でアイテムを移動する

.net - XMLファイルの重複する値を削除して最後の値を保持する方法