Fwd: http based queries

Richard Bucker richard at bucker.net
Wed Feb 17 17:19:16 EST 2010


Kevin,

this was a good email with lots of good information. I have a use-case
that I need to evaluate with Riak and I'm hoping you can offer me some
insight.

My Asterisk switches are generating 1M records a day. I need to aggregate
(reduce) the data for reporting.  Is this going to be efficient enough that I
do not have to reread all 1M records for every day I need to report on?

/r

On Feb 17, 2010, at 10:34 AM, Richard Bucker wrote:

> Nice answer... but it raises a number of related questions:
>
>  - what is the actual syntax of the /mapred call

The JSON document is the syntax. It is fully documented in
doc/js-mapreduce.org in the Riak code tree. I've also attached a plain ASCII
version of the org-mode file, in case you don't have Emacs or org-mode
configured. The name of the file is a little misleading, as the JSON syntax
works for M/R jobs written in both Erlang and Javascript. The basics of
Riak's core map/reduce machinery, with an Erlang flavor, are documented in
doc/basic-mapreduce.txt, also in the code tree.

I've also recorded a short screencast illustrating how to use the new HTTP
M/R interface and Javascript support. You can view the video here:
http://vimeo.com/9188550. Some people have reported the video gets a little
fuzzy towards the end, so apologies if that's the case.

>  - when do I get my results

You get your results when the job completes. However, the /mapred endpoint
also knows how to stream job results. This is useful for reducing latency
and memory consumption for jobs which return lots of data. You can enable
job streaming by posting to the URL /mapred?chunked=true. The chunked query
param tells the Riak HTTP interface to return results as multipart-encoded
JSON.
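For illustration, here is a minimal sketch of submitting a job from JavaScript. It assumes Node.js 18 or later, which provides a global `fetch`, and a node listening on localhost:8098 as in the attached document. `buildMapredRequest` and `runMapred` are hypothetical helper names. Only the /mapred path and the chunked=true parameter come from the answer above. Parsing the multipart-encoded body of a chunked response is not shown.

```javascript
// Build the HTTP request for a map/reduce job without sending it.
// `job` is a plain object of the form {inputs: ..., query: [...]}.
function buildMapredRequest(baseUrl, job, chunked = false) {
  const url = baseUrl + "/mapred" + (chunked ? "?chunked=true" : "");
  return {
    url,
    init: {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify(job),
    },
  };
}

// Send a non-chunked job with the global fetch (Node 18+); the response
// body is then the JSON-encoded result of the whole job.
async function runMapred(baseUrl, job) {
  const { url, init } = buildMapredRequest(baseUrl, job, false);
  const res = await fetch(url, init);
  if (!res.ok) throw new Error("mapred failed: HTTP " + res.status);
  return res.json();
}
```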

>  - what happens when there are long running queries

The HTTP interface currently enforces a hard limit of 2 minutes for M/R
queries. In the next release we are going to extend the JSON query format
to allow users to specify their own timeout value per M/R job.

>  - where do I specify indexes

M/R jobs only understand inputs of a single bucket name or a list of
bucket/key pairs.

>  - looking at the gist you attached below; where is it documented? what
>    APIs are documented?

See above.

--Kevin



>
> /r
>
> Richard -
>
> You can do this using Riak's map/reduce feature.
>
> Assuming the documents were contained in a bucket named 'foo' you could
> craft a JSON document like this gist
> https://gist.github.com/434e7d1bb596bbc214e4. Posting the document to your
> Riak server's /mapred URL will run the query and return the results as
> JSON-encoded data.
>
> --Kevin
> On Feb 17, 2010, at 9:31 AM, Richard Bucker wrote:
>
> > I must be missing something in the DOC.  Is there a way to GET documents
> > from the DB using some search terms? For example I have some document:
> > /raw/emails/<some uuid> and the contents might be
> > {'created':'2010-01-01','subject':'hello', 'message':'hello world'}
> >
> > Using http how would I find all the documents in the 'emails' bucket
> > that were created on Jan 1, 2010?
> >
> > /r
> > _______________________________________________
> > riak-users mailing list
> > riak-users at lists.basho.com
> > http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com
>
>
-------------- next part --------------
                     Using Javascript with Riak Map/Reduce
                     =====================================


Table of Contents
=================
1 Simple Example
    1.1 Load data
    1.2 Run query
    1.3 Explanation
2 Query Syntax
    2.1 Inputs
    2.2 Query
        2.2.1 Map
        2.2.2 Reduce
        2.2.3 Link
3 Javascript Functions
    3.1 Function Parameters
        3.1.1 Map functions
        3.1.2 Reduce functions
        3.1.3 Link functions
4 How Map/Reduce Queries Work
    4.1 Map/Reduce Intro
    4.2 Riak-specific Map/Reduce
        4.2.1 How Riak Spreads Processing
        4.2.2 How Riak's Map/Reduce Queries Are Specified
        4.2.3 How a Map Phase Works in Riak
        4.2.4 How a Reduce Phase Works in Riak
        4.2.5 How a Link Phase Works in Riak
        4.2.6 Using Named Functions


1 Simple Example
~~~~~~~~~~~~~~~~

  This section hits the ground running with a quick example to
  demonstrate what HTTP/Javascript map/reduce looks like in Riak.
  This example will store several chunks of text in Riak, and then
  compute word counts on the set of documents.

1.1 Load data
=============

   We will use the "raw" HTTP interface to store the texts we want to
   process:

 $ curl -X PUT -H "content-type: text/plain" \
  http://localhost:8098/raw/alice/p1 --data-binary @-
 Alice was beginning to get very tired of sitting by her sister on the
 bank, and of having nothing to do: once or twice she had peeped into the
 book her sister was reading, but it had no pictures or conversations in
 it, 'and what is the use of a book,' thought Alice 'without pictures or
 conversation?'
 ^D
 $ curl -X PUT -H "content-type: text/plain" \
  http://localhost:8098/raw/alice/p2 --data-binary @-
 So she was considering in her own mind (as well as she could, for the
 hot day made her feel very sleepy and stupid), whether the pleasure
 of making a daisy-chain would be worth the trouble of getting up and
 picking the daisies, when suddenly a White Rabbit with pink eyes ran
 close by her.
 ^D
 $ curl -X PUT -H "content-type: text/plain" \
  http://localhost:8098/raw/alice/p5 --data-binary @-
 The rabbit-hole went straight on like a tunnel for some way, and then
 dipped suddenly down, so suddenly that Alice had not a moment to think
 about stopping herself before she found herself falling down a very deep
 well.
 ^D

1.2 Run query
=============

   With data loaded, we can now run a query:

 $ curl -X POST -H "content-type: application/json" http://localhost:8098/mapred --data @-
 {"inputs":[["alice","p1"],["alice","p2"],["alice","p5"]],"query":[{"map":{"language":"javascript","source":"function(v) { var m = v.values[0].data.toLowerCase().match('\\\\w*','g'); var r = []; for(var i in m) if (m[i] != '') { var o = {}; o[m[i]] = 1; r.push(o); } return r; }"}},{"reduce":{"language":"javascript","source":"function(v) { var r = {}; for (var i in v) { for(var w in v[i]) { if (w in r) r[w] += v[i][w]; else r[w] = v[i][w]; } } return [r]; }"}}]}
 ^D

   And we end up with the word counts for the three documents.

 [{"the":8,"rabbit":2,"hole":1,"went":1,"straight":1,"on":2,"like":1,"a":6,"tunnel":1,"for":2,"some":1,"way":1,"and":5,"then":1,"dipped":1,"suddenly":3,"down":2,"so":2,"that":1,"alice":3,"had":3,"not":1,"moment":1,"to":3,"think":1,"about":1,"stopping":1,"herself":2,"before":1,"she":4,"found":1,"falling":1,"very":3,"deep":1,"well":2,"was":3,"considering":1,"in":2,"her":5,"own":1,"mind":1,"as":2,"could":1,"hot":1,"day":1,"made":1,"feel":1,"sleepy":1,"stupid":1,"whether":1,"pleasure":1,"of":5,"making":1,"daisy":1,"chain":1,"would":1,"be":1,"worth":1,"trouble":1,"getting":1,"up":1,"picking":1,"daisies":1,"when":1,"white":1,"with":1,"pink":1,"eyes":1,"ran":1,"close":1,"by":2,"beginning":1,"get":1,"tired":1,"sitting":1,"sister":2,"bank":1,"having":1,"nothing":1,"do":1,"once":1,"or":3,"twice":1,"peeped":1,"into":1,"book":2,"reading":1,"but":1,"it":2,"no":1,"pictures":2,"conversations":1,"what":1,"is":1,"use":1,"thought":1,"without":1,"conversation":1}]

1.3 Explanation
===============

   For more details about what each bit of syntax means, and other
   syntax options, read the following sections.  As a quick
   explanation of how this example map/reduce query worked, though:

   1. The objects named `p1', `p2', and `p5' from the `alice' bucket
      were given as inputs to the query.

   2. The map function from the phase was run on each object.  The
      function:

function(v) {
   var m = v.values[0].data.toLowerCase().match('\\w*','g');
   var r = [];
   for(var i in m)
      if (m[i] != '') {
         var o = {};
         o[m[i]] = 1;
         r.push(o);
      }
   return r;
}

      creates a list of JSON objects, one for each word (non-unique)
      in the text.  Each object has the word as its only key, with the
      integer 1 as that key's value.

   3. The reduce function from the phase was run on the outputs of the
      map functions.  The function:

function(v) {
   var r = {};
   for (var i in v) {
      for(var w in v[i]) {
         if (w in r)
            r[w] += v[i][w];
         else
            r[w] = v[i][w];
      }
   }
   return [r];
}

      looks at each JSON object in the input list.  It steps through
      each key in each object, and produces a new object. That new
      object has a key for every key found in any of the input
      objects, the value of that key being the sum of that key's
      values across those objects.  It returns this new object in a
      list, because it may be run a second time on a list including
      that object and more inputs from the map phase.

   4. The final output is a list with one element: a JSON object with
      a key for each word in all of the documents (unique), with the
      value of that key being the number of times the word appeared in
      the documents.
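   The two phase functions can be checked locally before they are submitted. The sketch below is a hypothetical Node.js test harness and is not part of Riak. It feeds hand-built objects of the shape described under "Map functions" to the map function. It then applies the reduce function twice, to mimic a re-reduce.

   One assumption needs flagging. The query above calls `match('\\w*','g')`, which appears to rely on a non-standard flags argument accepted by the SpiderMonkey engine Riak embeds. Modern engines ignore that argument. This copy therefore uses the regular-expression literal `/\w+/g`, which also never produces empty matches.

```javascript
// Map: one {word: 1} object per (non-unique) word in the first value.
function wordMap(v) {
  var m = v.values[0].data.toLowerCase().match(/\w+/g) || [];
  return m.map(function (w) {
    var o = {};
    o[w] = 1;
    return o;
  });
}

// Reduce: merge objects by summing the counts for each key. The output has
// the same shape as the map output, so it can be fed back in (re-reduce).
function wordReduce(v) {
  var r = {};
  for (var i = 0; i < v.length; i++) {
    for (var w in v[i]) {
      var prev = Object.prototype.hasOwnProperty.call(r, w) ? r[w] : 0;
      r[w] = prev + v[i][w];
    }
  }
  return [r];
}

// Hand-built stand-ins for Riak objects (only the field the map reads).
var docs = [
  { values: [{ data: "The rabbit-hole went straight on" }] },
  { values: [{ data: "the White Rabbit" }] }
];

var mapped = docs.map(wordMap);
// Reduce the first document's output, then re-reduce with the second's.
var partial = wordReduce(mapped[0]);
var result = wordReduce(partial.concat(mapped[1]));
console.log(JSON.stringify(result));
```

   Because the reduce output has the same shape as the map output, reducing in two steps gives the same answer as reducing everything at once.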

2 Query Syntax
~~~~~~~~~~~~~~

  Map/Reduce queries are issued over HTTP via a POST to the /mapred
  resource.  The body should be `application/json' of the form
  `{"inputs":[...inputs...],"query":[...query...]}'.

2.1 Inputs
==========

   The list of input objects is given as a list of 2-element lists of
   the form `[Bucket,Key]' or 3-element lists of the form
   `[Bucket,Key,KeyData]'.

   You may also pass just the name of a bucket
   (`{"inputs":"mybucket",...}'), which is equivalent to passing all
   of the keys in that bucket as inputs (i.e. "a map/reduce across the
   whole bucket").  You should be aware that this triggers the
   somewhat expensive "list keys" operation, so you should use it
   sparingly.
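   As a client-side illustration, the accepted input shapes can be checked before posting a job. `isValidInputs` and `makeJob` are hypothetical helpers that mirror only the shapes listed above. Requiring bucket and key to be strings is an assumption, and Riak's own validation may differ.

```javascript
// True if `inputs` is a bucket name, or a list whose items are all
// [Bucket, Key] or [Bucket, Key, KeyData] lists.
function isValidInputs(inputs) {
  if (typeof inputs === "string") return true; // whole-bucket input
  if (!Array.isArray(inputs)) return false;
  return inputs.every(function (item) {
    return Array.isArray(item) &&
      (item.length === 2 || item.length === 3) &&
      typeof item[0] === "string" &&
      typeof item[1] === "string";
  });
}

// Assemble the JSON body for POST /mapred, rejecting malformed inputs.
function makeJob(inputs, query) {
  if (!isValidInputs(inputs)) {
    throw new Error("bad inputs: " + JSON.stringify(inputs));
  }
  return JSON.stringify({ inputs: inputs, query: query });
}
```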

2.2 Query
=========

   The query is given as a list of phases, each phase being of the
   form `{PhaseType:{...spec...}}'.  Valid `PhaseType' values are
   "map", "reduce", and "link".

   Every phase spec may include a `keep' field, which must have a
   boolean value: `true' means that the results of this phase should
   be included in the final result of the map/reduce, `false' means
   the results of this phase should be used only by the next phase.
   Omitting the `keep' field accepts its default value, which is
   `false' for all phases except the final phase (Riak assumes that
   you were most interested in the results of the last phase of your
   map/reduce query).
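   The default rule can be restated as a small function, for example to print the effective settings of a query. This is a local sketch of the rule as described here, not Riak's code. The name `withKeepDefaults` is hypothetical.

```javascript
// Fill in the documented `keep` default: false for every phase except the
// last, which defaults to true. Explicit boolean values are left alone.
function withKeepDefaults(query) {
  return query.map(function (phase, i) {
    var type = Object.keys(phase)[0]; // "map", "reduce" or "link"
    var spec = Object.assign({}, phase[type]);
    if (typeof spec.keep !== "boolean") spec.keep = (i === query.length - 1);
    var out = {};
    out[type] = spec;
    return out;
  });
}
```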

2.2.1 Map
---------

    Map phases must be told where to find the code for the function to
    execute, and what language that function is in.

    Function source can be specified directly in the query by using
    the "source" spec field.  Function source can also be loaded from
    a pre-stored Riak object by providing "bucket" and "key" fields in
    the spec.

    For example:

{"map":{"language":"javascript","source":"function(v) { return [v]; }","keep":true}}

    would run the Javascript function given in the spec, and include
    the results in the final output of the m/r query.

{"map":{"language":"javascript","bucket":"myjs","key":"mymap","keep":false}}

    would run the Javascript function declared in the content of the
    Riak object under `mymap' in the `myjs' bucket, and the results of
    the function would not be included in the final output of the m/r
    query.

    Map phases may also be passed static arguments by using the "arg"
    spec field.
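    As an illustration, the question from the mail thread (find the emails in an `emails` bucket created on a given day) could pass the date through "arg". The phase spec would then look like `{"map":{"language":"javascript","source":"...","arg":"2010-01-01"}}`.

    This is a hypothetical sketch, and the name `mapCreatedOn` is made up. It assumes each object's data is valid JSON with a `created` field in YYYY-MM-DD form. The sample in the thread uses single quotes, which `JSON.parse` rejects.

```javascript
// Map function: emit the parsed document if its `created` field equals the
// phase's static argument; emit nothing for other or non-JSON values.
function mapCreatedOn(v, keyData, arg) {
  var doc;
  try {
    doc = JSON.parse(v.values[0].data);
  } catch (e) {
    return []; // not valid JSON: produce no output
  }
  return doc && doc.created === arg ? [doc] : [];
}
```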

2.2.2 Reduce
------------

    Reduce phases look exactly like map phases, but are labeled "reduce".

2.2.3 Link
----------

    Link phases accept `bucket' and `tag' fields that specify which
    links match the link query.  The string "_" (underscore) in each
    field means "match all", while any other string means "match
    exactly this string".  If either field is left out, it is
    considered to be set to "_" (match all).

    For example:

{"link":{"bucket":"foo","keep":false}}

    would follow all links pointing to objects in the `foo' bucket,
    regardless of their tag.
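    The matching rule can be written as a predicate. This sketch is illustrative only. The `{bucket, tag}` representation of a link and the name `linkMatches` are assumptions.

```javascript
// Does a link {bucket, tag} match a link-phase spec? A missing field
// counts as "_", and "_" matches any value.
function linkMatches(link, spec) {
  var bucket = spec.bucket === undefined ? "_" : spec.bucket;
  var tag = spec.tag === undefined ? "_" : spec.tag;
  return (bucket === "_" || bucket === link.bucket) &&
         (tag === "_" || tag === link.tag);
}
```

    With the spec from the example, `linkMatches({bucket: "foo", tag: "any"}, {bucket: "foo", keep: false})` is true for any tag.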

3 Javascript Functions
~~~~~~~~~~~~~~~~~~~~~~

3.1 Function Parameters
=======================

3.1.1 Map functions
-------------------

    Map functions are passed three parameters: the object that the map
    is being applied to, the "keydata" for that object, and the static
    argument for the phase.

    The object will be a JSON object of the form:

 {
  "bucket":BucketAsString,
  "key":KeyAsString,
  "vclock":VclockAsString,
  "values":[
            {
             "metadata":{
                         "X-Riak-VTag":VtagAsString,
                         "X-riak-Last-Modified":LastModAsString,
                         ...other metadata...
                        },
             "data":ObjectData
            },
            ...other metadata/data values (siblings)...
           ]
 }

    `object.values[0].data' is probably what you will be interested in
    most of the time, but the rest of the details of the object are
    provided for your use.

    The "keydata" is the third element of the item from the input
    bucket/key list (called `KeyData' in the [Inputs] section above), or
    "undefined" if none was provided.

    The static argument for the phase is the value of the `arg' field
    from the map spec in the query list.

    A map phase should produce a list of results.  You will see errors
    if the output of your map function is not a list.  Return the
    empty list if your map function chooses not to produce output.
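    The following hypothetical map function uses more of the object than `values[0].data`. It emits one result per sibling, together with the key, the last-modified metadata and the keydata. The metadata key is spelled exactly as in the schema above. Treating a missing keydata as the JavaScript value `undefined` is an assumption.

```javascript
// Emit one result per sibling value, always returning a list (empty if
// the object has no values).
function mapAllSiblings(v, keyData, arg) {
  var out = [];
  for (var i = 0; i < v.values.length; i++) {
    var val = v.values[i];
    out.push({
      key: v.key,
      lastModified: val.metadata["X-riak-Last-Modified"],
      keyData: keyData === undefined ? null : keyData,
      data: val.data
    });
  }
  return out;
}
```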

3.1.2 Reduce functions
----------------------

    Reduce functions are passed two parameters: a list of inputs to
    reduce, and the static argument for the phase.

    The list of inputs to reduce may contain values from previous
    executions of the reduce function.  It will also contain results
    produced by the preceding map or reduce phase.

    The static argument for the phase is the value of the `arg' field
    from the reduce spec in the query list.

    A reduce phase should produce a list of results.  You will see
    errors if the output of your reduce function is not a list.  The
    function should return an empty list if it has no other output to
    produce.
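    A small example of the list-in/list-out contract is sketched below. It finds the largest number, much as `Riak.reduceMax` is described as doing in the final section. The name `reduceLargest` is hypothetical. The output is a list of numbers, the same shape as the input, so an earlier result can safely reappear among later inputs.

```javascript
// Largest value seen so far as a one-element list, or [] for no input.
function reduceLargest(values, arg) {
  if (values.length === 0) return [];
  var best = values[0];
  for (var i = 1; i < values.length; i++) {
    if (values[i] > best) best = values[i];
  }
  return [best];
}
```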

3.1.3 Link functions
--------------------

    If you are storing data through the "raw" interface, and using the
    `Link' HTTP header, you do not need to worry about writing a
    link-extraction function.  Just use the predefined
    `raw_link_walker_resource:mapreduce_linkfun/3'.

    But, if you need to extract links from your data in some other
    manner, there are many ways to specify Javascript functions to do
    that.  They all start with setting the `linkfun' bucket property.
    Through the raw HTTP interface:

$ curl -X PUT -H "content-type: application/json" http://localhost:8098/raw/bucket \
> --data "{\"props\":{\"linkfun\":{...function...}}}"

    The three ways to fill in the value of the `linkfun' key are:

    + Quoted source code, as the value of the `jsanon' key:

      {"jsanon":"function(v,kd,bt) { return []; }"}

    + The bucket and key of an object containing the function source:

      {"jsanon":{"bucket":Bucket,"key":Key}}

    + The name of a predefined Javascript function:

      {"jsfun":FunctionName}

    The function has basically the same contract as a map function.
    The first argument is the object from which links should be
    extracted.  The second argument is the `KeyData' for the object.

    The third argument is a Javascript object describing which links
    to return.  The two fields in the object, `bucket' and
    `tag', will have the values given in the link phase spec from the
    query.

    The link fun should return a list of the same form as the `inputs'
    list: 2-item bucket/key lists, or 3-item bucket/key/keydata lists.
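    Here is a sketch of a link function whose source could be supplied as the quoted value of the `jsanon' key. Both the data format and the name are hypothetical. It assumes objects are stored as JSON with a `links` array of `{bucket, key, tag}` objects, which is not a format Riak defines. It returns 2-item bucket/key lists, and treats a missing `bucket` or `tag` as "_" to be safe.

```javascript
// Extract links from JSON data of the (assumed) form
// {"links": [{"bucket": ..., "key": ..., "tag": ...}, ...]}
// and keep those matching the phase's bucket/tag ("_" = match all).
function extractJsonLinks(v, keyData, bt) {
  var wantBucket = bt.bucket === undefined ? "_" : bt.bucket;
  var wantTag = bt.tag === undefined ? "_" : bt.tag;
  var doc;
  try {
    doc = JSON.parse(v.values[0].data);
  } catch (e) {
    return []; // not JSON: no links
  }
  var links = (doc && doc.links) || [];
  var out = [];
  for (var i = 0; i < links.length; i++) {
    var l = links[i];
    if ((wantBucket === "_" || wantBucket === l.bucket) &&
        (wantTag === "_" || wantTag === l.tag)) {
      out.push([l.bucket, l.key]);
    }
  }
  return out;
}
```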

4 How Map/Reduce Queries Work
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

4.1 Map/Reduce Intro
====================

   The main goal of Map/Reduce is to spread the processing of a query
   across many systems to take advantage of parallel processing power.
   This is generally done by dividing the query into several steps,
   dividing the dataset into several chunks, and then running those
   step/chunk pairs on separate physical hosts.

   One step type is called "map".  Map functions take one piece of
   data as input, and produce zero or more results as output.  If
   you're familiar with "mapping over a list" in functional
   programming style, you're already familiar with "map" steps in a
   map/reduce query.

   Another step type is called "reduce".  The purpose of a "reduce"
   step is to combine the output of many "map" step evaluations into
   one result.

   The common example of a map/reduce query involves a "map" step that
   takes a body of text as input, and produces a word count for that
   body of text.  A reduce step then takes the word counts produced
   from many bodies of text and either sums them to provide a word
   count for the corpus, or filters them to produce a list of
   documents containing only certain counts.

4.2 Riak-specific Map/Reduce
============================

4.2.1 How Riak Spreads Processing
---------------------------------

   Riak's map/reduce has an additional goal: increasing data-locality.
   When processing a large dataset, it's often much more efficient to
   take the computation to the data than it is to bring the data to
   the computation.

   It is Riak's solution to the data-locality problem that determines
   how Riak spreads the processing across the cluster.  In the same
   way that any Riak node can coordinate a read or write by sending
   requests directly to the other nodes responsible for maintaining
   that data, any Riak node can also coordinate a map/reduce query by
   sending a map-step evaluation request directly to the node
   responsible for maintaining the input data. Map-step results are
   sent back to the coordinating node, where reduce-step processing
   can produce a unified result.

   Put more simply: Riak runs map-step functions right on the node
   holding the input data for those functions, and it runs reduce-step
   functions on the node coordinating the map/reduce query.

4.2.2 How Riak's Map/Reduce Queries Are Specified
-------------------------------------------------

    Map/Reduce queries in Riak have two components: a list of inputs
    and a list of "steps", or "phases".

    Each element of the input list is a bucket-key pair.  This
    bucket-key pair may also be annotated with "key-data", which will
    be passed as an argument to a map function, when evaluated on the
    object stored under that bucket-key pair.

    Each element of the phases list is a description of a map
    function, a reduce function, or a link function.  The description
    includes where to find the code for the phase function (for map
    and reduce phases), static data passed to the function every time
    it is executed during that phase, and a flag indicating whether or
    not to include the results of that phase in the final output of
    the query.

    The phase list describes the chain of operations each input will
    flow through.  That is, the initial inputs will be fed to the
    first phase in the list, and the output of that phase will be fed
    as input to the next phase in the list.  This stream will continue
    through the final phase.
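    The chaining can be modelled in a single process. This toy `runPhases` is hypothetical and differs from Riak in several ways. Inputs are already-fetched objects, so there is no bucket/key lookup. Map phases run locally rather than on the nodes holding the data, and link phases are not modelled.

    Each phase object carries its function, its static argument and an explicit `keep` flag. The sketch returns one list per kept phase. How Riak itself combines output from several kept phases is not specified here.

```javascript
// Feed each phase's output into the next. A map phase runs its function
// once per input and concatenates the returned lists; a reduce phase runs
// once over the whole list.
function runPhases(inputs, phases) {
  var current = inputs;
  var kept = [];
  for (var i = 0; i < phases.length; i++) {
    var p = phases[i];
    if (p.type === "map") {
      var next = [];
      for (var j = 0; j < current.length; j++) {
        next = next.concat(p.fn(current[j], undefined, p.arg));
      }
      current = next;
    } else if (p.type === "reduce") {
      current = p.fn(current, p.arg);
    } else {
      throw new Error("unsupported phase type: " + p.type);
    }
    if (p.keep) kept.push(current);
  }
  return kept;
}
```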

4.2.3 How a Map Phase Works in Riak
-----------------------------------

    The input list to a map phase must be a list of (possibly
    annotated) bucket-key pairs.  For each pair, Riak will send the
    request to evaluate the map function to the partition that is
    responsible for storing the data for that bucket-key.  The vnode
    hosting that partition will look up the object stored under that
    bucket-key, and evaluate the map function with the object as an
    argument.  The other arguments to the function will be the
    annotation, if any is included, with the bucket-key, and the
    static data for the phase, as specified in the query.

4.2.4 How a Reduce Phase Works in Riak
--------------------------------------

    Reduce phases accept any list of data as input, and produce any
    list of data as output.  They also receive a phase-static value,
    specified in the query definition.

    The important thing to understand is that the function defining
    the reduce phase may be evaluated multiple times, and the input of
    later evaluations will include the output of earlier evaluations.

    For example, a reduce phase may implement the "set-union"
    function.  In that case, the first set of inputs might be
    `[1,2,2,3]', and the output would be `[1,2,3]'.  When the phase
    receives more inputs, say `[3,4,5]', the function will be called
    with the concatenation of the two lists: `[1,2,3,3,4,5]'.
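    The set-union example can be sketched as a reduce function. The name `reduceUnion` is hypothetical, and using `JSON.stringify` as the equality key is a choice made for this sketch. Feeding the first output back in with the new inputs reproduces the two calls described above.

```javascript
// Set-union reduce: drop duplicates, keeping first-seen order. Values are
// compared by their JSON encoding, so 1 and "1" stay distinct.
function reduceUnion(values, arg) {
  var seen = {};
  var out = [];
  for (var i = 0; i < values.length; i++) {
    var k = JSON.stringify(values[i]);
    if (!Object.prototype.hasOwnProperty.call(seen, k)) {
      seen[k] = true;
      out.push(values[i]);
    }
  }
  return out;
}

var first = reduceUnion([1, 2, 2, 3]);             // [1, 2, 3]
var second = reduceUnion(first.concat([3, 4, 5])); // [1, 2, 3, 4, 5]
```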

    Other systems refer to the second application of the reduce
    function as a "re-reduce".  There are at least a couple of
    reduce-query implementation strategies that work with Riak's model.

    One strategy is to implement the phase preceding the reduce
    phase, such that its output is "the same shape" as the output of
    the reduce phase.  This is how the examples in this document are
    written, and the way that we have found produces cleaner code.

    An alternate strategy is to make the output of a reduce phase
    recognizable, such that it can be extracted from the input list on
    subsequent applications.  For example, if inputs from the
    preceding phase are numbers, outputs from the reduce phase could
    be objects or strings.  This would allow the function to find the
    previous result, and apply new inputs to it.
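    The second strategy might look like the hypothetical `reduceTotal` below. Inputs from the preceding phase are plain numbers, while the reduce phase emits objects of the form `{total: n}`. Later calls can therefore recognize an earlier result and fold new numbers into it.

```javascript
// Sum numbers; previous results are recognizable as {total: n} objects.
function reduceTotal(values, arg) {
  var total = 0;
  for (var i = 0; i < values.length; i++) {
    var v = values[i];
    if (typeof v === "object" && v !== null && "total" in v) {
      total += v.total; // an earlier output of this reduce phase
    } else {
      total += v;       // a fresh number from the preceding phase
    }
  }
  return [{ total: total }];
}
```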

4.2.5 How a Link Phase Works in Riak
------------------------------------

    Link phases find links matching patterns specified in the query
    definition.  The patterns specify which buckets and tags links
    must have.

    "Following a link" means adding it to the output list of this
    phase.  The output of this phase is often most useful as input to
    a map phase, or to another link phase.

4.2.6 Using Named Functions
---------------------------

    Riak can also use pre-defined named functions for map and reduce
    phase processing. Named functions are invoked with the following
    form:

 {"map": {"language": "javascript", "name": "Riak.mapValues", "keep": true}}

 {"reduce": {"language": "javascript", "name": "Riak.reduceSort", "keep": true}}

    The key `name' in both examples points to the name of the function
    to be used. Riak expects the function to be defined prior to the
    execution of the phase using it.

* Defining Named Functions

  Defining a named function for Riak is a simple process.

  1. Create a Javascript source file containing the definitions for
     all the functions you would like Riak to pre-define.
  2. Edit the `app.config' of your Riak nodes and add the line
     `{js_source_dir, <path_to_source_dir>}' to the `riak'
     configuration block. `<path_to_source_dir>' should point to
     the directory where the file created in step #1 was saved.
  3. Start using the functions in your map/reduce jobs.

  When `js_source_dir' is enabled, Riak scans the directory for
  files ending in `.js'. These files are then loaded into each
  Javascript VM when it is created.

  NOTE: Named functions must be available on all nodes in a cluster
  for proper map/reduce results.
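  A minimal sketch of a file that could be placed in the `js_source_dir` directory follows. The global object `MyApp` and its functions are hypothetical. Based on the dotted `Riak.mapValues` names above, the sketch assumes a function stored as a property of a global object can be referenced by its dotted name, e.g. `{"map":{"language":"javascript","name":"MyApp.mapWordCount"}}`.

```javascript
// myapp.js: loaded into each Javascript VM when js_source_dir is set.
// Grouping functions under one global object keeps them apart from the
// predefined Riak object.
var MyApp = {
  // Map: number of whitespace-separated words in the first value.
  mapWordCount: function (v, keyData, arg) {
    var words = v.values[0].data.split(/\s+/).filter(function (w) {
      return w.length > 0;
    });
    return [words.length];
  },

  // Reduce: sum of numbers; the output is again a list of numbers, so it
  // is safe to re-reduce.
  reduceAdd: function (values, arg) {
    var sum = 0;
    for (var i = 0; i < values.length; i++) sum += values[i];
    return [sum];
  }
};
```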

* Why Use Named Functions?

  Named functions can be better than anonymous functions in certain
  situations. Since named functions live in a file they can be
  managed using source code control and deployed automatically
  using tools such as Chef or Puppet. This can be a significant
  advantage when administering large Riak clusters.

  More important, though, is the fact that named functions execute
  much faster than the equivalent anonymous functions. Invoking
  anonymous functions requires Riak to ensure the anonymous
  function is defined before invoking it. Named functions allow
  Riak to skip the definition check and execute the function call
  immediately.

  Also, since named functions do not change between invocations,
  Riak is able to cache named function call results and short-circuit
  the call entirely. Currently, Riak performs this
  optimization on named functions executed during map phases only.

  In general, anonymous functions should be used during development
  and named functions should be used for production deployments
  where possible. This combination provides the maximum flexibility
  and performance.

* Riak-Supplied Functions

  Riak supplies several named functions out of the box. These
  functions are defined on a global Javascript object named `Riak'
  and should not be modified or overridden. These functions, along
  with descriptions and notes on their use, are described in the
  next two sections.

  + Named Map Functions

    + `Riak.mapValues(values, keyData, arg)'
      Extracts and returns only the values contained in a bucket and key.

    + `Riak.mapValuesJson(values, keyData, arg)'
      Same as `mapValues' except the values are passed through a JSON
      decoder first.

  + Named Reduce Functions

    + `Riak.reduceSum(values, arg)'
      Returns the sum of `values'

    + `Riak.reduceMin(values, arg)'
      Returns the minimum value from `values'

    + `Riak.reduceMax(values, arg)'
      Returns the maximum value from `values'

    + `Riak.reduceSort(values, arg)'
      Returns the sorted version of `values'. If `arg' is the source
      to a Javascript function, it will be eval'd and used to
      control the sort via `Array.sort'.

    + `Riak.reduceLimit(values, arg)'
      Returns the leftmost n members of values where `arg' is used as n.

    + `Riak.reduceSlice(values, arg)'
      Returns a slice of the values array. `arg' must be a two
      element array containing the starting and ending positions for
      the slice.
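  For unit-testing phase chains outside Riak, local stand-ins that follow the descriptions above can be handy. These are hypothetical approximations written from the prose only, not Riak's implementations, and the name `LocalRiak` is made up. Three assumptions apply:

  + The end position passed to `reduceSlice` is exclusive, as in `Array.slice`.
  + `reduceSort` without `arg` uses the default `Array.sort` ordering.
  + `arg` for `reduceSort` is given as function source in a string.

```javascript
var LocalRiak = {
  // Leftmost `arg` members of the list.
  reduceLimit: function (values, arg) {
    return values.slice(0, arg);
  },
  // arg = [start, end]; the end is treated as exclusive (an assumption).
  reduceSlice: function (values, arg) {
    return values.slice(arg[0], arg[1]);
  },
  // Sorted copy; a string arg is eval'd into a comparison function.
  reduceSort: function (values, arg) {
    var copy = values.slice();
    if (typeof arg === "string") {
      var cmp = eval("(" + arg + ")");
      return copy.sort(cmp);
    }
    return copy.sort();
  }
};
```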

