Map phase timeout

Dmitri Zagidulin dzagidulin at
Mon Apr 8 11:01:50 EDT 2013


My recommendation to you is: don't use MapReduce for this use case. Fetch
the objects via regular Riak GETs (preferably using connection pooling and
multiple worker threads).

I'm assuming that you have a list of keys (either by keeping track of them
externally to Riak, or via a Secondary Index query or a Search query), and
you want to back up those objects.

The natural inclination, once you know the keys, is to want to fetch all of
those objects via a single query, and MapReduce immediately comes to mind.
(And to most developers, writing the MR function in JavaScript is easier
and more familiar than in Erlang). Unfortunately, as Christian mentioned,
it's very easy for the JS VMs to run out of resources and crash or time
out. In addition, I've found that rewriting the MapReduce in Erlang affords
only a bit more resources -- once you hit a certain number of keys that you
want to fetch, or a certain object size threshold, even Erlang MR jobs can
time out (keep in mind, while the Map phase can happen in parallel on all
of the nodes in a cluster, all the object values have to be serialized on
the single coordinating node, which becomes the bottleneck).
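For completeness, if you do stay with MapReduce: there is no per-phase
timeout, only a job-level one, which you can raise via the "timeout" field
(in milliseconds) of the job; and a map phase can reference one of Riak's
built-in Erlang functions instead of JavaScript. A job submitted over HTTP
might look something like this (the bucket name and timeout value here are
just placeholders):

```json
{
  "inputs": "cart-products",
  "query": [
    {"map": {"language": "erlang",
             "module": "riak_kv_mapreduce",
             "function": "map_object_value"}}
  ],
  "timeout": 600000
}
```

riak_kv_mapreduce:map_object_value is one of the built-in functions that
ships with Riak and simply returns each object's value, which matches the
"fetch everything" use case here. But even then, the coordinator bottleneck
described above still applies.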

The workaround for this, even though it might seem counter-intuitive, is --
if you know the list of keys, fetch them using GETs. Even a naive
single-threaded "while loop" way of fetching the objects can often be
faster than a MapReduce job (for this use case), and it doesn't time out.
Add to that connection-pooling and multiple worker threads, and this method
is invariably faster.
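As a rough sketch of that approach (the pool size is arbitrary, and the
fetch callable is a placeholder; with the Riak Python client it would be
something like lambda k: bucket.get(k).get_data()):

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_all(keys, fetch, workers=8):
    """Fetch each object by key on a pool of worker threads.

    `fetch` is any callable taking a key and returning the object's
    value; exceptions propagate to the caller, who can retry or log.
    """
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map preserves input order, so results line up with `keys`
        return list(pool.map(fetch, keys))
```

Each worker issues plain GETs, so the cluster sees a steady stream of
cheap requests instead of one heavyweight MR job funneling every object
value through a single coordinating node.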


On Mon, Apr 8, 2013 at 4:27 AM, Christian Dahlqvist <christian at> wrote:

> Hi Matt,
> If you have a complicated mapreduce job containing multiple phases
> implemented in JavaScript, you will most likely see a lot of contention for
> the JavaScript VMs which will cause problems. While you can tune the
> configuration [1], you may find that you will need a very large pool size
> in order to properly support your job, especially for map phases as these
> run in parallel.
> The best way to speed up the mapreduce job and get around the VM pool
> contention is to implement the mapreduce functions in Erlang.
> Best regards,
> Christian
> [1]
> --------------------
> Christian Dahlqvist
> Client Services Engineer
> Basho Technologies
> EMEA Office
> E-mail: christian at
> Skype: c.dahlqvist
> Mobile: +44 7890 590 910
> On 8 Apr 2013, at 08:20, Matt Black < at> wrote:
> Thanks for the reply, Christian.
> I didn't explain well enough in my first post - the map reduce operation
> is merely loading a bunch of objects; a Python script, which makes the
> connection to Riak, then writes these objects to disk. (It's probably
> obvious, but I'm using JavaScript and the Riak Python client.)
> The query itself has many map phases where a composite object is built up
> from related objects spread across many buckets.
> I was hoping there might be some kind of timeout I could adjust on a
> per-map-phase basis - clutching at straws, really.
> Cheers
> Matt
> On 8 April 2013 17:14, Christian Dahlqvist <christian at> wrote:
>> Hi,
>> Without having access to the mapreduce functions you are running, I would
>> assume that a mapreduce job both writing data to disk as well as deleting
>> the written record from Riak might be quite slow. This is not really a use
>> case mapreduce was designed for, and when a mapreduce job crashes or times
>> out it is difficult to know how far along the processing of different
>> records it got.
>> I would therefore recommend considering running this type of archiving
>> and delete job as an external batch process instead as it will give you
>> better control over the execution and avoid timeout problems.
>> Best regards,
>> Christian
>> On 8 Apr 2013, at 00:49, Matt Black < at> wrote:
>> > Dear list,
>> >
>> > I'm currently getting a timeout during a single phase of a multi-phase
>> map reduce query. Is there anything I can do to assist this in running?
>> >
>> > Its purpose is to back up and remove objects from Riak, so it will run
>> periodically during quiet times moving old data out of Riak into file
>> storage.
>> >
>> > Traceback (most recent call last):
>> >   File "./tools/", line 185, in <module>
>> >     main()
>> >   File "./tools/", line 181, in main
>> >     args.func(**kwargs)
>> >   File "/srv/backup/tools/", line 295, in do_map_reduce
>> >     raise e
>> > Exception:
>> {"phase":2,"error":"timeout","input":"[<<\"cart-products\">>,<<\"cd67d7f6e2688bc2089e6fa79506ac05-2\">>,{struct,[{<<\"uid\">>,<<\"cd67d7f6e2688bc2089e6fa79506ac05\">>},{<<\"cart\">>,{struct,[{<<\"expired_ts\">>,<<\"2013-03-05T19:12:23.906228\">>},{<<\"last_updated\">>,<<\"2013-03-05T19:12:23.906242\">>},{<<\"tags\">>,{struct,[{<<\"type\">>,<<\"AB\">>}]}},{<<\"completed\">>,false},{<<\"created\">>,<<\"2013-03-04T02:10:18.638413\">>},{<<\"products\">>,[{struct,[{<<\"cost\">>,0},{<<\"bundleName\">>,<<\"Product\">>},...]},...]},...]}},...]}]","type":"exit","stack":"[{riak_kv_w_reduce,'-js_runner/1-fun-0-',3,[{file,\"src/riak_kv_w_reduce.erl\"},{line,283}]},{riak_kv_w_reduce,reduce,3,[{file,\"src/riak_kv_w_reduce.erl\"},{line,206}]},{riak_kv_w_reduce,maybe_reduce,2,[{file,\"src/riak_kv_w_reduce.erl\"},{line,157}]},{riak_pipe_vnode_worker,process_input,3,[{file,\"src/riak_pipe_vnode_worker.erl\"},{line,444}]},{riak_pipe_vnode_worker,wait_for_input,2,[{file,\"src/riak_pipe_vnode_worker.erl\"},{line,376}]},{gen_fsm,handle_msg,7,[{file,\"gen_fsm.erl\"},{line,494}]},{proc_lib,...}]"}
>> >
>> >
>> > _______________________________________________
>> > riak-users mailing list
>> > riak-users at
>> >