Differences between riak_client and riak_kv_mrc_pipe MapReduce when one node is down.

gunin at mail.mipt.ru gunin at mail.mipt.ru
Thu Jan 31 12:51:21 EST 2013


Hello Bryan,
I have found the problem.

The problem is in the reduce phase.

1. See the implementation of riak_kv_mrc_pipe:mr2pipe_phases. It converts the MapReduce job spec into a riak_pipe spec.
This function creates ConstHashCookie as Now = now() and uses it as the chashfun value for the fitting in the reduce phase.
The generated value is actually used in the riak_kv_w_reduce:done function, where the code tries to prereduce the data that has not been reduced yet and send it to the output.
But the output vnode in that case is taken from the preflist for ConstHashCookie, i.e. some random value, and n_val for this phase is always 1. That is why the calculated preflist for this phase is sometimes empty.
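
To make the failure mode concrete, here is a minimal standalone sketch. It does not call riak_core or riak_pipe. The module name preflist_sketch, the round-robin partition ownership and the absence of fallback vnodes are all assumptions made for illustration, not the real ring or preflist logic.

```erlang
-module(preflist_sketch).
-export([owner/2, preflist/3, failure_ratio/3]).

%% ASSUMPTION: partitions are numbered from 0 and claimed round-robin,
%% so partition P belongs to node number (P rem NodeCount) + 1.
owner(Partition, NodeCount) ->
    (Partition rem NodeCount) + 1.

%% ASSUMPTION: n_val = 1 and no fallback vnodes. The preflist is the
%% single owner of the partition, or empty if that owner is down.
preflist(Partition, NodeCount, DownNodes) ->
    Node = owner(Partition, NodeCount),
    case lists:member(Node, DownNodes) of
        true  -> [];
        false -> [{Partition, Node}]
    end.

%% Fraction of partitions whose n_val = 1 preflist is empty.
failure_ratio(RingSize, NodeCount, DownNodes) ->
    Empty = [P || P <- lists:seq(0, RingSize - 1),
                  preflist(P, NodeCount, DownNodes) =:= []],
    length(Empty) / RingSize.
```

Under these assumptions, preflist_sketch:failure_ratio(64, 4, [4]) gives 0.25: with one of four nodes down, a quarter of all possible hash values land on a partition with no live owner. That is in the same range as the 11 failures out of 50 runs in the test from my previous message.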

Do you have any suggestions on how we can fix it?

Thanks,
Alexander Gunin.

----- Original Message -----
From: gunin at mail.mipt.ru
To: "Bryan Fink" <bryan at basho.com>
Cc: "Riak-Users" <riak-users at lists.basho.com>
Sent: Thursday, January 31, 2013 17:34:34
Subject: Re: Differences between riak_client and riak_kv_mrc_pipe MapReduce when one node is down.

Thank you for the response.
1. Riak 1.2. I cloned it from the GitHub master branch some time ago.
2. 6 nodes in our test environment.
3. More than 100 million (100 thousand per day).
4. About 100 thousand (indexed by date).

I have prepared a simple test module and a test scenario. It should help you to help me :)

1. Generate four Riak nodes.
[xx riak]$ make devrel
2. Start all nodes and join them into a cluster.
[xx dev]$ dev1/bin/riak-admin ringready
TRUE All nodes agree on the ring ['dev1 at 127.0.0.1','dev2 at 127.0.0.1',
                                  'dev3 at 127.0.0.1','dev4 at 127.0.0.1']

3. Stop the fourth node.
[xx dev]$ dev4/bin/riak stop

[xx dev]$ dev1/bin/riak-admin ringready
FALSE ['dev4 at 127.0.0.1'] down.  All nodes need to be up to check.

4. Compile my test module and add it to the code path on all started nodes (I included it in riak_kv before compiling Riak).

5. Attach to the Erlang console on node dev1.
[xx dev]$ dev1/bin/riak attach

6. Test

Generate test dataset (50 objects).
(dev1 at 127.0.0.1)1> mapred_test:make_data(50).
ok

Check that all data is available.
(dev1 at 127.0.0.1)7> mapred_test:check_data(50).
{not_available,0}
OK. All data was saved.
Check that all data is available, 50 times in a row.
(dev1 at 127.0.0.1)8> mapred_test:check_data_n(50,50).
ok
OK. All data really was saved.

Try to count the objects with this MapReduce job.
(dev1 at 127.0.0.1)23> mapred_test:pipe_mapreduce_check(50).
{not_available,0}
OK.
... Repeat 7 times.
Attempt #8.
(dev1 at 127.0.0.1)23> mapred_test:pipe_mapreduce_check(50).
{not_available,50}
!!! Fail !!! MapReduce actually returned {ok,[]}.

Try to gather some statistics. Repeat the MapReduce task 50 times.
(dev1 at 127.0.0.1)29> mapred_test:pipe_mapreduce_check_n(50,50).
ok count 39
failed count 11.

Roughly 1 in 5 tasks failed.

Code of the mapred_test module:

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-module(mapred_test).


-export([map/3,
		 reduce/2,
		 make_data/1,
		 check_data_n/2,
		 check_data/1,
		 pipe_mapreduce_check/1,
		 pipe_mapreduce_check_n/2]).

%% Map function for the counter: emit 1 for every object found, nothing for notfound.
map({error,notfound},_,_)->
	[];
map(_,_,_)->
	[1].

%% Simple sum of the emitted values.
reduce(L,_)->
	[lists:foldl(fun(I,Acc)->I+Acc end,0,L)].

%%Generate data set for test.
make_data(Count)->
	{ok,Conn} = riak:local_client(),
	make_data_loop(Count,Conn).
make_data_loop(0,_)->
	ok;
make_data_loop(Count,Conn)->
	Key = list_to_binary(integer_to_list(Count)),
	RObj = riak_object:new(<<"mapred_test">>,Key,1),
	case Conn:put(RObj) of
		ok->
			make_data_loop(Count-1,Conn);
		Else->
			{error,Else}
	end.

	
check_data(Count)->
	{ok,Conn} = riak:local_client(),
	{not_available,check_data_loop(Count,Conn,0)}.

check_data_loop(0,_Conn,Acc)->
	Acc;
check_data_loop(Count,Conn,Acc)->
	Key = list_to_binary(integer_to_list(Count)),
	case Conn:get(<<"mapred_test">>,Key) of
		{ok,_}->
			check_data_loop(Count-1,Conn,Acc);
		_->
			check_data_loop(Count-1,Conn,Acc+1)
	end.

check_data_n(_Count,0)->
	ok;
check_data_n(Count,N)->
	case check_data(Count) of
		{not_available,0}->
			check_data_n(Count,N-1);
		Else->
			Else
	end.

pipe_mapreduce_check(Count)->
	Query = [{map, {modfun,?MODULE,map},[do_prereduce,none],false},
			 {reduce, {modfun,?MODULE,reduce},[{reduce_phase_batch_size, 1000}], true}],
	case riak_kv_mrc_pipe:mapred(<<"mapred_test">>,Query,60000) of
		{ok,[I]} when is_integer(I)->
			{not_available,Count-I};
		{ok,[]}->
			{not_available,Count};
		Else->
			Else
	end.

pipe_mapreduce_check_n(Count,N)->
	{Good,Bad}=pipe_mapreduce_check_n(Count,N,{0,0}),
	io:format("ok count ~p ~nfailed count ~p~n",[Good,Bad]).

pipe_mapreduce_check_n(_Count,0,Acc)->
	Acc;
pipe_mapreduce_check_n(Count,N,{Good,Bad})->
	case pipe_mapreduce_check(Count) of
		{not_available,0}->
			pipe_mapreduce_check_n(Count,N-1,{Good+1,Bad});
		{not_available,_}->
			pipe_mapreduce_check_n(Count,N-1,{Good,Bad+1});
		Else->
			{0,{runtime_error,Else}}
	end.


%%%%%%%%%%%%%%%%%%%%%%

Thanks,
Alexander Gunin.

----- Original Message -----
From: "Bryan Fink" <bryan at basho.com>
To: gunin at mail.mipt.ru
Cc: "John Daily" <jdaily at basho.com>, "Riak-Users" <riak-users at lists.basho.com>
Sent: Thursday, January 31, 2013 17:03:09
Subject: Re: Differences between riak_client and riak_kv_mrc_pipe MapReduce when one node is down.

On Thu, Jan 31, 2013 at 6:07 AM,  <gunin at mail.mipt.ru> wrote:
> Sorry John. You don't understand my question.
> 1. One node(I mean physical(erlang) node) in cluster is down.
> 2. It was down when i'm start job,when perform job and after it. We power off this node. It's under repair. But we don't remove this node from cluster.

Aha. Thank you for the clarification. Sorry for pushing John in the
wrong direction. Your new description leads me to think that the
problem is likely in the reduce phase (where we do, yes, use an nval
of 1, but also a constant hash that doesn't account for node
liveness).
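
As an illustration only of what "accounting for node liveness" could mean, the sketch below walks the ring clockwise from the hashed partition until it finds one whose owner is up. It is not riak_core's API and not a proposed patch. The module name live_preflist_sketch, the function first_live/4 and the round-robin ownership are hypothetical.

```erlang
-module(live_preflist_sketch).
-export([first_live/4]).

%% ASSUMPTION: partition P (numbered from 0) is owned by node
%% number (P rem NodeCount) + 1, i.e. a round-robin claim.
owner(Partition, NodeCount) ->
    (Partition rem NodeCount) + 1.

%% Walk the ring clockwise from Start and return the first
%% {Partition, Node} whose node is not in DownNodes, or 'none'
%% if every owner is down.
first_live(Start, RingSize, NodeCount, DownNodes) ->
    Candidates = [(Start + Step) rem RingSize
                  || Step <- lists:seq(0, RingSize - 1)],
    Live = [{P, owner(P, NodeCount)}
            || P <- Candidates,
               not lists:member(owner(P, NodeCount), DownNodes)],
    case Live of
        [First | _] -> {ok, First};
        []          -> none
    end.
```

With four nodes and node 4 down, live_preflist_sketch:first_live(3, 64, 4, [4]) skips partition 3 and returns {ok,{4,1}}, so a hash that lands on the dead node's partition still resolves to a live target instead of an empty list.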

As yet, I've been unable to reproduce exactly what you're seeing,
though. I always get an error instead of an empty result. Answers to
some of these questions may help me:

1. What version of Riak are you running?
2. How many nodes do you have in the cluster?
3. About how many keys are in this bucket?
4. About how many keys do you expect to match the index query?

Thanks,
Bryan

_______________________________________________
riak-users mailing list
riak-users at lists.basho.com
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com



