1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
use super::super::{config::Neo4jConfig, relay::Node};
use deadqueue::unlimited::Queue;
use erpc_scanner::work::{CompletedWork, CompletedWorkStatus};
use log::error;
use neo4rs::{query, Graph};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;

const RETRY_INTERVAL_ON_ERROR: u64 = 200;
const MAX_INSERTION_LIMIT: u64 = 400;

pub struct Neo4jDbClient {
    /// The Neo4j connection
    graph: Arc<neo4rs::Graph>,

    /// A completed work queue
    completed_work_queue: Arc<Queue<CompletedWork>>,
}

impl Neo4jDbClient {
    /// Creates a new [Neo4jDbClient] from the given [Neo4jConfig]
    pub async fn new(neo4j_config: &Neo4jConfig) -> anyhow::Result<Self> {
        let uri = neo4j_config.uri.clone();
        let username = neo4j_config.username.clone();
        let password = neo4j_config.password.clone();

        let graph = Arc::new(neo4rs::Graph::new(uri, username, password).await?);
        let completed_work_queue = Arc::new(Queue::new());

        let neo4jdbclient = Self {
            graph,
            completed_work_queue,
        };

        neo4jdbclient.start_getting_completed_works();

        Ok(neo4jdbclient)
    }

    async fn push_completed_works(neo4j_connection: Arc<Graph>, completed_works: Vec<CompletedWork>) -> bool {
        if let Ok(transaction) = neo4j_connection.start_txn().await {
            for completed_work in completed_works {
                let guard_relay = &completed_work.source_relay;
                let exit_relay = &completed_work.destination_relay;
                let timestamp = completed_work.timestamp;
                let (status, message) = {
                    match completed_work.status {
                        CompletedWorkStatus::Success => (1, String::from("Success")),
                        CompletedWorkStatus::Failure(ref message) => (0, message.clone()),
                    }
                };
                let raw_query = format!(
                    "MERGE (guard_relay:Relay {{fingerprint: $guard_relay_fingerprint}})
                           MERGE (exit_relay:Relay {{fingerprint: $exit_relay_fingerprint}})
                           MERGE (guard_relay)-[rel:{}]->(exit_relay)
                           SET rel.message = $message, rel.timestamp = $timestamp ",
                    if status == 1 { "CIRCUIT_SUCCESS" } else { "CIRCUIT_FAILURE" }
                );

                let query = query(&raw_query)
                    .param("guard_relay_fingerprint", guard_relay.as_str())
                    .param("exit_relay_fingerprint", exit_relay.as_str())
                    .param("message", message)
                    .param("timestamp", timestamp.to_string());

                if transaction.execute(query).await.is_err() {
                    return false;
                }
            }
            transaction.commit().await.is_ok()
        } else {
            false
        }
    }

    fn start_getting_completed_works(&self) {
        let completed_work_queue = self.completed_work_queue.clone();
        let neo4j_connection = self.graph.clone();

        tokio::task::spawn(async move {
            let mut count = 0;
            let mut completed_works: Vec<CompletedWork> = Vec::new();

            loop {
                match tokio::time::timeout(Duration::from_millis(50), completed_work_queue.pop()).await {
                    Ok(completed_work) => {
                        if count < MAX_INSERTION_LIMIT {
                            completed_works.push(completed_work);
                            count += 1;
                        } else {
                            'error_recovery: loop {
                                if Self::push_completed_works(neo4j_connection.clone(), completed_works.clone()).await {
                                    completed_works.clear();
                                    count = 0;
                                    break 'error_recovery;
                                }
                                sleep(Duration::from_millis(RETRY_INTERVAL_ON_ERROR)).await;
                            }
                        }
                    }
                    Err(_) => {
                        // On timeout we push what we already have
                        if !completed_works.is_empty() {
                            'error_recovery: loop {
                                if Self::push_completed_works(neo4j_connection.clone(), completed_works.clone()).await {
                                    completed_works.clear();
                                    count = 0;
                                    break 'error_recovery;
                                }
                                sleep(Duration::from_millis(RETRY_INTERVAL_ON_ERROR)).await;
                            }
                        }
                    }
                }
            }
        });
    }

    /// Add an edge i.e circuit creation attempt into the database
    pub async fn add_completed_work(&self, completed_work: CompletedWork) {
        self.completed_work_queue.push(completed_work);
    }

    /// Add the indexing item of the relay i.e the fingerprint of the relay
    pub async fn add_node(&self, node: Arc<Node>) {
        let query = query("MERGE (:Relay {fingerprint:  $fingerprint})").param("fingerprint", node.fingerprint());
        if let Err(err) = self.graph.run(query).await {
            error!(
                "Coudln't add the relay with fingerprint {} in the neo4j database | Error : {err:?}",
                node.fingerprint(),
            );
        }
    }
}