mapreduce timeout

Deyan Dyankov dyankov at cloudxcel.com
Sun Jul 14 07:16:13 EDT 2013


Hi everyone,

first time here. Thanks in advance.

I am experiencing issues with MapReduce: jobs seem to time out once a certain data volume threshold is reached. There is only one reducer, and here is the script that starts the MapReduce job:
#!/usr/bin/env ruby
[…]
@client = Riak::Client.new(
  :nodes => [
    {:host => 'db1', :pb_port => 8087, :http_port => 8098},
    {:host => 'db2', :pb_port => 8087, :http_port => 8098},
    {:host => 'db3', :pb_port => 8087, :http_port => 8098}
  ],
  :protocol => 'pbc'
)

start_key = "#{cust}:#{setup}:#{start_time}"
end_key = "#{cust}:#{setup}:#{end_time}"

result = Riak::MapReduce.new(@client).
  index(bucket_name, index_name, start_key..end_key).
  map('map95th').
  reduce('reduce95th', :arg => { 'reduce_phase_only_1' => true }, :keep => true).
  run()

puts result

The following is the code for the map95th and reduce95th JavaScript functions:
function map95th(v, keyData, arg) {
  // object keys look like "cust:setup:sid:ts"
  var key_elements = v['key'].split(':');
  var cust = key_elements[0];
  var setup = key_elements[1];
  var sid = key_elements[2];
  var ts = key_elements[3];

  var result_key = cust + ':' + setup + ':' + ts;
  var obj_data = Riak.mapValuesJson(v)[0];

  // per-minute byte counters converted to bytes per second
  obj_data['bps'] = (obj_data['rx_bytes'] + obj_data['tx_bytes']) / 60;
  return [ obj_data['bps'] ];
}

// if used, this must be a single reducer! Call from Ruby like this:
//  reduce('reduce95th', :arg => { 'reduce_phase_only_1' => true }, :keep => true).
function reduce95th(values) {
  var sorted = values.sort(function(a,b) { return a - b; });
  var pct = sorted.length / 100;
  var element_95th = pct * 95;
  element_95th = parseInt(element_95th, 10) + 1;

  return [ sorted[element_95th] ];
}
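
For reference, here is the same index arithmetic worked through outside Riak. This is a standalone Ruby sketch on made-up sample data (not part of the actual job), just to show which element the reducer ends up returning:

#!/usr/bin/env ruby
# Standalone illustration of the index arithmetic in reduce95th above,
# using a hypothetical 200-element sample instead of real bps values.
values = (1..200).map { |n| n.to_f }
sorted = values.sort
index  = ((sorted.length / 100.0) * 95).to_i + 1   # same computation as in reduce95th
puts sorted[index]                                  # prints 192.0 for this sample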



Now here is the interesting part. The MR job processes one record per minute of the queried range. If I run it over a period of less than roughly 20 days, it completes; otherwise it times out:
[deyandyankov at azobook ~/repos/loshko/mapreduce/ruby (master)]$
[deyandyankov at azobook ~/repos/loshko/mapreduce/ruby (master)]$ ./95h.rb yellingtone default $((`date +%s` - 20 * 86400)) `date +%s`
125581.51666666666
[deyandyankov at azobook ~/repos/loshko/mapreduce/ruby (master)]$ ./95h.rb yellingtone default $((`date +%s` - 30 * 86400)) `date +%s`
/Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client/beefcake_protobuffs_backend.rb:182:in `decode_response': Expected success from Riak but received 0. {"phase":1,"error":"timeout","input":null,"type":null,"stack":null} (Riak::ProtobuffsFailedRequest)
	from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client/beefcake_protobuffs_backend.rb:116:in `mapred'
	from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:325:in `block in mapred'
	from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:435:in `block in recover_from'
	from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/innertube-1.0.2/lib/innertube.rb:127:in `take'
	from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:433:in `recover_from'
	from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:379:in `protobuffs'
	from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:133:in `backend'
	from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:324:in `mapred'
	from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/map_reduce.rb:217:in `run'
	from ./95h.rb:29:in `<main>'
[deyandyankov at azobook ~/repos/loshko/mapreduce/ruby (master)]$

The records being processed look like this:
{"rx_bytes":3485395.0,"tx_bytes":1658479.0}

When running the script over more than 20 days' worth of data (two records per minute are processed, which amounts to more than 2 * 60 * 24 * 20 = 57,600 records), the script times out and the following shows up in the logs:
==> /var/log/riak/erlang.log.1 <==
Erlang has closed

==> /var/log/riak/error.log <==
2013-07-14 13:03:51.580 [error] <0.709.0>@riak_pipe_vnode:new_worker:768 Pipe worker startup failed:fitting was gone before startup

==> /var/log/riak/console.log <==
2013-07-14 13:03:51.584 [error] <0.22049.4326> gen_fsm <0.22049.4326> in state wait_for_input terminated with reason: timeout

==> /var/log/riak/error.log <==
2013-07-14 13:03:51.584 [error] <0.22049.4326> gen_fsm <0.22049.4326> in state wait_for_input terminated with reason: timeout

==> /var/log/riak/console.log <==
2013-07-14 13:03:51.940 [error] <0.22049.4326> CRASH REPORT Process <0.22049.4326> with 0 neighbours exited with reason: timeout in gen_fsm:terminate/7 line 611

==> /var/log/riak/crash.log <==
2013-07-14 13:03:51 =CRASH REPORT====
  crasher:
    initial call: riak_pipe_vnode_worker:init/1
    pid: <0.22049.4326>
    registered_name: []
    exception exit: {timeout,[{gen_fsm,terminate,7,[{file,"gen_fsm.erl"},{line,611}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,227}]}]}
    ancestors: [<0.710.0>,<0.709.0>,riak_core_vnode_sup,riak_core_sup,<0.129.0>]
    messages: []
    links: [<0.710.0>,<0.709.0>]
    dictionary: [{eunit,[{module,riak_pipe_vnode_worker},{partition,388211372416021087647853783690262677096107081728},{<0.709.0>,<0.709.0>},{details,{fitting_details,{fitting,<18125.23420.4566>,#Ref<18125.0.5432.50467>,<<"C�������������������">>,1},1,riak_kv_w_reduce,{rct,#Fun<riak_kv_w_reduce.0.20542221>,{struct,[{<<"reduce_phase_only_1">>,true}]}},{fitting,<18125.23418.4566>,#Ref<18125.0.5432.50467>,sink,undefined},[{log,sink},{trace,[error]},{sink,{fitting,<18125.23418.4566>,#Ref<18125.0.5432.50467>,sink,undefined}},{sink_type,{fsm,10,infinity}}],64}}]}]
    trap_exit: false
    status: running
    heap_size: 832040
    stack_size: 24
    reductions: 1456611
  neighbours:

==> /var/log/riak/error.log <==
2013-07-14 13:03:51.940 [error] <0.22049.4326> CRASH REPORT Process <0.22049.4326> with 0 neighbours exited with reason: timeout in gen_fsm:terminate/7 line 611

==> /var/log/riak/crash.log <==
2013-07-14 13:03:52 =SUPERVISOR REPORT====
     Supervisor: {<0.710.0>,riak_pipe_vnode_worker_sup}
     Context:    child_terminated
     Reason:     timeout
     Offender:   [{pid,<0.22049.4326>},{name,undefined},{mfargs,{riak_pipe_vnode_worker,start_link,undefined}},{restart_type,temporary},{shutdown,2000},{child_type,worker}]


==> /var/log/riak/console.log <==
2013-07-14 13:03:52.059 [error] <0.710.0> Supervisor riak_pipe_vnode_worker_sup had child undefined started with {riak_pipe_vnode_worker,start_link,undefined} at <0.22049.4326> exit with reason timeout in context child_terminated

==> /var/log/riak/error.log <==
2013-07-14 13:03:52.059 [error] <0.710.0> Supervisor riak_pipe_vnode_worker_sup had child undefined started with {riak_pipe_vnode_worker,start_link,undefined} at <0.22049.4326> exit with reason timeout in context child_terminated


The data is stored in leveldb and is accessed through secondary indexes.
This is a 3-node cluster with 32GB of RAM; current usage is about 12GB per node, and n_val=3. The same issue occurs on a similar 2-node cluster with 8GB of RAM (usage is ~6GB).

The following is my app.config:
[
 {riak_api, [
            {pb_ip,   "0.0.0.0" },
            {pb_port, 8087 },
            {pb_backlog, 100 }
            ]},
 {riak_core, [
              {default_bucket_props, [
                    {n_val, 3},
                    {last_write_wins, true}
                    ]},
              {ring_state_dir, "/storage/riak/ring"},
              {ring_creation_size, 256},
              {http, [ {"0.0.0.0", 8098 } ]},
              {https, [{ "0.0.0.0", 8069 }]},
              {ssl, [
                     {certfile, "/etc/ssl/riak/server.crt"},
                     {cacertfile, "/etc/ssl/riak/root.crt"},
                     {keyfile, "/etc/ssl/riak/server.key"}
                    ]},
              {handoff_port, 8099 },
              {dtrace_support, false},
              {enable_health_checks, true},
              {platform_bin_dir, "/usr/sbin"},
              {platform_data_dir, "/storage/riak"},
              {platform_etc_dir, "/etc/riak"},
              {platform_lib_dir, "/usr/lib/riak/lib"},
              {platform_log_dir, "/var/log/riak"}
             ]},
 {riak_kv, [
            {storage_backend, riak_kv_eleveldb_backend},
            {anti_entropy, {on, []}},
            {anti_entropy_build_limit, {1, 3600000}},
            {anti_entropy_expire, 604800000},
            {anti_entropy_concurrency, 2},
            {anti_entropy_tick, 15000},
            {anti_entropy_data_dir, "/storage/riak/anti_entropy"},
            {anti_entropy_leveldb_opts, [{write_buffer_size, 4194304},
                                         {max_open_files, 20}]},

            {mapred_name, "mapred"},
            {mapred_2i_pipe, true},
            {map_js_vm_count, 16 },
            {reduce_js_vm_count, 12 },
            {hook_js_vm_count, 20 },
            {js_max_vm_mem, 8},
            {js_thread_stack, 16},
            {js_source_dir, "/etc/riak/mapreduce/js_source"},
            {http_url_encoding, on},
            {vnode_vclocks, true},
            {listkeys_backpressure, true},
            {vnode_mailbox_limit, {1, 5000}}
           ]},

 {riak_search, [
                {enabled, true}
               ]},

 {merge_index, [
                {data_root, "/storage/riak/merge_index"},
                {buffer_rollover_size, 1048576},
                {max_compact_segments, 20}
               ]},

 {bitcask, [
             {data_root, "/storage/riak/bitcask"}
           ]},

 {eleveldb, [
             {cache_size, 1024},
             {max_open_files, 64},
             {data_root, "/storage/riak/leveldb"}
            ]},

 {lager, [
            {handlers, [
                           {lager_file_backend, [
                               {"/var/log/riak/error.log", error, 10485760, "$D0", 5},
                               {"/var/log/riak/console.log", info, 10485760, "$D0", 5}
                           ]}
                       ] },

            {crash_log, "/var/log/riak/crash.log"},
            {crash_log_msg_size, 65536},
            {crash_log_size, 10485760},
            {crash_log_date, "$D0"},
            {crash_log_count, 5},
            {error_logger_redirect, true}
        ]},

 {riak_sysmon, [
         {process_limit, 30},
         {port_limit, 2},
         {gc_ms_limit, 0},
         {heap_word_limit, 40111000},
         {busy_port, true},
         {busy_dist_port, true}
        ]},

 {sasl, [
         {sasl_error_logger, false}
        ]},

Sorry to bug you with such a long e-mail, but I wanted to be as thorough as possible. I tried raising a few options, but it didn't help: map_js_vm_count, reduce_js_vm_count, js_max_vm_mem.
I also tried adding a timeout argument to the map-reduce caller code, but even when I set it to 60,000 milliseconds or more, the script still terminates with a timeout error after 10-12 seconds. The same behaviour occurs if I use http instead of pbc.
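
The timeout attempt looks roughly like this (I am assuming the timeout setter on Riak::MapReduce is the right way to pass a per-job timeout in milliseconds with riak-client 1.1.1; please correct me if that is not the proper API):

mr = Riak::MapReduce.new(@client).
  index(bucket_name, index_name, start_key..end_key).
  map('map95th').
  reduce('reduce95th', :arg => { 'reduce_phase_only_1' => true }, :keep => true)

# per-job timeout in milliseconds; I tried values from 60_000 upwards
mr.timeout = 600_000
result = mr.run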

What seems to be the problem? Is this a matter of configuration? I am surprised that the job runs with 20-25 days of data but not with more.

thank you for your efforts,
Deyan