Kumulative Stateful Transformation in Apache Spark Streaming



Dieser Blog-Beitrag beschreibt zustandsbehaftete Transformationen in Spark Streaming. Erfahren Sie alles über kumulatives Tracking und Up-Skills für eine Hadoop Spark-Karriere.

Beitrag von Prithviraj Bose

In meinem vorherigen Blog habe ich Stateful Transformationen mit dem Fensterkonzept von Apache Spark Streaming besprochen. Du kannst es lesen Hier .





In diesem Beitrag werde ich kumulative Stateful-Operationen in Apache Spark Streaming diskutieren. Wenn Sie Spark Streaming noch nicht kennen, empfehle ich Ihnen dringend, meinen vorherigen Blog zu lesen, um zu verstehen, wie Fenster funktionieren.

Arten der Stateful Transformation im Spark-Streaming (Fortsetzung…)

> Kumulatives Tracking

Wir hatten das benutzt reductByKeyAndWindow (…) API zum Verfolgen des Status von Schlüsseln, jedoch birgt das Fenster für bestimmte Anwendungsfälle Einschränkungen. Was ist, wenn wir die Zustände der Schlüssel durchgehend akkumulieren möchten, anstatt sie auf ein Zeitfenster zu beschränken? In diesem Fall müssten wir verwenden updateStateByKey (…) FEUER.



Diese API wurde in Spark 1.3.0 eingeführt und war sehr beliebt. Diese API hat jedoch einen gewissen Leistungsaufwand. Ihre Leistung nimmt mit zunehmender Größe der Zustände ab. Ich habe ein Beispiel geschrieben, um die Verwendung dieser API zu zeigen. Sie finden den Code Hier .

In Spark 1.6.0 wurde eine neue API eingeführt mapWithState (…) Dies löst die von updateStateByKey (…) . In diesem Blog werde ich diese spezielle API anhand eines von mir geschriebenen Beispielprogramms diskutieren. Sie finden den Code Hier .

Bevor ich mich mit einem Code-Rundgang befasse, lassen Sie uns ein paar Worte zum Checkpointing sparen. Für jede zustandsbehaftete Transformation ist Checkpointing obligatorisch. Checkpointing ist ein Mechanismus zum Wiederherstellen des Schlüsselstatus, falls das Treiberprogramm fehlschlägt. Beim Neustart des Treibers wird der Status der Schlüssel aus den Prüfpunktdateien wiederhergestellt. Checkpoint-Standorte sind normalerweise HDFS oder Amazon S3 oder ein zuverlässiger Speicher. Während des Testens des Codes kann man auch im lokalen Dateisystem speichern.



beste Java-Idee für Ubuntu

Im Beispielprogramm hören wir den Socket-Textstrom auf host = localhost und port = 9999. Er markiert den eingehenden Stream in (Wörter, Anzahl der Vorkommen) und verfolgt die Wortanzahl mithilfe der 1.6.0-API mapWithState (…) . Außerdem werden Schlüssel ohne Updates mit entfernt StateSpec.timeout API. Wir überprüfen in HDFS und die Häufigkeit der Checkpoints beträgt alle 20 Sekunden.

Erstellen wir zunächst eine Spark-Streaming-Sitzung.

Spark-streaming-session

Wir schaffen eine checkpointDir im HDFS und rufen Sie dann die Objektmethode auf getOrCreate (…) . Das getOrCreate API überprüft die checkpointDir Um festzustellen, ob frühere Zustände wiederhergestellt werden müssen, erstellt dieser die Spark-Streaming-Sitzung neu und aktualisiert die Zustände der Schlüssel anhand der in den Dateien gespeicherten Daten, bevor mit neuen Daten fortgefahren wird. Andernfalls wird eine neue Spark-Streaming-Sitzung erstellt.

Das getOrCreate Nimmt den Checkpoint-Verzeichnisnamen und eine Funktion (die wir benannt haben createFunc ) deren Unterschrift sollte sein () => StreamingContext .

PHP verwandeln String in Array

Lassen Sie uns den Code im Inneren untersuchen createFunc .

Zeile 2: Wir erstellen einen Streaming-Kontext mit dem Jobnamen 'TestMapWithStateJob' und dem Stapelintervall = 5 Sekunden.

Zeile 5: Legen Sie das Prüfpunktverzeichnis fest.

Zeile 8: Legen Sie die Statusspezifikation mithilfe der Klasse fest org.apache.streaming.StateSpec Objekt. Wir legen zuerst die Funktion fest, die den Status verfolgt, und dann die Anzahl der Partitionen für die resultierenden DStreams, die während nachfolgender Transformationen generiert werden sollen. Schließlich setzen wir das Zeitlimit (auf 30 Sekunden). Wenn innerhalb von 30 Sekunden keine Aktualisierung für einen Schlüssel eingeht, wird der Schlüsselstatus entfernt.

Zeile 12 #: Richten Sie den Socket-Stream ein, reduzieren Sie die eingehenden Batch-Daten, erstellen Sie ein Schlüssel-Wert-Paar und rufen Sie an mapWithState Stellen Sie das Prüfpunktintervall auf 20 Sekunden ein und drucken Sie schließlich die Ergebnisse aus.

Das Spark-Framework ruft th auf e createFunc für jeden Schlüssel mit dem vorherigen Wert und dem aktuellen Status. Wir berechnen die Summe und aktualisieren den Status mit der kumulierten Summe und geben schließlich die Summe für den Schlüssel zurück.

ist eine Beziehung in Java

Github-Quellen -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Hast du eine Frage an uns? Bitte erwähnen Sie es in den Kommentaren und wir werden uns bei Ihnen melden.

Zusammenhängende Posts:

Erste Schritte mit Apache Spark & ​​Scala

Stateful Transformationen mit Fenster im Spark-Streaming