Sailfish: A Framework For Large Scale Data Processing

Sriram Rao
[email protected]
Oct. 15, 2012
Joint Work With Colleagues…
• Raghu Ramakrishnan (at Yahoo and now at
Microsoft)
• Adam Silberstein (at Yahoo and now at
LinkedIn)
• Mike Ovsiannikov and Damian Reeves (at
Quantcast)
Motivation
• “Big data” is a booming industry:
– Collect massive amounts of data (10’s of TB/day)
– Use data intensive compute frameworks (Hadoop,
Cosmos, Map-Reduce) to extract value from the
collected data
– Volume of data processed is bragging rights
• How do frameworks handle data at scale?
– Not well studied in the literature
M/R Dataflow
[Figure: M/R dataflow between tasks and the DFS]
Disk Overheads
• Intermediate data transfer is seek intensive => I/O’s are small/random
– # of disk seeks for transferring intermediate data proportional to M * R
Why Is Scale Important?
• Yahoo cluster workload characteristics:
– Vast majority of jobs (about 95%) are small
– A minority of jobs (about 5%) are big
• Involve 1000’s of tasks that are run on many
machines in the cluster
• Run for several hours processing TB’s of data
• Size of intermediate data (i.e., map output) is at
least as big as the input
Can We Minimize Seeks?
• Problem space: Size of intermediate data
exceeds amount of RAM in cluster
• Reducer reads data from disk => one seek per
reducer
• Lower bound is proportional to R
Solving A Seek Problem…
• Minimizing disk seeks via “group commit” is a well-known
technique
• Why isn’t this idea implemented?
• Difficult to implement in the past
– Datacenter bandwidth is a contended resource
– Any solution that mentioned the “network” was the
beginning of a (futile) negotiation
What Is New…
• Network b/w in a datacenter is going up…
– Lower “over-subscription”
• 1/5/10-Gbps between any pair
• Can we leverage this trend to do distributed
aggregation and improve disk performance?
• Other systems that build on this trend are being explored:
– Flat Datacenter Storage (OSDI’12): Blob store
– ThemisMR (SOCC’12): M/R at scale
Key Ideas
[Figure: tasks 0-2, each paired with a RAM buffer and an I-file]
1. I-files, a network-wide data aggregation mechanism
2. Observe intermediate data in I-files during map phase to plan reduce phase
Our Work
• Build I-files by extending a DFS
• Build Sailfish (by modifying Hadoop) in which I-files are used to transport intermediate data
– Leverage I-files to gather statistics on intermediate
data to plan reduce phase:
• (1) # of reducers depends on data, (2) handle skew
– Eliminate tuning parameters
• No more map-side tuning, choosing # of reducers
• Results show 20% to 5x speedup on a
representative mix of (large) real jobs/datasets at
Yahoo
Talk Outline
• Motivation
• I-files: A data aggregation mechanism
• Sailfish: Map-Reduce using I-files
• Experimental Evaluation
• Summary and On-going work
Using I-files for Intermediate Data
[Figure: tasks 0-2 writing to RAM-buffered I-files, with an aggregator per I-file]
• I-files are a container
for data aggregation in
general
• Per I-file aggregator:
– Buffers data from writers
in RAM
– “Group commit” data to
disk
• # of disk seeks is
proportional to R
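The buffering and group-commit behaviour above can be sketched in a few lines of Python. This is a toy model, not the KFS implementation: the `Aggregator` class, its `flush_bytes` threshold, and the in-memory `disk` list are all assumptions made for illustration.

```python
class Aggregator:
    """Toy per-I-file aggregator: buffer records in RAM, group-commit to 'disk'."""

    def __init__(self, flush_bytes):
        self.flush_bytes = flush_bytes  # hypothetical threshold, not from the talk
        self.buffer = []                # records waiting in RAM
        self.buffered_bytes = 0
        self.disk = []                  # each entry models one large sequential write

    def append(self, record: bytes):
        """Accept a record from any writer; commit once enough data is buffered."""
        self.buffer.append(record)
        self.buffered_bytes += len(record)
        if self.buffered_bytes >= self.flush_bytes:
            self.commit()

    def commit(self):
        """Write everything buffered so far as a single batch."""
        if self.buffer:
            self.disk.append(b"".join(self.buffer))
            self.buffer = []
            self.buffered_bytes = 0


# Four writers each append eight 8-byte records to the same aggregator.
agg = Aggregator(flush_bytes=64)
for writer in range(4):
    for _ in range(8):
        agg.append(b"x" * 8)
agg.commit()  # flush any remainder
disk_writes = len(agg.disk)
```

In this sketch the 32 small appends turn into 4 batched writes, which is the effect group commit is meant to have on disk I/O.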
Issues
• Fault-tolerance
• Scale
• Skew: Suppose there is
skew in data written to
I-files
• Hot-spots:
– Suppose a partition
becomes hot
– All map tasks generate
data for that partition
Big data => Scale out!
“Scale out” Aggregation
• Build using distributed aggregation (scale out)
– Rather than 1 aggregator per I-file, use multiple
• Bind subset of mappers to each aggregator
[Figure: one I-file served by two aggregators; mappers 0…10 are bound to one aggregator and mappers 11…20 to the other]
What Does This Get?
• Fault-tolerance
– When an aggregator fails, need to re-run the subset of
maps that wrote to that aggregator
• Re-run in parallel…
• Mitigate skew, hot-spots, better scale
• Seeks: Goes up (!)
– Reducer input is now stored at multiple aggregators
• Read from multiple places
– Will come back to this issue…
I-files Design -> Implementation
• Extend DFS to support data aggregation
– Use KFS in our work (KFS ≅ HDFS)
• Single metaserver (≅ HDFS NameNode)
• Multiple Chunkservers (≅ HDFS DataNodes)
• Files are striped across nodes in chunks
– Chunks can be variable in size with a fixed maximal value
– Currently, max size of a chunk: 128MB
• Adapt KFS to support I-files
– Leverage multi-writer capabilities of KFS
KFS I-files Characteristics
• I-files provide a record-oriented interface
• Append-only
– Clients append records to an I-file:
record_append(fd, <key, value>)
• Append on a file translates to append on a chunk
• Records do not span chunk boundaries
– Chunkserver is the aggregator
• Supports data retrieval by-key:
– scan(fd, <key range>)
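A minimal in-memory sketch of this interface, assuming a hypothetical `IFile` class in place of the `fd` handle, a half-open key range for `scan`, and a tiny chunk size so the chunk-boundary rule is visible. It is not the KFS client API.

```python
class IFile:
    """Toy in-memory I-file: append-only, record-oriented, split into chunks."""

    def __init__(self, max_chunk_bytes):
        self.max_chunk_bytes = max_chunk_bytes
        self.chunks = [[]]       # each chunk is a list of (key, value) records
        self.chunk_bytes = [0]   # bytes used in each chunk

    def record_append(self, key: str, value: bytes):
        size = len(key.encode()) + len(value)
        if size > self.max_chunk_bytes:
            raise ValueError("record larger than a chunk")
        # A record never spans a chunk boundary: start a new chunk instead.
        if self.chunk_bytes[-1] + size > self.max_chunk_bytes:
            self.chunks.append([])
            self.chunk_bytes.append(0)
        self.chunks[-1].append((key, value))
        self.chunk_bytes[-1] += size

    def scan(self, lo: str, hi: str):
        """Return records with lo <= key < hi, gathered from every chunk."""
        return [(k, v) for chunk in self.chunks for (k, v) in chunk if lo <= k < hi]


f = IFile(max_chunk_bytes=10)
for key in ["b", "a", "c", "a"]:
    f.record_append(key, b"1234")  # 5 bytes per record, so 2 records fit per chunk
hits = f.scan("a", "b")
```

Here `scan` filters every record linearly; the point is only to show the append-only, by-key contract, not an efficient retrieval path.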
Appending To KFS I-files
[Figure: a map task asks the KFS metaserver to allocate a chunk, is bound to Chunk1, and calls record_append(); groups of map tasks append to Chunk1 and Chunk2]
• Multiple appenders per chunk
• Multiple chunks appended to concurrently
KFS I-files
• I-file constructed via sequential I/O in a
distributed manner
– Network-wide batching
– Multiple appenders per chunk
– Multiple chunks appended to concurrently
• On a per-chunk basis, the chunkserver responsible for
that chunk is the aggregator
– Chunkserver aggregates records and commits to disk
– Append is atomic: Chunkserver serializes concurrent
appends
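One way to picture "the chunkserver serializes concurrent appends" is a lock around the append path. The sketch below is an assumption-laden model: the `Chunk` class, the returned offset, and the use of Python threads as stand-ins for map tasks are all illustrative and not taken from KFS.

```python
import threading


class Chunk:
    """Toy chunk whose appends are serialized, so each record lands atomically."""

    def __init__(self):
        self._lock = threading.Lock()
        self.records = []      # (offset, record) in commit order
        self.next_offset = 0

    def record_append(self, record: bytes) -> int:
        # Only one appender at a time may pick an offset and place its record.
        with self._lock:
            offset = self.next_offset
            self.records.append((offset, record))
            self.next_offset += len(record)
            return offset


chunk = Chunk()


def map_task(task_id, count):
    """Stand-in for a map task appending its output records."""
    for i in range(count):
        chunk.record_append(f"task{task_id}-rec{i};".encode())


threads = [threading.Thread(target=map_task, args=(t, 100)) for t in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Records from different tasks interleave, but offsets never overlap or leave gaps.
expected = 0
contiguous = True
for offset, record in chunk.records:
    contiguous = contiguous and (offset == expected)
    expected += len(record)
```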
Distributed Aggregation With KFS I-files
Design
• Bind subset of mappers to an aggregator
• Use multiple aggregators per I-file
• Minimize # of aggregators per I-file
Implementation
• Multiple writers/chunk
• Multiple chunks appended to concurrently
• # of chunks per I-file scales based on data
• Chunk allocation is key:
– Need to pack data into as few chunks as possible
Sailfish: MapReduce Using I-files
• Modify Hadoop-0.20.2 to use I-files
– Mappers append their output to per-partition I-files using record_append()
• Map output is appended concurrently with task execution
– During map phase, gather statistics on
intermediate data and plan reduce phase
• # of reducers, task assignment done at run-time
– Reducer scans its input from a per-partition I-file
• Merge records from chunks and reduce()
• For efficient scan(), sort and index an I-file chunk
Sailfish Dataflow
[Figure, shown over three slides: Sailfish dataflow among tasks, the DFS, chunkservers, and the sorter]
Leveraging I-files
• Gather statistics on intermediate data
whenever a chunk is sorted
– Statistics are gathered during map phase as part
of execution
• During sorting, augment each chunk with an
index
– Index supports efficient scans
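A sketch of why sorting plus an index makes a key-range scan cheap. The sparse index (one entry every `stride` records) and the function names are assumptions for illustration; the talk does not describe the on-disk index format.

```python
import bisect


def sort_and_index(records, stride):
    """Sort a chunk by key and index every `stride`-th key with its position."""
    sorted_records = sorted(records, key=lambda kv: kv[0])
    index = [(sorted_records[i][0], i) for i in range(0, len(sorted_records), stride)]
    return sorted_records, index


def scan(sorted_records, index, lo, hi):
    """Return records with lo <= key < hi, starting from an indexed position."""
    index_keys = [key for key, _ in index]
    # Last index entry whose key is strictly below lo is a safe place to start.
    slot = bisect.bisect_left(index_keys, lo) - 1
    start = index[slot][1] if slot >= 0 else 0
    out = []
    for key, value in sorted_records[start:]:
        if key >= hi:
            break  # sorted order: nothing later can match
        if key >= lo:
            out.append((key, value))
    return out


# Records arrive in arbitrary (here: reversed) order, as appends from many mappers would.
records = [("k%02d" % i, i) for i in reversed(range(20))]
sorted_records, index = sort_and_index(records, stride=4)
result = scan(sorted_records, index, "k05", "k08")
```

The scan jumps to the indexed position nearest the start of the range and stops at the first key past the end, instead of reading the whole chunk.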
Reduce Phase Implementation
• Plan reduce phase based on data:
– # of reduce tasks per I-file = Size of I-file / Work
per task
– # of tasks scales based on data; handles skew
• On a per I-file basis, partition key space by
constructing “split points”
• Each reduce task processes a range of keys
within an I-file
– “Hierarchical partitioning” of data in an I-file
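The planning rule above can be sketched as follows. The rounding (ceiling, at least one task), the I-file names and sizes, and the way split points are picked from a sorted key sample are assumptions; the talk only states the size / work-per-task formula and that split points partition the key space.

```python
import math


def plan_reduce_tasks(ifile_bytes, work_per_task_bytes):
    """# of reduce tasks per I-file = size of I-file / work per task (rounded up)."""
    return max(1, math.ceil(ifile_bytes / work_per_task_bytes))


def split_points(sorted_keys, num_tasks):
    """Pick num_tasks - 1 keys that divide a sorted key sample into even ranges."""
    n = len(sorted_keys)
    return [sorted_keys[(i * n) // num_tasks] for i in range(1, num_tasks)]


GB = 2**30
# Hypothetical I-file sizes; the skewed I-file simply gets more reduce tasks.
ifile_sizes = {"ifile-0": 4 * GB, "ifile-1": 1 * GB, "ifile-2": 10 * GB}
plan = {name: plan_reduce_tasks(size, 2 * GB) for name, size in ifile_sizes.items()}

# Each reduce task then handles one key range between consecutive split points.
sample_keys = list(range(100))
points = split_points(sample_keys, 4)
```

Because the task count is derived from observed I-file sizes, a partition that received more data is split across more reducers rather than overloading one.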
Sailfish: Reduce Phase
Objectives
• Avoid specifying # of
reducers in a job at job
submission time
• Handle skew
• Auto-scale
Implementation
• Gather statistics on I-files to
plan reduce phase
– # of reduce tasks is
determined at run-time in a
data dependent manner
• Hierarchical scheme
– Partition map output into a
large number of I-files
– Assign key-ranges within an I-file to a reduce task
• Reducers/I-file is data
dependent
How many seeks?
• Goal: # of seeks proportional to R
• With Sailfish,
– A reducer reads input from all chunks of a single I-file
– Suppose that each I-file has c chunks
– # of seeks during read is proportional to c * R
• # of seeks during appends is also proportional to c * R
– But sorters also cause seeks
• # of seeks during sorting is proportional to 2 * c * R
• Packing data into as few chunks as possible is
critical for I-file effectiveness
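The seek counts on this slide can be compared with a small model. The proportionality constants are taken to be 1 and the job sizes are hypothetical, so the output shows only how the terms scale, not measured seek counts.

```python
def hadoop_seeks(num_maps, num_reducers):
    """Stock Hadoop: seeks for intermediate data grow with M * R."""
    return num_maps * num_reducers


def sailfish_seeks(chunks_per_ifile, num_reducers):
    """Sailfish: appends (c * R), sorting (2 * c * R), and reducer reads (c * R)."""
    c, r = chunks_per_ifile, num_reducers
    return {"append": c * r, "sort": 2 * c * r, "read": c * r}


# Hypothetical job sizes, chosen only for illustration.
M, R, c = 10_000, 500, 20
sailfish = sailfish_seeks(c, R)
total = sum(sailfish.values())      # 4 * c * R
ratio = hadoop_seeks(M, R) / total  # equals M / (4 * c); R cancels out
```

Since R cancels in the ratio, the advantage depends on how small c is relative to M, which is why packing data into few chunks matters.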
Experimental Evaluation
• Cluster comprises ~150 machines (5 racks)
– 2008-vintage machines
• 8 cores, 16GB RAM, 4 x 750GB drives/node
– 1Gbps between any pair of nodes
• Used lzo compression for handling intermediate
data (for both Hadoop and Sailfish)
• Evaluations involved:
– Synthetic benchmark that generates its own data
– Actual jobs/data run at Yahoo
How did we do? (Synthetic Benchmark)
[Figure: runtime (min.) vs. intermediate data size (1, 2, 4, 8, 16, 32, 64 TB), Sailfish and Hadoop]
How Many Seeks…
Hadoop
• With Stock Hadoop, it is
proportional to M * R
• # of map tasks generating
64TB data
• M = (64TB / 1GB per map
task) = 65536
Sailfish
• With Sailfish, it is proportional
to c * R
• 64TB of intermediate data split
over 512 I-files
– Chunks per I-file: ((64TB / 512)
/ 128MB) = 1024
– In practice: c varies from 1032
to 1048
• Results show that chunks are
packed
– Chunk allocation policy works
well in practice 
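The arithmetic on this slide can be checked directly. The sketch assumes binary units (1 TB = 2^40 bytes), which is what makes 64TB / 1GB come out to 65536; the `overhead` figures are derived here from the slide's numbers and are not stated in the talk.

```python
TB, GB, MB = 2**40, 2**30, 2**20

intermediate = 64 * TB

# Stock Hadoop: one map task per 1GB of intermediate data.
M = intermediate // (1 * GB)

# Sailfish: data split over 512 I-files, each packed into 128MB chunks.
ideal_c = (intermediate // 512) // (128 * MB)

# Observed chunks per I-file, compared with the perfectly packed count.
observed_c = (1032, 1048)
overhead = [(c - ideal_c) / ideal_c for c in observed_c]
```

An observed c of 1032 to 1048 is within about 0.8% to 2.3% of the ideal 1024, consistent with the slide's claim that chunks are well packed.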
Sailfish: More data read per seek…
[Figure: data read by a reduce task per disk I/O (in MB) vs. intermediate data size (TB), Stock Hadoop and Sailfish]
Sailfish: Faster reduce phase…
[Figure: disk read throughput (MB/s) over time (0 to 4 hours), Stock Hadoop and Sailfish]
Sailfish In Practice
• Use actual jobs+datasets that are used in
production
Job Name           Characteristics        Input size (TB)   Int. data size (TB)
LogCount           Data reduction         1.1               0.04
LogProc            Skew in map output     1.1               1.1
LogRead            Skew in reduce input   1.1               1.1
Nday Model         Incr. computation      3.54              3.54
Behavior Model     Big data               3.6               9.47
Click Attribution  Big data               6.8               8.2
Segment Exploder   Data explosion         14.1              25.2
How Did We Do…
[Figure: runtime (min.) per job (LogCount, LogProc, LogRead, Nday-Model, BehaviorModel, ClickAttr, SegmentExploder), Sailfish and Hadoop]
Handling Skew In Reducer Input
[Figure: work per task (MB) vs. task # for the LogRead job, Hadoop and Sailfish]
Fault-tolerance
• Sailfish handles (temporary) loss of
intermediate data via recomputes
– Bookkeeping tracks the map tasks that wrote to
the lost block; those tasks are re-run
• “Scale out” mitigates the impact of data loss
– 15% increase in run-time for a run with failure
• Described in detail in the paper…
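The bookkeeping idea can be sketched as a map from each chunk to the map tasks that wrote to it. The `ChunkBookkeeping` class, the chunk names, and the 20-task example are hypothetical; the actual mechanism is the one described in the paper.

```python
from collections import defaultdict


class ChunkBookkeeping:
    """Toy bookkeeping: remember which map tasks wrote to which chunk."""

    def __init__(self):
        self.writers = defaultdict(set)  # chunk id -> set of map task ids

    def record_write(self, chunk_id, map_task_id):
        self.writers[chunk_id].add(map_task_id)

    def tasks_to_rerun(self, lost_chunk_id):
        """Only the tasks that wrote to the lost chunk need to be recomputed."""
        return sorted(self.writers.get(lost_chunk_id, set()))


book = ChunkBookkeeping()
# Hypothetical binding: map tasks 0-9 write to chunk-A, 10-19 to chunk-B.
for task in range(20):
    book.record_write("chunk-A" if task < 10 else "chunk-B", task)

rerun = book.tasks_to_rerun("chunk-A")
```

Losing one chunk triggers re-runs for half of the tasks in this example rather than all of them, which is how scaling out limits the cost of a failure.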
Related Work
• ThemisMR (SOCC’12) addresses same problem as Sailfish
– Does not (yet) support fault-tolerance: design space is small
clusters where failures are rare
– Design requires reducer input to fit in RAM
• [Starfish] Parameter tuning (for Hadoop)
– Construct a job profile and use that to tune Hadoop parameters
– Gains are limited by Hadoop’s intermediate data handling
mechanisms
• A lot of work in DB literature on handling skew
– Run job on a sample of input and collect statistics to construct
partition boundaries
– Use statistics to drive actual run
Summary
• Explore idea of network-wide aggregation to
improve disk subsystem performance
• Develop I-files as a data aggregation construct
– Implement I-files in KFS (a distributed filesystem)
• Use I-files to build Sailfish, an M/R infrastructure
– Sailfish improves job completion times: 20% to 5x
On-going Work
• Extending Sailfish to support
elasticity/preemption (Amoeba, SOCC’12)
• Working on integrating many of the core ideas
in Sailfish into Hadoop 2.x (aka YARN)
– Work started at Yahoo! Labs
– Being continued in [email protected]
• http://issues.apache.org/jira/browse/MAPREDUCE-4584
• http://issues.apache.org/jira/browse/YARN-45
Software Available
• Sailfish released as open source project
– http://code.google.com/p/sailfish
