Disorderly Distributed Programming with Dedalus

Declarative Distributed
Programming with
Dedalus and Bloom
Peter Alvaro, Neil Conway
UC Berkeley
This Talk
1. Background
– BOOM Analytics
2. Theory
– Dedalus
– CALM
3. Practice
– Bloom
– Lattices
Berkeley Orders of Magnitude
Vision:
Can we build small programs
for large distributed systems?
Approach:
• Language → system design
• System → language design
Initial Language: Overlog
• Data-centric programming
– Uniform representation of system state
• High-level, declarative query language
– Distributed variant of Datalog
• Express systems as queries
BOOM Analytics
Goal: “Big Data” stack
– API-compliant
– Competitive performance
System: [EuroSys’10]
– Distributed file system
• HDFS compatible
– Hadoop job scheduler
What Worked Well
• Concise, declarative implementation
– 10-20x more concise than Java (LOCs)
– Similar performance (within 10-20%)
• Separation of policy and mechanism
• Ease of evolution
1. High availability (failover + Paxos)
2. Scalability (hash partitioned FS master)
3. Monitoring as an aspect
What Worked Poorly
Unclear semantics
– “Correct” semantics defined by interpreter
behavior
In particular,
1. change (e.g., state update)
2. uncertainty (e.g., async communication)
Temporal Ambiguity
Goal:
• Increment a counter upon “request” message
• Send response message with value of counter
When is counter incremented?
counter(“hostname”,0).
counter(To,X+1) :- counter(To,X),
req(To,_).
response(@From,X) :- counter(@To,X),
req(@To,From).
What does response contain?
Implicit Communication
path(@S,D) :- link(@S,Z), path(@Z,D).
Implicit communication was the wrong
abstraction for systems programming.
– Hard to reason about partial failure
Example: we never used distributed joins in
the file system!
Received Wisdom
We argue that objects that interact in a
distributed system need to be dealt with in ways
that are intrinsically different from objects that
interact in a single address space. These
differences are required because distributed
systems require that the programmer be aware
of latency, have a different model of memory
access, and take into account issues of
concurrency and partial failure.
Jim Waldo et al.,
A Note on Distributed Computing (1994)
Dedalus
(it’s about time)
Explicitly represent logical time
as an attribute of all knowledge
“Time is a device that was invented to keep
everything from happening at once.”
(Unattributed)
Dedalus: Syntax
Datalog + temporal modifiers
1. Instantaneous (deduction)
2. Deferred (sequencing)
• True at successor time
3. Asynchronous (communication)
• True at nondeterministic future time
Dedalus: Syntax
(1) Deductive rule: (Plain Datalog)
p(A,B,S) :- q(A,B,T), T=S.        (S, T: logical time)
(2) Inductive rule: (Constraint across “next” timestep)
p(A,B,S) :- q(A,B,T), S=T+1.
(3) Async rule: (Constraint across arbitrary timesteps)
p(A,B,S) :- q(A,B,T), time(S), choose((A,B,T), (S)).
Syntax Sugar
(1) Deductive rule: (Plain Datalog)
p(A,B) :- q(A,B).
(2) Inductive rule: (Constraint across “next” timestep)
p(A,B)@next :- q(A,B).
(3) Async rule: (Constraint across arbitrary timesteps)
p(A,B)@async :- q(A,B).
State Update
p(A, B)@next :- p(A, B), notin p_del(A, B).
Example Trace:
p(1, 2)@101;
p(1, 3)@102;
p_del(1, 3)@300;
Time    p(1, 2)   p(1, 3)   p_del(1, 3)
101     yes
102     yes       yes
...     yes       yes
300     yes       yes       yes
301     yes
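The persistence-with-deletion rule can be replayed over the example trace. The following is a minimal Python sketch, not Dedalus itself; the function name run and the dictionary encoding of the trace are assumptions made for illustration.

```python
def run(inserts, deletes, last_t):
    """Evaluate p(A,B)@next :- p(A,B), notin p_del(A,B) over logical timesteps.

    inserts / deletes map a timestep to the set of (A, B) tuples asserted
    at that timestep (the example trace). Returns {t: set of p tuples}.
    """
    first_t = min(inserts)
    state = {}
    carried = set()  # tuples derived for this timestep by the @next rule
    for t in range(first_t, last_t + 1):
        p_now = carried | inserts.get(t, set())
        state[t] = p_now
        # Persistence: p survives to t+1 unless p_del holds at t.
        carried = p_now - deletes.get(t, set())
    return state


trace = run(inserts={101: {(1, 2)}, 102: {(1, 3)}},
            deletes={300: {(1, 3)}},
            last_t=301)
print(sorted(trace[300]), sorted(trace[301]))
```

p(1, 3) is still present at timestep 300, when p_del(1, 3) arrives, and is gone at 301: the deletion takes effect at the successor timestep.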
Logic and time
Overlog: relationships among facts
Dedalus: also, relationships between states
Key relationships:
• Atomicity
• Mutual exclusion
• Sequentiality
Change and Asynchrony
Overlog
counter(“hostname”,0).
counter(To,X+1) :- counter(To,X), req(To,_).
response(@From,X) :- counter(@To,X), req(@To,From).
Dedalus
counter(“hostname”,0).
counter(To,X+1)@next :- counter(To,X), req(To,_).        (increment is deferred)
counter(To,X)@next :- counter(To,X), notin req(To,_).
response(@From,X)@async :- counter(@To,X), req(@To,From).
    (pre-increment value sent; non-deterministic delivery time)
Dedalus: Semantics
Goal: declarative semantics
– Reason about a program’s meaning rather
than its executions
Approach: model theory
Minimal Models
A negation-free (monotonic) Datalog
program has a unique minimal model
Model: no “missing” facts
Minimal: no “extra” facts
Unique: program has a single meaning
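One way to see the minimal model is to compute it bottom-up. The sketch below runs naive fixpoint evaluation for a two-rule reachability program in Python. The base rule path(S,D) :- link(S,D) and the sample link facts are assumptions added for illustration; only the recursive rule appears earlier in the talk.

```python
def minimal_model(links):
    """Naive bottom-up evaluation of:
         path(S, D) :- link(S, D).
         path(S, D) :- link(S, Z), path(Z, D).
    Repeats until no new facts appear (a fixpoint)."""
    path = set(links)
    while True:
        derived = {(s, d) for (s, z) in links for (z2, d) in path if z == z2}
        if derived <= path:
            return path
        path |= derived


links = {("a", "b"), ("b", "c"), ("c", "d")}
model = minimal_model(links)
print(sorted(model))
```

Every fact in the result is forced by the rules (no “extra” facts) and nothing derivable is left out (no “missing” facts), so the set returned does not depend on the order in which rules or facts are visited.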
Stable Models
The consequences of async rules hold at
a nondeterministic future time
– Captured by the choice construct
• Greco and Zaniolo (1998)
– Each choice leads to a distinct model
Intuition:
A stable model is an execution trace
Traces and models
counter(To, X+1)@next :- counter(To, X), request(To, _).
counter(To, X)@next :- counter(To, X), notin request(To, _).
response(@From, X)@async :- counter(@To, X),
request(@To, From).
response(From, X)@next :- response(From, X).
Persistence rules lead
to infinitely large models
Async rules lead to
infinitely many models
An Execution
counter(To, X+1)@next :- counter(To, X), request(To, _).
counter(To, X)@next :- counter(To, X), notin request(To, _).
response(@From, X)@async :- counter(@To, X),
request(@To, From).
response(From, X)@next :- response(From, X).
counter(“node1”, 0)@0.
request(“node1”, “node2”)@0.
A Stable Model
counter(To, X+1)@next :- counter(To, X), request(To, _).
counter(To, X)@next :- counter(To, X), notin request(To, _).
response(@From, X)@async :- counter(@To, X),
request(@To, From).
response(From, X)@next :- response(From, X).
counter(“node1”, 0)@0.
request(“node1”, “node2”)@0.
counter(“node1”, 1)@1.
counter(“node1”,1)@2.
[…]
response(“node2”, 0)@100.
counter(“node1”, 1)@101.
counter(“node1”, 1)@102.
response(“node2”, 0)@101.
response(“node2”, 0)@102.
[…]
A stable model
for choice = 100
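The trace above can be regenerated for any delivery time. This is a rough Python simulation of the four rules, truncated at a finite horizon (the real model is infinite); the function name stable_model and the tuple encoding of facts are assumptions for illustration.

```python
def stable_model(choice, horizon):
    """Facts true at each timestep 0..horizon for the counter program,
    given that the async response is delivered at timestep `choice`."""
    facts = {t: set() for t in range(horizon + 1)}
    facts[0] |= {("counter", "node1", 0), ("request", "node1", "node2")}
    for t in range(horizon + 1):
        now = facts[t]
        requests = {f for f in now if f[0] == "request"}
        for f in list(now):
            if f[0] == "counter":
                _, node, x = f
                hit = any(r[1] == node for r in requests)
                # Deferred rules: increment on request, otherwise persist.
                if t + 1 <= horizon:
                    facts[t + 1].add(("counter", node, x + 1 if hit else x))
                # Async rule: the response carries the pre-increment value
                # and appears at the chosen future timestep.
                for r in requests:
                    if r[1] == node and t < choice <= horizon:
                        facts[choice].add(("response", r[2], x))
            elif f[0] == "response" and t + 1 <= horizon:
                facts[t + 1].add(f)  # response persistence rule
    return facts


model = stable_model(choice=100, horizon=102)
print(sorted(model[100]))
```

Calling stable_model with a different choice yields a different trace, which is the sense in which each choice leads to a distinct stable model.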
Ultimate Models
A stable model characterizes an
execution
– Many of these models are not
“interestingly” different
Wanted: a characterization of outcomes
– An ultimate model contains exactly those
facts that are “eventually always true”
Traces and models
counter(To,X+1)@next :- counter(To,X), request(To,_).
counter(To,X)@next :- counter(To,X), notin request(To,_).
response(@From,X)@async :- counter(@To,X), request(@To,From).
response(From,X)@next :- response(From,X).
counter(“node1”, 0)@0.
request(“node1”, “node2”)@0.
counter(“node1”, 1)@1.
counter(“node1”, 1)@2.
[…]
response(“node2”, 0)@100.
counter(“node1”, 1)@101.
counter(“node1”, 1)@102.
response(“node2”, 0)@101.
response(“node2”, 0)@102.
[…]
Traces and models
counter(To,X+1)@next :- counter(To,X), request(To,_).
counter(To,X)@next :- counter(To,X), notin request(To,_).
response(@From,X)@async :- counter(@To,X), request(@To,From).
response(From,X)@next :- response(From,X).
Ultimate Model
counter(“node1”, 1).
response(“node2”, 0).
Confluence
This program has a unique ultimate model
– In fact, all negation-free Dedalus programs
have a unique ultimate model [DL2’12]
We call such programs confluent:
same program outcome, regardless of
network non-determinism
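Confluence can be spot-checked for the counter program by varying the delivery time and comparing outcomes. The sketch below is an assumption-laden stand-in: it uses a finite horizon and takes the facts at the last timestep, with timestamps dropped, as an approximation of “eventually always true”. The name run_counter is hypothetical.

```python
def run_counter(delivery_time, horizon=50):
    """Simulate the counter program and return the facts that hold at the
    final timestep, with timestamps dropped. Assumes 1 <= delivery_time
    <= horizon so that the response arrives before the horizon."""
    counter = 0          # counter("node1", 0)@0
    request = True       # request("node1", "node2")@0, not persisted
    pending = None       # (arrival timestep, value) of the in-flight response
    responses = set()
    for t in range(horizon + 1):
        if pending is not None and pending[0] == t:
            responses.add(("response", "node2", pending[1]))
            pending = None
        if request:
            pending = (delivery_time, counter)  # pre-increment value is sent
            counter += 1                        # new value holds from t+1 on
            request = False
    return {("counter", "node1", counter)} | responses


outcomes = {frozenset(run_counter(d)) for d in (1, 7, 42)}
print(outcomes)
```

All three delivery times collapse to a single outcome, matching the ultimate model shown above.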
The Bloom
Programming
Language
Lessons from Dedalus
1. Clear program semantics is essential
2. Avoid implicit communication
3. Confluence seems promising
Lessons From Building Systems
1. Syntax matters!
– Datalog syntax is cryptic and foreign
2. Adopt, don’t reinvent
– DSL > standalone language
– Use host language’s type system (E. Meijer)
3. Modularity is important
– Scoping
– Encapsulation
Bloom Operational Model
[Diagram: at each timestep (“now”), inbound network messages and clock events are combined with the current state; Bloom rules (atomic, local, deterministic) then produce a state update and outbound network messages.]
Bloom Rule Syntax
<collection> <merge op> <expr>

Collection types:
  table      persistent state
  scratch    transient state
  channel    network transient

Merge operators:
  <=    now                 (local computation)
  <+    next                (state update)
  <-    delete (at next)    (state update)
  <~    async               (asynchronous message passing)

Expressions:
  map, flat_map
  reduce, group
  join, outerjoin
  empty?, include?
Example: Quorum Vote
QUORUM_SIZE = 5
RESULT_ADDR = "example.org"

class QuorumVote               # Ruby class definition
  include Bud

  state do                     # Bloom state
    # Communication interfaces
    channel :vote_chn, [:@addr, :voter_id]
    channel :result_chn, [:@addr]
    # Coordinator state
    table :votes, [:voter_id]
    scratch :cnt, [] => [:cnt]
  end

  bloom do                     # Bloom logic
    # Accumulate votes
    votes <= vote_chn {|v| [v.voter_id]}
    # Count votes
    cnt <= votes.group(nil, count(:voter_id))
    # Asynchronous messaging: send message when quorum reached
    result_chn <~ cnt {|c| [RESULT_ADDR] if c.cnt >= QUORUM_SIZE}
  end
end
Question:
How does confluence
relate to practical problems
of distributed consistency?
Common Technique:
Replicate state at multiple sites, for:
• Fault tolerance
• Reduced latency
• Read throughput
Problem:
Different replicas
might observe events
in different orders
… and then reach
different conclusions!
Alternative #1:
Enforce consistent
event order at all nodes
(“Strong Consistency”)
Problems:
• Availability
• CAP Theorem
• Latency
Alternative #2:
Achieve correct results for
any network order
(“Weak Consistency”)
Concerns:
Writing order-independent
programs is hard!
Challenge:
How can we make it
easier to write
order-independent
programs?
Order-Independent Programs
Alternative #1:
– Start with a conventional language
– Reason about when order can be relaxed
• This is hard! Especially for large programs.
Taking Order For Granted
Data
(Ordered)
array of bytes
Compute (Ordered)
sequence of
instructions
Writing order-sensitive
programs is too easy!
Order-Independent Programs
Alternative #1:
– Start with a conventional language
– Reason about when order can be relaxed
• This is hard! Especially for large programs.
Alternative #2:
– Start with an order-independent language
– Add order explicitly, only when necessary
– “Disorderly Programming”
(Leading) Question:
So, where might we find
a nice order-independent
programming language?
Recall:
All monotone Dedalus programs
are confluent.
Monotonic Logic
• As input set grows,
output set does not
shrink
• Order independent
• e.g., map, filter, join,
union, intersection
Non-Monotonic Logic
• New inputs might
invalidate previous
outputs
• Order sensitive
• e.g., aggregation,
negation
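The contrast can be checked by brute force over arrival orders. In this Python sketch the two handlers, emit_evens and emit_uncancelled, are hypothetical examples chosen for illustration: the first is a filter plus map, the second uses negation (“send x unless a cancel for x has been seen”).

```python
from itertools import permutations


def emit_evens(arrivals):
    """Monotone: filter + map. Output depends only on the set of inputs."""
    sent = set()
    for x in arrivals:
        if x % 2 == 0:
            sent.add(x * 10)
    return sent


def emit_uncancelled(arrivals):
    """Non-monotone: send x unless a cancel for x has already been seen.
    A send cannot be retracted, so the outcome depends on arrival order."""
    sent, cancelled = set(), set()
    for kind, x in arrivals:
        if kind == "cancel":
            cancelled.add(x)
        elif x not in cancelled:
            sent.add(x)
    return sent


nums = [1, 2, 3, 4]
monotone_outcomes = {frozenset(emit_evens(p)) for p in permutations(nums)}

events = [("add", 1), ("cancel", 1)]
negation_outcomes = {frozenset(emit_uncancelled(p)) for p in permutations(events)}

print(len(monotone_outcomes), len(negation_outcomes))
```

The monotone handler produces one outcome across all 24 orderings; the handler with negation produces two outcomes from just two orderings.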
Consistency
As
Logical
Monotonicity
CALM Analysis [CIDR’11]
1. Monotone programs are
deterministic (confluent)
[Ameloot’11, Marczak’12]
2. Simple syntactic test for
monotonicity
Result: Whole-program
static analysis for
eventual convergence
Case Study
[Diagram: a client sends add/remove-item requests and a checkout request to a set of three cart replicas; the replicas exchange updates via lazy replication.]
Scenario
[Diagram sequence: one client, three cart replicas]
1. Client sends ADD {beer: 2, wine: 1} to a cart replica.
2. ADD {beer: 2, wine: 1} reaches a second cart replica.
3. Client sends REMOVE {beer: 1} to a cart replica.
4. Client sends a CHECKOUT REQUEST to a cart replica and receives a RESPONSE.
Questions
1. Will cart replicas eventually converge?
– “Eventual Consistency”
2. What will client observe on checkout?
– Goal: checkout reflects all session actions
3. To achieve #1 and #2, how much
additional coordination is required?
Design #1: Mutable State
Add(item x, count c):
    if kvs[x] exists:
        old = kvs[x]
        kvs.delete(x)
    else
        old = 0
    kvs[x] = old + c

Remove(item x, count c):
    if kvs[x] exists:
        old = kvs[x]
        kvs.delete(x)
        if old > c
            kvs[x] = old - c

Non-monotonic!
CALM Analysis
[Diagram: client, three cart replicas, lazy replication between replicas; the cart operations are marked “Non-monotonic!”.]
Conclusion:
Every operation might require coordination!
Subtle Bug
Add(item x, count c):
    if kvs[x] exists:
        old = kvs[x]
        kvs.delete(x)
    else
        old = 0
    kvs[x] = old + c

Remove(item x, count c):
    if kvs[x] exists:
        old = kvs[x]
        kvs.delete(x)
        if old > c
            kvs[x] = old - c

What if remove before add?
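The remove-before-add case is easy to reproduce. Below is a direct Python transcription of the Design #1 pseudocode, using a dict for kvs; the replay helper and the beer example are assumptions added for illustration.

```python
def add(kvs, x, c):
    # Delete the old entry (if any), then store the updated count.
    old = kvs.pop(x, 0)
    kvs[x] = old + c


def remove(kvs, x, c):
    # A remove for an item that is not present is silently dropped.
    if x in kvs:
        old = kvs.pop(x)
        if old > c:
            kvs[x] = old - c


def replay(ops):
    kvs = {}
    for op, x, c in ops:
        op(kvs, x, c)
    return kvs


in_order = replay([(add, "beer", 2), (remove, "beer", 1)])
reordered = replay([(remove, "beer", 1), (add, "beer", 2)])
print(in_order, reordered)
```

When the remove is delivered first it finds nothing to remove and is lost, so a replica that sees the reordered stream ends up with a different cart than one that sees the operations in order.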
Design #2: “Disorderly”
Add(item x, count c):
    Append x,c to add_log

Remove(item x, count c):
    Append x,c to del_log

Checkout():                      (Non-monotonic!)
    Group add_log by item ID; sum counts.
    Group del_log by item ID; sum counts.
    For each item, subtract deletions from additions.
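A small Python sketch of the disorderly design, assuming in-memory lists for the two logs; the class name Cart and the replay helper are hypothetical. It checks that the checkout result is the same for every delivery order of the add and remove operations.

```python
from collections import Counter
from itertools import permutations


class Cart:
    def __init__(self):
        self.add_log = []   # grows monotonically
        self.del_log = []   # grows monotonically

    def add(self, x, c):
        self.add_log.append((x, c))

    def remove(self, x, c):
        self.del_log.append((x, c))

    def checkout(self):
        # Non-monotonic step: only meaningful once both logs are complete.
        added, deleted = Counter(), Counter()
        for x, c in self.add_log:
            added[x] += c
        for x, c in self.del_log:
            deleted[x] += c
        return {x: added[x] - deleted[x] for x in added if added[x] > deleted[x]}


def replay(order):
    cart = Cart()
    for op, x, c in order:
        getattr(cart, op)(x, c)
    return cart.checkout()


ops = [("add", "beer", 2), ("add", "wine", 1), ("remove", "beer", 1)]
results = [replay(p) for p in permutations(ops)]
print(results[0])
```

Appending to the logs commutes, so replicas can apply adds and removes in any order. The only point that needs care is checkout, which must not run before all of the session's operations have arrived.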
CALM Analysis
[Diagram: client, three cart replicas, lazy replication between replicas; the cart operations are marked “Monotonic”.]
Conclusion:
Replication is safe; might need to coordinate on checkout
Takeaways
• Major difference in coordination cost!
– Coordinate once per operation vs.
Coordinate once per checkout
• Disorderly accumulation when possible
– Monotone growth ⇒ confluent
• “Disorderly”: common design in practice!
– e.g., Amazon Dynamo
Generalizing Monotonicity
• Monotone logic: growing sets over time
– Partial order: set containment
• In practice, other kinds of growth:
– Version numbers, timestamps
– “In-progress” → committed/aborted
– Directories, sequences, …
Example: Quorum Vote
QUORUM_SIZE = 5
RESULT_ADDR = "example.org"

class QuorumVote
  include Bud

  state do
    channel :vote_chn, [:@addr, :voter_id]
    channel :result_chn, [:@addr]
    table :votes, [:voter_id]
    scratch :cnt, [] => [:cnt]
  end

  bloom do
    votes <= vote_chn {|v| [v.voter_id]}
    # Not (set-wise) monotonic!
    cnt <= votes.group(nil, count(:voter_id))
    result_chn <~ cnt {|c| [RESULT_ADDR] if c.cnt >= QUORUM_SIZE}
  end
end
Challenge:
Extend monotone logic to
allow other kinds of “growth”
$\langle S, \sqcup, \bot \rangle$ is a bounded join semilattice iff:
– $S$ is a set
– $\sqcup$ is a binary operator (“least upper bound”)
• Induces a partial order on $S$: $x \le_S y$ if $x \sqcup y = y$
• Associative, Commutative, and Idempotent
– “ACID 2.0”
• Informally, LUB is “merge function” for $S$
– $\bot$ is the “least” element in $S$
• $\forall x \in S: \bot \sqcup x = x$
[Diagram: three example lattices, each growing over time]
Set ($\sqcup$ = Union)
Increasing Int ($\sqcup$ = Max)
Boolean ($\sqcup$ = Or)
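The lattice laws can be checked mechanically for the three examples. This is a minimal Python sketch over small sample sets, not a proof; the LATTICES and SAMPLES tables, and the use of negative infinity as the bottom of the increasing-int lattice, are assumptions made for illustration.

```python
from itertools import product

# Each lattice is given as (bottom element, least-upper-bound function).
LATTICES = {
    "set":  (frozenset(), lambda a, b: a | b),
    "max":  (float("-inf"), max),
    "bool": (False, lambda a, b: a or b),
}
SAMPLES = {
    "set":  [frozenset(), frozenset({1}), frozenset({1, 2}), frozenset({3})],
    "max":  [float("-inf"), 0, 3, 7],
    "bool": [False, True],
}


def leq(lub, x, y):
    """Partial order induced by the LUB: x <= y iff lub(x, y) == y."""
    return lub(x, y) == y


def check_laws(name):
    bottom, lub = LATTICES[name]
    xs = SAMPLES[name]
    for a, b, c in product(xs, repeat=3):
        assert lub(a, lub(b, c)) == lub(lub(a, b), c)   # associative
        assert lub(a, b) == lub(b, a)                   # commutative
    for a in xs:
        assert lub(a, a) == a                           # idempotent
        assert lub(bottom, a) == a                      # bottom is least
    return True


print(all(check_laws(name) for name in LATTICES))
```

Because merge is associative, commutative, and idempotent, replicas can apply updates in any order, any grouping, and any number of times and still reach the same value.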
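$f : S \to T$ is a monotone function iff:
$\forall a, b \in S : a \le_S b \Rightarrow f(a) \le_T f(b)$
[Diagram: lattices growing over time, connected by monotone functions]
Set ($\sqcup$ = Union)
Monotone function: set → increase-int, e.g. size()
Increasing Int ($\sqcup$ = Max)
Monotone function: increase-int → boolean, e.g. >= 3
Boolean ($\sqcup$ = Or)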
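The monotonicity condition can be tested on sample points. In this Python sketch the helper is_monotone and the sample values are assumptions for illustration; it checks size() and the threshold >= 3, and shows that the negated test < 3 fails the condition.

```python
def is_monotone(f, xs, leq_s, leq_t):
    """Check that a <=_S b implies f(a) <=_T f(b) over the sample points xs."""
    return all(leq_t(f(a), f(b)) for a in xs for b in xs if leq_s(a, b))


sets = [frozenset(), frozenset({1}), frozenset({1, 2}), frozenset({1, 2, 3})]
ints = [0, 1, 2, 3, 4]


def subset(a, b):
    return a <= b            # set lattice order: containment


def int_leq(a, b):
    return a <= b            # increasing-int order


def bool_leq(a, b):
    return (not a) or b      # boolean order: False <= True


size_ok = is_monotone(len, sets, subset, int_leq)
threshold_ok = is_monotone(lambda n: n >= 3, ints, int_leq, bool_leq)
negated_ok = is_monotone(lambda n: n < 3, ints, int_leq, bool_leq)
print(size_ok, threshold_ok, negated_ok)
```

A threshold that can only flip from false to true is safe to evaluate early; one that can flip back is not.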
Quorum Vote with Lattices
QUORUM_SIZE = 5
RESULT_ADDR = "example.org"

class QuorumVote
  include Bud

  state do
    # Program state
    channel :vote_chn, [:@addr, :voter_id]
    channel :result_chn, [:@addr]
    # Lattice state declarations
    lset  :votes
    lmax  :vote_cnt
    lbool :got_quorum
  end

  bloom do                     # Program logic
    # Accumulate votes into set: merge new votes together with stored votes (set LUB)
    votes <= vote_chn {|v| v.voter_id}
    # Monotone function: set → max; merge using lmax LUB
    vote_cnt <= votes.size
    # Monotone function: max → bool
    got_quorum <= vote_cnt.gt_eq(QUORUM_SIZE)
    # Threshold test on bool (monotone)
    result_chn <~ got_quorum.when_true { [RESULT_ADDR] }
  end
end
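The same pipeline can be mimicked outside Bloom to check that the outcome does not depend on delivery order or on duplicate votes. This is a plain-Python analogue, not the Bud API; the class layout, the final_state helper, and the reduced quorum size are assumptions made for illustration.

```python
from itertools import permutations

QUORUM_SIZE = 3  # assumption: smaller than the slide's 5 to keep the demo short


class QuorumVote:
    def __init__(self):
        self.votes = frozenset()   # analogue of lset:  merge is set union
        self.vote_cnt = 0          # analogue of lmax:  merge is max
        self.got_quorum = False    # analogue of lbool: merge is logical or

    def on_vote(self, voter_id):
        self.votes = self.votes | {voter_id}
        self.vote_cnt = max(self.vote_cnt, len(self.votes))
        self.got_quorum = self.got_quorum or self.vote_cnt >= QUORUM_SIZE


def final_state(deliveries):
    node = QuorumVote()
    for voter_id in deliveries:
        node.on_vote(voter_id)
    return node.votes, node.vote_cnt, node.got_quorum


# Voter "b" is delivered twice, to exercise idempotence.
outcomes = {final_state(p) for p in permutations(["a", "b", "b", "c"])}
print(outcomes)
```

Every ordering ends in the same state, and the duplicate vote from "b" does not inflate the count, because each stage only moves upward in its lattice.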
Conclusions
• Interplay between language and system
design
• Key question: what should be explicit?
– Initial answer: asynchrony, state update
– Refined answer: order
• Disorderly programming for disorderly
networks
Thank You!
Queries welcome.
gem install bud
http://www.bloom-lang.net
Collaborators:
Emily Andrews
Peter Bailis
William Marczak
David Maier
Tyson Condie
Joseph M. Hellerstein
Rusty Sears
Sriram Srinivasan
Extra slides
Ongoing Work
1. Lattices
– Concurrent editing
– Distributed garbage collection
2. Confluence and concurrency control
– Support for “controlled non-determinism”
– Program analysis for serializability?
3. Safe composition of monotone and
non-monotone code
Overlog
“Our intellectual powers are rather geared to master
static relations and […] our powers to visualize processes
evolving in time are relatively poorly developed. For that
reason we should do (as wise programmers aware of our
limitations) our utmost to shorten the conceptual gap
between the static program and the dynamic process, to
make the correspondence between the program (spread
out in text space) and the process (spread out in time) as
trivial as possible.”
Edsger Dijkstra
(Themes)
• Disorderly / order-light programming
• (understanding, simplifying) the relation
between program syntax and outcomes
• Determinism (in asynchronous
executions) as a correctness criterion
• Coordination – theoretical basis and
mechanisms
– What programs require coordination?
– How can we coordinate them efficiently?
Traces and models
counter(X+1)@next :- counter(X), request(_, _).
counter(X)@next :- counter(X), notin request(_, _).
response(@From, X)@async :- counter(X), request(To, From).
response(From, X)@next :- response(From, X).
counter(0)@0.
request(“node1”, “node2”)@0.
Traces and models -- 0
counter(0+1)@1 :- counter(0)@0, request(“node1”, “node2”)@0.
counter(X)@next :- counter(X), notin request(_, _).
response(“node2”, 0)@100 :- counter(0)@0, request(“node1”, “node2”)@0.
response(From, X)@next :- response(From, X).
counter(0)@0.
request(“node1”, “node2”)@0.
counter(1)@1.
[…]
response(“node2”, 0)@100.
Traces and models -- 1
counter(X+1)@next :- counter(X), request(To, From).
counter(1)@?+1 :- counter(1)@?, notin request(_, _)@?.
response(From, X)@async :- counter(X), request(To, From).
response(“node2”, 0)@101:- response(“node2”, 0)@100.
counter(0)@0.
request(“node1”, “node2”)@0.
counter(1)@1.
counter(1)@2.
[…]
Traces and models -- 100
counter(X+1)@next :- counter(X), request(To, From).
counter(1)@101 :- counter(1)@100, notin request(_, _)@100.
response(From, X)@async :- counter(X), request(To, From).
response(“node2”, 0)@101:- response(“node2”, 0)@100.
counter(0)@0.
request(“node1”, “node2”)@0.
counter(1)@1.
counter(1)@2.
[…]
response(“node2”, 0)@100.
counter(1)@101.
response(“node2”, 0)@101.
Traces and models – 101+
counter(X+1)@next :- counter(X), request(To, From).
counter(1)@102 :- counter(1)@101, notin request(_, _)@101.
response(From, X)@async :- counter(X), request(To, From).
response(“node2”, 0)@102:- response(“node2”, 0)@101.
counter(0)@0.
request(“node1”, “node2”)@0.
counter(1)@1.
counter(1)@2.
[…]
response(“node2”, 0)@100.
counter(1)@101.
counter(1)@102.
response(“node2”, 0)@101.
response(“node2”, 0)@102.
[…]
A stable model
for choice = 100
Traces and models
counter(X+1)@next :- counter(X), request(_, _).
counter(X)@next :- counter(X), notin request(_, _).
response(@From, X)@async :- counter(X), request(To, From).
response(From, X)@next :- response(From, X).
counter(0)@0.
request(“node1”, “node2”)@0.
Stable models:
{
counter(0)@0, counter(1)@1, counter(1)@2, […]
response(“node2”, 0)@k, response(“node2”, 0)@k+1, […]
}
Studying confluence in Dedalus
Alice:
e(1). e(2).
replica(Bob).
replica(Carol).
Program:
q(#L, X)@async <- e(X), replica(L).
p(X) <- q(_, X).
p(X)@next <- p(X).
Bob receives q(Bob, 1)@1, q(Bob, 2)@2; result: p(1), p(2)
Carol receives q(Carol, 2)@1, q(Carol, 1)@2; result: p(1), p(2)
Unique ultimate model (UUM)
Studying confluence in Dedalus
Alice:
e(1). f(1).
replica(Bob).
replica(Carol).
Program:
q(#L, X)@async <- e(X), replica(L).
r(#L, X)@async <- f(X), replica(L).
p(X) <- q(_, X), r(_, X).
p(X)@next <- p(X).
Bob receives q(Bob, 1)@1, r(Bob, 1)@2; result: {}
Carol receives q(Carol, 1)@1, r(Carol, 1)@1; result: p(1)
Multiple ultimate models
Studying confluence in Dedalus
Alice:
e(1). f(1).
replica(Bob).
replica(Carol).
Program:
q(#L, X)@async <- e(X), replica(L).
r(#L, X)@async <- f(X), replica(L).
p(X) <- q(_, X), r(_, X).
p(X)@next <- p(X).
q(L, X)@next <- q(L, X).
Bob receives q(Bob, 1)@1, r(Bob, 1)@2; result: p(1)
Carol receives r(Carol, 1)@1, q(Carol, 1)@2; result: {}
Multiple ultimate models
Studying confluence in Dedalus
Alice:
e(1). f(1).
replica(Bob).
replica(Carol).
Program:
q(#L, X)@async <- e(X), replica(L).
r(#L, X)@async <- f(X), replica(L).
p(X) <- q(_, X), r(_, X).
p(X)@next <- p(X).
q(L, X)@next <- q(L, X).
r(L, X)@next <- r(L, X).
Bob receives q(Bob, 1)@1, r(Bob, 1)@2; result: p(1)
Carol receives r(Carol, 1)@1, q(Carol, 1)@2; result: p(1)
Unique ultimate model (UUM)
Studying confluence in Dedalus
Alice:
e(1). f(1).
replica(Bob).
replica(Carol).
Program:
q(#L, X)@async <- e(X), replica(L).
r(#L, X)@async <- f(X), replica(L).
p(X) <- q(_, X), NOT r(_, X).
p(X)@next <- p(X).
q(L, X)@next <- q(L, X).
r(L, X)@next <- r(L, X).
Bob receives q(Bob, 1)@1, r(Bob, 1)@2; result: p(1)
Carol receives r(Carol, 1)@1, q(Carol, 1)@2; result: {}
Multiple ultimate models
CALM – Consistency as logical
monotonicity
• Logically monotonic => confluent
• Consequence: a (conservative) static
analysis for eventual consistency
• Practical implications:
– Language support for weakly-consistent,
coordination-free distributed systems!
Does CALM help?
• Is the monotonic subset of Dedalus
sufficiently expressive / convenient to
implement distributed systems?
Coordination
• CALM’s complement:
– Nonmonotonic => order-sensitive
– Ensuring deterministic outcomes may
require controlling order.
• We could constrain the order of
– Data
• E.g., via ordered delivery
– Computation
• E.g., via evaluation barriers
Coordination mechanisms
Alice:
e(1). f(1).
replica(Bob).
replica(Carol).
Approach 1:
Deliver the q() and r() tuples in the same
total order to all replicas.
Bob receives r(Bob,1)@1, q(Bob, 1)@2; result: {}
Carol receives r(Bob,1)@1, q(Bob, 1)@2; result: {}
Coordination mechanisms
Alice:
e(1). f(1).
replica(Bob).
replica(Carol).
Program:
p(X) <- q(_, X), NOT r(_, X).
Approach 2:
Do not evaluate “NOT r(X)” until its
contents are completely determined.
Bob receives q(Bob,1)@1, r(Bob, 1)@2; result: {}
Carol receives r(Bob,1)@1, q(Bob, 1)@2; result: {}
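The effect of delaying the negated subgoal can be sketched in Python. The two evaluators below are hypothetical stand-ins: eager evaluates the rule on every arrival, while stratified assumes the replica somehow knows when r is complete (for example, a finite input with an end-of-stream marker) and evaluates only then.

```python
def eager(arrivals):
    """Evaluate p(X) <- q(X), NOT r(X) as each tuple arrives; q, r, p persist."""
    q, r, p = set(), set(), set()
    for rel, x in arrivals:
        (q if rel == "q" else r).add(x)
        p |= {v for v in q if v not in r}
    return p


def stratified(arrivals):
    """Barrier: buffer inputs until r is known to be complete, then evaluate."""
    q = {x for rel, x in arrivals if rel == "q"}
    r = {x for rel, x in arrivals if rel == "r"}
    return {v for v in q if v not in r}


bob = [("q", 1), ("r", 1)]     # q arrives before r
carol = [("r", 1), ("q", 1)]   # r arrives before q

print(eager(bob), eager(carol), stratified(bob), stratified(carol))
```

Eager evaluation lets Bob conclude p(1) before r(1) arrives, so the replicas diverge; with the barrier, both replicas reach the empty result regardless of arrival order.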
Ordered delivery vs.
stratification
(Differences)
• Stratified evaluation
– Unique outcome across all executions
– Finite inputs
– Communication between producers and
consumers
• Ordered delivery
– Different outcomes in different runs
– No restriction on inputs
– Multiple producers and consumers => need
distributed consensus
Ordered delivery vs.
stratification
(Similarities)
• Stratified evaluation
– Control order of evaluation at a coarse grain
• table by table.
– Order is given by program syntax
• Ordered delivery
– Fine-grained order of evaluation
• Row by row
– Order is nondeterministically chosen by an oracle (e.g. Paxos)
– Analogy:
• Assign a stratum to each tuple.
• Ensure that all replicas see the same stratum
assignments
