[CRDT_OP-in-CommitHook]

李明 lmlmlmlalala at gmail.com
Thu Mar 2 04:49:39 EST 2017


Thanks Russell,
    Yes, using the internal API has been a lot of trouble, especially since I
know very little about Erlang.
I have read riak_kv_pb_crdt, but it confused me a lot. How do I define a
serial actor, and is there a simple example of building the options for a
`riak_client:put` call?

The reason I need the internal API is that I believe it costs less
performance than creating a new client, which might be more convenient.
We are running a very heavily loaded Riak cluster and I want to keep the
hook lightweight.




2017-03-02 16:32 GMT+08:00 Russell Brown <russell.brown at mac.com>:

> Hi,
>
> You’re using internal details of the CRDT implementation; I’m not sure
> that is such a great idea. You always have your `Context` set to
> `undefined`, but if your ops are all adds that shouldn’t matter in this case.
>
> The issue is that you’re calling `riak_kv_crdt:update`, which needs to be
> called from within the vnode. The `ThisNode` value is not a serial actor,
> so you may have many concurrent updates with the actor `ThisNode`, and
> that’s not how to do it. It is crucial that the actor updating the CRDT
> acts serially, issuing an increasing count of events. That’s why we put the
> CRDT code inside Riak, inside the vnode.
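To make the serial-actor point concrete, here is a toy model in Erlang. It is a hypothetical module, `dot_demo`, and deliberately simplified; it is not `riak_dt_orswot`, whose real merge handles more cases. Each add increments the actor's entry in a version vector and tags the element with the resulting dot `{Actor, Counter}`; a merge keeps a dot only if both sides hold it or the other side has not yet seen it. When two concurrent hook calls read the same state and both use one actor (as `ThisNode` does), they mint the same dot for different elements, so each side looks as if it had seen and removed the other's element.

```erlang
%% Toy model for illustration only (hypothetical module, not riak_dt_orswot).
-module(dot_demo).
-export([new/0, add/3, merge/2, value/1, demo/0]).

%% State is {Clock, Entries}:
%%   Clock   - orddict of Actor => Counter (a version vector)
%%   Entries - orddict of Element => [Dot], where Dot = {Actor, Counter}
new() -> {orddict:new(), orddict:new()}.

%% An add bumps the actor's counter and tags the element with the new dot.
add(Actor, Elem, {Clock, Entries}) ->
    Clock2 = orddict:update_counter(Actor, 1, Clock),
    Dot = {Actor, orddict:fetch(Actor, Clock2)},
    {Clock2, orddict:store(Elem, [Dot], Entries)}.

%% Merge: keep dots present on both sides, plus dots present on one side
%% that the other side's clock has not seen. A dot the other side has
%% seen but does not hold is treated as removed.
merge({ClockA, EntriesA}, {ClockB, EntriesB}) ->
    Clock = orddict:merge(fun(_Actor, CA, CB) -> max(CA, CB) end, ClockA, ClockB),
    Elems = lists:usort(orddict:fetch_keys(EntriesA) ++ orddict:fetch_keys(EntriesB)),
    Keep = fun(Elem, Acc) ->
                   DotsA = dots(Elem, EntriesA),
                   DotsB = dots(Elem, EntriesB),
                   Common = [D || D <- DotsA, lists:member(D, DotsB)],
                   OnlyA = [D || D <- (DotsA -- DotsB), not seen(D, ClockB)],
                   OnlyB = [D || D <- (DotsB -- DotsA), not seen(D, ClockA)],
                   case lists:usort(Common ++ OnlyA ++ OnlyB) of
                       [] -> Acc;
                       Dots -> orddict:store(Elem, Dots, Acc)
                   end
           end,
    {Clock, lists:foldl(Keep, orddict:new(), Elems)}.

dots(Elem, Entries) ->
    case orddict:find(Elem, Entries) of
        {ok, Dots} -> Dots;
        error -> []
    end.

%% A clock has "seen" a dot if its counter for that actor has reached it.
seen({Actor, Counter}, Clock) ->
    case orddict:find(Actor, Clock) of
        {ok, Seen} -> Seen >= Counter;
        error -> false
    end.

value({_Clock, Entries}) -> orddict:fetch_keys(Entries).

%% Two concurrent writers start from the same (empty) state.
demo() ->
    Base = new(),
    %% Same actor for both writers, as when every hook call uses node().
    Shared = merge(add(node_a, <<"k1">>, Base), add(node_a, <<"k2">>, Base)),
    %% A distinct actor per writer.
    Distinct = merge(add(actor_1, <<"k1">>, Base), add(actor_2, <<"k2">>, Base)),
    {value(Shared), value(Distinct)}.
```

In this toy model `dot_demo:demo()` returns `{[], [<<"k1">>, <<"k2">>]}`: with a shared actor both adds vanish after the merge, while with distinct actors both survive.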
>
> You’re doing neither one thing nor the other, in that you’re using a
> datatype bucket but not the datatype API (the datatype API sends ops from
> the client, whereas you’re doing read/modify/write).
>
> I can see the issue here is that the API is external only. If you _must_
> use the internal API, have a look at
> https://github.com/basho/riak_kv/blob/develop/src/riak_kv_pb_crdt.erl and
> you’ll see that the CRDT_OP is all that is sent by the client.
>
> https://github.com/basho/riak_kv/blob/develop/src/riak_kv_pb_crdt.erl#L162
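The alternative Russell describes, where the client sends only the operation and the vnode applies it under its own actor, can be sketched with plain Erlang processes. This is a hypothetical toy (`serial_actor_demo`, with one process standing in for a vnode), not Riak's internal API: concurrent clients send `{add, Elem}` ops, and because a single process applies them one at a time, every op gets a distinct, increasing event count and no add is lost.

```erlang
%% Toy model for illustration only (hypothetical module, not Riak's API).
-module(serial_actor_demo).
-export([run/1]).

%% Stand-in for a vnode: one process owns the set and applies operations
%% one at a time under a single actor, so each op gets the next counter.
vnode_loop(Actor, Counter, Set) ->
    receive
        {op, {add, Elem}, From} ->
            Next = Counter + 1,
            From ! {ok, {Actor, Next}},
            vnode_loop(Actor, Next, ordsets:add_element(Elem, Set));
        {value, From} ->
            From ! {value, Set},
            vnode_loop(Actor, Counter, Set);
        stop ->
            ok
    end.

%% Spawn N concurrent clients; each sends only an operation, never a
%% locally modified copy of the set.
run(N) ->
    VNode = spawn(fun() -> vnode_loop(vnode_1, 0, ordsets:new()) end),
    Parent = self(),
    Clients = [spawn(fun() ->
                             VNode ! {op, {add, I}, self()},
                             receive {ok, Dot} -> Parent ! {done, self(), Dot} end
                     end)
               || I <- lists:seq(1, N)],
    %% Wait for every client, so all ops are applied before we read.
    Dots = [receive {done, Client, Dot} -> Dot end || Client <- Clients],
    VNode ! {value, self()},
    Set = receive {value, S} -> S end,
    VNode ! stop,
    {Set, lists:usort(Dots)}.
```

`serial_actor_demo:run(50)` returns the full set `lists:seq(1, 50)` together with 50 distinct dots. In Riak the analogue is letting the vnode apply the CRDT_OP, as riak_kv_pb_crdt does, rather than calling `riak_kv_crdt:update` inside the hook.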
>
> Those put options matter too, especially for counters, less so for sets.
>
> I guess it would be great if riak_client had some internal API functions
> that made this easier to do from a hook. If you open an issue on
> github.com/basho/riak_kv I can look into that and make a PR.
>
> Hope that helps
>
> Russell
>
> On 1 Mar 2017, at 09:30, 李明 <lmlmlmlalala at gmail.com> wrote:
>
> > Hi
> >    I am new to Erlang and Riak. I started using Riak as a KV store a
> > couple of months ago. Now I want to implement a commit hook so that
> > Riak can help me keep some statistics.
> > I read some docs and wrote a precommit hook, which fetches the
> > object key and stores it in a set.
> >    The hook works fine if only one client writes to Riak, but when I
> > increase the number of connections writing to Riak, some elements are
> > lost from the set. It looks like the crdt_op did not do the merge
> > operation, and there is no obvious error in the log files.
> >
> >    Could someone help me figure out what happened or what I have
> > missed?
> >
> > I am using Riak 2.1.3.
> >
> > Thanks all!
> >
> >
> > Here is the hook script:
> >
> > ------------------------------------------------------------------------
> >
> > -module(myhook).
> > -export([pretest/1]).
> >
> > now_to_local_string({MegaSecs, Secs, MicroSecs}) ->
> >     LocalTime = calendar:now_to_local_time({MegaSecs, Secs, MicroSecs}),
> >     {{Year, Month, Day}, {Hour, Minute, _}} = LocalTime,
> >     TimeStr = lists:flatten(io_lib:format("~4..0w~2..0w~2..0w~2..0w~2..0w",
> >                                           [Year, Month, Day, Hour, Minute])),
> >     TimeStr.
> >
> > is_deleted(Object) ->
> >     case dict:find(<<"X-Riak-Deleted">>, riak_object:get_metadata(Object)) of
> >         {ok, _} ->
> >             true;
> >         _ ->
> >             false
> >     end.
> >
> > pretest(Object) ->
> >     % timer:sleep(10000),
> >     try
> >         ObjBucket = riak_object:bucket(Object),
> >         % riak_object:bucket(Obj).
> >         % {<<"cn-archive">>,<<"local-test">>}
> >
> >         Bucket = element(2, ObjBucket),
> >         BucketType = element(1, ObjBucket),
> >
> >         ObjKey = riak_object:key(Object),
> >         % Key = binary_to_list(ObjKey),
> >         % ObjData = riak_object:get_value(Object),
> >         % Msg = binary_to_list(ObjData),
> >         CommitItem = iolist_to_binary(mochijson2:encode({struct, [{b, Bucket}, {k, ObjKey}, {t, BucketType}]})),
> >
> >         case is_deleted(Object) of
> >             true ->
> >                 KeyPrefix = "delete";
> >             _ ->
> >                 KeyPrefix = "update"
> >         end,
> >
> >         CurMin = now_to_local_string(os:timestamp()),
> >         IndexKey = binary:list_to_bin(io_lib:format("~s-~s", [CurMin, KeyPrefix])),
> >
> >         %% Get a riak client
> >         {ok, C} = riak:local_client(),
> >         % get node obj
> >         ThisNode = atom_to_binary(node(), latin1),
> >
> >         % get index obj and set context
> >         BType = <<"archive">>,
> >         B = <<"local-test">>,
> >
> >         {SetObj, Context} = case C:get({BType, B}, IndexKey) of
> >             {error, notfound} ->
> >                 ThisSetObj = riak_kv_crdt:new({BType, B}, IndexKey, riak_dt_orswot),
> >                 {ThisSetObj, undefined};
> >             {ok, ThisSetObj} ->
> >                 % The datatype update requires the context if the value exists
> >                 {{Ctx, _}, _} = riak_kv_crdt:value(ThisSetObj, riak_dt_orswot),
> >                 {ThisSetObj, Ctx}
> >         end,
> >
> >         UpdateIndex = [{add, CommitItem}],
> >         % UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, Context},
> >         UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, undefined},
> >         NewObj = riak_kv_crdt:update(SetObj, ThisNode, UpdateOp),
> >
> >         error_logger:info_msg("Updating index for ~s,to set ~s~n",
> >                               [binary:bin_to_list(CommitItem), IndexKey]),
> >
> >         C:put(NewObj),
> >         Object
> >     catch
> >         error:Error ->
> >             {fail, lists:flatten(io_lib:format("[PREHOOKEXCEPTION]~p", [Error]))}
> >     end.
> >
> > ------------------------------------------------------------------------
> >
> >
> > These are the set bucket props:
> > ------------------------------------------------------------------------
> >
> > active: true
> > allow_mult: true
> > basic_quorum: false
> > big_vclock: 50
> > chash_keyfun: {riak_core_util,chash_std_keyfun}
> > claimant: 'riak@192.168.100.2'
> > datatype: set
> > dvv_enabled: true
> > dw: quorum
> > last_write_wins: false
> > linkfun: {modfun,riak_kv_wm_link_walker,mapreduce_linkfun}
> > n_val: 3
> > notfound_ok: true
> > old_vclock: 86400
> > postcommit: []
> > pr: 0
> > precommit: []
> > pw: 0
> > r: quorum
> > rw: quorum
> > small_vclock: 50
> > w: quorum
> > young_vclock: 20
> >
> >
>
>

