DBInputFormat zum Übertragen von Daten von SQL in die NoSQL-Datenbank



Ziel dieses Blogs ist es, zu lernen, wie Daten aus SQL-Datenbanken in HDFS übertragen werden und wie Daten aus SQL-Datenbanken in NoSQL-Datenbanken übertragen werden.

In diesem Blog werden wir die Fähigkeiten und Möglichkeiten einer der wichtigsten Komponenten der Hadoop-Technologie untersuchen, d. H. MapReduce.

Heute setzen Unternehmen das Hadoop-Framework als erste Wahl für die Datenspeicherung ein, da es große Datenmengen effektiv verarbeiten kann. Wir wissen aber auch, dass die Daten vielseitig sind und in verschiedenen Strukturen und Formaten vorliegen. Um eine so große Vielfalt von Daten und ihre verschiedenen Formate zu kontrollieren, sollte es einen Mechanismus geben, der alle Sorten berücksichtigt und dennoch ein effektives und konsistentes Ergebnis liefert.





Die leistungsstärkste Komponente im Hadoop-Framework ist MapReduce, mit dem die Daten und ihre Struktur besser gesteuert werden können als mit den anderen Gegenstücken. Obwohl dies einen Lernaufwand und die Komplexität der Programmierung erfordert, können Sie mit Hadoop sicher jede Art von Daten verarbeiten, wenn Sie diese Komplexität bewältigen können.

Das MapReduce-Framework unterteilt alle Verarbeitungsaufgaben in zwei Phasen: Map und Reduce.



Um Ihre Rohdaten für diese Phasen vorzubereiten, müssen Sie einige grundlegende Klassen und Schnittstellen kennen. Die Superklasse für diese Wiederaufbereitung ist Eingabeformat.

Das Eingabeformat Klasse ist eine der Kernklassen in der Hadoop MapReduce-API. Diese Klasse ist für die Definition von zwei Hauptsachen verantwortlich:

  • Datenaufteilung
  • Plattenleser

Datenaufteilung ist ein grundlegendes Konzept im Hadoop MapReduce-Framework, das sowohl die Größe einzelner Kartenaufgaben als auch den potenziellen Ausführungsserver definiert. Das Plattenleser ist verantwortlich für das tatsächliche Lesen von Datensätzen aus der Eingabedatei und deren Übermittlung (als Schlüssel / Wert-Paare) an den Mapper.



Die Anzahl der Mapper wird anhand der Anzahl der Teilungen festgelegt. Es ist die Aufgabe von InputFormat, die Teilungen zu erstellen. Meistens entspricht die Split-Größe der Blockgröße, aber es ist nicht immer so, dass Splits basierend auf der HDFS-Blockgröße erstellt werden. Es hängt ganz davon ab, wie die Methode getSplits () Ihres InputFormat überschrieben wurde.

Es gibt einen grundlegenden Unterschied zwischen MR-Split und HDFS-Block. Ein Block ist ein physischer Datenblock, während ein Split nur ein logischer Block ist, den ein Mapper liest. Ein Split enthält keine Eingabedaten, sondern nur eine Referenz oder Adresse der Daten. Ein Split hat im Grunde zwei Dinge: Eine Länge in Bytes und eine Reihe von Speicherorten, die nur Zeichenfolgen sind.

Um dies besser zu verstehen, nehmen wir ein Beispiel: Verarbeiten von in MySQL gespeicherten Daten mit MR. Da es in diesem Fall kein Konzept für Blöcke gibt, lautet die Theorie: „Splits werden immer basierend auf dem HDFS-Block erstellt“,schlägt fehl. Eine Möglichkeit besteht darin, Teilungen basierend auf Zeilenbereichen in Ihrer MySQL-Tabelle zu erstellen (und genau das tut DBInputFormat, ein Eingabeformat zum Lesen von Daten aus relationalen Datenbanken). Wir können k Teilungen haben, die aus n Zeilen bestehen.

Nur für die auf FileInputFormat basierenden InputFormats (ein InputFormat zur Verarbeitung von in Dateien gespeicherten Daten) werden die Teilungen basierend auf der Gesamtgröße der Eingabedateien in Byte erstellt. Die Dateisystem-Blockgröße der Eingabedateien wird jedoch als Obergrenze für Eingabeaufteilungen behandelt. Wenn Sie eine Datei haben, die kleiner als die HDFS-Blockgröße ist, erhalten Sie nur 1 Mapper für diese Datei. Wenn Sie ein anderes Verhalten wünschen, können Sie mapred.min.split.size verwenden. Dies hängt jedoch wiederum ausschließlich von getSplits () Ihres InputFormat ab.

Wir haben so viele bereits vorhandene Eingabeformate unter dem Paket org.apache.hadoop.mapreduce.lib.input.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

Wofür wird Mongodb verwendet?

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

Beispiel für einen Gurken-Java-Selen-Webdriver

TextInputFormat.html

Der Standardwert ist TextInputFormat.

Ebenso haben wir so viele Ausgabeformate, die die Daten von Reduzierern lesen und in HDFS speichern:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

Standardmäßig ist TextOutputFormat.

Wenn Sie diesen Blog gelesen haben, haben Sie Folgendes gelernt:

  • So schreiben Sie ein Kartenreduzierungsprogramm
  • Informationen zu verschiedenen Arten von InputFormats, die in Mapreduce verfügbar sind
  • Was ist die Notwendigkeit von InputFormats
  • So schreiben Sie benutzerdefinierte InputFormats
  • Übertragen von Daten aus SQL-Datenbanken nach HDFS
  • So übertragen Sie Daten aus SQL-Datenbanken (hier MySQL) in NoSQL-Datenbanken (hier Hbase)
  • Übertragen von Daten aus einer SQL-Datenbank in eine andere Tabelle in SQL-Datenbanken (Möglicherweise ist dies nicht so wichtig, wenn wir dies in derselben SQL-Datenbank tun. Es ist jedoch nichts Falsches daran, über dieselben Kenntnisse zu verfügen. Sie wissen es nie wie es in Gebrauch kommen kann)

Voraussetzung:

  • Hadoop vorinstalliert
  • SQL vorinstalliert
  • Hbase vorinstalliert
  • Grundlegendes Verständnis von Java
  • MapReduce-Wissen
  • Grundkenntnisse des Hadoop-Frameworks

Lassen Sie uns die Problemstellung verstehen, die wir hier lösen werden:

Wir haben eine Mitarbeitertabelle in MySQL DB in unserer relationalen Datenbank Edureka. Gemäß den Geschäftsanforderungen müssen wir nun alle in der relationalen Datenbank verfügbaren Daten in das Hadoop-Dateisystem verschieben, d. H. HDFS, NoSQL DB, bekannt als Hbase.

Wir haben viele Möglichkeiten, um diese Aufgabe zu erledigen:

  • Sqoop
  • Gerinne
  • Karte verkleinern

Jetzt möchten Sie kein anderes Tool für diesen Vorgang installieren und konfigurieren. Sie haben nur noch eine Option: das Hadoop-Verarbeitungsframework MapReduce. Mit dem MapReduce-Framework haben Sie während der Übertragung die volle Kontrolle über die Daten. Sie können die Spalten bearbeiten und direkt an einer der beiden Zielpositionen platzieren.

Hinweis:

Unterschied zwischen Agile und Devops
  • Wir müssen den MySQL-Connector herunterladen und in den Klassenpfad von Hadoop einfügen, um Tabellen aus der MySQL-Tabelle abzurufen. Laden Sie dazu den Connector com.mysql.jdbc_5.1.5.jar herunter und speichern Sie ihn im Verzeichnis Hadoop_home / share / Hadoop / MaPreduce / lib.
cp Downloads / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • Stellen Sie außerdem alle Hbase-Gläser unter den Hadoop-Klassenpfad, damit Ihr MR-Programm auf Hbase zugreifen kann. Führen Sie dazu den folgenden Befehl aus ::
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

Die Softwareversionen, die ich bei der Ausführung dieser Aufgabe verwendet habe, sind:

  • Hadooop-2.3.0
  • HBase 0,98,9-Hadoop2
  • Eclipse Moon

Um das Programm bei Kompatibilitätsproblemen zu vermeiden, empfehle ich meinen Lesern, den Befehl in einer ähnlichen Umgebung auszuführen.

Benutzerdefiniertes DBInputWritable:

Paket com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBInputWritable implementiert Writable, DBWritable {private int id privater Stringname, dept public void readFields (DataInput in) löst IOException {} public void readFields (ResultSet rs) aus löst SQLException aus // Resultset-Objekt repräsentiert die von einer SQL-Anweisung zurückgegebenen Daten {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) löst IOException {aus } public void write (PreparedStatement ps) löst SQLException aus {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, dept)} public int getId () {return id} public String getName () {return name} public String getDept () {return dept}}

Benutzerdefiniertes DBOutputWritable:

Paket com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable implementiert Writable, DBWritable {privater Stringname private int id private Stringabteilung public DBOutputWritable (Stringname, int id, Stringabteilung) {this.name = name this.id = id this.dept = dept} public void readFields (DataInput in) löst IOException aus {} public void readFields (ResultSet rs) löst SQLException aus {} public void write (DataOutput out) löst IOException {} public void write (PreparedStatement) aus ps) löst SQLException aus {ps.setString (1, name) ps.setInt (2, id) ps.setString (3, dept)}}

Eingabetabelle:

Datenbank edureka erstellen
Tabelle emp erstellen (empid int nicht null, Name varchar (30), dept varchar (20), Primärschlüssel (empid))
in emp Werte einfügen (1, 'abhay', 'Entwicklung'), (2, 'brundesh', 'test')
Wählen Sie * aus emp

Fall 1: Übertragung von MySQL zu HDFS

Paket com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce Job importieren org.apache.hadoop.mapreduce.lib.db.DBConfiguration importieren org.apache.hadoop.mapreduce.lib.db.DBInputFormat importieren org.apache.hadoop.mapreduce.lib.output.FileOutputFormat importieren org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public statisch void main (String [] args) löst eine Ausnahme aus {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // Treiberklasse' jdbc: mysql: // localhost: 3306 / edureka ', // Datenbank-URL' root ', // Benutzername' root ') // Passwort Job job = neuer Job (conf) Job .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormat.class) neuer Pfad (args [0])) DBInputFormat.setInput (Job, DBInputWritable.class, 'emp', // Name der Eingabetabelle null, null, neuer String [] {'empid', 'name', 'dept'} / / Tabellenspalten) Pfad p = neuer Pfad (args [0]) FileSystem fs = FileSystem.get (neuer URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Mit diesem Code können wir das Eingabeformat für den Zugriff auf unsere SQL-Quelldatenbank vorbereiten oder konfigurieren. Der Parameter enthält die Treiberklasse, die URL enthält die Adresse der SQL-Datenbank, ihren Benutzernamen und das Kennwort.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // Treiberklasse 'jdbc: mysql: // localhost: 3306 / edureka', // Datenbank-URL 'root', // Benutzername 'root') //Passwort

Mit diesem Code können wir die Details der Tabellen in der Datenbank übergeben und im Jobobjekt festlegen. Die Parameter umfassen natürlich die Jobinstanz, die benutzerdefinierte beschreibbare Klasse, die die DBWritable-Schnittstelle implementieren muss, den Namen der Quelltabelle, die Bedingung, falls sonst null, alle Sortierparameter, sonst null, die Liste der Tabellenspalten.

DBInputFormat.setInput (Job, DBInputWritable.class, 'emp', // Name der Eingabetabelle null, null, neuer String [] {'empid', 'name', 'dept'} // Tabellenspalten)

Mapper

Paket com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable public class Map erweitert Mapper {
geschützte void map (LongWritable-Schlüssel, DBInputWritable-Wert, Context ctx) {try {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (neuer Text (Name + '' + ID + '' + Abteilung), ID)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Reduzierer: Identitätsreduzierer verwendet

Befehl zum Ausführen:

hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Ausgabe: MySQL-Tabelle auf HDFS übertragen

hadoop dfs -ls / dbtohdfs / *

Fall 2: Übertragung von einer Tabelle in MySQL zu einer anderen in MySQL

Erstellen einer Ausgabetabelle in MySQL

Erstellen Sie die Tabelle employee1 (Name varchar (20), id int, dept varchar (20)).

Paket com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable public class Mainonetable_to_other_table {public static void main (String [] args) löst eine Ausnahme aus {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // Treiberklasse 'jdbc: mysql: // localhost : 3306 / edureka ', // Datenbank-URL' root ', // Benutzername' root ') // Passwort Job job = new Job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) job .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // Name der Eingabetabelle null, null, new String [] {'id ',' name ',' dept '} // Tabellenspalten) DBOutputFormat.setOutput (job,' employee1 ', // Name der Ausgabetabelle new String [] {' name ',' id ',' dept '} // table Spalten) System.exit (job.waitForCompletion (true)? 0: 1)}}

Mit diesem Code können wir den Namen der Ausgabetabelle in SQL DB konfigurieren. Die Parameter sind Jobinstanz, Name der Ausgabetabelle und Name der Ausgabespalte.

DBOutputFormat.setOutput (job, 'employee1', // Name der Ausgabetabelle new String [] {'name', 'id', 'dept'} // Tabellenspalten)

Mapper: Wie Fall 1

Reduzierer:

Paket com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable public class Reduce erweitert Reducer {protected void redu (Textschlüssel, Iterable-Werte, Context ctx) {int sum = 0 String line [] = key.toString (). Split ('') try {ctx.write (new DBOutputWritable (Zeile [0] .toString (), Integer.parseInt (Zeile [1] .toString ()), Zeile [2] .toString ()), NullWritable.get ())} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Befehl zum Ausführen:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Ausgabe: Übertragene Daten von der EMP-Tabelle in MySQL zu einer anderen Tabelle Employee1 in MySQL

Fall 3: Übertragung von einer Tabelle in MySQL in eine NoSQL-Tabelle (Hbase)

Erstellen einer Hbase-Tabelle zur Aufnahme der Ausgabe aus der SQL-Tabelle:

Erstelle 'Mitarbeiter', 'offizielle_info'

Fahrerklasse:

Paket Dbtohbase importieren org.apache.hadoop.conf.Configuration importieren org.apache.hadoop.mapreduce.Job importieren org.apache.hadoop.mapreduce.lib.db.DBConfiguration importieren org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text öffentliche Klasse MainDbToHbase {public static void main (String [] args) löst Exception {Configuration conf = aus HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // Treiberklasse 'jdbc: mysql: // localhost: 3306 / edureka' , // Datenbank-URL 'root', // Benutzername 'root') // Passwort Job job = new Job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('employee', Reduce.class, job) job.setInputFormatFormatCorm ( Klasse) DBInputFormat.setInput (Job, DBInputWritable.class, 'emp', // Name der Eingabetabelle null, null, neuer String [] {'empid', 'name', 'dept'} // Tabellenspalten) System.exit (job.waitForCompletion (true)? 0: 1)}}

Mit diesem Code können Sie die Ausgabeschlüsselklasse konfigurieren, die im Fall von hbase ImmutableBytesWritable ist

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Hier übergeben wir den Namen der hbase-Tabelle und den Reduzierer, um auf die Tabelle einzuwirken.

TableMapReduceUtil.initTableReducerJob ('Mitarbeiter', Reduce.class, Job)

Mapper:

Paket Dbtohbase importieren java.io.IOException importieren org.apache.hadoop.mapreduce.Mapper importieren org.apache.hadoop.hbase.io.ImmutableBytesWritable importieren org.apache.hadoop.hbase.util.Bytes importieren org.apache.hadoop.io .LongWritable-Import org.apache.hadoop.io.Text-Import org.apache.hadoop.io.IntWritable public class Map erweitert Mapper {private IntWritable one = neue IntWritable (1) geschützte leere Map (LongWritable-ID, DBInputWritable-Wert, Kontextkontext) {try {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + ' '+ dept))} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

In diesem Code nehmen wir Werte aus den Gettern der DBinputwritable-Klasse und übergeben sie dann weiter
ImmutableBytesWritable, damit sie den Reduzierer in bytewriatble Form erreichen, die Hbase versteht.

String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + '' + dept ))

Reduzierer:

Paket Dbtohbase importieren java.io.IOException importieren org.apache.hadoop.hbase.client.Put importieren org.apache.hadoop.hbase.io.ImmutableBytesWritable importieren org.apache.hadoop.hbase.mapreduce.TableReducer importieren org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text public class Reduce erweitert TableReducer {public void redu (ImmutableBytesWritable-Schlüssel, Iterable-Werte, Kontextkontext) löst IOException, InterruptedException {String [] caus = null // Loop-Werte aus for (Text val: values) {caus = val.toString (). split ('')} // In HBase setzen Put put = new Put (key.get ()) put.add (Bytes.toBytes ('offiziell_info') ), Bytes.toBytes ('name'), Bytes.toBytes (Ursache [0])) put.add (Bytes.toBytes ('offizielle_info'), Bytes.toBytes ('Abteilung'), Bytes.toBytes (Ursache [1] ])) context.write (key, put)}}

Mit diesem Code können wir die genaue Zeile und Spalte bestimmen, in der die Werte des Reduzierers gespeichert werden sollen. Hier speichern wir jedes Empid in einer separaten Zeile, da wir Empid als Zeilenschlüssel erstellt haben, der eindeutig wäre. In jeder Zeile speichern wir die offiziellen Informationen der Mitarbeiter unter der Spaltenfamilie 'offiziell_info' unter den Spalten 'Name' bzw. 'Abteilung'.

Put put = new Put (key.get ()) put.add (Bytes.toBytes ('offizielle_info'), Bytes.toBytes ('name'), Bytes.toBytes (Ursache [0])) put.add (Bytes. toBytes ('offiziell_info'), Bytes.toBytes ('Abteilung'), Bytes.toBytes (Ursache [1])) context.write (Schlüssel, Put)

Übertragene Daten in Hbase:

Mitarbeiter scannen

Wie wir sehen, konnten wir die Aufgabe der Migration unserer Geschäftsdaten von einer relationalen SQL-Datenbank zu einer NoSQL-Datenbank erfolgreich abschließen.

Im nächsten Blog erfahren Sie, wie Sie Codes für andere Eingabe- und Ausgabeformate schreiben und ausführen.

Veröffentlichen Sie weiterhin Ihre Kommentare, Fragen oder Rückmeldungen. Ich würde es lieben von Dir zu hören.

Hast du eine Frage an uns? Bitte erwähnen Sie es in den Kommentaren und wir werden uns bei Ihnen melden.

Zusammenhängende Posts: