MapReduce scalability

Rune Skou Larsen rsl at trifork.com
Thu Feb 28 12:02:37 EST 2013


A general comment on Riak's Reduce scalability.

Map scales fine, but Reduce doesn't. The single coordinating node does 
the final reducing, so reduce latency will not improve as you add more 
nodes. MR jobs that send a lot of data from map to reduce will also get 
the coordinating node into trouble, because it has to hold it all in 
memory.

Pre-reduce to the rescue! Work done in reduce phases can usually be at 
least partly distributed: counting, finding the highest number or even 
sorting, for instance. Riak supports this by means of a pre-reduce 
phase, which takes place on the same node as the map function emitting 
the values. The idea is that if each set of results sent to the 
coordinating node is already pre-aggregated, you may be able to reduce 
inter-node communication as well as the work and/or memory usage of the 
coordinating node.
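
To make the mechanics concrete, here is a rough sketch of what a 
two-phase job looks like from the Erlang protocol buffers client, using 
the stock phase functions shipped in riak_kv_mapreduce. The bucket name 
is made up for illustration, and exactly how pre-reduce is switched on 
for a map phase depends on your Riak version, so check the MapReduce 
docs for that part:

    %% Connect and run a map + reduce query. The map phase executes on
    %% the vnodes holding the data; the reduce phase runs on the
    %% coordinating node. With pre-reduce enabled for the map phase,
    %% a partial reduce also runs on each map node first.
    {ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087),
    Query = [{map,    {modfun, riak_kv_mapreduce, map_object_value}, none, false},
             {reduce, {modfun, riak_kv_mapreduce, reduce_sort},      none, true}],
    {ok, Results} = riakc_pb_socket:mapred(Pid, <<"numbers">>, Query).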

Example: Your map phase emits a lot of numbers, and you want to find 
the highest one. Without pre-reduce, all values are sent to the 
coordinating node, most of them over the network. With pre-reduce, only 
one value is sent per partition.
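
A minimal sketch of such a reduce function in Erlang (my own 
illustration, not code taken from Riak): because taking a maximum is 
associative and commutative, the very same function can serve both as 
the pre-reduce on each partition and as the final reduce on the 
coordinating node.

    %% Keep only the largest value seen so far. Riak reduce functions
    %% receive the accumulated list of values plus a static argument
    %% and must return a list; returning a one-element list is what
    %% makes the per-partition pre-reduce output so small.
    reduce_max([], _Arg)     -> [];
    reduce_max(Values, _Arg) -> [lists:max(Values)].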

Pre-reduce has its limitations, though. A common example is a map phase 
emitting {key, value} pairs where, for each key, you want to aggregate 
the values.
Example: Riak contains purchase orders, and you want to calculate the 
average purchase amount per zip code. The map function runs through all 
orders and emits {zip-code, amount}. To distribute the aggregation, you 
need a partition function 
(http://en.wikipedia.org/wiki/MapReduce#Partition_function), like 
Hadoop's shuffle phase. This is currently unsupported in Riak, but it is 
definitely doable with Riak Pipe, so I hope it'll come soon :-)
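
Until then, the aggregation itself can still be written as a single 
re-reducible function; the cross-partition merge just ends up on the 
coordinating node. A sketch, assuming the map phase emits 
{ZipCode, {Amount, 1}} tuples, with the averages computed client-side 
from the final sums and counts:

    %% Merge {Zip, {Sum, Count}} pairs into one pair per zip code.
    %% Re-reducible, so it could also act as a pre-reduce per partition,
    %% but without a partition/shuffle step the merge across all zip
    %% codes still happens on the single coordinating node.
    reduce_sum_count_by_zip(Values, _Arg) ->
        Merged = lists:foldl(
                   fun({Zip, {Sum, Count}}, Acc) ->
                           dict:update(Zip,
                                       fun({S, C}) -> {S + Sum, C + Count} end,
                                       {Sum, Count},
                                       Acc)
                   end,
                   dict:new(),
                   Values),
        dict:to_list(Merged).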


BR Rune

BTW: Christian's reply is entirely accurate as far as I understand, but 
it misses the pre-reduce part.


On 28-02-2013 16:17, Christian Dahlqvist wrote:
> Hi Bernard,
>
> The description in the documentation is entirely accurate and not at 
> all purely theoretical. Riak will automatically select a covering set 
> of vnodes/partitions that hold the data set required to complete the 
> job. All physical nodes may therefore not need to participate in the 
> job. When performing this selection, the coordinating node will take 
> into account any node outages.
>
> Any map phases will then run on all of these vnodes and use the data 
> stored on each local partition. In order to make it as efficient as 
> possible, it will use only the versions of the data available locally 
> and will not perform a quorum read against all the replicas holding a 
> copy of that data as this would result in a lot of network traffic 
> when running large jobs. The outputs of any map phases are then sent 
> over to the coordinating node where any reduce phases would normally run.
>
> As the input to the map phase only reads from one replica for every KV 
> pair, results can differ from run to run if all replicas are not in 
> sync. The likelihood of this happening should however be reduced with 
> the introduction of active anti-entropy in release 1.3 of Riak, but, 
> due to the eventually consistent nature of Riak, it will never be 
> completely eliminated.
>
> MapReduce is quite resilient to data issues as long as any map phase 
> functions used have been designed to handle notfounds and tombstones. 
> Nodes going down during a MapReduce job will however in many cases 
> cause it to fail.
>
> Although it would technically be possible to create a map phase 
> function in Erlang that performs a quorum read using the internal Riak 
> client and then performs any processing based on this object instead 
> of the one passed in, this is strongly discouraged as it would add a 
> lot of additional network traffic and pose a significant risk of 
> overloading the cluster.
>
> Best regards,
>
> Christian
>
>
> On 28 Feb 2013, at 13:53, Bernard Fouché <bernard.fouche at kuantic.com> 
> wrote:
>
>> Hi Christian,
>>
>> At 
>> http://docs.basho.com/riak/1.3.0/references/appendices/MapReduce-Implementation/ 
>> , one can read "...any Riak node can also coordinate a MapReduce 
>> query by sending a map-step evaluation request directly to the node 
>> responsible for maintaining the input data. Map-step results are sent 
>> back to the coordinating node, where reduce-step processing can 
>> produce a unified result.".
>>
>> What you wrote means that the above description is purely 
>> theoretical, since if there is any problem getting access to data on 
>> a node, the MR fails. We have also seen that deleting a key while an 
>> MR is running just makes the MR run forever. So your description 
>> seems accurate, and for the documentation to be correct it appears 
>> one must first be sure that reading the input data will never trigger 
>> any kind of error processing, otherwise the MR job will fail (or get 
>> stuck). Please correct me if I've misunderstood!
>>
>> Now if I want to split the processing of a list of keys across the 
>> cluster, is there a way to know which node is supposed to have at 
>> least one copy of a K/V pair?
>>
>> If so, we can set up our own kind of MR by sending subsets of keys to 
>> nodes known to have at least one copy of the K/V pair. Hence, if 
>> R==2, there will be one local read on the node receiving the subset 
>> and only one more read on another node that holds a copy. This 
>> distributed processing can then handle read-repair, aggregate data 
>> and send the result to the coordinating node.
>>
>> Best Regards,
>>
>> 	Bernard
>> On 28/02/2013 10:32, Christian Dahlqvist wrote:
>>> Hi Boris,
>>>
>>> Apart from not scaling quite as well as straight K/V access, 
>>> emulating multi-GET through MapReduce also has another significant 
>>> drawback. MapReduce has no concept of quorum reads and only works on 
>>> a single copy of the data, which can be thought of basically as a 
>>> read with R=1 that does not trigger read-repair. It can therefore 
>>> give inconsistent or incorrect results if all replicas do not have 
>>> the same data. It is worth noting that MapReduce was designed as a 
>>> way to efficiently spread compute work across the cluster, and 
>>> re-appropriating it for data collection is not what it was designed 
>>> for.
>>>
>>> The recommended way to implement efficient multiget is to perform 
>>> normal GET operations in parallel. If you are retrieving 20 objects, 
>>> you don't necessarily need to do all 20 GETs in parallel, but could 
>>> set it up to use perhaps 3 or 4 connections. If you then pair this 
>>> with a connection pool that can grow and shrink in size (perhaps 
>>> between a minimum and a maximum value) as load requires, you should 
>>> be able to retrieve the objects in a reasonable time without 
>>> overloading the cluster.
>>>
>>> Best regards,
>>>
>>> Christian
>>>
>>>
>>> On 27 Feb 2013, at 02:18, Boris Okner <boris.okner at gmail.com> wrote:
>>>
>>>> Thanks Christian,
>>>>
>>>> The problem I'm trying to solve is finding a way to retrieve values 
>>>> for a limited number of keys with the best possible latency (or at 
>>>> least a decent latency balanced against decent throughput). Let's 
>>>> say we have keys stored in some cache on top of Riak, and we want 
>>>> to retrieve values 20 at a time in order to implement pagination. 
>>>> An alternative to mapreduce would be to send multiple asynchronous 
>>>> gets, but then we'd have to worry about the connection pool being 
>>>> exhausted if there are too many such "page" requests. So what would 
>>>> be the proper way to deal with the situation where we need to 
>>>> emulate multiple-key retrieval?
>>>>
>>>> On Tue, Feb 26, 2013 at 1:57 AM, Christian Dahlqvist 
>>>> <christian at basho.com> wrote:
>>>>
>>>>     Hi Boris,
>>>>
>>>>     MapReduce is a very flexible and powerful way of querying Riak
>>>>     and allows processing to be performed locally where the data
>>>>     resides, which allows for efficient processing of larger data
>>>>     sets. A result of this is that every mapreduce job requires a
>>>>     covering set of vnodes (all vnodes that hold the data required
>>>>     for processing) to participate, meaning that it puts
>>>>     considerably more load on the system compared to straight K/V
>>>>     access and therefore does not scale quite as well. It is
>>>>     primarily designed for batch-type processing over reasonably
>>>>     large amounts of data and scales well with increased data
>>>>     volumes as new nodes are added. We do not, however, usually
>>>>     recommend using it as an interface for realtime queries where
>>>>     low and predictable latencies are required and the concurrency
>>>>     level, and therefore the load level on the cluster, cannot be
>>>>     controlled.
>>>>
>>>>     I am not sure I understand what you mean by the performance
>>>>     degrading with the number of nodes, unless you are strictly
>>>>     measuring latency rather than throughput. As the number of
>>>>     nodes increase, it gets more and more likely that multiple
>>>>     physical nodes will be involved in the job, which will add to
>>>>     the amount of communication and coordination required between
>>>>     the nodes, thereby increasing latency. Could you please explain
>>>>     in more detail what you are trying to achieve?
>>>>
>>>>     Best regards,
>>>>
>>>>     Christian
>>>>
>>>>
>>>>     On 25 Feb 2013, at 16:41, Boris Okner <boris.okner at gmail.com> wrote:
>>>>
>>>>>     Hello,
>>>>>
>>>>>     I'm experimenting with 2 Riak 1.3.0 nodes (both are "bare
>>>>>     metal"), and it looks like mapreduce performs better when one
>>>>>     of the nodes is down. The mapreduce requests are running on
>>>>>     20-key blocks. So am I doing something wrong, or is it
>>>>>     expected behaviour, i.e. does mapreduce degrade as the number
>>>>>     of nodes increases? If the former, could you give me some
>>>>>     pointers on how to set it up to take advantage of multiple
>>>>>     nodes?
>>>>>
>>>>>     Thanks in advance for your help,
>>>>>     Boris
>>>>
>>>>
>>>
>>>
>>>
>>
>


-- 

Best regards / Venlig hilsen

*Rune Skou Larsen*
NoSQL Team Lead
Trifork Public A/S
Margrethepladsen 4, 8000 Århus C, Denmark
Phone: +45 3160 2497	Skype: runeskoularsen	twitter: @RuneSkouLarsen


