Skip to content

Commit b212e74

Browse files
NicolappsConvex, Inc.
authored andcommitted
fivetran_source: Surface state errors as tasks (#43467)
GitOrigin-RevId: 2c9563a7832e8fd21cfafadec1aefcf27eda9829
1 parent 835ae21 commit b212e74

File tree

1 file changed

+24
-3
lines changed

1 file changed

+24
-3
lines changed

crates/fivetran_source/src/connector.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ use convex_fivetran_common::{
44
schema_response,
55
source_connector_server::SourceConnector,
66
test_response,
7+
update_response,
78
ConfigurationFormRequest,
89
ConfigurationFormResponse,
910
ConfigurationTest,
1011
SchemaRequest,
1112
SchemaResponse,
13+
Task,
1214
TestRequest,
1315
TestResponse,
1416
UpdateRequest,
@@ -137,13 +139,17 @@ impl SourceConnector for ConvexConnector {
137139
let config = match Config::from_parameters(inner.configuration) {
138140
Ok(config) => config,
139141
Err(error) => {
140-
return Err(Status::internal(error.to_string()));
142+
return error_to_task(error);
141143
},
142144
};
143145
log(&format!("update request for {}", config.deploy_url));
144146

145-
let state = deserialize_state_json(inner.state_json.as_deref().unwrap_or("{}"))
146-
.map_err(|error| Status::internal(error.to_string()))?;
147+
let state = match deserialize_state_json(inner.state_json.as_deref().unwrap_or("{}")) {
148+
Ok(state) => state,
149+
Err(error) => {
150+
return error_to_task(error);
151+
},
152+
};
147153

148154
log(&format!(
149155
"update request for {} at checkpoint {:?}",
@@ -196,6 +202,21 @@ fn deserialize_state_json(state_json: &str) -> anyhow::Result<Option<State>> {
196202
Ok(state)
197203
}
198204

205+
fn error_to_task(
206+
error: anyhow::Error,
207+
) -> ConnectorResult<<ConvexConnector as SourceConnector>::UpdateStream> {
208+
Ok(Response::new(
209+
futures::stream::once(async move {
210+
Ok(FivetranUpdateResponse {
211+
operation: Some(update_response::Operation::Task(Task {
212+
message: error.to_string(),
213+
})),
214+
})
215+
})
216+
.boxed(),
217+
))
218+
}
219+
199220
#[cfg(test)]
200221
mod tests {
201222
use super::deserialize_state_json;

0 commit comments

Comments
 (0)