"consistent" map/reduce

Kresten Krab Thorup krab at trifork.com
Mon Nov 28 12:38:48 EST 2011

Super. Thanks. I'll have to play around with it a bit and get back to you. 


On 28/11/2011, at 16.17, "Bryan Fink" <bryan at basho.com> wrote:

> On Mon, Nov 21, 2011 at 2:46 PM, Kresten Krab Thorup <krab at trifork.com> wrote:
>> I'd like to be able to do a "consistent map/reduce" job i.e., with "R=2 semantics" for an "N=3 bucket".  Maybe other people have the same need, but I can't see if this is possible ... perhaps with the new riak_pipe infrastructure?
> Hi, Kresten.  Indeed, this same topic came up in an independent
> conversation last week.  I think there are a few ways to attack it.
> Let's start with yours:
>> The map function yields {Key, [{VectorClock,1,Hash}]} for each replica, but needs to run on *all* replicas of objects in a given Bucket.   Hash is the real value I'm interested in i.e., the content-hash for the object; but it could be some other "map" function output.
>> Then, the reduce phase needs to "merge" a list of {VectorClock,N,Hash} tuples, by considering the VectorClocks to determine if results are in "conflict", or if one is before/after the other.  N is reduced to the sum of all elements with equal Hash value.
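
The merge described above can be sketched roughly as follows. This is only an illustration in Python, not Riak code: vector clocks are modelled as plain dicts of actor to counter, and the names merge_entries and satisfies_r are hypothetical. It assumes entries with an equal hash can have their counts summed and their clocks combined pointwise, and that any entry whose clock is strictly older than another's can be dropped.

```python
from typing import Dict, List, Tuple

VClock = Dict[str, int]            # actor id -> counter (simplified model)
Entry = Tuple[VClock, int, str]    # (vector clock, replica count, content hash)


def descends(a: VClock, b: VClock) -> bool:
    """True if clock `a` has seen everything clock `b` has."""
    return all(a.get(actor, 0) >= count for actor, count in b.items())


def merge_clocks(a: VClock, b: VClock) -> VClock:
    """Pointwise maximum of two clocks."""
    return {actor: max(a.get(actor, 0), b.get(actor, 0))
            for actor in a.keys() | b.keys()}


def merge_entries(entries: List[Entry]) -> List[Entry]:
    """Merge per-replica results for ONE key."""
    # Step 1: sum the counts of entries that carry an equal hash.
    by_hash: Dict[str, Tuple[VClock, int]] = {}
    for clock, n, h in entries:
        if h in by_hash:
            seen_clock, seen_n = by_hash[h]
            by_hash[h] = (merge_clocks(seen_clock, clock), seen_n + n)
        else:
            by_hash[h] = (dict(clock), n)
    merged = [(clock, n, h) for h, (clock, n) in by_hash.items()]

    # Step 2: drop entries whose clock is strictly older than another's.
    def dominated(entry: Entry) -> bool:
        return any(descends(other[0], entry[0]) and not descends(entry[0], other[0])
                   for other in merged)

    return [entry for entry in merged if not dominated(entry)]


def satisfies_r(entries: List[Entry], r: int) -> bool:
    """One surviving version, seen on at least `r` replicas."""
    merged = merge_entries(entries)
    return len(merged) == 1 and merged[0][1] >= r
```

More than one surviving entry means the replicas are in conflict; a single survivor with a count below R means the newest version has not yet been seen on enough replicas.
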
> I like many of the ideas in this approach.  It has a nice distributed
> data-provenance feel to it.  I think the danger lies in the work that
> the reduce phase would have to do.  With distributed parallel
> keylisting, there's no way to guarantee the order that the map results
> arrive at the reduce processor.  This means that the reduce state may
> become quite large tracking all of the key/version pairs
> produced-but-not-yet-satisfying-R.  Maybe this is manageable, though,
> so I'll also try to answer your questions:
>> - How can I have a M/R job run on *all* vnodes?  Not just for objects that are owned by a primary?
> The only way to do this right now is to use Riak Pipe directly.  Set up
> a pipe with your "map" and "reduce" fittings, then send inputs to it
> such that one input hashes to each vnode.  Using riak_pipe_qcover_fsm
> with N=1 might ease this process.
>> - The M/R "input" is essentially  listkeys(Bucket)  ... can this be done using "async keylisting", so that the operation does not hold up the vnode while listing?
> Yes, absolutely.  The riak_kv_pipe_listkeys module does just this, and
> is also an example of using riak_pipe_qcover_fsm.
> These two questions and answers lead to the basic pipe layout of:
> [
> {module= riak_kv_pipe_listkeys}, %% set up with qcover N=1
> {module= riak_kv_pipe_get,
>  chashfun= follow},              %% each vnode processes keys it produces
> {module= riak_kv_mrc_map,
>  chashfun= follow},
> {module= riak_kv_w_reduce,
>  chashfun= ConstantOrCustom}     %% see below
> ]
> Riak KV reduce fittings are normally set up with a constant chashfun,
> such that *all* results are processed in one place.  To help alleviate
> the reduce-state-size problem, I might suggest using a chashfun that
> spreads results, yet makes sure all results for each key end up at the
> same reducer (most likely, hash the result's key, just as you would
> for determining its KV preflist).
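
The idea of spreading reduce work while keeping every result for a key on the same reducer can be sketched like this. It is a Python illustration, not a riak_pipe chashfun: key_hash and reducer_for are hypothetical names, and the byte encoding hashed here is an assumption that differs in detail from what Riak actually hashes.

```python
import hashlib

RING_SIZE = 2 ** 160  # size of the SHA-1 output space


def key_hash(bucket: bytes, key: bytes) -> int:
    """Hash a bucket/key pair onto the ring (simplified encoding)."""
    # Length-prefix the bucket so (b"ab", b"c") and (b"a", b"bc") differ.
    data = len(bucket).to_bytes(4, "big") + bucket + key
    return int.from_bytes(hashlib.sha1(data).digest(), "big")


def reducer_for(bucket: bytes, key: bytes, num_partitions: int) -> int:
    """Pick the partition that should reduce all results for this key."""
    return key_hash(bucket, key) * num_partitions // RING_SIZE
```

Because the choice depends only on the bucket and key, the three per-replica results for a key arrive at one reducer, while different keys are spread over all partitions.
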
> Note also that the "get" fitting has a 'follow' chashfun.  This is
> also different from normal MR usage, since we would normally set N=3
> (or whatever the bucket has set) for the qcover-ing listkeys fitting.
> The normal setting ensures that each key is produced only once, but
> qcover N=1 will produce the same key multiple times for buckets where
> n_val > 1.
> You want each result processed locally ('follow') to get the
> possibly-different vclock/hash stored at that vnode.
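
A toy model may help show why listing with coverage N=1 yields one result per replica, and why each result should be read at the vnode that listed it. Everything here is an assumption for illustration: an 8-vnode ring, a bucket n_val of 3, replicas placed on consecutive vnodes, and the helper names put and list_all_vnodes.

```python
from collections import Counter

NUM_VNODES = 8
N_VAL = 3

# Each vnode holds its own local copy: key -> version stored there.
vnodes = [dict() for _ in range(NUM_VNODES)]


def preflist(first_vnode: int) -> list:
    """The N_VAL consecutive vnodes that hold replicas of a key."""
    return [(first_vnode + i) % NUM_VNODES for i in range(N_VAL)]


def put(key: str, first_vnode: int, version: str, reach: int = N_VAL) -> None:
    """Write to the first `reach` replicas; reach < N_VAL leaves some stale."""
    for vnode_id in preflist(first_vnode)[:reach]:
        vnodes[vnode_id][key] = version


def list_all_vnodes() -> list:
    """List keys on every vnode and read each one locally ('follow')."""
    return [(vnode_id, key, version)
            for vnode_id, store in enumerate(vnodes)
            for key, version in sorted(store.items())]


put("k1", 0, "v1")
put("k1", 0, "v2", reach=2)  # the third replica misses this update
put("k2", 5, "v1")

results = list_all_vnodes()
per_key = Counter(key for _, key, _ in results)
```

Here per_key counts three results for each key, and the results for "k1" carry two copies of "v2" and one stale "v1", which is exactly the per-replica difference the reduce phase is meant to see.
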
> It may also be possible to take a completely different approach.  A
> simple modification of riak_kv_pipe_get could allow it to attempt to
> read all N replicas, perhaps even by simply starting a
> riak_kv_get_fsm.  In this case, all of the merging of vclocks would
> happen before the mapping instead of after.  But, it would also miss
> keys that were not fully replicated, since you'd likely want to
> maintain N=3 for the keylisting qcover operation.  I also haven't put
> as much thought into this path, so there may be other demons lurking.
>> If someone can sketch a solution, I'd be happy to go hacking on it ...
> Hopefully that's enough sketching to at least generate a second round
> of questions.  ;)  I'd be very interested in hearing how it goes.
> Please fire back with anything that needs more explanation.
> -Bryan
