同じSparkセッションで複数のSparkKafka構造化ストリーミングクエリを実行すると、オフセットが増加しますが、numInputRows0が表示されます

0
Amit Joshi 2020-07-24 14:18.

2つのパーティションを持つKafkaトピックのレコードを消費するSparkStructuredStreamingがあります。

Sparkジョブ: 2つのクエリ。それぞれが2つの別々のパーティションから消費され、同じSparkセッションから実行されます。

    val df1 = session.readStream.format("kafka")
            .option("kafka.bootstrap.servers", kafkaBootstrapServer)
            .option("assign", "{\"multi-stream1\" : [0]}")
            .option("startingOffsets", latest)
            .option("key.deserializer", classOf[StringDeserializer].getName)
            .option("value.deserializer", classOf[StringDeserializer].getName)
            .option("max.poll.records", 500)
            .option("failOnDataLoss", true)
            .load()
    val query1 = df1
            .select(col("key").cast("string"),from_json(col("value").cast("string"), schema, Map.empty[String, String]).as("data"))
            .select("key","data.*")
            .writeStream.format("parquet").option("path", path).outputMode("append")
            .option("checkpointLocation", checkpoint_dir1)
            .partitionBy("key")/*.trigger(Trigger.ProcessingTime("5 seconds"))*/
            .queryName("query1").start()
    
    val df2 = session.readStream.format("kafka")
            .option("kafka.bootstrap.servers", kafkaBootstrapServer)
            .option("assign", "{\"multi-stream1\" : [1]}")
            .option("startingOffsets", latest)
            .option("key.deserializer", classOf[StringDeserializer].getName)
            .option("value.deserializer", classOf[StringDeserializer].getName)
            .option("max.poll.records", 500)
            .option("failOnDataLoss", true)
            .load()
val query2 = df2.select(col("key").cast("string"),from_json(col("value").cast("string"), schema, Map.empty[String, String]).as("data"))
            .select("key","data.*")
            .writeStream.format("parquet").option("path", path).outputMode("append")
            .option("checkpointLocation", checkpoint_dir2)
            .partitionBy("key")/*.trigger(Trigger.ProcessingTime("5 seconds"))*/
            .queryName("query2").start()
    session.streams.awaitAnyTermination()

問題:レコードが両方のパーティションにプッシュされるたびに、両方のクエリに進行状況が表示されますが、そのうちの1つだけが出力を出力しています。レコードが処理されているクエリからの出力を確認できます。たとえば、Kafka Partition 0-レコードがプッシュされると、sparkはquery1を処理します。Kafka Partition 1-query1がビジー処理中にレコードがプッシュされ、sparkは開始オフセットと終了オフセットがインクリメントされて表示されますが、クエリ2の場合はnumInputRows = 0です。

実行中の環境:ローカルPC-同じ問題。Dataprocクラスター-spark-submit--packages

org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.5 --class org.DifferentPartitionSparkStreaming --master yen --deploy-mode cluster --num-executors 2 --driver-memory 4g- -executor-cores 4 --executor-memory 4g gs:// dpl-ingestion-event / jars / stream_consumer-jar- with-dependencies.jar "{" multiple-streaming ":[0]}" latest "10.wxy :9092,10.rst:9092,10.abc:9092 "" {"マルチストリーミング":[1]} "-同じ問題。

チェックポイントと出力パスはGoogleBucketです。

ログ

20/07/24 19:37:27 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "e7d026f7-bf62-4a86-8697-a95a2fc893bb",
  "runId" : "21169889-6e4b-419d-b338-2d4d61999f5b",
  "name" : "reconcile",
  "timestamp" : "2020-07-24T14:06:55.002Z",
  "batchId" : 2,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 3549,
    "getBatch" : 0,
    "getEndOffset" : 1,
    "queryPlanning" : 32,
    "setOffsetRange" : 1,
    "triggerExecution" : 32618,
    "walCommit" : 15821
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Assign[multi-stream1-1]]",
    "startOffset" : {
      "multi-stream1" : {
        "1" : 240
      }
    },
    "endOffset" : {
      "multi-stream1" : {
        "1" : 250
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "FileSink[gs://dpl-ingestion-event/demo/test/single-partition/data]"
  }

1 answers

0
Amit Joshi 2020-08-02 23:32.

私は問題を解決することができました。根本的な原因は、両方のクエリが同じベースパスに書き込もうとしたことです。したがって、_spark_meta情報の重複がありました。Spark Structured Streamingは、チェックポイントと、処理中のバッチを追跡するための_spark_metadataファイルを維持します。

ソースSparkDoc:

一度だけセマンティクスを維持しながら部分的な障害を正しく処理するために、各バッチのファイルは一意のディレクトリに書き出され、メタデータログにアトミックに追加されます。寄木細工のベースのデータソースが読み取り用に初期化されると、最初にこのログディレクトリをチェックし、存在する場合はファイルリストの代わりにそれを使用します。

したがって、今のところ、すべてのクエリに個別のパスを指定する必要があります。チェックポイントとは異なり、_spark_matadataの場所を構成するオプションはありません。

Related questions

MORE COOL STUFF

Reba McEntire は、彼女が息子の Shelby Blackstock と共有する「楽しい」クリスマスの伝統を明らかにしました:「私たちはたくさん笑います」

Reba McEntire は、彼女が息子の Shelby Blackstock と共有する「楽しい」クリスマスの伝統を明らかにしました:「私たちはたくさん笑います」

Reba McEntire が息子の Shelby Blackstock と共有しているクリスマスの伝統について学びましょう。

メーガン・マークルは、自然な髪のスタイリングをめぐってマライア・キャリーと結ばれました

メーガン・マークルは、自然な髪のスタイリングをめぐってマライア・キャリーと結ばれました

メーガン・マークルとマライア・キャリーが自然な髪の上でどのように結合したかについて、メーガンの「アーキタイプ」ポッドキャストのエピソードで学びましょう.

ハリー王子は家族との関係を修復できるという「希望を持っている」:「彼は父親と兄弟を愛している」

ハリー王子は家族との関係を修復できるという「希望を持っている」:「彼は父親と兄弟を愛している」

ハリー王子が家族、特にチャールズ王とウィリアム王子との関係について望んでいると主張したある情報源を発見してください。

ワイノナ・ジャッドは、パニックに陥った休暇の瞬間に、彼女がジャッド家の家長であることを認識しました

ワイノナ・ジャッドは、パニックに陥った休暇の瞬間に、彼女がジャッド家の家長であることを認識しました

ワイノナ・ジャッドが、母親のナオミ・ジャッドが亡くなってから初めての感謝祭のお祝いを主催しているときに、彼女が今では家長であることをどのように認識したかを学びましょう.

セントヘレナのジェイコブのはしごを登るのは、気弱な人向けではありません

セントヘレナのジェイコブのはしごを登るのは、気弱な人向けではありません

セント ヘレナ島のジェイコブズ ラダーは 699 段の真っ直ぐ上る階段で、頂上に到達すると証明書が発行されるほどの難易度です。

The Secrets of Airline Travel Quiz

The Secrets of Airline Travel Quiz

Air travel is far more than getting from point A to point B safely. How much do you know about the million little details that go into flying on airplanes?

Where in the World Are You? Take our GeoGuesser Quiz

Where in the World Are You? Take our GeoGuesser Quiz

The world is a huge place, yet some GeoGuessr players know locations in mere seconds. Are you one of GeoGuessr's gifted elite? Take our quiz to find out!

バイオニック読書はあなたをより速く読むことができますか?

バイオニック読書はあなたをより速く読むことができますか?

BionicReadingアプリの人気が爆発的に高まっています。しかし、それは本当にあなたを速読術にすることができますか?

「ディス・イズ・アメリカ」はアメリカで一番の曲です

「ディス・イズ・アメリカ」はアメリカで一番の曲です

ドナルドグローバーの「ディスイズアメリカ」は、それがビデオ自体(5月6日にデビューしてから1億900万回以上視聴された)であろうと、ビデオに関する公の言説(ひいてはグローバー、別名)であろうと、先週避けられませんでした。幼稚なガンビーノ、彼自身)。現在、「ディス・イズ・アメリカ」はNo.でデビューします。

ティーンタイタンズは、今日の予告編ハッピーアワーでデッドプールデッドプールを追い出そうとします

ティーンタイタンズは、今日の予告編ハッピーアワーでデッドプールデッドプールを追い出そうとします

Trailer Happy Hourにようこそ。オーディオビジュアルのヴァルハラでは、すべての優れた映画のプロモーションが、あなたの愛、注目、クリックのために永遠に戦います。今日は、ワイルドパーティー、警察が関与する銃撃、80年代の漫画の雑学クイズを打ち破る怒り狂ったウィルアーネットがいるので、すぐに飛び込みましょう。

アフロパンクはパンクのルーツを失いましたか?

アフロパンクはパンクのルーツを失いましたか?

ステファニーキース/ゲッティイメージズ今週末、ニューヨーク州ブルックリンのフォートグリーンのコモドアバリーパークに、謝罪のない闇の海が降りてきます。

ジョージHWブッシュは、2人目の女性が彼女を手探りしたと主張した後、別の謝罪を発表します

ジョージHWブッシュは、2人目の女性が彼女を手探りしたと主張した後、別の謝罪を発表します

(写真:パトリック・スミス/ゲッティイメージズ)削除されたInstagramの投稿で、女優のヘザー・リンドがジョージHWを主張しました

米国のフィギュア スケートは、チーム イベントでの最終決定の欠如に「苛立ち」、公正な裁定を求める

米国のフィギュア スケートは、チーム イベントでの最終決定の欠如に「苛立ち」、公正な裁定を求める

ロシアのフィギュアスケーター、カミラ・バリエバが関与したドーピング事件が整理されているため、チームは2022年北京冬季オリンピックで獲得したメダルを待っています。

Amazonの買い物客は、わずか10ドルのシルクの枕カバーのおかげで、「甘やかされた赤ちゃんのように」眠れると言っています

Amazonの買い物客は、わずか10ドルのシルクの枕カバーのおかげで、「甘やかされた赤ちゃんのように」眠れると言っています

何千人ものAmazonの買い物客がMulberry Silk Pillowcaseを推奨しており、現在販売中. シルクの枕カバーにはいくつかの色があり、髪を柔らかく肌を透明に保ちます。Amazonで最大46%オフになっている間にシルクの枕カバーを購入してください

パデュー大学の教授が覚醒剤を扱った疑いで逮捕され、女性に性的好意を抱かせる

パデュー大学の教授が覚醒剤を扱った疑いで逮捕され、女性に性的好意を抱かせる

ラファイエット警察署は、「不審な男性が女性に近づいた」という複数の苦情を受けて、12 月にパデュー大学の教授の捜査を開始しました。

コンセプト ドリフト: AI にとって世界の変化は速すぎる

コンセプト ドリフト: AI にとって世界の変化は速すぎる

私たちの周りの世界と同じように、言語は常に変化しています。以前の時代では、言語の変化は数年または数十年にわたって発生していましたが、現在では数日または数時間で変化する可能性があります。

SF攻撃で91歳のアジア人女性が殴られ、コンクリートに叩きつけられた

犯罪擁護派のオークランドが暴力犯罪者のロミオ・ロレンゾ・パーハムを釈放

SF攻撃で91歳のアジア人女性が殴られ、コンクリートに叩きつけられた

認知症を患っている 91 歳のアジア人女性が最近、47 番街のアウター サンセット地区でロメオ ロレンゾ パーハムに襲われました。伝えられるところによると、被害者はサンフランシスコの通りを歩いていたところ、容疑者に近づき、攻撃を受け、暴行を受けました。

Precios accesibles, nuestro aprendizaje desde la perspectiva iOS

Precios accesibles, nuestro aprendizaje desde la perspectiva iOS

Cómo mejoramos la accesibilidad de nuestro componente de precio, y cómo nos marcó el camino hacia nuevos saberes para nuestro sistema de diseño. Por Ana Calderon y Laura Sarmiento Leer esta historia en inglés.

ℝ

“And a river went out of Eden to water the garden, and from thence it was parted and became into four heads” Genesis 2:10. ? The heart is located in the middle of the thoracic cavity, pointing eastward.

Language