1111import javax .crypto .Cipher ;
1212import java .nio .ByteBuffer ;
1313import java .security .GeneralSecurityException ;
14+ import java .util .concurrent .atomic .AtomicBoolean ;
1415import java .util .concurrent .atomic .AtomicLong ;
1516
1617public class CipherSubscriber implements Subscriber <ByteBuffer > {
1718 private final AtomicLong contentRead = new AtomicLong (0 );
1819 private final Subscriber <? super ByteBuffer > wrappedSubscriber ;
19- private Cipher cipher ;
20+ private final Cipher cipher ;
2021 private final Long contentLength ;
21- private boolean isLastPart ;
22+ private final boolean isLastPart ;
23+ private final int tagLength ;
24+ private final AtomicBoolean finalBytesCalled = new AtomicBoolean (false );
2225
2326 private byte [] outputBuffer ;
2427
2528 CipherSubscriber (Subscriber <? super ByteBuffer > wrappedSubscriber , Long contentLength , CryptographicMaterials materials , byte [] iv , boolean isLastPart ) {
2629 this .wrappedSubscriber = wrappedSubscriber ;
2730 this .contentLength = contentLength ;
28- cipher = materials .getCipher (iv );
31+ this . cipher = materials .getCipher (iv );
2932 this .isLastPart = isLastPart ;
33+ this .tagLength = materials .algorithmSuite ().cipherTagLengthBytes ();
3034 }
3135
3236 CipherSubscriber (Subscriber <? super ByteBuffer > wrappedSubscriber , Long contentLength , CryptographicMaterials materials , byte [] iv ) {
@@ -46,20 +50,48 @@ public void onNext(ByteBuffer byteBuffer) {
4650 if (amountToReadFromByteBuffer > 0 ) {
4751 byte [] buf = BinaryUtils .copyBytesFrom (byteBuffer , amountToReadFromByteBuffer );
4852 outputBuffer = cipher .update (buf , 0 , amountToReadFromByteBuffer );
53+
4954 if (outputBuffer == null || outputBuffer .length == 0 ) {
5055 // The underlying data is too short to fill in the block cipher.
5156 // Note that while the JCE Javadoc specifies that the outputBuffer is null in this case,
5257 // in practice SunJCE and ACCP return an empty buffer instead, hence checks for
5358 // null OR length == 0.
54- if (contentRead .get () = = contentLength ) {
59+ if (contentRead .get () + tagLength > = contentLength ) {
5560 // All content has been read, so complete to get the final bytes
56- this .onComplete ();
61+ finalBytes ();
62+ return ;
5763 }
5864 // Otherwise, wait for more bytes. To avoid blocking,
5965 // send an empty buffer to the wrapped subscriber.
6066 wrappedSubscriber .onNext (ByteBuffer .allocate (0 ));
6167 } else {
62- wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
68+ /*
69+ Check if stream has read all expected content.
70+ Once all content has been read, call `finalBytes`.
71+
72+ This determines that all content has been read by checking if
73+ the amount of data read so far plus the tag length is at least the content length.
74+ Once this is true, downstream will never call `request` again
75+ (beyond the current request that is being responded to in this onNext invocation.)
76+ As a result, this class can only call `wrappedSubscriber.onNext` one more time.
77+ (Reactive streams require that downstream sends a `request(n)`
78+ to indicate it is ready for more data, and upstream responds to that request by calling `onNext`.
79+ The `n` in request is the maximum number of `onNext` calls that downstream
80+ will allow upstream to make, and seems to always be 1 for the AsyncBodySubscriber.)
81+ Since this class can only call `wrappedSubscriber.onNext` once,
82+ it must send all remaining data in the next onNext call,
83+ including the result of cipher.doFinal(), if applicable.
84+ Calling `wrappedSubscriber.onNext` more than once for `request(1)`
85+ violates the Reactive Streams specification and can cause exceptions downstream.
86+ */
87+ if (contentRead .get () + tagLength >= contentLength ) {
88+ // All content has been read; complete the stream.
89+ finalBytes ();
90+ } else {
91+ // Needs to read more data, so send the data downstream,
92+ // expecting that downstream will continue to request more data.
93+ wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
94+ }
6395 }
6496 } else {
6597 // Do nothing
@@ -91,21 +123,63 @@ public void onError(Throwable t) {
91123
92124 @ Override
93125 public void onComplete () {
126+ // In rare cases, e.g. when the last part of a low-level MPU has 0 length,
127+ // onComplete will be called before onNext is called once.
128+ if (contentRead .get () + tagLength <= contentLength ) {
129+ finalBytes ();
130+ }
131+ wrappedSubscriber .onComplete ();
132+ }
133+
134+ /**
135+ * Finalize encryption, including calculating the auth tag for AES-GCM.
136+ * As such this method MUST only be called once, which is enforced using
137+ * `finalBytesCalled`.
138+ */
139+ private void finalBytes () {
140+ if (!finalBytesCalled .compareAndSet (false , true )) {
141+ // already called, don't repeat
142+ return ;
143+ }
144+
145+ // If this isn't the last part, skip doFinal and just send outputBuffer downstream.
146+ // doFinal requires that all parts have been processed to compute the tag,
147+ // so the tag will only be computed when the last part is processed.
94148 if (!isLastPart ) {
95- // If this isn't the last part, skip doFinal, we aren't done
96- wrappedSubscriber .onComplete ();
149+ wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
97150 return ;
98151 }
152+
153+ // If this is the last part, compute doFinal and include its result in the value sent downstream.
154+ // The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call.
155+ byte [] finalBytes ;
99156 try {
100- outputBuffer = cipher .doFinal ();
101- // Send the final bytes to the wrapped subscriber
102- wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
157+ finalBytes = cipher .doFinal ();
103158 } catch (final GeneralSecurityException exception ) {
159+ // Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer
160+ wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
104161 // Forward error, else the wrapped subscriber waits indefinitely
105162 wrappedSubscriber .onError (exception );
106163 throw new S3EncryptionClientSecurityException (exception .getMessage (), exception );
107164 }
108- wrappedSubscriber .onComplete ();
165+
166+ // Combine the bytes from outputBuffer and finalBytes into one onNext call.
167+ // Downstream has requested one item in its request method, so this class can only call onNext once.
168+ // This single onNext call must contain both the bytes from outputBuffer and the tag.
169+ byte [] combinedBytes ;
170+ if (outputBuffer != null && outputBuffer .length > 0 && finalBytes != null && finalBytes .length > 0 ) {
171+ combinedBytes = new byte [outputBuffer .length + finalBytes .length ];
172+ System .arraycopy (outputBuffer , 0 , combinedBytes , 0 , outputBuffer .length );
173+ System .arraycopy (finalBytes , 0 , combinedBytes , outputBuffer .length , finalBytes .length );
174+ } else if (outputBuffer != null && outputBuffer .length > 0 ) {
175+ combinedBytes = outputBuffer ;
176+ } else if (finalBytes != null && finalBytes .length > 0 ) {
177+ combinedBytes = finalBytes ;
178+ } else {
179+ combinedBytes = new byte [0 ];
180+ }
181+
182+ wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBytes ));
109183 }
110184
111185}
0 commit comments