Installing Hadoop CDH5 on 64-bit Ubuntu Trusty

Step 1 : Check the Ubuntu code name
$ cat /etc/lsb-release

Step 2 : Add repository to Ubuntu Trusty

$ sudo wget 'http://archive.cloudera.com/cdh5/ubuntu/trusty/amd64/cdh/cloudera.list' \
    -O /etc/apt/sources.list.d/cloudera.list

Step 3 : Additional step for Trusty

This step ensures that you get the right ZooKeeper package for the current CDH release. You need to prioritize the Cloudera repository you have just added, such that you install the CDH version of ZooKeeper rather than the version that is bundled with Ubuntu Trusty.
To do this, create a file at /etc/apt/preferences.d/cloudera.pref with the following contents:

Package: *
Pin: release o=Cloudera, l=Cloudera
Pin-Priority: 501

Package: *
Pin: release n=raring
Pin-Priority: 100

Package: *
Pin: release n=trusty-cdh5
Pin-Priority: 600

Step 4 : Optionally Add a Repository Key [Ubuntu Trusty]

$ wget http://archive.cloudera.com/cdh5/ubuntu/trusty/amd64/cdh/archive.key -O archive.key
$ sudo apt-key add archive.key
$ sudo apt-get update

Step 5 : Install Hadoop in pseudo-distributed mode

$ sudo apt-get install hadoop-0.20-conf-pseudo
$ dpkg -L hadoop-0.20-conf-pseudo
$ sudo -u hdfs hdfs namenode -format
$ for x in `cd /etc/init.d ; ls hadoop-hdfs-*` ; do sudo service $x start ; done
[If the services do not start, edit the hadoop-env.sh file and set JAVA_HOME, as shown below:
$ sudo gedit /etc/hadoop/conf/hadoop-env.sh
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0
]
$ sudo -u hdfs hadoop fs -mkdir -p /tmp
$ sudo -u hdfs hadoop fs -chmod -R 1777 /tmp
$ sudo -u hdfs hadoop fs -mkdir -p /var/lib/hadoop-hdfs/cache/mapred/mapred/staging
$ sudo -u hdfs hadoop fs -chmod 1777 /var/lib/hadoop-hdfs/cache/mapred/mapred/staging
$ sudo -u hdfs hadoop fs -chown -R mapred /var/lib/hadoop-hdfs/cache/mapred
$ sudo -u hdfs hadoop fs -ls -R /
$ for x in `cd /etc/init.d ; ls hadoop-0.20-mapreduce-*` ; do sudo service $x start ; done
$ sudo -u hdfs hadoop fs -mkdir -p /user/$USER
$ sudo -u hdfs hadoop fs -chown $USER /user/$USER
$ hadoop fs -mkdir input
$ hadoop fs -put /etc/hadoop/conf/*.xml input
$ hadoop fs -ls input
$ /usr/bin/hadoop jar /usr/lib/hadoop-0.20-mapreduce/hadoop-examples.jar grep input output 'dfs[a-z.]+'
$ hadoop fs -ls
$ hadoop fs -ls output
$ hadoop fs -cat output/part-00000 | head

Configuring Hadoop in CDH5

Step 1 : Configuring Network Names
  •  Run uname -a and check that the hostname matches the output of the hostname command.
  •  Make sure the /etc/hosts file on each system contains the IP addresses and fully-qualified domain names (FQDN) of all the members of the cluster.
  •  Make sure the /etc/hostname file on each system contains the hostname you have just set (or verified) for that system (on RHEL-compatible systems, the file is /etc/sysconfig/network instead).
  •  Run /sbin/ifconfig and note the value of inet addr in the eth0 entry.
  •  Run host -v -t A `hostname` and make sure that hostname matches the output of the hostname command, and has the same IP address as reported by ifconfig for eth0.


Step 2 : Copy Hadoop Configuration

  • $ sudo cp -r /etc/hadoop/conf.empty /etc/hadoop/conf.my_cluster [Copy the default configuration to your custom directory]
  • $ sudo update-alternatives --install /etc/hadoop/conf hadoop-conf /etc/hadoop/conf.my_cluster 50
  • $ sudo update-alternatives --set hadoop-conf /etc/hadoop/conf.my_cluster [To manually set the configuration on Ubuntu and SLES systems]

Step 3 : Configuring HDFS



3.1. core-site.xml [configuration] (sudo gedit /etc/hadoop/conf.my_cluster/core-site.xml)
i. fs.defaultFS ->  Specifies the NameNode and the default file system, in the form hdfs://<namenode host>:<namenode port>/. The default value is file:///. The default file system is used to resolve relative paths; for example, if fs.default.name or fs.defaultFS is set to hdfs://mynamenode/, the relative URI /mydir/myfile resolves to hdfs://mynamenode/mydir/myfile. Note: for the cluster to function correctly, the <namenode> part of the string must be the hostname, not the IP address.

<property>
 <name>fs.defaultFS</name>
 <value>hdfs://namenode-host.company.com:8020</value>
</property>
[considering host is localhost]
<property>
 <name>fs.defaultFS</name>
 <value>hdfs://localhost:8020</value>
</property>
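
The way fs.defaultFS is used to resolve a path that has no scheme can be illustrated with plain java.net.URI, without any Hadoop classes. This is only a sketch of the idea: in a real job, Hadoop's own Path and FileSystem classes do the resolution, and the class name DefaultFsResolution is made up for this example.

```java
import java.net.URI;

// Illustration only: mimics how a path without a scheme is resolved
// against the default file system URI (the value of fs.defaultFS).
final class DefaultFsResolution {

    // Resolve a path against the default file system URI.
    static URI qualify(String defaultFs, String path) {
        return URI.create(defaultFs).resolve(path);
    }

    public static void main(String[] args) {
        // Same example as in the text above.
        System.out.println(qualify("hdfs://mynamenode/", "/mydir/myfile"));
        // With the pseudo-distributed setting used in this guide.
        System.out.println(qualify("hdfs://localhost:8020", "/tmp"));
    }
}
```

The first call prints hdfs://mynamenode/mydir/myfile, matching the example given for fs.defaultFS.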


3.2. hdfs-site.xml [configuration] (sudo gedit /etc/hadoop/conf.my_cluster/hdfs-site.xml)
i. dfs.permissions.superusergroup -> Specifies the UNIX group containing users that will be treated as superusers by HDFS. You can stick with the value of 'hadoop' or pick your own group depending on the security policies at your site.

<property>
 <name>dfs.permissions.superusergroup</name>
 <value>hadoop</value>
</property>
ii.  dfs.name.dir or dfs.namenode.name.dir [on the NameNode]

This property specifies the URIs of the directories where the NameNode stores its metadata and edit logs. Cloudera recommends that you specify at least two directories. One of these should be located on an NFS mount point.

<property>
 <name>dfs.namenode.name.dir</name>
 <value>file:///data/1/dfs/nn,file:///nfsmount/dfs/nn</value>
</property>

iii.  dfs.data.dir or dfs.datanode.data.dir [on each DataNode]
This property specifies the URIs of the directories where the DataNode stores blocks. Cloudera recommends that you configure the disks on the DataNode in a JBOD configuration, mounted at /data/1/ through /data/N, and configure dfs.data.dir or dfs.datanode.data.dir to specify file:///data/1/dfs/dn through file:///data/N/dfs/dn/.

<property>
 <name>dfs.datanode.data.dir</name>
 <value>file:///data/1/dfs/dn,file:///data/2/dfs/dn,file:///data/3/dfs/dn,file:///data/4/dfs/dn</value>
</property>
 After specifying these directories as shown above, you must create the directories and assign the correct file permissions to them on each node in your cluster.

In the following instructions, local path examples are used to represent Hadoop parameters. Change the path examples to match your configuration.

Local directories:

    The dfs.name.dir or dfs.namenode.name.dir parameter is represented by the /data/1/dfs/nn and /nfsmount/dfs/nn path examples.
    The dfs.data.dir or dfs.datanode.data.dir parameter is represented by the /data/1/dfs/dn, /data/2/dfs/dn, /data/3/dfs/dn, and /data/4/dfs/dn examples.

3.3. To configure local storage directories for use by HDFS:

3.3.1. On a NameNode host: create the dfs.name.dir or dfs.namenode.name.dir local directories:

$ sudo mkdir -p /data/1/dfs/nn /nfsmount/dfs/nn

3.3.2. On all DataNode hosts: create the dfs.data.dir or dfs.datanode.data.dir local directories:

$ sudo mkdir -p /data/1/dfs/dn /data/2/dfs/dn /data/3/dfs/dn /data/4/dfs/dn

3.3.3. Configure the owner of the dfs.name.dir or dfs.namenode.name.dir directory, and of the dfs.data.dir or dfs.datanode.data.dir directory, to be the hdfs user:

$ sudo chown -R hdfs:hdfs /data/1/dfs/nn /nfsmount/dfs/nn /data/1/dfs/dn /data/2/dfs/dn /data/3/dfs/dn /data/4/dfs/dn

Here is a summary of the correct owner and permissions of the local directories:

 dfs.name.dir or dfs.namenode.name.dir -> hdfs:hdfs -> drwx------
 dfs.data.dir or dfs.datanode.data.dir -> hdfs:hdfs -> drwx------
[The Hadoop daemons automatically set the correct permissions for you on dfs.data.dir or dfs.datanode.data.dir. But in the case of dfs.name.dir or dfs.namenode.name.dir, permissions are currently incorrectly set to the file-system default, usually drwxr-xr-x (755). Use the chmod command to reset permissions for these dfs.name.dir or dfs.namenode.name.dir directories to drwx------ (700); for example:

$ sudo chmod 700 /data/1/dfs/nn /nfsmount/dfs/nn
[sudo chmod 700 /data/1/dfs/nn /nfsmount/dfs/nn /data/1/dfs/dn /data/2/dfs/dn /data/3/dfs/dn /data/4/dfs/dn]
or

$ sudo chmod go-rx /data/1/dfs/nn /nfsmount/dfs/nn]
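
The octal modes used in this guide (700, 755, 1777) and the symbolic strings shown by ls (drwx------, drwxr-xr-x, drwxrwxrwt) describe the same permission bits. The following Java sketch converts one form to the other; the class and method names are hypothetical and the leading "d" for directories is left out.

```java
// Illustration only: converts an octal permission mode to the symbolic
// form printed by ls (without the leading file-type character).
final class ModeSketch {

    static String toSymbolic(int mode) {
        String flags = "rwxrwxrwx";
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 9; i++) {
            // Bit 8 is owner-read, bit 0 is others-execute.
            boolean set = (mode & (1 << (8 - i))) != 0;
            sb.append(set ? flags.charAt(i) : '-');
        }
        // The sticky bit (01000) is shown in the last position:
        // 't' if others can also execute, 'T' otherwise.
        if ((mode & 01000) != 0) {
            sb.setCharAt(8, sb.charAt(8) == 'x' ? 't' : 'T');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toSymbolic(0700));  // NameNode directories
        System.out.println(toSymbolic(0755));  // usual file-system default
        System.out.println(toSymbolic(01777)); // HDFS /tmp
    }
}
```

Running it prints rwx------, rwxr-xr-x and rwxrwxrwt, which is why chmod go-rx on a 755 directory has the same effect as chmod 700.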

3.4. Formatting the NameNode

sudo -u hdfs hdfs namenode -format [Before starting the NameNode for the first time you need to format the file system. ]

3.5. Configuring the Secondary NameNode
Add the name of the machine that will run the Secondary NameNode to the masters file.
Add the following property to the hdfs-site.xml file:
<property>
  <name>dfs.namenode.http-address</name>
  <value><namenode.host.address>:50070</value>
  <description>
    The address and the base port on which the dfs NameNode Web UI will listen.
  </description>
</property>

[considering host is localhost]
<property>
  <name>dfs.namenode.http-address</name>
  <value>localhost:50070</value>
  <description>
    The address and the base port on which the dfs NameNode Web UI will listen.
  </description>
</property>
[In most cases, you should set dfs.namenode.http-address to a routable IP address with port 50070. However, when the NameNode needs to bind to several local addresses, you may want to set dfs.namenode.http-address to 0.0.0.0:50070 on the NameNode machine only, and set it to a real, routable address on the Secondary NameNode machine. The different addresses are needed in this case because HDFS uses dfs.namenode.http-address for two different purposes: it defines both the address the NameNode binds to and the address the Secondary NameNode connects to for checkpointing. Using 0.0.0.0 on the NameNode allows the NameNode to bind to all its local addresses, while using the externally routable address on the Secondary NameNode provides the Secondary NameNode with a real address to connect to.]
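
The difference between an address that is only good for binding (0.0.0.0) and one that a client such as the Secondary NameNode can connect to can be checked with the standard java.net classes. This is a small sketch, not part of Hadoop; the class name and the address 192.168.1.10 are made up for the example.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Illustration only: tells a wildcard bind address apart from a concrete one.
final class HttpAddressCheck {

    // Returns true when the host part of "host:port" is the wildcard address.
    // With IP literals, as used below, no DNS lookup takes place.
    static boolean isWildcard(String hostPort) {
        String host = hostPort.substring(0, hostPort.lastIndexOf(':'));
        try {
            return InetAddress.getByName(host).isAnyLocalAddress();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Cannot parse host: " + host, e);
        }
    }

    public static void main(String[] args) {
        // Suitable for the NameNode to bind to, but not to connect to.
        System.out.println(isWildcard("0.0.0.0:50070"));
        // A concrete (hypothetical) address the Secondary NameNode could use.
        System.out.println(isWildcard("192.168.1.10:50070"));
    }
}
```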

3.6. Enabling Trash
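Trash is configured with the following properties in the core-site.xml file (both values are in minutes):
fs.trash.interval -> 60 [how long a trash checkpoint is kept before it is deleted permanently; 0 disables trash]
fs.trash.checkpoint.interval -> 60 [how often a trash checkpoint is created; this should be no larger than fs.trash.interval]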

3.7. Configuring Storage-Balancing for the DataNodes[optional]

3.8. Enabling WebHDFS

 Set the following property in hdfs-site.xml:

<property>
  <name>dfs.webhdfs.enabled</name>
  <value>true</value>
</property>

Step 4 : Deploying MapReduce v1 (MRv1) on a Cluster [i.e. Configuring Jobtracker & Tasktracker]


4.1. mapred-site.xml (sudo gedit /etc/hadoop/conf.my_cluster/mapred-site.xml)
4.1.1 : Configuring Properties for MRv1 Clusters

mapred.job.tracker(on job tracker i.e. on namenode) [If you plan to run your cluster with MRv1 daemons you need to specify the hostname and (optionally) port of the JobTracker's RPC server, in the form <host>:<port>. If the value is set to local, the default, the JobTracker runs on demand when you run a MapReduce job; do not try to start the JobTracker yourself in this case. Note: if you specify the host (rather than using local) this must be the hostname (for example mynamenode) not the IP address. ]

<property>
 <name>mapred.job.tracker</name>
 <value>jobtracker-host.company.com:8021</value>
</property>

[considering host is localhost]
<property>
 <name>mapred.job.tracker</name>
 <value>localhost:8021</value>
</property>

4.1.2: Configure Local Storage Directories for Use by MRv1 Daemons
mapred.local.dir (on each TaskTracker ) [This property specifies the directories where the TaskTracker will store temporary data and intermediate map output files while running MapReduce jobs. Cloudera recommends that this property specifies a directory on each of the JBOD mount points; for example, /data/1/mapred/local through /data/N/mapred/local. ]

<property>
 <name>mapred.local.dir</name>
 <value>/data/1/mapred/local,/data/2/mapred/local,/data/3/mapred/local,/data/4/mapred/local</value>
</property>

4.2. Create the mapred.local.dir local directories:

$ sudo mkdir -p /data/1/mapred/local /data/2/mapred/local /data/3/mapred/local /data/4/mapred/local

4.3. Configure the owner of the mapred.local.dir directory to be the mapred user: [Permissions: drwxr-xr-x]

$ sudo chown -R mapred:hadoop /data/1/mapred/local /data/2/mapred/local /data/3/mapred/local /data/4/mapred/local

4.4. Configure a Health Check Script for DataNode Processes

#!/bin/bash
if ! jps | grep -q DataNode ; then
 echo ERROR: datanode not up
fi

4.5. Enabling JobTracker Recovery

By default JobTracker recovery is off, but you can enable it by setting the property mapreduce.jobtracker.restart.recover to true in mapred-site.xml.

4.6. If Necessary, Deploy your Custom Configuration to your Entire Cluster [already done in previous step]

 To deploy your configuration to your entire cluster:

    Push your custom directory (for example /etc/hadoop/conf.my_cluster) to each node in your cluster; for example:

    $ scp -r /etc/hadoop/conf.my_cluster myuser@myCDHnode-<n>.mycompany.com:/etc/hadoop/conf.my_cluster

Manually set alternatives on each node to point to that directory, as follows.

 To manually set the configuration on Ubuntu and SLES systems:

$ sudo update-alternatives --install /etc/hadoop/conf hadoop-conf /etc/hadoop/conf.my_cluster 50
$ sudo update-alternatives --set hadoop-conf /etc/hadoop/conf.my_cluster

4.7. If Necessary, Start HDFS on Every Node in the Cluster

Start HDFS on each node in the cluster, as follows:

for x in `cd /etc/init.d ; ls hadoop-hdfs-*` ; do sudo service $x start ; done

4.8. If Necessary, Create the HDFS /tmp Directory[already created in the previous steps]

 Create the /tmp directory after HDFS is up and running, and set its permissions to 1777 (drwxrwxrwt), as follows:

$ sudo -u hdfs hadoop fs -mkdir /tmp
$ sudo -u hdfs hadoop fs -chmod -R 1777 /tmp

4.9. Create MapReduce /var directories [already created in the previous steps]

sudo -u hdfs hadoop fs -mkdir -p /var/lib/hadoop-hdfs/cache/mapred/mapred/staging
sudo -u hdfs hadoop fs -chmod 1777 /var/lib/hadoop-hdfs/cache/mapred/mapred/staging
sudo -u hdfs hadoop fs -chown -R mapred /var/lib/hadoop-hdfs/cache/mapred

4.10. Verify the HDFS File Structure

$ sudo -u hdfs hadoop fs -ls -R /

You should see:

drwxrwxrwt   - hdfs     supergroup          0 2012-04-19 15:14 /tmp
drwxr-xr-x   - hdfs     supergroup          0 2012-04-19 15:16 /var
drwxr-xr-x   - hdfs     supergroup          0 2012-04-19 15:16 /var/lib
drwxr-xr-x   - hdfs     supergroup          0 2012-04-19 15:16 /var/lib/hadoop-hdfs
drwxr-xr-x   - hdfs     supergroup          0 2012-04-19 15:16 /var/lib/hadoop-hdfs/cache
drwxr-xr-x   - mapred   supergroup          0 2012-04-19 15:19 /var/lib/hadoop-hdfs/cache/mapred
drwxr-xr-x   - mapred   supergroup          0 2012-04-19 15:29 /var/lib/hadoop-hdfs/cache/mapred/mapred
drwxrwxrwt   - mapred   supergroup          0 2012-04-19 15:33 /var/lib/hadoop-hdfs/cache/mapred/mapred/staging

4.11. Create and Configure the mapred.system.dir Directory in HDFS

After you start HDFS and create /tmp, but before you start the JobTracker, you must also create the HDFS directory specified by the mapred.system.dir parameter (by default ${hadoop.tmp.dir}/mapred/system) and configure it to be owned by the mapred user.

 To create the directory in its default location:

$ sudo -u hdfs hadoop fs -mkdir -p /tmp/mapred/system
$ sudo -u hdfs hadoop fs -chown mapred:hadoop /tmp/mapred/system

4.12. Start MapReduce

To start MapReduce, start the TaskTracker and JobTracker services

On each TaskTracker system:

$ sudo service hadoop-0.20-mapreduce-tasktracker start

On the JobTracker system:

$ sudo service hadoop-0.20-mapreduce-jobtracker start

4.13. Create a Home Directory for each MapReduce User[already created in the previous steps]

Create a home directory for each MapReduce user. It is best to do this on the NameNode; for example:

$ sudo -u hdfs hadoop fs -mkdir  /user/<user>
$ sudo -u hdfs hadoop fs -chown <user> /user/<user>

where <user> is the Linux username of each user.

Alternatively, you can log in as each Linux user (or write a script to do so) and create the home directory as follows:

sudo -u hdfs hadoop fs -mkdir /user/$USER
sudo -u hdfs hadoop fs -chown $USER /user/$USER


4.14. Set HADOOP_MAPRED_HOME
For each user who will be submitting MapReduce jobs using MapReduce v1 (MRv1), or running Pig, Hive, or Sqoop in an MRv1 installation, set the HADOOP_MAPRED_HOME environment variable as follows:

gedit ~/.bashrc
export HADOOP_MAPRED_HOME=/usr/lib/hadoop-0.20-mapreduce
export HADOOP_HOME=/usr/lib/hadoop

5. Troubleshooting

The steps above are normally enough to run Hadoop on your machine, but you may still run into problems. One that I have faced is an error saying that JAVA_HOME is not set or cannot be found, even though I had set the JAVA_HOME variable in .bashrc. The cause is simple: the Hadoop runtime environment cannot find JAVA_HOME because it is not set in the hadoop-env.sh file. To resolve the issue, edit that file and add the export line:


sudo gedit /etc/hadoop/conf/hadoop-env.sh
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0 

Hadoop Web Interfaces

Hadoop comes with several web interfaces that are available by default (see conf/hadoop-default.xml). For example, the NameNode web UI listens on port 50070, the port used in the dfs.namenode.http-address setting above.
These web interfaces provide concise information about what’s happening in your Hadoop cluster. You might want to give them a try.

Apache Tajo brings data warehousing to Hadoop

Organizations that want to extract more intelligence from their Hadoop deployments might find help from the relatively little known Tajo open source data warehouse software, which the Apache Software Foundation has pronounced as ready for commercial use.
The new version of Tajo, Apache software for running a data warehouse over Hadoop data sets, has been updated to provide greater connectivity to Java programs and third-party databases such as Oracle and PostgreSQL.
While less well-known than other Apache big data projects such as Spark or Hive, Tajo could be a good fit for organizations outgrowing their commercial data warehouses. It could also be a good fit for companies wishing to analyze large sets of data stored on Hadoop data processing platforms using familiar commercial business intelligence tools instead of Hadoop’s MapReduce framework.
Tajo performs the necessary ETL (extract-transform-load process) operations to summarize large data sets stored on an HDFS (Hadoop Distributed File System). Users and external programs can then query the data through SQL.
The latest version of the software, issued Monday, comes with a newly improved JDBC (Java Database Connectivity) driver that its project managers say makes Tajo as easy to use as a standard relational database management system. The driver has been tested against a variety of commercial business intelligence software packages and other SQL-based tools.
Other new features include catalogs of built-in SQL commands from both Oracle and PostgreSQL systems.
Like a growing number of database systems, Tajo now features full support for JSON (JavaScript Object Notation), easing the process for Web developers to work with Tajo. Tajo can also work directly with Amazon S3 (Simple Storage Service).
Gruter, a big data infrastructure startup in South Korea, is leading the charge to develop Tajo. Engineers from Intel, Etsy, NASA, Cloudera and Hortonworks also contribute to the project.
Perhaps because of its South Korean home base, the software is not very widely known elsewhere in the world, compared to other open-source SQL-based Hadoop packages such as Hive or Impala.
At least in one test of the software, conducted in 2013, Tajo appeared to possess a speed advantage, according to Gruter. Korea’s SK Telecom telecommunications firm ran Tajo against 1.7 terabytes worth of data, and found it could complete queries with greater speed than either Hive or Impala, in most instances.
As with most benchmarks, results may vary according to the specific workload. New editions of Hive and Impala may also have closed the speed gap.
SK Telecom uses the software in production duties, as does Korea University and NASA’s Jet Propulsion Laboratory. The Korean music streaming service Melon uses the software for analytical processing, and has found that Tajo executes ETL jobs 1.5 to 10 times faster than Hive.
The Apache Software Foundation provides support and oversight for more than 350 open source projects, including Hadoop, the Cassandra NoSQL database and the Apache HTTP server.

DBInputFormat to transfer data from SQL to NoSQL database

In this blog we will explore the capabilities and possibilities of one of the most important components of Hadoop technology, i.e. MapReduce.
Today, companies are adopting the Hadoop framework as their first choice for data storage because of its ability to handle large data effectively. But we also know that data is varied and exists in various structures and formats. To control such a huge variety of data and its different formats, there should be a mechanism that accommodates all the varieties and yet produces an effective and consistent result.
The most powerful component in the Hadoop framework is MapReduce, which gives better control over the data and its structure than its counterparts. Though it comes with a learning curve and some programming complexity, if you can handle these you can surely handle any kind of data with Hadoop.
The MapReduce framework breaks all its processing tasks into two basic phases: Map and Reduce.
Preparing your raw data for these phases requires an understanding of some basic classes and interfaces. The superclass for this preprocessing is InputFormat.
The InputFormat class is one of the core classes in the Hadoop MapReduce API. This class is responsible for defining two main things:
  • Data splits
  • Record reader
A data split is a fundamental concept in the Hadoop MapReduce framework; it defines both the size of an individual map task and its potential execution server. The record reader is responsible for actually reading records from the input file and submitting them (as key/value pairs) to the mapper.
The number of mappers is decided by the number of splits, and it is the job of the InputFormat to create the splits. Most of the time the split size is equal to the block size, but splits are not always created based on the HDFS block size. It depends entirely on how the getSplits() method of your InputFormat has been overridden.
There is a fundamental difference between an MR split and an HDFS block. A block is a physical chunk of data, while a split is just a logical chunk that a mapper reads. A split does not contain the input data; it just holds a reference to, or address of, the data. A split basically has two things: a length in bytes and a set of storage locations, which are just strings.
To understand this better, let’s take one example: processing data stored in MySQL using MR. Since there is no concept of blocks in this case, the theory that “splits are always created based on the HDFS block” fails. One possibility is to create splits based on ranges of rows in your MySQL table (and this is what DBInputFormat, an input format for reading data from a relational database, does). We may have k splits consisting of n rows each.
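
The idea of splitting a table into row ranges can be sketched in plain Java. This is an approximation written for this post and not the DBInputFormat source: the class name, the method names and the exact query text are assumptions, and the query uses MySQL's LIMIT/OFFSET syntax.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: divides rowCount rows into a fixed number of
// [start, end) ranges, one range per mapper.
final class RowRangeSplits {

    // The last range absorbs the rows left over by the integer division.
    static List<long[]> split(long rowCount, int chunks) {
        List<long[]> ranges = new ArrayList<>();
        long chunkSize = rowCount / chunks;
        for (int i = 0; i < chunks; i++) {
            long start = i * chunkSize;
            long end = (i + 1 == chunks) ? rowCount : start + chunkSize;
            ranges.add(new long[] {start, end});
        }
        return ranges;
    }

    // A query that a record reader could issue to read one range.
    static String queryFor(String table, long[] range) {
        long length = range[1] - range[0];
        return "SELECT * FROM " + table + " LIMIT " + length + " OFFSET " + range[0];
    }

    public static void main(String[] args) {
        // 10 rows over 3 mappers: rows 0-2, 3-5 and 6-9.
        for (long[] range : split(10, 3)) {
            System.out.println(queryFor("emp", range));
        }
    }
}
```

Note that such a split carries only a start and an end row number, not the rows themselves, which matches the description of a split as a reference to the data.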
Only for the InputFormats based on FileInputFormat (an InputFormat for handling data stored in files) are the splits created based on the total size, in bytes, of the input files. However, the FileSystem block size of the input files is treated as an upper bound for input splits. If you have a file smaller than the HDFS block size, you’ll get only 1 mapper for that file. If you want different behavior, you can use mapred.min.split.size. But it again depends solely on the getSplits() of your InputFormat.
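
For file-based input formats, the split size is chosen as max(minSize, min(maxSize, blockSize)), which is why the block size acts as an upper bound until the minimum split size is raised above it. The sketch below reproduces only that formula in plain Java; the class name is made up, and the 128 MB block size is just an example value.

```java
// Illustration only: the split-size rule for file-based input formats,
// splitSize = max(minSize, min(maxSize, blockSize)).
final class SplitSizeSketch {

    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long blockSize = 128 * mb; // example block size

        // Default-like settings: the split size equals the block size.
        System.out.println(computeSplitSize(blockSize, 1, Long.MAX_VALUE) / mb);
        // Raising the minimum split size above the block size gives bigger splits.
        System.out.println(computeSplitSize(blockSize, 256 * mb, Long.MAX_VALUE) / mb);
        // Lowering the maximum split size gives smaller splits (more mappers).
        System.out.println(computeSplitSize(blockSize, 1, 64 * mb) / mb);
    }
}
```

The three calls print 128, 256 and 64 respectively.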
Many ready-made input formats and helper classes are available under the package org.apache.hadoop.mapreduce.lib.input:
  • CombineFileInputFormat
  • CombineFileRecordReader
  • CombineFileRecordReaderWrapper
  • CombineFileSplit
  • CombineSequenceFileInputFormat
  • CombineTextInputFormat
  • FileInputFormat
  • FileInputFormatCounter
  • FileSplit
  • FixedLengthInputFormat
  • InvalidInputException
  • KeyValueLineRecordReader
  • KeyValueTextInputFormat
  • MultipleInputs
  • NLineInputFormat
  • SequenceFileAsBinaryInputFormat
  • SequenceFileAsTextInputFormat
  • SequenceFileAsTextRecordReader
  • SequenceFileInputFilter
  • SequenceFileInputFormat
  • SequenceFileRecordReader
  • TextInputFormat
The default is TextInputFormat.
Similarly, there are many output formats and helper classes that take the data from the reducers and store it in HDFS:
  • FileOutputCommitter
  • FileOutputFormat
  • FileOutputFormatCounter
  • FilterOutputFormat
  • LazyOutputFormat
  • MapFileOutputFormat
  • MultipleOutputs
  • NullOutputFormat
  • PartialFileOutputCommitter
  • PartialOutputCommitter
  • SequenceFileAsBinaryOutputFormat
  • SequenceFileOutputFormat
  • TextOutputFormat
The default is TextOutputFormat.
By the time you finish reading this blog,  you would have learned:
  • How to write a MapReduce program
  • About the different types of InputFormats available in MapReduce
  • Why InputFormats are needed
  • How to write custom InputFormats
  • How to transfer data from SQL databases to HDFS
  • How to transfer data from SQL(here MySQL)  databases to NoSQL databases (here Hbase)
  • How to transfer data from one SQL databases to other table in SQL databases (Perhaps this may not be that much important if we do this in the same SQL database. However, there is nothing wrong in having a knowledge of the same. You never know how it can come into use)
Prerequisite:
  • Hadoop pre-installed
  • SQL pre-installed
  • Hbase pre-installed
  • Java basic understanding
  • MapReduce knowledge
  • Hadoop framework basic knowledge
Let’s understand the problem statement which we are going to solve here:
We have an employee table (emp) in a MySQL database named edureka. As per the business requirement, we have to move all the data available in the relational database to the Hadoop file system (HDFS) and to the NoSQL database HBase.
We have many options to do this task:
  • Sqoop
  • Flume
  • MapReduce
Now, you do not want to install and configure any other tool for this operation. You are left with only one option which is Hadoop’s processing framework MapReduce. MapReduce framework would give you full control over the data while transferring. You can manipulate the columns and put directly at any of the two target locations.
Note:
  • We need to download the MySQL connector and put it in Hadoop's classpath in order to fetch tables from MySQL. To do this, download the connector com.mysql.jdbc_5.1.5.jar and keep it under the $HADOOP_HOME/share/hadoop/mapreduce/lib directory.
cp Downloads/com.mysql.jdbc_5.1.5.jar  $HADOOP_HOME/share/hadoop/mapreduce/lib/
  • Also, put all Hbase jars under Hadoop classpath in order to make your MR program access Hbase. To do this, execute the following command:
cp $HBASE_HOME/lib/* $HADOOP_HOME/share/hadoop/mapreduce/lib/
The software versions that I have used in the execution of this task are:
  • Hadoop-2.3.0
  • HBase 0.98.9-Hadoop2
  • Eclipse Luna
To avoid compatibility issues, I recommend that readers run the commands in a similar environment.

Custom DBInputWritable:

package com.inputFormat.copy;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

// Holds one row of the emp table as read by DBInputFormat.
public class DBInputWritable implements Writable, DBWritable
{
    private int id;
    private String name, dept;

    // Hadoop serialization is not needed in this example, so these stay empty.
    public void readFields(DataInput in) throws IOException { }
    public void write(DataOutput out) throws IOException { }

    // The ResultSet object represents the data returned from a SQL statement.
    // Column order: empid, name, dept.
    public void readFields(ResultSet rs) throws SQLException
    {
        id = rs.getInt(1);
        name = rs.getString(2);
        dept = rs.getString(3);
    }

    public void write(PreparedStatement ps) throws SQLException
    {
        ps.setInt(1, id);
        ps.setString(2, name);
        ps.setString(3, dept);
    }

    public int getId()
    {
        return id;
    }

    public String getName()
    {
        return name;
    }

    public String getDept()
    {
        return dept;
    }
}

Custom DBOutputWritable:

package com.inputFormat.copy;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

// Holds one row to be written to the output table by DBOutputFormat.
public class DBOutputWritable implements Writable, DBWritable
{
    private String name;
    private int id;
    private String dept;

    public DBOutputWritable(String name, int id, String dept)
    {
        this.name = name;
        this.id = id;
        this.dept = dept;
    }

    // This class is only written to the database, never read back,
    // so the read methods and Hadoop serialization stay empty.
    public void readFields(DataInput in) throws IOException { }
    public void readFields(ResultSet rs) throws SQLException { }
    public void write(DataOutput out) throws IOException { }

    // Parameter order must match the output columns: name, id, dept.
    public void write(PreparedStatement ps) throws SQLException
    {
        ps.setString(1, name);
        ps.setInt(2, id);
        ps.setString(3, dept);
    }
}

Input Table:

create database edureka;
use edureka;
create table emp(empid int not null,name varchar(30),dept varchar(20),primary key(empid));
insert into emp values(1,"abhay","development"),(2,"brundesh","test");
select * from emp;

Case 1: Transfer from MySQL to HDFS

package com.inputFormat.copy;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

public class MainDbtohdfs
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf,
            "com.mysql.jdbc.Driver",               // driver class
            "jdbc:mysql://localhost:3306/edureka", // db url
            "root",                                // user name
            "root");                               // password

        // Job.getInstance replaces the deprecated Job constructor.
        Job job = Job.getInstance(conf);
        job.setJarByClass(MainDbtohdfs.class);
        job.setMapperClass(Map.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // No reducer class is set, so the identity reducer passes these types through.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(DBInputFormat.class);

        // args[0] is the HDFS output directory.
        Path p = new Path(args[0]);
        FileOutputFormat.setOutputPath(job, p);

        DBInputFormat.setInput(
            job,
            DBInputWritable.class,
            "emp", // input table name
            null,  // no condition
            null,  // no ordering
            new String[] { "empid", "name", "dept" } // table columns
        );

        // Delete any previous output; the job fails if the directory already exists.
        FileSystem fs = FileSystem.get(new URI(args[0]), conf);
        fs.delete(p, true);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
This piece of code lets us configure the input format to access our source SQL database. The parameters are the driver class, the URL of the SQL database, the user name, and the password.
DBConfiguration.configureDB(conf,
 "com.mysql.jdbc.Driver", // driver class
 "jdbc:mysql://localhost:3306/edureka", // db url
 "root", // user name
 "root"); //password
This piece of code lets us pass the details of the table in the database and set them in the job object. The parameters are, in order: the job instance, the custom writable class (which must implement the DBWritable interface), the source table name, a condition if any (else null), the sort order if any (else null), and the list of table columns.
DBInputFormat.setInput(
 job,
 DBInputWritable.class,
 "emp", //input table name
 null,
 null,
 new String[] { "empid", "name" ,"dept"} // table columns
 );

Mapper

package com.inputFormat.copy;

import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

public class Map extends Mapper<LongWritable, DBInputWritable, Text, IntWritable>
{
    // Exceptions are propagated so that a failed write fails the task
    // instead of being silently printed.
    @Override
    protected void map(LongWritable key, DBInputWritable value, Context ctx)
            throws IOException, InterruptedException
    {
        String name = value.getName();
        IntWritable id = new IntWritable(value.getId());
        String dept = value.getDept();
        // Key: the tab-separated row; value: the employee id.
        ctx.write(new Text(name + "\t" + id + "\t" + dept), id);
    }
}

Reducer: Identity Reducer Used

Command to run:
hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs /dbtohdfs

Output: MySQL Table Transferred to HDFS

hadoop fs -ls /dbtohdfs/*

Case 2: Transfer From One Table in MySQL to Another in MySQL

Creating the output table in MySQL:
create table employee1(name varchar(20),id int,dept varchar(20));

Driver Class:

package com.inputFormat.copy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
public class Mainonetable_to_other_table
{
 public static void main(String[] args) throws Exception
 {
 Configuration conf = new Configuration();
 DBConfiguration.configureDB(conf,
 "com.mysql.jdbc.Driver", // driver class
 "jdbc:mysql://localhost:3306/edureka", // db url
 "root", // user name
 "root"); //password
 Job job = Job.getInstance(conf); // the Job constructor is deprecated in Hadoop 2
 job.setJarByClass(Mainonetable_to_other_table.class);
 job.setMapperClass(Map.class);
 job.setReducerClass(Reduce.class);
 job.setMapOutputKeyClass(Text.class);
 job.setMapOutputValueClass(IntWritable.class);
 job.setOutputKeyClass(DBOutputWritable.class);
 job.setOutputValueClass(NullWritable.class);
 job.setInputFormatClass(DBInputFormat.class);
 job.setOutputFormatClass(DBOutputFormat.class);

 DBInputFormat.setInput(
 job,
 DBInputWritable.class,
 "emp", //input table name
 null,
 null,
 new String[] { "empid", "name" ,"dept"} // table columns
 );
DBOutputFormat.setOutput(
 job,
 "employee1", // output table name
 new String[] { "name", "id","dept" } //table columns
 );
 System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}
This piece of code configures the output table in the SQL DB. The parameters are the job instance, the output table name and the output column names, in that order.
DBOutputFormat.setOutput(
 job,
 "employee1", // output table name
 new String[] { "name", "id","dept" } //table columns
 );

Mapper: Same as Case 1

Reducer:
package com.inputFormat.copy;
import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
public class Reduce extends Reducer<Text, IntWritable, DBOutputWritable, NullWritable>
{
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
            throws IOException, InterruptedException
    {
        // The key carries "name<TAB>id<TAB>dept", as written by the mapper.
        String[] line = key.toString().split("\t");
        ctx.write(new DBOutputWritable(line[0], Integer.parseInt(line[1]), line[2]), NullWritable.get());
    }
}
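
The reducer depends on the mapper having packed name, id and dept into one tab-separated key. The standalone sketch below (plain Java, no Hadoop classes) shows that round trip. TabKeySketch, pack and unpack are illustrative names that are not part of the job, the sample values are made up, and the sketch assumes that no field contains a tab.

```java
class TabKeySketch {
    // Same layout as the mapper's output key: name<TAB>id<TAB>dept.
    static String pack(String name, int id, String dept) {
        return name + "\t" + id + "\t" + dept;
    }

    // Same parsing as the reducer: split the key on tabs.
    static String[] unpack(String key) {
        return key.split("\t");
    }

    public static void main(String[] args) {
        String key = pack("Ravi Kumar", 101, "Sales"); // made-up sample row
        String[] fields = unpack(key);
        // fields[1] is still a string here; the reducer converts it with Integer.parseInt.
        System.out.println(fields[0] + " | " + Integer.parseInt(fields[1]) + " | " + fields[2]);
    }
}
```

Spaces inside a field are harmless with this layout, because only the tab acts as a separator.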

Command to Run:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Output: Transferred Data From EMP Table in MySQL to Another Table Employee1 in MySQL

Case 3: Transfer From Table in MySQL to NoSQL (HBase) Table

Creating the HBase table to accommodate output from the SQL table:
create 'employee','official_info'

Driver Class:

package Dbtohbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;

public class MainDbToHbase
{
 public static void main(String[] args) throws Exception
 {
 Configuration conf = HBaseConfiguration.create();
 // No HTable handle is needed here: the output table "employee" is set through TableMapReduceUtil below.
 DBConfiguration.configureDB(conf,
 "com.mysql.jdbc.Driver", // driver class
 "jdbc:mysql://localhost:3306/edureka", // db url
 "root", // user name
 "root"); //password

 Job job = Job.getInstance(conf, "dbtohbase"); // the Job constructor is deprecated in Hadoop 2
 job.setJarByClass(MainDbToHbase.class);
 job.setMapperClass(Map.class);

 job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 job.setMapOutputValueClass(Text.class);

 TableMapReduceUtil.initTableReducerJob("employee",Reduce.class, job);
job.setInputFormatClass(DBInputFormat.class);
 job.setOutputFormatClass(TableOutputFormat.class);

 DBInputFormat.setInput(
 job,
 DBInputWritable.class,
 "emp", //input table name
 null,
 null,
 new String[] { "empid", "name" ,"dept" } // table columns
 );
System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}
This piece of code sets the map output key class, which in the case of HBase is ImmutableBytesWritable, and the map output value class:
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 job.setMapOutputValueClass(Text.class);
Here we pass the HBase table name and the reducer that writes to that table:
 TableMapReduceUtil.initTableReducerJob("employee",Reduce.class, job);

Mapper:

package Dbtohbase;
import java.io.IOException;

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class Map extends Mapper<LongWritable, DBInputWritable, ImmutableBytesWritable, Text>
{
    @Override
    protected void map(LongWritable id, DBInputWritable value, Context context)
            throws IOException, InterruptedException
    {
        String line = value.getName();
        String cd = value.getId() + ""; // employee id as a string; it becomes the HBase row key
        String dept = value.getDept();
        // Key: id as bytes. Value: "name dept", separated by a single space.
        context.write(new ImmutableBytesWritable(Bytes.toBytes(cd)), new Text(line + " " + dept));
    }
}
In this piece of code we take the values from the getters of the DBInputWritable class and wrap the employee id in an ImmutableBytesWritable, so that the key reaches the reducer as raw bytes, which is the form HBase works with. The name and department travel together as the Text value.

 String line = value.getName();
 String cd = value.getId()+"";
 String dept = value.getDept();
 context.write(new ImmutableBytesWritable(Bytes.toBytes(cd)),new Text(line+" "+dept));
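
One thing to watch: the value is packed as the name, a space, then the department, and the reducer in this case splits it on spaces. A name that itself contains a space shifts the fields. The sketch below is plain Java with made-up sample values. ValueSplitSketch and splitAtLastSpace are hypothetical names, and splitting at the last space is only one possible alternative that assumes the department is a single word; it is not the code used in this post.

```java
class ValueSplitSketch {
    // Same packing as the mapper: name, one space, department.
    static String pack(String name, String dept) {
        return name + " " + dept;
    }

    // Same parsing as the reducer: split on every space.
    static String[] splitOnSpaces(String value) {
        return value.split(" ");
    }

    // Hypothetical alternative: split only at the LAST space.
    // Assumes the department contains no space; pack() guarantees at least one space exists.
    static String[] splitAtLastSpace(String value) {
        int i = value.lastIndexOf(' ');
        return new String[] { value.substring(0, i), value.substring(i + 1) };
    }

    public static void main(String[] args) {
        String value = pack("John Smith", "Sales"); // made-up sample row
        String[] naive = splitOnSpaces(value);
        String[] safer = splitAtLastSpace(value);
        // With the naive split, index 1 holds the second word of the name, not the department.
        System.out.println("naive: name=" + naive[0] + ", department=" + naive[1]);
        System.out.println("safer: name=" + safer[0] + ", department=" + safer[1]);
    }
}
```

Using a separator that cannot appear in the data, such as the tab used in Case 1, avoids the problem altogether.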

Reducer:

package Dbtohbase;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class Reduce extends TableReducer<ImmutableBytesWritable, Text, ImmutableBytesWritable>
{
    @Override
    public void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException
    {
        String[] cause = null;

        // Each value is "name dept"; if a key has several values, the last one wins.
        for (Text val : values)
        {
            cause = val.toString().split(" ");
        }

        // Write one row per employee id under the column family "official_info".
        Put put = new Put(key.get());
        put.add(Bytes.toBytes("official_info"), Bytes.toBytes("name"), Bytes.toBytes(cause[0]));
        put.add(Bytes.toBytes("official_info"), Bytes.toBytes("department"), Bytes.toBytes(cause[1]));
        context.write(key, put);
    }
}
 
This piece of code decides the exact row and columns in which the reducer stores its values. Each empid goes into a separate row, because we made empid the row key and it is unique. In each row we store the official information of the employee under the column family “official_info”, in the columns “name” and “department” respectively.

 Put put = new Put(key.get());
 put.add(Bytes.toBytes("official_info"), Bytes.toBytes("name"),Bytes.toBytes(cause[0]));
 put.add(Bytes.toBytes("official_info"), Bytes.toBytes("department"), Bytes.toBytes(cause[1]));
 context.write(key, put);
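
Because the mapper builds the row key from the id as a string, and HBase keeps rows sorted by the bytes of the row key, a scan returns the ids in dictionary order rather than numeric order. The sketch below uses a plain Java TreeSet to imitate that ordering for ASCII digits. RowKeyOrderSketch, sortedKeys and padded are hypothetical names, and zero-padding the id is shown only as one possible option; the code above does not do it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

class RowKeyOrderSketch {
    // Hypothetical option: fixed-width, zero-padded ids sort numerically as strings.
    static String padded(int id) {
        return String.format("%08d", id);
    }

    // Returns the ids as string keys in sorted (dictionary) order.
    static List<String> sortedKeys(boolean pad, int... ids) {
        TreeSet<String> keys = new TreeSet<>();
        for (int id : ids) {
            keys.add(pad ? padded(id) : String.valueOf(id));
        }
        return new ArrayList<>(keys);
    }

    public static void main(String[] args) {
        // Keys built like value.getId() + "" in the mapper.
        System.out.println(sortedKeys(false, 9, 10, 101)); // prints [10, 101, 9]
        // Zero-padded keys keep numeric order.
        System.out.println(sortedKeys(true, 9, 10, 101));  // prints [00000009, 00000010, 00000101]
    }
}
```

This only matters when the scan order of the rows is important; point lookups by empid work the same either way.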

Transferred Data in HBase:

scan 'employee'

As we can see, we successfully migrated our business data from a relational SQL DB to a NoSQL DB.
In the next blog we’ll learn how to write and execute code for other input and output formats.

Hadoop Interview Questions

1) Explain how Hadoop is different from other parallel computing solutions.
2) What are the modes Hadoop can run in?
3) What is a NameNode and what is a DataNode?
4) What is Shuffling in MapReduce?
5) What is the functionality of Task Tracker and Job Tracker in Hadoop? How many instances of a Task Tracker and Job Tracker can be run on a single Hadoop Cluster?
6) How does NameNode tackle DataNode failures?
7) What is InputFormat in Hadoop?
8) What is the purpose of RecordReader in Hadoop?
9) What are the points to consider when moving from an Oracle database to Hadoop clusters? How would you decide the correct size and number of nodes in a Hadoop cluster?
10) If you want to analyze 100TB of data, what is the best architecture for that?
11) What is InputSplit in MapReduce?
12) In Hadoop, if a custom partitioner is not defined, how is data partitioned before it is sent to the reducer?
13) What is the replication factor in Hadoop, and what default replication factor does Hadoop come with?
14) What is a SequenceFile in Hadoop? Explain its importance.
15) What is Speculative execution in Hadoop?
16) If you are the user of a MapReduce framework, then what are the configuration parameters you need to specify?
17) How do you benchmark your Hadoop Cluster with Hadoop tools?
18) Explain the difference between ORDER BY and SORT BY in Hive.
19) What is WebDAV in Hadoop?
20) How many Daemon processes run on a Hadoop System?
21) Hadoop attains parallelism by distributing tasks across various nodes, so a few slow nodes can rate-limit the rest of the program and slow it down. What method does Hadoop provide to combat this?
22) How are HDFS blocks replicated?
23) What will a Hadoop job do if developers try to run it with an output directory that is already present?
24) What happens if the number of reducers is 0?
25) What is meant by Map-side and Reduce-side join in Hadoop?
26) How can the NameNode be restarted?
27) When doing a join in Hadoop, you notice that one reducer is running for a very long time. How will you address this problem in Pig?
28) How can you debug your Hadoop code?
29) What is distributed cache and what are its benefits?
30) Why would a Hadoop developer develop a Map Reduce by disabling the reduce step?
31) Explain the major difference between an HDFS block and an InputSplit.
32) Are there any problems that can only be solved by MapReduce and cannot be solved by Pig? In which kinds of scenarios will MR jobs be more useful than Pig?
33) What is the need for having a password-less SSH in a distributed environment?
34) Give an example scenario on the usage of counters.
35) Does HDFS make block boundaries between records?
36) What is streaming access?
37) What do you mean by “Heartbeat” in HDFS?
38) Suppose there are 10 HDFS blocks to be copied from one machine to another, but the other machine can copy only 7.5 blocks. Is there a possibility for the blocks to be broken down during replication?
39) What is the significance of conf.setMapperClass?
40) What are combiners and when are these used in a MapReduce job?
41) Which command is used to do a file system check in HDFS?
42) Explain the different parameters of the mapper and reducer functions.
43) How can you set an arbitrary number of mappers and reducers for a Hadoop job?
44) Have you ever built a production process in Hadoop? If yes, what was the process when your Hadoop job failed for any reason? (Open-Ended Question)
45) Explain the functioning of the master-slave architecture in Hadoop.
46) What is fault tolerance in HDFS?
47) Give some examples of companies that are using Hadoop architecture extensively.
48) How does a DataNode know the location of the NameNode in Hadoop cluster?
49) How can you check whether the NameNode is working or not?
50) Explain the different types of “writes” in HDFS.