@@ -21,6 +21,7 @@ public class ClusterWS {
2121 private PingHandler mPingHandler ;
2222 private List <Channel > mChannels ;
2323 private ReconnectionParams mReconnectionParams ;
24+ private static final byte [] PONG = "A" .getBytes ();
2425
2526 public ClusterWS (String url ) {
2627 if (url == null ) {
@@ -130,7 +131,10 @@ private void createSocket() {
130131 mSocket = new Socket (URI .create (mUrl ), new ISocketEvents () {
131132 @ Override
132133 public void onOpen () {
133- mClusterWSListener .onConnected ();
134+ for (Channel channel :
135+ mChannels ) {
136+ channel .subscribe ();
137+ }
134138 }
135139
136140 @ Override
@@ -145,8 +149,10 @@ public void onClose(int code, String reason) {
145149 if (mPingHandler .getPingTimer () != null ) {
146150 mPingHandler .getPingTimer ().cancel ();
147151 }
148- if (mReconnectionParams .isAutoReconnect () && code != 1000 && (mReconnectionParams .getReconnectionAttempts () == 0 || mReconnectionParams .getReconnectionsAttempted () < mReconnectionParams .getReconnectionAttempts ())) {
149- if (mSocket .getReadyState () == WebSocket .READYSTATE .CLOSED || mSocket .getReadyState () == WebSocket .READYSTATE .NOT_YET_CONNECTED ) {
152+ if (mReconnectionParams .isAutoReconnect ()
153+ && code != 1000
154+ && (mReconnectionParams .getReconnectionAttempts () == 0 || mReconnectionParams .getReconnectionsAttempted () < mReconnectionParams .getReconnectionAttempts ())) {
155+ if (mSocket .getReadyState () == WebSocket .READYSTATE .CLOSED || mSocket .getReadyState () == WebSocket .READYSTATE .NOT_YET_CONNECTED || mSocket .getReadyState () == WebSocket .READYSTATE .CLOSING ) {
150156 mReconnectionParams .incrementReconnectionsAttempted ();
151157 int randomDelay = ThreadLocalRandom .current ().nextInt (1 ,
152158 mReconnectionParams .getReconnectionIntervalMax () -
@@ -165,8 +171,17 @@ public void run() {
165171
166172 @ Override
167173 public void onBinaryMessage (ByteBuffer bytes ) {
168- String message = StandardCharsets .UTF_8 .decode (bytes ).toString ();
169- onMessageReceived (message );
174+ System .out .println ("GOT MESSAGE" );
175+ byte [] arr = new byte [bytes .remaining ()];
176+ bytes .get (arr );
177+ if (arr .length == 1 && arr [0 ] == 57 ) {
178+ mPingHandler .setMissedPingToZero ();
179+ mSocket .send (PONG );
180+ } else {
181+ String message = new String (arr , StandardCharsets .UTF_8 );
182+ onMessageReceived (message );
183+ }
184+
170185 }
171186
172187 @ Override
@@ -181,11 +196,7 @@ public void onMessage(String message) {
181196 }
182197
183198 private void onMessageReceived (String message ) {
184- if (message .equals ("#0" )) {
185- mPingHandler .setMissedPingToZero ();
186- send ("#1" , null , "ping" );
187- } else {
188- mMessageHandler .messageDecode (ClusterWS .this , message );
189- }
199+ System .out .println ("MESSAGE IS " + message );
200+ mMessageHandler .messageDecode (ClusterWS .this , message );
190201 }
191202}
0 commit comments