リストをグループに返すCeleryタスクを再帰的にチェーンする方法は?

4
Hamish Downer 2019-11-23 22:49.

私はこの質問から始めました:リストをグループに返すCeleryタスクをチェーンする方法は?

でも2回拡大したいです。したがって、私のユースケースでは、次のようになります。

  • タスクA:特定の日付のアイテムの総数を決定します
  • タスクB:その日付の1000個のメタデータエントリをダウンロードします
  • タスクC:1つのアイテムのコンテンツをダウンロードする

そのため、各ステップで次のステップのアイテム数を増やしています。タスクの結果をループ.delay()し、次のタスク関数を呼び出すことでそれを行うことができます。しかし、私は自分の主な仕事にそうさせないようにしようと思いました。代わりに、タプルのリストを返します。各タプルは、次の関数を呼び出すための引数に展開されます。

上記の質問には私のニーズを満たすように見える答えがありますが、2レベルの拡張のためにそれをチェーンする正しい方法を見つけることができません。

これが私のコードの非常に切り詰められた例です:

from celery import group
from celery.task import subtask
from celery.utils.log import get_task_logger

from .celery import app

logger = get_task_logger(__name__)

@app.task
def task_range(upper=10):
    # wrap in list to make JSON serializer work
    return list(zip(range(upper), range(upper)))

@app.task
def add(x, y):
    logger.info(f'x is {x} and y is {y}')
    char = chr(ord('a') + x)
    char2 = chr(ord('a') + x*2)
    result = x + y
    logger.info(f'result is {result}')
    return list(zip(char * result, char2 * result))

@app.task
def combine_log(c1, c2):
    logger.info(f'combine log is {c1}{c2}')

@app.task
def dmap(args_iter, celery_task):
    """
    Takes an iterator of argument tuples and queues them up for celery to run with the function.
    """
    logger.info(f'in dmap, len iter: {len(args_iter)}')
    callback = subtask(celery_task)
    run_in_parallel = group(callback.clone(args) for args in args_iter)
    return run_in_parallel.delay()

次に、ネストされたマッピングを機能させるためにさまざまな方法を試しました。まず、1レベルのマッピングが正常に機能するため、次のようになります。

pp = (task_range.s() | dmap.s(add.s()))
pp(2)

私が期待するような結果を生み出すので、私は完全にオフではありません。

しかし、私が別のレベルを追加しようとすると:

ppp = (task_range.s() | dmap.s(add.s() | dmap.s(combine_log.s())))

次に、ワーカーにエラーが表示されます。

[2019-11-23 22:34:12,024: ERROR/ForkPoolWorker-2] Task proj.tasks.dmap[e92877a9-85ce-4f16-88e3-d6889bc27867] raised unexpected: TypeError("add() missing 2 required positional arguments: 'x' and 'y'",)
Traceback (most recent call last):
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/app/trace.py", line 385, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/app/trace.py", line 648, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/hdowner/dev/playground/celery/proj/tasks.py", line 44, in dmap
    return run_in_parallel.delay()
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 186, in delay
    return self.apply_async(partial_args, partial_kwargs)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 1008, in apply_async
    args=args, kwargs=kwargs, **options))
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 1092, in _apply_tasks
    **options)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 578, in apply_async
    dict(self.options, **options) if options else self.options))
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 607, in run
    first_task.apply_async(**options)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 229, in apply_async
    return _apply(args, kwargs, **options)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/app/task.py", line 532, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() missing 2 required positional arguments: 'x' and 'y'

また、引数をdmap()プレーンタスクシグネチャからチェーンに変更すると、引数がに渡される方法が変わる理由がわかりませんadd()。私の印象では、そうすべきではないということでした。それは、の戻り値がadd()渡されることを意味するだけです。しかし、どうやらそうではありません...

1 answers

1
Hamish Downer 2019-11-25 05:53.

問題はclone()chainインスタンスのメソッドがある時点で引数を渡さないことです。を参照してください。https://stackoverflow.com/a/53442344/3189詳細については。その答えのメソッドを使用すると、dmap()コードは次のようになります。

@app.task
def dmap(args_iter, celery_task):
    """
    Takes an iterator of argument tuples and queues them up for celery to run with the function.
    """
    callback = subtask(celery_task)
    run_in_parallel = group(clone_signature(callback, args) for args in args_iter)
    return run_in_parallel.delay()


def clone_signature(sig, args=(), kwargs=(), **opts):
    """
    Turns out that a chain clone() does not copy the arguments properly - this
    clone does.
    From: https://stackoverflow.com/a/53442344/3189
    """
    if sig.subtask_type and sig.subtask_type != "chain":
        raise NotImplementedError(
            "Cloning only supported for Tasks and chains, not {}".format(sig.subtask_type)
        )
    clone = sig.clone()
    if hasattr(clone, "tasks"):
        task_to_apply_args_to = clone.tasks[0]
    else:
        task_to_apply_args_to = clone
    args, kwargs, opts = task_to_apply_args_to._merge(args=args, kwargs=kwargs, options=opts)
    task_to_apply_args_to.update(args=args, kwargs=kwargs, options=deepcopy(opts))
    return clone

そして、私がそうするとき:

ppp = (task_range.s() | dmap.s(add.s() | dmap.s(combine_log.s())))

すべてが期待どおりに機能します。

Related questions

MORE COOL STUFF

「水曜日」シーズン1の中心には大きなミステリーがあります

「水曜日」シーズン1の中心には大きなミステリーがあります

Netflixの「水曜日」は、典型的な10代のドラマ以上のものであり、実際、シーズン1にはその中心に大きなミステリーがあります.

ボディーランゲージの専門家は、州訪問中にカミラ・パーカー・ボウルズが輝くことを可能にした微妙なケイト・ミドルトンの動きを指摘しています

ボディーランゲージの専門家は、州訪問中にカミラ・パーカー・ボウルズが輝くことを可能にした微妙なケイト・ミドルトンの動きを指摘しています

ケイト・ミドルトンは、州の夕食会と州の訪問中にカミラ・パーカー・ボウルズからスポットライトを奪いたくなかった、と専門家は言う.

一部のファンがハリー・スタイルズとオリビア・ワイルドの「非常に友好的な」休憩が永続的であることを望んでいる理由

一部のファンがハリー・スタイルズとオリビア・ワイルドの「非常に友好的な」休憩が永続的であることを望んでいる理由

一部のファンが、オリビア・ワイルドが彼女とハリー・スタイルズとの間の「難しい」が「非常に友好的」な分割を恒久的にすることを望んでいる理由を見つけてください.

エリザベス女王の死後、ケイト・ミドルトンはまだ「非常に困難な時期」を過ごしている、と王室の専門家が明らかにする 

エリザベス女王の死後、ケイト・ミドルトンはまだ「非常に困難な時期」を過ごしている、と王室の専門家が明らかにする 

エリザベス女王の死後、ケイト・ミドルトンが舞台裏で「非常に困難な時期」を過ごしていたと伝えられている理由を調べてください.

セントヘレナのジェイコブのはしごを登るのは、気弱な人向けではありません

セントヘレナのジェイコブのはしごを登るのは、気弱な人向けではありません

セント ヘレナ島のジェイコブズ ラダーは 699 段の真っ直ぐ上る階段で、頂上に到達すると証明書が発行されるほどの難易度です。

The Secrets of Airline Travel Quiz

The Secrets of Airline Travel Quiz

Air travel is far more than getting from point A to point B safely. How much do you know about the million little details that go into flying on airplanes?

Where in the World Are You? Take our GeoGuesser Quiz

Where in the World Are You? Take our GeoGuesser Quiz

The world is a huge place, yet some GeoGuessr players know locations in mere seconds. Are you one of GeoGuessr's gifted elite? Take our quiz to find out!

バイオニック読書はあなたをより速く読むことができますか?

バイオニック読書はあなたをより速く読むことができますか?

BionicReadingアプリの人気が爆発的に高まっています。しかし、それは本当にあなたを速読術にすることができますか?

Total War:Warhammer:Kotakuレビュー

Total War:Warhammer:Kotakuレビュー

私はこのゲームを嫌う準備ができていました。先週の前に、Total War:Warhammerについての私の考えがありました:それでもここに私は、私の手にある完成品であり、私は変わった男です。

涙の道:軍事化された帝国主義勢力がスタンディングロックキャンプを占領

涙の道:軍事化された帝国主義勢力がスタンディングロックキャンプを占領

スタンディングロックスー族のメンバーと水の保護者は、ノースダコタ州のスタンディングロックにあるオセティサコウィンキャンプを去ります。(Twitter経由のCNNスクリーンショット)火と煙がスカイラインを覆い、スタンディングロックスー族のメンバーと水の保護者が、聖なるものを守りながら建てた家、オセティサコウィン(セブンカウンシルファイアーズ)キャンプから行進し、太鼓を打ち、歌い、祈りました。ダコタアクセスパイプラインとしても知られる「ブラックスネーク」からの土地。

シアーズとKマートはイヴァンカ・トランプの商品を自分たちで取り除いています

シアーズとKマートはイヴァンカ・トランプの商品を自分たちで取り除いています

写真:APシアーズとKマートは、イヴァンカ・トランプのトランプホームアイテムのコレクションも、誰も購入したくないために削除しました。シアーズとKマートの両方の親会社であるシアーズホールディングスは、土曜日のABCニュースへの声明で、彼らが気にかけていると辛抱強く説明しましたトランプラインを売り続けるにはお金を稼ぐことについてあまりにも多く。

ポテトチップスでたった10分でスペインのトルティーヤを作る

ポテトチップスでたった10分でスペインのトルティーヤを作る

伝統的なスペインのトルティーヤは通常、オリーブオイルで柔らかくなるまで調理されたポテトから始まります(30分以上かかる場合があります)が、ケトルで調理されたポテトチップスの助けを借りてわずか10分でテーブルに置くことができます。上のビデオはすべてがバラバラにならないように裏返す方法を含め、レシピ全体を説明しますが、必要なのは4〜5個の卵と3カップのケトルチップスだけです。

ケイト・ミドルトンとウィリアム王子は、彼らが子供たちと行っているスパイをテーマにした活動を共有しています

ケイト・ミドルトンとウィリアム王子は、彼らが子供たちと行っているスパイをテーマにした活動を共有しています

ケイト・ミドルトンとウィリアム王子は、子供向けのパズルの本の序文を書き、ジョージ王子、シャーロット王女、ルイ王子と一緒にテキストを読むと述べた.

事故で押しつぶされたスイカは、動物を喜ばせ水分補給するために野生生物保護団体に寄付されました

事故で押しつぶされたスイカは、動物を喜ばせ水分補給するために野生生物保護団体に寄付されました

Yak's Produce は、数十個のつぶれたメロンを野生動物のリハビリ専門家であるレスリー グリーンと彼女のルイジアナ州の救助施設で暮らす 42 匹の動物に寄付しました。

デミ・ロヴァートは、新しいミュージシャンのボーイフレンドと「幸せで健康的な関係」にあります: ソース

デミ・ロヴァートは、新しいミュージシャンのボーイフレンドと「幸せで健康的な関係」にあります: ソース

8 枚目のスタジオ アルバムのリリースに向けて準備を進めているデミ ロヴァートは、「スーパー グレート ガイ」と付き合っている、と情報筋は PEOPLE に確認しています。

Plathville の Kim と Olivia Plath が数年ぶりに言葉を交わすことへようこそ

Plathville の Kim と Olivia Plath が数年ぶりに言葉を交わすことへようこそ

イーサン プラスの誕生日のお祝いは、TLC のウェルカム トゥ プラスビルのシーズン 4 のフィナーレで、戦争中の母親のキム プラスと妻のオリビア プラスを結びつけました。

仕事の生産性を高める 8 つのシンプルなホーム オフィスのセットアップのアイデア

仕事の生産性を高める 8 つのシンプルなホーム オフィスのセットアップのアイデア

ホームオフィスのセットアップ術を極めよう!AppExert の開発者は、家族全員が一緒にいる場合でも、在宅勤務の技術を習得しています。祖父や曽祖父が共同家族で暮らしていた頃の記憶がよみがえりました。

2022 年、私たちのデジタル ライフはどこで終わり、「リアル ライフ」はどこから始まるのでしょうか?

20 年前のタイムトラベラーでさえ、日常生活におけるデジタルおよびインターネットベースのサービスの重要性に驚くことでしょう。MySpace、eBay、Napster などのプラットフォームは、高速化に焦点を合わせた世界がどのようなものになるかを示してくれました。

ニューロマーケティングの秘密科学

ニューロマーケティングの秘密科学

マーケティング担当者が人間の欲望を操作するために使用する、最先端の (気味が悪いと言う人もいます) メソッドを探ります。カートをいっぱいにして 3 桁の領収書を持って店を出る前に、ほんの数点の商品を買いに行ったことはありませんか? あなたは一人じゃない。

地理情報システムの日: GIS 開発者として学ぶべき最高の技術スタック

地理情報システムの日: GIS 開発者として学ぶべき最高の技術スタック

私たちが住んでいる世界を確実に理解するには、データが必要です。ただし、空間参照がない場合、このデータは地理的コンテキストがないと役に立たなくなる可能性があります。

Language