riak java pb client does not let go of bad sockets

Will Gage wgage at shopzilla.com
Fri Mar 30 13:11:04 EDT 2012


Awesome, thanks for the quick turnaround, Brian! We'll test it out and let you know how it goes.

Cheers,
Will
________________________________________
From: riak-users-bounces at lists.basho.com [riak-users-bounces at lists.basho.com] on behalf of Brian Roach [roach at basho.com]
Sent: Thursday, March 29, 2012 2:42 PM
To: Riak-Users
Subject: Re: riak java pb client does not let go of bad sockets

Thanks (and sorry!) for reporting this, Will.

This was a bug in the underlying Protocol Buffers RiakClient: Socket objects were not being closed when their streams threw an IOException. This is fixed in the 1.0.5 release, available today.
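To make the fix concrete, here is a minimal sketch of the pattern described above. It is not the actual riak-client 1.0.5 code: `DiscardOnErrorPool`, `Factory` and `Operation` are hypothetical names, and the only assumption is that a pooled connection can be treated as a `java.io.Closeable`. A connection goes back to the pool only if the exchange on it completed normally; if anything throws, the socket is closed and dropped, so the next request opens a fresh one instead of failing with `Broken pipe` indefinitely.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical minimal pool (not riak-client code) illustrating the fix:
 * a connection is reused only if the operation on it completed normally.
 */
final class DiscardOnErrorPool<C extends Closeable> {

    /** Opens a new connection; stands in for creating and connecting a Socket. */
    interface Factory<T> {
        T open() throws IOException;
    }

    /** One request/response exchange on a borrowed connection. */
    interface Operation<T, R> {
        R apply(T conn) throws IOException;
    }

    private final Deque<C> idle = new ArrayDeque<>();
    private final Factory<C> factory;

    DiscardOnErrorPool(Factory<C> factory) {
        this.factory = factory;
    }

    <R> R execute(Operation<C, R> op) throws IOException {
        C conn = borrow();
        boolean healthy = false;
        try {
            R result = op.apply(conn);
            healthy = true;
            return result;
        } finally {
            if (healthy) {
                release(conn);      // normal completion: safe to reuse
            } else {
                closeQuietly(conn); // any exception: never hand this socket out again
            }
        }
    }

    synchronized int idleCount() {
        return idle.size();
    }

    private C borrow() throws IOException {
        C conn;
        synchronized (this) {
            conn = idle.pollFirst();
        }
        // Open outside the lock so a slow connect does not block other callers.
        return (conn != null) ? conn : factory.open();
    }

    private synchronized void release(C conn) {
        idle.addFirst(conn);
    }

    private static void closeQuietly(Closeable c) {
        try {
            c.close();
        } catch (IOException ignored) {
            // the connection is already unusable; nothing more to do
        }
    }
}
```

The sketch discards on any exception, not only `IOException`, which is the conservative choice when you cannot tell how much of a response was left unread on the stream.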

Thanks again,
Brian Roach


On Mar 28, 2012, at 12:09 PM, Will Gage wrote:

> Hello,
>
>
> I have run into a production issue that I think stems from either a defect in the com.basho.riak:riak-client:jar:1.0.4 library or a misunderstanding in how I'm using it. I'm actively working on a fix, but I thought I'd ask this list whether others have run into the issue, or whether there's an obvious problem in how we use the library.
>
> Environment:
> ---------------------
> * Java web application running in Tomcat:
> ** JDK: jdk1.6.0_24-jce6
> ** Tomcat: apache-tomcat-7.0.23
> ** Basho Riak Client version: com.basho.riak:riak-client:jar:1.0.4
> * 6 node Riak cluster running Riak 1.0.1
>
> Error sequence:
> -----------------------
> The production issue has happened a few times, and it follows this sequence:
>
> 1. We get a rash of SocketException: Connection reset errors
>
> java.net.SocketException: Connection reset
>    at java.net.SocketInputStream.read(SocketInputStream.java:168)
>    at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
>    at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
>    at java.io.DataInputStream.readInt(DataInputStream.java:370)
>    at com.basho.riak.pbc.RiakConnection.receive(RiakConnection.java:92)
>    at com.basho.riak.pbc.RiakClient.processFetchReply(RiakClient.java:278)
>    at com.basho.riak.pbc.RiakClient.fetch(RiakClient.java:252)
>    at com.basho.riak.pbc.RiakClient.fetch(RiakClient.java:241)
>    at com.basho.riak.client.raw.pbc.PBClientAdapter.fetch(PBClientAdapter.java:156)
>    at com.basho.riak.client.raw.pbc.PBClientAdapter.fetch(PBClientAdapter.java:139)
>    at com.basho.riak.client.raw.ClusterClient.fetch(ClusterClient.java:107)
>
> 2. About 50 milliseconds later, we get a steady stream of SocketException: Broken pipe errors that continues until we restart the Tomcat container.
>
> java.net.SocketException: Broken pipe
>    at java.net.SocketOutputStream.socketWrite0(Native Method)
>    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
>    at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
>    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
>    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123)
>    at java.io.DataOutputStream.flush(DataOutputStream.java:106)
>    at com.basho.riak.pbc.RiakConnection.send(RiakConnection.java:82)
>    at com.basho.riak.pbc.RiakClient.fetch(RiakClient.java:251)
>    at com.basho.riak.pbc.RiakClient.fetch(RiakClient.java:241)
>    at com.basho.riak.client.raw.pbc.PBClientAdapter.fetch(PBClientAdapter.java:156)
>    at com.basho.riak.client.raw.pbc.PBClientAdapter.fetch(PBClientAdapter.java:139)
>    at com.basho.riak.client.raw.ClusterClient.fetch(ClusterClient.java:107)
>
> 3. Within our 6-node Riak cluster, almost exactly 1 minute after the initial connection reset errors, one node emits a crash log:
>
> 2012-03-27 14:56:51 =ERROR REPORT====
> ** Generic server <0.289.0> terminating
> ** Last message in was {inet_async,#Port<0.3319>,41462,{ok,#Port<0.73913715>}}
> ** When Server state == {state,riak_kv_pb_listener,#Port<0.3319>,{state,8087}}
> ** Reason for termination ==
> ** {timeout,{'gen_server2',call,[<0.1838.1574>,{set_socket,#Port<0.73913715>}]}}
> 2012-03-27 14:57:08 =CRASH REPORT====
>  crasher:
>    initial call: gen_nb_server:init/1
>    pid: <0.289.0>
>    registered_name: []
>    exception exit: {timeout,{'gen_server2',call,[<0.1838.1574>,{set_socket,#Port<0.73913715>}]}}
>      in function  gen_server:terminate/6
>      in call from proc_lib:init_p_do_apply/3
>    ancestors: [riak_kv_sup,<0.194.0>]
>    messages: [{#Ref<0.0.704.111074>,ok}]
>    links: [<0.200.0>]
>    dictionary: []
>    trap_exit: false
>    status: running
>    heap_size: 377
>    stack_size: 24
>    reductions: 117962
>  neighbours:
> 2012-03-27 14:57:09 =SUPERVISOR REPORT====
>     Supervisor: {local,riak_kv_sup}
>     Context:    child_terminated
>     Reason:     {timeout,{'gen_server2',call,[<0.1838.1574>,{set_socket,#Port<0.73913715>}]}}
>     Offender:   [{pid,<0.289.0>},{name,riak_kv_pb_listener},{mfargs,{riak_kv_pb_listener,start_link,[]}},{restart_type,permanent},{shutdown,5000},{child_type,worker}]
>
>
> The Riak cluster appears healthy again (all nodes connected and responding) by the time we notice the errors and check it, but the clients (the Tomcat application) stay broken until we restart them. It seems pretty clear that a process within the Riak cluster dies and takes its sockets with it, and the clients never recover from that.
>
>
> Theory
> ----------
> The working theory is that the client library never flushes out bad connections. You can see below that connections are always returned to the pool, and I have not yet seen any evidence that a connection is ever tested for health once allocated.
>
> From com.basho.riak.pbc.RiakClient, lines 224-237:
>
>       public RiakObject[] fetch(ByteString bucket, ByteString key, int readQuorum)
>                       throws IOException {
>               RpbGetReq req = RPB.RpbGetReq.newBuilder().setBucket(bucket)
>                               .setKey(key).setR(readQuorum).build();
>
>               RiakConnection c = getConnection();
>               try {
>                       c.send(MSG_GetReq, req);
>                       return processFetchReply(c, bucket, key).getObjects();
>               } finally {
>                       release(c);
>               }
>
>       }
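The excerpt above hands the connection back through `release(c)` regardless of what happened on it. A complementary approach to the missing health test is to validate idle connections when they are borrowed ("test on borrow"). This is a sketch with hypothetical names (`ValidatingPool`, `Factory`, `Validator`), not riak-client code, and what the check actually does is left as an assumption. Note that `java.net.Socket.isClosed()` only reports whether the local side has closed the socket, so a real check would likely need a round trip to the server, which adds latency to every checkout.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical test-on-borrow pool (not riak-client code): idle connections are
 * checked by a caller-supplied Validator before being handed out, and any that
 * fail the check are closed and discarded.
 */
final class ValidatingPool<C extends Closeable> {

    /** Opens a new connection; stands in for creating and connecting a Socket. */
    interface Factory<T> {
        T open() throws IOException;
    }

    /** Hypothetical health check, e.g. a ping round trip to the server. */
    interface Validator<T> {
        boolean isHealthy(T conn);
    }

    private final Deque<C> idle = new ArrayDeque<>();
    private final Factory<C> factory;
    private final Validator<C> validator;

    ValidatingPool(Factory<C> factory, Validator<C> validator) {
        this.factory = factory;
        this.validator = validator;
    }

    C borrow() throws IOException {
        while (true) {
            C conn;
            synchronized (this) {
                conn = idle.pollFirst();
            }
            if (conn == null) {
                return factory.open(); // nothing idle: open a fresh connection
            }
            if (validator.isHealthy(conn)) {
                return conn;           // idle connection passed the check
            }
            closeQuietly(conn);        // stale (e.g. reset by the server while idle): drop it
        }
    }

    synchronized void release(C conn) {
        idle.addFirst(conn);
    }

    synchronized int idleCount() {
        return idle.size();
    }

    private static void closeQuietly(Closeable c) {
        try {
            c.close();
        } catch (IOException ignored) {
            // the connection is already unusable; nothing more to do
        }
    }
}
```

Validating on borrow catches sockets that died while idle, but not ones that die mid-request, so it complements closing connections on error rather than replacing it.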
>
> And this is how our reference to the client is created, using client-side cluster configs:
>
>            // set up a PBClientConfig per host
>            for(String host : hosts) {
>
>                PBClientConfig pbcConfig = new PBClientConfig.Builder()
>                        .withInitialPoolSize(config.getInitialPoolSize())
>                        .withPort(config.getRiakPort())
>                        .withHost(host.trim())
>                        .withConnectionTimeoutMillis(config.getConnectionWaitTimeoutMillis())
>                        .withIdleConnectionTTLMillis(config.getIdleConnectionTTLMillis())
>                        .withSocketBufferSizeKb(config.getBufferSizeKb())
>                        .withPoolSize(config.getMaximunPoolSize())
>                        .build();
>
>                clusterConfig.addClient(pbcConfig);
>
>            }
>
>            // Connection pooling is done internally in the PBClusterClient created by the factory
>            RawClient rawClient = PBClusterClientFactory.getInstance().newClient(clusterConfig);
>
>            this.client = rawClient;
>
> Does the client library expect our own application code to detect bad connections and then recreate the pool/client itself?
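On a client version without the socket-closing fix, one possible application-side workaround for the question above is to rebuild the client after a run of consecutive I/O failures. This is a sketch with hypothetical names (`RecreatingClientHolder`, `Call`); the `factory` supplier would be whatever code builds the `RawClient`, such as the `PBClusterClientFactory.getInstance().newClient(clusterConfig)` call above, and the failure threshold is an arbitrary assumption. Requests run outside the lock, so concurrent callers are not serialized.

```java
import java.io.IOException;
import java.util.function.Supplier;

/**
 * Hypothetical application-side holder: counts consecutive IOExceptions and,
 * once a threshold is reached, replaces the client (and with it the pool of sockets).
 */
final class RecreatingClientHolder<T> {

    /** One call made against the current client. */
    interface Call<C, R> {
        R apply(C client) throws IOException;
    }

    private final Supplier<T> factory;
    private final int maxConsecutiveFailures;
    private T client;
    private int consecutiveFailures;

    RecreatingClientHolder(Supplier<T> factory, int maxConsecutiveFailures) {
        this.factory = factory;
        this.maxConsecutiveFailures = maxConsecutiveFailures;
        this.client = factory.get();
    }

    <R> R call(Call<T, R> op) throws IOException {
        T snapshot = current(); // the request itself runs outside the lock
        try {
            R result = op.apply(snapshot);
            onSuccess(snapshot);
            return result;
        } catch (IOException e) {
            onFailure(snapshot);
            throw e;
        }
    }

    synchronized T current() {
        return client;
    }

    private synchronized void onSuccess(T used) {
        if (used == client) {
            consecutiveFailures = 0;
        }
    }

    private synchronized void onFailure(T failed) {
        if (failed != client) {
            return; // another thread already replaced this client
        }
        if (++consecutiveFailures >= maxConsecutiveFailures) {
            // Build a brand-new client. The old instance is simply dropped here;
            // shutting it down cleanly is left out of this sketch.
            client = factory.get();
            consecutiveFailures = 0;
        }
    }
}
```

Counting consecutive failures, rather than reacting to the first one, avoids tearing down every pooled connection because of a single transient error.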
>
>
> Thanks,
> Will
> _______________________________________________
> riak-users mailing list
> riak-users at lists.basho.com
> http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com

