Skip to content

Commit d3294a6

Browse files
committed
feat: filter low nonces when ingesting txns
1 parent 32d1725 commit d3294a6

File tree

1 file changed

+42
-7
lines changed

1 file changed

+42
-7
lines changed

src/tasks/cache/tx.rs

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
//! Transaction service responsible for fetching and sending trasnsactions to the simulator.
22
use crate::config::BuilderConfig;
3-
use alloy::consensus::TxEnvelope;
3+
use alloy::{
4+
consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable},
5+
providers::Provider,
6+
};
47
use eyre::Error;
58
use reqwest::{Client, Url};
69
use serde::{Deserialize, Serialize};
710
use std::time::Duration;
811
use tokio::{sync::mpsc, task::JoinHandle, time};
9-
use tracing::{Instrument, debug, debug_span, trace};
12+
use tracing::{Instrument, debug, debug_span, info_span, trace, warn};
1013

1114
/// Poll interval for the transaction poller in milliseconds.
1215
const POLL_INTERVAL_MS: u64 = 1000;
@@ -56,6 +59,42 @@ impl TxPoller {
5659
Duration::from_millis(self.poll_interval_ms)
5760
}
5861

62+
// Spawns a tokio task to check the nonce of a transaction before sending
63+
// it to the cachetask via the outbound channel.
64+
fn spawn_check_nonce(&self, tx: TxEnvelope, outbound: mpsc::UnboundedSender<TxEnvelope>) {
65+
tokio::spawn(async move {
66+
let Ok(ru_provider) = crate::config().connect_ru_provider().await else {
67+
warn!("Failed to connect to RU provider, stopping noncecheck task.");
68+
return;
69+
};
70+
let span = info_span!("check_nonce", tx_id = %tx.tx_hash());
71+
72+
let Ok(sender) = tx.recover_signer() else {
73+
span_warn!(span, "Failed to recover sender from transaction");
74+
return;
75+
};
76+
77+
let Ok(tx_count) = ru_provider
78+
.get_transaction_count(sender)
79+
.into_future()
80+
.instrument(span.clone())
81+
.await
82+
else {
83+
span_warn!(span, %sender, "Failed to fetch nonce for sender");
84+
return;
85+
};
86+
87+
if tx.nonce() < tx_count {
88+
span_debug!(span, %sender, tx_nonce = %tx.nonce(), ru_nonce = %tx_count, "Dropping transaction with stale nonce");
89+
return;
90+
}
91+
92+
if outbound.send(tx).is_err() {
93+
span_warn!(span, "Outbound channel closed, stopping NonceChecker task.");
94+
}
95+
});
96+
}
97+
5998
/// Polls the transaction cache for transactions.
6099
pub async fn check_tx_cache(&mut self) -> Result<Vec<TxEnvelope>, Error> {
61100
let url: Url = self.config.tx_pool_url.join("transactions")?;
@@ -94,11 +133,7 @@ impl TxPoller {
94133
let _guard = span.entered();
95134
debug!(count = ?transactions.len(), "found transactions");
96135
for tx in transactions.into_iter() {
97-
if outbound.send(tx).is_err() {
98-
// If there are no receivers, we can shut down
99-
trace!("No receivers left, shutting down");
100-
break;
101-
}
136+
self.spawn_check_nonce(tx, outbound.clone());
102137
}
103138
}
104139

0 commit comments

Comments
 (0)