
Machine Learning in the Cloud
Yucheng Low, Aapo Kyrola, Joey Gonzalez, Danny Bickson,
Carlos Guestrin, Joe Hellerstein, David O'Hallaron
Carnegie Mellon
Machine Learning in the Real World
13 Million Wikipedia Pages
3.6 Billion Flickr Photos
500 Million Facebook Users
24 Hours of Video Uploaded to YouTube Every Minute
Parallelism is Difficult
Wide array of different parallel architectures:
GPUs, Multicore, Clusters, Clouds, Supercomputers
Each architecture presents different challenges.
High-level abstractions make things easier.
MapReduce – Map Phase
[Figure: input records partitioned across CPU 1–4; each CPU applies the map function to its records independently]
Embarrassingly parallel, independent computation
No communication needed
MapReduce – Reduce Phase
[Figure: the mapped values are combined pairwise across CPU 1 and CPU 2 in a reduction tree]
Fold/Aggregation
MapReduce and ML
Excellent for large data-parallel tasks!
Data-Parallel (MapReduce): Feature Extraction, Cross Validation, Computing Sufficient Statistics
Complex Parallel Structure: ?
Is there more to Machine Learning?
Iterative Algorithms?
We can implement iterative algorithms in MapReduce:
[Figure: each iteration maps the data across CPU 1–3 and ends at a barrier; a single slow processor stalls every other CPU at the barrier]
Iterative MapReduce
The system is not optimized for iteration:
[Figure: every iteration launches a fresh MapReduce job, paying a startup penalty and a disk penalty to re-read and re-write the data]
Iterative MapReduce
Only a subset of the data needs computation (multi-phase iteration):
[Figure: each iteration still processes all data on CPU 1–3 and synchronizes at a barrier, even when little of the data changed]
Structured Problems
Example Problem: Will I be successful in research?
Success depends on the success of others.
We may not be able to safely update neighboring nodes [e.g., Gibbs Sampling].
Interdependent Computation: Not Map-Reducible
Space of Problems
Sparse Computation Dependencies: can be decomposed into local "computation kernels"
Asynchronous Iterative Computation: repeated iterations over local kernel computations
Parallel Computing and ML
Not all algorithms are efficiently data-parallel.
Data-Parallel (MapReduce): Feature Extraction, Cross Validation, Computing Sufficient Statistics
Structured Iterative Parallel (GraphLab): Lasso, Tensor Factorization, Kernel Methods, Belief Propagation, SVM, Sampling, Learning Graphical Models, Deep Belief Networks, Neural Networks
GraphLab Goals
Designed for ML needs:
Express data dependencies
Iterative
Simplifies the design of parallel programs:
Abstract away hardware issues
Address multiple hardware architectures: Multicore, Distributed, GPU, and others
GraphLab Goals
[Figure: a plane spanning Small Data → Large Data and Simple Models → Complex Models; existing data-parallel tools cover large data with simple models ("Now"), while GraphLab targets large data with complex models]
GraphLab
A Domain-Specific Abstraction for Machine Learning
Everything on a Graph
A Graph with data associated with every vertex and edge
Update Functions
Update functions are operations applied on a vertex that transform the data in the scope of the vertex.
An update function can schedule the computation of any other update function:
- FIFO scheduling
- Prioritized scheduling
- Randomized, etc.
Scheduled computation is guaranteed to execute eventually.
Example: PageRank
Graph = WWW
Update Function: multiply adjacent PageRank values with edge weights to get the current vertex's PageRank.
"Prioritized" PageRank computation? Skip converged vertices.
Example: K-Means Clustering
A (fully connected?) bipartite graph between data vertices and cluster vertices.
Update Functions:
Cluster update: compute the average of the data connected on "marked" edges.
Data update: pick the closest cluster and mark that edge; unmark the remaining edges.
Example: MRF Sampling
Graph = MRF
Update Function:
- Read the samples on adjacent vertices
- Read the edge potentials
- Compute a new sample for the current vertex
Not Message Passing!
The graph is a data structure; update functions perform parallel modifications to that data structure.
Safety
What if adjacent update functions occur simultaneously?
Importance of Consistency
Is ML resilient to soft-optimization? Can we permit races and accept "best-effort" computation?
True for some algorithms, but not for many: an algorithm may work empirically on some datasets and fail on others.
Importance of Consistency
Many algorithms require strict consistency, or perform significantly better under strict consistency.
[Figure: Alternating Least Squares error (RMSE) over 30 iterations; consistent updates drive the error down while inconsistent updates remain far higher]
Importance of Consistency
A fast ML algorithm development cycle: Build → Test → Debug → Tweak Model.
The framework must behave predictably and consistently, avoiding problems caused by non-determinism.
Otherwise: is the execution wrong, or is the model wrong?
Sequential Consistency
GraphLab guarantees sequential consistency: for every parallel execution, there exists a sequential execution of update functions that produces the same result.
[Figure: an interleaved execution on CPU 1 and CPU 2 is equivalent to some sequential execution on a single CPU]
Sequential consistency is a formalization of the intuitive concept of a "correct program":
- Computation does not read outdated data from the past.
- Computation does not read the results of computation that occurs in the future.
It is the primary property of GraphLab.
Global Information
What if we need global information? Algorithm parameters? Sufficient statistics? The sum over all vertices?
Shared Variables
Global aggregation is done through the Sync operation: a global parallel reduction over the graph data.
Synced variables are recomputed at defined intervals.
Sync computation is sequentially consistent, permitting correct interleaving of Syncs and Updates.
Examples: Sync: log-likelihood; Sync: sum of vertex values.
Sequential Consistency
GraphLab guarantees sequential consistency: for every parallel execution, there exists a sequential execution of update functions and Syncs that produces the same result.
GraphLab in the Cloud
Moving towards the cloud…
Purchasing and maintaining computers is very expensive, and most computing resources sit seldom used, except near deadlines.
In the cloud you buy time, gaining access to hundreds or thousands of processors while paying only for the resources you need.
Distributed GL Implementation
Mixed multi-threaded / distributed implementation (each machine runs only one instance).
Requires all data to be in memory; moves computation to the data.
MPI for management + TCP/IP for communication.
Asynchronous C++ RPC layer.
Ran on 64 EC2 HPC nodes = 512 processors.
[Figure: the distributed GraphLab stack on each machine. Execution threads drive an execution engine over a distributed graph protected by distributed locks; shared data lives in a cache-coherent distributed key-value store; all components communicate through an RPC controller over the underlying network]
GraphLab RPC
Write distributed programs easily:
Asynchronous communication
Multithreaded support
Fast, scalable, and easy to use
(Every machine runs the same binary)
Features
Easy RPC capabilities.
One-way calls:

  rpc.remote_call([target_machine ID],
                  printf,
                  "%s %d %d %d\n",
                  "hello world", 1, 2, 3);

Requests (calls with a return value):

  std::vector<int>& sort_vector(std::vector<int> &v) {
    std::sort(v.begin(), v.end());
    return v;
  }

  vec = rpc.remote_request([target_machine ID],
                           sort_vector,
                           vec);
Features
MPI-like primitives:

  dc.barrier()
  dc.gather(...)
  dc.send_to([target machine], [arbitrary object])
  dc.recv_from([source machine], [arbitrary object ref])
Object Instance Context
[Figure: four machines, each running an RPC controller with its own K-V object instance; calls on an object are routed to the matching instance on the target machine]
MPI-Like Safety
Request Latency
[Figure: request latency (µs) vs. value length (16 B to 10 KB) for GraphLab RPC and MemCached; ping RTT = 90 µs]
One-Way Call Rate
[Figure: throughput (Mbps) vs. value length (16 B to 10 KB) for GraphLab RPC and ICE, against the 1 Gbps physical peak]
Serialization Performance
100,000 × one-way calls of a vector of 10 × {"hello", 3.14, 100}
[Figure: total time (s), split into issue and receive, for ICE, buffered RPC, and unbuffered RPC]
Distributed Computing Challenges
Q1: How do we efficiently distribute the state? (with a potentially varying number of machines)
Q2: How do we ensure sequential consistency?
Keeping in mind: limited bandwidth, high latency, and performance.
Distributed Graph
Two-stage Partitioning:
Initial overpartitioning of the graph
Generate the atom graph
Repartition as needed
Ghosting
Ghost vertices are copies of neighboring vertices that reside on remote machines.
Ghost vertices/edges act as caches for remote data.
Coherency is maintained using versioning, which decreases bandwidth utilization.
Distributed Engine
Sequential consistency can be guaranteed through distributed locking, a direct analogue to the shared-memory implementation.
To improve performance, the user provides some "expert knowledge" about the properties of the update function.
Full Consistency
The user says: the update function modifies all data in scope.
Acquire a write-lock on all vertices in scope.
Limited opportunities for parallelism.
Edge Consistency
The user says: the update function only reads from adjacent vertices.
Acquire a write-lock on the center vertex and read-locks on adjacent vertices.
More opportunities for parallelism.
Vertex Consistency
The user says: the update function touches neither edges nor adjacent vertices.
Acquire a write-lock on the current vertex only.
Maximum opportunities for parallelism.
Performance Enhancements
Latency hiding: "pipelining" far more update-function calls than there are CPUs (about a 1K-deep pipeline) hides the latency of lock acquisition and cache synchronization.
Lock strength reduction: a trick that decreases the number of locks while still providing the same guarantees.
Video Cosegmentation
Segments that mean the same thing.
Gaussian EM clustering + BP on a 3D grid.
Model: 10.5 million nodes, 31 million edges.
[Figure: speedups on the video segmentation task]
Chromatic Distributed Engine
Locking overhead is too high in high-degree models. Can we satisfy sequential consistency in a simpler way?
Observation: scheduling using vertex colorings can be used to automatically satisfy consistency.
Example: Edge Consistency — a (distance-1) vertex coloring
Example: Full Consistency — a (distance-2) vertex coloring
Example: Vertex Consistency — a (distance-0) vertex coloring
In each case, update functions can be executed on all vertices of the same color in parallel.
Chromatic Distributed Engine
[Figure: timeline. All machines execute tasks on all vertices of color 0, then wait at a data-synchronization completion + barrier; then all vertices of color 1, followed by another completion + barrier; and so on]
Experiments
Netflix Collaborative Filtering
Alternating Least Squares matrix factorization
Model: 0.5 million nodes, 99 million edges
[Figure: the Netflix ratings matrix (users × movies) factorized with inner dimension d]
Netflix
[Figure: speedup vs. #nodes (4–64) for increasing factorization sizes; heavier problems scale better: d=100 (159.91 IPB) is closest to ideal, then d=50 (85.68 IPB), d=20 (48.72 IPB), and d=5 (44.85 IPB)]
Netflix
[Figure: cost ($) vs. runtime (s), and cost ($) vs. error (RMSE), for Hadoop and GraphLab at D = 5, 20, 50, and 100]
Netflix
[Figure: runtime (s) vs. #nodes (4–64) for MPI, Hadoop, and GraphLab]
Experiments
Named Entity Recognition (part of Tom Mitchell's NELL project)
CoEM Algorithm
[Figure: noun phrases from a web crawl grouped by type — Food: onion, garlic, noodles, blueberries, beans; Religion: Catholic, Freemasonry, Marxism, Catholic Chr., Humanism; City: Munich, Cape Twn., Seoul, Mexico Cty., Winnipeg]
Model: 2 million nodes, 200 million edges
The graph is rather dense: a small number of vertices connect to almost all the vertices.
Named Entity Recognition (CoEM)
[Figure: speedup vs. #nodes (4–64) for NER, plotted against the ideal line]
Named Entity Recognition (CoEM)
[Figure: MBPS per node vs. #nodes (8–64) for NER, Netflix, and CoSeg; NER runs near 100 MBPS per node]
Bandwidth Bound
Named Entity Recognition (CoEM)
[Figure: runtime (s) vs. #nodes (4–64) for Hadoop, GraphLab, and MPI]
Future Work
Distributed GraphLab:
Fault tolerance → spot instances → cheaper
Graph using an off-memory store (disk/SSD)
GraphLab as a database
Self-optimized partitioning
Fast data-to-graph construction primitives
GPU GraphLab?
Supercomputer GraphLab?
Is GraphLab the Answer to (Life, the Universe, and Everything)?
Probably Not.
GraphLab
graphlab.ml.cmu.edu
Parallel/distributed implementation
LGPL (highly probable switch to MPL in a few weeks)
Danny Bickson Marketing Agency: bickson.blogspot.com
Very fast matrix factorization implementations, other examples, installation, comparisons, etc.
Questions?
Implemented algorithms: SVD, CoEM, Matrix Factorization, Bayesian Tensor Factorization, Lasso, Gibbs Sampling, PageRank, SVM, Dynamic Block Gibbs Sampling, Belief Propagation, and many others…
Video Cosegmentation
Naïve idea: treat patches independently and use Gaussian EM clustering (on image features).
E step: predict the membership of each patch given the cluster centers.
M step: compute the cluster centers given the memberships of each patch.
This does not take the relationships among patches into account!
Video Cosegmentation
Better idea: connect the patches using an MRF. Set the edge potentials so that adjacent (spatially and temporally) patches prefer to be of the same cluster.
Gaussian EM clustering with a twist:
E step: make unary potentials for each patch using the cluster centers, then predict the membership of each patch using BP.
M step: compute the cluster centers given the memberships of each patch.
D. Batra, et al. iCoseg: Interactive co-segmentation with intelligent scribble guidance. CVPR 2010.
Distributed Memory Programming APIs
…do not make it easy…
• MPI — synchronous computation; insufficient primitives for multi-threaded use; also, not exactly easy to use…
• Global Arrays — only if all your data is an n-D array
• GASnet, ARMCI — direct remote pointer access, with severe limitations depending on system architecture
• etc.