Java MR

niedomnie niedomnie at gmail.com
Thu Dec 4 03:27:34 EST 2014


I have got no clue why in Groove it does not work.
But in meantime (in Java)
I've got another problem - when running 

I want to achieve HTTP equivalent 
curl -s -X POST -H "Content-Type: application/json"
http://localhost:8098/mapred -d
'{"inputs":{"bucket":["bucket_type","bucket"],"index":"$key",
"start":"2013",end":"2015"}, "query":[{"map":{"language":"javascript",
"name":"Riak.mapValues"}} ]}'
or 
curl -s -X POST -H "Content-Type: application/json"
http://localhost:8098/mapred -d
'{"inputs":{"bucket":["bucket_type","bucket"],"index":"*$key*",
"key_filters":[["starts_with", "2014"]]},
"query":[{"map":{"language":"javascript", "name":"Riak.mapValues"}} ]}'
because both are working (*second one even if $key is replaced by \$key - I
do not know which one is appropriate?*) but I think that second one was
achieved by me in java with BucketMapReduce

  MapReduce mr = new IndexMapReduce.Builder()
                .withNamespace(ns)
                .withIndex("$key")
                .withRange(BinaryValue.unsafeCreate(from.getBytes()),
BinaryValue.unsafeCreate(to.getBytes()))
                .withMapPhase(Function.newNamedJsFunction("Riak.mapValues"),
true)
                .build();

I am receiving 

Exception in thread "main" java.util.concurrent.ExecutionException:
java.nio.channels.ClosedChannelException
	at com.basho.riak.client.core.FutureOperation.get(FutureOperation.java:260)
	at
com.basho.riak.client.api.commands.CoreFutureAdapter.get(CoreFutureAdapter.java:52)
	at com.basho.riak.client.api.RiakCommand.execute(RiakCommand.java:89)
	at com.basho.riak.client.api.RiakClient.execute(RiakClient.java:293)
	at Riak_MR_3M.main(Riak_MR_3M.java:57)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.nio.channels.ClosedChannelException

or 

Exception in thread "main" java.util.concurrent.ExecutionException:
com.basho.riak.client.core.netty.RiakResponseException: Error processing
incoming message: error:function_clause:[{riak_index,
                                                           parse_binary,
                                                           [{struct,
                                                             [{<<"value">>,
                                                              
<<"MjAxMzExMDRfMDkwNTIwLjU3Ni4w">>}]}],
                                                           [{file,
                                                            
"src/riak_index.erl"},
                                                            {line,543}]},
                                                          {riak_index,
                                                           parse_field,3,
                                                           [{file,
                                                            
"src/riak_index.erl"},
                                                            {line,192}]},
                                                          {riak_index,
                                                          
'-parse_fields/1-fun-0-',
                                                           3,
                                                           [{file,
                                                            
"src/riak_index.erl"},
                                                            {line,158}]},
                                                          {lists,foldl,3,
                                                          
[{file,"lists.erl"},
                                                            {line,1248}]},
                                                          {riak_index,
                                                           parse_fields,1,
                                                           [{file,
                                                            
"src/riak_index.erl"},
                                                            {line,167}]},
                                                         
{riak_kv_mapred_json,
                                                          
parse_index_input,
                                                           1,
                                                           [{file,
                                                            
"src/riak_kv_mapred_json.erl"},
                                                            {line,238}]},
                                                         
{riak_kv_mapred_json,
                                                           parse_request,1,
                                                           [{file,
                                                            
"src/riak_kv_mapred_json.erl"},
                                                            {line,55}]},
                                                         
{riak_kv_pb_mapred,
                                                           decode,2,
                                                           [{file,
                                                            
"src/riak_kv_pb_mapred.erl"},
                                                            {line,70}]}]

but this is working fine

MapReduce mr = new BucketMapReduce.Builder()
                        .withNamespace(ns)
                        .withKeyFilter(new BetweenFilter<String>(from, to))
                       
.withRange(BinaryValue.unsafeCreate(from.getBytes()),
BinaryValue.unsafeCreate(to.getBytes()))
                       
.withMapPhase(Function.newNamedJsFunction("Riak.mapValues"), true)
                        .build();
               
so maybe this is not proper index (for main primary key I have used  $key)
so I've change it to index (index_name) - it is only my quest what kind of
index it is (I do not want to use solr/search - I only wanted to use
secondary_index - 2i, and the most I want to use primary index which is
named $key)

  MapReduce mr = new IndexMapReduce.Builder()
                .withNamespace(ns)
                .withIndex("index_name")
                .withRange(BinaryValue.unsafeCreate(from.getBytes()),
BinaryValue.unsafeCreate(to.getBytes()))
                .withMapPhase(Function.newNamedJsFunction("Riak.mapValues"),
true)
                .build();

and bucket has got search_index defined
{
    "props": {
        .....
        "search_index": "index_name",
        ....
    }
}

and entries were added with 
//        
riakObject.getIndexes().getIndex(LongIntIndex.named("index_name")).add(System.currentTimeMillis());
StoreValue store = new StoreValue.Builder(ro).withLocation(new Location(ns,
key)).build();
StoreValue.Response response = client.execute(store);

I am receiving 

Exception in thread "main" java.util.concurrent.ExecutionException:
com.basho.riak.client.core.netty.RiakResponseException:
{inputs,[{unknown_field_type,<<"index_name">>}]}
	at com.basho.riak.client.core.FutureOperation.get(FutureOperation.java:260)
	at
com.basho.riak.client.api.commands.CoreFutureAdapter.get(CoreFutureAdapter.java:52)
	at com.basho.riak.client.api.RiakCommand.execute(RiakCommand.java:89)
	at com.basho.riak.client.api.RiakClient.execute(RiakClient.java:293)

what am I doing wrong?



--
View this message in context: http://riak-users.197444.n3.nabble.com/Java-MR-tp4032117p4032227.html
Sent from the Riak Users mailing list archive at Nabble.com.




More information about the riak-users mailing list