diff --git a/crates/core/src/contract/executor/runtime.rs b/crates/core/src/contract/executor/runtime.rs index dc9cdabc1..ff152c69e 100644 --- a/crates/core/src/contract/executor/runtime.rs +++ b/crates/core/src/contract/executor/runtime.rs @@ -871,6 +871,11 @@ impl Executor { .await .map_err(ExecutorError::other)?; + tracing::info!( + "Contract state updated for {key}, new_size_bytes={}", + new_state.as_ref().len() + ); + if let Err(err) = self .send_update_notification(key, parameters, &new_state) .await diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 2996bab9a..07dc0accc 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -570,6 +570,11 @@ impl Operation for PutOp { return_msg = None; } PutMsg::SuccessfulPut { id, .. } => { + tracing::debug!( + tx = %id, + current_state = ?self.state, + "PutOp::process_message: handling SuccessfulPut" + ); match self.state { Some(PutState::AwaitingResponse { key, @@ -654,6 +659,12 @@ impl Operation for PutOp { // Forward success message upstream if needed if let Some(upstream) = upstream { + tracing::trace!( + tx = %id, + %key, + upstream = %upstream.peer, + "PutOp::process_message: Forwarding SuccessfulPut upstream" + ); return_msg = Some(PutMsg::SuccessfulPut { id: *id, target: upstream, @@ -661,6 +672,11 @@ impl Operation for PutOp { sender: op_manager.ring.connection_manager.own_location(), }); } else { + tracing::trace!( + tx = %id, + %key, + "PutOp::process_message: SuccessfulPut originated locally; no upstream" + ); return_msg = None; } } diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 4b21ccc72..7743ce95b 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -316,10 +316,33 @@ impl Operation for UpdateOp { }); new_state = None; } else { - // No peers available and we don't have the contract - error + // No peers available and we don't have the contract - capture context + let skip_list = [&self_location.peer, &request_sender.peer]; + let subscribers = op_manager + .ring + .subscribers_of(key) + .map(|subs| { + subs.value() + .iter() + .map(|loc| format!("{:.8}", loc.peer)) + .collect::>() + }) + .unwrap_or_default(); + let candidates = op_manager + .ring + .k_closest_potentially_caching(key, skip_list.as_slice(), 5) + .into_iter() + .map(|loc| format!("{:.8}", loc.peer)) + .collect::>(); + let connection_count = + op_manager.ring.connection_manager.num_connections(); tracing::error!( tx = %id, %key, + subscribers = ?subscribers, + candidates = ?candidates, + connection_count, + request_sender = %request_sender.peer, "Cannot handle UPDATE: contract not found locally and no peers to forward to" ); return Err(OpError::RingError(RingError::NoCachingPeers(*key))); @@ -447,10 +470,33 @@ impl Operation for UpdateOp { }); new_state = None; } else { - // No more peers to try - error + // No more peers to try - capture context for diagnostics + let skip_list = [&sender.peer, &self_location.peer]; + let subscribers = op_manager + .ring + .subscribers_of(key) + .map(|subs| { + subs.value() + .iter() + .map(|loc| format!("{:.8}", loc.peer)) + .collect::>() + }) + .unwrap_or_default(); + let candidates = op_manager + .ring + .k_closest_potentially_caching(key, skip_list.as_slice(), 5) + .into_iter() + .map(|loc| format!("{:.8}", loc.peer)) + .collect::>(); + let connection_count = + op_manager.ring.connection_manager.num_connections(); tracing::error!( tx = %id, %key, + subscribers = ?subscribers, + candidates = ?candidates, + connection_count, + sender = %sender.peer, "Cannot handle UPDATE SeekNode: contract not found and no peers to forward to" ); return Err(OpError::RingError(RingError::NoCachingPeers(*key))); @@ -649,9 +695,21 @@ impl OpManager { .ring .subscribers_of(key) .map(|subs| { + let self_peer = self.ring.connection_manager.get_peer_key(); + let allow_self = self_peer.as_ref().map(|me| me == sender).unwrap_or(false); subs.value() .iter() - .filter(|pk| &pk.peer != sender) + .filter(|pk| { + // Allow the sender (or ourselves) to stay in the broadcast list when we're + // originating the UPDATE so local auto-subscribes still receive events. + let is_sender = &pk.peer == sender; + let is_self = self_peer.as_ref() == Some(&pk.peer); + if is_sender || is_self { + allow_self + } else { + true + } + }) .cloned() .collect::>() }) @@ -671,10 +729,21 @@ impl OpManager { subscribers.len() ); } else { + let own_peer = self.ring.connection_manager.get_peer_key(); + let skip_slice = std::slice::from_ref(sender); + let fallback_candidates = self + .ring + .k_closest_potentially_caching(key, skip_slice, 5) + .into_iter() + .map(|candidate| format!("{:.8}", candidate.peer)) + .collect::>(); + tracing::warn!( - "UPDATE_PROPAGATION: contract={:.8} from={} NO_TARGETS - update will not propagate", + "UPDATE_PROPAGATION: contract={:.8} from={} NO_TARGETS - update will not propagate (self={:?}, fallback_candidates={:?})", key, - sender + sender, + own_peer.map(|p| format!("{:.8}", p)), + fallback_candidates ); } diff --git a/crates/core/src/wasm_runtime/store.rs b/crates/core/src/wasm_runtime/store.rs index 15c701cbe..07044f377 100644 --- a/crates/core/src/wasm_runtime/store.rs +++ b/crates/core/src/wasm_runtime/store.rs @@ -7,6 +7,7 @@ use std::io::{self, BufReader, BufWriter, Seek, Write}; use std::path::{Path, PathBuf}; use std::time::Duration; use std::{fs::File, io::Read}; +use tracing::error; const INTERNAL_KEY: usize = 32; const TOMBSTONE_MARKER: usize = 1; @@ -325,7 +326,7 @@ fn compact_index_file(key_file_path: &Path) -> std::io::Re let mut original_reader = BufReader::new(original_file); let mut temp_writer = SafeWriter::::new(&temp_file_path, true).inspect_err(|_| { if let Err(e) = fs::remove_file(&lock_file_path) { - eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); + error!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); } })?; @@ -340,7 +341,7 @@ fn compact_index_file(key_file_path: &Path) -> std::io::Re }; if let Err(err) = temp_writer.insert_record(store_key, value) { if let Err(e) = fs::remove_file(&lock_file_path) { - eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); + error!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); } return Err(err); } @@ -356,7 +357,7 @@ fn compact_index_file(key_file_path: &Path) -> std::io::Re Err(other) => { // Handle other errors gracefully if let Err(e) = fs::remove_file(&lock_file_path) { - eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); + error!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); } return Err(other); } @@ -366,7 +367,7 @@ fn compact_index_file(key_file_path: &Path) -> std::io::Re // Check if any deleted records were found; if not, skip compaction if !any_deleted { if let Err(e) = fs::remove_file(&lock_file_path) { - eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); + error!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); } return Ok(()); } @@ -374,7 +375,7 @@ fn compact_index_file(key_file_path: &Path) -> std::io::Re // Clean up and finalize the compaction process if let Err(e) = temp_writer.flush() { if let Err(e) = fs::remove_file(&lock_file_path) { - eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); + error!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); } return Err(e); } @@ -382,14 +383,14 @@ fn compact_index_file(key_file_path: &Path) -> std::io::Re // Replace the original file with the temporary file if let Err(e) = fs::rename(&temp_file_path, key_file_path) { if let Err(e) = fs::remove_file(&lock_file_path) { - eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); + error!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); } return Err(e); } // Remove the lock file fs::remove_file(&lock_file_path).map_err(|e| { - eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); + error!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); e })?; @@ -589,13 +590,13 @@ mod tests { create_test_data(&mut file, &key_file_path, shared_data, i); } else if let Err(err) = super::compact_index_file::(&key_file_path) { - eprintln!("Thread encountered an error during compaction: {err}"); + error!("Thread encountered an error during compaction: {err}"); return Err(err); } barrier.wait(); // compact a last time so we know what data to compare against super::compact_index_file::(&key_file_path).map_err(|err| { - eprintln!("Thread encountered an error during compaction: {err}"); + error!("Thread encountered an error during compaction: {err}"); err }) }) diff --git a/crates/core/src/wasm_runtime/tests/contract_metering.rs b/crates/core/src/wasm_runtime/tests/contract_metering.rs index 88c63a857..c4d849ebe 100644 --- a/crates/core/src/wasm_runtime/tests/contract_metering.rs +++ b/crates/core/src/wasm_runtime/tests/contract_metering.rs @@ -5,6 +5,7 @@ use crate::wasm_runtime::tests::TestSetup; use crate::wasm_runtime::{ContractExecError, RuntimeInnerError}; use freenet_stdlib::prelude::*; use std::time::Instant; +use tracing::info; const TEST_CONTRACT_METERING: &str = "test_contract_metering"; @@ -52,7 +53,7 @@ fn validate_state_metering() -> Result<(), Box> { ); let duration = time.elapsed().as_secs_f64(); - println!("Duration: {duration:.2}s"); + info!("Duration: {duration:.2}s"); assert!(duration < 5.0, "Should not timeout"); assert!( @@ -103,7 +104,7 @@ fn test_update_state_metering() -> Result<(), Box> { ); let duration = time.elapsed().as_secs_f64(); - println!("Duration: {duration:.2}s"); + info!("Duration: {duration:.2}s"); assert!(duration < 5.0, "Should not timeout"); assert!( @@ -150,7 +151,7 @@ fn test_summarize_state_metering() -> Result<(), Box> { let result = runtime.summarize_state(&contract_key, &Parameters::from([].as_ref()), &state); let duration = time.elapsed().as_secs_f64(); - println!("Duration: {duration:.2}s"); + info!("Duration: {duration:.2}s"); assert!(duration < 5.0, "Should not timeout"); assert!( @@ -202,7 +203,7 @@ fn test_get_state_delta_metering() -> Result<(), Box> { ); let duration = time.elapsed().as_secs_f64(); - println!("Duration: {duration:.2}s"); + info!("Duration: {duration:.2}s"); assert!(duration < 5.0, "Should not timeout"); assert!( diff --git a/crates/core/src/wasm_runtime/tests/mod.rs b/crates/core/src/wasm_runtime/tests/mod.rs index 955c4062e..110c49a0c 100644 --- a/crates/core/src/wasm_runtime/tests/mod.rs +++ b/crates/core/src/wasm_runtime/tests/mod.rs @@ -6,6 +6,7 @@ use freenet_stdlib::prelude::{ use crate::util::tests::get_temp_dir; use crate::util::workspace::get_workspace_target_dir; +use tracing::info; use super::{ContractStore, DelegateStore, SecretsStore}; @@ -22,7 +23,7 @@ pub(crate) fn get_test_module(name: &str) -> Result, Box Result, Box