christian at basho.com
Thu Feb 28 10:17:52 EST 2013
The description in the documentation is entirely accurate and not at all purely theoretical. Riak will automatically select a covering set of vnodes/partitions that hold the data set required to complete the job. This means that not all physical nodes necessarily need to participate in the job. When performing this selection, the coordinating node will take into account any node outages.
Any map phases will then run on all of these vnodes and use the data stored on each local partition. In order to make it as efficient as possible, it will use only the versions of the data available locally and will not perform a quorum read against all the replicas holding a copy of that data as this would result in a lot of network traffic when running large jobs. The outputs of any map phases are then sent over to the coordinating node where any reduce phases would normally run.
As the map phase only reads its input from one replica for every KV pair, results can differ from run to run if all replicas are not in sync. The likelihood of this happening should, however, be reduced by the introduction of active anti-entropy in release 1.3 of Riak, but due to the eventually consistent nature of Riak it will never be completely eliminated.
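To illustrate the point about single-replica reads, here is a minimal Python sketch. It is a toy model, not Riak code: the replica list, the integer `version` field (Riak itself uses vector clocks) and the random choice of replica are all assumptions made purely for illustration.

```python
import random
from collections import Counter

# Hypothetical replica states for one key with N=3: one copy is stale.
replicas = [
    {"value": "v2", "version": 2},
    {"value": "v2", "version": 2},
    {"value": "v1", "version": 1},  # stale copy, not yet repaired
]

def single_replica_read(replicas, rng):
    """Model of what a map phase sees: one copy, no quorum, no repair."""
    return rng.choice(replicas)["value"]

def quorum_read(replicas, r, rng):
    """Simplified quorum read: ask r replicas and keep the newest version."""
    sampled = rng.sample(replicas, r)
    return max(sampled, key=lambda obj: obj["version"])["value"]

rng = random.Random(42)
single = Counter(single_replica_read(replicas, rng) for _ in range(1000))
quorum = Counter(quorum_read(replicas, 2, rng) for _ in range(1000))

print("single-replica reads:", dict(single))
print("quorum reads (R=2):  ", dict(quorum))
```

In this model the R=2 read always returns the newer value, because any two of the three replicas include at least one up-to-date copy, while the single-replica read sometimes returns the stale one.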
MapReduce is quite resilient to data issues as long as any map phase functions used have been designed to handle notfounds and tombstones. Nodes going down during a MapReduce job will however in many cases cause it to fail.
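As a rough sketch of what "designed to handle notfounds and tombstones" means, the Python below shows only the shape of the logic. Real Riak map functions are written in Erlang or JavaScript; the `NOTFOUND` sentinel, the `deleted` flag and the dict layout used here are hypothetical stand-ins, not Riak's actual object representation.

```python
NOTFOUND = object()  # hypothetical sentinel: no object stored locally for this key

def map_value_length(obj):
    """Map function sketch: always return a list, empty for unusable inputs."""
    if obj is NOTFOUND:
        return []  # key missing on this partition: contribute nothing
    if obj.get("deleted", False):
        return []  # tombstone (hypothetical 'deleted' flag): skip it
    return [len(obj["value"])]

def run_map(inputs):
    """Apply the map function to every input and concatenate the results."""
    results = []
    for obj in inputs:
        results.extend(map_value_length(obj))
    return results

inputs = [
    {"value": "abc"},
    NOTFOUND,
    {"value": "", "deleted": True},
    {"value": "hello"},
]
lengths = run_map(inputs)
```

The point is that the function returns an empty result for bad inputs instead of raising, so one missing or deleted key does not take down the whole phase.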
Although it would technically be possible to create a map phase function in Erlang that performs a quorum read using the internal Riak client and then performs any processing based on this object instead of the one passed in, this is strongly discouraged as it would add a lot of additional network traffic and pose a significant risk of overloading the cluster.
On 28 Feb 2013, at 13:53, Bernard Fouché <bernard.fouche at kuantic.com> wrote:
> Hi Christian,
> At http://docs.basho.com/riak/1.3.0/references/appendices/MapReduce-Implementation/ , one can read "...any Riak node can also coordinate a MapReduce query by sending a map-step evaluation request directly to the node responsible for maintaining the input data. Map-step results are sent back to the coordinating node, where reduce-step processing can produce a unified result.".
> What you wrote means that the above description is purely theoretical, since if there is any problem accessing data on a node, the MR fails. We have also seen that deleting a key while an MR is running just makes the MR run forever, which makes me think that your description is accurate. For the documentation to be correct, it seems that one must first be sure that reading the input data will never trigger any kind of error processing, otherwise the MR job will fail (or get stuck). Please correct me if I've misunderstood!
> Now if I want to split the processing of a list of keys across the cluster, is there a way to know which node is supposed to have at least one copy of a K/V pair?
> If so, we can set up our own kind of MR by sending subsets of keys to nodes known to have at least one version of the K/V pair. Hence if R==2, there will be one local read in the node receiving the subset and only one more read in another node that holds a copy. This distributed processing can then handle read-repair, aggregate data and send the result to the coordinating node.
> Best Regards,
> On 28/02/2013 10:32, Christian Dahlqvist wrote:
>> Hi Boris,
>> Apart from not scaling quite as well as straight K/V access, emulating multiGET through MapReduce also has another significant drawback. MapReduce has no concept of quorum reads and only works on a single copy of the data, which can be thought of basically as a read with R=1 that does not trigger read-repair. It can therefore give inconsistent or incorrect results if all replicas do not have the same data. It is worth noting that MapReduce was designed as a way to efficiently spread compute work across the cluster; data collection is not its designed purpose.
>> The recommended way to implement efficient multiget is to perform normal GET operations in parallel. If you are retrieving 20 objects, you don't necessarily need to do all 20 GETs in parallel, but could set it up to use perhaps 3 or 4 connections. If you then pair this with a connection pool that can grow and shrink in size (perhaps between a minimum and a maximum value) as load requires, you should be able to retrieve the objects in a reasonable time without overloading the cluster.
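The bounded-parallelism approach described above could be sketched as follows in Python. This is an assumption-laden illustration, not a Riak client API: `fetch` is a hypothetical callable that performs one normal GET, and the fixed-size thread pool stands in for a connection pool (it does not grow or shrink with load).

```python
from concurrent.futures import ThreadPoolExecutor

def multiget(fetch, keys, max_connections=4):
    """Fetch all keys with at most max_connections GETs in flight at once.

    `fetch` is a hypothetical callable taking a key and returning its value.
    """
    with ThreadPoolExecutor(max_workers=max_connections) as pool:
        # pool.map yields results in the same order as the input keys.
        values = list(pool.map(fetch, keys))
    return dict(zip(keys, values))

# Usage with an in-memory stand-in for the cluster (hypothetical data).
fake_store = {"key%d" % i: "value%d" % i for i in range(20)}
page = multiget(fake_store.get, list(fake_store), max_connections=4)
```

Capping the worker count is what keeps a burst of "page" requests from opening 20 connections each; the cap of 4 simply mirrors the "3 or 4 connections" suggested in the message.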
>> Best regards,
>> On 27 Feb 2013, at 02:18, Boris Okner <boris.okner at gmail.com> wrote:
>>> Thanks Christian,
>>> The problem I'm trying to solve is to find a way to retrieve values for a limited number of keys with the best possible latency (or maybe with decent latency balanced against decent throughput). Let's say we have keys stored in some cache on top of Riak, and want to retrieve values, 20 at a time, to be able to implement pagination. Another alternative to mapreduce would be to send multiple asynchronous gets, but then we'd have to worry about the connection pool being exhausted if there are too many such "page" requests. So what would be the proper way to deal with the situation when we need to emulate multiple key retrieval?
>>> On Tue, Feb 26, 2013 at 1:57 AM, Christian Dahlqvist <christian at basho.com> wrote:
>>> Hi Boris,
>>> MapReduce is a very flexible and powerful way of querying Riak and allows processing to be performed locally where the data resides, which allows for efficient processing of larger data sets. A result of this is that every mapreduce job requires a covering set of vnodes (all vnodes that hold the data required for processing) to participate, meaning that it puts considerably more load on the system compared to straight K/V access and therefore does not scale quite as well. It is primarily designed for batch type processing over reasonably large amounts of data and scales well with increased data volumes as new nodes are added. We do however not usually recommend using it as an interface for realtime queries where low and predictable latencies are required and the concurrency level, and therefore the load level on the cluster, cannot be controlled.
>>> I am not sure I understand what you mean by the performance degrading with the number of nodes, unless you are strictly measuring latency rather than throughput. As the number of nodes increases, it gets more and more likely that multiple physical nodes will be involved in the job, which will add to the amount of communication and coordination required between the nodes, thereby increasing latency. Could you please explain in more detail what you are trying to achieve?
>>> Best regards,
>>> On 25 Feb 2013, at 16:41, Boris Okner <boris.okner at gmail.com> wrote:
>>>> I'm experimenting with 2 Riak 1.3.0 nodes (both are "bare metal"), and it looks like mapreduce performs better when one of the nodes is down. The mapreduce requests are running on 20-key blocks. So am I doing something wrong, or is it expected behaviour, i.e. mapreduce degrades as the number of nodes increases? If the former, could you give me some pointers on how to set it up to take advantage of multiple nodes?
>>>> Thanks in advance for your help,
>>>> riak-users mailing list
>>>> riak-users at lists.basho.com