mapreduce timeout

Christian Dahlqvist christian at basho.com
Tue Jul 23 02:32:09 EDT 2013


Hi Deyan,

As mentioned, it is recommended to write reduce phase functions so that they can run recursively [1], which lets you avoid having to use the 'reduce_phase_only_1' parameter. Once you have a reduce function that behaves this way, you can tune it by overriding the 'reduce_phase_batch_size' parameter, which defaults to 20. You can also specify the 'do_prereduce' parameter on the preceding map phase to make the first reduce iteration run in parallel across the cluster before finishing off at the coordinating node. This can significantly reduce the amount of data transferred across the cluster and improve performance, although it may make the reduce phase functions a bit more difficult to write, as they need to handle a mix of output from the preceding map phase and results from previous iterations of the reduce phase.
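
For illustration, here is a minimal sketch of how those options could be passed from the Ruby client used later in this thread. The parameter names are the ones above; the exact way they are passed as phase args, the placeholder bucket/index/key values, and the 'reduce95th_recursive' function name are my assumptions rather than tested code:

#!/usr/bin/env ruby
require 'riak'

client = Riak::Client.new(:nodes => [{:host => 'db1', :pb_port => 8087}])

# Placeholder bucket/index/key names for the sketch.
bucket_name = 'metrics'
index_name  = 'cust_setup_ts_bin'
start_key   = 'yellingtone:default:1372636800'
end_key     = 'yellingtone:default:1374364800'

result = Riak::MapReduce.new(client).
  index(bucket_name, index_name, start_key..end_key).
  # 'do_prereduce' on the map phase asks for a first reduce iteration to be
  # run in parallel, close to the data, before the coordinating node finishes.
  map('map95th', :arg => { 'do_prereduce' => true }).
  # A recursion-safe reducer (hypothetical name) plus a larger batch size;
  # the default batch size is 20.
  reduce('reduce95th_recursive',
         :arg => { 'reduce_phase_batch_size' => 100 },
         :keep => true).
  run()

puts result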

As JavaScript map and reduce functions use VMs from pools specified in the app.config file (the map_js_vm_count and reduce_js_vm_count parameters), you will need to tune the size of these pools based on your processing needs. As map phases run in parallel close to the partitions that hold the data, they often require considerably more VMs than reduce phase functions. The exact numbers depend on your ring size, the number of map phases in the job, and the number of concurrent jobs you will be running. [2]

Using JavaScript functions is, however, considerably slower than using functions implemented in Erlang. For any functions that will execute regularly or as part of large jobs, we always recommend rewriting them in Erlang. In addition to speeding things up, this removes the reliance on the JavaScript VM pools.
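
As a rough illustration only: from the Ruby client, an Erlang phase is referenced by module and function name instead of JavaScript source, for example using functions from the built-in riak_kv_mapreduce module (a custom Erlang percentile reducer would be referenced the same way under its own module name). The exact call shape is my assumption, not something from this thread:

result = Riak::MapReduce.new(client).
  index(bucket_name, index_name, start_key..end_key).
  # [module, function] selects compiled Erlang code, so no JS VM is used.
  map(['riak_kv_mapreduce', 'map_object_value']).
  reduce(['riak_kv_mapreduce', 'reduce_sort'], :keep => true).
  run()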

[1] http://docs.basho.com/riak/latest/dev/advanced/mapreduce/#How-Phases-Work
[2] http://riak-users.197444.n3.nabble.com/Follow-up-Riak-Map-Reduce-error-preflist-exhausted-td4024330.html


Best regards,

Christian


On 22 Jul 2013, at 17:03, Deyan Dyankov <dyankov at cloudxcel.com> wrote:

> Thanks Christian,
> 
> I was able to modify the job code along the lines you suggested and the issue is now resolved. However, I'd still like to understand the cause of these timeouts and which parameter, if any, could be raised to mitigate them. This particular job was not expected to perform in real time and we were willing to wait for it. We may have other such cases in the future.
> 
> best regards,
> Deyan
> 
> On Jul 15, 2013, at 4:49 PM, Christian Dahlqvist <christian at basho.com> wrote:
> 
>> Hi Deyan,
>> 
>> When running mapreduce jobs, reduce phases often end up being the bottleneck. This is especially true when all input data needs to be gathered on the coordinating node before the phase can be executed, as is the case when the reduce_phase_only_1 flag is enabled. With this flag set, the mapreduce job will not scale well.
>> 
>> Depending on your exact requirements, it may be worth considering gathering the histogram data periodically, e.g. per hour and/or day. These aggregates can then be stored in separate buckets with keys that describe the content, e.g. <cust>_<setup>_<date>. Once this is in place, you can efficiently retrieve the small number of objects that cover the period you want statistics for directly via the descriptive keys, and process them in the application layer. Even though this requires a bit more work on a periodic basis, it will most likely be much more efficient at query time and scale better.
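>> 
>> A rough sketch of that pattern with the Ruby client (the bucket name, key format and values below are placeholders I am assuming, not anything prescribed):
>> 
>> # Periodic job: store the pre-computed daily aggregate under a descriptive key.
>> require 'riak'
>> require 'date'
>> 
>> client = Riak::Client.new(:nodes => [{:host => 'db1', :pb_port => 8087}])
>> agg_bucket = client.bucket('daily_aggregates')
>> 
>> cust, setup = 'yellingtone', 'default'
>> key = "#{cust}_#{setup}_#{Date.today}"           # e.g. "yellingtone_default_2013-07-15"
>> 
>> obj = agg_bucket.get_or_new(key)
>> obj.content_type = 'application/json'
>> obj.data = { 'bps_95th' => 125581.5 }            # value computed by the periodic job
>> obj.store
>> 
>> # Query time: fetch only the keys that cover the requested period and
>> # finish the calculation in the application layer.
>> wanted = (Date.today - 30..Date.today).map { |d| "#{cust}_#{setup}_#{d}" }
>> aggregates = wanted.map { |k| agg_bucket.get(k).data rescue nil }.compact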
>> 
>> Best regards,
>> 
>> Christian
>> 
>> 
>> On 14 Jul 2013, at 12:16, Deyan Dyankov <dyankov at cloudxcel.com> wrote:
>> 
>>> Hi everyone,
>>> 
>>> first time here. Thanks in advance.
>>> 
>>> I am experiencing issues with MapReduce: it seems to time out after a certain data volume threshold is reached. There is only one reducer, and here is the mapreduce initiation script:
>>> #!/usr/bin/env ruby
>>> […]
>>> @client = Riak::Client.new(
>>>   :nodes => [
>>>     {:host => 'db1', :pb_port => 8087, :http_port => 8098},
>>>     {:host => 'db2', :pb_port => 8087, :http_port => 8098},
>>>     {:host => 'db3', :pb_port => 8087, :http_port => 8098}
>>>   ],
>>>   :protocol => 'pbc'
>>> )
>>> 
>>> start_key = "#{cust}:#{setup}:#{start_time}"
>>> end_key = "#{cust}:#{setup}:#{end_time}"
>>> 
>>> result = Riak::MapReduce.new(@client).
>>>   index(bucket_name, index_name, start_key..end_key).
>>>   map('map95th').
>>>   reduce('reduce95th', :arg => { 'reduce_phase_only_1' => true }, :keep => true).
>>>   run()
>>> 
>>> puts result
>>> 
>>> The following is the code for the map95th and reduce95th javascript functions:
>>> function map95th(v, keyData, arg) {
>>>   var key_elements = v['key'].split(':');
>>>   var cust = key_elements[0];
>>>   var setup = key_elements[1];
>>>   var sid = key_elements[2];
>>>   var ts = key_elements[3];
>>> 
>>>   var result_key = cust + ':' + setup + ':' + ts;
>>>   var obj = {};
>>>   var obj_data = Riak.mapValuesJson(v)[0];
>>> 
>>>   obj_data['bps'] = (obj_data['rx_bytes'] + obj_data['tx_bytes']) / 60;
>>>   var return_val = obj_data['bps'];
>>>   return [ return_val ];
>>> }
>>> 
>>> // if used, this must be a single reducer! Call from Ruby like this:
>>> //  reduce('reduce95th', :arg => { 'reduce_phase_only_1' => true }, :keep => true).
>>> function reduce95th(values) {
>>>   var sorted = values.sort(function(a,b) { return a - b; });
>>>   var pct = sorted.length / 100;
>>>   var element_95th = pct * 95;
>>>   element_95th = parseInt(element_95th, 10) + 1;
>>> 
>>>   return [ sorted[element_95th] ];
>>> }
>>> 
>>> 
>>> 
>>> Now here is the interesting part. The MR goes through one record per minute. If I run it for a period of less than ~20 days, it executes. Otherwise, it times out:
>>> [deyandyankov at azobook ~/repos/loshko/mapreduce/ruby (master)]$
>>> [deyandyankov at azobook ~/repos/loshko/mapreduce/ruby (master)]$ ./95h.rb yellingtone default $((`date +%s` - 20 * 86400)) `date +%s`
>>> 125581.51666666666
>>> [deyandyankov at azobook ~/repos/loshko/mapreduce/ruby (master)]$ ./95h.rb yellingtone default $((`date +%s` - 30 * 86400)) `date +%s`
>>> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client/beefcake_protobuffs_backend.rb:182:in `decode_response': Expected success from Riak but received 0. {"phase":1,"error":"timeout","input":null,"type":null,"stack":null} (Riak::ProtobuffsFailedRequest)
>>> 	from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client/beefcake_protobuffs_backend.rb:116:in `mapred'
>>> 	from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:325:in `block in mapred'
>>> 	from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:435:in `block in recover_from'
>>> 	from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/innertube-1.0.2/lib/innertube.rb:127:in `take'
>>> 	from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:433:in `recover_from'
>>> 	from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:379:in `protobuffs'
>>> 	from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:133:in `backend'
>>> 	from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:324:in `mapred'
>>> 	from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/map_reduce.rb:217:in `run'
>>> 	from ./95h.rb:29:in `<main>'
>>> [deyandyankov at azobook ~/repos/loshko/mapreduce/ruby (master)]$
>>> 
>>> The records being processed look like this:
>>> {"rx_bytes":3485395.0,"tx_bytes":1658479.0}
>>> 
>>> When running the script with more than 20 days' worth of data (two records per minute are processed, which amounts to 2 * 60 * 24 * 20 = 57,600 records or more), the script times out. Here are some entries from the logs:
>>> ==> /var/log/riak/erlang.log.1 <==
>>> Erlang has closed
>>> 
>>> ==> /var/log/riak/error.log <==
>>> 2013-07-14 13:03:51.580 [error] <0.709.0>@riak_pipe_vnode:new_worker:768 Pipe worker startup failed:fitting was gone before startup
>>> 
>>> ==> /var/log/riak/console.log <==
>>> 2013-07-14 13:03:51.584 [error] <0.22049.4326> gen_fsm <0.22049.4326> in state wait_for_input terminated with reason: timeout
>>> 
>>> ==> /var/log/riak/error.log <==
>>> 2013-07-14 13:03:51.584 [error] <0.22049.4326> gen_fsm <0.22049.4326> in state wait_for_input terminated with reason: timeout
>>> 
>>> ==> /var/log/riak/console.log <==
>>> 2013-07-14 13:03:51.940 [error] <0.22049.4326> CRASH REPORT Process <0.22049.4326> with 0 neighbours exited with reason: timeout in gen_fsm:terminate/7 line 611
>>> 
>>> ==> /var/log/riak/crash.log <==
>>> 2013-07-14 13:03:51 =CRASH REPORT====
>>>   crasher:
>>>     initial call: riak_pipe_vnode_worker:init/1
>>>     pid: <0.22049.4326>
>>>     registered_name: []
>>>     exception exit: {timeout,[{gen_fsm,terminate,7,[{file,"gen_fsm.erl"},{line,611}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,227}]}]}
>>>     ancestors: [<0.710.0>,<0.709.0>,riak_core_vnode_sup,riak_core_sup,<0.129.0>]
>>>     messages: []
>>>     links: [<0.710.0>,<0.709.0>]
>>>     dictionary: [{eunit,[{module,riak_pipe_vnode_worker},{partition,388211372416021087647853783690262677096107081728},{<0.709.0>,<0.709.0>},{details,{fitting_details,{fitting,<18125.23420.4566>,#Ref<18125.0.5432.50467>,<<"C�������������������">>,1},1,riak_kv_w_reduce,{rct,#Fun<riak_kv_w_reduce.0.20542221>,{struct,[{<<"reduce_phase_only_1">>,true}]}},{fitting,<18125.23418.4566>,#Ref<18125.0.5432.50467>,sink,undefined},[{log,sink},{trace,[error]},{sink,{fitting,<18125.23418.4566>,#Ref<18125.0.5432.50467>,sink,undefined}},{sink_type,{fsm,10,infinity}}],64}}]}]
>>>     trap_exit: false
>>>     status: running
>>>     heap_size: 832040
>>>     stack_size: 24
>>>     reductions: 1456611
>>>   neighbours:
>>> 
>>> ==> /var/log/riak/error.log <==
>>> 2013-07-14 13:03:51.940 [error] <0.22049.4326> CRASH REPORT Process <0.22049.4326> with 0 neighbours exited with reason: timeout in gen_fsm:terminate/7 line 611
>>> 
>>> ==> /var/log/riak/crash.log <==
>>> 2013-07-14 13:03:52 =SUPERVISOR REPORT====
>>>      Supervisor: {<0.710.0>,riak_pipe_vnode_worker_sup}
>>>      Context:    child_terminated
>>>      Reason:     timeout
>>>      Offender:   [{pid,<0.22049.4326>},{name,undefined},{mfargs,{riak_pipe_vnode_worker,start_link,undefined}},{restart_type,temporary},{shutdown,2000},{child_type,worker}]
>>> 
>>> 
>>> ==> /var/log/riak/console.log <==
>>> 2013-07-14 13:03:52.059 [error] <0.710.0> Supervisor riak_pipe_vnode_worker_sup had child undefined started with {riak_pipe_vnode_worker,start_link,undefined} at <0.22049.4326> exit with reason timeout in context child_terminated
>>> 
>>> ==> /var/log/riak/error.log <==
>>> 2013-07-14 13:03:52.059 [error] <0.710.0> Supervisor riak_pipe_vnode_worker_sup had child undefined started with {riak_pipe_vnode_worker,start_link,undefined} at <0.22049.4326> exit with reason timeout in context child_terminated
>>> 
>>> 
>>> The data is in leveldb and is accessed through secondary indexes.
>>> This is a 3-node cluster with 32GB of RAM; current usage is about 12GB per node. n_val=3. The same issue occurs on a similar 2-node cluster with 8GB of RAM (usage is ~6GB).
>>> 
>>> The following is my app.config:
>>> [
>>>  {riak_api, [
>>>             {pb_ip,   "0.0.0.0" },
>>>             {pb_port, 8087 },
>>>             {pb_backlog, 100 }
>>>             ]},
>>>  {riak_core, [
>>>               {default_bucket_props, [
>>>                     {n_val, 3},
>>>                     {last_write_wins, true}
>>>                     ]},
>>>               {ring_state_dir, "/storage/riak/ring"},
>>>               {ring_creation_size, 256},
>>>               {http, [ {"0.0.0.0", 8098 } ]},
>>>               {https, [{ "0.0.0.0", 8069 }]},
>>>               {ssl, [
>>>                      {certfile, "/etc/ssl/riak/server.crt"},
>>>                      {cacertfile, "/etc/ssl/riak/root.crt"},
>>>                      {keyfile, "/etc/ssl/riak/server.key"}
>>>                     ]},
>>>               {handoff_port, 8099 },
>>>               {dtrace_support, false},
>>>               {enable_health_checks, true},
>>>               {platform_bin_dir, "/usr/sbin"},
>>>               {platform_data_dir, "/storage/riak"},
>>>               {platform_etc_dir, "/etc/riak"},
>>>               {platform_lib_dir, "/usr/lib/riak/lib"},
>>>               {platform_log_dir, "/var/log/riak"}
>>>              ]},
>>>  {riak_kv, [
>>>             {storage_backend, riak_kv_eleveldb_backend},
>>>             {anti_entropy, {on, []}},
>>>             {anti_entropy_build_limit, {1, 3600000}},
>>>             {anti_entropy_expire, 604800000},
>>>             {anti_entropy_concurrency, 2},
>>>             {anti_entropy_tick, 15000},
>>>             {anti_entropy_data_dir, "/storage/riak/anti_entropy"},
>>>             {anti_entropy_leveldb_opts, [{write_buffer_size, 4194304},
>>>                                          {max_open_files, 20}]},
>>> 
>>>             {mapred_name, "mapred"},
>>>             {mapred_2i_pipe, true},
>>>             {map_js_vm_count, 16 },
>>>             {reduce_js_vm_count, 12 },
>>>             {hook_js_vm_count, 20 },
>>>             {js_max_vm_mem, 8},
>>>             {js_thread_stack, 16},
>>>             {js_source_dir, "/etc/riak/mapreduce/js_source"},
>>>             {http_url_encoding, on},
>>>             {vnode_vclocks, true},
>>>             {listkeys_backpressure, true},
>>>             {vnode_mailbox_limit, {1, 5000}}
>>>            ]},
>>> 
>>>  {riak_search, [
>>>                 {enabled, true}
>>>                ]},
>>> 
>>>  {merge_index, [
>>>                 {data_root, "/storage/riak/merge_index"},
>>>                 {buffer_rollover_size, 1048576},
>>>                 {max_compact_segments, 20}
>>>                ]},
>>> 
>>>  {bitcask, [
>>>              {data_root, "/storage/riak/bitcask"}
>>>            ]},
>>> 
>>>  {eleveldb, [
>>>              {cache_size, 1024},
>>>              {max_open_files, 64},
>>>              {data_root, "/storage/riak/leveldb"}
>>>             ]},
>>> 
>>>  {lager, [
>>>             {handlers, [
>>>                            {lager_file_backend, [
>>>                                {"/var/log/riak/error.log", error, 10485760, "$D0", 5},
>>>                                {"/var/log/riak/console.log", info, 10485760, "$D0", 5}
>>>                            ]}
>>>                        ] },
>>> 
>>>             {crash_log, "/var/log/riak/crash.log"},
>>>             {crash_log_msg_size, 65536},
>>>             {crash_log_size, 10485760},
>>>             {crash_log_date, "$D0"},
>>>             {crash_log_count, 5},
>>>             {error_logger_redirect, true}
>>>         ]},
>>> 
>>>  {riak_sysmon, [
>>>          {process_limit, 30},
>>>          {port_limit, 2},
>>>          {gc_ms_limit, 0},
>>>          {heap_word_limit, 40111000},
>>>          {busy_port, true},
>>>          {busy_dist_port, true}
>>>         ]},
>>> 
>>>  {sasl, [
>>>          {sasl_error_logger, false}
>>>         ]},
>>> 
>>> Sorry to bug you with such a long e-mail, but I wanted to be as thorough as possible. I tried raising a few options but it didn't help: map_js_vm_count, reduce_js_vm_count, js_max_vm_mem.
>>> I also tried adding a timeout argument to the map reduce caller code, but even if I set it to 60,000 or more (this is milliseconds), the script terminates with a timeout error after 10-12 seconds. The same behaviour is observed if I use http instead of pbc.
>>> 
>>> What seems to be the problem? Is this a matter of configuration? I am surprised that the job runs with 20-25 days of data but not more.
>>> 
>>> thank you for your efforts,
>>> Deyan
>>> _______________________________________________
>>> riak-users mailing list
>>> riak-users at lists.basho.com
>>> http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com
>> 
> 
