Map Reduce and long queries

Olav Frengstad olav at fwt.no
Mon Oct 15 04:13:45 EDT 2012


> The word everywhere is to avoid key filters. It effectively does a whole-bucket key-listing, and that starts to get seriously slow out past 100k items. Since you say test queries work I'll presume you've debugged your map and reduce on some queries where you manually add a set of keys. (Right?)

Just as a note: with the Erlang PB client you can use key filters
with 2i queries if you include the riak_kv_mapred_filters module in
your client code path.

➜  riak-erlang-client git:(master) ✗ erl -pa ebin -pa deps/*/ebin -pa
~/src/riak/deps/riak_kv/ebin
Erlang R15B01 (erts-5.9.1) [source] [64-bit] [smp:2:2]
[async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.9.1  (abort with ^G)
1> O1 = riakc_obj:new(<<"test">>, <<"abc/def/1">>, []),
1> O2 = riakc_obj:new(<<"test">>, <<"abc/def/2">>, []),
1> O3 = riakc_obj:new(<<"test">>, <<"hij/klm/1">>, []),
1> {ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087),
1> riakc_pb_socket:put(Pid, O1),riakc_pb_socket:put(Pid, O2),
riakc_pb_socket:put(Pid, O3).
ok
2> Index = {index, <<"test">>, <<"$key">>, <<0>>, <<255>>},
2> {ok, Filter} = riak_kv_mapred_filters:build_filter([[<<"ends_with">>,"1"]]),
2> MR = [
2>   { reduce
2>   , {qfun, fun(X, F) -> lists:filter(fun({_Bucket, Key}) -> F(Key) end, X) end}
2>   , riak_kv_mapred_filters:compose(Filter)
2>   , true}],
2> riakc_pb_socket:mapred(Pid, Index, MR).
{ok,[{0,
      [{<<"test">>,<<"hij/klm/1">>},
       {<<"test">>,<<"abc/def/1">>}]}]}

Olav


2012/10/14, Adam Lindsay <atl at alum.mit.edu>:
> Hi David,
>
> The word everywhere is to avoid key filters. It effectively does a
> whole-bucket key-listing, and that starts to get seriously slow out past
> 100k items. Since you say test queries work I'll presume you've debugged
> your map and reduce on some queries where you manually add a set of keys.
> (Right?)
>
> Since you're on LevelDB, it means you can use secondary indices ("2i") to
> drive these queries.
>
> I don't have access to your filter_map, so I can't see how you
> construct your keys, but if you have 2i turned on, then you get the first
> key-field "for free" from 2i.
>
> Let's say, hypothetically, that your keys are constructed as:
>  keyprefix:<date>:<country>:<campaign_id>
>
> Well, you can then rewrite the query input as:
>
> def main():
>     client = riak.RiakClient(host=riak_host,
>         port=8087,transport_class=riak.transports.pbc.RiakPbcTransport)
>     query = client.index(
>                     bucket,
>                     '$key',
>                     'keyprefix:201210',
>                     'keyprefix:201210~')
>     query.map('''function(value, keyData, arg) { ... }''')
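(A note on the upper bound in that index range: '~' is 0x7E, which sorts
after ':' and the digits used in these keys, so the range covers every
key sharing the 'keyprefix:201210' prefix. A quick pure-Python
illustration of the lexicographic bounds, with made-up keys:)

```python
# Why the 'keyprefix:201210' .. 'keyprefix:201210~' range captures every
# key with that prefix: '~' (0x7E) sorts after ':' and the digits, so any
# key extending the prefix still compares below the upper bound.
keys = [
    "keyprefix:20121001:US:t1",
    "keyprefix:20121014:DE:t2",
    "keyprefix:20121101:US:t1",  # November key: falls outside the range
]
lo, hi = "keyprefix:201210", "keyprefix:201210~"
in_range = [k for k in keys if lo <= k <= hi]
print(in_range)  # only the two October keys
```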
>
>
>
> That's fine as far as it goes, but it doesn't solve the problem of querying
> country or campaign id, right?
>
> As a temporary measure, I'd suggest trying your key filters, cranking up the
> timeout to something on the order of hours (I gave 5 minutes conservatively
> and arbitrarily), and going ahead and running it for however long it takes.
>
>
> If those queries do give good results, I'd suggest going ahead and
> re-indexing your existing entries with 'country_bin' and 'campaign_bin'.
> It's up to personal style whether you treat dates as int or bin.
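Putting that re-indexing advice into a sketch: assuming the hypothetical
keyprefix:<date>:<country>:<campaign_id> layout from earlier, the 2i
entries for each object could be derived straight from its key
('country_bin' and 'campaign_bin' as suggested; 'date_int' picks the
integer style for dates). Writing them back would go through the Riak
client's index API; this pure-Python fragment only shows the derivation:

```python
# Sketch: derive 2i entries from the hypothetical key layout
# keyprefix:<date>:<country>:<campaign_id>. 'country_bin' and
# 'campaign_bin' follow the suggestion above; 'date_int' is one of the
# two date styles mentioned. Attaching these to objects would use the
# Riak client's index API during a re-index pass.
def index_entries(key):
    _prefix, date, country, campaign = key.split(":")
    return [
        ("date_int", int(date)),
        ("country_bin", country),
        ("campaign_bin", campaign),
    ]

print(index_entries("keyprefix:20121014:US:t1"))
# -> [('date_int', 20121014), ('country_bin', 'US'), ('campaign_bin', 't1')]
```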
>
> There are lots of tricks and further discussion on how best to get at every
> corner of your data, but how does this strike you so far?
> --
> Adam Lindsay
>
>
> On Sunday, 14 October 2012 at 12:57, David Montgomery wrote:
>
>> Hi,
>>
>> Below is my code for running a MapReduce job in Python. I have a
>> six-node cluster, 2 cores each with 4 GB of RAM. I am under no load,
>> with about 3 million keys, using LevelDB on Riak 1.2. The query below
>> takes a terribly long time: it never finishes, and I don't know how
>> to check whether it is even running, other than that the Python
>> script has not timed out. The number of executed mappers in stats is
>> flat-lined when I look at it in Graphite. On test queries the code
>> below works.
>>
>> So..how do I debug what is going on?
>>
>>
>> def main():
>>     client = riak.RiakClient(host=riak_host, port=8087,
>>         transport_class=riak.transports.pbc.RiakPbcTransport)
>>     query = client.add(bucket)
>>     filters = key_filter.tokenize(":", filter_map['date']) \
>>         + key_filter.starts_with('201210')
>>     # & key_filter.tokenize(":", filter_map['country']).eq("US") \
>>     # & key_filter.tokenize(":", filter_map['campaign_id']).eq("t1") \
>>     query.add_key_filters(filters)
>>
>>     query.map('''
>>         function(value, keyData, arg) {
>>             var data = Riak.mapValuesJson(value)[0];
>>             if (data['adx'] == 'gdn') {
>>                 var alt_key = data['hw'];
>>                 var obj = {};
>>                 obj[alt_key] = 1;
>>                 return [obj];
>>             } else {
>>                 return [];
>>             }
>>         }''')
>>
>>     query.reduce('''
>>         function(values, arg) {
>>             return [values.reduce(function(acc, item) {
>>                 for (var state in item) {
>>                     if (acc[state])
>>                         acc[state] += item[state];
>>                     else
>>                         acc[state] = item[state];
>>                 }
>>                 return acc;
>>             })];
>>         }''')
>>
>>     for result in query.run(timeout=300000):
>>         print result
>>
>> _______________________________________________
>> riak-users mailing list
>> riak-users at lists.basho.com (mailto:riak-users at lists.basho.com)
>> http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com
>>
>>
>
>
>



