Refactoring, mainly formatting
This commit is contained in:
parent
3825263fa3
commit
d8f1733eb3
25 changed files with 667 additions and 348 deletions
129
src/comm/mod.rs
129
src/comm/mod.rs
|
@ -1,23 +1,23 @@
|
|||
pub mod messages;
|
||||
pub mod message_processor;
|
||||
pub mod messages;
|
||||
mod types;
|
||||
use tracing::{warn, error, debug};
|
||||
use tracing::{debug, error, warn};
|
||||
pub use types::*;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::io::{Read, Write};
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::io::{Read, Write};
|
||||
|
||||
use anyhow::bail;
|
||||
use i2p::net::{I2pListenerBuilder, I2pListener, I2pSocketAddr, I2pStream, I2pAddr};
|
||||
use i2p::net::{I2pAddr, I2pListener, I2pListenerBuilder, I2pSocketAddr, I2pStream};
|
||||
use i2p::sam_options::SAMOptions;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::Config;
|
||||
use crate::state::CommState;
|
||||
use crate::state::types::PeerId;
|
||||
use crate::state::CommState;
|
||||
use crate::Config;
|
||||
|
||||
use self::messages::Message;
|
||||
|
||||
|
@ -31,16 +31,14 @@ pub struct CommHandle {
|
|||
|
||||
impl CommHandle {
|
||||
pub fn new(state: CommState, config: &Config) -> anyhow::Result<Self> {
|
||||
let mut listener_builder = I2pListenerBuilder::default()
|
||||
.with_options(SAMOptions::default());
|
||||
let mut listener_builder =
|
||||
I2pListenerBuilder::default().with_options(SAMOptions::default());
|
||||
|
||||
if let Some(privkey) = &config.i2p_private_key {
|
||||
listener_builder = listener_builder.with_private_key(privkey.to_string());
|
||||
}
|
||||
|
||||
let listener = listener_builder
|
||||
.build()
|
||||
.unwrap();
|
||||
let listener = listener_builder.build().unwrap();
|
||||
|
||||
Ok(CommHandle {
|
||||
state: Arc::new(state),
|
||||
|
@ -71,7 +69,9 @@ impl CommHandle {
|
|||
// there is another stream - thus, the existing streams will not be read.
|
||||
// `spawn_blocking` moves the reading task to a special pool of tasks which are
|
||||
// executed _despite_ other tasks blocking for something.
|
||||
tokio::task::spawn_blocking(move || Self::read_connection(wrapped_stream, state_arc));
|
||||
tokio::task::spawn_blocking(move || {
|
||||
Self::read_connection(wrapped_stream, state_arc)
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -90,11 +90,14 @@ impl CommHandle {
|
|||
for peer in self.state.get_peers().unwrap() {
|
||||
debug!("Sending to peer '{:?}' message '{:?}'", &peer, &msg);
|
||||
if let Err(e) = self.send_to_addr(&peer.addr(), msg_string.as_bytes()).await {
|
||||
debug!("Failed to send message.\nError: {:?}\nMessage: {:?}", e, &msg);
|
||||
debug!(
|
||||
"Failed to send message.\nError: {:?}\nMessage: {:?}",
|
||||
e, &msg
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
}
|
||||
Err(e) => bail!(e),
|
||||
}
|
||||
}
|
||||
|
@ -104,7 +107,7 @@ impl CommHandle {
|
|||
Ok(msg_string) => {
|
||||
self.send_to_addr(dest, msg_string.as_bytes()).await?;
|
||||
Ok(())
|
||||
},
|
||||
}
|
||||
Err(e) => bail!(e),
|
||||
}
|
||||
}
|
||||
|
@ -116,12 +119,14 @@ impl CommHandle {
|
|||
Ok(client) => {
|
||||
//client.inner.sam.conn.set_nodelay(true)?;
|
||||
//client.inner.sam.conn.set_nonblocking(false)?;
|
||||
self.clients.write().await.insert(addr.clone(), Arc::new(RwLock::new(client)));
|
||||
},
|
||||
self.clients
|
||||
.write()
|
||||
.await
|
||||
.insert(addr.clone(), Arc::new(RwLock::new(client)));
|
||||
}
|
||||
Err(e) => bail!(e),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Fetch current client for this connection from clients map, and send the message
|
||||
if let Some(client) = self.clients.read().await.get(&addr) {
|
||||
|
@ -129,18 +134,21 @@ impl CommHandle {
|
|||
match writeguard.write_all(msg) {
|
||||
Ok(_) => {
|
||||
writeguard.flush()?;
|
||||
return Ok(())
|
||||
},
|
||||
return Ok(());
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Error writing to stream: {}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
return Err(anyhow::Error::msg("No client found despite trying to add one beforehand."))
|
||||
} else {
|
||||
return Err(anyhow::Error::msg(
|
||||
"No client found despite trying to add one beforehand.",
|
||||
));
|
||||
}
|
||||
self.clients.write().await.remove(&addr);
|
||||
Err(anyhow::Error::msg("Failed to send anything, most likely the stream was broken and has been removed"))
|
||||
Err(anyhow::Error::msg(
|
||||
"Failed to send anything, most likely the stream was broken and has been removed",
|
||||
))
|
||||
}
|
||||
|
||||
pub fn i2p_address(&self) -> anyhow::Result<I2pSocketAddr> {
|
||||
|
@ -156,27 +164,29 @@ impl CommHandle {
|
|||
Ok(i2p_dest)
|
||||
}
|
||||
|
||||
fn read_connection(wrapped_stream: Arc<RwLock<I2pStream>>, state: Arc<CommState>) -> JoinHandle<()> {
|
||||
fn read_connection(
|
||||
wrapped_stream: Arc<RwLock<I2pStream>>,
|
||||
state: Arc<CommState>,
|
||||
) -> JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
let mut stream = wrapped_stream.write().await;
|
||||
let peer: PeerId = stream.peer_addr().expect("Failed to get peer addr").into();
|
||||
|
||||
|
||||
// All streams start with a \n byte which does not belong to the payload, take that from the stream.
|
||||
if let Err(e) = stream.read(&mut [0; 1]) {
|
||||
error!("Error while reading first byte of stream: {}", e);
|
||||
return;
|
||||
}
|
||||
|
||||
let iterator = serde_json::Deserializer::from_reader(&mut *stream).into_iter::<serde_json::Value>();
|
||||
|
||||
let iterator = serde_json::Deserializer::from_reader(&mut *stream)
|
||||
.into_iter::<serde_json::Value>();
|
||||
for item in iterator {
|
||||
match item {
|
||||
Ok(value) => {
|
||||
match serde_json::from_value::<Message>(value) {
|
||||
Ok(message) => {
|
||||
message_processor::handle(state.deref(), &peer, message);
|
||||
},
|
||||
Err(e) => warn!("Deserialization failed: {:?}", e),
|
||||
Ok(value) => match serde_json::from_value::<Message>(value) {
|
||||
Ok(message) => {
|
||||
message_processor::handle(state.deref(), &peer, message);
|
||||
}
|
||||
Err(e) => warn!("Deserialization failed: {:?}", e),
|
||||
},
|
||||
Err(e) => {
|
||||
warn!("Deserialization failed: {:?}", e);
|
||||
|
@ -188,39 +198,52 @@ impl CommHandle {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use i2p::Session;
|
||||
use i2p::net::I2pListener;
|
||||
use i2p::sam::StreamForward;
|
||||
use i2p::sam_options::SAMOptions;
|
||||
use i2p::Session;
|
||||
|
||||
use crate::Config;
|
||||
use crate::state::{State, CommState};
|
||||
use crate::comm::{messages, Message};
|
||||
use crate::state::types::ElementId;
|
||||
use crate::state::{CommState, State};
|
||||
use crate::Config;
|
||||
|
||||
use super::CommHandle;
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
pub async fn msg() {
|
||||
let ch = CommHandle::new(CommState::new(State::new().await.unwrap()), &Config::default() ).unwrap();
|
||||
let ch = CommHandle::new(
|
||||
CommState::new(State::new().await.unwrap()),
|
||||
&Config::default(),
|
||||
)
|
||||
.unwrap();
|
||||
ch.run().await;
|
||||
println!("My address: {:?}", ch.i2p_b32_address());
|
||||
|
||||
|
||||
ch.send(
|
||||
&ch.i2p_address().unwrap(),
|
||||
Message::new(messages::MessageContent::Hello { peer_name: "a".to_string() })
|
||||
).await.expect("Could not send hello");
|
||||
Message::new(messages::MessageContent::Hello {
|
||||
peer_name: "a".to_string(),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.expect("Could not send hello");
|
||||
for i in 0..10 {
|
||||
let result = ch.send(
|
||||
&ch.i2p_address().unwrap(),
|
||||
Message::new(messages::MessageContent::CreateElement { id: ElementId::new(), content: crate::state::types::ElementContent::Text(format!("hello world no. {}", i)) })
|
||||
).await;
|
||||
let result = ch
|
||||
.send(
|
||||
&ch.i2p_address().unwrap(),
|
||||
Message::new(messages::MessageContent::CreateElement {
|
||||
id: ElementId::new(),
|
||||
content: crate::state::types::ElementContent::Text(format!(
|
||||
"hello world no. {}",
|
||||
i
|
||||
)),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
tokio::time::sleep(Duration::from_millis(300)).await;
|
||||
println!("Result of sending: {:?}", result);
|
||||
}
|
||||
|
@ -232,11 +255,11 @@ mod tests {
|
|||
pub fn from_privkey() {
|
||||
let privkey = "DPUpG~N8fSkju41afxmJEpDzXYqM6s4yqerujCMCNIEM9Skb83x9KSO7jVp~GYkSkPNdiozqzjKp6u6MIwI0gQz1KRvzfH0pI7uNWn8ZiRKQ812KjOrOMqnq7owjAjSBDPUpG~N8fSkju41afxmJEpDzXYqM6s4yqerujCMCNIEM9Skb83x9KSO7jVp~GYkSkPNdiozqzjKp6u6MIwI0gQz1KRvzfH0pI7uNWn8ZiRKQ812KjOrOMqnq7owjAjSBDPUpG~N8fSkju41afxmJEpDzXYqM6s4yqerujCMCNIEM9Skb83x9KSO7jVp~GYkSkPNdiozqzjKp6u6MIwI0gVKUHq0huLwfh0u06PlujRXTgcUJw9pg4Vkh-e0CGQFL6yn2FxCUIvaryyFt3-8mwO1OTkQyB7u1rnO9FpLlKeT9FPSkwmaxZmwQ1kvsuTTIp5ntxQZ1XMCDm2qhRWdcEsYxTKLJIMYxN1Ujk9Y7SqNYORmxrwQWC4ENGnt~VyvbAAAAfAabqgU0GhMWN2syDQ5sYZ61WXDqC4esasxwyLvJ-ES7~k40Uq9htc8t16-RXEq0Q17C499WxW6~GQRcXbgBNd0bMdV-46RsFo1jNgfB6H4nkuTrQXMqXB6s2Fhx2gwcHRk3Lt5DE4N0mvHG8Po974tJWr1hIRiSxQUtSj5kcZOOT~EKWMoCA7qDgZORZAnJavaRr0S-PiPQwAw8HOekdw50CyOByxXEfLBAi-Kz1nhdNvMHIrtcBZ~RpsxOK63O633e0PeYwrOOG7AFVLh7SzdwVvI1-KUe7y2ADBcoHuJRMwk5MEV-BATEfhWA2SzWw1qFRzJyb-pGbgGCJQOoc1YcP8jikBJhtuRbD5K-wK5MXoHL";
|
||||
let _ = I2pListener {
|
||||
forward: StreamForward::with_session(&Session::from_destination(
|
||||
i2p::sam::DEFAULT_API,
|
||||
&privkey,
|
||||
SAMOptions::default()).expect("Failed to create session for listener2")
|
||||
).expect("Failed to create StreamForward for listener2")
|
||||
forward: StreamForward::with_session(
|
||||
&Session::from_destination(i2p::sam::DEFAULT_API, &privkey, SAMOptions::default())
|
||||
.expect("Failed to create session for listener2"),
|
||||
)
|
||||
.expect("Failed to create StreamForward for listener2"),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue