In this blog we will explore the capabilities and possibilities of one of the most important components of the Hadoop framework: MapReduce.
Today, companies are adopting the Hadoop framework as their first choice for data storage because of its ability to handle large data effectively. But we also know that data is versatile and exists in various structures and formats. To handle such a huge variety of data and its different formats, there should be a mechanism to accommodate all the varieties and yet produce an effective and consistent result.
The most powerful component in the Hadoop framework is MapReduce, which provides better control over data and its structure than its counterparts. Though it comes with the overhead of a learning curve and programming complexity, if you can handle these complexities you can surely handle any kind of data with Hadoop.
The MapReduce framework breaks all its processing into basically two phases: Map and Reduce.
Preparing your raw data for these phases requires an understanding of some basic classes and interfaces. The superclass for this preprocessing is InputFormat.
The InputFormat class is one of the core classes in the Hadoop MapReduce API. This class is responsible for defining two main things:
- Data splits
- Record reader
A data split is a fundamental concept in the Hadoop MapReduce framework: it defines both the size of individual map tasks and their potential execution server. The record reader is responsible for actually reading records from the input file and submitting them (as key/value pairs) to the mapper.
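To make the record-reader idea concrete, here is a rough, Hadoop-free sketch (class and method names are our own invention, not the Hadoop API) of what a line-oriented record reader, such as the one behind TextInputFormat, conceptually produces: one (byte offset, line) pair per record.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only, not the Hadoop API: a line-oriented record
// reader conceptually turns raw bytes into (byte offset, line) pairs,
// which the framework then feeds to the mapper as key/value pairs.
public class LineRecordSketch {
    public static List<String> records(String data) {
        List<String> out = new ArrayList<>();
        int offset = 0;
        for (String line : data.split("\n")) {
            out.add(offset + "\t" + line);   // key = byte offset, value = line
            offset += line.length() + 1;     // +1 for the newline delimiter
        }
        return out;
    }

    public static void main(String[] args) {
        for (String rec : records("hello\nworld")) {
            System.out.println(rec);
        }
    }
}
```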
The number of mappers is decided based on the number of splits, and it is the job of the InputFormat to create those splits. Most of the time the split size is equivalent to the block size, but splits are not always created based on the HDFS block size; it depends entirely on how the getSplits() method of your InputFormat has been overridden.
There is a fundamental difference between an MR split and an HDFS block. A block is a physical chunk of data, while a split is just a logical chunk which a mapper reads. A split does not contain the input data; it just holds a reference or address of the data. A split basically has two things: a length in bytes and a set of storage locations, which are just hostname strings.
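As a toy illustration of that last point (plain Java with invented names; the real classes are org.apache.hadoop.mapreduce.InputSplit and its FileSplit subclass), a split can be modeled as nothing more than a byte length plus a list of host locations. Note there is no data field at all:

```java
import java.util.Arrays;
import java.util.List;

// Toy model of an input split: it carries no input data, only a length
// in bytes and the hostnames where replicas of that data live.
public class SplitSketch {
    public final long lengthInBytes;
    public final List<String> locations;

    public SplitSketch(long lengthInBytes, String... locations) {
        this.lengthInBytes = lengthInBytes;
        this.locations = Arrays.asList(locations);
    }

    public static void main(String[] args) {
        // A 128 MB chunk replicated on three hypothetical nodes
        SplitSketch s = new SplitSketch(128L * 1024 * 1024, "node1", "node2", "node3");
        System.out.println(s.lengthInBytes + " bytes on " + s.locations);
    }
}
```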
To understand this better, let's take an example: processing data stored in MySQL using MR. Since there is no concept of blocks in this case, the theory that "splits are always created based on the HDFS block" fails. One possibility is to create splits based on ranges of rows in your MySQL table (and this is what DBInputFormat does, an input format for reading data from relational databases). We may then have k splits consisting of n rows each.
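A back-of-the-envelope sketch of that row-range idea (plain Java with invented names; the real logic lives in DBInputFormat's getSplits(), which issues LIMIT/OFFSET queries per range):

```java
import java.util.ArrayList;
import java.util.List;

// Carve totalRows into numSplits contiguous [start, end) row ranges,
// the way a DB-backed input format conceptually partitions a table.
public class RowRangeSketch {
    public static List<long[]> ranges(long totalRows, int numSplits) {
        List<long[]> splits = new ArrayList<>();
        long chunk = totalRows / numSplits;      // base rows per split
        long remainder = totalRows % numSplits;  // spread leftovers over the first splits
        long start = 0;
        for (int i = 0; i < numSplits; i++) {
            long size = chunk + (i < remainder ? 1 : 0);
            splits.add(new long[] { start, start + size });
            start += size;
        }
        return splits;
    }

    public static void main(String[] args) {
        // 10 rows over 3 splits -> [0,4), [4,7), [7,10)
        for (long[] r : ranges(10, 3)) {
            System.out.println(r[0] + ".." + r[1]);
        }
    }
}
```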
It is only for InputFormats based on FileInputFormat (an InputFormat for handling data stored in files) that splits are created based on the total size, in bytes, of the input files. However, the filesystem block size of the input files is treated as an upper bound for input splits. If you have a file smaller than the HDFS block size, you will get only one mapper for that file. If you want different behavior, you can use mapred.min.split.size. But again, it depends solely on the getSplits() of your InputFormat.
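The effective split size computed by FileInputFormat boils down to max(minSize, min(maxSize, blockSize)), which is why raising the minimum split size can push splits past the block size and reduce the number of mappers. A quick arithmetic check in plain Java (the helper name is ours):

```java
// The split-size rule FileInputFormat effectively applies:
// splitSize = max(minSize, min(maxSize, blockSize))
public class SplitSizeSketch {
    public static long splitSize(long minSize, long maxSize, long blockSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long block = 128L * 1024 * 1024; // a 128 MB HDFS block

        // With the defaults (tiny min, huge max), splits track the block size.
        System.out.println(splitSize(1L, Long.MAX_VALUE, block) == block);

        // Raising the minimum above the block size forces larger splits,
        // i.e. fewer mappers.
        System.out.println(splitSize(256L * 1024 * 1024, Long.MAX_VALUE, block) > block);
    }
}
```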
We have many pre-existing input formats available under the package org.apache.hadoop.mapreduce.lib.input:
- CombineFileInputFormat
- CombineFileRecordReader
- CombineFileRecordReaderWrapper
- CombineFileSplit
- CombineSequenceFileInputFormat
- CombineTextInputFormat
- FileInputFormat
- FileInputFormatCounter
- FileSplit
- FixedLengthInputFormat
- InvalidInputException
- KeyValueLineRecordReader
- KeyValueTextInputFormat
- MultipleInputs
- NLineInputFormat
- SequenceFileAsBinaryInputFormat
- SequenceFileAsTextInputFormat
- SequenceFileAsTextRecordReader
- SequenceFileInputFilter
- SequenceFileInputFormat
- SequenceFileRecordReader
- TextInputFormat
The default is TextInputFormat.
Similarly, we have many output formats, which read the data from the reducers and store it into HDFS, under the package org.apache.hadoop.mapreduce.lib.output:
- FileOutputCommitter
- FileOutputFormat
- FileOutputFormatCounter
- FilterOutputFormat
- LazyOutputFormat
- MapFileOutputFormat
- MultipleOutputs
- NullOutputFormat
- PartialFileOutputCommitter
- PartialOutputCommitter
- SequenceFileAsBinaryOutputFormat
- SequenceFileOutputFormat
- TextOutputFormat
The default is TextOutputFormat.
By the time you finish reading this blog, you will have learned:
- How to write a MapReduce program
- The different types of InputFormats available in MapReduce
- Why InputFormats are needed
- How to write custom InputFormats
- How to transfer data from SQL databases to HDFS
- How to transfer data from SQL databases (here MySQL) to NoSQL databases (here HBase)
- How to transfer data from one table in a SQL database to another table in a SQL database (perhaps not that important when both tables are in the same SQL database, but there is nothing wrong in having knowledge of it; you never know how it might come into use)
Prerequisites:
- Hadoop pre-installed
- MySQL pre-installed
- HBase pre-installed
- Basic understanding of Java
- MapReduce knowledge
- Basic knowledge of the Hadoop framework
Let’s understand the problem statement which we are going to solve here:
We have an employee table in a MySQL database named edureka. As per the business requirement, we have to move all the data available in the relational DB to the Hadoop file system (HDFS) and to a NoSQL DB, namely HBase.
We have many options to do this task:
- Sqoop
- Flume
- MapReduce
Now, suppose you do not want to install and configure any other tool for this operation. You are then left with only one option: Hadoop's processing framework, MapReduce. The MapReduce framework gives you full control over the data while transferring it: you can manipulate the columns and put the data directly at either of the two target locations.
Note:
- We need to download the MySQL connector and put it on Hadoop's classpath to fetch tables from MySQL. To do this, download the connector com.mysql.jdbc_5.1.5.jar and keep it under the $HADOOP_HOME/share/hadoop/mapreduce/lib directory:
cp Downloads/com.mysql.jdbc_5.1.5.jar $HADOOP_HOME/share/hadoop/mapreduce/lib/
- Also, put all HBase jars under the Hadoop classpath in order to let your MR program access HBase. To do this, execute the following command:
cp $HBASE_HOME/lib/* $HADOOP_HOME/share/hadoop/mapreduce/lib/
The software versions that I have used in the execution of this task are:
- Hadoop 2.3.0
- HBase 0.98.9-Hadoop2
- Eclipse Luna
To avoid any compatibility issues, I recommend that readers run the commands in a similar environment.
Custom DBInputWritable:
package com.inputFormat.copy;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DBInputWritable implements Writable, DBWritable {
  private int id;
  private String name, dept;

  public void readFields(DataInput in) throws IOException {
  }

  // A ResultSet object represents the data returned from a SQL statement
  public void readFields(ResultSet rs) throws SQLException {
    id = rs.getInt(1);
    name = rs.getString(2);
    dept = rs.getString(3);
  }

  public void write(DataOutput out) throws IOException {
  }

  public void write(PreparedStatement ps) throws SQLException {
    ps.setInt(1, id);
    ps.setString(2, name);
    ps.setString(3, dept);
  }

  public int getId() { return id; }
  public String getName() { return name; }
  public String getDept() { return dept; }
}
Custom DBOutputWritable:
package com.inputFormat.copy;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DBOutputWritable implements Writable, DBWritable {
  private String name;
  private int id;
  private String dept;

  public DBOutputWritable(String name, int id, String dept) {
    this.name = name;
    this.id = id;
    this.dept = dept;
  }

  public void readFields(DataInput in) throws IOException {
  }

  public void readFields(ResultSet rs) throws SQLException {
  }

  public void write(DataOutput out) throws IOException {
  }

  public void write(PreparedStatement ps) throws SQLException {
    ps.setString(1, name);
    ps.setInt(2, id);
    ps.setString(3, dept);
  }
}
Input Table:
create database edureka;
create table emp(empid int not null,name varchar(30),dept varchar(20),primary key(empid));
insert into emp values(1,"abhay","developement"),(2,"brundesh","test");
select * from emp;
Case 1: Transfer from MySQL to HDFS
package com.inputFormat.copy;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MainDbtohdfs {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DBConfiguration.configureDB(conf,
        "com.mysql.jdbc.Driver",                // driver class
        "jdbc:mysql://localhost:3306/edureka",  // db url
        "root",                                 // user name
        "root");                                // password

    Job job = new Job(conf);
    job.setJarByClass(MainDbtohdfs.class);
    job.setMapperClass(Map.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setInputFormatClass(DBInputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(args[0]));

    DBInputFormat.setInput(
        job,
        DBInputWritable.class,
        "emp",                                   // input table name
        null,
        null,
        new String[] { "empid", "name", "dept" } // table columns
    );

    // Delete the output path if it already exists
    Path p = new Path(args[0]);
    FileSystem fs = FileSystem.get(new URI(args[0]), conf);
    fs.delete(p);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
This piece of code lets us prepare or configure the InputFormat to access our source SQL DB. The parameters include the driver class; the URL, which holds the address of the SQL database; its username; and the password.
DBConfiguration.configureDB(conf,
    "com.mysql.jdbc.Driver",                // driver class
    "jdbc:mysql://localhost:3306/edureka",  // db url
    "root",                                 // user name
    "root");                                // password
This piece of code lets us pass the details of the table in the database and set them in the job object. The parameters are, respectively, the job instance, the custom writable class (which must implement the DBWritable interface), the source table name, a condition (if any, else null), any sorting parameters (else null), and the list of table columns.
DBInputFormat.setInput(
    job,
    DBInputWritable.class,
    "emp",                                   // input table name
    null,
    null,
    new String[] { "empid", "name", "dept" } // table columns
);
Mapper
package com.inputFormat.copy;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Map extends Mapper<LongWritable, DBInputWritable, Text, IntWritable> {
  protected void map(LongWritable key, DBInputWritable value, Context ctx) {
    try {
      String name = value.getName();
      IntWritable id = new IntWritable(value.getId());
      String dept = value.getDept();
      ctx.write(new Text(name + "\t" + id + "\t" + dept), id);
    } catch (IOException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}
Reducer: Identity Reducer Used
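Since we do not set a reducer class in this job, Hadoop falls back to the identity reducer, which simply forwards every key/value pair unchanged. A plain-Java sketch of that behavior (illustrative names, not the Hadoop Reducer API):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map.Entry;

// Conceptual identity reduce: each value grouped under a key is emitted
// back out, paired with that same key, with no aggregation at all.
public class IdentityReduceSketch {
    public static List<Entry<String, Integer>> reduce(String key, List<Integer> values) {
        List<Entry<String, Integer>> out = new ArrayList<>();
        for (Integer v : values) {
            out.add(new SimpleEntry<>(key, v)); // pass through unchanged
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(reduce("abhay\t1", Arrays.asList(1)));
    }
}
```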
Command to run:
hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs /dbtohdfs
Output: MySQL Table Transferred to HDFS
hadoop dfs -ls /dbtohdfs/*
Case 2: Transfer From One Table in MySQL to Another in MySQL
Creating the output table in MySQL:
create table employee1(name varchar(20),id int,dept varchar(20));
package com.inputFormat.copy;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

public class Mainonetable_to_other_table {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DBConfiguration.configureDB(conf,
        "com.mysql.jdbc.Driver",                // driver class
        "jdbc:mysql://localhost:3306/edureka",  // db url
        "root",                                 // user name
        "root");                                // password

    Job job = new Job(conf);
    job.setJarByClass(Mainonetable_to_other_table.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(DBOutputWritable.class);
    job.setOutputValueClass(NullWritable.class);
    job.setInputFormatClass(DBInputFormat.class);
    job.setOutputFormatClass(DBOutputFormat.class);

    DBInputFormat.setInput(
        job,
        DBInputWritable.class,
        "emp",                                   // input table name
        null,
        null,
        new String[] { "empid", "name", "dept" } // table columns
    );

    DBOutputFormat.setOutput(
        job,
        "employee1",                          // output table name
        new String[] { "name", "id", "dept" } // table columns
    );

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
This piece of code lets us configure the output table name in the SQL DB. The parameters are the job instance, the output table name, and the output column names, respectively.
DBOutputFormat.setOutput(
    job,
    "employee1",                          // output table name
    new String[] { "name", "id", "dept" } // table columns
);
Mapper: Same as Case 1
Reducer:
package com.inputFormat.copy;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reduce extends Reducer<Text, IntWritable, DBOutputWritable, NullWritable> {
  protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) {
    // The map output key holds name, id and dept separated by tabs
    String[] line = key.toString().split("\t");
    try {
      ctx.write(new DBOutputWritable(line[0], Integer.parseInt(line[1]), line[2]),
                NullWritable.get());
    } catch (IOException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}
Command to Run:
hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table
Output: Transferred Data From EMP Table in MySQL to Another Table Employee1 in MySQL
Case 3: Transfer From Table in MySQL to NoSQL (HBase) Table
Creating an HBase table to accommodate the output from the SQL table:
create 'employee','official_info'
Driver Class:
package Dbtohbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;

public class MainDbToHbase {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    DBConfiguration.configureDB(conf,
        "com.mysql.jdbc.Driver",                // driver class
        "jdbc:mysql://localhost:3306/edureka",  // db url
        "root",                                 // user name
        "root");                                // password

    Job job = new Job(conf, "dbtohbase");
    job.setJarByClass(MainDbToHbase.class);
    job.setMapperClass(Map.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Text.class);

    TableMapReduceUtil.initTableReducerJob("employee", Reduce.class, job);

    job.setInputFormatClass(DBInputFormat.class);
    job.setOutputFormatClass(TableOutputFormat.class);

    DBInputFormat.setInput(
        job,
        DBInputWritable.class,
        "emp",                                   // input table name
        null,
        null,
        new String[] { "empid", "name", "dept" } // table columns
    );

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
This piece of code lets you configure the map output key class which, in the case of HBase, is ImmutableBytesWritable:
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Text.class);
Here we are passing the HBase table name and the reducer that will act on the table:
TableMapReduceUtil.initTableReducerJob("employee",Reduce.class, job);
Mapper:
package Dbtohbase;

import java.io.IOException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Map extends Mapper<LongWritable, DBInputWritable, ImmutableBytesWritable, Text> {
  protected void map(LongWritable id, DBInputWritable value, Context context) {
    try {
      String line = value.getName();
      String cd = value.getId() + "";
      String dept = value.getDept();
      // empid becomes the row key; name and dept travel as the value
      context.write(new ImmutableBytesWritable(Bytes.toBytes(cd)),
                    new Text(line + " " + dept));
    } catch (IOException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}
In this piece of code we take values from the getters of the DBInputWritable class and pass them in an ImmutableBytesWritable, so that they reach the reducer in a byte form which HBase understands:
String line = value.getName();
String cd = value.getId() + "";
String dept = value.getDept();
context.write(new ImmutableBytesWritable(Bytes.toBytes(cd)), new Text(line + " " + dept));
Reducer:
package Dbtohbase;

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class Reduce extends TableReducer<ImmutableBytesWritable, Text, ImmutableBytesWritable> {
  public void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    String[] cause = null;

    // Loop over the values (name and dept, space-separated)
    for (Text val : values) {
      cause = val.toString().split(" ");
    }

    // Put to HBase: column family "official_info", columns "name" and "department"
    Put put = new Put(key.get());
    put.add(Bytes.toBytes("official_info"), Bytes.toBytes("name"), Bytes.toBytes(cause[0]));
    put.add(Bytes.toBytes("official_info"), Bytes.toBytes("department"), Bytes.toBytes(cause[1]));
    context.write(key, put);
  }
}
This piece of code lets us decide the exact row and column in which we store values from the reducer. Here we store each empid in a separate row, since we made empid the row key, which is unique. In each row we store the official information of the employee under the column family "official_info", in the columns "name" and "department" respectively.
Put put = new Put(key.get());
put.add(Bytes.toBytes("official_info"), Bytes.toBytes("name"), Bytes.toBytes(cause[0]));
put.add(Bytes.toBytes("official_info"), Bytes.toBytes("department"), Bytes.toBytes(cause[1]));
context.write(key, put);
Transferred Data in Hbase:
scan 'employee'
As we can see, we successfully completed the task of migrating our business data from a relational SQL DB to a NoSQL DB.
In the next blog we’ll learn how to write and execute codes for other input and output formats.
Keep posting your comments, questions or any feedback. I would love to hear from you.
Got a question for us? Please mention it in the comments section and we will get back to you.