Graph

Report
GraphX:
Graph Processing in a
Distributed Dataflow Framework
Joseph Gonzalez
Postdoc, UC-Berkeley AMPLab
Co-founder, GraphLab Inc.
Joint work with Reynold Xin, Ankur Dave, Daniel
Crankshaw, Michael Franklin, and Ion Stoica
OSDI 2014
UC BERKELEY
Modern Analytics
[Figure: analytics pipeline over Raw Wikipedia XML. Table stages: Link Table (Title, Link), Discussion Table (User, Disc.), Top 20 Pages (Title, PR), User Community (User, Com.), Top Communities (Com., PR). Graph stages: Hyperlinks with PageRank, Editor Graph with Community Detection.]
Tables
Graphs
Separate Systems
Tables
Graphs
Separate Systems
Dataflow Systems: Table (Row, Row, Row, Row)
Graph Systems: Dependency Graph
Result
Difficult to Use
Users must Learn, Deploy, and
Manage multiple systems
Leads to brittle and often
complex interfaces
Inefficient
Extensive data movement and duplication across
the network and file system
[Figure: raw XML is written to and read back from HDFS between pipeline stages]
Limited reuse of internal data structures
across stages
GraphX Unifies Computation on
Tables and Graphs
GraphX
Table View
Spark Dataflow
Framework
Graph View
Enabling a single system to easily and
efficiently support the entire pipeline
PageRank on the Live-Journal Graph
Runtime (in seconds, PageRank for 10 iterations):
Hadoop: 1340
Spark: 354
?
GraphLab: 22
Hadoop is 60x slower than GraphLab
Spark is 16x slower than GraphLab
Key Question
How can we naturally express and
efficiently execute graph
computation in a general purpose
dataflow framework?
Distill the lessons learned
from specialized graph systems
Representation Optimizations
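Example Computation:
PageRank
Express computation locally:
$$R[i] = 0.15 + \sum_{j \in \mathrm{Nbrs}(i)} w_{ji} \, R[j]$$
where R[i] is the rank of page i, 0.15 is the random reset probability, and the sum is the weighted sum of the neighbors' ranks.
Iterate until convergence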
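A minimal sketch of the local update above, iterated to convergence on plain Scala collections. The object name `PageRankSketch`, the tolerance, the iteration cap, and the weighting w_ji = 0.85 / outDegree(j) are assumptions for illustration; the slide does not define the weights.

```scala
object PageRankSketch {
  type Id = Long

  /** One synchronous sweep: R[i] = 0.15 + sum over in-neighbors j of w_ji * R[j]. */
  def step(edges: Seq[(Id, Id)], ranks: Map[Id, Double]): Map[Id, Double] = {
    // Assumed weighting (not from the slide): w_ji = 0.85 / outDegree(j).
    val outDeg: Map[Id, Int] =
      edges.groupBy(_._1).map { case (src, es) => (src, es.size) }
    val sums: Map[Id, Double] = edges
      .map { case (src, dst) => (dst, 0.85 * ranks(src) / outDeg(src)) }
      .groupBy(_._1)
      .map { case (dst, msgs) => (dst, msgs.map(_._2).sum) }
    ranks.map { case (id, _) => (id, 0.15 + sums.getOrElse(id, 0.0)) }
  }

  /** Repeat sweeps until no rank moves by more than `tol` or `maxIter` is reached. */
  def run(edges: Seq[(Id, Id)], vertices: Set[Id],
          tol: Double = 1e-6, maxIter: Int = 100): Map[Id, Double] = {
    var ranks: Map[Id, Double] = vertices.map(v => (v, 1.0)).toMap
    var delta = Double.MaxValue
    var iter = 0
    while (delta > tol && iter < maxIter) {
      val next = step(edges, ranks)
      delta = vertices.foldLeft(0.0)((m, v) => math.max(m, math.abs(next(v) - ranks(v))))
      ranks = next
      iter += 1
    }
    ranks
  }
}
```

Every edge endpoint is assumed to appear in `vertices`; a vertex with no in-edges settles at the reset value 0.15.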
“Think like a Vertex.”
- Malewicz et al., SIGMOD’10
Graph-Parallel Pattern
Gonzalez et al. [OSDI’12]
Gather information from neighboring vertices
Apply an update to the vertex property
Scatter information to neighboring vertices
Many Graph-Parallel Algorithms
MACHINE LEARNING
Collaborative Filtering
» Alternating Least Squares
» Stochastic Gradient Descent
» Tensor Factorization
Structured Prediction
» Loopy Belief Propagation
» Max-Product Linear Programs
» Gibbs Sampling
Semi-supervised ML
» Graph SSL
» CoEM
NETWORK ANALYSIS
Community Detection
» Triangle-Counting
» K-core Decomposition
» K-Truss
Graph Analytics
» PageRank
» Personalized PageRank
» Shortest Path
» Graph Coloring
Specialized
Computational
Pattern
Specialized
Graph
Optimizations
Graph System
Optimizations
Specialized
Data-Structures
Message
Combiners
Vertex-Cuts
Partitioning
Remote
Caching / Mirroring
Active Set Tracking
Representation
Join
Distributed
Graphs
Horizontally
Partitioned Tables
Vertex
Programs
Dataflow
Operators
Optimizations
Advances in Graph Processing Systems
Distributed Join
Optimization
Materialized View
Maintenance
Property Graph Data Model
[Figure: property graph on vertices A to F]
Vertex Property:
• User Profile
• Current PageRank Value
Edge Property:
• Weights
• Timestamps
Encoding Property Graphs as Tables
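Property Graph: vertices A to F, with edges split by a vertex cut into Part. 1 and Part. 2 across Machine 1 and Machine 2.
Vertex Table (RDD): A, B, C, D, E, F
Routing Table (RDD), vertex → edge partitions:
A → 1, 2
B → 1
C → 1
D → 1, 2
E → 2
F → 2
Edge Table (RDD):
Part. 1: (A, B), (A, C), (B, C), (C, D)
Part. 2: (A, E), (A, F), (E, D), (E, F)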
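As a rough illustration of this encoding, the routing table can be derived from the partitioned edge table. This is a sketch on plain Scala collections, not the GraphX implementation; `TableEncodingSketch` and `routingTable` are hypothetical names, and the edge data is the slide's example.

```scala
object TableEncodingSketch {
  type Id = Char          // vertex ids from the slide's example graph
  type PartitionId = Int

  // Edge table: each edge lives in exactly one partition (a vertex cut).
  val edgeTable: Map[PartitionId, Seq[(Id, Id)]] = Map(
    1 -> Seq(('A', 'B'), ('A', 'C'), ('B', 'C'), ('C', 'D')),
    2 -> Seq(('A', 'E'), ('A', 'F'), ('E', 'D'), ('E', 'F'))
  )

  /** Routing table: for each vertex, the edge partitions that reference it. */
  def routingTable(edges: Map[PartitionId, Seq[(Id, Id)]]): Map[Id, Set[PartitionId]] =
    edges.toSeq
      .flatMap { case (pid, es) =>
        es.flatMap { case (src, dst) => Seq((src, pid), (dst, pid)) }
      }
      .groupBy(_._1)
      .map { case (id, pairs) => (id, pairs.map(_._2).toSet) }
}
```

Vertices A and D are cut: they appear in both partitions, so their properties must be shipped to both.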
Separate Properties and Structure
Reuse structural information across multiple graphs
Input Graph: Vertex Table, Routing Table, Edge Table
Transform Vertex Properties
Transformed Graph: Vertex Table, Routing Table, Edge Table
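One way to picture the reuse: keep the structure in its own immutable value and let a vertex transformation pass it through by reference. A minimal sketch, assuming hypothetical types `Structure` and `SimpleGraph` that are not part of GraphX:

```scala
object StructureSharingSketch {
  type Id = Long

  // Structure (edges and routing) is stored apart from the vertex properties.
  final case class Structure(edges: Vector[(Id, Id)], routing: Map[Id, Set[Int]])

  final case class SimpleGraph[V](vertexProps: Map[Id, V], structure: Structure) {
    /** Transform vertex properties; the same Structure instance is reused, not copied. */
    def mapV[T](f: (Id, V) => T): SimpleGraph[T] =
      SimpleGraph(vertexProps.map { case (id, v) => (id, f(id, v)) }, structure)
  }
}
```

Only the vertex table is rebuilt; the edge and routing data are shared between the input and the transformed graph.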
Graph Operators (Scala)
class Graph[V, E] {
  def Graph(vertices: Table[(Id, V)],
            edges: Table[(Id, Id, E)])
  // Table Views ----------------------------------
  def vertices: Table[(Id, V)]
  def edges: Table[(Id, Id, E)]
  def triplets: Table[((Id, V), (Id, V), E)]
  // Transformations ------------------------------
  def reverse: Graph[V, E]
  def subgraph(pV: (Id, V) => Boolean,
               pE: Edge[V, E] => Boolean): Graph[V, E]
  def mapV[T](m: (Id, V) => T): Graph[T, E]
  def mapE[T](m: Edge[V, E] => T): Graph[V, T]
  // Joins ----------------------------------------
  def joinV[T](tbl: Table[(Id, T)]): Graph[(V, T), E]
  def joinE[T](tbl: Table[(Id, Id, T)]): Graph[V, (E, T)]
  // Computation ----------------------------------
  def mrTriplets[T](mapF: Edge[V, E] => List[(Id, T)],
                    reduceF: (T, T) => T): Graph[T, E]
}
The mrTriplets operator captures the Gather-Scatter pattern from
specialized graph-processing systems.
Triplets Join Vertices and Edges
The triplets operator joins vertices and edges:
Vertices: A, B, C, D
Edges: (A, B), (A, C), (B, C), (C, D)
Triplets: one row per edge, carrying the edge together with its source and destination vertices
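The join can be sketched on in-memory collections as a lookup of both endpoints for every edge. `TripletsSketch` is a hypothetical name; the row shape follows the `triplets` signature in the operator listing.

```scala
object TripletsSketch {
  type Id = Char

  /** Join each edge with the properties of its source and destination vertex. */
  def triplets[V, E](vertices: Map[Id, V],
                     edges: Seq[(Id, Id, E)]): Seq[((Id, V), (Id, V), E)] =
    edges.map { case (src, dst, attr) =>
      ((src, vertices(src)), (dst, vertices(dst)), attr)
    }
}
```

The sketch assumes every edge endpoint is present in the vertex map.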
Map-Reduce Triplets
Map-Reduce triplets collects information
about the neighborhood of each vertex:
Map (message addressed to Src. or Dst.):
MapFunction(A → B) → (B, msg)
MapFunction(A → C) → (C, msg)
MapFunction(B → C) → (C, msg)
MapFunction(C → D) → (D, msg)
Reduce (Message Combiners):
(B, msg)
(C, msg + msg)
(D, msg)
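A sketch of the same map-then-combine step on plain collections, under the assumption that a triplet is a `((Id, V), (Id, V), E)` row. `MrTripletsSketch` is a hypothetical name, and unlike the slide's operator this version returns a plain map rather than a graph.

```scala
object MrTripletsSketch {
  type Id = Char
  type Triplet[V, E] = ((Id, V), (Id, V), E)

  /** Map every triplet to messages, then combine the messages per target vertex. */
  def mrTriplets[V, E, T](triplets: Seq[Triplet[V, E]])(
      mapF: Triplet[V, E] => List[(Id, T)],
      reduceF: (T, T) => T): Map[Id, T] =
    triplets
      .flatMap(mapF)
      .groupBy(_._1)
      .map { case (id, msgs) => (id, msgs.map(_._2).reduce(reduceF)) }

  // Slide example: edges A->B, A->C, B->C, C->D, here with unit properties.
  val example: Seq[Triplet[Unit, Unit]] =
    Seq(('A', 'B'), ('A', 'C'), ('B', 'C'), ('C', 'D'))
      .map { case (s, d) => ((s, ()), (d, ()), ()) }

  // Sending 1 to each destination and summing yields in-degrees.
  val inDegrees: Map[Id, Int] =
    mrTriplets[Unit, Unit, Int](example)(t => List((t._2._1, 1)), _ + _)
}
```

Vertices that receive no message (A in the example) are absent from the result.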
Using these basic GraphX operators we implemented Pregel and GraphLab in under 50 lines of code!
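To give a feel for why this is possible, here is a rough Pregel-style loop in which each superstep is one map-over-triplets plus a message combiner. It is a sketch on in-memory collections, not the GraphX source; `PregelSketch`, `Triplet`, and the parameter names are assumptions.

```scala
object PregelSketch {
  type Id = Long
  final case class Triplet[V](srcId: Id, src: V, dstId: Id, dst: V)

  def pregel[V, M](vertices: Map[Id, V], edges: Seq[(Id, Id)],
                   initialMsg: M, maxIter: Int)(
      vprog: (Id, V, M) => V,
      sendMsg: Triplet[V] => List[(Id, M)],
      combine: (M, M) => M): Map[Id, V] = {
    // Superstep 0: every vertex receives the initial message.
    var state = vertices.map { case (id, v) => (id, vprog(id, v, initialMsg)) }
    var iter = 0
    var active = true
    while (active && iter < maxIter) {
      // mrTriplets-style step: map over triplets, combine messages per target.
      val msgs: Map[Id, M] = edges
        .flatMap { case (s, d) => sendMsg(Triplet(s, state(s), d, state(d))) }
        .groupBy(_._1)
        .map { case (id, ms) => (id, ms.map(_._2).reduce(combine)) }
      // Only vertices that received a message run the vertex program.
      state = state.map { case (id, v) =>
        (id, msgs.get(id).map(m => vprog(id, v, m)).getOrElse(v))
      }
      active = msgs.nonEmpty
      iter += 1
    }
    state
  }
}
```

For example, connected components falls out by propagating the minimum label: `vprog` takes the minimum of the current label and the message, and `sendMsg` sends the smaller endpoint label to the other endpoint.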
The GraphX Stack
(Lines of Code)
PageRank (20), Connected Comp. (20), K-core (60), Triangle Count (50), LDA (220), SVD++ (110)
Pregel API (34)
GraphX (2,500)
Spark (30,000)
Some algorithms are more naturally expressed
using the GraphX primitive operators
Join Site Selection using Routing Tables
Routing Table (RDD): A → 1, 2; B → 1; C → 1; D → 1, 2; E → 2; F → 2
Vertex Table (RDD): A, B, C, D, E, F
Edge Table (RDD): (A, B), (A, C), (B, C), (C, D), (A, E), (A, F), (E, D), (E, F)
Never Shuffle Edges!
Caching for Iterative mrTriplets
[Figure: Vertex Table (RDD) and Edge Table (RDD). Each edge partition has a Mirror Cache of vertex data and a Reusable Hash Index, and is processed with a Scan.]
Incremental Updates for Triplets View
[Figure: Vertex Table (RDD), Edge Table (RDD), and the Mirror Cache of each edge partition. Vertices marked "Change" are the ones refreshed in the Mirror Caches before the Scan.]
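The idea of refreshing mirrors incrementally can be sketched as a diff followed by a filtered merge. This is an illustration on plain maps, not the GraphX mechanism; `IncrementalMirrorSketch`, `changed`, and `refresh` are hypothetical names.

```scala
object IncrementalMirrorSketch {
  type Id = Char

  /** Vertices whose property differs between two iterations. */
  def changed[V](prev: Map[Id, V], next: Map[Id, V]): Map[Id, V] =
    next.filter { case (id, v) => !prev.get(id).contains(v) }

  /** Apply only the changed entries that this partition's mirror cache holds. */
  def refresh[V](mirror: Map[Id, V], delta: Map[Id, V]): Map[Id, V] =
    mirror ++ delta.filter { case (id, _) => mirror.contains(id) }
}
```

A partition whose mirror cache holds none of the changed vertices receives nothing, which is where the communication savings would come from.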
Aggregation for Iterative mrTriplets
[Figure: Vertex Table (RDD), Edge Table (RDD), and the Mirror Cache of each edge partition. Each partition runs a Scan followed by a Local Aggregate; the aggregated values return to the Vertex Table as changes.]
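Local aggregation can be sketched as combining messages inside each edge partition first, then merging the per-partition partial results. The names below are hypothetical and the code works on plain collections rather than RDDs.

```scala
object LocalAggregationSketch {
  type Id = Char

  /** Combine messages inside one edge partition before anything is sent. */
  def localAggregate[T](msgs: Seq[(Id, T)])(reduceF: (T, T) => T): Map[Id, T] =
    msgs.groupBy(_._1).map { case (id, ms) => (id, ms.map(_._2).reduce(reduceF)) }

  /** Merge the per-partition partial aggregates into final per-vertex values. */
  def globalAggregate[T](partials: Seq[Map[Id, T]])(reduceF: (T, T) => T): Map[Id, T] =
    localAggregate(partials.flatMap(_.toSeq))(reduceF)
}
```

This is only valid when the combiner is associative and commutative, as with the sum in the PageRank example; each partition then sends at most one value per vertex.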
Reduction in Communication Due to Cached Updates
[Figure: Connected Components on Twitter Graph. x-axis: Iteration (0 to 16); y-axis: Network Comm. (MB), log scale from 0.1 to 10000.]
Most vertices are within 8 hops
of all vertices in their comp.
Benefit of Indexing Active Vertices
[Figure: Connected Components on Twitter Graph. x-axis: Iteration (0 to 16); y-axis: Runtime (Seconds), 0 to 30. Series: Without Active Tracking, Active Vertex Tracking.]
Join Elimination
Identify and bypass joins for unused triplet fields
» Java bytecode inspection
[Figure: PageRank on Twitter. x-axis: Iteration (0 to 20); y-axis: Communication (MB), 0 to 14000.]
Factor of 2 reduction in communication
Additional Optimizations
Indexing and Bitmaps:
» To accelerate joins across graphs
» To efficiently construct sub-graphs
Lineage-based fault-tolerance
» Exploits Spark lineage to recover in parallel
» Eliminates need for costly check-points
Substantial Index and Data Reuse:
» Reuse routing tables across graphs and subgraphs
» Reuse edge adjacency information and indices
System Comparison
Goal:
Demonstrate that GraphX achieves
performance parity with specialized graph-processing systems.
Setup:
16 node EC2 Cluster (m2.4xLarge) + 1GigE
Compare against GraphLab/PowerGraph
(C++), Giraph (Java), & Spark (Java/Scala)
PageRank Benchmark
EC2 Cluster of 16 x m2.4xLarge (8 cores) + 1GigE
[Figure: two bar charts of Runtime (Seconds), one for Twitter Graph (42M Vertices, 1.5B Edges) and one for UK-Graph (106M Vertices, 3.7B Edges). Axes run 0 to 3500 and 0 to 9000. Annotations: 7x, 18x.]
GraphX performs comparably to
state-of-the-art graph processing systems.
Connected Comp. Benchmark
EC2 Cluster of 16 x m2.4xLarge (8 cores) + 1GigE
[Figure: two bar charts of Runtime (Seconds), one for Twitter Graph (42M Vertices, 1.5B Edges) and one for UK-Graph (106M Vertices, 3.7B Edges). Axes run 0 to 2500 and 0 to 9000. Annotations: 8x, 10x, Out-of-Memory.]
GraphX performs comparably to
state-of-the-art graph processing systems.
Graphs are just one stage….
What about a pipeline?
A Small Pipeline in GraphX
[Figure: pipeline from Raw Wikipedia XML to Hyperlinks, PageRank, and Top 20 Pages, in three stages (Spark Preprocess, Compute, Spark Post.) with HDFS between stages.]
Total Runtime (in Seconds):
Spark: 1492
Giraph + Spark: 605
GraphLab + Spark: 375
GraphX: 342
Timed end-to-end, GraphX is the fastest
Adoption and Impact
GraphX is now part of Apache Spark
• Part of Cloudera Hadoop Distribution
In production at Alibaba Taobao
• Order of magnitude gains over Spark
Inspired GraphLab Inc. SFrame technology
• Unifies Tables & Graphs on Disk
GraphX: Unified Tables and Graphs
New API: Blurs the distinction between Tables and Graphs
New System: Unifies Data-Parallel and Graph-Parallel Systems
Enabling users to easily and efficiently
express the entire analytics pipeline
What did we Learn?
Specialized
Systems
Graph Systems
Integrated
Frameworks
GraphX
Future Work
Specialized Systems: Graph Systems, Parameter Server
Integrated Frameworks: GraphX, ?
Asynchrony
Non-deterministic
Shared-State
Thank You
http://amplab.cs.berkeley.edu/projects/graphx/
[email protected]
Reynold Xin, Ankur Dave, Daniel Crankshaw, Michael Franklin, Ion Stoica
Related Work
Specialized Graph-Processing Systems:
GraphLab [UAI’10], Pregel [SIGMOD’10], SignalCollect [ISWC’10], Combinatorial BLAS [IJHPCA’11], GraphChi [OSDI’12], PowerGraph [OSDI’12], Ligra [PPoPP’13], X-Stream [SOSP’13]
Alternative to Dataflow framework:
Naiad [SOSP’13]: GraphLINQ
Hyracks: Pregelix [VLDB’15]
Distributed Join Optimization:
Multicast Join [Afrati et al., EDBT’10]
Semi-Join in MapReduce [Blanas et al.,
Edge Files Have Locality
[Figure: UK-Graph (106M Vertices, 3.7B Edges). Bar chart of Runtime (Seconds), axis 0 to 1200, for GraphLab, GraphX + Shuffle, and GraphX.]
GraphLab rebalances the edge-files on-load.
GraphX preserves the on-disk layout through Spark.
→ Better Vertex-Cut
Scalability
[Figure: Twitter Graph (42M Vertices, 1.5B Edges). x-axis: EC2-Nodes (8 to 64); y-axis: Runtime, 0 to 500.]
Scales slightly better than PowerGraph/GraphLab
Apache Spark Dataflow Platform
Zaharia et al., NSDI’12
Resilient Distributed Datasets (RDD):
[Figure: dataflow of Load and Map stages over HDFS inputs producing RDDs, followed by a Reduce. Calling .cache() persists an RDD in memory.]
Optimized for iterative access to data.
Shared Memory Advantage
Spark Shared Nothing Model: cores exchange data through Shuffle Files
GraphLab Shared Memory: cores share a De-serialized In-Memory Graph
GraphLab No SHM.: cores communicate over TCP/IP
[Figure: Twitter Graph (42M Vertices, 1.5B Edges). Bar chart of Runtime (Seconds), axis 0 to 500, for GraphLab, GraphLab NoSHM, and GraphX.]
Fault-Tolerance
Leverage Spark Fault-Tolerance Mechanism
[Figure: bar chart of Runtime (Seconds), axis 0 to 1000, for No Failure, Lineage, and Restart.]
Graph-Processing Systems
oogle
Ligra
CombBLAS
X-Stream
GraphChi
GPS
Kineograph
Representation
Expose specialized API to simplify graph
programming.
Vertex-Program Abstraction
Pregel_PageRank(i, messages) :
  // Receive all the messages
  total = 0
  foreach( msg in messages) :
    total = total + msg
  // Update the rank of this vertex
  R[i] = 0.15 + total
  // Send new messages to neighbors
  foreach(j in out_neighbors[i]) :
    Send msg(R[i]) to vertex j
The Vertex-Program Abstraction
GraphLab_PageRank(i)
  // Compute sum over neighbors
  total = 0
  foreach( j in neighbors(i)):
    total += R[j] * wji
  // Update the PageRank
  R[i] = 0.15 + total
Low, Gonzalez, et al. [UAI’10]
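Example: Oldest Follower
Calculate the number of older followers for each user.
val olderFollowers = graph
  .mrTriplets(
    e => // Map: an edge src -> dst is read as "src follows dst"
      if (e.src.age > e.dst.age) {
        List((e.dstId, 1)) // count this older follower toward dst
      } else { List.empty },
    (a, b) => a + b // Reduce
  )
  .vertices
[Figure: example graph on vertices A to F, each annotated with an age.]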
Enhanced Pregel in GraphX
Require Message Combiners
Remove Message Computation from the Vertex Program
pregelPR(i, messageSum):
  // Update the rank of this vertex
  R[i] = 0.15 + messageSum
combineMsg(a, b):
  // Compute sum of two messages
  return a + b
sendMsg(i→j, R[i], R[j], E[i,j]):
  // Compute single message
  return msg(R[i]/E[i,j])
PageRank in GraphX
// Load and initialize the graph
val graph = GraphBuilder.text("hdfs://web.txt")
val prGraph = graph.joinVertices(graph.outDegrees)
// Implement and Run PageRank
val pageRank =
  prGraph.pregel(initialMessage = 0.0, iter = 10)(
    (oldV, msgSum) => 0.15 + 0.85 * msgSum,
    triplet => triplet.src.pr / triplet.src.deg,
    (msgA, msgB) => msgA + msgB)
Example Analytics Pipeline
// Load raw data tables
val articles = sc.textFile("hdfs://wiki.xml").map(xmlParser)
val links = articles.flatMap(article => article.outLinks)
// Build the graph from tables
val graph = new Graph(articles, links)
// Run PageRank Algorithm
val pr = graph.PageRank(tol = 1.0e-5)
// Extract and print the top 20 articles
val topArticles = articles.join(pr).top(20).collect
for ((article, pageRank) <- topArticles) {
  println(article.title + '\t' + pageRank)
}
Apache Spark Dataflow Platform
Zaharia et al., NSDI’12
Resilient Distributed Datasets (RDD):
[Figure: Load, Map, and Reduce dataflow over HDFS inputs, with .cache() persisting an RDD in memory. The Lineage of each RDD (HDFS, Load, Map, Reduce) is recorded.]
