diff --git a/src/main.rs b/src/main.rs index beedb7c47c39ca837f5eeccc17e31552c6b29068..c4e03caf61de76a09875a52650bf0244739adf37 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use std::{process, time::Duration}; use testnet_injector::utils::miner::get_miner_behavior; use testnet_injector::utils::{mobility_data::*, txbx}; -use tracing::{debug, error, info, Level}; +use tracing::{debug, error, info, warn, Level}; use ::clap::{Args, Parser, Subcommand}; @@ -28,8 +28,10 @@ struct Cli { enum Commands { /// Send txs and/or bxs using default content Send(SendParams), - /// Simulate behaviors from dataset in csv and send them to node + /// Simulate behaviors from csv dataset and send them to the node SimulateBxFromCsv(SimulateBxFromCsv), + /// Send transactions from derived accounts from csv dataset + SendTxFromCsv(SendTxFromCsv), /// Mine a block Mine(MineParams), } @@ -62,6 +64,20 @@ struct SimulateBxFromCsv { chain_id: Option<u64>, } +#[derive(Args)] +struct SendTxFromCsv { + /// Path to CSV file + file: String, + /// Block time in seconds, will send 1 tx every block + block_time: u64, + /// Send tx to unknown addresses + unknown_addr: Option<bool>, + // Node RPC URL + rpc_url: Option<String>, + // Chain ID + chain_id: Option<u64>, +} + #[derive(Args)] struct MineParams { // Node RPC URL @@ -192,7 +208,7 @@ async fn main() -> Result<(), anyhow::Error> { } Some(Commands::SimulateBxFromCsv(params)) => { info!( - "Simulating file {:?} for a total time of {:?} secs", + "Simulating bx from {:?} for a total time of {:?} secs", params.file.clone(), params.total_simulation_duration.clone() ); @@ -306,6 +322,114 @@ async fn main() -> Result<(), anyhow::Error> { block_on(handle).unwrap(); } } + Some(Commands::SendTxFromCsv(params)) => { + info!( + "Sending tx from {:?} every {:?} secs with unknown address={:?}", + params.file, params.block_time, params.unknown_addr + ); + // Override env vars if specified in CLI + if params.rpc_url.clone().is_some() { + rpc_url = params.rpc_url.clone().unwrap(); + } + if params.chain_id.clone().is_some() { + chain_id = params.chain_id.clone().unwrap(); + } + let records = read_csv(csv::Reader::from_path(params.file.clone())?); + if let Err(err) = records { + error!("error running example: {}", err); + process::exit(1); + } + let records = records.unwrap(); + + let mobility_records = generate_behaviors_from_records(records.clone()); + + let is_addr_unknown = params.unknown_addr.unwrap_or(false); + let block_time = if params.block_time > 0 { + params.block_time + } else { + 12 // Ethereum PoS block time + }; + + info!("Simulation of {:?} records", mobility_records.len()); + + // ---------------- + // Sending queue + // ---------------- + let mut handles = Vec::with_capacity(mobility_records.len()); + + for i in 0..mobility_records.len() { + let simulation_behavior = mobility_records[i].clone(); + let rpc_url = rpc_url.clone(); + let chain_id = chain_id.clone(); + let sleep_duration = Duration::from_secs(block_time * i as u64); + + handles.push(tokio::spawn({ + async move { + tokio::time::sleep(sleep_duration).await; + + let provider = Provider::<Http>::try_from(rpc_url.clone()).unwrap(); + + // Wallet is derived from mnemomic with user_id to ensure each user have the same wallet for the simulation. + let mnemonic = std::env::var("MNEMONIC").unwrap_or("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about".to_string()); + let user_id = simulation_behavior.mobility_record.user_id as u32; + let wallet = MnemonicBuilder::<English>::default() + .phrase(PathOrString::String(mnemonic.clone())) + .index(user_id) + .unwrap() + .build() + .unwrap() + .with_chain_id(chain_id); + + let signer = SignerMiddleware::new(provider.clone(), wallet.clone()); + let addr = signer.clone().address(); + let nonce_manager = signer.nonce_manager(addr); + + // List of active accounts from node + let accounts = provider.request::<[Behavior;0], Vec<(Address, U256)>>("ots_getAccountWithBalance", []).await.unwrap(); + + let to_addr = if is_addr_unknown { + Address::random() + } else { + let mut rng = rand::thread_rng(); + let rnd = rng.gen_range(0..accounts.len()); + accounts[rnd].0 + }; + + let from_balance = provider.get_balance(wallet.address(),None).await.unwrap(); + + // Temporary value close to min + let tx_value= U256::from(210000); + + if from_balance.gt(&tx_value) { + let tx = TransactionRequest::new() + .to(to_addr) + .value(tx_value) + .chain_id(chain_id); + + info!( + "Sending tx from: {:?} (user_id: {:?}),to {:?} value {:?} ", + addr, + user_id, + to_addr, + tx_value + ); + + let tx_hash = nonce_manager.send_transaction(tx, None).await; + info!("Sent tx {:?}", tx_hash); + } else { + warn!("Didn't send tx for {:?}, balance too low ({:?}<210000)", addr, from_balance); + } + } + })) + } + + info!("Scheduling done, waiting for tx threads..."); + + // Wait for all of them to complete. + for handle in handles { + block_on(handle).unwrap(); + } + } Some(Commands::Mine(params)) => { // Override env vars if specified in CLI if params.rpc_url.clone().is_some() {