[CRDT_OP-in-CommitHook]

DeadZen deadzen at deadzen.com
Thu Mar 2 21:54:21 EST 2017


Nice to hear that level of fidelity

On Thu, Mar 2, 2017 at 9:23 PM, 李明 <lmlmlmlalala at gmail.com> wrote:
> Thanks a lot Russell,
>     The problem is fixed now. I am now testing how fast it can be using
> the external and internal APIs.
>
> 2017-03-02 18:05 GMT+08:00 <Russell.Brown at bet365.com>:
>>
>> Hi
>>
>> With regard to creating a serial actor,  that is what vnodes are.
>>
>>
>>
>> For your hook, if you need the context, then keep the C:get, but for the
>> PUT just send the crdt_op, almost exactly as per
>> https://github.com/basho/riak_kv/blob/develop/src/riak_kv_pb_crdt.erl#L162.
>>
>>
>>
>> O = riak_kv_crdt:new({BType, B}, Key, Mod),
>> CrdtOp = #crdt_op{mod=Mod, op=Op, ctx=Ctx},
>> Options = [{crdt_op, CrdtOp},
>>            {retry_put_coordinator_failure, false}],
>> Resp = C:put(O, Options),
>>
>>
>>
>>
>>
>> With Mod set to riak_dt_orswot, that should be enough for you to imitate a
>> CRDT operation API call from your hook. DO NOT update the fetched CRDT
>> yourself; let Riak’s vnode do that.
>>
>>
>>
>> Let Riak handle the serial actor bit. You just send the crdt_op populated
>> with the operation and context.
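
A minimal sketch of building the put options described above, assuming a
local stand-in for riak_kv's `crdt_op` record (the field names `mod`, `op`
and `ctx` are taken from the snippet above). The module name
`crdt_hook_sketch` and both function names are hypothetical, and the
operation shape `{update, [{add, Element}]}` is borrowed from the hook in
the original post. It only constructs the option list; it does not talk to
Riak.

```erlang
-module(crdt_hook_sketch).
-export([put_options/3, add_to_set_options/1]).

%% Assumption: a local stand-in for riak_kv's crdt_op record, using the field
%% names from the snippet in this thread. Inside Riak you would include
%% riak_kv's own header instead of redefining the record.
-record(crdt_op, {mod, op, ctx}).

%% Build the options for C:put/2: the operation to apply plus the
%% retry_put_coordinator_failure flag from the snippet in this thread.
put_options(Mod, Op, Ctx) ->
    [{crdt_op, #crdt_op{mod = Mod, op = Op, ctx = Ctx}},
     {retry_put_coordinator_failure, false}].

%% Add-only set update. The context is left undefined, which the thread
%% says should not matter when every operation is an add.
add_to_set_options(Element) ->
    put_options(riak_dt_orswot, {update, [{add, Element}]}, undefined).
```

In a hook, the returned list would be passed as the second argument to
`C:put(O, Options)`, with `O` created by `riak_kv_crdt:new/3` as in the
snippet above, so that the vnode applies the operation rather than the hook.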
>>
>>
>>
>> Like I said in my last email, IF folk want to use CRDTs from hooks then
>> there should be an internal API that enables it. If you create an issue for
>> that on github I’d quite enjoy writing the code and raising a PR. I no
>> longer work at Basho, but since Riak is open source and I’m very fond of the
>> CRDT feature, I’m sure they’d appreciate the contribution.
>>
>>
>>
>> Cheers
>>
>>
>>
>> Russell
>>
>>
>>
>>
>>
>>
>>
>> From: riak-users [mailto:riak-users-bounces at lists.basho.com] On Behalf Of
>> 李明
>> Sent: 02 March 2017 09:50
>> To: Russell Brown
>> Cc: riak-users
>> Subject: Re: [CRDT_OP-in-CommitHook]
>>
>>
>>
>> Thanks Russell,
>>
>>     Yes, using the internal API was a lot of trouble, especially with very
>> little knowledge of Erlang.
>>
>> I have read riak_kv_pb_crdt, but it confused me a lot. How do I define a
>> serial actor, and is there a simple example of creating a `riak_client:put`
>> options object?
>>
>>
>>
>> The reason I need to use the internal API is that I believe it will cost
>> less performance than creating a new client, even though a new client may
>> be more convenient. We are running a very heavily loaded Riak cluster and I
>> want to keep the hook lightweight.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 2017-03-02 16:32 GMT+08:00 Russell Brown <russell.brown at mac.com>:
>>
>> Hi,
>>
>> You’re using internal details of the CRDT implementation, and I’m not sure
>> that is such a great idea. You always have your `Context` set to `undefined`,
>> but if your ops are all adds that shouldn’t matter in this case.
>>
>> The issue is that you’re calling `riak_kv_crdt:update`, which needs to be
>> called from within the vnode. The `ThisNode` value is not a serial actor, so
>> you may have many concurrent updates with the actor `ThisNode`, and that’s
>> not how to do it. It is crucial that the actor updating the CRDT acts in
>> serial, issuing an increasing count of events. That’s why we put the CRDT code
>> inside Riak, inside the vnode.
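
The point about serial actors can be illustrated with a deliberately
simplified observed-remove set. This is a toy model written for this
purpose, not the `riak_dt_orswot` implementation: the module name
`serial_actor_sketch`, its functions, and the actor names are all
hypothetical, and it assumes an Erlang/OTP release with map support. Each
add mints a dot `{Actor, Counter}`, and merge drops a dot that one side
holds when the other side's clock claims to have already seen it.

```erlang
-module(serial_actor_sketch).
-export([new/0, add/3, merge/2, value/1, demo/0]).

%% State is {Clock, Entries}:
%%   Clock   :: #{Actor => Counter}  highest event seen per actor
%%   Entries :: #{Element => [Dot]}  where Dot = {Actor, Counter}
new() -> {#{}, #{}}.

%% Adding mints a fresh dot from the actor's next counter value.
add(Actor, Elem, {Clock, Entries}) ->
    N = maps:get(Actor, Clock, 0) + 1,
    {Clock#{Actor => N}, Entries#{Elem => [{Actor, N}]}}.

%% True when Clock already covers the given dot.
seen({Actor, N}, Clock) -> maps:get(Actor, Clock, 0) >= N.

%% Keep dots both sides hold, plus dots one side holds that the other
%% side's clock has not seen. A dot the other side has seen but does not
%% hold is treated as removed.
merge({CA, EA}, {CB, EB}) ->
    Clock = maps:fold(
        fun(A, N, Acc) -> Acc#{A => max(N, maps:get(A, Acc, 0))} end,
        CA, CB),
    Elems = lists:usort(maps:keys(EA) ++ maps:keys(EB)),
    Entries = lists:foldl(
        fun(E, Acc) ->
            DA = maps:get(E, EA, []),
            DB = maps:get(E, EB, []),
            Keep = [D || D <- DA, lists:member(D, DB)] ++
                   [D || D <- DA, not lists:member(D, DB), not seen(D, CB)] ++
                   [D || D <- DB, not lists:member(D, DA), not seen(D, CA)],
            case lists:usort(Keep) of
                [] -> Acc;
                Dots -> Acc#{E => Dots}
            end
        end, #{}, Elems),
    {Clock, Entries}.

value({_Clock, Entries}) -> lists:sort(maps:keys(Entries)).

demo() ->
    %% Two concurrent writers sharing one actor both mint dot {node1, 1}.
    Shared = merge(add(node1, x, new()), add(node1, y, new())),
    %% Two concurrent writers with distinct actors.
    Distinct = merge(add(vnode_a, x, new()), add(vnode_b, y, new())),
    %% One actor applying both adds in serial.
    Serial = add(node1, y, add(node1, x, new())),
    {value(Shared), value(Distinct), value(Serial)}.
```

In this model, `demo()` returns `{[], [x, y], [x, y]}`: when two concurrent
updates reuse the same actor, each side's clock covers the other side's dot,
so the merge discards both elements. That matches the symptom of elements
going missing under concurrent writes.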
>>
>> You’re doing neither one thing nor the other, in that you’re using a
>> datatype bucket but not the datatype API (it sends ops from the client,
>> whereas you’re doing read/modify/write).
>>
>> I can see the issue here is that the API is external only. If you _must_
>> use the internal API, have a look at
>> https://github.com/basho/riak_kv/blob/develop/src/riak_kv_pb_crdt.erl and
>> you’ll see that the CRDT_OP is all that is sent by the client.
>>
>> https://github.com/basho/riak_kv/blob/develop/src/riak_kv_pb_crdt.erl#L162
>>
>> Those put options matter too, especially for counters, less so for sets.
>>
>> I guess it would be great if riak_client had some internal API functions
>> that made this easier to do from a hook. If you open an issue on
>> github.com/basho/riak_kv I can look into that and make a PR.
>>
>> Hope that helps
>>
>> Russell
>>
>>
>> On 1 Mar 2017, at 09:30, 李明 <lmlmlmlalala at gmail.com> wrote:
>>
>> > Hi
>> >    I am new to Erlang and Riak. I started using Riak as a KV store a
>> > couple of months ago. Now I want to implement a commit hook in Riak so
>> > that Riak can help me gather some statistics.
>> > I read some docs and wrote a pre-commit hook script, which fetches the
>> > object key and stores it in a set.
>> >    This hook works fine if only one client writes to Riak, but if I
>> > increase the number of connections writing to Riak, I find that some
>> > elements are lost from the set. It looks like the crdt_op did not do the
>> > merge operation, and there is no obvious error in the log files.
>> >
>> >    Could someone help me figure out what happened or what I have
>> > missed?
>> >
>> > I am using Riak 2.1.3.
>> >
>> > Thanks all!
>> >
>> >
>> > Here is the hook script:
>> >
>> >
>> > ------------------------------------------------------------------------------------------------------
>> >
>> > -module(myhook).
>> > -export([pretest/1]).
>> >
>> > now_to_local_string({MegaSecs, Secs, MicroSecs}) ->
>> >     LocalTime = calendar:now_to_local_time({MegaSecs, Secs, MicroSecs}),
>> >     {{Year, Month, Day}, {Hour, Minute, _}} = LocalTime,
>> >     TimeStr = lists:flatten(io_lib:format("~4..0w~2..0w~2..0w~2..0w~2..0w",
>> >                                           [Year, Month, Day, Hour, Minute])),
>> >     TimeStr.
>> >
>> > is_deleted(Object) ->
>> >     case dict:find(<<"X-Riak-Deleted">>, riak_object:get_metadata(Object)) of
>> >         {ok, _} ->
>> >             true;
>> >         _ ->
>> >             false
>> >     end.
>> >
>> > pretest(Object) ->
>> >     % timer:sleep(10000),
>> >     try
>> >         ObjBucket = riak_object:bucket(Object),
>> >         % riak_object:bucket(Obj).
>> >         % {<<"cn-archive">>,<<"local-test">>}
>> >
>> >         Bucket = element(2, ObjBucket),
>> >         BucketType = element(1, ObjBucket),
>> >
>> >         ObjKey = riak_object:key(Object),
>> >         % Key = binary_to_list(ObjKey),
>> >         % ObjData = riak_object:get_value(Object),
>> >         % Msg = binary_to_list(ObjData),
>> >         CommitItem = iolist_to_binary(
>> >             mochijson2:encode({struct, [{b, Bucket}, {k, ObjKey}, {t, BucketType}]})),
>> >
>> >         case is_deleted(Object) of
>> >             true ->
>> >                 KeyPrefix = "delete";
>> >             _ ->
>> >                 KeyPrefix = "update"
>> >         end,
>> >
>> >         CurMin = now_to_local_string(os:timestamp()),
>> >         IndexKey = binary:list_to_bin(io_lib:format("~s-~s", [CurMin, KeyPrefix])),
>> >
>> >         %% Get a riak client
>> >         {ok, C} = riak:local_client(),
>> >         % get node obj
>> >         ThisNode = atom_to_binary(node(), latin1),
>> >
>> >         % get index obj and set context
>> >         BType = <<"archive">>,
>> >         B = <<"local-test">>,
>> >
>> >         {SetObj, Context} = case C:get({BType, B}, IndexKey) of
>> >             {error, notfound} ->
>> >                 ThisSetObj = riak_kv_crdt:new({BType, B}, IndexKey, riak_dt_orswot),
>> >                 {ThisSetObj, undefined};
>> >             {ok, ThisSetObj} ->
>> >                 % The datatype update requires the context if the value exists
>> >                 {{Ctx, _}, _} = riak_kv_crdt:value(ThisSetObj, riak_dt_orswot),
>> >                 {ThisSetObj, Ctx}
>> >         end,
>> >
>> >         UpdateIndex = [{add, CommitItem}],
>> >         % UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, Context},
>> >         UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, undefined},
>> >         NewObj = riak_kv_crdt:update(SetObj, ThisNode, UpdateOp),
>> >
>> >         error_logger:info_msg("Updating index for ~s,to set ~s~n",
>> >                               [binary:bin_to_list(CommitItem), IndexKey]),
>> >
>> >         C:put(NewObj),
>> >         Object
>> >     catch
>> >         error:Error ->
>> >             {fail, lists:flatten(io_lib:format("[PREHOOKEXCEPTION]~p", [Error]))}
>> >     end.
>> >
>> >
>> > ------------------------------------------------------------------------------------------------------
>> >
>> >
>> > These are the set bucket props:
>> >
>> > ------------------------------------------------------------------------------------------------------
>> >
>> > active: true
>> > allow_mult: true
>> > basic_quorum: false
>> > big_vclock: 50
>> > chash_keyfun: {riak_core_util,chash_std_keyfun}
>> > claimant: 'riak at 192.168.100.2'
>> > datatype: set
>> > dvv_enabled: true
>> > dw: quorum
>> > last_write_wins: false
>> > linkfun: {modfun,riak_kv_wm_link_walker,mapreduce_linkfun}
>> > n_val: 3
>> > notfound_ok: true
>> > old_vclock: 86400
>> > postcommit: []
>> > pr: 0
>> > precommit: []
>> > pw: 0
>> > r: quorum
>> > rw: quorum
>> > small_vclock: 50
>> > w: quorum
>> > young_vclock: 20
>> >
>> >
>>
>> > _______________________________________________
>> > riak-users mailing list
>> > riak-users at lists.basho.com
>> > http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com
>>
>