Big Data Analytics
Hadoop: Big Picture
Hadoop: Big Picture
High-level languages
Execution engine
light-weight DB
Centralized tool
for coordination
Distributed File system
HDFS + MapReduce are enough to have things working
HDFS: Hadoop Distributed File System
• HDFS is a master-slave architecture
– Master: namenode
– Slave: datanode (100s or 1000s of nodes)
– Single namenode and many datanodes
– Namenode maintains the file system metadata
– Files are split into fixed sized blocks and stored on data
nodes (Default 64MB)
– Data blocks are replicated for fault tolerance and fast
access (Default is 3)
HDFS Architecture
• Default placement policy: where to put a given block?
Frist copy is written to the node creating the file (write affinity)
Second copy is written to a datanode within the same rack
Third copy is written to a datanode in a different rack
Objectives: load balancing, fast access, fault tolerance
MapReduce: Hadoop Execution Layer
• JobTracker knows everything about
submitted jobs
• Divides jobs into tasks and decides
where to run each task
• Continuously communicating with
• TaskTrackers execute task (multiple per
• Monitors the execution of each task
• Continuously sending feedback to
• MapReduce is a master-slave architecture
– Master: JobTracker
– Slave: TaskTrackers (100s or 1000s of tasktrackers)
• Every datanode is running a TaskTracker
High-level MapReduce Pipeline
Hadoop MapReduce Data Flow
Hadoop Computing Model
• Mapper and Reducers consume and produce (key,
value) pairs
• Users define the data type of the Key and Value
• Shuffling and Sorting phase
– Map output is shuffled such that all same-key records go
to the same reducer
– Each reducer may receive multiple key sets
– Each reducer sorts its records to group similar keys, then
process each group
Hadoop Configuration and Installation
What is MapReduce?
MapReduce is a simple programming model for processing
huge datasets in parallel.
MapReduce is to divide a task into the sub-tasks in parallel, and
aggregate the results of the sub-tasks to form the final output.
Programs written in MapReduce are automatically parallelized.
Programmers write two functions: map and reduce.
MapReduce programs are generally used to process large files.
The input and output for the map and reduce functions are
expressed in the form of key-value pairs.
Three required components in a Hadoop MapReduce program:
• The mapper class
• The reducer class
• The driver: initializing the job with its configuration details
• Job configuration
• Specifying the mapper and the reducer class
• The input files
• The output files
• Number of maps, reduces,..
General Framework of a MapReduce Program
mapper class{
// map() function
reducer class{
// reduce() function
driver class{
// parameters about job configurations
MapReduce Pipeline
• map(inKey, inValue)  list of (intermediateKey,
• The input to the map function is the form of keyvalue pairs, even though the input to a MapReduce
program is a file or files.
– By default, the value is a data record and the key is the
offset of the data record from the beginning of the data
• The output consists of a collection of key-value pairs
which are input for the reduce function.
– Word count example.
– The input to the mapper is each line of the file, while the
output from each mapper is a set of key-value pairs
where one word is the key and the number 1 is the
– (5539, “I am taking IDS594 class. This class is fun.”) 
(I, 1) (am, 1) (taking, 1) (IDS594, 1) (class, 1) (This, 1)
(Class, 1) (is, 1) (fun, 1)
Mapper Class
public class MapperClass extends MapReduceBase implements Mapper<inKeyType
inKey, inValueType inValue, intermediateKeyType intermediatekey,
intermediateValueType intermediateValue> {
// some variable definitions here;
public void map(inKeyType inKey, inValueType inValue,
OutputCollector<intermediateKeyType, intermediateValueType> output, Reporter
reporter) throws IOException {
// implementation body;
• reduce(intermediateKey, list(intermediateValue)) 
list(outKey, outValue)
• Each reduce function processes the intermediate values
for a particular key generated by the map function and
generates the output.
• There exists a one-one mapping between keys and
reducers. They are independent of one another.
• The number of reducers is decided by the user. By
default, it is 1.
• Word count example
– (I, 1) (am, 1) (taking, 1) (IDS594, 1) (class, 1) (This, 1)
(Class, 1) (is, 1) (fun, 1)  (I, 1) (am, 1) (taking, 1)
(IDS594, 1) (class, 2) (This, 1) (is, 1) (fun, 1)
Reducer Class
public class ReducerClass extends MapReduceBase implements
Mapper<intermediateKeyType intermediateKey,
Iterator<intermediateValueType> inValues, outKeyType outkey,
outValueType outValue> {
// some variable definitions here;
public void reduce(intermediateKeyType inKey,
Iterator<intermediateValueType> inValues, OutputCollector<outKeyType,
outValueType> output, Reporter reporter) throws IOException {
// implementation body;
MapReduce Process
Driver Class
• It is responsible for triggering the map reduce job in
public class DriverClass extends Configured implements Tool{
public int run(String[] args) throws Exception{
// some configuration statements here…
public static void main(String[] args) throws Exception{
int res = ToolRunner.run(new Configuration(), new driver(),args);
Word Count Example
Mapper Class
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class WordCountMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text,
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
//map method that performs the tokenizer job and framing the initial key value pairs
public void map(LongWritable key,Text value, OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException{
//taking one line at a time and tokenizing the same
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
//iterating through all the words available in that line and forming the key value pair
while (tokenizer.hasMoreTokens()){
//sending to output collector which in turn passes the same to reducer
output.collect(word, one);
Reducer Class
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text,
//reduce method accepts the Key Value pairs from mappers
//do the aggregation based on keys and produce the final out put
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable>
output, Reporter reporter) throws IOException{
int sum = 0;
/*iterates through all the values available with a key
*and add them together
*and give the final result as the key and sum of its values
while (values.hasNext()){
sum += values.next().get();
output.collect(key, new IntWritable(sum));
Driver Class
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount extends Configured implements Tool{
public int run(String[] args) throws Exception{
//creating a JobConf object and assigning a job name for identification purposes
JobConf conf = new JobConf(getConf(), WordCount.class);
//Setting configuration object with the Data Type of output Key and Value
//Providing the mapper and reducer class names
//the hdfs input and output directory to be fetched from the command line
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
return 0;
public static void main(String[] args) throws Exception{
int res = ToolRunner.run(new Configuration(), new WordCount(),args);
Driver Class
• Sometimes, Programmers put the driver into the
main method instead of having a separate driver
Run MapReduce Programs under Hadoop
• Create a jar file either using export in Eclipse or using jar command.
– jar -cvf <jar filename> <class file> <class file> …
• Move the data to HDFS
– ./bin/hadoop fs –copyFromLocal <Local path to files> <HDFS path>
– -put
• Execute Hadoop job
– ./bin/hadoop jar <jar file> <Class Name> <parameters>
– Specify the number of reduce tasks
• -D mapred.reduce.tasks=15
• Merge the output
– Each reducer will generate its own output
• Use –getmerge to get a merged result
Monitor Jobs
• HDFS - http://localhost:50070/
• JobTracker - http://localhost:50030/
Input Files
• This is where the data for a MapReduce job is
initially stored.
• Can be any types of format
– Line-based log files
– Binary format
– Multi-line input records
• Typically very large – tens of gigabytes or more
InputFormat Class
• How these input files are split up and read is defined by
the InputFormat class.
• Functionalities:
– Selects the files or other objects that should be used for input
– Defines the InputSplits that break a file into tasks
– Provides a factory for RecordReader objects that read the file
Types of InputFormat
Default format; read lines of
text files
The byte offset of the line The line contents
Parses lines into key, value
Everything up to the first The reminder of
tab character
the line
A Hadoop-specific highperformance binary format
• TextInputFormat is useful for unformatted data or line-based records like
log files.
• KeyValueInputFormat is useful for reading the output of one MapReduce
job as the input to another.
• SequenceFileInputFormat reads special binary files that are specific to
– Sequence files are block-compressed and provide direct serialization and
deserialization of several arbitrary data types(not just text).
• An InputSplit describes a unit of work that comprises a
single mapper task in a MapReduce program.
• By default, the FileInputFormat and its descendants break
a file up into 64MB chunks (the same size as blocks in
– mapred.min.split.size in hadoop-site.xml
– Overriding this parameter in the JobConf object used in the
driver class.
• Map tasks are performed in parallel.
– mapred.tasktracker.map.tasks.maximum for on-node
It actually loads the data from its source and converts
it into (key, value) pairs.
The default InputFormat: TextInputFormat,
provides a line RecordReader, which treats each
line of the input file as a new value.
The RecordReader is invoked repeatedly on the
input until the entire InputSplit has been consumed.
Each invocation of the RecordReader leads to a call
to the map() function of the Mapper.
Customized InputFormat
• Subclass the FileInputFormat rather than implement
InputFormat directly.
• We have to override the createRecordReader()
method, which returns an instance of RecordReader: an
object that can read from the input source.
Ball, 3.5, 12.7, 9.0
Car, 15, 23.76, 42.23
Device, 0.0, 12.4, -67.1
public class ObjectPositionInputFormat extends FileInputFormat<Text, Point3D> {
public RecordReader<Text, Point3D> createRecordReader( InputSplit input, JobConf job,
Reporter reporter) throws IOException {
return new ObjPosRecordReader(job, (FileSplit)input);
public class ObjPosRecordReader implements RecordReader<Text, Point3D> {
private LineRecordReader lineReader;
private LongWritable lineKey;
private Text lineValue;
public ObjPosRecordReader(JobConf job, FileSplit split) throws IOException {
lineReader = new LineRecordReader(job, split);
lineKey = lineReader.createKey();
lineValue = lineReader.createValue();
public boolean next(Text key, Point3D value) throws IOException {
// get the next line
if (!lineReader.next(lineKey, lineValue)) {
return false;
// parse the lineValue which is in the format: objName, x, y, z
String [] pieces = lineValue.toString().split(",");
if (pieces.length != 4) {
throw new IOException("Invalid record received");
// try to parse floating point components of value
float fx, fy, fz;
try {
fx = Float.parseFloat(pieces[1].trim());
fy = Float.parseFloat(pieces[2].trim());
fz = Float.parseFloat(pieces[3].trim());
} catch (NumberFormatException nfe) {
throw new IOException("Error parsing floating point value in record");
// now that we know we'll succeed, overwrite the output objects
key.set(pieces[0].trim()); // objName is the output key.
value.x = fx; value.y = fy; value.z = fz;
return true;
public Text createKey() {
return new Text("");
public Point3D createValue() {
return new Point3D();
public long getPos() throws IOException {
return lineReader.getPos();
public void close() throws IOException {
public float getProgress() throws IOException {
return lineReader.getProgress();
Another example of customized InputFormat
public class NLinesInputFormat extends TextInputFormat{
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
TaskAttemptContext context) {
return new NLinesRecordReader();
public class NLinesRecordReader extends RecordReader<LongWritable, Text>{
private final int NLINESTOPROCESS = 3;
public boolean nextKeyValue() throws IOException, InterruptedException {
final Text endline = new Text("\n");
int newSize = 0;
for(int i=0;i<NLINESTOPROCESS;i++){
Text v = new Text();
while (pos < end) {
newSize = in.readLine(v, maxLineLength,Math.max((int)Math.min(Integer.MAX_VALUE, endpos),maxLineLength));
value.append(v.getBytes(),0, v.getLength());
value.append(endline.getBytes(),0, endline.getLength());
if (newSize == 0)
pos += newSize;
if (newSize < maxLineLength)
• In the driver class, add the following line:
• map() function in the Mapper class will be changed.
public void map(LongWritable key, Text value,Context context) throws
java.io.IOException ,InterruptedException
String lines = value.toString();
String [] lineArr = lines.split("\n");
int lcount = lineArr.length;
context.write(new Text(new Integer(lcount).toString()),new IntWritable(1));
• A new instance of Mapper is instantiated in a separate Java
process for each map task(InputSplit).
• The individual Mappers are not provided a mechanism to
communicate with one another.
• Two additional parameters in map() method
– OutputCollector: has a method collect() which will forward a
(key, value) pair to the reduce phase of the job.
– Reporter: provides information about the current task.
Partition and Shuffle
• Moving map outputs to the reducers is known as shuffling.
• A different subset of the intermediate key space is assigned
to each reduce node. These subsets(“partitions”) are the
inputs to the reduce task.
• All values for the same key are always reduced together
regardless of which mapper is its origin.
• Determines which partition a given(key, value) pair will go to.
• Determines which reducer instance will receive which
intermediate keys and values.
• The default partitioner computes a hash value for the key and
assigns the partition based on this result.
• Hadoop MapReduce determines how many partitions it will
divide the data into. It is based on the number of reduce tasks
(controlled by JobConf.setNumReduceTasks() method).
Customize Partitioner
public interface Partitioner<K, V> extends JobConfigurable {
int getPartition(K key, V value, int numPartitions);
• The default partitioner implementation is called
– Uses hashCode() method of the key objects modulo the
number of partitions total to determine which partition
to send a given(key, value) pair to.
More on Partitioner
• For most randomly-distributed data, HashPartitioner
should result in all partitions being of roughly equal size.
• For some cases, if hashCode() can not provide a
uniformly-distributed values over its range, then data may
not be sent to reducers evenly.
• Poor partitioning may result in unbalanced workloads and
consequently an inefficient performance.
• JobConf.setPartitionerClass() method to tell Hadoop to
use a specific partitioner.
• Each reduce task may reduce the values associating
with several intermediate keys from map task.
• The set of intermediate keys on a single node is
automatically sorted by Hadoop before they are
presented to the Reducer.
• reduce() method receives a key as well as an iterator
over all the values associated with the key.
• OutputCollector and Reporter as the same as in the
map() method.
• The (key, value) pairs provided to the OutputCollector are
then written to output files.
• Each reducer writes to a separate file in a common output
• These files will typically be named part-nnnnn, where
nnnnn is the partition id associated with the reduce task.
• FileOutputFormat.setOutputPath()
Default; writes lines in "key \t value" form
Writes binary files suitable for reading into subsequent MapReduce jobs
Disregards its inputs
• The NullOutputFormat generates no output files and disregards any (key,
value) pairs passed to it by the OutputCollector.
• It is useful if you are explicitly writing your own output files in the
reduce() method, and do not want additional output files generated by
the Hadoop framework.
• The Combiner will receive as input all data emitted
by the Mapper instances on a given node. The
output from the Combiner is then sent to the
Reducers, instead of the output from the Mappers.
• A "mini-reduce" process which operates only on
data generated by one machine.
• conf.setCombinerClass(Reduce.class);
Data Type
• Writable types
– IntWritable
– LongWritable
– FloatWritable
– BooleanWritable
– Your own classes which implement Writable (serialization)
public class Point3D implements Writable {
public float x;
public float y;
public float z;
public Point3D(float x, float y, float z) {
this.x = x;
this.y = y;
this.z = z;
public Point3D() {
this(0.0f, 0.0f, 0.0f);
public void write(DataOutput out) throws IOException {
public void readFields(DataInput in) throws IOException {
x = in.readFloat();
y = in.readFloat();
z = in.readFloat();
public String toString() {
return Float.toString(x) + ", ” + Float.toString(y) + ", ” + Float.toString(z);
MapReduce Example
Finding the intersection of edges among multiple graphs
Key-Value Pairs
• Mapper Input Format :
• Mapper Intermediate Output / Reducer Input Format:
<source:destination, graphId>
• Reducer Output Format
<source:destination, graphId>
public static class IntersectMapper extends Mapper<LongWritable, Text, Text, Text> {
private Text gId = new Text(); // graphId
private Text srcDestPair = new Text(); // source, destination pair
public void map(LongWritable key, Text value, Context context) throws IOException,
InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
// get the graph id
// setting the key as source and the value as the destination, and they are delimited by ":" here
srcDestPair.set(itr.nextToken() + ":" + itr.nextToken());
//emit key, value pair from mapper
//here the key is the source:destination pair and the graph id is the value
context.write(srcDestPair, gId);
public static class IntersectReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values, Context context) throws IOException, InterruptedException {
boolean first = true; //used for formatting the output
StringBuilder toReturn = new StringBuilder();
Set<String> graphIds = new HashSet<String>();
Iterator<String> graphIdItr;
int numberOfGraphs = 3;
for (Text val : values) {
graphIdItr = graphIds.iterator();
// Iterate through the graphs and append the graph ids to a stringbuilder
while (graphIdItr.hasNext()) {
//for better formatting of the output
if (!first)
first = false;
String intersect = new String(toReturn);
StringTokenizer st = new StringTokenizer(intersect, "^");
// use StringTokenizer and if the number of graph ids is equal to the number of graphs, write to the context
if (st.countTokens() == numberOfGraphs){
//emit the key, value pair from the reducer, here the key is the source:destination pair and the value will be the concatenated
graph ids that has this source:destination pair
context.write(key, new Text(intersect));
public class IntersectionExample extends Configured implements Tool{
public int run(String[] args) throws Exception{
//creating a JobConf object and assigning a job name for identification purposes
JobConf conf = new JobConf(getConf(), IntersectionExample.class);
//Setting configuration object with the Data Type of output Key and Value
//Providing the mapper and reducer class names
//the hdfs input and output directory to be fetched from the command line
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
return 0;
public static void main(String[] args) throws Exception{
int res = ToolRunner.run(new Configuration(), new IntersectionExample(),args);
Customized Partitioner Example
• Input data format
• Find the maximum scorer in each gender and three age categories:
<20, 20 - 50, > 50. Alice<tab>23<tab>female<tab>45
Partition - 0: (this partition contains the maximum scorers for each gender whose age is less than
Nancy<tab>age- 7<tab>female<tab>score-98
Adam<tab>age- 9<tab>male<tab>score-37
Partition - 1: (this partition contains the maximum scorers for each gender whose age is between
20 and 50)
Kristine<tab>age- 38<tab>female<tab>score-53
Bob<tab>age- 34<tab>male<tab>score-89
Partition - 2: (this partition contains the maximum scorers for each gender whose age is greater
than 50)
Monica<tab>age- 56<tab>female<tab>score-92
Chris<tab>age- 67<tab>male<tab>score-97
public static class PartitionMapper extends Mapper<Object, Text, Text, Text> {
public void map(Object key, Text value, Context context) throws IOException,
InterruptedException {
String[] tokens = value.toString().split("\t");
String gender = tokens[2].toString();
String nameAgeScore = tokens[0]+"\t"+tokens[1]+"\t"+tokens[3];
//the mapper emits key, value pair where the key is the gender and the value is the other
information which includes name, age and score
context.write(new Text(gender), new Text(nameAgeScore));
public static class AgePartitioner extends Partitioner<Text, Text> {
public int getPartition(Text key, Text value, int numReduceTasks) {
String [] nameAgeScore = value.toString().split("\t");
String age = nameAgeScore[1];
int ageInt = Integer.parseInt(age);
//this is done to avoid performing mod with 0
if(numReduceTasks == 0)
return 0;
//if the age is <20, assign partition 0
if(ageInt <=20){
return 0;
//else if the age is between 20 and 50, assign partition 1
if(ageInt >20 && ageInt <=50){
return 1 % numReduceTasks;
//otherwise assign partition 2
return 2 % numReduceTasks;
static class ParitionReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values, Context context) throws IOException,
InterruptedException {
int maxScore = Integer.MIN_VALUE;
String name = " ";
String age = " ";
String gender = " ";
int score = 0;
//iterating through the values corresponding to a particular key
for(Text val: values){
String [] valTokens = val.toString().split("\\t");
score = Integer.parseInt(valTokens[2]);
if(score > maxScore){
name = valTokens[0];
age = valTokens[1];
gender = key.toString();
maxScore = score;
context.write(new Text(name), new Text("age- "+age+"\t"+gender+"\tscore"+maxScore));
• It is almost the same as driver classes we introduced
so far. But you have to add the following statement
to use your customized partition.
• job.setPartitionerClass(AgePartitioner.class);

similar documents