From 5e1d3c34cd931280fa0fb754abddc087214bd564 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Fri, 14 Nov 2025 11:18:02 -0500 Subject: [PATCH 1/9] Add failing test demonstrating the initial request does not work when it is wrapped in a sigv4 event --- .../tests/structured_eventstream_tests.rs | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs b/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs index 4f52d7141f..f000858056 100644 --- a/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs +++ b/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs @@ -618,3 +618,67 @@ async fn test_streaming_operation_with_optional_data() { // Verify optional data was not provided assert_eq!(harness.server.optional_data(), None); } + +/// Test that SigV4-framed initial-request messages are properly handled. +/// This demonstrates the bug in issue #4397 where try_recv_initial_request +/// cannot see inside the SigV4 envelope to detect the initial-request event type. +#[tokio::test] +async fn test_sigv4_framed_initial_request_with_data() { + let _logs = show_filtered_test_logs( + "aws_smithy_http_server=trace,hyper_util=debug,rpcv2cbor_extras=trace", + ); + let mut harness = TestHarness::new("StreamingOperationWithInitialData").await; + + // Send a SigV4-framed initial-request with data + let signed_initial_request = build_sigv4_signed_initial_data("test-data"); + harness.client.send(signed_initial_request).await.unwrap(); + + harness.send_event("A").await; + + // The connection will be closed because the server cannot detect the initial-request + // inside the SigV4 envelope, so it thinks the initial data is missing + let result = harness.recv().await; + + // This demonstrates the bug: the server closes the connection instead of + // extracting the initial-request from inside the SigV4 envelope + assert!( + result.is_none(), + "Bug #4397: Server should extract initial-request from SigV4 envelope, but instead closes connection" + ); + + // The server never received the initial data because it couldn't unwrap the SigV4 envelope + assert_eq!(harness.server.initial_data(), None); +} + +fn build_sigv4_signed_initial_data(data: &str) -> Message { + use aws_smithy_eventstream::frame::write_message_to; + use std::time::{SystemTime, UNIX_EPOCH}; + + // Build the inner initial-request message with data + let inner_message = build_initial_data_message(data); + + // Serialize the inner message to bytes + let mut inner_bytes = Vec::new(); + write_message_to(&inner_message, &mut inner_bytes).unwrap(); + + // Create the SigV4 envelope with signature headers + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + let headers = vec![ + Header::new( + ":chunk-signature", + HeaderValue::ByteArray(Bytes::from( + "example298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + )), + ), + Header::new( + ":date", + HeaderValue::Timestamp(aws_smithy_types::DateTime::from_secs(timestamp as i64)), + ), + ]; + + Message::new_from_parts(headers, Bytes::from(inner_bytes)) +} From 42bdaaccd567dd3d1e42f571c6844d6783f3f0fd Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Fri, 14 Nov 2025 13:53:04 -0500 Subject: [PATCH 2/9] Fix SigV4 Event Stream support to work properly with Initial messages --- .../generators/http/HttpBindingGenerator.kt | 22 ++- codegen-server-test/build.gradle.kts | 2 +- .../tests/structured_eventstream_tests.rs | 21 +- .../SigV4EventStreamDecorator.kt | 17 +- .../SigV4EventStreamSupportStructures.kt | 179 ++++++++++-------- .../http/ServerResponseBindingGenerator.kt | 2 + rust-runtime/Cargo.lock | 74 ++++---- .../src/event_stream/receiver.rs | 33 +++- tools/ci-cdk/canary-runner/Cargo.lock | 2 +- 9 files changed, 202 insertions(+), 150 deletions(-) diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt index 4b6a39fc6e..90937ab2fa 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt @@ -102,6 +102,12 @@ sealed class HttpBindingSection(name: String) : Section(name) { val unionShape: UnionShape, val unmarshallerVariableName: String, ) : HttpBindingSection("BeforeCreatingEventStreamReceiver") + + data class WrapEventStreamReceiver( + val operationShape: OperationShape, + val unionShape: UnionShape, + val receiverExpression: String, + ) : HttpBindingSection("WrapEventStreamReceiver") } typealias HttpBindingCustomization = NamedCustomization @@ -296,7 +302,9 @@ class HttpBindingGenerator( rustTemplate( """ let body = std::mem::replace(body, #{SdkBody}::taken()); - Ok(#{receiver:W}) + let receiver = #{receiver:W}; + #{wrapReceiver:W} + Ok(receiver) """, "SdkBody" to RuntimeType.sdkBody(runtimeConfig), "receiver" to @@ -311,6 +319,18 @@ class HttpBindingGenerator( ) } }, + "wrapReceiver" to + writable { + for (customization in customizations) { + customization.section( + HttpBindingSection.WrapEventStreamReceiver( + operationShape, + targetShape, + "receiver", + ), + )(this) + } + }, ) } diff --git a/codegen-server-test/build.gradle.kts b/codegen-server-test/build.gradle.kts index dc2b562221..08155124c2 100644 --- a/codegen-server-test/build.gradle.kts +++ b/codegen-server-test/build.gradle.kts @@ -116,7 +116,7 @@ val commonCodegenTests = "../codegen-core/common-test-models".let { commonModels ) } // When iterating on protocol tests use this to speed up codegen: -// .filter { it.module == "rpcv2Cbor_extras" || it.module == "rpcv2Cbor_extras_no_initial_response" } + .filter { it.module == "rpcv2Cbor_extras" || it.module == "rpcv2Cbor_extras_no_initial_response" } val customCodegenTests = "custom-test-models".let { customModels -> listOf( diff --git a/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs b/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs index f000858056..3d441feb2a 100644 --- a/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs +++ b/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs @@ -620,8 +620,8 @@ async fn test_streaming_operation_with_optional_data() { } /// Test that SigV4-framed initial-request messages are properly handled. -/// This demonstrates the bug in issue #4397 where try_recv_initial_request -/// cannot see inside the SigV4 envelope to detect the initial-request event type. +/// This verifies the fix for issue #4397 where try_recv_initial_request +/// can now see inside the SigV4 envelope to detect the initial-request event type. #[tokio::test] async fn test_sigv4_framed_initial_request_with_data() { let _logs = show_filtered_test_logs( @@ -635,19 +635,12 @@ async fn test_sigv4_framed_initial_request_with_data() { harness.send_event("A").await; - // The connection will be closed because the server cannot detect the initial-request - // inside the SigV4 envelope, so it thinks the initial data is missing - let result = harness.recv().await; - - // This demonstrates the bug: the server closes the connection instead of - // extracting the initial-request from inside the SigV4 envelope - assert!( - result.is_none(), - "Bug #4397: Server should extract initial-request from SigV4 envelope, but instead closes connection" - ); + // The server should now properly extract the initial-request from the SigV4 envelope + let resp = harness.expect_message().await; + assert_eq!(get_event_type(&resp), "A"); - // The server never received the initial data because it couldn't unwrap the SigV4 envelope - assert_eq!(harness.server.initial_data(), None); + // Verify the server received and parsed the initial data from inside the SigV4 envelope + assert_eq!(harness.server.initial_data(), Some("test-data".to_string())); } fn build_sigv4_signed_initial_data(data: &str) -> Message { diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamDecorator.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamDecorator.kt index 0cf966749e..9aa078c4aa 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamDecorator.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamDecorator.kt @@ -14,7 +14,6 @@ import software.amazon.smithy.model.shapes.Shape import software.amazon.smithy.model.shapes.ShapeId import software.amazon.smithy.rust.codegen.core.rustlang.RustType import software.amazon.smithy.rust.codegen.core.rustlang.Writable -import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate import software.amazon.smithy.rust.codegen.core.rustlang.writable import software.amazon.smithy.rust.codegen.core.smithy.RuntimeConfig import software.amazon.smithy.rust.codegen.core.smithy.RustSymbolProvider @@ -114,20 +113,8 @@ class SigV4EventStreamCustomization(private val symbolProvider: RustSymbolProvid override fun section(section: HttpBindingSection): Writable = writable { when (section) { - is HttpBindingSection.BeforeCreatingEventStreamReceiver -> { - // Check if this service uses SigV4 auth - if (symbolProvider.usesSigAuth()) { - val codegenScope = - SigV4EventStreamSupportStructures.codegenScope(symbolProvider.config.runtimeConfig) - rustTemplate( - """ - let ${section.unmarshallerVariableName} = #{SigV4Unmarshaller}::new(${section.unmarshallerVariableName}); - """, - *codegenScope, - ) - } - } - + // Type wrapping happens via symbol provider + // SigV4Receiver::new() takes (unmarshaller, body) directly else -> {} } } diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamSupportStructures.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamSupportStructures.kt index 81f67f1dd4..ec6a9d7c83 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamSupportStructures.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamSupportStructures.kt @@ -25,42 +25,29 @@ object SigV4EventStreamSupportStructures { "ExtractionError" to extractionError(runtimeConfig), "SignedEventError" to signedEventError(runtimeConfig), "SignedEvent" to signedEvent(runtimeConfig), - "SigV4Unmarshaller" to sigV4Unmarshaller(runtimeConfig), + "SigV4Receiver" to sigV4Receiver(runtimeConfig), "extract_signed_message" to extractSignedMessage(runtimeConfig), ) /** - * Wraps an event stream Receiver type to handle SigV4 signed messages. - * Transforms: Receiver -> Receiver, SignedEventError> + * Wraps an event stream Receiver type with SigV4Receiver. + * Transforms: Receiver -> SigV4Receiver */ fun wrapInEventStreamSigV4( symbol: Symbol, runtimeConfig: RuntimeConfig, ): Symbol { - val signedEvent = signedEvent(runtimeConfig) - val signedEventError = signedEventError(runtimeConfig) - return symbol.mapRustType(signedEvent, signedEventError) { rustType -> + val sigV4Receiver = sigV4Receiver(runtimeConfig) + return symbol.mapRustType(sigV4Receiver) { rustType -> // Expect Application(Receiver, [T, E]) if (rustType is RustType.Application && rustType.name == "Receiver" && rustType.args.size == 2) { val eventType = rustType.args[0] val errorType = rustType.args[1] - // Create SignedEvent and SignedEventError - val wrappedEventType = - RustType.Application( - signedEvent.toSymbol().rustType(), - listOf(eventType), - ) - val wrappedErrorType = - RustType.Application( - signedEventError.toSymbol().rustType(), - listOf(errorType), - ) - - // Create new Receiver, SignedEventError> + // Create SigV4Receiver RustType.Application( - rustType.type, - listOf(wrappedEventType, wrappedErrorType), + sigV4Receiver.toSymbol().rustType(), + listOf(eventType, errorType), ) } else { PANIC("Called wrap in EventStreamSigV4 on ${symbol.rustType()} which was not an event stream receiver") @@ -103,8 +90,35 @@ object SigV4EventStreamSupportStructures { ##[non_exhaustive] InvalidTimestamp, } + + impl #{Display} for ExtractionError { + fn fmt(&self, f: &mut #{Formatter}<'_>) -> #{fmt_Result} { + match self { + ExtractionError::InvalidPayload { error } => { + write!(f, "invalid payload: {}", error) + } + ExtractionError::InvalidTimestamp => { + write!(f, "invalid or missing timestamp header") + } + } + } + } + + impl #{Error} for ExtractionError { + fn source(&self) -> #{Option}<&(dyn #{Error} + 'static)> { + match self { + ExtractionError::InvalidPayload { error } => #{Some}(error), + ExtractionError::InvalidTimestamp => #{None}, + } + } + } """, "EventStreamError" to CargoDependency.smithyEventStream(runtimeConfig).toType().resolve("error::Error"), + "Display" to RuntimeType.Display, + "Formatter" to RuntimeType.std.resolve("fmt::Formatter"), + "fmt_Result" to RuntimeType.std.resolve("fmt::Result"), + "Error" to RuntimeType.StdError, + *RuntimeType.preludeScope, ) } @@ -149,81 +163,94 @@ object SigV4EventStreamSupportStructures { ) } - private fun sigV4Unmarshaller(runtimeConfig: RuntimeConfig): RuntimeType = - RuntimeType.forInlineFun("SigV4Unmarshaller", supportModule) { + private fun sigV4Receiver(runtimeConfig: RuntimeConfig): RuntimeType = + RuntimeType.forInlineFun("SigV4Receiver", supportModule) { rustTemplate( """ - /// Unmarshaller wrapper that handles SigV4 signed event stream messages + /// Receiver wrapper that handles SigV4 signed event stream messages ##[derive(Debug)] - pub struct SigV4Unmarshaller { - inner: T, + pub struct SigV4Receiver { + inner: #{Receiver}, + initial_signature: #{Option}<#{SignatureInfo}>, } - impl SigV4Unmarshaller { - pub fn new(inner: T) -> Self { - Self { inner } + impl SigV4Receiver { + pub fn new( + unmarshaller: impl #{UnmarshallMessage} + #{Send} + #{Sync} + 'static, + body: #{SdkBody}, + ) -> Self { + Self { + inner: #{Receiver}::new(unmarshaller, body), + initial_signature: None, + } } - } - impl #{UnmarshallMessage} for SigV4Unmarshaller - where - T: #{UnmarshallMessage}, - { - type Output = #{SignedEvent}; - type Error = #{SignedEventError}; + /// Get the signature from the initial message, if it was signed + pub fn initial_signature(&self) -> #{Option}<&#{SignatureInfo}> { + self.initial_signature.as_ref() + } - fn unmarshall(&self, message: &#{Message}) -> #{Result}<#{UnmarshalledMessage}, #{EventStreamError}> { - // First, try to extract the signed message - match #{extract_signed_message}(message) { - Ok(MaybeSignedMessage::Signed { message: inner_message, signature }) => { - // Process the inner message with the base unmarshaller - match self.inner.unmarshall(&inner_message) { - Ok(unmarshalled) => match unmarshalled { - #{UnmarshalledMessage}::Event(event) => { - Ok(#{UnmarshalledMessage}::Event(#{SignedEvent} { - message: event, - signature: Some(signature), - })) - } - #{UnmarshalledMessage}::Error(err) => { - Ok(#{UnmarshalledMessage}::Error(#{SignedEventError}::Event(err))) - } - }, - Err(err) => Err(err), + /// Try to receive an initial message of the given type. + /// Handles SigV4-wrapped messages by extracting the inner message first. + pub async fn try_recv_initial( + &mut self, + message_type: #{event_stream}::InitialMessageType, + ) -> #{Result}<#{Option}<#{Message}>, #{SdkError}> + where + E: std::error::Error + 'static, + { + let result = self + .inner + .try_recv_initial_with_preprocessor(message_type, |message| { + match #{extract_signed_message}(&message) { + #{Ok}(MaybeSignedMessage::Signed { message: inner, signature }) => { + #{Ok}((inner, #{Some}(signature))) + } + #{Ok}(MaybeSignedMessage::Unsigned) => #{Ok}((message, #{None})), + #{Err}(err) => #{Err}(#{ResponseError}::builder().raw(#{RawMessage}::Decoded(message)).source(err).build()), } + }) + .await?; + match result { + #{Some}((message, signature)) => { + self.initial_signature = signature; + #{Ok}(#{Some}(message)) } - Ok(MaybeSignedMessage::Unsigned) => { - // Process unsigned message directly - match self.inner.unmarshall(message) { - Ok(unmarshalled) => match unmarshalled { - #{UnmarshalledMessage}::Event(event) => { - Ok(#{UnmarshalledMessage}::Event(#{SignedEvent} { - message: event, - signature: None, - })) - } - #{UnmarshalledMessage}::Error(err) => { - Ok(#{UnmarshalledMessage}::Error(#{SignedEventError}::Event(err))) - } - }, - Err(err) => Err(err), - } + #{None} => #{Ok}(#{None}), + } + } + + /// Receive the next event from the stream + pub async fn recv(&mut self) -> #{Result}<#{Option}<#{SignedEvent}>, #{SdkError}<#{SignedEventError}, #{RawMessage}>> + where + E: std::error::Error + 'static, + { + match self.inner.recv().await.map_err(|e| e.map_service_error(#{SignedEventError}::Event))? { + #{Some}(event) => { + // Wrap in SignedEvent with no signature (signatures only on initial message) + #{Ok}(#{Some}(#{SignedEvent} { + message: event, + signature: #{None}, + })) } - Err(extraction_err) => Ok(#{UnmarshalledMessage}::Error(#{SignedEventError}::InvalidSignedEvent(extraction_err))), + #{None} => #{Ok}(#{None}), } } } """, + "Receiver" to RuntimeType.eventStreamReceiver(runtimeConfig), + "event_stream" to RuntimeType.smithyHttp(runtimeConfig).resolve("event_stream"), + "SdkBody" to RuntimeType.sdkBody(runtimeConfig), + "Message" to CargoDependency.smithyTypes(runtimeConfig).toType().resolve("event_stream::Message"), + "RawMessage" to CargoDependency.smithyTypes(runtimeConfig).toType().resolve("event_stream::RawMessage"), + "SdkError" to RuntimeType.sdkError(runtimeConfig), + "ResponseError" to RuntimeType.smithyRuntimeApiClient(runtimeConfig).resolve("client::result::ResponseError"), "UnmarshallMessage" to CargoDependency.smithyEventStream(runtimeConfig).toType() .resolve("frame::UnmarshallMessage"), - "UnmarshalledMessage" to - CargoDependency.smithyEventStream(runtimeConfig).toType() - .resolve("frame::UnmarshalledMessage"), - "Message" to CargoDependency.smithyTypes(runtimeConfig).toType().resolve("event_stream::Message"), - "EventStreamError" to CargoDependency.smithyEventStream(runtimeConfig).toType().resolve("error::Error"), "SignedEvent" to signedEvent(runtimeConfig), "SignedEventError" to signedEventError(runtimeConfig), + "SignatureInfo" to signatureInfo(), "extract_signed_message" to extractSignedMessage(runtimeConfig), *RuntimeType.preludeScope, ) diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/generators/http/ServerResponseBindingGenerator.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/generators/http/ServerResponseBindingGenerator.kt index 98434672d5..2bd6d2813a 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/generators/http/ServerResponseBindingGenerator.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/generators/http/ServerResponseBindingGenerator.kt @@ -75,6 +75,7 @@ class ServerResponseBeforeIteratingOverMapBoundWithHttpPrefixHeadersUnwrapConstr is HttpBindingSection.AfterDeserializingIntoAHashMapOfHttpPrefixHeaders, is HttpBindingSection.AfterDeserializingIntoADateTimeOfHttpHeaders, is HttpBindingSection.BeforeCreatingEventStreamReceiver, + is HttpBindingSection.WrapEventStreamReceiver, -> emptySection } } @@ -110,6 +111,7 @@ class ServerResponseBeforeRenderingHeadersHttpBindingCustomization(val codegenCo is HttpBindingSection.AfterDeserializingIntoAHashMapOfHttpPrefixHeaders, is HttpBindingSection.AfterDeserializingIntoADateTimeOfHttpHeaders, is HttpBindingSection.BeforeCreatingEventStreamReceiver, + is HttpBindingSection.WrapEventStreamReceiver, -> emptySection } } diff --git a/rust-runtime/Cargo.lock b/rust-runtime/Cargo.lock index 73ecb1bfc8..a6509befea 100644 --- a/rust-runtime/Cargo.lock +++ b/rust-runtime/Cargo.lock @@ -943,9 +943,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.47" +version = "1.2.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd405d82c84ff7f35739f175f67d8b9fb7687a0e84ccdc78bd3568839827cf07" +checksum = "c481bdbf0ed3b892f6f806287d72acd515b352a4ec27a208489b8c1bc839633a" dependencies = [ "find-msvc-tools", "jobserver", @@ -1119,9 +1119,9 @@ dependencies = [ [[package]] name = "crc" -version = "3.3.0" +version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9710d3b3739c2e349eb44fe848ad0b7c8cb1e42bd87ee49371df2f7acaf3e675" +checksum = "5eb8a2a1cd12ab0d987a5d5e825195d372001a4094a0376319d5a0ad71c1ba0d" dependencies = [ "crc-catalog", ] @@ -2156,9 +2156,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.82" +version = "0.3.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b011eec8cc36da2aab2d5cff675ec18454fad408585853910a202391cf9f8e65" +checksum = "464a3709c7f55f1f721e5389aa6ea4e3bc6aba669353300af094b29ffbdde1d8" dependencies = [ "once_cell", "wasm-bindgen", @@ -2346,9 +2346,9 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "minicbor" -version = "0.24.2" +version = "0.24.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f8e213c36148d828083ae01948eed271d03f95f7e72571fa242d78184029af2" +checksum = "29be4f60e41fde478b36998b88821946aafac540e53591e76db53921a0cc225b" dependencies = [ "half", "minicbor-derive", @@ -3172,9 +3172,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.13.0" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94182ad936a0c91c324cd46c6511b9510ed16af436d7b5bab34beab0afd55f7a" +checksum = "708c0f9d5f54ba0272468c1d306a52c495b31fa155e91bc25371e6df7996908c" dependencies = [ "zeroize", ] @@ -3970,9 +3970,9 @@ dependencies = [ [[package]] name = "tracing" -version = "0.1.41" +version = "0.1.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647" dependencies = [ "log", "pin-project-lite", @@ -3982,21 +3982,21 @@ dependencies = [ [[package]] name = "tracing-appender" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" +checksum = "786d480bce6247ab75f005b14ae1624ad978d3029d9113f0a22fa1ac773faeaf" dependencies = [ "crossbeam-channel", - "thiserror 1.0.69", + "thiserror 2.0.17", "time", "tracing-subscriber", ] [[package]] name = "tracing-attributes" -version = "0.1.30" +version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", @@ -4005,9 +4005,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.34" +version = "0.1.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" +checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c" dependencies = [ "once_cell", "valuable", @@ -4036,9 +4036,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.20" +version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" +checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" dependencies = [ "matchers", "nu-ansi-term", @@ -4232,9 +4232,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.105" +version = "0.2.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da95793dfc411fbbd93f5be7715b0578ec61fe87cb1a42b12eb625caa5c5ea60" +checksum = "0d759f433fa64a2d763d1340820e46e111a7a5ab75f993d1852d70b03dbb80fd" dependencies = [ "cfg-if", "once_cell", @@ -4245,9 +4245,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.55" +version = "0.4.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "551f88106c6d5e7ccc7cd9a16f312dd3b5d36ea8b4954304657d5dfba115d4a0" +checksum = "836d9622d604feee9e5de25ac10e3ea5f2d65b41eac0d9ce72eb5deae707ce7c" dependencies = [ "cfg-if", "js-sys", @@ -4258,9 +4258,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.105" +version = "0.2.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04264334509e04a7bf8690f2384ef5265f05143a4bff3889ab7a3269adab59c2" +checksum = "48cb0d2638f8baedbc542ed444afc0644a29166f1595371af4fecf8ce1e7eeb3" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -4268,9 +4268,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.105" +version = "0.2.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "420bc339d9f322e562942d52e115d57e950d12d88983a14c79b86859ee6c7ebc" +checksum = "cefb59d5cd5f92d9dcf80e4683949f15ca4b511f4ac0a6e14d4e1ac60c6ecd40" dependencies = [ "bumpalo", "proc-macro2", @@ -4281,18 +4281,18 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.105" +version = "0.2.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76f218a38c84bcb33c25ec7059b07847d465ce0e0a76b995e134a45adcb6af76" +checksum = "cbc538057e648b67f72a982e708d485b2efa771e1ac05fec311f9f63e5800db4" dependencies = [ "unicode-ident", ] [[package]] name = "web-sys" -version = "0.3.82" +version = "0.3.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a1f95c0d03a47f4ae1f7a64643a6bb97465d9b740f0fa8f90ea33915c99a9a1" +checksum = "9b32828d774c412041098d182a8b38b16ea816958e07cf40eec2bc080ae137ac" dependencies = [ "js-sys", "wasm-bindgen", @@ -4669,18 +4669,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.30" +version = "0.8.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ea879c944afe8a2b25fef16bb4ba234f47c694565e97383b36f3a878219065c" +checksum = "fd74ec98b9250adb3ca554bdde269adf631549f51d8a8f8f0a10b50f1cb298c3" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.30" +version = "0.8.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf955aa904d6040f70dc8e9384444cb1030aed272ba3cb09bbc4ab9e7c1f34f5" +checksum = "d8a8d209fdf45cf5138cbb5a506f6b52522a25afccc534d1475dad8e31105c6a" dependencies = [ "proc-macro2", "quote", diff --git a/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs b/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs index 733b8a6610..b7aaed5008 100644 --- a/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs +++ b/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs @@ -6,7 +6,7 @@ use aws_smithy_eventstream::frame::{ DecodedFrame, MessageFrameDecoder, UnmarshallMessage, UnmarshalledMessage, }; -use aws_smithy_runtime_api::client::result::{ConnectorError, SdkError}; +use aws_smithy_runtime_api::client::result::{ConnectorError, ResponseError, SdkError}; use aws_smithy_types::body::SdkBody; use aws_smithy_types::event_stream::{Message, RawMessage}; use bytes::Buf; @@ -229,8 +229,31 @@ impl Receiver { &mut self, message_type: InitialMessageType, ) -> Result, SdkError> { + self.try_recv_initial_with_preprocessor(message_type, |msg| Ok((msg.clone(), ()))) + .await + .map(|opt| opt.map(|(msg, _)| msg)) + } + + /// Tries to receive the initial response message with preprocessing support. + /// + /// The preprocessor function can transform the raw message (e.g., unwrap envelopes) + /// and return metadata alongside the transformed message. If the transformed message + /// matches the expected `message_type`, both the message and metadata are returned. + /// Otherwise, the transformed message is buffered and `Ok(None)` is returned. + #[doc(hidden)] + pub async fn try_recv_initial_with_preprocessor( + &mut self, + message_type: InitialMessageType, + preprocessor: F, + ) -> Result, SdkError> + where + F: FnOnce(Message) -> Result<(Message, M), ResponseError>, + { if let Some(message) = self.next_message().await? { - if let Some(event_type) = message + let (processed_message, metadata) = + preprocessor(message).map_err(|err| SdkError::ResponseError(err))?; + + if let Some(event_type) = processed_message .headers() .iter() .find(|h| h.name().as_str() == ":event-type") @@ -241,11 +264,11 @@ impl Receiver { .map(|s| s.as_str() == message_type.as_str()) .unwrap_or(false) { - return Ok(Some(message)); + return Ok(Some((processed_message, metadata))); } } - // Buffer the message so that it can be returned by the next call to `recv()` - self.buffered_message = Some(message); + // Buffer the processed message so that it can be returned by the next call to `recv()` + self.buffered_message = Some(processed_message); } Ok(None) } diff --git a/tools/ci-cdk/canary-runner/Cargo.lock b/tools/ci-cdk/canary-runner/Cargo.lock index a145ab9d44..c27f946f05 100644 --- a/tools/ci-cdk/canary-runner/Cargo.lock +++ b/tools/ci-cdk/canary-runner/Cargo.lock @@ -2650,7 +2650,7 @@ checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "smithy-rs-tool-common" -version = "0.1.0" +version = "0.1.1" dependencies = [ "anyhow", "async-trait", From a748732742d1c815efa20e0c39270cc10db9f7fa Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Fri, 14 Nov 2025 14:14:03 -0500 Subject: [PATCH 3/9] Remove lots of un-needed custom code! --- .../generators/http/HttpBindingGenerator.kt | 36 --------- .../integration-tests/Cargo.lock | 2 +- .../server/smithy/ServerCodegenVisitor.kt | 7 +- .../SigV4EventStreamDecorator.kt | 61 ++------------- .../customize/ServerCodegenDecorator.kt | 12 --- .../http/ServerResponseBindingGenerator.kt | 4 - .../smithy/protocols/ServerProtocolLoader.kt | 75 +++++++++---------- .../smithy/testutil/ServerTestHelpers.kt | 2 +- 8 files changed, 42 insertions(+), 157 deletions(-) diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt index 90937ab2fa..cf6c44b5f6 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/smithy/generators/http/HttpBindingGenerator.kt @@ -96,18 +96,6 @@ sealed class HttpBindingSection(name: String) : Section(name) { data class AfterDeserializingIntoADateTimeOfHttpHeaders(val memberShape: MemberShape) : HttpBindingSection("AfterDeserializingIntoADateTimeOfHttpHeaders") - - data class BeforeCreatingEventStreamReceiver( - val operationShape: OperationShape, - val unionShape: UnionShape, - val unmarshallerVariableName: String, - ) : HttpBindingSection("BeforeCreatingEventStreamReceiver") - - data class WrapEventStreamReceiver( - val operationShape: OperationShape, - val unionShape: UnionShape, - val receiverExpression: String, - ) : HttpBindingSection("WrapEventStreamReceiver") } typealias HttpBindingCustomization = NamedCustomization @@ -288,22 +276,10 @@ class HttpBindingGenerator( "unmarshallerConstructorFn" to unmarshallerConstructorFn, ) - // Allow customizations to wrap the unmarshaller - for (customization in customizations) { - customization.section( - HttpBindingSection.BeforeCreatingEventStreamReceiver( - operationShape, - targetShape, - "unmarshaller", - ), - )(this) - } - rustTemplate( """ let body = std::mem::replace(body, #{SdkBody}::taken()); let receiver = #{receiver:W}; - #{wrapReceiver:W} Ok(receiver) """, "SdkBody" to RuntimeType.sdkBody(runtimeConfig), @@ -319,18 +295,6 @@ class HttpBindingGenerator( ) } }, - "wrapReceiver" to - writable { - for (customization in customizations) { - customization.section( - HttpBindingSection.WrapEventStreamReceiver( - operationShape, - targetShape, - "receiver", - ), - )(this) - } - }, ) } diff --git a/codegen-server-test/integration-tests/Cargo.lock b/codegen-server-test/integration-tests/Cargo.lock index 22c0999cfb..1d461df19c 100644 --- a/codegen-server-test/integration-tests/Cargo.lock +++ b/codegen-server-test/integration-tests/Cargo.lock @@ -76,7 +76,7 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.62.5" +version = "0.62.6" dependencies = [ "aws-smithy-eventstream", "aws-smithy-runtime-api", diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/ServerCodegenVisitor.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/ServerCodegenVisitor.kt index 7fd8d76b32..121001a0da 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/ServerCodegenVisitor.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/ServerCodegenVisitor.kt @@ -139,12 +139,7 @@ open class ServerCodegenVisitor( ServerProtocolLoader( codegenDecorator.protocols( service.id, - ServerProtocolLoader.defaultProtocols { it -> - codegenDecorator.httpCustomizations( - serverSymbolProviders.symbolProvider, - it, - ) - }, + ServerProtocolLoader.DefaultProtocols, ), ) .protocolFor(context.model, service) diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamDecorator.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamDecorator.kt index 9aa078c4aa..e1ba675183 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamDecorator.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamDecorator.kt @@ -11,17 +11,8 @@ import software.amazon.smithy.model.knowledge.ServiceIndex import software.amazon.smithy.model.shapes.MemberShape import software.amazon.smithy.model.shapes.OperationShape import software.amazon.smithy.model.shapes.Shape -import software.amazon.smithy.model.shapes.ShapeId -import software.amazon.smithy.rust.codegen.core.rustlang.RustType -import software.amazon.smithy.rust.codegen.core.rustlang.Writable -import software.amazon.smithy.rust.codegen.core.rustlang.writable -import software.amazon.smithy.rust.codegen.core.smithy.RuntimeConfig import software.amazon.smithy.rust.codegen.core.smithy.RustSymbolProvider import software.amazon.smithy.rust.codegen.core.smithy.WrappingSymbolProvider -import software.amazon.smithy.rust.codegen.core.smithy.generators.http.HttpBindingCustomization -import software.amazon.smithy.rust.codegen.core.smithy.generators.http.HttpBindingSection -import software.amazon.smithy.rust.codegen.core.smithy.mapRustType -import software.amazon.smithy.rust.codegen.core.smithy.rustType import software.amazon.smithy.rust.codegen.core.smithy.traits.SyntheticInputTrait import software.amazon.smithy.rust.codegen.core.util.getTrait import software.amazon.smithy.rust.codegen.core.util.isEventStream @@ -35,59 +26,28 @@ class SigV4EventStreamDecorator : ServerCodegenDecorator { override val name: String = "SigV4EventStreamDecorator" override val order: Byte = 0 - override fun httpCustomizations( - symbolProvider: RustSymbolProvider, - protocol: ShapeId, - ): List { - return listOf(SigV4EventStreamCustomization(symbolProvider)) - } - override fun symbolProvider(base: RustSymbolProvider): RustSymbolProvider { - // We need access to the service shape to check for SigV4 trait, but the base interface doesn't provide it. - // For now, we'll wrap all event streams and let the runtime code handle the detection. - return SigV4EventStreamSymbolProvider(base) + if (base.usesSigAuth()) { + return SigV4EventStreamSymbolProvider(base) + } else { + return base + } } } internal fun RustSymbolProvider.usesSigAuth(): Boolean = ServiceIndex.of(model).getAuthSchemes(moduleProviderContext.serviceShape!!).containsKey(SigV4Trait.ID) -// Goes from `T` to `SignedEvent` -fun wrapInSignedEvent( - inner: Symbol, - runtimeConfig: RuntimeConfig, -) = inner.mapRustType { - RustType.Application( - SigV4EventStreamSupportStructures.signedEvent(runtimeConfig).toSymbol().rustType(), - listOf(inner.rustType()), - ) -} - -// Goes from `E` to `SignedEventError` -fun wrapInSignedEventError( - inner: Symbol, - runtimeConfig: RuntimeConfig, -) = inner.mapRustType { - RustType.Application( - SigV4EventStreamSupportStructures.signedEventError(runtimeConfig).toSymbol().rustType(), - listOf(inner.rustType()), - ) -} - /** * Symbol provider wrapper that modifies event stream types to support SigV4 signed messages. */ class SigV4EventStreamSymbolProvider( base: RustSymbolProvider, ) : WrappingSymbolProvider(base) { - private val serviceIsSigv4 = base.usesSigAuth() private val runtimeConfig = base.config.runtimeConfig override fun toSymbol(shape: Shape): Symbol { val baseSymbol = super.toSymbol(shape) - if (!serviceIsSigv4) { - return baseSymbol - } // We only want to wrap with Event Stream types when dealing with member shapes if (shape is MemberShape && shape.isEventStream(model)) { // Determine if the member has a container that is a synthetic input or output @@ -108,14 +68,3 @@ class SigV4EventStreamSymbolProvider( return baseSymbol } } - -class SigV4EventStreamCustomization(private val symbolProvider: RustSymbolProvider) : HttpBindingCustomization() { - override fun section(section: HttpBindingSection): Writable = - writable { - when (section) { - // Type wrapping happens via symbol provider - // SigV4Receiver::new() takes (unmarshaller, body) directly - else -> {} - } - } -} diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customize/ServerCodegenDecorator.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customize/ServerCodegenDecorator.kt index d82d3ad65f..6e23bbd678 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customize/ServerCodegenDecorator.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customize/ServerCodegenDecorator.kt @@ -10,10 +10,8 @@ import software.amazon.smithy.model.shapes.OperationShape import software.amazon.smithy.model.shapes.ServiceShape import software.amazon.smithy.model.shapes.ShapeId import software.amazon.smithy.model.shapes.StructureShape -import software.amazon.smithy.rust.codegen.core.smithy.RustSymbolProvider import software.amazon.smithy.rust.codegen.core.smithy.customize.CombinedCoreCodegenDecorator import software.amazon.smithy.rust.codegen.core.smithy.customize.CoreCodegenDecorator -import software.amazon.smithy.rust.codegen.core.smithy.generators.http.HttpBindingCustomization import software.amazon.smithy.rust.codegen.core.smithy.protocols.ProtocolMap import software.amazon.smithy.rust.codegen.server.smithy.ServerCodegenContext import software.amazon.smithy.rust.codegen.server.smithy.ServerRustSettings @@ -34,11 +32,6 @@ interface ServerCodegenDecorator : CoreCodegenDecorator = emptyList() - fun validationExceptionConversion(codegenContext: ServerCodegenContext): ValidationExceptionConversionGenerator? = null @@ -102,11 +95,6 @@ class CombinedServerCodegenDecorator(decorators: List) : decorator.protocols(serviceId, protocolMap) } - override fun httpCustomizations( - symbolProvider: RustSymbolProvider, - protocol: ShapeId, - ): List = orderedDecorators.flatMap { it.httpCustomizations(symbolProvider, protocol) } - override fun validationExceptionConversion( codegenContext: ServerCodegenContext, ): ValidationExceptionConversionGenerator = diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/generators/http/ServerResponseBindingGenerator.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/generators/http/ServerResponseBindingGenerator.kt index 2bd6d2813a..53a3f40689 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/generators/http/ServerResponseBindingGenerator.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/generators/http/ServerResponseBindingGenerator.kt @@ -74,8 +74,6 @@ class ServerResponseBeforeIteratingOverMapBoundWithHttpPrefixHeadersUnwrapConstr is HttpBindingSection.BeforeRenderingHeaderValue, is HttpBindingSection.AfterDeserializingIntoAHashMapOfHttpPrefixHeaders, is HttpBindingSection.AfterDeserializingIntoADateTimeOfHttpHeaders, - is HttpBindingSection.BeforeCreatingEventStreamReceiver, - is HttpBindingSection.WrapEventStreamReceiver, -> emptySection } } @@ -110,8 +108,6 @@ class ServerResponseBeforeRenderingHeadersHttpBindingCustomization(val codegenCo is HttpBindingSection.BeforeIteratingOverMapShapeBoundWithHttpPrefixHeaders, is HttpBindingSection.AfterDeserializingIntoAHashMapOfHttpPrefixHeaders, is HttpBindingSection.AfterDeserializingIntoADateTimeOfHttpHeaders, - is HttpBindingSection.BeforeCreatingEventStreamReceiver, - is HttpBindingSection.WrapEventStreamReceiver, -> emptySection } } diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerProtocolLoader.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerProtocolLoader.kt index ea87569984..8d5e374867 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerProtocolLoader.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/protocols/ServerProtocolLoader.kt @@ -9,13 +9,11 @@ import software.amazon.smithy.aws.traits.protocols.AwsJson1_0Trait import software.amazon.smithy.aws.traits.protocols.AwsJson1_1Trait import software.amazon.smithy.aws.traits.protocols.RestJson1Trait import software.amazon.smithy.aws.traits.protocols.RestXmlTrait -import software.amazon.smithy.model.shapes.ShapeId import software.amazon.smithy.protocol.traits.Rpcv2CborTrait import software.amazon.smithy.rust.codegen.core.rustlang.Writable import software.amazon.smithy.rust.codegen.core.rustlang.withBlockTemplate import software.amazon.smithy.rust.codegen.core.rustlang.writable import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType -import software.amazon.smithy.rust.codegen.core.smithy.generators.http.HttpBindingCustomization import software.amazon.smithy.rust.codegen.core.smithy.protocols.AwsJsonVersion import software.amazon.smithy.rust.codegen.core.smithy.protocols.ProtocolLoader import software.amazon.smithy.rust.codegen.core.smithy.protocols.ProtocolMap @@ -60,44 +58,39 @@ class StreamPayloadSerializerCustomization : ServerHttpBoundProtocolCustomizatio class ServerProtocolLoader(supportedProtocols: ProtocolMap) : ProtocolLoader(supportedProtocols) { companion object { - fun defaultProtocols( - httpBindingCustomizations: (ShapeId) -> List = { _ -> listOf() }, - ) = mapOf( - RestJson1Trait.ID to - ServerRestJsonFactory( - additionalServerHttpBoundProtocolCustomizations = - listOf( - StreamPayloadSerializerCustomization(), - ), - additionalHttpBindingCustomizations = httpBindingCustomizations(RestJson1Trait.ID), - ), - RestXmlTrait.ID to - ServerRestXmlFactory( - additionalServerHttpBoundProtocolCustomizations = - listOf( - StreamPayloadSerializerCustomization(), - ), - ), - AwsJson1_0Trait.ID to - ServerAwsJsonFactory( - AwsJsonVersion.Json10, - additionalServerHttpBoundProtocolCustomizations = listOf(StreamPayloadSerializerCustomization()), - additionalHttpBindingCustomizations = httpBindingCustomizations(AwsJson1_0Trait.ID), - ), - AwsJson1_1Trait.ID to - ServerAwsJsonFactory( - AwsJsonVersion.Json11, - additionalServerHttpBoundProtocolCustomizations = listOf(StreamPayloadSerializerCustomization()), - additionalHttpBindingCustomizations = httpBindingCustomizations(AwsJson1_1Trait.ID), - ), - Rpcv2CborTrait.ID to - ServerRpcV2CborFactory( - additionalServerHttpBoundProtocolCustomizations = - listOf( - StreamPayloadSerializerCustomization(), - ), - additionalHttpBindingCustomizations = httpBindingCustomizations(Rpcv2CborTrait.ID), - ), - ) + val DefaultProtocols = + mapOf( + RestJson1Trait.ID to + ServerRestJsonFactory( + additionalServerHttpBoundProtocolCustomizations = + listOf( + StreamPayloadSerializerCustomization(), + ), + ), + RestXmlTrait.ID to + ServerRestXmlFactory( + additionalServerHttpBoundProtocolCustomizations = + listOf( + StreamPayloadSerializerCustomization(), + ), + ), + AwsJson1_0Trait.ID to + ServerAwsJsonFactory( + AwsJsonVersion.Json10, + additionalServerHttpBoundProtocolCustomizations = listOf(StreamPayloadSerializerCustomization()), + ), + AwsJson1_1Trait.ID to + ServerAwsJsonFactory( + AwsJsonVersion.Json11, + additionalServerHttpBoundProtocolCustomizations = listOf(StreamPayloadSerializerCustomization()), + ), + Rpcv2CborTrait.ID to + ServerRpcV2CborFactory( + additionalServerHttpBoundProtocolCustomizations = + listOf( + StreamPayloadSerializerCustomization(), + ), + ), + ) } } diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/testutil/ServerTestHelpers.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/testutil/ServerTestHelpers.kt index 840d82720d..a03dfbdd4e 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/testutil/ServerTestHelpers.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/testutil/ServerTestHelpers.kt @@ -135,7 +135,7 @@ fun serverTestCodegenContext( fun loadServerProtocol(model: Model): ServerProtocol { val codegenContext = serverTestCodegenContext(model) val (_, protocolGeneratorFactory) = - ServerProtocolLoader(ServerProtocolLoader.defaultProtocols()).protocolFor(model, codegenContext.serviceShape) + ServerProtocolLoader(ServerProtocolLoader.DefaultProtocols).protocolFor(model, codegenContext.serviceShape) return protocolGeneratorFactory.buildProtocolGenerator(codegenContext).protocol } From 8944846fffb3e2be43841d720f390cd0d203c3a8 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Fri, 14 Nov 2025 14:39:18 -0500 Subject: [PATCH 4/9] Clean up tests --- .../tests/structured_eventstream_tests.rs | 162 ++++++++++++------ 1 file changed, 111 insertions(+), 51 deletions(-) diff --git a/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs b/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs index 3d441feb2a..36f2efb953 100644 --- a/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs +++ b/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs @@ -352,39 +352,45 @@ fn build_event(event_type: &str) -> Message { Message::new_from_parts(headers, empty_cbor) } -fn build_sigv4_signed_event(event_type: &str) -> Message { +fn sign_message(inner_message: Message, signature: &[u8], timestamp_secs: i64) -> Message { use aws_smithy_eventstream::frame::write_message_to; - use std::time::{SystemTime, UNIX_EPOCH}; - - // Build the inner event message - let inner_event = build_event(event_type); - // Serialize the inner message to bytes let mut inner_bytes = Vec::new(); - write_message_to(&inner_event, &mut inner_bytes).unwrap(); - - // Create the SigV4 envelope with signature headers - let timestamp = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs(); + write_message_to(&inner_message, &mut inner_bytes).unwrap(); let headers = vec![ Header::new( ":chunk-signature", - HeaderValue::ByteArray(Bytes::from( - "example298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", - )), + HeaderValue::ByteArray(Bytes::from(signature.to_vec())), ), Header::new( ":date", - HeaderValue::Timestamp(aws_smithy_types::DateTime::from_secs(timestamp as i64)), + HeaderValue::Timestamp(aws_smithy_types::DateTime::from_secs(timestamp_secs)), ), ]; Message::new_from_parts(headers, Bytes::from(inner_bytes)) } +fn build_sigv4_signed_event(event_type: &str) -> Message { + use std::time::{SystemTime, UNIX_EPOCH}; + + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + sign_message( + build_event(event_type), + b"example298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + timestamp as i64, + ) +} + +fn build_sigv4_signed_initial_data(data: &str, signature: &[u8], timestamp_secs: i64) -> Message { + sign_message(build_initial_data_message(data), signature, timestamp_secs) +} + fn get_event_type(msg: &Message) -> &str { msg.headers() .iter() @@ -494,6 +500,89 @@ async fn test_sigv4_signed_event_stream() { ); } +/// Test that initial_signature field is populated when initial message is SigV4 signed +#[tokio::test] +async fn test_sigv4_initial_signature() { + use std::sync::Arc; + use tokio::sync::Mutex; + + let initial_sig = Arc::new(Mutex::new(None)); + let sig_capture = initial_sig.clone(); + + let config = RpcV2CborServiceConfig::builder().build(); + let app = RpcV2CborService::builder::(config) + .streaming_operation_with_initial_data( + move |mut input: input::StreamingOperationWithInitialDataInput| { + let sig_capture = sig_capture.clone(); + async move { + // Capture the initial signature + *sig_capture.lock().await = input.events.initial_signature().map(|s| s.clone()); + + let _ev = input.events.recv().await; + Ok(output::StreamingOperationWithInitialDataOutput::builder() + .events(EventStreamSender::once(Ok(Events::A(Event {})))) + .build() + .unwrap()) + } + }, + ) + .build_unchecked(); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + let make_service = app.into_make_service(); + let server = hyper0::Server::from_tcp(listener.into_std().unwrap()) + .unwrap() + .http2_only(true) + .serve(make_service); + server.await.unwrap(); + }); + + let path = "/service/RpcV2CborService/operation/StreamingOperationWithInitialData"; + let mut client = ManualEventStreamClient::connect_to_service( + addr, + path, + vec![("Smithy-Protocol", "rpc-v2-cbor")], + ) + .await + .unwrap(); + + // Send SigV4-signed initial-data message with specific signature and timestamp + let test_signature = b"test-signature-12345"; + let test_timestamp = 1700000000i64; + let signed_initial = + build_sigv4_signed_initial_data("test-data", test_signature, test_timestamp); + client.send(signed_initial).await.unwrap(); + + // Send an event + client.send(build_event("A")).await.unwrap(); + + // Receive response + let _resp = client.recv().await.unwrap().unwrap(); + + // Verify initial_signature was captured and has expected values + let sig = initial_sig.lock().await; + let sig_info = sig + .as_ref() + .expect("initial_signature should be populated for signed initial message"); + + // Verify the chunk signature matches what we sent + assert_eq!( + sig_info.chunk_signature, test_signature, + "chunk_signature should match the signature sent in the message" + ); + + // Verify timestamp matches what we sent + use std::time::{SystemTime, UNIX_EPOCH}; + let expected_time = UNIX_EPOCH + std::time::Duration::from_secs(test_timestamp as u64); + assert_eq!( + sig_info.timestamp, expected_time, + "timestamp should match the timestamp sent in the message" + ); +} + /// Test that when alwaysSendEventStreamInitialResponse is disabled, no initial-response is sent #[tokio::test] async fn test_server_no_initial_response_when_disabled() { @@ -630,7 +719,11 @@ async fn test_sigv4_framed_initial_request_with_data() { let mut harness = TestHarness::new("StreamingOperationWithInitialData").await; // Send a SigV4-framed initial-request with data - let signed_initial_request = build_sigv4_signed_initial_data("test-data"); + let signed_initial_request = build_sigv4_signed_initial_data( + "test-data", + b"example298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + 1700000000, + ); harness.client.send(signed_initial_request).await.unwrap(); harness.send_event("A").await; @@ -642,36 +735,3 @@ async fn test_sigv4_framed_initial_request_with_data() { // Verify the server received and parsed the initial data from inside the SigV4 envelope assert_eq!(harness.server.initial_data(), Some("test-data".to_string())); } - -fn build_sigv4_signed_initial_data(data: &str) -> Message { - use aws_smithy_eventstream::frame::write_message_to; - use std::time::{SystemTime, UNIX_EPOCH}; - - // Build the inner initial-request message with data - let inner_message = build_initial_data_message(data); - - // Serialize the inner message to bytes - let mut inner_bytes = Vec::new(); - write_message_to(&inner_message, &mut inner_bytes).unwrap(); - - // Create the SigV4 envelope with signature headers - let timestamp = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs(); - - let headers = vec![ - Header::new( - ":chunk-signature", - HeaderValue::ByteArray(Bytes::from( - "example298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", - )), - ), - Header::new( - ":date", - HeaderValue::Timestamp(aws_smithy_types::DateTime::from_secs(timestamp as i64)), - ), - ]; - - Message::new_from_parts(headers, Bytes::from(inner_bytes)) -} From a14bdf86b5f2cfcbb53c494becd04433cbf9b5d5 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Fri, 14 Nov 2025 16:09:02 -0500 Subject: [PATCH 5/9] Fix issues where only the initial request was tested --- .../integration-tests/Cargo.lock | 1 + .../integration-tests/eventstreams/Cargo.toml | 1 + .../tests/structured_eventstream_tests.rs | 332 ++++++++++++------ .../SigV4EventStreamSupportStructures.kt | 109 +++++- .../src/event_stream/receiver.rs | 4 +- 5 files changed, 313 insertions(+), 134 deletions(-) diff --git a/codegen-server-test/integration-tests/Cargo.lock b/codegen-server-test/integration-tests/Cargo.lock index 1d461df19c..6132cf38e4 100644 --- a/codegen-server-test/integration-tests/Cargo.lock +++ b/codegen-server-test/integration-tests/Cargo.lock @@ -452,6 +452,7 @@ dependencies = [ "rpcv2cbor_extras_no_initial_response", "tokio", "tokio-stream", + "tracing", ] [[package]] diff --git a/codegen-server-test/integration-tests/eventstreams/Cargo.toml b/codegen-server-test/integration-tests/eventstreams/Cargo.toml index 811e5f6785..2100e450f3 100644 --- a/codegen-server-test/integration-tests/eventstreams/Cargo.toml +++ b/codegen-server-test/integration-tests/eventstreams/Cargo.toml @@ -18,3 +18,4 @@ aws-smithy-runtime = { workspace = true } http-body-util = "0.1.3" hyper-util = { version = "0.1.17", features = ["client-legacy", "tokio", "http2", "http1"] } tokio-stream = "0.1.17" +tracing = "0.1.41" diff --git a/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs b/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs index 36f2efb953..71ad7066d8 100644 --- a/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs +++ b/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs @@ -9,30 +9,34 @@ use aws_smithy_types::event_stream::{Header, HeaderValue, Message}; use bytes::Bytes; use eventstreams::{ManualEventStreamClient, RecvError}; use rpcv2cbor_extras::model::{Event, Events}; +use rpcv2cbor_extras::sigv4_event_stream::SignedEvent; use rpcv2cbor_extras::{error, input, output, RpcV2CborService, RpcV2CborServiceConfig}; use std::sync::{Arc, Mutex}; use tokio::net::TcpListener; #[derive(Debug, Default, Clone)] struct StreamingOperationState { - events: Vec, + events: Vec>, num_calls: usize, + initial_signature: Option>, } #[derive(Debug, Default, Clone)] struct StreamingOperationWithInitialDataState { initial_data: Option, - events: Vec, + events: Vec>, #[allow(dead_code)] num_calls: usize, + initial_signature: Option>, } #[derive(Debug, Default, Clone)] struct StreamingOperationWithOptionalDataState { optional_data: Option, - events: Vec, + events: Vec>, #[allow(dead_code)] num_calls: usize, + initial_signature: Option>, } #[derive(Debug, Default, Clone)] @@ -90,7 +94,7 @@ impl TestServer { Self { addr, state } } - fn streaming_operation_events(&self) -> Vec { + fn streaming_operation_events(&self) -> Vec> { self.state .lock() .unwrap() @@ -99,7 +103,7 @@ impl TestServer { .clone() } - fn streaming_operation_with_initial_data_events(&self) -> Vec { + fn streaming_operation_with_initial_data_events(&self) -> Vec> { self.state .lock() .unwrap() @@ -117,7 +121,7 @@ impl TestServer { .clone() } - fn streaming_operation_with_optional_data_events(&self) -> Vec { + fn streaming_operation_with_optional_data_events(&self) -> Vec> { self.state .lock() .unwrap() @@ -134,6 +138,24 @@ impl TestServer { .optional_data .clone() } + + fn initial_signature(&self) -> Option> { + self.state + .lock() + .unwrap() + .streaming_operation_with_initial_data + .initial_signature + .clone() + } + + fn streaming_operation_initial_signature(&self) -> Option> { + self.state + .lock() + .unwrap() + .streaming_operation + .initial_signature + .clone() + } } async fn streaming_operation_handler( @@ -141,18 +163,26 @@ async fn streaming_operation_handler( state: Arc>, ) -> Result { state.lock().unwrap().streaming_operation.num_calls += 1; - let ev = input.events.recv().await; + state.lock().unwrap().streaming_operation.initial_signature = input + .events + .initial_signature() + .map(|s| s.chunk_signature.to_vec()); - if let Ok(Some(signed_event)) = &ev { - // Extract the actual event from the SignedEvent wrapper - let actual_event = &signed_event.message; - state - .lock() - .unwrap() - .streaming_operation - .events - .push(actual_event.clone()); - } + let state_clone = state.clone(); + tokio::spawn(async move { + while let Ok(Some(signed_event)) = input.events.recv().await { + tracing::debug!( + "streaming_operation received event: {:?}", + signed_event.message + ); + state_clone + .lock() + .unwrap() + .streaming_operation + .events + .push(signed_event); + } + }); Ok(output::StreamingOperationOutput::builder() .events(EventStreamSender::once(Ok(Events::A(Event {})))) @@ -173,19 +203,30 @@ async fn streaming_operation_with_initial_data_handler( .unwrap() .streaming_operation_with_initial_data .initial_data = Some(input.initial_data); + state + .lock() + .unwrap() + .streaming_operation_with_initial_data + .initial_signature = input + .events + .initial_signature() + .map(|s| s.chunk_signature.to_vec()); - let ev = input.events.recv().await; - - if let Ok(Some(signed_event)) = &ev { - // Extract the actual event from the SignedEvent wrapper - let actual_event = &signed_event.message; - state - .lock() - .unwrap() - .streaming_operation_with_initial_data - .events - .push(actual_event.clone()); - } + let state_clone = state.clone(); + tokio::spawn(async move { + while let Ok(Some(signed_event)) = input.events.recv().await { + tracing::debug!( + "streaming_operation_with_initial_data received event: {:?}", + signed_event.message + ); + state_clone + .lock() + .unwrap() + .streaming_operation_with_initial_data + .events + .push(signed_event); + } + }); Ok(output::StreamingOperationWithInitialDataOutput::builder() .events(EventStreamSender::once(Ok(Events::A(Event {})))) @@ -200,7 +241,14 @@ async fn streaming_operation_with_initial_response_handler( output::StreamingOperationWithInitialResponseOutput, error::StreamingOperationWithInitialResponseError, > { - let _ev = input.events.recv().await; + tokio::spawn(async move { + while let Ok(Some(event)) = input.events.recv().await { + tracing::debug!( + "streaming_operation_with_initial_response received event: {:?}", + event + ); + } + }); Ok( output::StreamingOperationWithInitialResponseOutput::builder() @@ -224,17 +272,30 @@ async fn streaming_operation_with_optional_data_handler( .unwrap() .streaming_operation_with_optional_data .optional_data = input.optional_data; + state + .lock() + .unwrap() + .streaming_operation_with_optional_data + .initial_signature = input + .events + .initial_signature() + .map(|s| s.chunk_signature.to_vec()); - let ev = input.events.recv().await; - - if let Ok(Some(event)) = &ev { - state - .lock() - .unwrap() - .streaming_operation_with_optional_data - .events - .push(event.message.clone()); - } + let state_clone = state.clone(); + tokio::spawn(async move { + while let Ok(Some(event)) = input.events.recv().await { + tracing::debug!( + "streaming_operation_with_optional_data received event: {:?}", + event + ); + state_clone + .lock() + .unwrap() + .streaming_operation_with_optional_data + .events + .push(event); + } + }); Ok(output::StreamingOperationWithOptionalDataOutput::builder() .optional_response_data(Some("optional response".to_string())) @@ -373,6 +434,13 @@ fn sign_message(inner_message: Message, signature: &[u8], timestamp_secs: i64) - } fn build_sigv4_signed_event(event_type: &str) -> Message { + build_sigv4_signed_event_with_signature( + event_type, + b"example298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + ) +} + +fn build_sigv4_signed_event_with_signature(event_type: &str, signature: &[u8]) -> Message { use std::time::{SystemTime, UNIX_EPOCH}; let timestamp = SystemTime::now() @@ -380,11 +448,7 @@ fn build_sigv4_signed_event(event_type: &str) -> Message { .unwrap() .as_secs(); - sign_message( - build_event(event_type), - b"example298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", - timestamp as i64, - ) + sign_message(build_event(event_type), signature, timestamp as i64) } fn build_sigv4_signed_initial_data(data: &str, signature: &[u8], timestamp_secs: i64) -> Message { @@ -421,7 +485,12 @@ async fn test_streaming_operation_with_initial_request() { ); assert_eq!( - harness.server.streaming_operation_events(), + harness + .server + .streaming_operation_events() + .into_iter() + .map(|e| e.message) + .collect::>(), vec![Events::A(Event {})] ); } @@ -436,7 +505,12 @@ async fn test_streaming_operation_without_initial_request() { let resp = harness.expect_message().await; assert_eq!(get_event_type(&resp), "A"); assert_eq!( - harness.server.streaming_operation_events(), + harness + .server + .streaming_operation_events() + .into_iter() + .map(|e| e.message) + .collect::>(), vec![Events::A(Event {})] ); } @@ -453,7 +527,10 @@ async fn test_streaming_operation_with_initial_data() { assert_eq!( harness .server - .streaming_operation_with_initial_data_events(), + .streaming_operation_with_initial_data_events() + .into_iter() + .map(|e| e.message) + .collect::>(), vec![Events::A(Event {})] ); // verify that we parsed the initial data properly @@ -477,7 +554,10 @@ async fn test_streaming_operation_with_initial_data_missing() { assert_eq!( harness .server - .streaming_operation_with_initial_data_events(), + .streaming_operation_with_initial_data_events() + .into_iter() + .map(|e| e.message) + .collect::>(), vec![] ); } @@ -486,6 +566,9 @@ async fn test_streaming_operation_with_initial_data_missing() { /// The client wraps the actual event in a SigV4 envelope with signature headers. #[tokio::test] async fn test_sigv4_signed_event_stream() { + let _logs = show_filtered_test_logs( + "aws_smithy_http_server=trace,hyper_util=debug,rpcv2cbor_extras=trace", + ); let mut harness = TestHarness::new("StreamingOperation").await; // Send a SigV4 signed event - the inner message is wrapped in an envelope @@ -494,92 +577,104 @@ async fn test_sigv4_signed_event_stream() { let resp = harness.expect_message().await; assert_eq!(get_event_type(&resp), "A"); + + let events = harness.server.streaming_operation_events(); + assert_eq!(events.len(), 1); + assert_eq!(events[0].message, Events::A(Event {})); + // The event itself is signed + assert!(events[0].signature.is_some()); assert_eq!( - harness.server.streaming_operation_events(), - vec![Events::A(Event {})] + events[0].signature.as_ref().unwrap().chunk_signature, + b"example298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" ); } -/// Test that initial_signature field is populated when initial message is SigV4 signed +/// Test that SigV4-signed initial-request works on operations without initial data #[tokio::test] -async fn test_sigv4_initial_signature() { - use std::sync::Arc; - use tokio::sync::Mutex; +async fn test_sigv4_signed_initial_request_initial_event_empty() { + let mut harness = TestHarness::new("StreamingOperation").await; - let initial_sig = Arc::new(Mutex::new(None)); - let sig_capture = initial_sig.clone(); + // Send SigV4-signed initial-request (empty) to operation that doesn't expect initial data + let signed_initial = sign_message(build_initial_request(), b"test-sig-123", 1700000000); + harness.client.send(signed_initial).await.unwrap(); - let config = RpcV2CborServiceConfig::builder().build(); - let app = RpcV2CborService::builder::(config) - .streaming_operation_with_initial_data( - move |mut input: input::StreamingOperationWithInitialDataInput| { - let sig_capture = sig_capture.clone(); - async move { - // Capture the initial signature - *sig_capture.lock().await = input.events.initial_signature().map(|s| s.clone()); - - let _ev = input.events.recv().await; - Ok(output::StreamingOperationWithInitialDataOutput::builder() - .events(EventStreamSender::once(Ok(Events::A(Event {})))) - .build() - .unwrap()) - } - }, - ) - .build_unchecked(); + // Send an event + harness.send_event("A").await; + harness.send_event("B").await; - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); + // Should receive the event response + let resp = harness.expect_message().await; + assert_eq!(get_event_type(&resp), "A"); - tokio::spawn(async move { - let make_service = app.into_make_service(); - let server = hyper0::Server::from_tcp(listener.into_std().unwrap()) - .unwrap() - .http2_only(true) - .serve(make_service); - server.await.unwrap(); - }); + let events = harness.server.streaming_operation_events(); + assert_eq!(events.len(), 2); + assert_eq!(events[0].message, Events::A(Event {})); + assert_eq!(events[0].signature, None); // Unsigned event + assert_eq!(events[1].message, Events::B(Event {})); + assert_eq!(events[1].signature, None); // Unsigned event - let path = "/service/RpcV2CborService/operation/StreamingOperationWithInitialData"; - let mut client = ManualEventStreamClient::connect_to_service( - addr, - path, - vec![("Smithy-Protocol", "rpc-v2-cbor")], - ) - .await - .unwrap(); + assert_eq!( + harness.server.streaming_operation_initial_signature(), + Some(b"test-sig-123".to_vec()) + ); +} - // Send SigV4-signed initial-data message with specific signature and timestamp - let test_signature = b"test-signature-12345"; - let test_timestamp = 1700000000i64; - let signed_initial = - build_sigv4_signed_initial_data("test-data", test_signature, test_timestamp); - client.send(signed_initial).await.unwrap(); +/// Test that SigV4-signed initial-request works on operations without initial data +#[tokio::test] +async fn test_sigv4_signed_initial_request_no_initial_event() { + let mut harness = TestHarness::new("StreamingOperation").await; // Send an event - client.send(build_event("A")).await.unwrap(); + harness.send_event("A").await; + harness.send_event("B").await; + + // Should receive the event response + let resp = harness.expect_message().await; + assert_eq!(get_event_type(&resp), "A"); + assert_eq!( + harness + .server + .streaming_operation_events() + .into_iter() + .map(|e| e.message) + .collect::>(), + vec![Events::A(Event {}), Events::B(Event {})] + ); + assert_eq!(harness.server.streaming_operation_initial_signature(), None); +} + +/// Test that multiple SigV4-signed events are properly unwrapped with signatures preserved +#[tokio::test] +async fn test_sigv4_signed_multiple_events() { + let mut harness = TestHarness::new("StreamingOperation").await; + + // Send multiple SigV4-signed events with different signatures + let signed_event_a = build_sigv4_signed_event_with_signature("A", b"signature-for-event-A"); + harness.client.send(signed_event_a).await.unwrap(); + + let signed_event_b = build_sigv4_signed_event_with_signature("B", b"signature-for-event-B"); + harness.client.send(signed_event_b).await.unwrap(); // Receive response - let _resp = client.recv().await.unwrap().unwrap(); + let resp = harness.expect_message().await; + assert_eq!(get_event_type(&resp), "A"); - // Verify initial_signature was captured and has expected values - let sig = initial_sig.lock().await; - let sig_info = sig - .as_ref() - .expect("initial_signature should be populated for signed initial message"); + // Verify both events were received with their distinct signatures + let events = harness.server.streaming_operation_events(); + assert_eq!(events.len(), 2); - // Verify the chunk signature matches what we sent + assert_eq!(events[0].message, Events::A(Event {})); + assert!(events[0].signature.is_some()); assert_eq!( - sig_info.chunk_signature, test_signature, - "chunk_signature should match the signature sent in the message" + events[0].signature.as_ref().unwrap().chunk_signature, + b"signature-for-event-A" ); - // Verify timestamp matches what we sent - use std::time::{SystemTime, UNIX_EPOCH}; - let expected_time = UNIX_EPOCH + std::time::Duration::from_secs(test_timestamp as u64); + assert_eq!(events[1].message, Events::B(Event {})); + assert!(events[1].signature.is_some()); assert_eq!( - sig_info.timestamp, expected_time, - "timestamp should match the timestamp sent in the message" + events[1].signature.as_ref().unwrap().chunk_signature, + b"signature-for-event-B" ); } @@ -701,7 +796,10 @@ async fn test_streaming_operation_with_optional_data() { assert_eq!( harness .server - .streaming_operation_with_optional_data_events(), + .streaming_operation_with_optional_data_events() + .into_iter() + .map(|e| e.message) + .collect::>(), vec![Events::A(Event {})] ); // Verify optional data was not provided @@ -734,4 +832,8 @@ async fn test_sigv4_framed_initial_request_with_data() { // Verify the server received and parsed the initial data from inside the SigV4 envelope assert_eq!(harness.server.initial_data(), Some("test-data".to_string())); + assert_eq!( + harness.server.initial_signature(), + Some(b"example298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".to_vec()) + ); } diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamSupportStructures.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamSupportStructures.kt index ec6a9d7c83..0ebaa2f761 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamSupportStructures.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamSupportStructures.kt @@ -17,7 +17,7 @@ import software.amazon.smithy.rust.codegen.core.smithy.rustType import software.amazon.smithy.rust.codegen.core.util.PANIC object SigV4EventStreamSupportStructures { - private val supportModule = RustModule.private("sigv4_event_stream") + private val supportModule = RustModule.public("sigv4_event_stream", documentationOverride = "Support structures for SigV4 signed event streams") fun codegenScope(runtimeConfig: RuntimeConfig) = arrayOf( @@ -26,6 +26,7 @@ object SigV4EventStreamSupportStructures { "SignedEventError" to signedEventError(runtimeConfig), "SignedEvent" to signedEvent(runtimeConfig), "SigV4Receiver" to sigV4Receiver(runtimeConfig), + "SigV4Unmarshaller" to sigV4Unmarshaller(runtimeConfig), "extract_signed_message" to extractSignedMessage(runtimeConfig), ) @@ -61,7 +62,7 @@ object SigV4EventStreamSupportStructures { """ /// Information extracted from a signed event stream message ##[non_exhaustive] - ##[derive(Debug, Clone)] + ##[derive(Debug, Clone, PartialEq)] pub struct SignatureInfo { /// The chunk signature bytes from the `:chunk-signature` header pub chunk_signature: Vec, @@ -150,7 +151,7 @@ object SigV4EventStreamSupportStructures { rustTemplate( """ /// Wrapper for event stream messages that may be signed - ##[derive(Debug)] + ##[derive(Debug, Clone)] pub struct SignedEvent { /// The actual event message pub message: T, @@ -163,6 +164,86 @@ object SigV4EventStreamSupportStructures { ) } + private fun sigV4Unmarshaller(runtimeConfig: RuntimeConfig): RuntimeType = + RuntimeType.forInlineFun("SigV4Unmarshaller", supportModule) { + rustTemplate( + """ + /// Unmarshaller wrapper that handles SigV4 signed event stream messages + ##[derive(Debug)] + pub struct SigV4Unmarshaller { + inner: T, + } + + impl SigV4Unmarshaller { + pub fn new(inner: T) -> Self { + Self { inner } + } + } + + impl #{UnmarshallMessage} for SigV4Unmarshaller + where + T: #{UnmarshallMessage}, + { + type Output = #{SignedEvent}; + type Error = #{SignedEventError}; + + fn unmarshall(&self, message: &#{Message}) -> #{Result}<#{UnmarshalledMessage}, #{EventStreamError}> { + // First, try to extract the signed message + match #{extract_signed_message}(message) { + #{Ok}(MaybeSignedMessage::Signed { message: inner_message, signature }) => { + // Process the inner message with the base unmarshaller + match self.inner.unmarshall(&inner_message) { + #{Ok}(unmarshalled) => match unmarshalled { + #{UnmarshalledMessage}::Event(event) => { + #{Ok}(#{UnmarshalledMessage}::Event(#{SignedEvent} { + message: event, + signature: #{Some}(signature), + })) + } + #{UnmarshalledMessage}::Error(err) => { + #{Ok}(#{UnmarshalledMessage}::Error(#{SignedEventError}::Event(err))) + } + }, + #{Err}(err) => #{Err}(err), + } + } + #{Ok}(MaybeSignedMessage::Unsigned) => { + // Process unsigned message directly + match self.inner.unmarshall(message) { + #{Ok}(unmarshalled) => match unmarshalled { + #{UnmarshalledMessage}::Event(event) => { + #{Ok}(#{UnmarshalledMessage}::Event(#{SignedEvent} { + message: event, + signature: #{None}, + })) + } + #{UnmarshalledMessage}::Error(err) => { + #{Ok}(#{UnmarshalledMessage}::Error(#{SignedEventError}::Event(err))) + } + }, + #{Err}(err) => #{Err}(err), + } + } + #{Err}(extraction_err) => #{Ok}(#{UnmarshalledMessage}::Error(#{SignedEventError}::InvalidSignedEvent(extraction_err))), + } + } + } + """, + "UnmarshallMessage" to + CargoDependency.smithyEventStream(runtimeConfig).toType() + .resolve("frame::UnmarshallMessage"), + "UnmarshalledMessage" to + CargoDependency.smithyEventStream(runtimeConfig).toType() + .resolve("frame::UnmarshalledMessage"), + "Message" to CargoDependency.smithyTypes(runtimeConfig).toType().resolve("event_stream::Message"), + "EventStreamError" to CargoDependency.smithyEventStream(runtimeConfig).toType().resolve("error::Error"), + "SignedEvent" to signedEvent(runtimeConfig), + "SignedEventError" to signedEventError(runtimeConfig), + "extract_signed_message" to extractSignedMessage(runtimeConfig), + *RuntimeType.preludeScope, + ) + } + private fun sigV4Receiver(runtimeConfig: RuntimeConfig): RuntimeType = RuntimeType.forInlineFun("SigV4Receiver", supportModule) { rustTemplate( @@ -170,7 +251,7 @@ object SigV4EventStreamSupportStructures { /// Receiver wrapper that handles SigV4 signed event stream messages ##[derive(Debug)] pub struct SigV4Receiver { - inner: #{Receiver}, + inner: #{Receiver}<#{SignedEvent}, #{SignedEventError}>, initial_signature: #{Option}<#{SignatureInfo}>, } @@ -179,9 +260,10 @@ object SigV4EventStreamSupportStructures { unmarshaller: impl #{UnmarshallMessage} + #{Send} + #{Sync} + 'static, body: #{SdkBody}, ) -> Self { + let sigv4_unmarshaller = #{SigV4Unmarshaller}::new(unmarshaller); Self { - inner: #{Receiver}::new(unmarshaller, body), - initial_signature: None, + inner: #{Receiver}::new(sigv4_unmarshaller, body), + initial_signature: #{None}, } } @@ -195,7 +277,7 @@ object SigV4EventStreamSupportStructures { pub async fn try_recv_initial( &mut self, message_type: #{event_stream}::InitialMessageType, - ) -> #{Result}<#{Option}<#{Message}>, #{SdkError}> + ) -> #{Result}<#{Option}<#{Message}>, #{SdkError}<#{SignedEventError}, #{RawMessage}>> where E: std::error::Error + 'static, { @@ -221,24 +303,17 @@ object SigV4EventStreamSupportStructures { } /// Receive the next event from the stream + /// The SigV4Unmarshaller handles unwrapping, so we just pass through pub async fn recv(&mut self) -> #{Result}<#{Option}<#{SignedEvent}>, #{SdkError}<#{SignedEventError}, #{RawMessage}>> where E: std::error::Error + 'static, { - match self.inner.recv().await.map_err(|e| e.map_service_error(#{SignedEventError}::Event))? { - #{Some}(event) => { - // Wrap in SignedEvent with no signature (signatures only on initial message) - #{Ok}(#{Some}(#{SignedEvent} { - message: event, - signature: #{None}, - })) - } - #{None} => #{Ok}(#{None}), - } + self.inner.recv().await } } """, "Receiver" to RuntimeType.eventStreamReceiver(runtimeConfig), + "SigV4Unmarshaller" to sigV4Unmarshaller(runtimeConfig), "event_stream" to RuntimeType.smithyHttp(runtimeConfig).resolve("event_stream"), "SdkBody" to RuntimeType.sdkBody(runtimeConfig), "Message" to CargoDependency.smithyTypes(runtimeConfig).toType().resolve("event_stream::Message"), diff --git a/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs b/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs index b7aaed5008..74d70cd5af 100644 --- a/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs +++ b/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs @@ -251,7 +251,7 @@ impl Receiver { { if let Some(message) = self.next_message().await? { let (processed_message, metadata) = - preprocessor(message).map_err(|err| SdkError::ResponseError(err))?; + preprocessor(message.clone()).map_err(|err| SdkError::ResponseError(err))?; if let Some(event_type) = processed_message .headers() @@ -268,7 +268,7 @@ impl Receiver { } } // Buffer the processed message so that it can be returned by the next call to `recv()` - self.buffered_message = Some(processed_message); + self.buffered_message = Some(message); } Ok(None) } From 1c4ce2c6502e13b2714835c87407dbf947e45f6d Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Fri, 14 Nov 2025 16:24:02 -0500 Subject: [PATCH 6/9] Add more tests --- .../integration-tests/Cargo.lock | 149 ++++++++ .../integration-tests/eventstreams/Cargo.toml | 1 + .../tests/structured_eventstream_tests.rs | 357 ++++++++++-------- 3 files changed, 342 insertions(+), 165 deletions(-) diff --git a/codegen-server-test/integration-tests/Cargo.lock b/codegen-server-test/integration-tests/Cargo.lock index 6132cf38e4..215926c097 100644 --- a/codegen-server-test/integration-tests/Cargo.lock +++ b/codegen-server-test/integration-tests/Cargo.lock @@ -450,6 +450,7 @@ dependencies = [ "hyper-util", "rpcv2cbor_extras", "rpcv2cbor_extras_no_initial_response", + "rstest", "tokio", "tokio-stream", "tracing", @@ -476,6 +477,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -483,6 +499,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -491,6 +508,23 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + [[package]] name = "futures-macro" version = "0.3.31" @@ -514,15 +548,25 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -546,6 +590,12 @@ version = "0.32.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" +[[package]] +name = "glob" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" + [[package]] name = "h2" version = "0.3.27" @@ -1170,6 +1220,15 @@ dependencies = [ "yansi", ] +[[package]] +name = "proc-macro-crate" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983" +dependencies = [ + "toml_edit", +] + [[package]] name = "proc-macro2" version = "1.0.101" @@ -1267,6 +1326,12 @@ version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" +[[package]] +name = "relative-path" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" + [[package]] name = "roxmltree" version = "0.14.1" @@ -1315,12 +1380,51 @@ dependencies = [ "tracing", ] +[[package]] +name = "rstest" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a2c585be59b6b5dd66a9d2084aa1d8bd52fbdb806eafdeffb52791147862035" +dependencies = [ + "futures", + "futures-timer", + "rstest_macros", + "rustc_version", +] + +[[package]] +name = "rstest_macros" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "825ea780781b15345a146be27eaefb05085e337e869bff01b4306a4fd4a9ad5a" +dependencies = [ + "cfg-if", + "glob", + "proc-macro-crate", + "proc-macro2", + "quote", + "regex", + "relative-path", + "rustc_version", + "syn", + "unicode-ident", +] + [[package]] name = "rustc-demangle" version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + [[package]] name = "rustversion" version = "1.0.22" @@ -1339,6 +1443,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "semver" +version = "1.0.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" + [[package]] name = "separator" version = "0.4.1" @@ -1618,6 +1728,36 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml_datetime" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2cdb639ebbc97961c51720f858597f7f24c4fc295327923af55b74c3c724533" +dependencies = [ + "serde_core", +] + +[[package]] +name = "toml_edit" +version = "0.23.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6485ef6d0d9b5d0ec17244ff7eb05310113c3f316f2d14200d4de56b3cb98f8d" +dependencies = [ + "indexmap", + "toml_datetime", + "toml_parser", + "winnow", +] + +[[package]] +name = "toml_parser" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e" +dependencies = [ + "winnow", +] + [[package]] name = "tower" version = "0.4.13" @@ -2055,6 +2195,15 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" +[[package]] +name = "winnow" +version = "0.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21a0236b59786fed61e2a80582dd500fe61f18b5dca67a4a067d0bc9039339cf" +dependencies = [ + "memchr", +] + [[package]] name = "wit-bindgen" version = "0.46.0" diff --git a/codegen-server-test/integration-tests/eventstreams/Cargo.toml b/codegen-server-test/integration-tests/eventstreams/Cargo.toml index 2100e450f3..6773b132bb 100644 --- a/codegen-server-test/integration-tests/eventstreams/Cargo.toml +++ b/codegen-server-test/integration-tests/eventstreams/Cargo.toml @@ -19,3 +19,4 @@ http-body-util = "0.1.3" hyper-util = { version = "0.1.17", features = ["client-legacy", "tokio", "http2", "http1"] } tokio-stream = "0.1.17" tracing = "0.1.41" +rstest = "0.23" diff --git a/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs b/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs index 71ad7066d8..ea8647a5a6 100644 --- a/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs +++ b/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs @@ -466,55 +466,6 @@ fn get_event_type(msg: &Message) -> &str { .as_str() } -#[tokio::test] -async fn test_streaming_operation_with_initial_request() { - let mut harness = TestHarness::new("StreamingOperation").await; - - // if we send an initial request it should work - harness.send_initial_request().await; - harness.send_event("A").await; - - let resp = harness.expect_message().await; - assert_eq!(get_event_type(&resp), "A"); - - // Check that initial-response was received - assert!(harness.initial_response.is_some()); - assert_eq!( - get_event_type(harness.initial_response.as_ref().unwrap()), - "initial-response" - ); - - assert_eq!( - harness - .server - .streaming_operation_events() - .into_iter() - .map(|e| e.message) - .collect::>(), - vec![Events::A(Event {})] - ); -} - -#[tokio::test] -async fn test_streaming_operation_without_initial_request() { - let mut harness = TestHarness::new("StreamingOperation").await; - - // BUT: if we don't send an initial request, it should also work - harness.send_event("A").await; - - let resp = harness.expect_message().await; - assert_eq!(get_event_type(&resp), "A"); - assert_eq!( - harness - .server - .streaming_operation_events() - .into_iter() - .map(|e| e.message) - .collect::>(), - vec![Events::A(Event {})] - ); -} - #[tokio::test] async fn test_streaming_operation_with_initial_data() { let mut harness = TestHarness::new("StreamingOperationWithInitialData").await; @@ -562,122 +513,6 @@ async fn test_streaming_operation_with_initial_data_missing() { ); } -/// Test that the server can handle SigV4 signed event stream messages. -/// The client wraps the actual event in a SigV4 envelope with signature headers. -#[tokio::test] -async fn test_sigv4_signed_event_stream() { - let _logs = show_filtered_test_logs( - "aws_smithy_http_server=trace,hyper_util=debug,rpcv2cbor_extras=trace", - ); - let mut harness = TestHarness::new("StreamingOperation").await; - - // Send a SigV4 signed event - the inner message is wrapped in an envelope - let signed_event = build_sigv4_signed_event("A"); - harness.client.send(signed_event).await.unwrap(); - - let resp = harness.expect_message().await; - assert_eq!(get_event_type(&resp), "A"); - - let events = harness.server.streaming_operation_events(); - assert_eq!(events.len(), 1); - assert_eq!(events[0].message, Events::A(Event {})); - // The event itself is signed - assert!(events[0].signature.is_some()); - assert_eq!( - events[0].signature.as_ref().unwrap().chunk_signature, - b"example298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" - ); -} - -/// Test that SigV4-signed initial-request works on operations without initial data -#[tokio::test] -async fn test_sigv4_signed_initial_request_initial_event_empty() { - let mut harness = TestHarness::new("StreamingOperation").await; - - // Send SigV4-signed initial-request (empty) to operation that doesn't expect initial data - let signed_initial = sign_message(build_initial_request(), b"test-sig-123", 1700000000); - harness.client.send(signed_initial).await.unwrap(); - - // Send an event - harness.send_event("A").await; - harness.send_event("B").await; - - // Should receive the event response - let resp = harness.expect_message().await; - assert_eq!(get_event_type(&resp), "A"); - - let events = harness.server.streaming_operation_events(); - assert_eq!(events.len(), 2); - assert_eq!(events[0].message, Events::A(Event {})); - assert_eq!(events[0].signature, None); // Unsigned event - assert_eq!(events[1].message, Events::B(Event {})); - assert_eq!(events[1].signature, None); // Unsigned event - - assert_eq!( - harness.server.streaming_operation_initial_signature(), - Some(b"test-sig-123".to_vec()) - ); -} - -/// Test that SigV4-signed initial-request works on operations without initial data -#[tokio::test] -async fn test_sigv4_signed_initial_request_no_initial_event() { - let mut harness = TestHarness::new("StreamingOperation").await; - - // Send an event - harness.send_event("A").await; - harness.send_event("B").await; - - // Should receive the event response - let resp = harness.expect_message().await; - assert_eq!(get_event_type(&resp), "A"); - assert_eq!( - harness - .server - .streaming_operation_events() - .into_iter() - .map(|e| e.message) - .collect::>(), - vec![Events::A(Event {}), Events::B(Event {})] - ); - assert_eq!(harness.server.streaming_operation_initial_signature(), None); -} - -/// Test that multiple SigV4-signed events are properly unwrapped with signatures preserved -#[tokio::test] -async fn test_sigv4_signed_multiple_events() { - let mut harness = TestHarness::new("StreamingOperation").await; - - // Send multiple SigV4-signed events with different signatures - let signed_event_a = build_sigv4_signed_event_with_signature("A", b"signature-for-event-A"); - harness.client.send(signed_event_a).await.unwrap(); - - let signed_event_b = build_sigv4_signed_event_with_signature("B", b"signature-for-event-B"); - harness.client.send(signed_event_b).await.unwrap(); - - // Receive response - let resp = harness.expect_message().await; - assert_eq!(get_event_type(&resp), "A"); - - // Verify both events were received with their distinct signatures - let events = harness.server.streaming_operation_events(); - assert_eq!(events.len(), 2); - - assert_eq!(events[0].message, Events::A(Event {})); - assert!(events[0].signature.is_some()); - assert_eq!( - events[0].signature.as_ref().unwrap().chunk_signature, - b"signature-for-event-A" - ); - - assert_eq!(events[1].message, Events::B(Event {})); - assert!(events[1].signature.is_some()); - assert_eq!( - events[1].signature.as_ref().unwrap().chunk_signature, - b"signature-for-event-B" - ); -} - /// Test that when alwaysSendEventStreamInitialResponse is disabled, no initial-response is sent #[tokio::test] async fn test_server_no_initial_response_when_disabled() { @@ -837,3 +672,195 @@ async fn test_sigv4_framed_initial_request_with_data() { Some(b"example298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".to_vec()) ); } + +#[derive(Debug, Clone, Copy)] +enum InitialMessage { + None, + Unsigned, + Signed, +} + +#[derive(Debug, Clone)] +struct EventStreamTestCase { + initial: InitialMessage, + events_signed: Vec, +} + +/// Comprehensive test matrix for SigV4 event stream combinations +#[rstest::rstest] +#[case::no_initial_unsigned_events(EventStreamTestCase { initial: InitialMessage::None, events_signed: vec![false, false] })] +#[case::no_initial_signed_events(EventStreamTestCase { initial: InitialMessage::None, events_signed: vec![true, true] })] +#[case::no_initial_mixed_events(EventStreamTestCase { initial: InitialMessage::None, events_signed: vec![false, true] })] +#[case::unsigned_initial_unsigned_events(EventStreamTestCase { initial: InitialMessage::Unsigned, events_signed: vec![false, false] })] +#[case::unsigned_initial_signed_events(EventStreamTestCase { initial: InitialMessage::Unsigned, events_signed: vec![true, true] })] +#[case::unsigned_initial_mixed_events(EventStreamTestCase { initial: InitialMessage::Unsigned, events_signed: vec![false, true] })] +#[case::signed_initial_unsigned_events(EventStreamTestCase { initial: InitialMessage::Signed, events_signed: vec![false, false] })] +#[case::signed_initial_signed_events(EventStreamTestCase { initial: InitialMessage::Signed, events_signed: vec![true, true] })] +#[case::signed_initial_mixed_events(EventStreamTestCase { initial: InitialMessage::Signed, events_signed: vec![false, true] })] +#[case::no_events(EventStreamTestCase { initial: InitialMessage::None, events_signed: vec![] })] +#[case::many_signed_events(EventStreamTestCase { initial: InitialMessage::Signed, events_signed: vec![true; 100] })] +#[case::many_unsigned_events(EventStreamTestCase { initial: InitialMessage::None, events_signed: vec![false; 100] })] +#[tokio::test] +async fn test_sigv4_event_stream_matrix(#[case] test_case: EventStreamTestCase) { + let mut harness = TestHarness::new("StreamingOperation").await; + + // Send initial message if specified + match test_case.initial { + InitialMessage::None => {} + InitialMessage::Unsigned => { + harness.client.send(build_initial_request()).await.unwrap(); + } + InitialMessage::Signed => { + let signed_initial = sign_message(build_initial_request(), b"initial-sig", 1700000000); + harness.client.send(signed_initial).await.unwrap(); + } + } + + // Send events + for (i, &signed) in test_case.events_signed.iter().enumerate() { + let event_type = if i % 2 == 0 { "A" } else { "B" }; + if signed { + let sig = format!("sig-event-{}", i); + let signed_event = build_sigv4_signed_event_with_signature(event_type, sig.as_bytes()); + harness.client.send(signed_event).await.unwrap(); + } else { + harness.send_event(event_type).await; + } + } + + // Receive response (only if we sent events) + if !test_case.events_signed.is_empty() { + let resp = harness.expect_message().await; + assert_eq!(get_event_type(&resp), "A"); + } + + // Verify events + let events = harness.server.streaming_operation_events(); + assert_eq!(events.len(), test_case.events_signed.len()); + + for (i, &signed) in test_case.events_signed.iter().enumerate() { + let expected_event = if i % 2 == 0 { + Events::A(Event {}) + } else { + Events::B(Event {}) + }; + assert_eq!(events[i].message, expected_event); + + if signed { + assert!( + events[i].signature.is_some(), + "Event {} should have signature", + i + ); + let expected_sig = format!("sig-event-{}", i); + assert_eq!( + events[i].signature.as_ref().unwrap().chunk_signature, + expected_sig.as_bytes() + ); + } else { + assert!( + events[i].signature.is_none(), + "Event {} should not have signature", + i + ); + } + } + + // Verify initial signature + match test_case.initial { + InitialMessage::Signed => { + assert_eq!( + harness.server.streaming_operation_initial_signature(), + Some(b"initial-sig".to_vec()) + ); + } + InitialMessage::None | InitialMessage::Unsigned => { + assert_eq!(harness.server.streaming_operation_initial_signature(), None); + } + } +} + +/// Test signed initial data with signed events +#[tokio::test] +async fn test_sigv4_signed_initial_data_with_signed_events() { + let mut harness = TestHarness::new("StreamingOperationWithInitialData").await; + + // Send signed initial data + let signed_initial = + build_sigv4_signed_initial_data("test-data", b"sig-initial-data", 1700000000); + harness.client.send(signed_initial).await.unwrap(); + + // Send signed events + let signed_event_a = build_sigv4_signed_event_with_signature("A", b"sig-event-A"); + harness.client.send(signed_event_a).await.unwrap(); + + let signed_event_b = build_sigv4_signed_event_with_signature("B", b"sig-event-B"); + harness.client.send(signed_event_b).await.unwrap(); + + let resp = harness.expect_message().await; + assert_eq!(get_event_type(&resp), "A"); + + // Verify initial data was received + assert_eq!(harness.server.initial_data(), Some("test-data".to_string())); + + // Verify initial signature + assert_eq!( + harness.server.initial_signature(), + Some(b"sig-initial-data".to_vec()) + ); + + // Verify events with signatures + let events = harness + .server + .streaming_operation_with_initial_data_events(); + assert_eq!(events.len(), 2); + + assert_eq!(events[0].message, Events::A(Event {})); + assert_eq!( + events[0].signature.as_ref().unwrap().chunk_signature, + b"sig-event-A" + ); + + assert_eq!(events[1].message, Events::B(Event {})); + assert_eq!( + events[1].signature.as_ref().unwrap().chunk_signature, + b"sig-event-B" + ); +} + +/// Test that timestamps are preserved in signatures +#[tokio::test] +async fn test_sigv4_timestamp_preservation() { + let mut harness = TestHarness::new("StreamingOperation").await; + + // Send events with specific timestamps + let timestamp1 = 1700000000i64; + let timestamp2 = 1700000100i64; + + let event1 = sign_message(build_event("A"), b"sig-1", timestamp1); + harness.client.send(event1).await.unwrap(); + + let event2 = sign_message(build_event("B"), b"sig-2", timestamp2); + harness.client.send(event2).await.unwrap(); + + let resp = harness.expect_message().await; + assert_eq!(get_event_type(&resp), "A"); + + let events = harness.server.streaming_operation_events(); + assert_eq!(events.len(), 2); + + // Verify timestamps are preserved + use std::time::{SystemTime, UNIX_EPOCH}; + + let expected_time1 = UNIX_EPOCH + std::time::Duration::from_secs(timestamp1 as u64); + assert_eq!( + events[0].signature.as_ref().unwrap().timestamp, + expected_time1 + ); + + let expected_time2 = UNIX_EPOCH + std::time::Duration::from_secs(timestamp2 as u64); + assert_eq!( + events[1].signature.as_ref().unwrap().timestamp, + expected_time2 + ); +} From 1ad97c4807fcd295f78ece89b1a6d8140529a1c3 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Fri, 14 Nov 2025 16:29:50 -0500 Subject: [PATCH 7/9] Backout gradle changes --- codegen-server-test/build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codegen-server-test/build.gradle.kts b/codegen-server-test/build.gradle.kts index 08155124c2..ab7ecf037d 100644 --- a/codegen-server-test/build.gradle.kts +++ b/codegen-server-test/build.gradle.kts @@ -116,7 +116,7 @@ val commonCodegenTests = "../codegen-core/common-test-models".let { commonModels ) } // When iterating on protocol tests use this to speed up codegen: - .filter { it.module == "rpcv2Cbor_extras" || it.module == "rpcv2Cbor_extras_no_initial_response" } +// .filter { it.module == "rpcv2Cbor_extras" || it.module == "rpcv2Cbor_extras_no_initial_response" } val customCodegenTests = "custom-test-models".let { customModels -> listOf( From dc09d43ac033fc328b4b592eeeb3650679f521b8 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Mon, 17 Nov 2025 15:02:49 -0500 Subject: [PATCH 8/9] Fix various test failures --- .changelog/1763408225.md | 10 ++++++ .../integration-tests/eventstreams/src/lib.rs | 4 +-- .../tests/structured_eventstream_tests.rs | 26 ++++---------- .../smithy/TsServerCodegenVisitor.kt | 2 +- .../smithy/ValidateUnsupportedConstraints.kt | 3 +- .../SigV4EventStreamSupportStructures.kt | 10 ++++-- ...serProvidedValidationExceptionDecorator.kt | 34 +++++++++++-------- .../SigV4EventStreamSupportStructuresTest.kt | 3 +- .../src/event_stream/receiver.rs | 2 +- 9 files changed, 50 insertions(+), 44 deletions(-) create mode 100644 .changelog/1763408225.md diff --git a/.changelog/1763408225.md b/.changelog/1763408225.md new file mode 100644 index 0000000000..649a2120f3 --- /dev/null +++ b/.changelog/1763408225.md @@ -0,0 +1,10 @@ +--- +applies_to: ["server"] +authors: ["rcoh"] +references: ["smithy-rs#4400", "smithy-rs#4397"] +breaking: true +new_feature: false +bug_fix: true +--- +Fix issue where SigV4 envelopes for EventStreams did not support the initial message. This is _technically_ a breaking change but should not break consumers in practice since the +resulting type has the same methods. diff --git a/codegen-server-test/integration-tests/eventstreams/src/lib.rs b/codegen-server-test/integration-tests/eventstreams/src/lib.rs index aa437f5440..ea7c2ebc30 100644 --- a/codegen-server-test/integration-tests/eventstreams/src/lib.rs +++ b/codegen-server-test/integration-tests/eventstreams/src/lib.rs @@ -57,7 +57,7 @@ impl ManualEventStreamClient { tokio::spawn(async move { while let Some(message) = message_receiver.recv().await { let mut buffer = Vec::new(); - if let Err(_) = write_message_to(&message, &mut buffer) { + if write_message_to(&message, &mut buffer).is_err() { break; } let _ = frame_sender @@ -131,7 +131,7 @@ impl ManualEventStreamClient { self.message_sender .send(message) .await - .map_err(|e| format!("Send failed: {}", e)) + .map_err(|e| format!("Send failed: {e}")) } /// Receives the next response message. diff --git a/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs b/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs index ea8647a5a6..e5bf0d3d4a 100644 --- a/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs +++ b/codegen-server-test/integration-tests/eventstreams/tests/structured_eventstream_tests.rs @@ -316,7 +316,7 @@ struct TestHarness { impl TestHarness { async fn new(operation: &str) -> Self { let server = TestServer::start().await; - let path = format!("/service/RpcV2CborService/operation/{}", operation); + let path = format!("/service/RpcV2CborService/operation/{operation}"); let client = ManualEventStreamClient::connect_to_service( server.addr, &path, @@ -332,11 +332,6 @@ impl TestHarness { } } - async fn send_initial_request(&mut self) { - let msg = build_initial_request(); - self.client.send(msg).await.ok(); - } - async fn send_initial_data(&mut self, data: &str) { let msg = build_initial_data_message(data); self.client.send(msg).await.ok(); @@ -433,13 +428,6 @@ fn sign_message(inner_message: Message, signature: &[u8], timestamp_secs: i64) - Message::new_from_parts(headers, Bytes::from(inner_bytes)) } -fn build_sigv4_signed_event(event_type: &str) -> Message { - build_sigv4_signed_event_with_signature( - event_type, - b"example298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", - ) -} - fn build_sigv4_signed_event_with_signature(event_type: &str, signature: &[u8]) -> Message { use std::time::{SystemTime, UNIX_EPOCH}; @@ -720,7 +708,7 @@ async fn test_sigv4_event_stream_matrix(#[case] test_case: EventStreamTestCase) for (i, &signed) in test_case.events_signed.iter().enumerate() { let event_type = if i % 2 == 0 { "A" } else { "B" }; if signed { - let sig = format!("sig-event-{}", i); + let sig = format!("sig-event-{i}"); let signed_event = build_sigv4_signed_event_with_signature(event_type, sig.as_bytes()); harness.client.send(signed_event).await.unwrap(); } else { @@ -749,10 +737,9 @@ async fn test_sigv4_event_stream_matrix(#[case] test_case: EventStreamTestCase) if signed { assert!( events[i].signature.is_some(), - "Event {} should have signature", - i + "Event {i} should have signature" ); - let expected_sig = format!("sig-event-{}", i); + let expected_sig = format!("sig-event-{i}"); assert_eq!( events[i].signature.as_ref().unwrap().chunk_signature, expected_sig.as_bytes() @@ -760,8 +747,7 @@ async fn test_sigv4_event_stream_matrix(#[case] test_case: EventStreamTestCase) } else { assert!( events[i].signature.is_none(), - "Event {} should not have signature", - i + "Event {i} should not have signature" ); } } @@ -850,7 +836,7 @@ async fn test_sigv4_timestamp_preservation() { assert_eq!(events.len(), 2); // Verify timestamps are preserved - use std::time::{SystemTime, UNIX_EPOCH}; + use std::time::UNIX_EPOCH; let expected_time1 = UNIX_EPOCH + std::time::Duration::from_secs(timestamp1 as u64); assert_eq!( diff --git a/codegen-server/codegen-server-typescript/src/main/kotlin/software/amazon/smithy/rust/codegen/server/typescript/smithy/TsServerCodegenVisitor.kt b/codegen-server/codegen-server-typescript/src/main/kotlin/software/amazon/smithy/rust/codegen/server/typescript/smithy/TsServerCodegenVisitor.kt index 53763211c7..dea89fbc07 100644 --- a/codegen-server/codegen-server-typescript/src/main/kotlin/software/amazon/smithy/rust/codegen/server/typescript/smithy/TsServerCodegenVisitor.kt +++ b/codegen-server/codegen-server-typescript/src/main/kotlin/software/amazon/smithy/rust/codegen/server/typescript/smithy/TsServerCodegenVisitor.kt @@ -62,7 +62,7 @@ class TsServerCodegenVisitor( ServerProtocolLoader( codegenDecorator.protocols( service.id, - ServerProtocolLoader.defaultProtocols(), + ServerProtocolLoader.DefaultProtocols, ), ) .protocolFor(context.model, service) diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/ValidateUnsupportedConstraints.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/ValidateUnsupportedConstraints.kt index f6a617bd38..86c53d1ced 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/ValidateUnsupportedConstraints.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/ValidateUnsupportedConstraints.kt @@ -34,6 +34,7 @@ import software.amazon.smithy.rust.codegen.core.util.hasTrait import software.amazon.smithy.rust.codegen.core.util.inputShape import software.amazon.smithy.rust.codegen.core.util.orNull import java.util.logging.Level +import java.util.stream.Collectors private sealed class UnsupportedConstraintMessageKind { private val constraintTraitsUberIssue = "https://github.com/smithy-lang/smithy-rs/issues/1401" @@ -330,7 +331,7 @@ fun validateModelHasAtMostOneValidationException( model .shapes() .filter { it.hasTrait(ValidationExceptionTrait.ID) && it.isReachableFromOperationErrors(model) } - .toList() + .collect(Collectors.toList()) val messages = mutableListOf() diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamSupportStructures.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamSupportStructures.kt index 0ebaa2f761..00ece53a41 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamSupportStructures.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamSupportStructures.kt @@ -17,7 +17,11 @@ import software.amazon.smithy.rust.codegen.core.smithy.rustType import software.amazon.smithy.rust.codegen.core.util.PANIC object SigV4EventStreamSupportStructures { - private val supportModule = RustModule.public("sigv4_event_stream", documentationOverride = "Support structures for SigV4 signed event streams") + internal val supportModule = + RustModule.public( + "sigv4_event_stream", + documentationOverride = "Support structures for SigV4 signed event streams", + ) fun codegenScope(runtimeConfig: RuntimeConfig) = arrayOf( @@ -319,7 +323,9 @@ object SigV4EventStreamSupportStructures { "Message" to CargoDependency.smithyTypes(runtimeConfig).toType().resolve("event_stream::Message"), "RawMessage" to CargoDependency.smithyTypes(runtimeConfig).toType().resolve("event_stream::RawMessage"), "SdkError" to RuntimeType.sdkError(runtimeConfig), - "ResponseError" to RuntimeType.smithyRuntimeApiClient(runtimeConfig).resolve("client::result::ResponseError"), + "ResponseError" to + RuntimeType.smithyRuntimeApiClient(runtimeConfig) + .resolve("client::result::ResponseError"), "UnmarshallMessage" to CargoDependency.smithyEventStream(runtimeConfig).toType() .resolve("frame::UnmarshallMessage"), diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/UserProvidedValidationExceptionDecorator.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/UserProvidedValidationExceptionDecorator.kt index 8df3251528..09901d013a 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/UserProvidedValidationExceptionDecorator.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/UserProvidedValidationExceptionDecorator.kt @@ -55,6 +55,7 @@ import software.amazon.smithy.rust.codegen.server.smithy.generators.protocol.Ser import software.amazon.smithy.rust.codegen.server.smithy.util.isValidationFieldName import software.amazon.smithy.rust.codegen.server.smithy.util.isValidationMessage import software.amazon.smithy.rust.codegen.server.smithy.validationErrorMessage +import java.util.stream.Collectors /** * Decorator for user provided validation exception codegen @@ -84,7 +85,7 @@ class UserProvidedValidationExceptionDecorator : ServerCodegenDecorator { internal fun firstStructureShapeWithValidationExceptionTrait(model: Model): StructureShape? = model .shapes(StructureShape::class.java) - .toList() + .collect(Collectors.toList()) // Defining multiple validation exceptions is unsupported. See `ValidateUnsupportedConstraints` .firstOrNull({ it.hasTrait(ValidationExceptionTrait.ID) }) @@ -328,9 +329,7 @@ class UserProvidedValidationExceptionConversionGenerator( "FieldAssignments" to fieldAssignments( "path.clone()", - """format!(${ - lengthTrait.validationErrorMessage().dq() - }, length, &path)""", + "format!(${lengthTrait.validationErrorMessage().dq()}, length, &path)", ), ) } @@ -351,9 +350,9 @@ class UserProvidedValidationExceptionConversionGenerator( "FieldAssignments" to fieldAssignments( "path.clone()", - """format!(${ + "format!(${ patternTrait.validationErrorMessage().dq() - }, &path, ${patternTrait.pattern.toString().dq()})""", + }, &path, ${patternTrait.pattern.toString().dq()})", ), ) } @@ -394,9 +393,9 @@ class UserProvidedValidationExceptionConversionGenerator( "FieldAssignments" to fieldAssignments( "path.clone()", - """format!(${ + "format!(${ blobLength.lengthTrait.validationErrorMessage().dq() - }, length, &path)""", + }, length, &path)", ), ) } @@ -519,7 +518,7 @@ class UserProvidedValidationExceptionConversionGenerator( ConstraintViolation::${it.name()} => #{ValidationExceptionField} { #{FieldAssignments} }, - """.trimIndent(), + """, *codegenScope, "FieldAssignments" to fieldAssignments( @@ -569,10 +568,10 @@ class UserProvidedValidationExceptionConversionGenerator( "FieldAssignments" to fieldAssignments( "path.clone()", - """format!(${ + "format!(${ collectionTraitInfo.lengthTrait.validationErrorMessage() .dq() - }, length, &path)""", + }, length, &path)", ), ) } @@ -588,10 +587,15 @@ class UserProvidedValidationExceptionConversionGenerator( "FieldAssignments" to fieldAssignments( "path.clone()", - """format!(${ - collectionTraitInfo.uniqueItemsTrait.validationErrorMessage() - .dq() - }, &duplicate_indices, &path)""", + """ + format!( + ${ + collectionTraitInfo.uniqueItemsTrait.validationErrorMessage().dq() + }, + &duplicate_indices, + &path + ) + """, ), ) } diff --git a/codegen-server/src/test/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamSupportStructuresTest.kt b/codegen-server/src/test/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamSupportStructuresTest.kt index 2f09c18ffc..059092889a 100644 --- a/codegen-server/src/test/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamSupportStructuresTest.kt +++ b/codegen-server/src/test/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/SigV4EventStreamSupportStructuresTest.kt @@ -6,7 +6,6 @@ package software.amazon.smithy.rust.codegen.server.smithy.customizations import org.junit.jupiter.api.Test -import software.amazon.smithy.rust.codegen.core.rustlang.RustModule import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate import software.amazon.smithy.rust.codegen.core.testutil.TestRuntimeConfig import software.amazon.smithy.rust.codegen.core.testutil.TestWorkspace @@ -19,7 +18,7 @@ class SigV4EventStreamSupportStructuresTest { @Test fun `support structures compile`() { val project = TestWorkspace.testProject() - project.withModule(RustModule.private("sigv4_event_stream")) { + project.lib { val codegenScope = SigV4EventStreamSupportStructures.codegenScope(runtimeConfig) // Generate the support structures - RuntimeType.forInlineFun automatically generates the code diff --git a/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs b/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs index 74d70cd5af..07351f2ba9 100644 --- a/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs +++ b/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs @@ -229,7 +229,7 @@ impl Receiver { &mut self, message_type: InitialMessageType, ) -> Result, SdkError> { - self.try_recv_initial_with_preprocessor(message_type, |msg| Ok((msg.clone(), ()))) + self.try_recv_initial_with_preprocessor(message_type, |msg| Ok((msg, ()))) .await .map(|opt| opt.map(|(msg, _)| msg)) } From 92d0c293a1240d8d0efb7c3ea6e4ddb43e3aabc1 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Mon, 1 Dec 2025 15:43:48 -0500 Subject: [PATCH 9/9] Bump codegen version to 0.1.7 --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index d0333b3cf2..090976f745 100644 --- a/gradle.properties +++ b/gradle.properties @@ -17,4 +17,4 @@ allowLocalDeps=false # Avoid registering dependencies/plugins/tasks that are only used for testing purposes isTestingEnabled=true # codegen publication version -codegenVersion=0.1.6 +codegenVersion=0.1.7