Pythonmultiprocessing.poolとクラスの目的関数およびニューロエボリューションとの相互作用

3
aadharna 2019-12-22 03:09.

警告、私はできるだけ具体的にしたいので、これは長くなるでしょう。


正確な問題:これはマルチプロセッシングの問題です。私のクラスはすべて、以前の実験で構築/期待どおりに動作することを確認しました。

編集:事前にスレッドを言った。


スレッド環境で問題のおもちゃの例を実行すると、すべてが動作します。ただし、実際の問題に移行すると、コードが壊れます。具体的には、TypeError: can't pickle _thread.lock objectsエラーが発生します。フルスタックは一番下にあります。

ここでの私のスレッド化のニーズは、コードを適応させた例とは少し異なります- https://github.com/CMA-ES/pycma/issues/31。この例では、評価ごとに個別に呼び出すことができる1つの適応度関数があり、関数呼び出しはいずれも相互作用できません。しかし、私の本当の問題では、遺伝的アルゴリズムを使用してニューラルネットワークの重みを最適化しようとしています。GAは潜在的な重みを提案するため、環境内でこれらのNNコントローラーの重みを評価する必要があります。シングルスレッドの場合、単純なforループで重みを評価し[nn.evaluate(weights) for weights in potential_candidates]、最もパフォーマンスの高い個体を見つけて、次の突然変異ラウンドでそれらの重みを使用する環境を1つだけ持つことができます。ただし、スレッド環境で1つのシミュレーションを単純に行うことはできません。

したがって、評価する単一の関数を渡す代わりに、関数のリストを渡します(環境は同じですが、通信ストリームが個人間で相互作用しないようにプロセスをフォークしました。 )

すぐに注意すべきもう1つのこと:私はきちんとしたからの並列ビルド評価データ構造を使用しています

neat.parallelからインポートParallelEvaluator#はmultiprocessing.Poolを使用します

おもちゃのサンプルコード:

NPARAMS = nn.flat_init_weights.shape[0]    # make this a 1000-dimensional problem.
NPOPULATION = 5                            # use population size of 5.
MAX_ITERATION = 100                        # run each solver for 100 function calls.

import time
from neat.parallel import ParallelEvaluator  # uses multiprocessing.Pool
import cma

def fitness(x):
    time.sleep(0.1)
    return sum(x**2)

# # serial evaluation of all solutions
# def serial_evals(X, f=fitness, args=()):
#     return [f(x, *args) for x in X]

# parallel evaluation of all solutions
def _evaluate2(self, weights, *args):
    """redefine evaluate without the dependencies on neat-internal data structures
    """
    jobs = []
    for i, w in enumerate(weights):
        jobs.append(self.pool.apply_async(self.eval_function[i], (w, ) + args))

    return [job.get() for job in jobs]

ParallelEvaluator.evaluate2 = _evaluate2
parallel_eval = ParallelEvaluator(12, [fitness]*NPOPULATION)

# time both
for eval_all in [parallel_eval.evaluate2]:
    es = cma.CMAEvolutionStrategy(NPARAMS * [1], 1, {'maxiter': MAX_ITERATION, 
                                                     'popsize': NPOPULATION})
    es.disp_annotation()
    while not es.stop():
        X = es.ask()
        es.tell(X, eval_all(X))
    es.disp()

必要な背景:

おもちゃの例から実際のコードに切り替えると、上記は失敗します。

私のクラスは次のとおりです。

LevelGenerator (simple GA class that implements mutate, etc)
GridGame (OpenAI wrapper; launches a Java server in which to run the simulation; 
          handles all communication between the Agent and the environment)
Agent    (neural-network class, has an evaluate fn which uses the NN to play a single rollout)
Objective (handles serializing/de-serializing weights: numpy <--> torch; launching the evaluate function)

# The classes get composed to get the necessary behavior:
env   = GridGame(Generator)
agent = NNAgent(env)                # NNAgent is a subclass of (Random) Agent)
obj   = PyTorchObjective(agent)

# My code normally all interacts like this in the single-threaded case:

def test_solver(solver): # Solver: CMA-ES, Differential Evolution, EvolutionStrategy, etc
    history = []
    for j in range(MAX_ITERATION):
        solutions = solver.ask() #2d-numpy array. (POPSIZE x NPARAMS)
        fitness_list = np.zeros(solver.popsize)
        for i in range(solver.popsize):
            fitness_list[i] = obj.function(solutions[i], len(solutions[i]))
        solver.tell(fitness_list)
        result = solver.result() # first element is the best solution, second element is the best fitness
        history.append(result[1])

        scores[j] = fitness_list

    return history, result

だから、私が実行しようとすると:

NPARAMS = nn.flat_init_weights.shape[0]        
NPOPULATION = 5                                
MAX_ITERATION = 100                            

_x = NNAgent(GridGame(Generator))

gyms = [_x.mutate(0.0) for _ in range(NPOPULATION)]
objs = [PyTorchObjective(a) for a in gyms]

def evaluate(objective, weights):
    return objective.fun(weights, len(weights))

import time
from neat.parallel import ParallelEvaluator  # uses multiprocessing.Pool
import cma

def fitness(agent):
    return agent.evalute()

# # serial evaluation of all solutions
# def serial_evals(X, f=fitness, args=()):
#     return [f(x, *args) for x in X]

# parallel evaluation of all solutions
def _evaluate2(self, X, *args):
    """redefine evaluate without the dependencies on neat-internal data structures
    """
    jobs = []
    for i, x in enumerate(X):
        jobs.append(self.pool.apply_async(self.eval_function[i], (x, ) + args))

    return [job.get() for job in jobs]

ParallelEvaluator.evaluate2 = _evaluate2
parallel_eval = ParallelEvaluator(12, [obj.fun for obj in objs])
# obj.fun takes in the candidate weights, loads them into the NN, and then evaluates the NN in the environment.

# time both
for eval_all in [parallel_eval.evaluate2]:
    es = cma.CMAEvolutionStrategy(NPARAMS * [1], 1, {'maxiter': MAX_ITERATION, 
                                                     'popsize': NPOPULATION})
    es.disp_annotation()
    while not es.stop():
        X = es.ask()
        es.tell(X, eval_all(X, NPARAMS))
    es.disp()

次のエラーが発生します。

TypeError                            Traceback (most recent call last)
<ipython-input-57-3e6b7bf6f83a> in <module>
      6     while not es.stop():
      7         X = es.ask()
----> 8         es.tell(X, eval_all(X, NPARAMS))
      9     es.disp()

<ipython-input-55-2182743d6306> in _evaluate2(self, X, *args)
     14         jobs.append(self.pool.apply_async(self.eval_function[i], (x, ) + args))
     15 
---> 16     return [job.get() for job in jobs]

<ipython-input-55-2182743d6306> in <listcomp>(.0)
     14         jobs.append(self.pool.apply_async(self.eval_function[i], (x, ) + args))
     15 
---> 16     return [job.get() for job in jobs]

~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
    655             return self._value
    656         else:
--> 657             raise self._value
    658 
    659     def _set(self, i, obj):

~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
    429                         break
    430                     try:
--> 431                         put(task)
    432                     except Exception as e:
    433                         job, idx = task[:2]

~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/connection.py in send(self, obj)
    204         self._check_closed()
    205         self._check_writable()
--> 206         self._send_bytes(_ForkingPickler.dumps(obj))
    207 
    208     def recv_bytes(self, maxlength=None):

~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/reduction.py in dumps(cls, obj, protocol)
     49     def dumps(cls, obj, protocol=None):
     50         buf = io.BytesIO()
---> 51         cls(buf, protocol).dump(obj)
     52         return buf.getbuffer()
     53 

TypeError: can't pickle _thread.lock objects

また、これはクラス関数であるという事実が原因である可能性があることもここで読みました-TypeError:_thread.lockオブジェクトをピクルできません-グローバルスコープの適応度関数を作成しましたdef fitness(agent): return agent.evalute()が、それも機能しませんでした。

このエラーは、元々PyTorchObjectiveクラスのevaluate関数をラムダ関数として持っていたことが原因である可能性があると思いましたが、変更するとまだ壊れていました。

どんな洞察も大歓迎です、そしてこの巨大なテキストの壁を読んでくれてありがとう。

1 answers

3
ivan_pozdeev 2019-12-22 11:55.

複数のスレッドを使用していません。複数のプロセスを使用しています。

apply_async関数自体を含め、渡すすべての引数は、内部でシリアル化(ピクルス化)され、IPCチャネルを介してワーカープロセスに渡されます(詳細については、multiprocessingドキュメントを参照してください)。したがって、本質的にプロセスローカルであるものに関連付けられているエンティティを渡すことはできません。アトミック操作を実行するにはロックを使用する必要があるため、これにはほとんどの同期プリミティブが含まれます。

これが発生するたびに(このエラーメッセージに関する他の多くの質問が示すように)、賢くなりすぎて、並列化ロジックがすでに組み込まれているオブジェクトを並列化フレームワークに渡そうとしている可能性があります。


このような「並列化されたオブジェクト」を使用して「複数レベルの並列化」を作成する場合は、次のいずれかを使用することをお勧めします。

  • そのオブジェクトの並列化メカニズムを適切に使用し、複数のレベルを気にしないでください。とにかくコアがある以上のことを一度に行うことはできません。または
  • ワーカープロセス内でこれらの「並列化されたオブジェクト」を作成して使用する
    • ただしmultiprocessing、ワーカープロセスが独自のプールを生成することは意図的に禁止されているため、ここで制限に達する可能性があります。
      • ワーカーに作業キューにアイテムを追加させることができますが、Queue制限に達する可能性もあります。
    • したがって、このようなシナリオでは、より高度なサードパーティの分散ワークキューソリューションが望ましい場合があります。

Related questions

MORE COOL STUFF

「水曜日」シーズン1の中心には大きなミステリーがあります

「水曜日」シーズン1の中心には大きなミステリーがあります

Netflixの「水曜日」は、典型的な10代のドラマ以上のものであり、実際、シーズン1にはその中心に大きなミステリーがあります.

ボディーランゲージの専門家は、州訪問中にカミラ・パーカー・ボウルズが輝くことを可能にした微妙なケイト・ミドルトンの動きを指摘しています

ボディーランゲージの専門家は、州訪問中にカミラ・パーカー・ボウルズが輝くことを可能にした微妙なケイト・ミドルトンの動きを指摘しています

ケイト・ミドルトンは、州の夕食会と州の訪問中にカミラ・パーカー・ボウルズからスポットライトを奪いたくなかった、と専門家は言う.

一部のファンがハリー・スタイルズとオリビア・ワイルドの「非常に友好的な」休憩が永続的であることを望んでいる理由

一部のファンがハリー・スタイルズとオリビア・ワイルドの「非常に友好的な」休憩が永続的であることを望んでいる理由

一部のファンが、オリビア・ワイルドが彼女とハリー・スタイルズとの間の「難しい」が「非常に友好的」な分割を恒久的にすることを望んでいる理由を見つけてください.

エリザベス女王の死後、ケイト・ミドルトンはまだ「非常に困難な時期」を過ごしている、と王室の専門家が明らかにする 

エリザベス女王の死後、ケイト・ミドルトンはまだ「非常に困難な時期」を過ごしている、と王室の専門家が明らかにする&nbsp;

エリザベス女王の死後、ケイト・ミドルトンが舞台裏で「非常に困難な時期」を過ごしていたと伝えられている理由を調べてください.

セントヘレナのジェイコブのはしごを登るのは、気弱な人向けではありません

セントヘレナのジェイコブのはしごを登るのは、気弱な人向けではありません

セント ヘレナ島のジェイコブズ ラダーは 699 段の真っ直ぐ上る階段で、頂上に到達すると証明書が発行されるほどの難易度です。

The Secrets of Airline Travel Quiz

The Secrets of Airline Travel Quiz

Air travel is far more than getting from point A to point B safely. How much do you know about the million little details that go into flying on airplanes?

Where in the World Are You? Take our GeoGuesser Quiz

Where in the World Are You? Take our GeoGuesser Quiz

The world is a huge place, yet some GeoGuessr players know locations in mere seconds. Are you one of GeoGuessr's gifted elite? Take our quiz to find out!

バイオニック読書はあなたをより速く読むことができますか?

バイオニック読書はあなたをより速く読むことができますか?

BionicReadingアプリの人気が爆発的に高まっています。しかし、それは本当にあなたを速読術にすることができますか?

独自のGoogleHomeジュークボックスを作成する方法

独自のGoogleHomeジュークボックスを作成する方法

スマートホームサウンドシステムをワンランク上に上げたいとお考えの場合は、これが楽しい方法です。少し余分な作業といくつかのRFIDテクノロジーを使用して、次のパーティーで感動すること間違いなしの独自のカード駆動ジュークボックスを構築できます。

はい、私たちの生活のための行進は黒人についてでした、そしてそれは時間についてです

はい、私たちの生活のための行進は黒人についてでした、そしてそれは時間についてです

コモンとアンドラデイは、2018年3月24日、ワシントンDCで開催されるマーチフォーアワーライフズラリーで、シェハン枢機卿学校合唱団のメンバーと「スタンドアップフォーサムシング」を行います。

テスラモデル3について何を知りたいですか?

テスラモデル3について何を知りたいですか?

テスラモデル3は未来的であるだけでなく、驚くほど普通に感じます。しかし、実際には何が正常ですか?前回はモデル3を数時間しか運転で​​きませんでしたが、今回は週末を過ごしました。

今週のビジネス:スイッチポートの本当の問題

今週のビジネス:スイッチポートの本当の問題

見積もり| 「スイッチはPS4やXboxOneほど強力ではありません。誰もがそれを知っています。

ケイト・ミドルトンとウィリアム王子は、彼らが子供たちと行っているスパイをテーマにした活動を共有しています

ケイト・ミドルトンとウィリアム王子は、彼らが子供たちと行っているスパイをテーマにした活動を共有しています

ケイト・ミドルトンとウィリアム王子は、子供向けのパズルの本の序文を書き、ジョージ王子、シャーロット王女、ルイ王子と一緒にテキストを読むと述べた.

事故で押しつぶされたスイカは、動物を喜ばせ水分補給するために野生生物保護団体に寄付されました

事故で押しつぶされたスイカは、動物を喜ばせ水分補給するために野生生物保護団体に寄付されました

Yak's Produce は、数十個のつぶれたメロンを野生動物のリハビリ専門家であるレスリー グリーンと彼女のルイジアナ州の救助施設で暮らす 42 匹の動物に寄付しました。

デミ・ロヴァートは、新しいミュージシャンのボーイフレンドと「幸せで健康的な関係」にあります: ソース

デミ・ロヴァートは、新しいミュージシャンのボーイフレンドと「幸せで健康的な関係」にあります: ソース

8 枚目のスタジオ アルバムのリリースに向けて準備を進めているデミ ロヴァートは、「スーパー グレート ガイ」と付き合っている、と情報筋は PEOPLE に確認しています。

Plathville の Kim と Olivia Plath が数年ぶりに言葉を交わすことへようこそ

Plathville の Kim と Olivia Plath が数年ぶりに言葉を交わすことへようこそ

イーサン プラスの誕生日のお祝いは、TLC のウェルカム トゥ プラスビルのシーズン 4 のフィナーレで、戦争中の母親のキム プラスと妻のオリビア プラスを結びつけました。

仕事の生産性を高める 8 つのシンプルなホーム オフィスのセットアップのアイデア

仕事の生産性を高める 8 つのシンプルなホーム オフィスのセットアップのアイデア

ホームオフィスのセットアップ術を極めよう!AppExert の開発者は、家族全員が一緒にいる場合でも、在宅勤務の技術を習得しています。祖父や曽祖父が共同家族で暮らしていた頃の記憶がよみがえりました。

2022 年、私たちのデジタル ライフはどこで終わり、「リアル ライフ」はどこから始まるのでしょうか?

20 年前のタイムトラベラーでさえ、日常生活におけるデジタルおよびインターネットベースのサービスの重要性に驚くことでしょう。MySpace、eBay、Napster などのプラットフォームは、高速化に焦点を合わせた世界がどのようなものになるかを示してくれました。

ニューロマーケティングの秘密科学

ニューロマーケティングの秘密科学

マーケティング担当者が人間の欲望を操作するために使用する、最先端の (気味が悪いと言う人もいます) メソッドを探ります。カートをいっぱいにして 3 桁の領収書を持って店を出る前に、ほんの数点の商品を買いに行ったことはありませんか? あなたは一人じゃない。

地理情報システムの日: GIS 開発者として学ぶべき最高の技術スタック

地理情報システムの日: GIS 開発者として学ぶべき最高の技術スタック

私たちが住んでいる世界を確実に理解するには、データが必要です。ただし、空間参照がない場合、このデータは地理的コンテキストがないと役に立たなくなる可能性があります。

Language