@@ -56,12 +56,7 @@ public void completeTransit(Long driverId, UUID requestUUID, Address destination
5656There are issues with the above method.
5757
5858<1> It uses the `+@Transational+` annotation, and modify number of unrelated data stores.
59- // This means that when one of those operations fails, the whole processing will be rolled back.
60- // In effect, the end-user will receive a nasty error message.
61-
6259<2> It merges different, business domains.
63- // This makes it hard to understand and maintain.
64- // It isn't required to all of those operations complete at the same time.
6560====
6661
6762Similar methods are, unfortunately, quite common in business applications.
@@ -98,6 +93,158 @@ Still, the application will do all the operations, starting from `+completeTrans
9893
9994In this section, we'll refactor the Cabs application.
10095The refactoring will be limited to make the process easy to understand.
96+ The scope of the refactoring will be the extraction of drivers module, which is already a separate domain in the codebase.
97+ Within the scope of the `+completeTransit+` method, we'll need to shift the way we calculate the fee for the driver.
98+ The calculation will be done asynchronously, and when the driver module publishes the calculation result, it will be saved back into the database.
99+
100+ The base for the refactoring is the _Event Mesh_ pattern, and the asynchronous communication will be done with _Cloud Events_.
101+
102+ ==== Drivers module
103+
104+ The functionality around drivers is already quite separated in the codebase, so it is a good staring point to extract into a separate module.
105+ The drivers module will become a standalone web service, deployed on the _Kubernetes_ cluster.
106+ The implementation of the drivers module will be done with _Rust_ for this example.
107+
108+ Here's the _Rust_ code for calculate fee functionality.
109+ The entrypoint is the _Cloud Event_ of type `cabs.drivers.calculate-fee` we are expecting the _Event Mesh_ will route.
110+
111+ [source,rust]
112+ ----
113+ impl Service {
114+ pub async fn calculate_fee(&mut self, ce: Event) -> Result<()> {
115+ let fee_event = Self::parse_fee_event(ce)?; // <1>
116+ let subject = fee_event.id.clone();
117+
118+ let drv = self.repo.get(&fee_event.entity.driver_id).await?;
119+
120+ let fee = drv.calculate_fee(&fee_event.entity.transit_price); // <2>
121+
122+ let fee_event = DriverFeeEvent {
123+ driver_id: fee_event.entity.driver_id,
124+ fee,
125+ }; // <3>
126+
127+ let mut builder = fee_event.to_builder(); // <3>
128+ if let Some(id) = subject {
129+ builder = builder.subject(id);
130+ } // <3>
131+ let ce = builder.build().map_err(error::ErrorInternalServerError)?; // <3>
132+
133+ Sender::new(&self.config).send(ce).await?; // <4>
134+
135+ Ok(())
136+ }
137+ // [..]
138+ }
139+ ----
140+
141+ In the above code, we are doing the following:
142+
143+ <1> We are parsing the internal, business logic, fee event from the _Cloud Events_ envelope.
144+ <2> We are calculating the fee for this event, using some business logic.
145+ <3> We are wrapping the calculated fee into the _Cloud Events_ envelope.
146+ <4> We are sending the fee back to the _Event Mesh_ using _HTTP REST_ client.
147+
148+ Of course, in order for this method to be called, we need to route the event from the HTTP listener:
149+
150+ [source,rust]
151+ ----
152+ pub fn routes() -> impl HttpServiceFactory + 'static {
153+ web::resource("/").route(web::post().to(recv))
154+ }
155+
156+ async fn recv(
157+ ce: Event,
158+ state: web::Data<State>,
159+ binding: web::Data<Binding>,
160+ ) -> Result<HttpResponse> {
161+ log::info!("Received event:\n{}", ce);
162+
163+ let mut svc = service::new(state, binding).await?;
164+
165+ match ce.ty() {
166+ "cabs.drivers.calculate-fee" => svc.calculate_fee(ce).await,
167+ _ => Err(error::ErrorBadRequest("unsupported event type")),
168+ }?;
169+
170+ Ok(HttpResponse::Ok().finish())
171+ }
172+ ----
173+
174+ Let's see also the _Cloud Event_ sender, that uses the _HTTP REST_ client to send events to the _Event Mesh_:
175+
176+ [source,rust]
177+ ----
178+ impl Sender {
179+ pub async fn send(&self, ce: Event) -> Result<()> {
180+ log::debug!("sending {} event to {}:\n{:?}", ce.ty(), &self.sink, ce,);
181+
182+ let response = self
183+ .client
184+ .post(&self.sink) // <1>
185+ .event(ce)
186+ .map_err(error::ErrorInternalServerError)?
187+ .send()
188+ .await
189+ .map_err(error::ErrorInternalServerError)?;
190+
191+ match response.status().is_success() {
192+ true => Ok(()),
193+ false => {
194+ log::error!("failed to send event: {:#?}", response);
195+ Err(error::ErrorInternalServerError(format!(
196+ "failed to send event: {}",
197+ response.status()
198+ )))
199+ }
200+ }
201+ }
202+ }
203+ ----
204+
205+ [NOTE]
206+ ====
207+ <1> The client uses _POST_ method, to send the _JSON_ representation of the event to the sink.
208+ The _sink_ is the URL of the target, in this case the url of the _Event Mesh_.
209+ ====
210+
211+ ==== Event Mesh
212+
213+ In this section, we'll use the _Event Mesh_ setup to communication between the different parts of refactored code.
214+
215+ Here's the _Event Mesh_ central component configuration, the _Broker_, which will be used in this example.
216+ The _Broker_ here is the _Knative_ component, and will be deployed in the _Kubernetes_ cluster.
217+
218+ [source,yaml]
219+ ----
220+ apiVersion: eventing.knative.dev/v1
221+ kind: Broker
222+ metadata:
223+ name: default
224+ namespace: demo
225+ spec:
226+ delivery:
227+ backoffDelay: PT0.2S # <1>
228+ backoffPolicy: exponential # <2>
229+ retry: 10 # <3>
230+ ----
231+
232+ <1> The `backoffDelay` is the delay between retries, and us use `+200ms+` initially.
233+ <2> The `backoffPolicy` is set to `exponential`, which means that the delay will be doubled each time.
234+ <3> The `retry` is the number of times we retry before giving up.
235+
236+ [IMPORTANT]
237+ ====
238+ Because the policy is `exponential`, the maximum time the _Broker_ will be retrying is 6 min and 49 sec.
239+ In the above configuration, after that time is reached, the event will be dropped.
240+ ====
241+
242+ [NOTE]
243+ ====
244+ A `+deadLetterSink+` could be configured for the _Broker_ to send the events that failed to be delivered in time to a back-up location.
245+ Events captured in a back-up location can be re-transmitted into the _Event Mesh_ later.
246+ ====
247+
101248
102249image::https://www.plantuml.com/plantuml/svg/VP1DJiCm58JtFiMZ-rmWYwgqeHkeX2WNUBK7Ok4ubdyYzVQuZKbe5TZ5olTcFiqcHFOnTKOyn1OTIC8d0xPLdwBH5iBb_rfgnpRIwWMVBC_qwDoAED3ul4MUBKSzW9u6vES1eRsYMzz_mT-YZS-W3tJeLUwyOdlW23zeYJkK8vyuZ52p5O9bRk687uTYLgrB4zNqcav6XvPsR6GocTsZQ8d2L1aV3slQzVP3-uuKpCNgB1JkEwQpzI_FcjxoL5XgcUvdMioVL4soi-iuIOQcE5N259RYPgKYMNJ-3lfdkMPRqp7s7lJkjQFBvWihR61Lwimt[width=100%]
103250
@@ -131,7 +278,7 @@ deactivate Legacy
131278
132279The diagram illustrates the flow of events between the legacy application, the Knative _Event Mesh_, the fee calculator service, and the datastore.
133280
134- In this video you can see xpto :
281+ In this video you can see the above example presented by Red Hat' employee https://github.com/cardil[Chris Suszynski] :
135282
136283video::Rc5IO6S6ZOk[youtube,width=800,height=480]
137284
@@ -141,17 +288,12 @@ Next, you can learn how to walkthrough this demo.
141288
142289=== Before getting started
143290
144- To run this demo, you will need xpto.
145- Adding to that, make sure to have:
146-
147- * ABC
148- * XYZ
149- * XPTO
291+ // TODO: Add instructions how to run
150292
151293=== Installing the demo
152294
153- Installation guide and basic test of the demo installation if needed
295+ // TODO: Installation guide and basic test of the demo installation if needed
154296
155297=== Walkthrough guide
156298
157- How to run through the demo
299+ // TODO: How to run through the demo
0 commit comments