Using Bucket Data Types slowed insert performance

Mark Schmidt mschmidt at orcawave.net
Wed Oct 21 16:28:51 EDT 2015



Hi Russell,

I think we were getting ahead of ourselves with our use of CRDTs within Riak
for this case. We're still very new to the platform and still climbing
the learning curve.

We'll be maintaining a set schema of registers, so your comments below make
a lot of sense. Based on everyone's feedback and further research by our
devs, we'll be dropping our use of Riak data types for this case.

However... From a general operations standpoint, I'm still curious why we
saw a drastic throughput performance hit when incorporating the use of
CRDTs. We're setting up some additional monitoring points and various
comparison tests to see if we can pinpoint the bottleneck.

I'll update everyone with our test scenarios and findings once we have
something to share.

Thank you all,

- Mark Schmidt



-----Original Message-----
From: Russell Brown [mailto:russell.brown at me.com] 
Sent: Wednesday, October 21, 2015 12:39 AM
To: Dennis Nicolay <dnicolay at orcawave.net>
Cc: Christopher Mancini <cmancini at basho.com>; Mark Schmidt
<mschmidt at orcawave.net>; Alexander Sicular <siculars at gmail.com>; riak-users
<riak-users at lists.basho.com>
Subject: Re: Using Bucket Data Types slowed insert performance

Honestly, right now we need to work on optimising the Map. We do have a
smaller/faster map in a branch that we're working on shipping soon, as well
as other optimisations planned.

Does your use case have you adding and removing registers, or is this
basically a set schema of registers per key? If you're not
removing/re-adding registers, I would use a CRDT not in Riak, but in your
application.

CRDTs in Riak make sense for causal data types, where the actor management
is onerous for the client. What you're modelling using the map looks like a
Last-Write-Wins element set. This is a pretty simple CRDT to write yourself
in your own programming language/application. You can then simply store the
binary representation of it in Riak, using Riak's siblings
(allow_mult=true). When your application gets sibling values, simply run
your application code's merge function.

There are details of the Last-Write-Wins element set here:
https://github.com/soundcloud/roshi, but all you need to do is store a pair
(element, TS) for each member in the set. If you're going to be removing
registers it gets more complex. Are you?
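
The merge described above can be sketched roughly as follows. This is only
an illustration in Python (the same logic ports directly to C#), assuming an
add-only set of registers where each field carries a (value, timestamp)
pair, timestamps are supplied by the application, and sibling values arrive
as JSON-encoded bytes. The names stamp, merge, merge_siblings and encode are
made up for this sketch and are not part of any Riak client.

```python
import json


def stamp(record, ts):
    """Wrap each field of a record as field -> [value, timestamp]."""
    return {field: [value, ts] for field, value in record.items()}


def merge(a, b):
    """Merge two replicas: per field, keep the entry with the newer timestamp."""
    merged = dict(a)
    for field, (value, ts) in b.items():
        # Ties on timestamp are broken by comparing the (string) values,
        # so the result does not depend on the order of the arguments.
        if field not in merged or (ts, value) > (merged[field][1], merged[field][0]):
            merged[field] = [value, ts]
    return merged


def encode(state):
    """Serialize the state to the bytes that would be stored as the object value."""
    return json.dumps(state, sort_keys=True).encode("utf-8")


def merge_siblings(siblings):
    """Fold any number of sibling values (JSON bytes) into a single state."""
    result = {}
    for blob in siblings:
        result = merge(result, json.loads(blob))
    return result


# Two conflicting writes to the same key (made-up values).
a = stamp({"Price": "0.10", "Cost": "0.05"}, ts=1)
b = stamp({"Price": "0.12"}, ts=2)
resolved = merge_siblings([encode(a), encode(b)])
print(resolved)
```

Because merge is commutative, associative and idempotent, it does not matter
in which order the siblings are folded. Supporting removal of registers
would need extra state (for example a remove timestamp per field), which
this sketch deliberately leaves out.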

Cheers

Russell

On 20 Oct 2015, at 20:25, Dennis Nicolay <dnicolay at orcawave.net> wrote:

>  
>   ResultObject cdr;
>                     while (queued.TryDequeue(out cdr))
>                     {
>                         long beforeProcessing = DateTime.Now.Ticks;
>                         UpdateMap.Builder builder = BuildMapObject(bucket,
cdr);
>                         UpdateMap cmd = builder.Build();
>                         RiakResult rslt = client.Execute(cmd);
>  
>  
>  
>  
> private static UpdateMap.Builder BuildMapObject(string bucketname,
ResultObject cdr )
>         {
>          
>             var builder = new UpdateMap.Builder()
>                .WithBucketType("maps")
>                .WithBucket(bucketname)
>                .WithKey(cdr.CdrKey);      
>             var mapOperation = new UpdateMap.MapOperation();
>             mapOperation.SetRegister("FileTimeStamp",
cdr.CdrValue.FileTimeStamp.ToString());
>             mapOperation.SetRegister("AuditId",
cdr.CdrValue.AuditId.ToString());
>             mapOperation.SetRegister("CdrId",
cdr.CdrValue.CdrId.ToString());
>             mapOperation.SetRegister("IsBillable",
cdr.CdrValue.IsBillable.ToString());
>             mapOperation.SetRegister("SwitchId",
cdr.CdrValue.SwitchId.ToString());
>             mapOperation.SetRegister("SwitchDescription",
cdr.CdrValue.SwitchDescription.ToString());
>             mapOperation.SetRegister("SequenceNumber",
cdr.CdrValue.SequenceNumber.ToString());
>             mapOperation.SetRegister("CallDirection",
cdr.CdrValue.CallDirection.ToString());
>             mapOperation.SetRegister("CallTypeId",
cdr.CdrValue.CallTypeId.ToString());
>             mapOperation.SetRegister("Partition",
cdr.CdrValue.Partition.ToString());
>             mapOperation.SetRegister("CustomerTrunkId",
cdr.CdrValue.CustomerTrunkId.ToString());
>             mapOperation.SetRegister("OrigIpAddress",
cdr.CdrValue.OrigIpAddress.ToString());
>             mapOperation.SetRegister("OrigPort",
cdr.CdrValue.OrigPort.ToString());
>             mapOperation.SetRegister("SupplierTrunkId",
cdr.CdrValue.SupplierTrunkId.ToString());
>             mapOperation.SetRegister("TermIpAddress",
cdr.CdrValue.TermIpAddress.ToString());
>             mapOperation.SetRegister("TermPort",
cdr.CdrValue.TermPort.ToString());
>             mapOperation.SetRegister("Ani", cdr.CdrValue.Ani.ToString());
>             mapOperation.SetRegister("OutpulseNumber",
cdr.CdrValue.OutpulseNumber.ToString());
>             // Assumes CdrValue exposes a SubscriberNumber property.
>             mapOperation.SetRegister("SubscriberNumber",
cdr.CdrValue.SubscriberNumber.ToString());
>             mapOperation.SetRegister("CallingNoa",
cdr.CdrValue.CallingNoa.ToString());
>             mapOperation.SetRegister("DialedNoa",
cdr.CdrValue.DialedNoa.ToString());
>             // Assumes CdrValue exposes an OutpulseNoa property.
>             mapOperation.SetRegister("OutpulseNoa",
cdr.CdrValue.OutpulseNoa.ToString());
>             mapOperation.SetRegister("TreatmentCode",
cdr.CdrValue.TreatmentCode.ToString());
>             mapOperation.SetRegister("CompletionCode",
cdr.CdrValue.CompletionCode.ToString());
>             mapOperation.SetRegister("CustomerName",
cdr.CdrValue.CustomerName.ToString());
>             mapOperation.SetRegister("CustId",
cdr.CdrValue.CustId.ToString());
>             mapOperation.SetRegister("CustContractId",
cdr.CdrValue.CustContractId.ToString());
>             mapOperation.SetRegister("CustCountryCode",
cdr.CdrValue.CustCountryCode.ToString());
>             mapOperation.SetRegister("CustDuration",
cdr.CdrValue.CustDuration.ToString());
>             mapOperation.SetRegister("Price",
cdr.CdrValue.Price.ToString());
>             mapOperation.SetRegister("BasePrice",
cdr.CdrValue.BasePrice.ToString());
>             mapOperation.SetRegister("BillingDestinationName",
cdr.CdrValue.BillingDestinationName.ToString());
>             mapOperation.SetRegister("BillingGroupId",
cdr.CdrValue.BillingGroupId.ToString());
>             mapOperation.SetRegister("SupplierName",
cdr.CdrValue.SupplierName.ToString());
>             mapOperation.SetRegister("SuppId",
cdr.CdrValue.SuppId.ToString());
>             mapOperation.SetRegister("SuppContractId",
cdr.CdrValue.SuppContractId.ToString());
>             mapOperation.SetRegister("SuppCountryCode",
cdr.CdrValue.SuppCountryCode.ToString());
>             mapOperation.SetRegister("SuppDuration",
cdr.CdrValue.SuppDuration.ToString());
>             mapOperation.SetRegister("Cost",
cdr.CdrValue.Cost.ToString());
>             mapOperation.SetRegister("BaseCost",
cdr.CdrValue.BaseCost.ToString());
>             mapOperation.SetRegister("RoutingDestinationName",
cdr.CdrValue.RoutingDestinationName.ToString());
>             mapOperation.SetRegister("RoutingGroupId",
cdr.CdrValue.RoutingGroupId.ToString());
>             mapOperation.SetRegister("RouteToCountryCode",
cdr.CdrValue.RouteToCountryCode.ToString());
>             mapOperation.SetRegister("Pdd", cdr.CdrValue.Pdd.ToString());
>             mapOperation.SetRegister("RealDuration",
cdr.CdrValue.RealDuration.ToString());
>             mapOperation.SetRegister("StartTime",
cdr.CdrValue.StartTime.ToString());
>             mapOperation.SetRegister("EndTime",
cdr.CdrValue.EndTime.ToString());
>             mapOperation.SetRegister("NumberCalled",
cdr.CdrValue.NumberCalled.ToString());
>             mapOperation.SetRegister("CallingLataOcn",
cdr.CdrValue.CallingLataOcn.ToString());
>             mapOperation.SetRegister("DialedLataOcn",
cdr.CdrValue.DialedLataOcn.ToString());
>             mapOperation.SetRegister("LrnLataOcn",
cdr.CdrValue.LrnLataOcn.ToString());
>             mapOperation.SetRegister("CustomerPrefix",
cdr.CdrValue.CustomerPrefix.ToString());
>             mapOperation.SetRegister("SupplierPrefix",
cdr.CdrValue.SupplierPrefix.ToString());
>             mapOperation.SetRegister("OriginationCountryCode",
cdr.CdrValue.OriginationCountryCode.ToString());
>             mapOperation.SetRegister("OriginationCost",
cdr.CdrValue.OriginationCost.ToString());
>             mapOperation.SetRegister("FixedPricePerCall",
cdr.CdrValue.FixedPricePerCall.ToString());
>             mapOperation.SetRegister("FixedCostPerCall",
cdr.CdrValue.FixedCostPerCall.ToString());
>             mapOperation.SetRegister("InvoiceId",
cdr.CdrValue.InvoiceId.ToString());
>             mapOperation.SetRegister("BusinessId",
cdr.CdrValue.BusinessId.ToString());
>  
>             builder.WithMapOperation(mapOperation);
>             return builder;
>         }
>  
>  
> From: Christopher Mancini [mailto:cmancini at basho.com] 
> Sent: Tuesday, October 20, 2015 11:52 AM
> To: Mark Schmidt; Alexander Sicular; Dennis Nicolay
> Cc: riak-users at lists.basho.com
> Subject: Re: Using Bucket Data Types slowed insert performance
>  
> Hi Mark / Dennis,
> 
> Can you provide the snippet of the code that puts a 5k record onto Riak as
a map?
> 
> Chris
>  
> On Tue, Oct 20, 2015 at 11:30 AM Mark Schmidt <mschmidt at orcawave.net>
wrote:
> Hi folks, sorry for the confusion.
>  
> Our scenario is as follows:
>  
> We have a 6 node development cluster running on its own network segment
using HAProxy to facilitate load-balancing across the nodes. A single
Riak-dot-NET client service is performing the insert operations from
dedicated hardware located within the same network segment. We have basic
network throughput capabilities of 100 Mbit with an average speed achievable
of 75 Mbit.
>  
> The data we are attempting to insert is composed of phone call record
receipts from telephone carriers. These records are batched and written to a
flat file for incorporation into our reporting engine.
> 1) Our Riak client process takes a flat file (in this case, a 40MB
collection of records, each record being approximately 5k in size) and
parses the entire file so each record can be added to a local .NET queue.
> 2) Once the entire file has been parsed and each record loaded into the
local queue, 20 threads are spawned and connections are opened to our Riak
nodes via the HAProxy.
> 3) Each thread will pull a 5k record from the queue on a first come first
served basis and perform a put to the Riak environment.
>  
> When first testing our client insert process, we were pushing the 5K
records as whole strings into the Riak environment. Network throughput
topped out at around 80 Mbit with a total load time of 90 seconds for 149k
records. When the client process was modified (same queuing and de-queuing
methods) so that a map datatype bucket would be created and keys stored as
registers, we saw network throughput drop to around 10 Mbit, with total
upload time increasing to around 270 seconds for the 149k records.
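
As a rough back-of-the-envelope check of these figures, the snippet below
(Python, purely illustrative) turns the two load times into records per
second and, via Little's law, into an approximate time per request. It
assumes all 20 threads stay busy and each issues one synchronous request at
a time; the function names are made up for this sketch.

```python
RECORDS = 149_000   # records loaded in both test runs
THREADS = 20        # client threads performing puts


def records_per_second(records, seconds):
    return records / seconds


def latency_ms(threads, rate):
    # Little's law: concurrency = throughput * latency
    return threads / rate * 1000


plain_rate = records_per_second(RECORDS, 90)    # whole-string puts
map_rate = records_per_second(RECORDS, 270)     # map/register updates

print(round(plain_rate), round(map_rate))       # 1656 552
print(round(latency_ms(THREADS, plain_rate), 1),
      round(latency_ms(THREADS, map_rate), 1))  # 12.1 36.2
```

Under those assumptions each map update takes roughly 36 ms against roughly
12 ms for a plain put, which matches the "about 3 times longer" observation.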
>  
> It appears as though we've either encountered a potential bottleneck
unrelated to network throughput, or we're just seeing an expected processing
penalty for our use of Riak datatypes. Please note, we're configuring Zabbix
so we can monitor disk IO on each node as processor and memory resources
don't appear to be the culprit either.
>  
> If the reduction in processing speed is a natural consequence of utilizing
Riak data types, is the inter-node network the optimum place to increase
resources? Our eventual datacenter implementation will support speeds of
over 40 Gbit for inter-node communication. We're just trying to identify
which levers we can pull from an operational standpoint to boost
performance, or whether our client implementation is suspect.
>  
> You bring up some excellent points regarding our use of CRDTs. In our
case, the call data records are mutable as they are subject to changes by
phone carriers for billing error corrections, incorrect data and a host of
other reasons. We may be better served by treating the records as immutable
and performing wide scale record removal and "reprocessing" in the event
changes to existing records are received/requested.
>  
> Thank you,
>  
> Mark Schmidt
>  
> From: Alexander Sicular [mailto:siculars at gmail.com] 
> Sent: Tuesday, October 20, 2015 10:55 AM
> To: Dennis Nicolay <dnicolay at orcawave.net>
> Cc: Christopher Mancini <cmancini at basho.com>; riak-users at lists.basho.com;
Mark Schmidt <mschmidt at orcawave.net>
> 
> Subject: Re: Using Bucket Data Types slowed insert performance
>  
> Let's talk about Riak data types for a moment. Riak data types are
collectively implementations of what academia refers to as CRDTs (convergent
or conflict-free replicated data types). The key benefit a CRDT offers, by
contrast with traditional KV, is automatic conflict resolution. The
various CRDTs provided in Riak have specific conflict resolution
strategies. This does not come for free. There is a computational cost
associated with CRDTs. If your use case requires automated conflict
resolution strategies then CRDTs are a good fit. Internally CRDTs rely on
vector clocks (see DVVs in the documentation) to resolve conflicts.
>  
> Considering your ETL use case I'm going to presume that your data is
immutable (I could very well be wrong here). If your data is immutable I
would consider simply using KV and not paying the CRDT computational
penalty (and possibly even using the write once bucket). The CRDT penalty
you pay obviously depends on your use case, configuration, hw deployment
etc.
>  
> Hope that helps!
> -Alexander 
>  
> 
> @siculars
> http://siculars.posthaven.com
>  
> Sent from my iRotaryPhone
> 
> On Oct 20, 2015, at 12:39, Dennis Nicolay <dnicolay at orcawave.net> wrote:
> 
> Hi Alexander,
>  
> I'm parsing the file and storing each row with its own key in a map
datatype bucket, and each column is a register.
>  
> Thanks,
> Dennis
>  
> From: Alexander Sicular [mailto:siculars at gmail.com] 
> Sent: Tuesday, October 20, 2015 10:34 AM
> To: Dennis Nicolay
> Cc: Christopher Mancini; riak-users at lists.basho.com
> Subject: Re: Using Bucket Data Types slowed insert performance
>  
> Hi Dennis,
>  
> It's a bit unclear what you are trying to do here. Are you 1. uploading
the entire file and saving it to one key with the value being the file? Or
are you 2. parsing the file and storing each row as a register in a map? 
>  
> Neither of those approaches is appropriate in Riak KV. For the first
case I would point you to Riak S2, which is designed to manage large binary
object storage. You can keep the large file as a single addressable entity
and access it via the Amazon S3 or Swift protocol. For the second case I
would consider maintaining one key (map) per row in the file and having a
register per column in the row. Or not using Riak data types (maps, sets,
registers, flags and counters) and simply keeping each row in the file as a
KV in Riak, either as a raw string or as a serialized JSON string. ETL'ing
out of relational databases and into Riak is a very common use case and is
often implemented in the fashion I described.
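
A minimal sketch of the "one serialized JSON value per row" option, in
Python for illustration only. The "|" delimiter, the column subset, the
choice of CdrId as the key and the sample rows are all assumptions made up
for this example; the resulting (key, value) pairs would then be written
with an ordinary KV put in whichever client is in use.

```python
import csv
import io
import json

# Hypothetical subset of columns, in file order.
COLUMNS = ["CdrId", "SwitchId", "StartTime", "Price"]


def rows_to_kv(text, delimiter="|"):
    """Yield one (key, json_value) pair per delimited row."""
    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    for fields in reader:
        row = dict(zip(COLUMNS, fields))
        # The row's own identifier is used as the key.
        yield row["CdrId"], json.dumps(row, sort_keys=True)


# Made-up sample data standing in for the flat file.
sample = (
    "1001|sw-7|2015-10-20T08:18:00|0.12\n"
    "1002|sw-7|2015-10-20T08:19:30|0.08\n"
)
pairs = list(rows_to_kv(sample))
for key, value in pairs:
    print(key, value)
```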
>  
> As Chris mentioned, soft upper bound on value size should be 1MB. I say
soft because we won't enforce it although there are settings in the config
that can be changed to enforce it (default 5MB warning, 50MB reject I
believe.) 
> 
> Best,
> Alexander
> 
> 
> 
> @siculars
> http://siculars.posthaven.com
>  
> Sent from my iRotaryPhone
> 
> On Oct 20, 2015, at 10:22, Christopher Mancini <cmancini at basho.com> wrote:
> 
> Hi Dennis,
> 
> I am not the most experienced, but what I do know is that a file that size
causes a great deal of network chatter because it has to hand off that data
to the other nodes in the network and will cause delays in Riak's ability to
send and confirm consistency across the ring. Typically we recommend that
you try to structure your objects to around 1MB or less to ensure consistent
performance. That max object size can vary of course based on your network /
server specs and configuration.
> 
> I hope this helps.
> 
> Chris
>  
> On Tue, Oct 20, 2015 at 8:18 AM Dennis Nicolay <dnicolay at orcawave.net>
wrote:
> Hi,
>  
> I'm using the .NET RiakClient 2.0 to insert a 44MB delimited file with
139k rows of data into Riak. I switched to a map bucket data type with
registers. It is taking about 3 times longer to insert into this bucket vs.
a non-data-typed bucket. Any suggestions?
>  
> Thanks in advance,
> Dennis
> _______________________________________________
> riak-users mailing list
> riak-users at lists.basho.com
> http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com




