use super::model::OnionPerfAnalysis;
use super::utils::decompress_xz;
use chrono::{DateTime, Datelike, Days, Utc};
use log::error;
use reqwest::{StatusCode, Url};
use std::sync::Arc;
use std::time::Duration;
use tokio::select;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{oneshot, Mutex};
use tokio::time::sleep;
use tokio_stream::wrappers::UnboundedReceiverStream;
// URL path prefixes under which an analysis file is looked up; index 0 is used for the
// first candidate URL and index 1 for the second one.
// NOTE(review): the identifier misspells "ANALYSIS"; it is left as-is because it is
// referenced from other places in this file.
const ONION_PERF_ANALYAIS_PATHS: [&str; 2] = ["/htdocs/", "/"];
// Wait between re-checks, in hours (it is multiplied by 60 * 60 to obtain seconds where used).
const RECHECK_INTERVAL: u64 = 2;
/// Periodically downloads, decompresses and parses the OnionPerf analysis file of one host
/// and publishes every parsed analysis on a channel.
pub struct OnionPerfRunner {
/// Host label placed in front of `.onionperf.torproject.net` when the download URLs are built.
pub host_name: String,
/// Timestamps whose analysis file has already been received and parsed successfully.
already_checked: Arc<Mutex<Vec<DateTime<Utc>>>>,
/// Sending half of the channel on which parsed analyses are published.
onionperf_analysis_sender: UnboundedSender<OnionPerfAnalysis>,
/// Receiving half of the analysis channel, wrapped as a stream; lock it to read parsed analyses.
pub onionperf_analysis_receiver_stream: Arc<Mutex<UnboundedReceiverStream<OnionPerfAnalysis>>>,
/// One-shot sender used by `stop`; it is taken out of the `Option` when the signal is sent.
onionperf_terminate_sender: Mutex<Option<oneshot::Sender<()>>>,
/// One-shot receiver that `start` waits on to learn that it should terminate.
onionperf_terminate_receiver: Mutex<oneshot::Receiver<()>>,
}
impl OnionPerfRunner {
pub fn new<T: AsRef<str>>(host_name: T) -> Self {
let already_checked = Arc::default();
let (sd, rv) = tokio::sync::mpsc::unbounded_channel::<OnionPerfAnalysis>();
let onionperf_analysis_receiver_stream = Arc::new(Mutex::new(UnboundedReceiverStream::new(rv)));
let (onionperf_terminate_sender, onionperf_terminate_receiver) = oneshot::channel();
let onionperf_terminate_receiver = Mutex::new(onionperf_terminate_receiver);
let onionperf_terminate_sender = Mutex::new(Some(onionperf_terminate_sender));
OnionPerfRunner {
host_name: String::from(host_name.as_ref()),
onionperf_analysis_sender: sd,
onionperf_analysis_receiver_stream,
already_checked,
onionperf_terminate_sender,
onionperf_terminate_receiver,
}
}
/// Handles the outcome of a single download attempt.
///
/// * On a successful download the payload is xz-decompressed and parsed as JSON. The parsed
///   analysis gets `current_utc_date` as its `time`, is sent on the analysis channel,
///   `current_utc_time` is recorded as checked, and `ReceivedAndParsed` is returned.
/// * A decompression, parsing or network failure is logged, increments `tries` and yields
///   `Continue`, so the caller can retry and apply its retry cap.
/// * When the file was not found, `current_utc_time` is moved back by one day unless that
///   previous date has already been checked, and `NotFound` is returned. `tries` is left
///   untouched in this case.
///
/// Dates are compared as full calendar dates (year, month and day), not by day of month.
async fn process_one_try(
    &self,
    current_utc_time: &mut DateTime<Utc>,
    current_utc_date: &str,
    tries: &mut i32,
    onionperf_analysis_file: Result<Vec<u8>, OnionPerfRunnerError>,
) -> OneTryResult {
    let compressed = match onionperf_analysis_file {
        Ok(compressed) => compressed,
        Err(OnionPerfRunnerError::OnionPerfAnalysisFileNotFound) => {
            log::warn!(
                "The OnionPerfAnalysis file was not found, it mostly due to file not being available at the link at this time \
                 and server throwing 404 errors"
            );
            // Fall back to the previous day only if it has not been processed yet;
            // otherwise keep the current time so the caller retries the same date later.
            if let Some(previous_day) = current_utc_time.checked_sub_days(Days::new(1)) {
                let previous_date = previous_day.date_naive();
                let previous_day_checked = {
                    let already_checked = self.already_checked.lock().await;
                    already_checked
                        .iter()
                        .any(|already_checked_utc_time| already_checked_utc_time.date_naive() == previous_date)
                };
                if !previous_day_checked {
                    *current_utc_time = previous_day;
                }
            }
            return OneTryResult::NotFound;
        }
        Err(OnionPerfRunnerError::OnionPerfAnalysisNetworkError) => {
            log::error!("Network error has occured");
            // Count network failures too, otherwise the caller would retry without any limit.
            *tries += 1;
            return OneTryResult::Continue;
        }
    };

    let decompressed = match decompress_xz(&compressed).await {
        Ok(decompressed) => decompressed,
        Err(e) => {
            log::error!(
                "There was error on decompressing the xz compressed OnionPerfAnalysis JSON file aat tries no {tries} {:?}",
                e
            );
            *tries += 1;
            return OneTryResult::Continue;
        }
    };

    match serde_json::from_slice::<OnionPerfAnalysis>(&decompressed[..]) {
        Ok(mut analysis) => {
            analysis.time = Some(current_utc_date.to_owned());
            // A send error only means nobody listens any more; the date still counts as checked.
            let _ = self.onionperf_analysis_sender.send(analysis);
            self.already_checked.lock().await.push(*current_utc_time);
            OneTryResult::ReceivedAndParsed
        }
        Err(e) => {
            log::error!("There was error on parsing OnionPerfAnalysis JSON file at tries no {tries} {:?}", e);
            *tries += 1;
            OneTryResult::Continue
        }
    }
}
pub async fn start(&self) {
let mut current_utc_time = Utc::now();
let mut terminate_receiver_guard = self.onionperf_terminate_receiver.lock().await;
'onionperf_running_main_loop: loop {
let checked = {
let already_checked = self.already_checked.lock().await;
already_checked
.iter()
.any(|already_checked_utc_time| already_checked_utc_time.day() == current_utc_time.day())
};
if !checked {
let current_utc_date = self.generate_date(current_utc_time);
let mut tries = 0;
'decompression_parsing_error_try_iteration: loop {
let onionpef_analysis_file_future = self.download_onionperf_analysis_file(current_utc_date.clone());
let onionperf_analysis_file;
select! {
file = onionpef_analysis_file_future => {
onionperf_analysis_file = file;
}
_ = &mut *terminate_receiver_guard => {
break 'onionperf_running_main_loop;
}
}
let res = self
.process_one_try(&mut current_utc_time, ¤t_utc_date, &mut tries, onionperf_analysis_file)
.await;
if let OneTryResult::NotFound | OneTryResult::ReceivedAndParsed = res {
break 'decompression_parsing_error_try_iteration;
}
if tries >= 5 {
tokio::select! {
_ = &mut *terminate_receiver_guard => {
break 'onionperf_running_main_loop;
}
_ = sleep(Duration::from_secs(RECHECK_INTERVAL * 60 * 60)) => {
break 'decompression_parsing_error_try_iteration;
}
}
}
}
} else {
tokio::select! {
_ = &mut *terminate_receiver_guard => {
break 'onionperf_running_main_loop;
}
_ = sleep(Duration::from_secs(RECHECK_INTERVAL * 60 * 60)) => ()
}
current_utc_time = current_utc_time.checked_add_days(Days::new(1)).unwrap();
}
}
}
/// Sends the terminate signal to a running `start` loop.
///
/// The one-shot sender is consumed by the first call; later calls find nothing to send
/// and do nothing. A failed send is logged, not returned.
pub async fn stop(&self) {
    let mut sender_slot = self.onionperf_terminate_sender.lock().await;
    match sender_slot.take().map(|sender| sender.send(())) {
        Some(Err(e)) => {
            error!("Error occurred when sending OnionPerfRunner stop signal. {:?}", e);
        }
        Some(Ok(())) | None => {}
    }
}
async fn download_onionperf_analysis_file(&self, date: String) -> Result<Vec<u8>, OnionPerfRunnerError> {
let (url_1, url_2) = self.generate_onionperf_analysis_urls(date);
let reqwest_client = reqwest::Client::new();
let resp_1 = reqwest_client.get(url_1).send().await;
let resp_2 = reqwest_client.get(url_2).send().await;
if let Ok(resp) = resp_1 {
if resp.status() == StatusCode::OK {
let data = resp.bytes().await.unwrap().into();
return Ok(data);
}
}
match resp_2 {
Ok(resp) => {
if resp.status() == StatusCode::OK {
let data = resp.bytes().await.unwrap().into();
Ok(data)
} else {
Err(OnionPerfRunnerError::OnionPerfAnalysisFileNotFound)
}
}
Err(_) => {
Err(OnionPerfRunnerError::OnionPerfAnalysisNetworkError)
}
}
}
/// Renders `time` as `year-month-day`, with month and day zero-padded to two digits.
fn generate_date(&self, time: DateTime<Utc>) -> String {
    let (year, month, day) = (time.year(), time.month(), time.day());
    format!("{year}-{month:02}-{day:02}")
}
/// Builds the two candidate download URLs for the analysis file of `date`.
///
/// Both URLs point at `https://<host_name>.onionperf.torproject.net:8443` and differ only
/// in the path prefix taken from `ONION_PERF_ANALYAIS_PATHS`.
///
/// # Panics
///
/// Panics if either assembled string is not a valid URL.
fn generate_onionperf_analysis_urls(&self, date: String) -> (Url, Url) {
    let file_name = format!("{}.onionperf.analysis.json.xz", date);
    let render = |path_prefix: &str| {
        format!(
            "https://{}.onionperf.torproject.net:8443{}{}",
            self.host_name, path_prefix, file_name
        )
    };

    let url_1 = render(ONION_PERF_ANALYAIS_PATHS[0]);
    let url_2 = render(ONION_PERF_ANALYAIS_PATHS[1]);
    log::debug!("Trying to download onionperf analysis either from {url_1} or {url_2}");

    // Parse both before unwrapping either, then unwrap in order.
    let parsed_1 = Url::parse(&url_1);
    let parsed_2 = Url::parse(&url_2);
    (parsed_1.unwrap(), parsed_2.unwrap())
}
}
/// Failures reported by `download_onionperf_analysis_file`.
#[derive(Debug, thiserror::Error)]
enum OnionPerfRunnerError {
/// Neither URL delivered the file and the second URL answered with a status other than `200 OK`.
#[error("The onion perf analysis file could not found at both links")]
OnionPerfAnalysisFileNotFound,
/// Neither URL delivered the file and the request to the second URL failed to complete.
#[error("Can't connect to both links")]
OnionPerfAnalysisNetworkError,
}
/// Outcome of `process_one_try`, used by `start` to decide how to proceed.
#[derive(Debug)]
enum OneTryResult {
/// The attempt failed (decompression, parsing or network error); the caller keeps retrying.
Continue,
/// The file was decompressed, parsed and sent on the analysis channel.
ReceivedAndParsed,
/// The file was not found at the download URLs.
NotFound,
}
#[cfg(test)]
mod tests {
use super::OnionPerfRunner;
use crate::runner::OneTryResult;
use chrono::{TimeZone, Utc};
use std::{sync::Arc, time::Duration};
use tokio::{
sync::oneshot,
time::{self, timeout},
};
use tokio_stream::StreamExt;
#[tokio::test]
async fn test_received_and_parsed() {
let runner = OnionPerfRunner::new("op-de8a");
let mut utc_time = Utc.with_ymd_and_hms(2025, 3, 31, 0, 0, 0).unwrap();
let utc_date = runner.generate_date(utc_time);
let mut tries = 0;
let onionperf_analysis_file = runner.download_onionperf_analysis_file(utc_date.clone()).await;
assert!(onionperf_analysis_file.is_ok(), "Failed to download the analysis file.");
let res = runner
.process_one_try(&mut utc_time, &utc_date, &mut tries, onionperf_analysis_file)
.await;
assert_eq!(tries, 0, "Unexpected tries count change. Expected: 0, Actual: {}", tries);
assert!(
matches!(res, OneTryResult::ReceivedAndParsed),
"Failed to receive or parse. Result: {:?}",
res
);
let mut rv = runner.onionperf_analysis_receiver_stream.lock().await;
let mut has_result = false;
tokio::select! {
_ = (*rv).next() => has_result = true,
_ = time::sleep(Duration::from_secs(1)) => ()
}
assert!(has_result, "No parsed analysis in stream.");
}
// Requests a file for a date far in the future and checks that the attempt is reported as
// `NotFound`, the tracked time moves back one day and no retry is consumed. Requires network access.
#[tokio::test]
async fn test_not_found() {
    let runner = OnionPerfRunner::new("op-de8a");
    let mut day = Utc.with_ymd_and_hms(2077, 4, 1, 0, 0, 0).unwrap();
    let expected_day = Utc.with_ymd_and_hms(2077, 3, 31, 0, 0, 0).unwrap();
    let date_label = runner.generate_date(day);
    let mut tries = 0;

    let download = runner.download_onionperf_analysis_file(date_label.clone()).await;
    let outcome = runner.process_one_try(&mut day, &date_label, &mut tries, download).await;

    assert_eq!(tries, 0, "Unexpected tries count change. Expected: 0, Actual: {}", tries);
    assert_eq!(
        day,
        expected_day,
        "Time didn't sub a day. Expected: 2077-03-31, Actual: {}",
        day.format("%Y-%m-%d")
    );
    assert!(matches!(outcome, OneTryResult::NotFound), "Expected NotFound result. Actual: {:?}", outcome);
}
// With today's date already marked as checked, `start` must go to sleep instead of
// finishing, so the 30 second timeout elapses and unwrapping its result panics.
#[tokio::test]
#[should_panic]
async fn test_sleeping() {
    let runner = OnionPerfRunner::new("op-de8a");
    let today = Utc::now();
    runner.already_checked.lock().await.push(today);

    let run_to_completion = async move {
        runner.start().await;
    };
    let outcome = timeout(Duration::from_secs(30), run_to_completion).await;
    outcome.unwrap();
}
#[tokio::test]
async fn test_stop() {
let runner = Arc::new(OnionPerfRunner::new("op-de8a"));
let utc_time = Utc::now();
{
let mut checked_guard = runner.already_checked.lock().await;
(*checked_guard).push(utc_time);
}
let runner_cloned = runner.clone();
let (stopped, rx) = oneshot::channel();
let handle = tokio::spawn(async move {
runner_cloned.start().await;
stopped.send(()).unwrap();
});
runner.stop().await;
timeout(Duration::from_secs(30), async move {
match rx.await {
Ok(_) => (),
Err(_) => panic!(),
}
handle.await.unwrap();
})
.await
.unwrap();
}
}