Writing a reduce phase with reduce_phase_only_1 and additional parameters

Bryan Fink bryan at basho.com
Fri Jun 1 09:57:48 EDT 2012


On Wed, May 23, 2012 at 3:03 PM, Manuel Gomez <manuel at inaka.net> wrote:
> This is my function so far:
>
> reduce_slice(WList,{Page,PageSize}) ->
>   lager:info("Page and PageSize ~p - ~p",[Page,PageSize]),
>   lists:sublist(WList, Page, PageSize).
>
>
> This "works". The problem is that because the reduce phase gets executed
> with whatever it has at any given moment, the end result is always
> different. So I need to call this phase with reduce_phase_only_1 as a param,
> and here is where I'm a bit lost, this is how I call the function now:
>
> {reduce,{modfun,whisper_db,reduce_slice},{Page,PageSize},true}])
>
> I see that in the Riak's MapReduce documentation you can call a function
> with "reduce_phase_only_1" like so:
>
> {reduce, FunSpec, [reduce_phase_only_1], Keep}
>
> So I tried:
>
> {reduce,{modfun,whisper_db,reduce_slice},[reduce_phase_only_1,{Page,PageSize}],true}])
>
> Which throws (of course, because the function definition is not
> expecting a list):
>
> Supervisor riak_pipe_vnode_worker_sup had child undefined started with
> {riak_pipe_vnode_worker,start_link,undefined} at <0.2927.0> exit with reason
> no function clause matching whisper_db:reduce_slice([

Hi, Manuel.  You were headed in exactly the right direction.  This is
just a matter of getting the pattern right in your function clause.
The key thing to know is that the third element of your phase spec is
passed, in its entirety, to your reduce function as its second
argument.

So, when you changed from

    {reduce, {modfun, whisper_db, reduce_slice}, {Page, PageSize}, true}

to

    {reduce, {modfun, whisper_db, reduce_slice},
     [reduce_phase_only_1, {Page, PageSize}], true}

the argument that was passed to your reduce phase changed from

    {Page, PageSize}

to

    [reduce_phase_only_1, {Page, PageSize}]
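
The mismatch can also be reproduced outside Riak.  The following is a
minimal sketch, not Riak's code: call_phase/2 is a hypothetical
stand-in for the MapReduce runtime that simply hands the third element
of the phase spec to the reduce function, and the lager call is left
out so the module compiles on its own.

```erlang
-module(arg_demo).
-export([reduce_slice/2, call_phase/2]).

%% The original clause from the question: it only matches a bare tuple.
reduce_slice(WList, {Page, PageSize}) ->
    lists:sublist(WList, Page, PageSize).

%% Hypothetical stand-in for the MapReduce runtime (NOT Riak's code):
%% the third element of the phase spec is passed, untouched, as the
%% second argument of the reduce function.
call_phase({reduce, {modfun, Mod, Fun}, Arg, _Keep}, Inputs) ->
    try
        {ok, Mod:Fun(Inputs, Arg)}
    catch
        %% no clause of the reduce function accepted this argument
        error:function_clause -> {error, function_clause}
    end.
```

With the bare tuple as the phase argument the clause matches; with the
list it does not:

    arg_demo:call_phase({reduce, {modfun, arg_demo, reduce_slice},
                         {2, 2}, true}, [a, b, c, d]).
    %% {ok,[b,c]}

    arg_demo:call_phase({reduce, {modfun, arg_demo, reduce_slice},
                         [reduce_phase_only_1, {2, 2}], true},
                        [a, b, c, d]).
    %% {error,function_clause}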

The easiest way to figure this out would have been to modify your
function to accept anything and do some "printf" debugging, like so:

    %% just print out Arg so we can see what we're dealing with
    reduce_slice_debug(_Input, Arg) ->
        lager:info("Arg: ~p", [Arg]),
        [].  %% just return nothing for now while we debug

Running the MapReduce again with [reduce_phase_only_1, {1, 10}] will
cause this to print out on the Riak console:

    09:43:02.125 [info] Arg: [reduce_phase_only_1,{1,10}]

So, all that needs to be done is to change the pattern for the second
argument so that it matches the whole list, instead of just the
page-size tuple.  The most direct option is:

    %% Most direct: just match the expected option list
    reduce_slice(WList, [reduce_phase_only_1, {Page, PageSize}]) ->
        lager:info("Page and PageSize ~p - ~p", [Page, PageSize]),
        lists:sublist(WList, Page, PageSize).

This is also the most brittle option, however, as it will break if you
leave reduce_phase_only_1 out of the list, or if you put it at the end
instead.  A slightly more flexible solution is to match the whole
argument, and then look through it to find the page-size option:

    %% Slightly more flexible: look for a tuple that is correctly shaped
    reduce_slice(WList, Options) ->
        {Page, PageSize} = find_page_option(Options),
        lager:info("Page and PageSize ~p - ~p", [Page, PageSize]),
        lists:sublist(WList, Page, PageSize).

    find_page_option([{Page, PageSize}|_])
      when is_integer(Page), is_integer(PageSize) ->
        {Page, PageSize};
    find_page_option([_|Rest]) ->
        find_page_option(Rest);
    find_page_option([]) ->
        %% without this clause, the reduce evaluation will error out if
        %% the page/size option is omitted or incorrectly formed
        DefaultPage = 1,
        DefaultPageSize = 100,
        {DefaultPage, DefaultPageSize}.

This allows you to leave reduce_phase_only_1 out, or put it elsewhere
in the list.  It also allows you to leave the page-size option out,
and have a default value filled in, or even to add other options at
will.
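
One thing the list-walking version gives up is the original calling
convention: a bare {Page, PageSize} argument is not a list, so none of
the find_page_option/1 clauses would match it.  If both forms need to
keep working, the argument could be wrapped first.  This is a sketch
under that assumption; normalize_arg/1 and page_option/1 are
hypothetical names, not part of Riak or of the code above.

```erlang
-module(page_opts).
-export([page_option/1]).

%% Hypothetical helper: accept either a bare argument or a list of
%% options, and always hand back a list.
normalize_arg(Arg) when is_list(Arg) -> Arg;
normalize_arg(Arg) -> [Arg].

%% Same search as find_page_option/1 in the post, with the same
%% defaults (page 1, page size 100).
find_page_option([{Page, PageSize}|_])
  when is_integer(Page), is_integer(PageSize) ->
    {Page, PageSize};
find_page_option([_|Rest]) ->
    find_page_option(Rest);
find_page_option([]) ->
    {1, 100}.

%% Entry point: works for {Page, PageSize} and for option lists.
page_option(Arg) ->
    find_page_option(normalize_arg(Arg)).
```

For example:

    page_opts:page_option({1, 10}).                        %% {1,10}
    page_opts:page_option([reduce_phase_only_1, {2, 5}]).  %% {2,5}
    page_opts:page_option([{2, 5}, reduce_phase_only_1]).  %% {2,5}
    page_opts:page_option([]).                             %% {1,100}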

I'll also offer you one additional version, which demonstrates an
extremely common way to pass around many options in Erlang, known as
"tagged tuples" or "proplists":

    %% Most flexible: tagged tuples
    reduce_slice(WList, Options) ->
        DefaultPage = 1,
        DefaultPageSize = 100,
        Page = proplists:get_value(page, Options, DefaultPage),
        PageSize = proplists:get_value(page_size, Options, DefaultPageSize),
        lager:info("Page and PageSize ~p - ~p", [Page, PageSize]),
        lists:sublist(WList, Page, PageSize).

This version expects a slightly different argument.  Instead of a
{Page, PageSize} tuple, it expects to find two tuples: {page, Page}
and {page_size, PageSize}.  The MapReduce spec would look like this
(to start at 1 and grab 10 results):

    {reduce, {modfun, whisper_db, reduce_slice},
     [{page, 1}, {page_size, 10}, reduce_phase_only_1], true}
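
A bare atom such as reduce_phase_only_1 can sit in the same list as
the tagged tuples because the proplists module treats an atom as
shorthand for {Atom, true}.  The small sketch below shows only the
option parsing; the module name and parse/1 are made up for
illustration, and it says nothing about how Riak itself inspects the
list.

```erlang
-module(proplist_demo).
-export([parse/1]).

%% Returns {Page, PageSize, OnlyOnce}.  The defaults of 1 and 100 are
%% the ones used in the post.
parse(Options) ->
    Page = proplists:get_value(page, Options, 1),
    PageSize = proplists:get_value(page_size, Options, 100),
    %% get_bool/2 is true for a bare atom or for {Atom, true}
    OnlyOnce = proplists:get_bool(reduce_phase_only_1, Options),
    {Page, PageSize, OnlyOnce}.
```

For example:

    proplist_demo:parse([{page, 1}, {page_size, 10},
                         reduce_phase_only_1]).
    %% {1,10,true}

    proplist_demo:parse([]).
    %% {1,100,false}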

I hope that helps.

-Bryan



