<html><head><meta http-equiv="Content-Type" content="text/html charset=utf-8"></head><body style="word-wrap: break-word; -webkit-nbsp-mode: space; -webkit-line-break: after-white-space; ">Hi Deyan,<div><br></div><div>When running mapreduce jobs, reduce phases often end up being the bottleneck. This is especially true when all input data needs to be gathered on the coordinating node before it can be executed, as is the case if the reduce_phase_only_1 flag is enabled. Having this flag set will cause the mapreduce job to not scale very well.</div><div><br></div><div>Depending on your exact requirements, it may be worthwhile considering gathering the histogram data periodically, e.g. per hour and/or day. These aggregates can then be stored in separate buckets with a key that describes the content, e.g. <cust>_<setup>_<date> . Once this has been done, you can efficiently retrieve a limited number of objects that cover the period you want to get statistics for directly through the descriptive keys, and process these in the application layer. Even though this periodically requires a bit more work, it will most likely be much more efficient at query time and scale better.</div><div><br></div><div>Best regards,</div><div><br></div><div>Christian</div><div><br></div><div><div apple-content-edited="true">
</div>
<br><div><div>On 14 Jul 2013, at 12:16, Deyan Dyankov <<a href="mailto:dyankov@cloudxcel.com">dyankov@cloudxcel.com</a>> wrote:</div><br class="Apple-interchange-newline"><blockquote type="cite"><meta http-equiv="Content-Type" content="text/html charset=utf-8"><div style="word-wrap: break-word; -webkit-nbsp-mode: space; -webkit-line-break: after-white-space; ">Hi everyone,<div><br></div><div>first time here. Thanks in advance.</div><div><br></div><div>I am experiencing issues with MapReduce and it seems to timeout after a certain volume data threshold is reached. The reducer is only one and here is the mapreduce initiation script:</div><div>#!/usr/bin/env ruby</div><div>[…]</div><div><div>@client = Riak::Client.new(</div><div>  :nodes => [</div><div>    {:host => 'db1', :pb_port => 8087, :http_port => 8098},</div><div>    {:host => 'db2', :pb_port => 8087, :http_port => 8098},</div><div>    {:host => 'db3', :pb_port => 8087, :http_port => 8098}</div><div>  ],</div><div>  :protocol => 'pbc'</div><div>)</div><div><br></div><div>start_key = "#{cust}:#{setup}:#{start_time}"</div><div>end_key = "#{cust}:#{setup}:#{end_time}"</div><div><br></div><div>result = Riak::MapReduce.new(@client).</div><div>  index(bucket_name, index_name, start_key..end_key).</div><div>  map('map95th').</div><div>  reduce('reduce95th', :arg => { 'reduce_phase_only_1' => true }, :keep => true).</div><div>  run()</div><div><br></div><div>puts result</div></div><div><br></div><div>The following is the code for the map95th and reduce95th javascript functions:</div><div><div>function map95th(v, keyData, arg) {</div><div>  var key_elements = v['key'].split(':');</div><div>  var cust = key_elements[0];</div><div>  var setup = key_elements[1];</div><div>  var sid = key_elements[2];</div><div>  var ts = key_elements[3];</div><div><br></div><div>  var result_key = cust + ':' + setup + ':' + ts;</div><div>  var obj = {}</div><div>  var obj_data = Riak.mapValuesJson(v)[0];</div><div><br></div><div>  obj_data['bps'] = (obj_data['rx_bytes'] + obj_data['tx_bytes']) / 60;</div><div>  return_val = obj_data['bps'];</div><div>  return [ return_val ];</div><div>}</div><div><br></div><div>// if used, this must be a single reducer! Call from Ruby like this:</div><div>//  reduce('reduce95th', :arg => { 'reduce_phase_only_1' => true }, :keep => true).</div><div>function reduce95th(values) {</div><div>  var sorted = values.sort(function(a,b) { return a - b; });</div><div>  var pct = sorted.length / 100;</div><div>  var element_95th = pct * 95;</div><div>  element_95th = parseInt(element_95th, 10) + 1;</div><div><br></div><div>  return [ sorted[element_95th] ];</div><div>}</div></div><div><br></div><div><br></div><div><br></div><div>Now here is the interesting part. The MR goes through one record per minute. If I run it for a period of less than ~20 days, it executes. Otherwise, it times out:</div><div><div>[deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$</div><div>[deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$ ./95h.rb yellingtone default $((`date +%s` - 20 * 86400)) `date +%s`</div><div>125581.51666666666</div><div>[deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$ ./95h.rb yellingtone default $((`date +%s` - 30 * 86400)) `date +%s`</div><div>/Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client/beefcake_protobuffs_backend.rb:182:in `decode_response': Expected success from Riak but received 0. {"phase":1,<b>"error":"timeout"</b>,"input":null,"type":null,"stack":null} (Riak::ProtobuffsFailedRequest)</div><div><span class="Apple-tab-span" style="white-space:pre">  </span>from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client/beefcake_protobuffs_backend.rb:116:in `mapred'</div><div><span class="Apple-tab-span" style="white-space:pre">     </span>from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:325:in `block in mapred'</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:435:in `block in recover_from'</div><div><span class="Apple-tab-span" style="white-space:pre">  </span>from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/innertube-1.0.2/lib/innertube.rb:127:in `take'</div><div><span class="Apple-tab-span" style="white-space:pre">       </span>from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:433:in `recover_from'</div><div><span class="Apple-tab-span" style="white-space:pre">   </span>from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:379:in `protobuffs'</div><div><span class="Apple-tab-span" style="white-space:pre">     </span>from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:133:in `backend'</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:324:in `mapred'</div><div><span class="Apple-tab-span" style="white-space:pre"> </span>from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/map_reduce.rb:217:in `run'</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>from ./95h.rb:29:in `<main>'</div><div>[deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$</div></div><div><br></div><div>The records being processed look lie this:</div><div>{"rx_bytes":3485395.0,"tx_bytes":1658479.0}</div><div><br></div><div>When running the script with more than 20 days worth of data (two records per minute are processed, which amounts to 2 * 60 * 24 * 20 = more than 57,600 processed), the script times out and here are some things from the logs:</div><div><div>==> /var/log/riak/erlang.log.1 <==</div><div>Erlang has closed</div><div><br></div><div>==> /var/log/riak/error.log <==</div><div>2013-07-14 13:03:51.580 [error] <0.709.0>@riak_pipe_vnode:new_worker:768 Pipe worker startup failed:fitting was gone before startup</div><div><br></div><div>==> /var/log/riak/console.log <==</div><div>2013-07-14 13:03:51.584 [error] <0.22049.4326> gen_fsm <0.22049.4326> in state wait_for_input terminated with reason: timeout</div><div><br></div><div>==> /var/log/riak/error.log <==</div><div>2013-07-14 13:03:51.584 [error] <0.22049.4326> gen_fsm <0.22049.4326> in state wait_for_input terminated with reason: timeout</div><div><br></div><div>==> /var/log/riak/console.log <==</div><div>2013-07-14 13:03:51.940 [error] <0.22049.4326> CRASH REPORT Process <0.22049.4326> with 0 neighbours exited with reason: timeout in gen_fsm:terminate/7 line 611</div><div><br></div><div>==> /var/log/riak/crash.log <==</div><div>2013-07-14 13:03:51 =CRASH REPORT====</div><div>  crasher:</div><div>    initial call: riak_pipe_vnode_worker:init/1</div><div>    pid: <0.22049.4326></div><div>    registered_name: []</div><div>    exception exit: {timeout,[{gen_fsm,terminate,7,[{file,"gen_fsm.erl"},{line,611}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,227}]}]}</div><div>    ancestors: [<0.710.0>,<0.709.0>,riak_core_vnode_sup,riak_core_sup,<0.129.0>]</div><div>    messages: []</div><div>    links: [<0.710.0>,<0.709.0>]</div><div>    dictionary: [{eunit,[{module,riak_pipe_vnode_worker},{partition,388211372416021087647853783690262677096107081728},{<0.709.0>,<0.709.0>},{details,{fitting_details,{fitting,<18125.23420.4566>,#Ref<18125.0.5432.50467>,<<"C�������������������">>,1},1,riak_kv_w_reduce,{rct,#Fun<riak_kv_w_reduce.0.20542221>,{struct,[{<<"reduce_phase_only_1">>,true}]}},{fitting,<18125.23418.4566>,#Ref<18125.0.5432.50467>,sink,undefined},[{log,sink},{trace,[error]},{sink,{fitting,<18125.23418.4566>,#Ref<18125.0.5432.50467>,sink,undefined}},{sink_type,{fsm,10,infinity}}],64}}]}]</div><div>    trap_exit: false</div><div>    status: running</div><div>    heap_size: 832040</div><div>    stack_size: 24</div><div>    reductions: 1456611</div><div>  neighbours:</div><div><br></div><div>==> /var/log/riak/error.log <==</div><div>2013-07-14 13:03:51.940 [error] <0.22049.4326> CRASH REPORT Process <0.22049.4326> with 0 neighbours exited with reason: timeout in gen_fsm:terminate/7 line 611</div><div><br></div><div>==> /var/log/riak/crash.log <==</div><div>2013-07-14 13:03:52 =SUPERVISOR REPORT====</div><div>     Supervisor: {<0.710.0>,riak_pipe_vnode_worker_sup}</div><div>     Context:    child_terminated</div><div>     Reason:     timeout</div><div>     Offender:   [{pid,<0.22049.4326>},{name,undefined},{mfargs,{riak_pipe_vnode_worker,start_link,undefined}},{restart_type,temporary},{shutdown,2000},{child_type,worker}]</div><div><br></div><div><br></div><div>==> /var/log/riak/console.log <==</div><div>2013-07-14 13:03:52.059 [error] <0.710.0> Supervisor riak_pipe_vnode_worker_sup had child undefined started with {riak_pipe_vnode_worker,start_link,undefined} at <0.22049.4326> exit with reason timeout in context child_terminated</div><div><br></div><div>==> /var/log/riak/error.log <==</div><div>2013-07-14 13:03:52.059 [error] <0.710.0> Supervisor riak_pipe_vnode_worker_sup had child undefined started with {riak_pipe_vnode_worker,start_link,undefined} at <0.22049.4326> exit with reason timeout in context child_terminated</div></div><div><br></div><div><br></div><div>The data is in leveldb and is accessed through secondary indexes. </div><div>This is a 3 node cluster with 32GB ram, current usage is about 12G per node. n_val=3. The same issues occurs on a similar 2 node cluster with 8GB of ram (usage is ~6G).</div><div><br></div><div>The following is my app.config:</div><div><div>[</div><div> {riak_api, [</div><div>            {pb_ip,   "0.0.0.0" },</div><div>            {pb_port, 8087 },</div><div>            {pb_backlog, 100 }</div><div>            ]},</div><div> {riak_core, [</div><div>              {default_bucket_props, [</div><div>                    {n_val, 3},</div><div>                    {last_write_wins, true}</div><div>                    ]},</div><div>              {ring_state_dir, "/storage/riak/ring"},</div><div>              {ring_creation_size, 256},</div><div>              {http, [ {"0.0.0.0", 8098 } ]},</div><div>              {https, [{ "0.0.0.0", 8069 }]},</div><div>              {ssl, [</div><div>                     {certfile, "/etc/ssl/riak/server.crt"},</div><div>                     {cacertfile, "/etc/ssl/riak/root.crt"},</div><div>                     {keyfile, "/etc/ssl/riak/server.key"}</div><div>                    ]},</div><div>              {handoff_port, 8099 },</div><div>              {dtrace_support, false},</div><div>              {enable_health_checks, true},</div><div>              {platform_bin_dir, "/usr/sbin"},</div><div>              {platform_data_dir, "/storage/riak"},</div><div>              {platform_etc_dir, "/etc/riak"},</div><div>              {platform_lib_dir, "/usr/lib/riak/lib"},</div><div>              {platform_log_dir, "/var/log/riak"}</div><div>             ]},</div><div> {riak_kv, [</div><div>            {storage_backend, riak_kv_eleveldb_backend},</div><div>            {anti_entropy, {on, []}},</div><div>            {anti_entropy_build_limit, {1, 3600000}},</div><div>            {anti_entropy_expire, 604800000},</div><div>            {anti_entropy_concurrency, 2},</div><div>            {anti_entropy_tick, 15000},</div><div>            {anti_entropy_data_dir, "/storage/riak/anti_entropy"},</div><div>            {anti_entropy_leveldb_opts, [{write_buffer_size, 4194304},</div><div>                                         {max_open_files, 20}]},</div><div><br></div><div>            {mapred_name, "mapred"},</div><div>            {mapred_2i_pipe, true},</div><div>            {map_js_vm_count, 16 },</div><div>            {reduce_js_vm_count, 12 },</div><div>            {hook_js_vm_count, 20 },</div><div>            {js_max_vm_mem, 8},</div><div>            {js_thread_stack, 16},</div><div>            {js_source_dir, "/etc/riak/mapreduce/js_source"},</div><div>            {http_url_encoding, on},</div><div>            {vnode_vclocks, true},</div><div>            {listkeys_backpressure, true},</div><div>            {vnode_mailbox_limit, {1, 5000}}</div><div>           ]},</div><div><br></div><div> {riak_search, [</div><div>                {enabled, true}</div><div>               ]},</div><div><br></div><div> {merge_index, [</div><div>                {data_root, "/storage/riak/merge_index"},</div><div>                {buffer_rollover_size, 1048576},</div><div>                {max_compact_segments, 20}</div><div>               ]},</div><div><br></div><div> {bitcask, [</div><div>             {data_root, "/storage/riak/bitcask"}</div><div>           ]},</div><div><br></div><div> {eleveldb, [</div><div>             {cache_size, 1024},</div><div>             {max_open_files, 64},</div><div>             {data_root, "/storage/riak/leveldb"}</div><div>            ]},</div><div><br></div><div> {lager, [</div><div>            {handlers, [</div><div>                           {lager_file_backend, [</div><div>                               {"/var/log/riak/error.log", error, 10485760, "$D0", 5},</div><div>                               {"/var/log/riak/console.log", info, 10485760, "$D0", 5}</div><div>                           ]}</div><div>                       ] },</div><div><br></div><div>            {crash_log, "/var/log/riak/crash.log"},</div><div>            {crash_log_msg_size, 65536},</div><div>            {crash_log_size, 10485760},</div><div>            {crash_log_date, "$D0"},</div><div>            {crash_log_count, 5},</div><div>            {error_logger_redirect, true}</div><div>        ]},</div><div><br></div><div> {riak_sysmon, [</div><div>         {process_limit, 30},</div><div>         {port_limit, 2},</div><div>         {gc_ms_limit, 0},</div><div>         {heap_word_limit, 40111000},</div><div>         {busy_port, true},</div><div>         {busy_dist_port, true}</div><div>        ]},</div><div><br></div><div> {sasl, [</div><div>         {sasl_error_logger, false}</div><div>        ]},</div></div><div><br></div><div>Sorry to bug you with such a long e-mail but I wanted to be as thorough as possible. I tried raising a few options but it didn't help: map_js_vm_count, reduce_js_vm_count, js_max_vm_mem</div><div>I also tried adding a timeout argument to the map reduce caller code but even if I set it to 60,000 or more (this is milliseconds), the script is terminating with timeout error after 10-12 secs. The same behaviour is observed if I use http instead of pbc.</div><div><br></div><div>What seems to be the problem? Is this a matter of configuration? I am surprised about the fact that the job runs with 20-25 days of data and not more.</div><div><br></div><div>thank you for your efforts,</div><div>Deyan</div></div>_______________________________________________<br>riak-users mailing list<br><a href="mailto:riak-users@lists.basho.com">riak-users@lists.basho.com</a><br>http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com<br></blockquote></div><br></div></body></html>