Apache Pig UDF: Teil 1 - Auswertungs-, Aggregat- und Filterfunktionen



Dieser Beitrag beschreibt Apache Pig UDF - Auswertungs-, Aggregat- und Filterfunktionen. Schauen Sie sich die Eval-, Aggregate- und Filterfunktionen an.

wie man in Java etwas zu einer Macht erhebt

Apache Pig bietet umfassende Unterstützung für benutzerdefinierte Funktionen (UDFs), um die benutzerdefinierte Verarbeitung festzulegen. Pig-UDFs können derzeit in drei Sprachen ausgeführt werden: Java, Python, JavaScript und Ruby. Die umfassendste Unterstützung wird für Java-Funktionen bereitgestellt.





Java-UDFs können auf verschiedene Arten aufgerufen werden. Die einfachste UDF kann nur EvalFunc erweitern, für die nur die exec-Funktion implementiert werden muss. Jede Eval UDF muss dies implementieren. Wenn eine Funktion algebraisch ist, kann sie außerdem eine algebraische Schnittstelle implementieren, um die Abfrageleistung erheblich zu verbessern.

Bedeutung von UDFs bei Schweinen:

Mit Pig können Benutzer vorhandene Operatoren über UDFs mit ihrem eigenen oder dem Code anderer kombinieren. Der Vorteil von Pig ist die Möglichkeit, dass Benutzer seine Operatoren über UDFs mit ihrem eigenen oder dem Code anderer kombinieren können. Bis Version 0.7 müssen alle UDFs in Java geschrieben und als Java-Klassen implementiert sein. Dies erleichtert das Hinzufügen neuer UDFs zu Pig, indem eine Java-Klasse geschrieben und Pig über die JAR-Datei informiert wird.



Schwein selbst kommt mit einigen UDFs. Vor Version 0.8 war es ein sehr begrenzter Satz mit nur den Standard-SQL-Aggregatfunktionen und einigen anderen. In 0.8 wurde eine große Anzahl von Standard-UDFs für Zeichenfolgenverarbeitung, Mathematik und komplexe Typen hinzugefügt.

Was ist ein Sparschwein?

Piggybank ist eine Sammlung von UDFs, die von Benutzern bereitgestellt werden und zusammen mit Pig veröffentlicht werden. Sparschwein-UDFs sind nicht in der Pig JAR enthalten, daher müssen Sie sie manuell in Ihrem Skript registrieren. Sie können auch Ihre eigenen UDFs schreiben oder die von anderen Benutzern geschriebenen verwenden.

Eval-Funktionen

Die UDF-Klasse erweitert die EvalFunc-Klasse, die die Basis für alle Eval-Funktionen bildet. Alle Evaluierungsfunktionen erweitern die Java-Klasse ‘org.apache.pig.EvalFunc. Es wird mit dem Rückgabetyp der UDF parametrisiert, die in diesem Fall eine Java-Zeichenfolge ist. Die Kernmethode in dieser Klasse ist 'exec'. Die erste Zeile des Codes gibt an, dass die Funktion Teil des myudfs-Pakets ist.



Es nimmt einen Datensatz und gibt ein Ergebnis zurück, das für jeden Datensatz aufgerufen wird, der die Ausführungspipeline durchläuft. Es wird ein Tupel benötigt, das alle Felder enthält, die das Skript als Eingabe an Ihre UDF übergibt. Anschließend wird der Typ zurückgegeben, mit dem Sie EvalFunc parametrisiert haben.

Diese Funktion wird bei jedem Eingabetupel aufgerufen. Die Eingabe in die Funktion ist ein Tupel mit Eingabeparametern in der Reihenfolge, in der sie im Pig-Skript an die Funktion übergeben werden. Im folgenden Beispiel verwendet die Funktion einen String als Eingabe. Die folgende Funktion konvertiert die Zeichenfolge von Klein- in Großbuchstaben. Nachdem die Funktion implementiert wurde, muss sie kompiliert und in eine JAR aufgenommen werden.

Unterschied zwischen final finally und finalize
Paket myudfs import java.io.IOException import org.apache.pig.EvalFunc import org.apache.pig.data.Tuple öffentliche Klasse UPPER erweitert EvalFunc {public String exec (Tuple-Eingabe) löst IOException {if (input == null ||) aus input.size () == 0) return null try {String str = (String) input.get (0) return str.toUpperCase ()} catch (Ausnahme e) {neue IOException auslösen ('Eingabezeile für Ausnahmeverarbeitung abgefangen', e)}}}

Aggregatfunktionen:

Aggregatfunktionen sind eine weitere häufige Art der Eval-Funktion. Aggregatfunktionen werden normalerweise auf gruppierte Daten angewendet. Die Aggregatfunktion nimmt einen Beutel und gibt einen Skalarwert zurück. Ein interessantes und wertvolles Merkmal vieler Aggregatfunktionen ist, dass sie auf verteilte Weise inkrementell berechnet werden können. In der Hadoop-Welt bedeutet dies, dass die Teilberechnungen vom Map and Combiner durchgeführt werden können und das Endergebnis vom Reducer berechnet werden kann.

Es ist sehr wichtig sicherzustellen, dass algebraische Aggregatfunktionen als solche implementiert werden. Beispiele für diesen Typ sind die integrierten Funktionen COUNT, MIN, MAX und AVERAGE.

ANZAHL ist ein Beispiel für eine algebraische Funktion, bei der wir die Anzahl der Elemente in einer Teilmenge der Daten zählen und dann die Anzahl summieren können, um eine endgültige Ausgabe zu erhalten. Schauen wir uns die Implementierung der COUNT-Funktion an:

public class COUNT erweitert EvalFunc implementiert Algebraic {public Long exec (Tuple-Eingabe) löst IOException aus {return count (input)} public String getInitial () {return Initial.class.getName ()} public String getIntermed () {return Intermed.class. getName ()} public String getFinal () {return Final.class.getName ()} statische öffentliche Klasse Initial erweitert EvalFunc {public Tuple exec (Tuple-Eingabe) löst IOException aus {return TupleFactory.getInstance (). newTuple (count (Eingabe)) }} statische öffentliche Klasse Intermed erweitert EvalFunc {public Tuple exec (Tuple-Eingabe) löst IOException aus {return TupleFactory.getInstance (). newTuple (sum (input))}} statische öffentliche Klasse Final erweitert EvalFunc {public Tuple exec (Tuple-Eingabe) löst aus IOException {Rückgabesumme (Eingabe)}} statisch geschützt Lange Anzahl (Tupeleingabe) löst ExecException aus {Objektwerte = input.get (0) if (Werte Instanz von DataBag) return ((DataBag) Werte) .size () else if (Werte Instanz der Karte) gibt neue Long (((Map) -Werte) zurück .size ())} statisch geschützte Long-Summe (Tupel i nput) löst ExecException, NumberFormatException aus {DataBag values ​​= (DataBag) input.get (0) long sum = 0 für (Iterator (Tuple) it = values.iterator () it.hasNext ()) {Tuple t = it.next ( ) sum + = (Long) t.get (0)} return sum}}

COUNT implementiert eine algebraische Schnittstelle, die folgendermaßen aussieht:

öffentliche Schnittstelle Algebraisch {public String getInitial () public String getIntermed () public String getFinal ()}

Damit eine Funktion algebraisch ist, muss sie eine algebraische Schnittstelle implementieren, die aus der Definition von drei von EvalFunc abgeleiteten Klassen besteht. Der Vertrag sieht vor, dass die Ausführungsfunktion der Initial-Klasse einmal aufgerufen und an das ursprüngliche Eingabetupel übergeben wird. Die Ausgabe ist ein Tupel, das Teilergebnisse enthält. Die exec-Funktion der Intermed-Klasse kann null oder mehrmals aufgerufen werden und verwendet als Eingabe ein Tupel, das Teilergebnisse enthält, die von der Initial-Klasse oder durch vorherige Aufrufe der Intermed-Klasse erzeugt wurden, und erzeugt ein Tupel mit einem anderen Teilergebnis. Schließlich wird die exec-Funktion der Final-Klasse aufgerufen und gibt das Endergebnis als Skalartyp an.

Filterfunktionen:

Filterfunktionen sind Eval-Funktionen, die einen Booleschen Wert zurückgeben. Es kann überall dort verwendet werden, wo ein Boolescher Ausdruck geeignet ist, einschließlich des Operators FILTER oder des Ausdrucks Bincond. Apache Pig unterstützt Boolean nicht vollständig, daher können Filterfunktionen nicht in Anweisungen wie 'Foreach' angezeigt werden, in denen die Ergebnisse an einen anderen Operator ausgegeben werden. Filterfunktionen können jedoch in Filteranweisungen verwendet werden.

Das folgende Beispiel implementiert die IsEmpty-Funktion:

import java.io.IOException import java.util.Map import org.apache.pig.FilterFunc import org.apache.pig.PigException import org.apache.pig.backend.executionengine.ExecException import org.apache.pig.data.DataBag import org.apache.pig.data.Tuple import org.apache.pig.data.DataType / ** * Bestimmen Sie, ob eine Tasche oder Karte leer ist. * / public class IsEmpty erweitert FilterFunc {@Override public Boolean exec (Tuple-Eingabe) löst IOException aus {try {Object values ​​= input.get (0) if (Werte Instanz von DataBag) return ((DataBag) -Werte) .size () == 0 else if (Werte Instanz von Map) return ((Map) Werte) .size () == 0 else {int errCode = 2102 String msg = 'Ein' + DataType.findTypeName (Werte) + 'kann nicht auf Leere getestet werden.' neue ExecException auslösen (msg, errCode, PigException.BUG)}} catch (ExecException ee) {throw ee}}}