Twister: A Runtime for Iterative MapReduce

Report
Twister: A Runtime for Iterative
MapReduce
HPDC – 2010 MAPREDUCE’10 Workshop, Chicago, 06/22/2010
Jaliya Ekanayake
Community Grids Laboratory,
Digital Science Center
Pervasive Technology Institute
Indiana University
SALSA
Acknowledgements to:
• Co authors:
Hui Li, Binging Shang, Thilina Gunarathne
Seung-Hee Bae, Judy Qiu, Geoffrey Fox
School of Informatics and Computing
Indiana University Bloomington
•
Team at IU
SALSA
Motivation
Data
Deluge
Experiencing in
many domains
MapReduce
Classic Parallel
Runtimes (MPI)
Data Centered, QoS
Efficient and
Proven techniques
Expand the Applicability of MapReduce to more
classes of Applications
Iterative MapReduce
Map-Only
Input
map
Output
MapReduce
More Extensions
iterations
Input
map
Input
map
reduce
Pij
reduce
SALSA
Features of Existing Architectures(1)
Google, Apache Hadoop, Sector/Sphere,
Dryad/DryadLINQ (DAG based)
• Programming Model
– MapReduce (Optionally “map-only”)
– Focus on Single Step MapReduce computations (DryadLINQ supports
more than one stage)
• Input and Output Handling
– Distributed data access (HDFS in Hadoop, Sector in Sphere, and shared
directories in Dryad)
– Outputs normally goes to the distributed file systems
• Intermediate data
– Transferred via file systems (Local disk-> HTTP -> local disk in Hadoop)
– Easy to support fault tolerance
– Considerably high latencies
SALSA
Features of Existing Architectures(2)
• Scheduling
– A master schedules tasks to slaves depending on the availability
– Dynamic Scheduling in Hadoop, static scheduling in Dryad/DryadLINQ
– Naturally load balancing
• Fault Tolerance
–
–
–
–
–
Data flows through disks->channels->disks
A master keeps track of the data products
Re-execution of failed or slow tasks
Overheads are justifiable for large single step MapReduce computations
Iterative MapReduce
SALSA
A Programming Model for Iterative
MapReduce
• Distributed data access
• In-memory MapReduce
• Distinction on static data
and variable data (data
flow vs. δ flow)
• Cacheable map/reduce
tasks (long running tasks)
• Combine operation
• Support fast intermediate
data transfers
Static
data
Configure()
Iterate
User
Program
Map(Key, Value)
δ flow
Reduce (Key, List<Value>)
Combine (Map<Key,Value>)
Close()
Twister Constraints for Side Effect Free map/reduce tasks
Computation Complexity >> Complexity of Size of the Mutant
Data (State)
SALSA
Twister Programming Model
Worker Nodes
configureMaps(..)
Local Disk
configureReduce(..)
Cacheable map/reduce tasks
while(condition){
runMapReduce(..)
May send <Key,Value> pairs directly
Iterations
Map()
Reduce()
Combine()
operation
updateCondition()
} //end while
close()
User program’s process space
Communications/data transfers
via the pub-sub broker network
Two configuration options :
1. Using local disks (only for maps)
2. Using pub-sub bus
SALSA
Twister API
1.configureMaps(PartitionFile partitionFile)
2.configureMaps(Value[] values)
3.configureReduce(Value[] values)
4.runMapReduce()
5.runMapReduce(KeyValue[] keyValues)
6.runMapReduceBCast(Value value)
7.map(MapOutputCollector collector, Key key, Value val)
8.reduce(ReduceOutputCollector collector, Key key,List<Value>
values)
9.combine(Map<Key, Value> keyValues)
SALSA
Twister Architecture
Master Node
B
Twister
Driver
B
B
B
Pub/sub
Broker Network
Main Program
One broker
serves several
Twister daemons
Twister Daemon
Twister Daemon
map
reduce
Cacheable tasks
Worker Pool
Local Disk
Worker Node
Worker Pool
Scripts perform:
Data distribution, data collection,
and partition file creation
Local Disk
Worker Node
SALSA
Input/Output Handling
Node 0
Node 1
Node n
Data
Manipulation Tool
Partition File
A common directory in local
disks of individual nodes
e.g. /tmp/twister_data
• Data Manipulation Tool:
– Provides basic functionality to manipulate data across the local
disks of the compute nodes
– Data partitions are assumed to be files (Contrast to fixed sized
blocks in Hadoop)
– Supported commands:
• mkdir, rmdir, put,putall,get,ls,
• Copy resources
• Create Partition File
SALSA
Partition File
File No
Node IP
Daemon No
File partition path
4
156.56.104.96
2
/home/jaliya/data/mds/GD-4D-23.bin
5
156.56.104.96
2
/home/jaliya/data/mds/GD-4D-0.bin
6
156.56.104.96
2
/home/jaliya/data/mds/GD-4D-27.bin
7
156.56.104.96
2
/home/jaliya/data/mds/GD-4D-20.bin
8
156.56.104.97
4
/home/jaliya/data/mds/GD-4D-23.bin
9
156.56.104.97
4
/home/jaliya/data/mds/GD-4D-25.bin
10
156.56.104.97
4
/home/jaliya/data/mds/GD-4D-18.bin
11
156.56.104.97
4
/home/jaliya/data/mds/GD-4D-15.bin
• Partition file allows duplicates
• One data partition may reside in multiple nodes
• In an event of failure, the duplicates are used to reschedule the tasks
SALSA
The use of pub/sub messaging
• Intermediate data transferred via the broker network
• Network of brokers used for load balancing
– Different broker topologies
• Interspersed computation and data transfer minimizes
large message load at the brokers
• Currently supports
– NaradaBrokering
– ActiveMQ
E.g.
100 map tasks, 10 workers in 10 nodes
~ 10 tasks are
producing outputs at
once
map task queues
Map workers
Broker network
Reduce()
SALSA
Scheduling
• Twister supports long running tasks
• Avoids unnecessary initializations in each
iteration
• Tasks are scheduled statically
– Supports task reuse
– May lead to inefficient resources utilization
• Expect user to randomize data distributions to
minimize the processing skews due to any
skewness in data
SALSA
Fault Tolerance
• Recover at iteration boundaries
• Does not handle individual task failures
• Assumptions:
– Broker network is reliable
– Main program & Twister Driver has no failures
• Any failures (hardware/daemons) result the
following fault handling sequence
– Terminate currently running tasks (remove from
memory)
– Poll for currently available worker nodes (& daemons)
– Configure map/reduce using static data (re-assign data
partitions to tasks depending on the data locality)
– Re-execute the failed iteration
SALSA
Performance Evaluation
• Hardware Configurations
Cluster ID
# nodes
# CPUs in each node
Cluster-I
32
6
Cluster-II
230
2
# Cores in each CPU
8
4
Total CPU cores
Supported OSs
768
1840
Linux (Red Hat Enterprise Linux
Red Hat Enterprise Linux
Server release 5.4 -64 bit)
Server release 5.4 -64 bit
Windows (Windows Server 2008 64 bit)
• We use the academic release of DryadLINQ, Apache Hadoop
version 0.20.2, and Twister for our performance comparisons.
• Both Twister and Hadoop use JDK (64 bit) version 1.6.0_18, while
DryadLINQ and MPI uses Microsoft .NET version 3.5.
SALSA
Pair wise Sequence Comparison using
Smith Waterman Gotoh
• Typical MapReduce computation
• Comparable efficiencies
• Twister performs the best
SALSA
Pagerank – An Iterative MapReduce Algorithm
Partial
Adjacency
Matrix
Current
Page ranks
(Compressed)
M
Partial
Updates
R
Iterations
C
Partially
merged
Updates
• Well-known pagerank algorithm [1]
• Used ClueWeb09 [2] (1TB in size) from CMU
• Reuse of map tasks and faster communication pays off
[1] Pagerank Algorithm, http://en.wikipedia.org/wiki/PageRank
[2] ClueWeb09 Data Set, http://boston.lti.cs.cmu.edu/Data/clueweb09/
SALSA
Multi-dimensional Scaling
While(condition)
{
<X> = [A] [B] <C>
C = CalcStress(<X>)
}
While(condition)
{
<T> = MapReduce1([B],<C>)
<X> = MapReduce2([A],<T>)
C = MapReduce3(<X>)
}
• Maps high dimensional data to lower dimensions (typically 2D or 3D)
• SMACOF (Scaling by Majorizing of COmplicated Function)[1]
[1] J. de Leeuw, "Applications of convex analysis to multidimensional
scaling," Recent Developments in Statistics, pp. 133-145, 1977.
SALSA
Conclusions & Future Work
• Twister extends the MapReduce to iterative algorithms
• Several iterative algorithms we have implemented
–
–
–
–
–
K-Means Clustering
Pagerank
Matrix Multiplication
Multi dimensional scaling (MDS)
Breadth First Search
• Integrating a distributed file system
• Programming with side effects yet support fault
tolerance
SALSA
Related Work
• General MapReduce References:
– Google MapReduce
– Apache Hadoop
– Microsoft DryadLINQ
– Pregel : Large-scale graph computing at Google
– Sector/Sphere
– All-Pairs
– SAGA: MapReduce
– Disco
SALSA
Questions?
Thank you!
SALSA
Extra Slides
SALSA
Hadoop (Google) Architecture
Job tracker assigns
some map outputs to
a reducer
4
Job
Tracker
3
Task
Tracker
Task tracker
notifies job
tracker
M
Map task reads
Input data from
HDFS
2
Map output
goes to
local disk
first
Task
Tracker
Reducer
downloads
map outputs
using HTTP
5
1
R
6
Reduce output
goes to HDFS
Local
Local
HDFS
•
•
•
HDFS stores blocks, manages replications, handle failures
Map/reduce are Java processes, not long running
Failed maps are re-executed, failed reducers collect data from maps again
SALSA
Twister Architecture
Twister API
1. configureMaps(PartitionFile partitionFile)
Main program
2. configureMaps(Value[] values)
3. configureReduce(Value[] values)
Broker
Network
Twister
Driver
4. String key=addToMemCache(Value value)
5. removeFromMemCache(String key)
B
6. runMapReduce()
B
B
4
7. runMapReduce(KeyValue[] keyValues)
Broker
Connection
8. runMapReduceBCast(Value value)
Receive static data (1)
OR
Variable data (key,value)
via the brokers (2)
2
1
M
Read static
data from
local disk
Twister
Daemon
Twister
Daemon
1
Local
3
Map output
goes directly
to reducer
R
4
Reduce output
goes to local disk
OR
to Combiner
Local
• Scripts for file manipulations
• Twister daemon is a process, but Map/Reduce tasks are Java
Threads (Hybrid approach)
SALSA
Twister
• In-memory MapReduce
• Distinction on static data
and variable data (data flow
vs. δ flow)
• Cacheable map/reduce tasks
(long running tasks)
• Combine operation
• Support fast intermediate
data transfers
Static
data
Iterate
Configure()
User
Program
Map(Key, Value)
δ flow
Reduce (Key, List<Value>)
Combine (Key, List<Value>)
Close()
Different synchronization
and intercommunication
mechanisms used by the
parallel runtimes
SALSA
Publications
1.
Jaliya Ekanayake, (Advisor: Geoffrey Fox) Architecture and Performance of Runtime
Environments for Data Intensive Scalable Computing, Accepted for the Doctoral Showcase,
SuperComputing2009.
2.
Xiaohong Qiu, Jaliya Ekanayake, Scott Beason, Thilina Gunarathne, Geoffrey Fox, Roger Barga,
Dennis Gannon, Cloud Technologies for Bioinformatics Applications, Accepted for publication in
2nd ACM Workshop on Many-Task Computing on Grids and Supercomputers,
SuperComputing2009.
3.
Jaliya Ekanayake, Atilla Soner Balkir, Thilina Gunarathne, Geoffrey Fox, Christophe Poulain, Nelson
Araujo, Roger Barga, DryadLINQ for Scientific Analyses, Accepted for publication in Fifth IEEE
International Conference on e-Science (eScience2009), Oxford, UK.
4.
Jaliya Ekanayake and Geoffrey Fox, High Performance Parallel Computing with Clouds and Cloud
Technologies, First International Conference on Cloud Computing (CloudComp2009), Munich,
Germany. – An extended version of this paper goes to a book chapter.
5.
Geoffrey Fox, Seung-Hee Bae, Jaliya Ekanayake, Xiaohong Qiu, and Huapeng Yuan, Parallel Data
Mining from Multicore to Cloudy Grids, High Performance Computing and Grids workshop, 2008.
– An extended version of this paper goes to a book chapter.
6.
Jaliya Ekanayake, Shrideep Pallickara, Geoffrey Fox, MapReduce for Data Intensive Scientific
Analyses, Fourth IEEE International Conference on eScience, 2008, pp.277-284.
7.
Jaliya Ekanayake, Shrideep Pallickara, and Geoffrey Fox, A collaborative framework for scientific
data analysis and visualization, Collaborative Technologies and Systems(CTS08), 2008, pp. 339346.
8.
Shrideep Pallickara, Jaliya Ekanayake and Geoffrey Fox, A Scalable Approach for the Secure and
Authorized Tracking of the Availability of Entities in Distributed Systems, 21st IEEE International
Parallel & Distributed Processing Symposium (IPDPS 2007).
SALSA

similar documents