Valider les décalages vers Kafka sur les exécuteurs Spark

1
alina 2019-09-27 23:40.

Je reçois des événements de Kafka, en les enrichissant / en les filtrant / en les transformant sur Spark, puis en les stockant dans ES. Je remets les compensations à Kafka

J'ai deux questions / problèmes:

(1) Mon travail Spark actuel est TRÈS lent

J'ai 50 partitions pour un sujet et 20 exécuteurs. Chaque exécuteur a 2 cœurs et 4g de mémoire chacun. Mon chauffeur a 8g de mémoire. Je consomme 1000 événements / partition / seconde et mon intervalle de lot est de 10 secondes. Cela signifie que je consomme 500000 événements en 10 secondes

Mon cluster ES est le suivant:

20 fragments / index

3 instances maîtres c5.xlarge.elasticsearch

12 instances m4.xlarge.elasticsearch

disque / nœud = 1024 Go soit 12 To au total

Et je reçois d'énormes retards de planification et de traitement

(2) Comment puis-je commettre des compensations sur les exécuteurs testamentaires?

Actuellement, j'enrichis / transforme / filtre mes événements sur les exécuteurs puis j'envoie tout à ES en utilisant BulkRequest . C'est un processus synchrone. Si j'obtiens des commentaires positifs, j'envoie la liste des décalages au conducteur. Sinon, je renvoie une liste vide. Sur le pilote, je commets des offsets à Kafka. Je pense qu'il devrait y avoir un moyen de commettre des offsets sur les exécuteurs, mais je ne sais pas comment transmettre kafka Stream aux exécuteurs:

((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges, this::onComplete);

C'est le code pour commettre des offsets à Kafka qui nécessite Kafka Stream

Voici mon code global:

 kafkaStream.foreachRDD( // kafka topic
                rdd -> { // runs on driver
                    rdd.cache();
                    String batchIdentifier =
                            Long.toHexString(Double.doubleToLongBits(Math.random()));

                    LOGGER.info("@@ [" + batchIdentifier + "] Starting batch ...");

                    Instant batchStart = Instant.now();

                    List<OffsetRange> offsetsToCommit =
                            rdd.mapPartitionsWithIndex( // kafka partition
                                    (index, eventsIterator) -> { // runs on worker

                                        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

                                        LOGGER.info(
                                                "@@ Consuming " + offsetRanges[index].count() + " events" + " partition: " + index
                                        );

                                        if (!eventsIterator.hasNext()) {
                                            return Collections.emptyIterator();
                                        }

                                        // get single ES documents
                                        List<SingleEventBaseDocument> eventList = getSingleEventBaseDocuments(eventsIterator);

                                        // build request wrappers
                                        List<InsertRequestWrapper> requestWrapperList = getRequestsToInsert(eventList, offsetRanges[index]);

                                        LOGGER.info(
                                                "@@ Processed " + offsetRanges[index].count() + " events" + " partition: " + index + " list size: " + eventList.size()
                                        );

                                        BulkResponse bulkItemResponses = elasticSearchRepository.addElasticSearchDocumentsSync(requestWrapperList);

                                        if (!bulkItemResponses.hasFailures()) {
                                            return Arrays.asList(offsetRanges).iterator();
                                        }

                                        elasticSearchRepository.close();
                                        return Collections.emptyIterator();
                                    },
                                    true
                            ).collect();

                    LOGGER.info(
                            "@@ [" + batchIdentifier + "] Collected all offsets in " + (Instant.now().toEpochMilli() - batchStart.toEpochMilli()) + "ms"
                    );

                    OffsetRange[] offsets = new OffsetRange[offsetsToCommit.size()];

                    for (int i = 0; i < offsets.length ; i++) {
                        offsets[i] = offsetsToCommit.get(i);
                    }

                    try {
                        offsetManagementMapper.commit(offsets);
                    } catch (Exception e) {
                        // ignore
                    }

                    LOGGER.info(
                            "@@ [" + batchIdentifier + "] Finished batch of " + offsetsToCommit.size() + " messages " +
                                    "in " + (Instant.now().toEpochMilli() - batchStart.toEpochMilli()) + "ms"
                    );
                    rdd.unpersist();
                });

1 answers

0
voldy 2019-09-28 13:21.

Vous pouvez déplacer la logique de décalage au-dessus de la boucle rdd ... J'utilise le modèle ci-dessous pour une meilleure gestion et des performances de décalage

JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));



        kafkaStream.foreachRDD( kafkaStreamRDD -> {
            //fetch kafka offsets for manually commiting it later
            OffsetRange[] offsetRanges = ((HasOffsetRanges) kafkaStreamRDD.rdd()).offsetRanges();

            //filter unwanted data
            kafkaStreamRDD.filter(
                    new Function<ConsumerRecord<String, String>, Boolean>() {
                @Override
                public Boolean call(ConsumerRecord<String, String> kafkaRecord) throws Exception {
                    if(kafkaRecord!=null) {
                        if(!StringUtils.isAnyBlank(kafkaRecord.key() , kafkaRecord.value())) {
                            return Boolean.TRUE;
                        }
                    }
                    return Boolean.FALSE;
                }
            }).foreachPartition( kafkaRecords -> {

                // init connections here

                while(kafkaRecords.hasNext()) {
                    ConsumerRecord<String, String> kafkaConsumerRecord = kafkaRecords.next();
                    // work here
                }

            });
            //commit offsets
            ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
        });

Related questions

MORE COOL STUFF

Reba McEntire a révélé la tradition de Noël «amusante» qu'elle partage avec son fils Shelby Blackstock: «Nous rions beaucoup»

Reba McEntire a révélé la tradition de Noël «amusante» qu'elle partage avec son fils Shelby Blackstock: «Nous rions beaucoup»

Découvrez les traditions de Noël que Reba McEntire partage avec son fils, Shelby Blackstock, qui, selon elle, pourraient laisser certains confus.

Meghan Markle s'est liée avec Mariah Carey pour coiffer leurs cheveux naturels

Meghan Markle s'est liée avec Mariah Carey pour coiffer leurs cheveux naturels

Apprenez-en plus sur la façon dont Meghan Markle et Mariah Carey se sont liées sur leurs cheveux naturels dans un épisode du podcast "Archetypes" de Meghan.

Le prince Harry " garde espoir " de pouvoir arranger les choses avec sa famille : " Il aime son père et son frère "

Le prince Harry " garde espoir " de pouvoir arranger les choses avec sa famille : " Il aime son père et son frère "

Découvrez ce qu'une source a affirmé que le prince Harry espérait concernant les relations avec sa famille, en particulier le roi Charles et le prince William.

Brennon Lemieux, star de "Love Is Blind", répond aux accusations de violence conjugale

Brennon Lemieux, star de "Love Is Blind", répond aux accusations de violence conjugale

Un rapport de police accusant Brennon, star de "Love Is Blind", de violence domestique a récemment fait surface. Maintenant, Brennon a répondu aux réclamations.

Qui a dit ça? Le quiz des doubleurs

Qui a dit ça? Le quiz des doubleurs

Il y a le jeu d'acteur, puis il y a le doublage. Les deux sont difficiles à leur manière. Que savez-vous de ces légendes dans le domaine ?

Les Philippines organisent la plus longue fête de Noël au monde

Les Philippines organisent la plus longue fête de Noël au monde

Peut-être qu'aucun pays sur Terre n'aime Noël plus que les Philippines. Les résidents y célèbrent les vacances pendant quatre mois solides. Voici pourquoi.

L'intrication quantique est le phénomène le plus étrange de la physique, mais qu'est-ce que c'est ?

L'intrication quantique est le phénomène le plus étrange de la physique, mais qu'est-ce que c'est&nbsp;?

Einstein a qualifié ce phénomène d'"action effrayante à distance" et les physiciens viennent de remporter le prix Nobel pour leurs travaux, mais qu'est-ce que l'intrication quantique ?

The Secrets of Airline Travel Quiz

The Secrets of Airline Travel Quiz

Air travel is far more than getting from point A to point B safely. How much do you know about the million little details that go into flying on airplanes?

Ce pilote place la barre haute pour le reste de This Is Us

Ce pilote place la barre haute pour le reste de This Is Us

Photo : NBC Un pilote peut-il être trop bon ? Cela semble peu probable, mais cela pourrait bien être le cas pour les fans de This Is Us. La nouvelle série très attendue de l'écrivain Crazy, Stupid, Love Dan Fogelman s'articule autour d'une fin tordue: une qui met en place la série avec justesse, mais qui est si habilement réalisée qu'elle ne laisse pas beaucoup de place à l'amélioration.

Oh, les GIF fonctionnent enfin sur Facebook

Oh, les GIF fonctionnent enfin sur Facebook

Voici quelques nouvelles: vous pouvez désormais intégrer des GIF sur Facebook. Eh bien, techniquement, vous pouvez publier un lien vers un GIF, et Facebook produira l'animation, comme à peu près tous les autres réseaux sociaux le font depuis des années.

9 faits sur la sécurité informatique que les experts souhaitent que vous sachiez

9 faits sur la sécurité informatique que les experts souhaitent que vous sachiez

Chaque jour, vous entendez parler de failles de sécurité, de virus et de gangs de pirates malveillants qui pourraient vous laisser sans ressources - ou, pire, mettre votre pays à genoux. Mais quelle est la vérité sur ces dangers numériques? Nous avons demandé à des experts en sécurité informatique de séparer les mythes des faits.

Sortez les calembours, parce que Firestorm vient de mettre le feu au flash

Sortez les calembours, parce que Firestorm vient de mettre le feu au flash

Si je vous disais que le Flash était la partie la moins intéressante de l'épisode Flash de la nuit dernière, vous pourriez être un peu inquiet - cela ne semble pas être une bonne chose. Mais c'est quand le héros de DC, Firestorm, prend enfin le devant de la scène, et presque tous les autres personnages - même Cisco et Caitlin - ont des conflits convaincants! Non, vraiment! "The Nuclear Man" est encore un autre épisode de qualité, et je pense que nous pouvons supposer en toute sécurité que la série a enlevé ses roues d'entraînement, pour ainsi dire, et se sent suffisamment en confiance pour faire plus que simplement présenter un freak-o - la semaine pour que Barry se batte.

Le patinage artistique américain "frustré" par l'absence de décision finale dans l'épreuve par équipe, demande une décision équitable

Le patinage artistique américain "frustré" par l'absence de décision finale dans l'épreuve par équipe, demande une décision équitable

L'équipe attend les médailles qu'elle a remportées aux Jeux olympiques d'hiver de 2022 à Pékin, alors qu'un cas de dopage impliquant la patineuse artistique russe Kamila Valieva est en cours de règlement.

Les acheteurs d'Amazon disent qu'ils dorment «comme un bébé choyé» grâce à ces taies d'oreiller en soie qui coûtent aussi peu que 10 $

Les acheteurs d'Amazon disent qu'ils dorment «comme un bébé choyé» grâce à ces taies d'oreiller en soie qui coûtent aussi peu que 10 $

Des milliers d'acheteurs Amazon recommandent la taie d'oreiller en soie Mulberry, et elle est en vente en ce moment. La taie d'oreiller en soie est disponible en plusieurs couleurs et aide à garder les cheveux doux et la peau claire. Achetez les taies d'oreiller en soie alors qu'elles sont jusqu'à 46% de réduction sur Amazon

Un professeur de l'Université Purdue arrêté pour avoir prétendument vendu de la méthamphétamine et proposé des femmes pour des faveurs sexuelles

Un professeur de l'Université Purdue arrêté pour avoir prétendument vendu de la méthamphétamine et proposé des femmes pour des faveurs sexuelles

Le département de police de Lafayette a commencé à enquêter sur un professeur de l'Université Purdue en décembre après avoir reçu plusieurs plaintes concernant un "homme suspect s'approchant de femmes".

Concept Drift : le monde change trop vite pour l'IA

Concept Drift : le monde change trop vite pour l'IA

Tout comme le monde qui nous entoure, la langue est en constante évolution. Alors que dans les époques précédentes, les changements de langue se produisaient sur des années, voire des décennies, cela peut maintenant se produire en quelques jours, voire quelques heures.

L'Inde me botte le cul

L'Inde me botte le cul

Je suis de retour pour la première fois en six ans. Je ne peux pas vous dire depuis combien de temps j'attends ça.

Precios accesibles, nuestro aprendizaje desde la perspectiva iOS

Precios accesibles, nuestro aprendizaje desde la perspectiva iOS

Cómo mejoramos la accesibilidad de nuestro componente de precio, y cómo nos marcó el camino hacia nuevos saberes para nuestro sistema de diseño. Por Ana Calderon y Laura Sarmiento Leer esta historia en inglés.

ℝ

“And a river went out of Eden to water the garden, and from thence it was parted and became into four heads” Genesis 2:10. ? The heart is located in the middle of the thoracic cavity, pointing eastward.

Language