"consistent" map/reduce

Bryan Fink bryan at basho.com
Mon Nov 28 10:17:19 EST 2011


On Mon, Nov 21, 2011 at 2:46 PM, Kresten Krab Thorup <krab at trifork.com> wrote:
> I'd like to be able to do a "consistent map/reduce" job i.e., with "R=2 semantics" for an "N=3 bucket".  Maybe other people have the same need, but I can't see if this is possible ... perhaps with the new riak_pipe infrastructure?

Hi, Kresten.  Indeed, this same topic came up in an independent
conversation last week.  I think there are a few ways to attack it.
Let's start with yours:

> The map function yields {Key, [{VectorClock,1,Hash}]} for each replica, but needs to run on *all* replicas of objects in a given Bucket.   Hash is the real value I'm interested in i.e., the content-hash for the object; but it could be some other "map" function output.
>
> Then, the reduce phase needs to "merge" a list of {VectorClock,N,Hash} tuples, by considering the VectorClocks to determine if results are in "conflict", or if one is before/after the other.  N is reduced to the sum of all elements with equal Hash value.
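
As a rough illustration of the merge described in the quoted text, here is a minimal Python sketch. It is not Riak code. It assumes a vector clock can be modeled as a dict of actor to counter, and it reads "merge" as: sum N for equal hashes, then drop any entry whose clock is strictly older than another's. The names `descends` and `merge` are hypothetical.

```python
def descends(a, b):
    """True if vclock `a` has seen every event recorded in vclock `b`."""
    return all(a.get(actor, 0) >= count for actor, count in b.items())


def merge(entries):
    """Combine (vclock, n, hash) tuples reported for a single key.

    Entries with equal hashes are collapsed and their counts summed.
    Entries whose vclock is strictly older than another entry's are
    dropped. More than one survivor means the replicas conflict.
    """
    by_hash = {}
    for vclock, n, content_hash in entries:
        if content_hash in by_hash:
            old_vc, old_n = by_hash[content_hash]
            # Keep the pointwise maximum of the two clocks.
            actors = old_vc.keys() | vclock.keys()
            merged = {a: max(old_vc.get(a, 0), vclock.get(a, 0)) for a in actors}
            by_hash[content_hash] = (merged, old_n + n)
        else:
            by_hash[content_hash] = (dict(vclock), n)

    combined = [(vc, n, h) for h, (vc, n) in by_hash.items()]
    return [
        e for e in combined
        if not any(
            o is not e and descends(o[0], e[0]) and not descends(e[0], o[0])
            for o in combined
        )
    ]


# Three replicas: one stale, two agreeing on the newer version.
replicas = [({"a": 1}, 1, "h1"), ({"a": 2}, 1, "h2"), ({"a": 2}, 1, "h2")]
result = merge(replicas)
```

Under these assumptions a key satisfies "R=2" when exactly one entry survives and its N is at least 2. Because the function accepts its own output as input, it can also be applied to partial results.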

I like many of the ideas in this approach.  It has a nice distributed
data-provenance feel to it.  I think the danger lies in the work that
the reduce phase would have to do.  With distributed parallel
keylisting, there's no way to guarantee the order in which the map
results arrive at the reduce processor.  This means that the reduce
state may become quite large, since it has to track every key/version
pair that has been produced but has not yet satisfied R.  Maybe this
is manageable, though, so I'll also try to answer your questions:

> - How can I have a M/R job run on *all* vnodes?  Not just for objects that are owned by a primary?

The only way to do this right now is to use Riak Pipe directly.  Set
up a pipe with your "map" and "reduce" fittings, then send inputs to it
such that one input hashes to each vnode.  Using riak_pipe_qcover_fsm
with N=1 might ease this process.

> - The M/R "input" is essentially  listkeys(Bucket)  ... can this be done using "async keylisting", so that the operation does not hold up the vnode while listing?

Yes, absolutely.  The riak_kv_pipe_listkeys module does just this, and
is also an example of using riak_pipe_qcover_fsm.

These two questions and answers lead to the basic pipe layout of:

[
 {module= riak_kv_pipe_listkeys}, %% set up with qcover N=1

 {module= riak_kv_pipe_get,
  chashfun= follow},              %% each vnode processes keys it produces

 {module= riak_kv_mrc_map,
  chashfun= follow},

 {module= riak_kv_w_reduce,
  chashfun= ConstantOrCustom}     %% see below
]

Riak KV reduce fittings are normally set up with a constant chashfun,
such that *all* results are processed in one place.  To help alleviate
the reduce-state-size problem, I might suggest using a chashfun that
spreads results, yet makes sure all results for each key end up at the
same reducer (most likely, hash the result's key, just as you would
for determining its KV preflist).
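
To make the "spread results, but keep each key together" idea concrete, here is a small Python sketch. It only illustrates the routing property and is not the riak_pipe API: a real chashfun would be written in Erlang, and the function name, the `bucket/key` byte encoding, and the modulo step are all assumptions made for the example.

```python
import hashlib


def reducer_for(bucket: bytes, key: bytes, n_reducers: int) -> int:
    """Pick a reducer index from the bucket/key pair alone.

    Every result for the same key hashes to the same index, so
    per-key state never has to be shared between reducers.
    """
    digest = hashlib.sha1(bucket + b"/" + key).digest()
    return int.from_bytes(digest, "big") % n_reducers


# Results for one key from three different vnodes all route together.
targets = {reducer_for(b"users", b"alice", 8) for _vnode in range(3)}
```

The point of the design is that the routing decision depends only on the key, never on which vnode produced the result, so the reducer holding a key's partial state sees every replica's report for it.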

Note also that the "get" fitting has a 'follow' chashfun.  This is
also different from normal MR usage, since we would normally set N=3
(or whatever the bucket has set) for the qcover-ing listkeys fitting.
The normal setting ensures that each key is produced only once, but
N=1 will produce the same key multiple times for buckets where N>1.
You want each result processed locally ('follow') to get the
possibly-different vclock/hash stored at that vnode.
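
The effect of listing with N=1 can be seen in a toy model. The Python sketch below assumes a ring of 8 vnodes, a bucket with N=3, and replicas placed on consecutive vnodes. The placement function and names are simplified stand-ins, not Riak's actual ring code.

```python
import hashlib
from collections import Counter

RING_SIZE = 8  # assumed number of vnodes in the toy ring
N_VAL = 3      # assumed bucket n_val


def preflist(key: bytes):
    """Vnodes holding a replica of `key` (consecutive ring positions)."""
    first = int.from_bytes(hashlib.sha1(key).digest(), "big") % RING_SIZE
    return [(first + i) % RING_SIZE for i in range(N_VAL)]


def list_all_vnodes(keys):
    """Model keylisting with N=1 coverage: every vnode lists what it stores."""
    stored = {vnode: [] for vnode in range(RING_SIZE)}
    for key in keys:
        for vnode in preflist(key):
            stored[vnode].append(key)
    return [(vnode, key) for vnode in range(RING_SIZE) for key in stored[vnode]]


keys = [b"k1", b"k2", b"k3", b"k4"]
listing = list_all_vnodes(keys)
times_seen = Counter(key for _vnode, key in listing)
```

In this model each key appears once per replica, each time from a different vnode, which is exactly why the following "get" has to run where the key was listed rather than being re-hashed.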

It may also be possible to take a completely different approach.  A
simple modification of riak_kv_pipe_get could allow it to attempt to
read all N replicas, perhaps even by simply starting a
riak_kv_get_fsm.  In this case, all of the merging of vclocks would
happen before the mapping instead of after.  But, it would also miss
keys that were not fully replicated, since you'd likely want to
maintain N=3 for the keylisting qcover operation.  I also haven't put
as much thought into this path, so there may be other demons lurking.

> If someone can sketch a solution, I'd be happy to go hacking on it ...

Hopefully that's enough sketching to at least generate a second round
of questions.  ;)  I'd be very interested in hearing how it goes.
Please fire back with anything that needs more explanation.

-Bryan



