DBInputFormat to Transfer Data From SQL to NoSQL Database

In this blog we will explore the capabilities and possibilities of one of the most important components of Hadoop technology: MapReduce.
Today, companies are adopting the Hadoop framework as their first choice for data storage because of its ability to handle large data effectively. But data is also varied and exists in many structures and formats. Handling such variety calls for a mechanism that can accommodate all of these formats and still produce an effective and consistent result.
The most powerful component of the Hadoop framework is MapReduce, which gives you more control over the data and its structure than its counterparts. It comes with a learning curve and some programming complexity, but if you can handle these, you can handle almost any kind of data with Hadoop.
The MapReduce framework breaks its processing into two phases: Map and Reduce.
Preparing your raw data for these phases requires an understanding of some basic classes and interfaces. The superclass for this preprocessing is InputFormat.
The InputFormat class is one of the core classes in the Hadoop MapReduce API. This class is responsible for defining two main things:
  • Data splits
  • Record reader
Data split is a fundamental concept in the Hadoop MapReduce framework which defines both the size of individual map tasks and their potential execution server. The record reader is responsible for actually reading records from the input file and submitting them (as key/value pairs) to the mapper.
The number of mappers is decided by the number of splits, and it is the job of the InputFormat to create the splits. Most of the time the split size equals the HDFS block size, but splits are not always created based on the block size. It depends entirely on how the getSplits() method of your InputFormat has been implemented.
There is a fundamental difference between an MR split and an HDFS block. A block is a physical chunk of data, while a split is just a logical chunk that a mapper reads. A split does not contain the input data; it only holds a reference to (the address of) the data. A split basically has two things: a length in bytes and a set of storage locations, which are just strings.
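To make the "reference, not data" idea concrete, here is a minimal plain-Java sketch of what a split carries. The class name SplitSketch is hypothetical and it is not a Hadoop class; it only mirrors the two pieces of information described above.

```java
import java.util.Arrays;

/** Hypothetical stand-in for an input split: it describes data, it does not hold it. */
final class SplitSketch {
    private final long length;        // size of the logical chunk in bytes
    private final String[] locations; // host names where the data lives, plain strings

    SplitSketch(long length, String... locations) {
        this.length = length;
        this.locations = locations.clone();
    }

    long getLength() {
        return length;
    }

    String[] getLocations() {
        return locations.clone();
    }

    @Override
    public String toString() {
        return "split[length=" + length + ", hosts=" + Arrays.toString(locations) + "]";
    }

    public static void main(String[] args) {
        // A 128 MB split whose data is stored on two (made-up) hosts
        SplitSketch split = new SplitSketch(128L * 1024 * 1024, "node1", "node2");
        System.out.println(split);
    }
}
```

The scheduler can use the location strings to place the map task close to the data, while the record reader uses the rest of the split to find the records.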
To understand this better, let’s take an example: processing data stored in MySQL using MR. Since there is no concept of blocks in this case, the theory that “splits are always created based on the HDFS block” fails. One possibility is to create splits based on ranges of rows in your MySQL table (and this is what DBInputFormat, an input format for reading data from relational databases, does). We may have k splits consisting of n rows each.
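The row-range idea can be sketched in a few lines of plain Java. This is only an illustration modeled on the description above, not the Hadoop source; the class RowRangeSplits and its method are hypothetical names, and the rule that the last split absorbs the remainder is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

final class RowRangeSplits {
    /** Returns {startRow, endRow} pairs (end exclusive) that cover rowCount rows in 'chunks' splits. */
    static List<long[]> compute(long rowCount, int chunks) {
        List<long[]> splits = new ArrayList<>();
        long chunkSize = rowCount / chunks;
        for (int i = 0; i < chunks; i++) {
            long start = i * chunkSize;
            // The last split takes the remainder so that no row is left out
            long end = (i + 1 == chunks) ? rowCount : start + chunkSize;
            splits.add(new long[] {start, end});
        }
        return splits;
    }

    public static void main(String[] args) {
        for (long[] s : compute(10, 3)) {
            System.out.println("rows " + s[0] + " to " + (s[1] - 1));
        }
    }
}
```

For 10 rows and 3 splits this gives the ranges 0 to 2, 3 to 5 and 6 to 9, so each mapper reads its own slice of the table.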
Only the InputFormats based on FileInputFormat (an InputFormat for handling data stored in files) create splits based on the total size, in bytes, of the input files. The FileSystem block size of the input files is treated as an upper bound for input splits. If a file is smaller than the HDFS block size, you get only one mapper for that file. If you want different behavior, you can set a lower bound with mapred.min.split.size (named mapreduce.input.fileinputformat.split.minsize in Hadoop 2.x). But again, this depends solely on the getSplits() of your InputFormat.
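The arithmetic behind file-based splits can be sketched as follows. This is a simplified plain-Java model of the behavior described above, not the Hadoop implementation; FileSplitSketch is a hypothetical name, and the 10% slack on the last split is an assumption of this sketch.

```java
final class FileSplitSketch {
    // Assumption: the last split may be up to 10% larger than the others
    private static final double SPLIT_SLOP = 1.1;

    /** The block size is the upper bound unless the minimum size pushes it higher. */
    static long splitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    /** Counts the splits (and therefore mappers) for one non-empty file. */
    static int countSplits(long fileLength, long splitSize) {
        int splits = 0;
        long remaining = fileLength;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        if (remaining > 0) {
            splits++; // whatever is left becomes the final split
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long size = splitSize(128 * mb, 1, Long.MAX_VALUE);
        System.out.println("300 MB file: " + countSplits(300 * mb, size) + " splits");
        System.out.println("64 MB file: " + countSplits(64 * mb, size) + " splits");
    }
}
```

With a 128 MB block size, a 300 MB file yields 3 splits and a 64 MB file yields 1. Raising the minimum split size to 256 MB would reduce the 300 MB file to 2 splits.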
The package org.apache.hadoop.mapreduce.lib.input ships many ready-made input formats along with their supporting classes (record readers, splits and so on):
  • CombineFileInputFormat
  • CombineFileRecordReader
  • CombineFileRecordReaderWrapper
  • CombineFileSplit
  • CombineSequenceFileInputFormat
  • CombineTextInputFormat
  • FileInputFormat
  • FileInputFormatCounter
  • FileSplit
  • FixedLengthInputFormat
  • InvalidInputException
  • KeyValueLineRecordReader
  • KeyValueTextInputFormat
  • MultipleInputs
  • NLineInputFormat
  • SequenceFileAsBinaryInputFormat
  • SequenceFileAsTextInputFormat
  • SequenceFileAsTextRecordReader
  • SequenceFileInputFilter
  • SequenceFileInputFormat
  • SequenceFileRecordReader
  • TextInputFormat
The default is TextInputFormat.
Similarly, there are many output formats, which take the data from the reducers and store it in HDFS:
  • FileOutputCommitter
  • FileOutputFormat
  • FileOutputFormatCounter
  • FilterOutputFormat
  • LazyOutputFormat
  • MapFileOutputFormat
  • MultipleOutputs
  • NullOutputFormat
  • PartialFileOutputCommitter
  • PartialOutputCommitter
  • SequenceFileAsBinaryOutputFormat
  • SequenceFileOutputFormat
  • TextOutputFormat
The default is TextOutputFormat.
By the time you finish reading this blog, you will have learned:
  • How to write a MapReduce program
  • Which types of InputFormats are available in MapReduce
  • Why InputFormats are needed
  • How to write custom InputFormats
  • How to transfer data from SQL databases to HDFS
  • How to transfer data from SQL databases (here MySQL) to NoSQL databases (here HBase)
  • How to transfer data from one table in a SQL database to another table (this may matter less when both tables live in the same database, but it is still worth knowing; you never know when it will come in handy)
Prerequisites:
  • Hadoop installed
  • MySQL installed
  • HBase installed
  • Basic understanding of Java
  • MapReduce knowledge
  • Basic knowledge of the Hadoop framework
Let’s understand the problem statement which we are going to solve here:
We have an employee table in a MySQL database named edureka. As per the business requirement, we have to move all the data available in the relational DB to the Hadoop file system (HDFS) and to the NoSQL DB HBase.
We have many options to do this task:
  • Sqoop
  • Flume
  • MapReduce
Now suppose you do not want to install and configure any other tool for this operation. You are then left with only one option, which is Hadoop’s processing framework MapReduce. The MapReduce framework gives you full control over the data while transferring it. You can manipulate the columns and write directly to either of the two target locations.
Note:
  • We need to download the MySQL connector and put it on Hadoop’s classpath to fetch tables from MySQL. To do this, download the connector com.mysql.jdbc_5.1.5.jar and copy it into the $HADOOP_HOME/share/hadoop/mapreduce/lib directory:
cp Downloads/com.mysql.jdbc_5.1.5.jar  $HADOOP_HOME/share/hadoop/mapreduce/lib/
  • Also, put all HBase jars on the Hadoop classpath so that your MR program can access HBase. To do this, execute the following command:
cp $HBASE_HOME/lib/* $HADOOP_HOME/share/hadoop/mapreduce/lib/
The software versions that I have used in the execution of this task are:
  • Hadoop 2.3.0
  • HBase 0.98.9-Hadoop2
  • Eclipse Luna
To avoid compatibility issues, I recommend that readers run the programs in a similar environment.

Custom DBInputWritable:

package com.inputFormat.copy;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
public class DBInputWritable implements Writable, DBWritable
{
 private int id;
 private String name,dept;
public void readFields(DataInput in) throws IOException { }
public void readFields(ResultSet rs) throws SQLException
 //Resultset object represents the data returned from a SQL statement
 {
 id = rs.getInt(1);
 name = rs.getString(2);
 dept = rs.getString(3);
 }
public void write(DataOutput out) throws IOException { }
public void write(PreparedStatement ps) throws SQLException
 {
 ps.setInt(1, id);
 ps.setString(2, name);
 ps.setString(3, dept);
 }
public int getId()
 {
 return id;
 }
public String getName()
 {
 return name;
 }

 public String getDept()
 {
 return dept;
 }
}
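
The readFields(DataInput) and write(DataOutput) methods above are left empty, which is fine as long as the object is only handed from the record reader to the mapper. If such an object ever had to travel between tasks, those two methods would need real bodies. The sketch below shows one way to fill them in using only java.io; EmployeeRecordSketch and roundTrip are hypothetical names, and it assumes name and dept are never null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class EmployeeRecordSketch {
    int id;
    String name;
    String dept;

    // Same shape as Writable.write(DataOutput): emit the fields in a fixed order
    void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(name);
        out.writeUTF(dept);
    }

    // Same shape as Writable.readFields(DataInput): read them back in the same order
    void readFields(DataInput in) throws IOException {
        id = in.readInt();
        name = in.readUTF();
        dept = in.readUTF();
    }

    /** Serializes a record to bytes and rebuilds a copy from those bytes. */
    static EmployeeRecordSketch roundTrip(int id, String name, String dept) {
        try {
            EmployeeRecordSketch original = new EmployeeRecordSketch();
            original.id = id;
            original.name = name;
            original.dept = dept;
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));
            EmployeeRecordSketch copy = new EmployeeRecordSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        EmployeeRecordSketch copy = roundTrip(1, "abhay", "development");
        System.out.println(copy.id + " " + copy.name + " " + copy.dept);
    }
}
```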

Custom DBOutputWritable:

package com.inputFormat.copy;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
public class DBOutputWritable implements Writable, DBWritable
{
 private String name;
 private int id;
 private String dept;
public DBOutputWritable(String name, int id,String dept )
 {
 this.name = name;
 this.id = id;
 this.dept = dept;
 }
public void readFields(DataInput in) throws IOException { }
public void readFields(ResultSet rs) throws SQLException
 {
 }
public void write(DataOutput out) throws IOException { }
public void write(PreparedStatement ps) throws SQLException
 {
 ps.setString(1, name);
 ps.setInt(2, id);
 ps.setString(3, dept);
 }
}

Input Table:

create database edureka;
use edureka;
create table emp(empid int not null,name varchar(30),dept varchar(20),primary key(empid));
insert into emp values(1,"abhay","development"),(2,"brundesh","test");
select * from emp;

Case 1: Transfer from MySQL to HDFS

package com.inputFormat.copy;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

public class MainDbtohdfs
{
 public static void main(String[] args) throws Exception
 {
 Configuration conf = new Configuration();
 DBConfiguration.configureDB(conf,
 "com.mysql.jdbc.Driver", // driver class
 "jdbc:mysql://localhost:3306/edureka", // db url
 "root", // user name
 "root"); //password
 // Job.getInstance replaces the deprecated Job constructor
 Job job = Job.getInstance(conf);
 job.setJarByClass(MainDbtohdfs.class);
 job.setMapperClass(Map.class);

 job.setMapOutputKeyClass(Text.class);
 job.setMapOutputValueClass(IntWritable.class);

 job.setInputFormatClass(DBInputFormat.class);

 FileOutputFormat.setOutputPath(job, new Path(args[0]));

 DBInputFormat.setInput(
 job,
 DBInputWritable.class,
 "emp", //input table name
 null,
 null,
 new String[] { "empid", "name" ,"dept"} // table columns
 );

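 // Remove output left over from a previous run; the job fails if the path already exists
 Path p=new Path(args[0]);
 FileSystem fs= FileSystem.get(new URI(args[0]), conf);
 fs.delete(p, true); // true = delete recursively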

 System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}
This piece of code prepares the input format to access our source SQL DB. The parameters are the driver class, the URL of the SQL database, its username and the password.
DBConfiguration.configureDB(conf,
 "com.mysql.jdbc.Driver", // driver class
 "jdbc:mysql://localhost:3306/edureka", // db url
 "root", // user name
 "root"); //password
This piece of code passes the details of the source table and sets them on the job object. The parameters are, in order: the job instance, the custom writable class (which must implement the DBWritable interface), the source table name, a condition if any (else null), any sorting parameters (else null), and the list of table columns.
DBInputFormat.setInput(
 job,
 DBInputWritable.class,
 "emp", //input table name
 null,
 null,
 new String[] { "empid", "name" ,"dept"} // table columns
 );
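
To see how these parameters fit together, it helps to picture the SELECT statement that could be assembled from them for one split. The sketch below is only an approximation written for this post; SelectQuerySketch is a hypothetical class, and the exact SQL that Hadoop generates may differ in detail.

```java
final class SelectQuerySketch {
    /** Builds a SELECT for one split; limit and offset describe the split's row range. */
    static String build(String table, String conditions, String orderBy,
                        long limit, long offset, String... columns) {
        StringBuilder sql = new StringBuilder("SELECT ");
        sql.append(String.join(", ", columns));
        sql.append(" FROM ").append(table);
        if (conditions != null && !conditions.isEmpty()) {
            sql.append(" WHERE (").append(conditions).append(")");
        }
        if (orderBy != null && !orderBy.isEmpty()) {
            sql.append(" ORDER BY ").append(orderBy);
        }
        sql.append(" LIMIT ").append(limit).append(" OFFSET ").append(offset);
        return sql.toString();
    }

    public static void main(String[] args) {
        // Same arguments as in the driver: no condition, no ordering, three columns
        System.out.println(build("emp", null, null, 2, 0, "empid", "name", "dept"));
    }
}
```

The column order matters: it is the order in which readFields(ResultSet) of the custom writable reads the values by index.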

Mapper

package com.inputFormat.copy;
import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
public class Map extends Mapper<LongWritable, DBInputWritable, Text, IntWritable>
{
 @Override
 protected void map(LongWritable key, DBInputWritable value, Context ctx)
 throws IOException, InterruptedException
 {
 // Exceptions propagate so that a failed write fails the task instead of being silently logged
 String name = value.getName();
 IntWritable id = new IntWritable(value.getId());
 String dept = value.getDept();
 // Key: the tab-separated record; value: the employee id
 ctx.write(new Text(name+"\t"+id+"\t"+dept),id);
 }
}

Reducer: Identity Reducer Used

Command to run:
hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs /dbtohdfs

Output: MySQL Table Transferred to HDFS

hdfs dfs -ls /dbtohdfs/*

Case 2: Transfer From One Table in MySQL to Another in MySQL

Creating the output table in MySQL:
create table employee1(name varchar(20),id int,dept varchar(20));

Driver Class:

package com.inputFormat.copy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
public class Mainonetable_to_other_table
{
 public static void main(String[] args) throws Exception
 {
 Configuration conf = new Configuration();
 DBConfiguration.configureDB(conf,
 "com.mysql.jdbc.Driver", // driver class
 "jdbc:mysql://localhost:3306/edureka", // db url
 "root", // user name
 "root"); //password
 // Job.getInstance replaces the deprecated Job constructor
 Job job = Job.getInstance(conf);
 job.setJarByClass(Mainonetable_to_other_table.class);
 job.setMapperClass(Map.class);
 job.setReducerClass(Reduce.class);
 job.setMapOutputKeyClass(Text.class);
 job.setMapOutputValueClass(IntWritable.class);
 job.setOutputKeyClass(DBOutputWritable.class);
 job.setOutputValueClass(NullWritable.class);
 job.setInputFormatClass(DBInputFormat.class);
 job.setOutputFormatClass(DBOutputFormat.class);

 DBInputFormat.setInput(
 job,
 DBInputWritable.class,
 "emp", //input table name
 null,
 null,
 new String[] { "empid", "name" ,"dept"} // table columns
 );
DBOutputFormat.setOutput(
 job,
 "employee1", // output table name
 new String[] { "name", "id","dept" } //table columns
 );
 System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}
This piece of code configures the output table in the SQL DB. The parameters are the job instance, the output table name and the output column names, in that order.
DBOutputFormat.setOutput(
 job,
 "employee1", // output table name
 new String[] { "name", "id","dept" } //table columns
 );
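
The table name and column list given here end up in a parameterized INSERT statement, and the write(PreparedStatement) method of DBOutputWritable fills in the placeholders by position. The sketch below shows the general shape of such a statement; InsertQuerySketch is a hypothetical helper written for this post, and the exact text Hadoop builds may differ.

```java
import java.util.Collections;

final class InsertQuerySketch {
    /** Builds "INSERT INTO table (c1, c2, ...) VALUES (?, ?, ...)" with one placeholder per column. */
    static String build(String table, String... columns) {
        String placeholders = String.join(", ", Collections.nCopies(columns.length, "?"));
        return "INSERT INTO " + table
                + " (" + String.join(", ", columns) + ")"
                + " VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        System.out.println(build("employee1", "name", "id", "dept"));
    }
}
```

This is why the column order in setOutput (name, id, dept) has to match the order of the setString and setInt calls in DBOutputWritable.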

Mapper: Same as Case 1

Reducer:
package com.inputFormat.copy;
import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
public class Reduce extends Reducer<Text, IntWritable, DBOutputWritable, NullWritable>
{
 @Override
 protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
 throws IOException, InterruptedException
 {
 // The mapper key has the form name<TAB>id<TAB>dept
 String[] line = key.toString().split("\t");
 ctx.write(new DBOutputWritable(line[0], Integer.parseInt(line[1]), line[2]), NullWritable.get());
 }
}

Command to Run:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Output: Transferred Data From EMP Table in MySQL to Another Table Employee1 in MySQL

Case 3: Transfer From Table in MySQL to NoSQL (Hbase) Table 

Creating the HBase table to accommodate the output from the SQL table:
create 'employee','official_info'

Driver Class:

package Dbtohbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;

public class MainDbToHbase
{
 public static void main(String[] args) throws Exception
 {
 Configuration conf = HBaseConfiguration.create();
 DBConfiguration.configureDB(conf,
 "com.mysql.jdbc.Driver", // driver class
 "jdbc:mysql://localhost:3306/edureka", // db url
 "root", // user name
 "root"); //password

 // Job.getInstance replaces the deprecated Job constructor
 Job job = Job.getInstance(conf,"dbtohbase");
 job.setJarByClass(MainDbToHbase.class);
 job.setMapperClass(Map.class);

 job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 job.setMapOutputValueClass(Text.class);

 TableMapReduceUtil.initTableReducerJob("employee",Reduce.class, job);
job.setInputFormatClass(DBInputFormat.class);
 job.setOutputFormatClass(TableOutputFormat.class);

 DBInputFormat.setInput(
 job,
 DBInputWritable.class,
 "emp", //input table name
 null,
 null,
 new String[] { "empid", "name" ,"dept" } // table columns
 );
System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}
This piece of code configures the map output key class, which in the case of HBase is ImmutableBytesWritable:
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 job.setMapOutputValueClass(Text.class);
Here we pass the HBase table name and the reducer that acts on the table:
 TableMapReduceUtil.initTableReducerJob("employee",Reduce.class, job);

Mapper:

package Dbtohbase;
import java.io.IOException;

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

public class Map extends Mapper<LongWritable, DBInputWritable, ImmutableBytesWritable, Text>
{
 @Override
 protected void map(LongWritable id, DBInputWritable value, Context context)
 throws IOException, InterruptedException
 {
 String line = value.getName();
 String cd = value.getId()+"";
 String dept = value.getDept();
 // Row key: empid as bytes; value: name and department separated by a space
 context.write(new ImmutableBytesWritable(Bytes.toBytes(cd)),new Text(line+" "+dept));
 }
}
In this piece of code we take the values from the getters of the DBInputWritable class. The employee id is wrapped in an ImmutableBytesWritable so that it reaches the reducer as the raw bytes that HBase uses for row keys, while the name and department travel together as a Text value.

 String line = value.getName();
 String cd = value.getId()+"";
 String dept = value.getDept();
 context.write(new ImmutableBytesWritable(Bytes.toBytes(cd)),new Text(line+" "+dept));
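
One consequence of turning the id into a string before converting it to bytes is worth seeing in isolation. The sketch below uses only the standard library and assumes that the key bytes are the UTF-8 encoding of the string and that rows are ordered by unsigned byte comparison; RowKeySketch is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;

final class RowKeySketch {
    /** Encodes an employee id the way the mapper does: as the bytes of its decimal string. */
    static byte[] toRowKey(int empId) {
        return String.valueOf(empId).getBytes(StandardCharsets.UTF_8);
    }

    /** Unsigned lexicographic comparison of two keys. */
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    public static void main(String[] args) {
        // A negative result means that key "10" sorts before key "2"
        System.out.println(compare(toRowKey(10), toRowKey(2)));
    }
}
```

With string-encoded ids, row 10 is listed before row 2 in a scan. That is harmless for this small table, but fixed-width or zero-padded keys are an option if numeric order matters.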

Reducer:

package Dbtohbase;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class Reduce extends TableReducer<
ImmutableBytesWritable, Text, ImmutableBytesWritable> {
public void reduce(ImmutableBytesWritable key, Iterable<Text> values, 
 Context context) throws IOException, InterruptedException {

 String[] cause=null;

 // Loop values
 for(Text val : values)
 {
 cause=val.toString().split(" ");

 }
 // Put to HBase
 Put put = new Put(key.get());
 put.add(Bytes.toBytes("official_info"), Bytes.toBytes("name"),Bytes.toBytes(cause[0]));
 put.add(Bytes.toBytes("official_info"), Bytes.toBytes("department"), Bytes.toBytes(cause[1]));
 context.write(key, put);
 }
 }
 
This piece of code decides the exact row and columns in which we store the values from the reducer. Each empid is stored in a separate row, because we made empid the row key, which is unique. In each row we store the official information of the employee under the column family “official_info”, in the columns “name” and “department” respectively. Note that the value is split on a single space, so this assumes that names and departments contain no spaces.

 Put put = new Put(key.get());
 put.add(Bytes.toBytes("official_info"), Bytes.toBytes("name"),Bytes.toBytes(cause[0]));
 put.add(Bytes.toBytes("official_info"), Bytes.toBytes("department"), Bytes.toBytes(cause[1]));
 context.write(key, put);
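
Because the mapper joins name and department with a space and the reducer splits on a space, a name such as "john smith" would be cut in two. A small plain-Java sketch of a safer pairing is shown below; ValueCodecSketch is a hypothetical helper, and the tab delimiter is an assumption that holds only if the fields never contain tabs.

```java
final class ValueCodecSketch {
    // Assumption: a tab never appears inside a name
    private static final String DELIMITER = "\t";

    /** Joins the two fields the way a mapper would before emitting the value. */
    static String encode(String name, String dept) {
        return name + DELIMITER + dept;
    }

    /** Splits into at most two parts, so extra delimiters stay inside the second field. */
    static String[] decode(String value) {
        return value.split(DELIMITER, 2);
    }

    public static void main(String[] args) {
        String[] parts = decode(encode("john smith", "sales"));
        System.out.println("name=" + parts[0] + ", department=" + parts[1]);
    }
}
```

Using this approach would mean changing the delimiter in both the mapper and the reducer at the same time.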

Transferred Data in Hbase:

scan 'employee'

As we can see, we were able to complete the task of migrating our business data from a relational SQL DB to a NoSQL DB.
In the next blog we’ll learn how to write and execute code for other input and output formats.
Got a question or any feedback for us? Please mention it in the comments section and we will get back to you.
