0 Followers
0 Following
1 Posts

Okay, for some reason I’m unable to paste the code without chunks getting cut out, so here a BASE64 encoded version that should be good:

dXNlIHN0ZDo6e2lvOjpFcnJvciwgc3luYzo6QXJjfTsKCnVzZSB0b2tpbzo6e2lvOjp7QXN5bmNCdWZSZWFkRXh0LCBBc3luY1dyaXRlRXh0LCBCdWZSZWFkZXIsIEJ1ZldyaXRlcn0sCiAgICAgICAgICAgIG5ldDo6e3RjcDo6e093bmVkUmVhZEhhbGYsIE93bmVkV3JpdGVIYWxmfSwKICAgICAgICAgICAgICAgICAgVGNwTGlzdGVuZXJ9LAogICAgICAgICAgICBzeW5jOjp7bXBzYzo6e3NlbGYsIFVuYm91bmRlZFJlY2VpdmVyLCBVbmJvdW5kZWRTZW5kZXJ9LAogICAgICAgICAgICAgICAgICAgUndMb2NrfX07Cgphc3luYyBmbiBzdHJlYW1faGFuZGxlcihzdHJlYW0gOiBPd25lZFJlYWRIYWxmLCBzZW5kZXIgOiBVbmJvdW5kZWRTZW5kZXI8KFN0cmluZywgU3RyaW5nKT4pCnsKICAgIGxldCBhZGRyID0gKCZzdHJlYW0pLnBlZXJfYWRkcigpLnVud3JhcCgpLnRvX3N0cmluZygpOwoKICAgIGxldCBtdXQgcmVhZGVyID0gQnVmUmVhZGVyOjpuZXcoc3RyZWFtKTsKCiAgICBsZXQgbXV0IGJ1ZmZlciA6IFZlYzx1OD4gPSB2ZWMhW107CgogICAgd2hpbGUgbGV0IE9rKG4pID0gcmVhZGVyLnJlYWRfdW50aWwoYidcbicsICZtdXQgYnVmZmVyKS5hd2FpdAogICAgewogICAgICAgIGlmIG4gPT0gMAogICAgICAgIHsKICAgICAgICAgICAgYnJlYWs7CiAgICAgICAgfQoKICAgICAgICBsZXQgbWVzc2FnZSA9IFN0cmluZzo6ZnJvbV91dGY4X2xvc3N5KCZidWZmZXJbLi5dKTsKCiAgICAgICAgc2VuZGVyLnNlbmQoKGFkZHIuY2xvbmUoKSwgbWVzc2FnZS50b19zdHJpbmcoKSkpLnVud3JhcCgpOwoKICAgICAgICBidWZmZXIuY2xlYXIoKTsKICAgIH0KfQoKYXN5bmMgZm4gc2VuZF90b19vdGhlcnMoCiAgICByZWN2IDogJm11dCBVbmJvdW5kZWRSZWNlaXZlcjwoU3RyaW5nLCBTdHJpbmcpPiwKICAgIHdyaXRlcyA6IEFyYzxSd0xvY2s8VmVjPE93bmVkV3JpdGVIYWxmPj4+CikKewogICAgbG9vcAogICAgewogICAgICAgIGxldCB3cml0ZV9jbG9uZSA9IHdyaXRlcy5jbG9uZSgpOwogICAgICAgIGxldCBtdXQgd3JpdGUgPSB3cml0ZV9jbG9uZS53cml0ZSgpLmF3YWl0OwogICAgICAgIGlmIGxldCBTb21lKG1zZykgPSByZWN2LnJlY3YoKS5hd2FpdAogICAgICAgIHsKICAgICAgICAgICAgZm9yIHN0cmVhbSBpbiB3cml0ZS5pdGVyX211dCgpCiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAgIGxldCBtdXQgd3JpdGVyID0gQnVmV3JpdGVyOjpuZXcoc3RyZWFtKTsKCiAgICAgICAgICAgICAgICBsZXQgbWVzc2FnZSA9IGZvcm1hdCEoIklOQ09NSU5HOiB7fSAtIHt9IiwgbXNnLjAsIG1zZy4xKTsKCiAgICAgICAgICAgICAgICBfID0gd3JpdGVyLndyaXRlX2FsbChtZXNzYWdlLmFzX2J5dGVzKCkpLmF3YWl0OwogICAgICAgICAgICB9CiAgICAgICAgfQogICAgfQp9CgojW3Rva2lvOjptYWluXQphc3luYyBmbiBtYWluKCkgLT4gUmVzdWx0PCgpLCBFcnJvcj4KewogICAgbGV0IGxpc3RlbmVyID0gVGNwTGlzdGVuZXI6OmJpbmQoIjAuMC4wLjA6NjY2NyIpLmF3YWl0PzsKCiAgICBsZXQgKHNlbmQsIG11dCByZWNlaXZlKSA6ICgKICAgICAgICBVbmJvdW5kZWRTZW5kZXI8KFN0cmluZywgU3RyaW5nKT4sCiAgICAgICAgVW5ib3VuZGVkUmVjZWl2ZXI8KFN0cmluZywgU3RyaW5nKT4KICAgICkgPSBtcHNjOjp1bmJvdW5kZWRfY2hhbm5lbCgpOwoKICAgIGxldCB3cml0ZXMgOiBBcmM8UndMb2NrPFZlYzxPd25lZFdyaXRlSGFsZj4+PiA9IEFyYzo6bmV3KFJ3TG9jazo6bmV3KHZlYyFbXSkpOwoKICAgIGxldCB3cml0ZXNfY2xvbmUgPSB3cml0ZXMuY2xvbmUoKTsKCiAgICB0b2tpbzo6c3Bhd24oYXN5bmMgbW92ZSB7CiAgICAgICAgc2VuZF90b19vdGhlcnMoJm11dCByZWNlaXZlLCB3cml0ZXNfY2xvbmUpLmF3YWl0OwogICAgfSk7CgogICAgbG9vcAogICAgewogICAgICAgIHByaW50bG4hKCJzdGFydGluZyBsb29wIik7CiAgICAgICAgbGV0IHNlbmQgPSBzZW5kLmNsb25lKCk7CiAgICAgICAgbGV0IHdyaXRlcyA9IHdyaXRlcy5jbG9uZSgpOwoKICAgICAgICAvLyBUaGlzIHJ1bnMgb25jZSBwZXIgY29ubmVjdGlvbiwgYW5kIHN0YXJ0cyBhIGJhY2tncm91bmQgdGhyZWFkIHJ1bm5pbmcgdGhlICdzdHJlYW1faGFuZGxlcicgZnVuY3Rpb24KICAgICAgICBpZiBsZXQgT2soKHN0cmVhbSwgYWRkcmVzcykpID0gbGlzdGVuZXIuYWNjZXB0KCkuYXdhaXQKICAgICAgICB7CiAgICAgICAgICAgIGxldCAocmVhZCwgd3JpdGUpID0gc3RyZWFtLmludG9fc3BsaXQoKTsKCiAgICAgICAgICAgIGxldCBtdXQgd3JpdGVzID0gd3JpdGVzLndyaXRlKCkuYXdhaXQ7CgogICAgICAgICAgICB3cml0ZXMucHVzaCh3cml0ZSk7CgogICAgICAgICAgICBwcmludGxuISgiTmV3IGNsaWVudDogezojP30iLCBhZGRyZXNzLnRvX3N0cmluZygpKTsKCiAgICAgICAgICAgIHRva2lvOjpzcGF3bihhc3luYyBtb3ZlIHsKICAgICAgICAgICAgICAgIHN0cmVhbV9oYW5kbGVyKHJlYWQsIHNlbmQpLmF3YWl0OwogICAgICAgICAgICB9KTsKICAgICAgICB9CiAgICB9Cn0K

[HELP] Sending data to multiple TcpStreams upon receiving data from TcpStream

https://lemmy.world/post/5780632

[HELP] Sending data to multiple TcpStreams upon receiving data from another TcpStream - Lemmy.world

Hey guys, I’ve been stuck on this problem for a while, maybe someone with more tokio/async experience can help me with it. The goal of the program is this: It’s a TCP server that accepts connections. Any data/text received should be sent to the other connections. The problem stems prom the face that I need write access to the vector of OwnedWriteHalfs in two places, simultaneously. So when I first lock the RwLock of the vector to listen in a loop for the messages from the MPSC, that locks the vector to the part of the code that should put the connections in that vector. Once, when I put it in the vector of OwnedWriteHalfs, and once when I go over the list to send the messages to the other clients. Anybody got any ideas or pointers? Thanks! The code: use std::{io::Error, sync::Arc}; use tokio::{io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, net::{tcp::{OwnedReadHalf, OwnedWriteHalf}, TcpListener}, sync::{mpsc::{self, UnboundedReceiver, UnboundedSender}, RwLock}}; async fn stream_handler(stream : OwnedReadHalf, sender : UnboundedSender<(String, String)>) { let addr = (&stream).peer_addr().unwrap().to_string(); let mut reader = BufReader::new(stream); let mut buffer : Vec = vec![]; while let Ok(n) = reader.read_until(b'\n', &mut buffer).await { if n == 0 { break; } let message = String::from_utf8_lossy(&buffer[..]); sender.send((addr.clone(), message.to_string())).unwrap(); buffer.clear(); } } async fn send_to_others( recv : &mut UnboundedReceiver<(String, String)>, writes : Arc>> ) { loop { let write_clone = writes.clone(); let mut write = write_clone.write().await; if let Some(msg) = recv.recv().await { for stream in write.iter_mut() { let mut writer = BufWriter::new(stream); let message = format!("INCOMING: {} - {}", msg.0, msg.1); _ = writer.write_all(message.as_bytes()).await; } } } } #[tokio::main] async fn main() -> Result<(), Error> { let listener = TcpListener::bind("0.0.0.0:6667").await?; let (send, mut receive) : ( UnboundedSender<(String, String)>, UnboundedReceiver<(String, String)> ) = mpsc::unbounded_channel(); let writes : Arc>> = Arc::new(RwLock::new(vec![])); let writes_clone = writes.clone(); tokio::spawn(async move { send_to_others(&mut receive, writes_clone).await; }); loop { println!("starting loop"); let send = send.clone(); let writes = writes.clone(); // This runs once per connection, and starts a background thread running the 'stream_handler' function if let Ok((stream, address)) = listener.accept().await { let (read, write) = stream.into_split(); let mut writes = writes.write().await; writes.push(write); println!("New client: {:#?}", address.to_string()); tokio::spawn(async move { stream_handler(read, send).await; }); } } }