11package com .github .netty .protocol .dubbo ;
22
3- import com .github .netty .protocol .dubbo .packet .BodyFail ;
4- import com .github .netty .protocol .dubbo .packet .BodyHeartBeat ;
5- import com .github .netty .protocol .dubbo .packet .BodyRequest ;
6- import com .github .netty .protocol .dubbo .packet .BodyResponse ;
73import io .netty .buffer .ByteBuf ;
84import io .netty .channel .ChannelHandlerContext ;
95import io .netty .handler .codec .ByteToMessageDecoder ;
106
11- import java .io .ByteArrayInputStream ;
12- import java .io .IOException ;
137import java .util .List ;
14- import java .util .Map ;
158
169import static com .github .netty .protocol .dubbo .Constant .*;
1710
@@ -34,7 +27,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> ou
3427 case READ_HEADER : {
3528 if (buffer .readableBytes () >= HEADER_LENGTH ) {
3629 try {
37- this .packet = new DubboPacket (readHeader (buffer ));
30+ this .packet = new DubboPacket (Header . readHeader (buffer ));
3831 } catch (Exception e ) {
3932 exception (ctx , buffer , e );
4033 throw e ;
@@ -51,7 +44,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> ou
5144 ByteBuf body = buffer .readRetainedSlice (this .packet .header .bodyLength );
5245 int markReaderIndex = body .readerIndex ();
5346 try {
54- this .packet .body = readBody (body );
47+ this .packet .body = Body . readBody (body , packet . header );
5548 } catch (Exception e ) {
5649 exception (ctx , buffer , e );
5750 this .packet .release ();
@@ -76,103 +69,6 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> ou
7669 } while (hasNext );
7770 }
7871
79- protected Header readHeader (ByteBuf buffer ) {
80- int readerIndex = buffer .readerIndex ();
81-
82- // request and serialization flag. -62 isHeartBeat
83- byte flag = buffer .getByte (readerIndex + 2 );
84- byte status = buffer .getByte (readerIndex + 3 );
85- long requestId = buffer .getLong (readerIndex + 4 );
86- // 8 - 1-request/0-response
87- byte type = buffer .getByte (readerIndex + 8 );
88- int bodyLength = buffer .getInt (readerIndex + 12 );
89-
90- ByteBuf headerBytes = buffer .readRetainedSlice (HEADER_LENGTH );
91- return new Header (headerBytes , flag , status , requestId , type , bodyLength );
92- }
93-
94- protected Body readBody (ByteBuf buffer ) throws IOException , ClassNotFoundException {
95- // request and serialization flag.
96- byte flag = packet .header .flag ;
97- byte status = packet .header .status ;
98- int bodyLength = packet .header .bodyLength ;
99- boolean flagResponse = (flag & FLAG_REQUEST ) == 0 ;
100- byte serializationProtoId = (byte ) (flag & SERIALIZATION_MASK );
101- if (flagResponse ) {
102- // decode response.
103- if (status == OK ) {
104- if ((flag & FLAG_EVENT ) != 0 ) {
105- return readHeartBeat (buffer , bodyLength , serializationProtoId );
106- } else {
107- try (Serialization .ObjectInput in = Serialization .codeOfDeserialize (serializationProtoId , buffer , bodyLength )) {
108- byte responseWith = buffer .readByte ();
109- BodyResponse packetResponse ;
110- switch (responseWith ) {
111- case RESPONSE_NULL_VALUE :
112- packetResponse = new BodyResponse (null , null , null );
113- break ;
114- case RESPONSE_VALUE :
115- packetResponse = new BodyResponse (in .readObject (), null , null );
116- break ;
117- case RESPONSE_WITH_EXCEPTION :
118- packetResponse = new BodyResponse (null , in .readThrowable (), null );
119- break ;
120- case RESPONSE_NULL_VALUE_WITH_ATTACHMENTS :
121- packetResponse = new BodyResponse (null , null , in .readAttachments ());
122- break ;
123- case RESPONSE_VALUE_WITH_ATTACHMENTS :
124- packetResponse = new BodyResponse (in .readObject (), null , in .readAttachments ());
125- break ;
126- case RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS :
127- packetResponse = new BodyResponse (null , in .readThrowable (), in .readAttachments ());
128- break ;
129- default :
130- throw new IOException ("Unknown result flag, expect '0' '1' '2' '3' '4' '5', but received: " + responseWith );
131- }
132- return packetResponse ;
133- }
134- }
135- } else {
136- try (Serialization .ObjectInput in = Serialization .codeOfDeserialize (serializationProtoId , buffer , bodyLength )) {
137- return new BodyFail (in .readUTF ());
138- }
139- }
140- } else {
141- // decode request.
142- if ((flag & FLAG_EVENT ) != 0 ) {
143- return readHeartBeat (buffer , bodyLength , serializationProtoId );
144- } else {
145- try (Serialization .ObjectInput in = Serialization .codeOfDeserialize (serializationProtoId , buffer , bodyLength )) {
146- String dubboVersion = in .readUTF ();
147- String path = in .readUTF ();
148- String version = in .readUTF ();
149- String methodName = in .readUTF ();
150- String parameterTypesDesc = in .readUTF ();
151- int countArgs = countArgs (parameterTypesDesc );
152- Object [] args = new Object [countArgs ];
153- for (int i = 0 ; i < countArgs ; i ++) {
154- args [i ] = in .readArg ();
155- }
156- Map <String , Object > attachments = in .readAttachments ();
157- return new BodyRequest (dubboVersion , path , version , methodName , parameterTypesDesc , attachments , args );
158- }
159- }
160- }
161- }
162-
163- protected BodyHeartBeat readHeartBeat (ByteBuf buffer , int bodyLength , byte serializationProtoId ) throws IOException , ClassNotFoundException {
164- Object data ;
165- byte [] payload = Serialization .getPayload (buffer , bodyLength );
166- if (Serialization .isHeartBeat (payload , serializationProtoId )) {
167- data = null ;
168- } else {
169- try (Serialization .ObjectInput input = Serialization .codeOfDeserialize (serializationProtoId , new ByteArrayInputStream (payload ))) {
170- data = input .readEvent ();
171- }
172- }
173- return new BodyHeartBeat (data );
174- }
175-
17672 protected <E extends Exception > void exception (ChannelHandlerContext ctx , ByteBuf buffer , E cause ) throws Exception {
17773 buffer .release ();
17874 ctx .close ();
0 commit comments