RDD mit Spark: Der Baustein von Apache Spark



Dieser Blog über RDD mit Spark bietet Ihnen ein detailliertes und umfassendes Wissen über RDD, das die grundlegende Einheit von Spark & ​​How nützlich ist.

, Das Wort selbst reicht aus, um bei jedem Hadoop-Ingenieur einen Funken zu erzeugen. ZU n In-Memory Verarbeitungswerkzeug Das ist blitzschnell im Cluster-Computing. Im Vergleich zu MapReduce werden durch die In-Memory-Datenfreigabe RDDs erstellt 10-100x Schneller als Netzwerk- und Festplattenfreigabe und all dies ist aufgrund von RDDs (Resilient Distributed Data Sets) möglich. Die wichtigsten Punkte, auf die wir uns heute in diesem RDD mit Spark-Artikel konzentrieren, sind:

Benötigen Sie RDDs?

Warum brauchen wir RDD? -RDD mit Spark





Die Welt entwickelt sich mit und Data Science wegen des Fortschritts in . Algorithmen beyogen auf Regression , , und das läuft weiter Verteilt Iterative Comput ation Mode, die das Wiederverwenden und Teilen von Daten zwischen mehreren Recheneinheiten umfasst.

Das traditionelle Techniken benötigten einen stabilen Zwischen- und verteilten Speicher wie HDFS Bestehend aus sich wiederholenden Berechnungen mit Datenreplikationen und Datenserialisierung, was den Prozess erheblich verlangsamte. Eine Lösung zu finden war nie einfach.



Das ist wo RDDs (Resilient Distributed Datasets) kommt auf das große Ganze.

RDD s sind einfach zu verwenden und mühelos zu erstellen, da Daten aus Datenquellen importiert und in RDDs abgelegt werden. Ferner werden die Operationen angewendet, um sie zu verarbeiten. Sie sind ein verteilte Sammlung von Speicher mit Berechtigungen als Schreibgeschützt und vor allem sind sie Fehlertoleranz .



Wenn überhaupt Datenpartition von das RDD ist hat verloren kann es durch Anwenden derselben regeneriert werden Transformation Operation auf dieser verlorenen Partition in Abstammung , anstatt alle Daten von Grund auf neu zu verarbeiten. Diese Art von Ansatz in Echtzeitszenarien kann Wunder bewirken, wenn Daten verloren gehen oder ein System ausfällt.

Koch gegen Ansible gegen Marionette

Was sind RDDs?

RDD oder ( Elastischer verteilter Datensatz ) ist eine grundlegende Datenstruktur in Spark. Der Begriff Belastbar definiert die Fähigkeit, die die Daten automatisch oder Daten generiert zurück rollen zum Originalzustand wenn ein unerwartetes Unglück mit der Wahrscheinlichkeit eines Datenverlusts auftritt.

Die in RDDs geschriebenen Daten sind partitioniert und gespeichert in mehrere ausführbare Knoten . Wenn ein ausführender Knoten schlägt fehl In der Laufzeit wird dann sofort die Sicherung von der nächster ausführbarer Knoten . Aus diesem Grund werden RDDs im Vergleich zu anderen herkömmlichen Datenstrukturen als fortschrittliche Art von Datenstrukturen angesehen. RDDs können strukturierte, unstrukturierte und halbstrukturierte Daten speichern.

Lassen Sie uns mit unserem RDD mithilfe des Spark-Blogs fortfahren und die einzigartigen Funktionen von RDDs kennenlernen, die einen Vorteil gegenüber anderen Arten von Datenstrukturen bieten.

Merkmale von RDD

  • In Erinnerung (RAM) Berechnungen : Das Konzept der In-Memory-Berechnung bringt die Datenverarbeitung in eine schnellere und effizientere Phase, in der die Gesamtverarbeitung stattfindet Performance des Systems ist aktualisiert.
  • L. seine Bewertung : Der Begriff Lazy Bewertung sagt die Transformationen werden auf die Daten in RDD angewendet, aber die Ausgabe wird nicht generiert. Stattdessen sind die angewendeten Transformationen angemeldet.
  • Beharrlichkeit : Die resultierenden RDDs sind immer wiederverwendbar.
  • Grobkörnige Operationen : Der Benutzer kann Transformationen auf alle Elemente in Datensätzen anwenden Karte, Filter oder gruppiere nach Operationen.
  • Fehlertoleranz : Bei Datenverlust kann das System zurückrollen zu seinem Originalzustand mit dem protokollierten Transformationen .
  • Unveränderlichkeit : Definierte, abgerufene oder erstellte Daten können nicht sein geändert sobald es im System angemeldet ist. Wenn Sie auf das vorhandene RDD zugreifen und es ändern müssen, müssen Sie ein neues RDD erstellen, indem Sie einen Satz von anwenden Transformation Funktionen auf die aktuelle oder vorhergehende RDD.
  • Partitionierung : Es ist der entscheidende Einheit der Parallelität in Spark RDD. Standardmäßig basiert die Anzahl der erstellten Partitionen auf Ihrer Datenquelle. Sie können sogar die Anzahl der Partitionen festlegen, die Sie verwenden möchten benutzerdefinierte Partition Funktionen.

Erstellung von RDD mit Spark

RDDs können in erstellt werden Drei Wege:

  1. Daten lesen von parallelisierte Sammlungen
val PCRDD = spark.sparkContext.parallelize (Array ('Mo', 'Di', 'Mi', 'Do', 'Fr', 'Sa'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. Bewirbt sich Transformation auf früheren RDDs
val words = spark.sparkContext.parallelize (Seq ('Spark', 'ist', 'a', 'sehr', 'mächtig', 'Sprache')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Daten lesen von externer Speicher oder Dateipfade wie HDFS oder HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Operationen an RDDs:

Es gibt hauptsächlich zwei Arten von Operationen, die an RDDs ausgeführt werden, nämlich:

  • Transformationen
  • Aktionen

Transformationen :: Das Operationen Wir bewerben uns bei RDDs um Filter, Zugriff und ändern die Daten in der übergeordneten RDD, um a zu generieren aufeinanderfolgende RDD wird genannt Transformation . Die neue RDD gibt einen Zeiger auf die vorherige RDD zurück, um die Abhängigkeit zwischen ihnen sicherzustellen.

Transformationen sind Faule Bewertungen, Mit anderen Worten, die Vorgänge, die auf die RDD angewendet werden, an der Sie arbeiten, werden protokolliert, jedoch nicht hingerichtet. Das System löst nach dem Auslösen des ein Ergebnis oder eine Ausnahme aus Aktion .

Wir können Transformationen wie folgt in zwei Typen unterteilen:

  • Enge Transformationen
  • Breite Transformationen

Enge Transformationen Wir wenden enge Transformationen auf a an einzelne Partition des übergeordneten RDD zum Generieren eines neuen RDD als Daten, die zum Verarbeiten des RDD erforderlich sind, sind auf einer einzelnen Partition des verfügbar übergeordnete ASD . Die Beispiele für enge Transformationen sind:

  • Karte()
  • Filter()
  • flatMap ()
  • partition ()
  • mapPartitions ()

Breite Transformationen: Wir wenden die breite Transformation auf an mehrere Partitionen um eine neue RDD zu generieren. Die zur Verarbeitung des RDD erforderlichen Daten sind auf den mehreren Partitionen des verfügbar übergeordnete ASD . Die Beispiele für breite Transformationen sind:

  • um ... verringern()
  • Union()

Aktionen : Aktionen weisen Apache Spark an, sich zu bewerben Berechnung und geben Sie das Ergebnis oder eine Ausnahme an die Treiber-RDD zurück. Einige der Aktionen umfassen:

  • sammeln()
  • Anzahl()
  • nehmen()
  • zuerst()

Lassen Sie uns die Operationen auf RDDs praktisch anwenden:

IPL (indische Premier League) ist ein Cricket-Turnier, dessen Höhepunkt auf höchstem Niveau liegt. Lassen Sie uns heute den IPL-Datensatz in die Hand nehmen und unsere RDD mit Spark ausführen.

  • Zuerst, Laden Sie die CSV-Übereinstimmungsdaten von IPL herunter. Nach dem Herunterladen wird es als EXCEL-Datei mit Zeilen und Spalten angezeigt.

Im nächsten Schritt zünden wir den Funken und laden die Datei match.csv von ihrem Speicherort, in meinem Fall mycsvDateispeicherort ist '/Benutzer/edureka_566977/test/matches.csv'

Beginnen wir nun mit dem Transformation Teil zuerst:

  • Karte():

Wir gebrauchen Kartentransformation Anwenden einer bestimmten Transformationsoperation auf jedes Element einer RDD. Hier erstellen wir eine RDD mit dem Namen CKfile, in der unsere gespeichert werdencsvDatei. Wir werden eine weitere RDD namens States to erstellen Speichern Sie die Stadtdetails .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println) val state = CKfile.map (_. split (',') (2)) States.collect (). foreach (println)

  • Filter():

Filtertransformation, der Name selbst beschreibt seine Verwendung. Wir verwenden diese Transformationsoperation, um die selektiven Daten aus einer Sammlung gegebener Daten herauszufiltern. Wir bewerben uns Filterbetrieb Hier erhalten Sie die Aufzeichnungen der IPL-Spiele des Jahres 2017 und speichern Sie es in fil RDD.

Unterschied zwischen Methodenüberladung und Überschreiben in Java
val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Wir wenden flatMap an, eine Transformationsoperation auf jedes der Elemente einer RDD, um eine neue RDD zu erstellen. Es ähnelt der Map-Transformation. hier bewerben wir unsFlatmapzu spucke die Streichhölzer der Stadt Hyderabad aus und speichern Sie die Daten infilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). collect ()

  • partition ():

Alle Daten, die wir in eine RDD schreiben, werden in eine bestimmte Anzahl von Partitionen aufgeteilt. Wir verwenden diese Transformation, um die zu finden Anzahl der Partitionen Die Daten sind tatsächlich aufgeteilt in.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

Wir betrachten MapPatitions als Alternative zu Map () undfür jede() zusammen. Wir verwenden hier mapPartitions, um die zu finden Anzahl der Reihen Wir haben in unserer Datei RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • um ... verringern():

Wir gebrauchenUm ... verringern() auf Schlüssel-Wert-Paare . Wir haben diese Transformation bei unserem genutztcsvDatei, um den Player mit dem zu finden höchster Mann der Spiele .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • Union():

Der Name erklärt alles, wir verwenden Gewerkschaftstransformation zu Club zwei RDDs zusammen . Hier erstellen wir zwei RDDs, nämlich fil und fil2. fil RDD enthält die Datensätze der IPL-Übereinstimmungen 2017 und fil2 RDD enthält die IPL-Übereinstimmungsaufzeichnungen 2016.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Beginnen wir mit dem Aktion Teil, in dem wir die tatsächliche Ausgabe zeigen:

  • sammeln():

Sammeln ist die Aktion, die wir verwenden Inhalt anzeigen in der RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println)

  • Anzahl():

Anzahlist eine Aktion, mit der wir die zählen Anzahl der Datensätze in der RDD vorhanden.HierMit dieser Operation zählen wir die Gesamtzahl der Datensätze in unserer Datei match.csv.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.count ()

  • nehmen():

Take ist eine Aktion, die dem Sammeln ähnelt, aber der einzige Unterschied besteht darin, dass sie jede drucken kann selektive Anzahl von Zeilen gemäß Benutzeranforderung. Hier wenden wir den folgenden Code an, um das zu drucken Top Ten der führenden Berichte.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. nimm (10) .foreach (println)

  • zuerst():

First () ist eine Aktionsoperation ähnlich wie collect () und take ()eswird verwendet, um den obersten Bericht in der Ausgabe zu drucken. Hier verwenden wir die erste () Operation, um die zu finden maximale Anzahl von Spielen in einer bestimmten Stadt und wir bekommen Mumbai als Ausgabe.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') val States = CKfile.map (_. split (',') (2)) val Scount = States.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Um unseren Prozess des Lernens von RDD mit Spark noch interessanter zu gestalten, habe ich einen interessanten Anwendungsfall entwickelt.

RDD mit Spark: Pokemon Use Case

  • Zuerst, Laden Sie eine Pokemon.csv-Datei herunter und laden Sie sie in die Spark-Shell, wie wir es in die Matches.csv-Datei getan haben.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Pokemons gibt es tatsächlich in einer großen Auswahl. Lassen Sie uns ein paar Sorten finden.

  • Schema aus der Datei Pokemon.csv entfernen

Wir brauchen das vielleicht nicht Schema der Pokemon.csv-Datei. Daher entfernen wir es.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Finden der Anzahl von Partitionen unsere pokemon.csv ist verteilt in.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Wasserpokemon

Das finden Anzahl der Wasserpokemon

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Feuerpokemon

Das finden Anzahl der Feuerpokemon

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Wir können auch die erkennen Population eines anderen Pokemon-Typs mit der Zählfunktion
WaterRDD.count () FireRDD.count ()

  • Da mag ich das Spiel von Verteidigungsstrategie Lass uns das Pokemon mit finden maximale Verteidigung.
val defensceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Highest_Defence:' + defensceList.max ())

  • Wir kennen das Maximum Verteidigungsstärkewert aber wir wissen nicht, um welches Pokemon es sich handelt. Lassen Sie uns also herausfinden, welches das ist Pokémon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Bestellung von [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Lassen Sie uns nun das Pokemon mit sortieren am wenigsten Verteidigung
val minDefencePokemon = defensceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Lassen Sie uns nun das Pokémon mit einem sehen weniger defensive Strategie.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val defWithPokemonName2 =H .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Ordering [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Damit sind wir mit dem Spark-Artikel am Ende dieser RDD angelangt. Ich hoffe, wir haben ein wenig Licht auf Ihr Wissen über RDDs, ihre Funktionen und die verschiedenen Arten von Operationen geworfen, die mit ihnen ausgeführt werden können.

Dieser Artikel basiert auf wurde entwickelt, um Sie auf die Zertifizierungsprüfung für Cloudera Hadoop und Spark Developer (CCA175) vorzubereiten. Sie erhalten detaillierte Kenntnisse über Apache Spark und das Spark-Ökosystem, einschließlich Spark RDD, Spark SQL, Spark MLlib und Spark Streaming. Sie erhalten umfassende Kenntnisse über die Programmiersprache Scala, HDFS, Sqoop, Flume, Spark GraphX ​​und Messaging-Systeme wie Kafka.

Datenwissenschaft was ist das?