Russell.Brown at bet365.com Russell.Brown at bet365.com
Thu Mar 2 05:05:34 EST 2017

With regard to creating a serial actor, that is what vnodes are.

For your hook, if you need the context, then keep the C:get, but for the PUT just send the crdt_op, almost exactly as per https://github.com/basho/riak_kv/blob/develop/src/riak_kv_pb_crdt.erl#L162.

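%% The #crdt_op{} record comes from riak_kv_types.hrl, which riak_kv_pb_crdt.erl includes.
%% C is the local client you already have from riak:local_client().
O = riak_kv_crdt:new({BType, B}, Key, Mod),
CrdtOp = #crdt_op{mod=Mod, op=Op, ctx=Ctx},
Options = [{crdt_op, CrdtOp},
           {retry_put_coordinator_failure, false}],
Resp = C:put(O, Options),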

With Mod set to riak_dt_orswot, that should be enough for you to imitate a CRDT operation API call from your hook. DO NOT update the fetched CRDT yourself; let Riak’s vnode do that.

Let Riak handle the serial actor bit. You just send the crdt_op populated with the operation and context.

Like I said in my last email, IF folk want to use CRDTs from hooks then there should be an internal API that enables it. If you create an issue for that on github I’d quite enjoy writing the code and raising a PR. I no longer work at Basho, but since Riak is open source and I’m very fond of the CRDT feature, I’m sure they’d appreciate the contribution.



From: riak-users [mailto:riak-users-bounces at lists.basho.com] On Behalf Of 李明
Sent: 02 March 2017 09:50
To: Russell Brown
Cc: riak-users
Subject: Re: [CRDT_OP-in-CommitHook]

Thanks Russell,
    Yes, using the internal API has been a lot of trouble, especially as I know very little Erlang.
I have read riak_kv_pb_crdt, but it confused me a lot. How do I define a serial actor, and is there a simple example of creating the options for `riak_client:put`?

The reason I need to use the internal API is that I believe it will cost less performance than creating a new client, even though that might be more convenient. We are running a very heavily loaded Riak cluster and I want to keep the hook lightweight.

2017-03-02 16:32 GMT+08:00 Russell Brown <russell.brown at mac.com>:

You’re using internal details of the CRDT implementation, and I’m not sure that is such a great idea. You always have your `Context` set to `undefined`, but if your ops are all adds that shouldn’t matter in this case.

The issue is that you’re calling `riak_kv_crdt:update`, which needs to be called from within the vnode. The `ThisNode` value is not a serial actor, so you may have many concurrent updates with the actor `ThisNode`, and that’s not how to do it. It is crucial that the actor updating the CRDT acts in serial, issuing an increasing count of events. That’s why we put the CRDT code inside Riak, inside the vnode.
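
A toy illustration of the point above, not Riak's code: a minimal sketch, assuming a much-simplified OR-set that keeps a version clock and tags each add with a dot `{Actor, Counter}`. The module `orswot_toy` and every function in it are hypothetical, made up for this example. Two writers that both add under the same actor name issue the same dot for different elements, so on merge each side looks as if it has already seen, and removed, the other's element.

```erlang
-module(orswot_toy).
-export([new/0, add/3, merge/2, value/1, shared_actor/0, distinct_actors/0]).

%% Toy state is {Clock, Entries}:
%%   Clock   :: #{Actor => Counter}   highest event seen per actor
%%   Entries :: #{Element => [Dot]}   Dot = {Actor, Counter}
new() -> {#{}, #{}}.

%% Add Elem on behalf of Actor: bump the actor's counter and tag
%% the element with the freshly issued dot.
add(Elem, Actor, {Clock, Entries}) ->
    Counter = maps:get(Actor, Clock, 0) + 1,
    {Clock#{Actor => Counter}, Entries#{Elem => [{Actor, Counter}]}}.

%% A dot counts as "seen" by a clock if the clock has reached its counter.
seen({Actor, Counter}, Clock) ->
    maps:get(Actor, Clock, 0) >= Counter.

%% Merge two states. A dot present on one side only survives only if
%% the other side's clock has NOT seen it; otherwise the other side is
%% assumed to have seen the add and removed the element.
merge({ClockA, EntriesA}, {ClockB, EntriesB}) ->
    Clock = maps:fold(
              fun(Actor, N, Acc) ->
                      Acc#{Actor => max(N, maps:get(Actor, Acc, 0))}
              end, ClockA, ClockB),
    Elems = lists:usort(maps:keys(EntriesA) ++ maps:keys(EntriesB)),
    Entries = lists:foldl(
                fun(Elem, Acc) ->
                        DotsA = maps:get(Elem, EntriesA, []),
                        DotsB = maps:get(Elem, EntriesB, []),
                        Common = [D || D <- DotsA, lists:member(D, DotsB)],
                        OnlyA = [D || D <- DotsA,
                                      not lists:member(D, DotsB),
                                      not seen(D, ClockB)],
                        OnlyB = [D || D <- DotsB,
                                      not lists:member(D, DotsA),
                                      not seen(D, ClockA)],
                        case lists:usort(Common ++ OnlyA ++ OnlyB) of
                            [] -> Acc;
                            Dots -> Acc#{Elem => Dots}
                        end
                end, #{}, Elems),
    {Clock, Entries}.

value({_Clock, Entries}) -> lists:sort(maps:keys(Entries)).

%% Two concurrent writers both acting as 'node1' (like ThisNode in the hook):
%% both issue dot {node1, 1}, and the merge drops both elements.
shared_actor() ->
    A = add(x, node1, new()),
    B = add(y, node1, new()),
    value(merge(A, B)).

%% The same two adds under distinct serial actors: both elements survive.
distinct_actors() ->
    A = add(x, vnode_a, new()),
    B = add(y, vnode_b, new()),
    value(merge(A, B)).
```

In this toy, `orswot_toy:shared_actor()` merges to an empty set while `orswot_toy:distinct_actors()` keeps both elements, which is the kind of silent loss reported in the original question.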

You’re doing neither one thing nor the other, in that you’re using a datatype bucket but not the datatype API (it sends ops from the client; you’re doing read/modify/write).

I can see the issue here is that the API is external only. If you _must_ use the internal API, have a look at https://github.com/basho/riak_kv/blob/develop/src/riak_kv_pb_crdt.erl and you’ll see that the CRDT_OP is all that is sent by the client.


Those put options matter too, especially for counters, less so for sets.

I guess it would be great if riak_client had some internal API functions that made this easier to do from a hook. If you open an issue on http://github.com/basho/riak_kv I can look into that and make a PR.

Hope that helps


On 1 Mar 2017, at 09:30, 李明 <lmlmlmlalala at gmail.com> wrote:

> Hi
>    I am new to Erlang and Riak. I started using Riak as a KV store a couple of months ago. Now I want to add a commit hook to Riak so that Riak can help me collect some statistics.
> I read some docs and wrote a pre-commit hook script, which fetches the object key and stores it in a set.
>    This hook works fine if only one client writes to Riak, but if I increase the number of connections writing to Riak, some elements go missing from the set. It looks like the crdt_op did not do the merge operation, and there is no obvious error in the log files.
>    Could someone help me figure out what happened or what I have missed?
> I am using Riak 2.1.3.
> Thanks all!
> Here is the hook script:
> ------------------------------------------------------------------------------------------------------
> -module(myhook).
> -export([pretest/1]).
> now_to_local_string({MegaSecs, Secs, MicroSecs}) ->
>     LocalTime = calendar:now_to_local_time({MegaSecs, Secs, MicroSecs}),
>     {{Year, Month, Day}, {Hour, Minute, _}} = LocalTime,
>     TimeStr = lists:flatten(io_lib:format("~4..0w~2..0w~2..0w~2..0w~2..0w",
>                 [Year, Month, Day, Hour, Minute])),
>     TimeStr.
> is_deleted(Object)->
>     case dict:find(<<"X-Riak-Deleted">>,riak_object:get_metadata(Object)) of
>         {ok,_} ->
>             true;
>         _ ->
>             false
>     end.
> pretest(Object) ->
>     % timer:sleep(10000),
>     try
>         ObjBucket = riak_object:bucket(Object),
>         % riak_object:bucket(Obj).
>         % {<<"cn-archive">>,<<"local-test">>}
>         Bucket = element(2, ObjBucket),
>         BucketType = element(1, ObjBucket),
>         ObjKey = riak_object:key(Object),
>         % Key = binary_to_list(ObjKey),
>         % ObjData = riak_object:get_value(Object),
>         % Msg = binary_to_list(ObjData),
>         CommitItem = iolist_to_binary(mochijson2:encode({struct, [{b, Bucket}, {k, ObjKey}, {t, BucketType}]})),
>         case is_deleted(Object) of
>             true ->
>                 KeyPrefix = "delete";
>             _ ->
>                 KeyPrefix = "update"
>         end,
>         CurMin = now_to_local_string(os:timestamp()),
>         IndexKey = binary:list_to_bin(io_lib:format("~s-~s", [CurMin, KeyPrefix])),
>         %% Get a riak client
>         {ok, C} = riak:local_client(),
>         % get node obj
>         ThisNode = atom_to_binary(node(), latin1),
>         % get index obj and set context
>         BType = <<"archive">>,
>         B = <<"local-test">>,
>         {SetObj, Context} = case C:get({BType, B}, IndexKey) of
>             {error, notfound} ->
>                 ThisSetObj = riak_kv_crdt:new({BType, B}, IndexKey, riak_dt_orswot),
>                 {ThisSetObj, undefined};
>             {ok, ThisSetObj} ->
>                 % The datatype update requires the context if the value exists
>                 {{Ctx, _}, _} = riak_kv_crdt:value(ThisSetObj, riak_dt_orswot),
>                 {ThisSetObj, Ctx}
>         end,
>         UpdateIndex = [{add, CommitItem}],
>         % UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, Context},
>         UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, undefined},
>         NewObj = riak_kv_crdt:update(SetObj, ThisNode, UpdateOp),
>         error_logger:info_msg("Updating index for ~s,to set ~s~n", [binary:bin_to_list(CommitItem), IndexKey]),
>         C:put(NewObj),
>         Object
>     catch
>         error:Error ->
>             {fail, lists:flatten(io_lib:format("[PREHOOKEXCEPTION]~p",[Error]))}
>     end.
> ------------------------------------------------------------------------------------------------------
> This is the set bucket props
> ------------------------------------------------------------------------------------------------------
> active: true
> allow_mult: true
> basic_quorum: false
> big_vclock: 50
> chash_keyfun: {riak_core_util,chash_std_keyfun}
> claimant: 'riak at'
> datatype: set
> dvv_enabled: true
> dw: quorum
> last_write_wins: false
> linkfun: {modfun,riak_kv_wm_link_walker,mapreduce_linkfun}
> n_val: 3
> notfound_ok: true
> old_vclock: 86400
> postcommit: []
> pr: 0
> precommit: []
> pw: 0
> r: quorum
> rw: quorum
> small_vclock: 50
> w: quorum
> young_vclock: 20
