Skip to content

Commit

Permalink
make write async too
Browse files Browse the repository at this point in the history
  • Loading branch information
Davidson-Souza committed Nov 27, 2024
1 parent 7542cd6 commit 02fc3b9
Showing 1 changed file with 61 additions and 16 deletions.
77 changes: 61 additions & 16 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl Node {

fn create_workers(
worker_context: &WorkerContext,
scheduler: Sender<(usize, TcpStream)>,
scheduler: Sender<(usize, TcpStream, Intent)>,
) -> [Sender<Message>; WORKES_PER_CLUSTER] {
let mut workers = Vec::new();
let peers = Rc::new(UnsafeCell::new(HashMap::new()));
Expand Down Expand Up @@ -188,7 +188,14 @@ impl Node {
pub enum Message {
NewConnection(TcpStream),
ReadReady((TcpStream, usize)),
WriteReady((TcpStream, usize)),
Disconnect(usize),

}

enum Intent {
Read,
Write,
}

struct Worker {
Expand All @@ -199,7 +206,7 @@ struct Worker {
chainview: Arc<ChainView>,
magic: Magic,
peers: Rc<UnsafeCell<HashMap<usize, Peer>>>,
scheduler: Sender<(usize, TcpStream)>,
scheduler: Sender<(usize, TcpStream, Intent)>,
}

unsafe impl Sync for Worker {}
Expand All @@ -214,7 +221,7 @@ impl Worker {
chainview: Arc<ChainView>,
magic: Magic,
job_receiver: Receiver<Message>,
scheduler: Sender<(usize, TcpStream)>,
scheduler: Sender<(usize, TcpStream, Intent)>,
) -> Self {
Self {
peers,
Expand Down Expand Up @@ -251,7 +258,7 @@ impl Worker {
let peers = unsafe { &mut *self.peers.get() };

peers.insert(id, peer);
self.scheduler.send((id, socket)).expect("reactor died");
self.scheduler.send((id, socket, Intent::Read)).expect("reactor died");
}

Message::ReadReady((mut stream, id)) => {
Expand All @@ -265,6 +272,7 @@ impl Worker {
if let Err(err) = peer.handle_request(&mut stream) {
if let PeerError::Io(ref e) = err {
if e.kind() == std::io::ErrorKind::WouldBlock {
self.scheduler.send((id, stream, Intent::Write)).expect("reactor died");
continue;
}
}
Expand All @@ -274,7 +282,29 @@ impl Worker {
continue;
}

self.scheduler.send((id, stream)).expect("reactor died");
self.scheduler.send((id, stream, Intent::Read)).expect("reactor died");
}

Message::WriteReady((mut stream, id)) => {
let peers = unsafe { &mut *self.peers.get() };
let Some(peer) = peers.get_mut(&id) else {
log::error!("can't find peer: {id}");
peers.remove(&id);
continue;
};

if let Err(err) = peer.write_back(&mut stream) {
if let PeerError::Io(ref e) = err {
if e.kind() == std::io::ErrorKind::WouldBlock {
self.scheduler.send((id, stream, Intent::Write)).expect("reactor died");
continue;
}
}

log::error!("Error handling request: {}", err);
peers.remove(&id);
continue;
}
}

Message::Disconnect(id) => {
Expand All @@ -289,8 +319,8 @@ impl Worker {
struct Reactor {
listener: TcpListener,
worker_pool: [Sender<Message>; WORKES_PER_CLUSTER],
register: Receiver<(usize, TcpStream)>,
registered: HashMap<usize, TcpStream>,
register: Receiver<(usize, TcpStream, Intent)>,
registered: HashMap<usize, (TcpStream, Intent)>,
block_notifier: Receiver<BlockHash>,
magic: Magic,
timeouts: BTreeMap<Instant, usize>,
Expand All @@ -310,15 +340,21 @@ impl Reactor {
while let Ok(mut work) = self.register.try_recv() {
self.timeouts
.insert(Instant::now() + Duration::from_secs(10 * 60), work.0);
match registry.register(&mut work.1, mio::Token(work.0), mio::Interest::READABLE) {

let intent = match work.2 {
Intent::Read => mio::Interest::READABLE,
Intent::Write => mio::Interest::WRITABLE,
};

match registry.register(&mut work.1, mio::Token(work.0), intent) {
Ok(_) => {}
Err(e) => {
log::error!("Failed to register socket: {}", e);
continue;
}
}

self.registered.insert(work.0, work.1);
self.registered.insert(work.0, (work.1, work.2));
}

let next_timeout = self
Expand All @@ -343,7 +379,7 @@ impl Reactor {

let msg = serialize(&inv);

self.registered.iter_mut().for_each(|(_, socket)| {
self.registered.iter_mut().for_each(|(_, (socket, _))| {
let _ = socket.write_all(&msg);
});
});
Expand All @@ -370,7 +406,7 @@ impl Reactor {
mio::Token(token) => {
let worker_id = random::<usize>() % self.worker_pool.len();
let worker = self.worker_pool.get(worker_id).expect("broken worker");
let mut socket = self
let (mut socket, intent) = self
.registered
.remove(&token)
.expect("BUG: socket is registered but not in registered map");
Expand All @@ -384,10 +420,19 @@ impl Reactor {
self.timeouts.retain(|_, id| id != &token);

debug!("Sending job to worker {}", worker_id);
worker
.send(Message::ReadReady((socket, token)))
.expect("Failed to send to worker");
}
match intent {
Intent::Read => {
worker
.send(Message::ReadReady((socket, token)))
.expect("Failed to send to worker");
}
Intent::Write => {
worker
.send(Message::WriteReady((socket, token)))
.expect("Failed to send to worker");
}
}
}
}
}

Expand All @@ -399,7 +444,7 @@ impl Reactor {
let ping = RawNetworkMessage::new(self.magic, NetworkMessage::Ping(nonce));

let msg = serialize(&ping);
let socket = self
let (socket, _) = self
.registered
.get_mut(&id)
.expect("BUG: socket is registered but not in registered map");
Expand Down

0 comments on commit 02fc3b9

Please sign in to comment.