refactor: introduce constants for default addresses and improve error handling in TCP examples (#7741)

- Added `DEFAULT_ADDR` constant to `chat.rs` and `echo-tcp.rs` for better maintainability.
- Enhanced error logging in `connect-tcp.rs` and `echo-tcp.rs` to include connection addresses.
- Improved peer management in `chat.rs` by automatically cleaning up disconnected peers.
This commit is contained in:
jinronga
2025-11-25 14:44:41 +08:00
committed by GitHub
parent 9a1b076c00
commit 963b631754
3 changed files with 50 additions and 23 deletions

View File

@@ -39,6 +39,8 @@ use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
const DEFAULT_ADDR: &str = "127.0.0.1:6142";
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};
@@ -70,7 +72,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:6142".to_string());
.unwrap_or_else(|| DEFAULT_ADDR.to_string());
// Bind a TCP listener to the socket address.
//
@@ -88,9 +90,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Spawn our handler to be run asynchronously.
tokio::spawn(async move {
tracing::debug!("accepted connection");
tracing::debug!("accepted connection from {}", addr);
if let Err(e) = process(state, stream, addr).await {
tracing::info!("an error occurred; error = {:?}", e);
tracing::warn!("Connection from {} failed: {:?}", addr, e);
}
});
}
@@ -138,12 +140,26 @@ impl Shared {
/// Send a `LineCodec` encoded message to every peer, except
/// for the sender.
///
/// This function also cleans up disconnected peers automatically.
async fn broadcast(&mut self, sender: SocketAddr, message: &str) {
for peer in self.peers.iter_mut() {
if *peer.0 != sender {
let _ = peer.1.send(message.into());
let mut failed_peers = Vec::new();
let message = message.to_string(); // Clone once for all sends
for (addr, tx) in self.peers.iter() {
if *addr != sender {
if tx.send(message.clone()).is_err() {
// Receiver has been dropped, mark for removal
failed_peers.push(*addr);
}
}
}
// Clean up disconnected peers
for addr in failed_peers {
self.peers.remove(&addr);
tracing::debug!("Removed disconnected peer: {}", addr);
}
}
}
@@ -203,7 +219,10 @@ async fn process(
tokio::select! {
// A message was received from a peer. Send it to the current user.
Some(msg) = peer.rx.recv() => {
peer.lines.send(&msg).await?;
if let Err(e) = peer.lines.send(&msg).await {
tracing::error!("Failed to send message to {}: {:?}", username, e);
break;
}
}
result = peer.lines.next() => match result {
// A message was received from the current user, we should
@@ -221,6 +240,7 @@ async fn process(
username,
e
);
break;
}
// The stream has been exhausted.
None => break,

View File

@@ -58,7 +58,7 @@ pub async fn connect(
//BytesMut into Bytes
Ok(i) => future::ready(Some(i.freeze())),
Err(e) => {
println!("failed to read from socket; error={e}");
eprintln!("failed to read from socket; error={e}");
future::ready(None)
}
})

View File

@@ -27,6 +27,9 @@ use tokio::net::TcpListener;
use std::env;
use std::error::Error;
const DEFAULT_ADDR: &str = "127.0.0.1:8080";
const BUFFER_SIZE: usize = 4096;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Allow passing an address to listen on as the first argument of this
@@ -34,7 +37,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// 127.0.0.1:8080 for connections.
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:8080".to_string());
.unwrap_or_else(|| DEFAULT_ADDR.to_string());
// Next up we create a TCP listener which will listen for incoming
// connections. This TCP listener is bound to the address we determined
@@ -44,7 +47,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
loop {
// Asynchronously wait for an inbound socket.
let (mut socket, _) = listener.accept().await?;
let (mut socket, addr) = listener.accept().await?;
// And this is where much of the magic of this server happens. We
// crucially want all clients to make progress concurrently, rather than
@@ -55,23 +58,27 @@ async fn main() -> Result<(), Box<dyn Error>> {
// which will allow all of our clients to be processed concurrently.
tokio::spawn(async move {
let mut buf = vec![0; 1024];
let mut buf = vec![0; BUFFER_SIZE];
// In a loop, read data from the socket and write the data back.
loop {
let n = socket
.read(&mut buf)
.await
.expect("failed to read data from socket");
if n == 0 {
return;
match socket.read(&mut buf).await {
Ok(0) => {
// Connection closed by peer
return;
}
Ok(n) => {
// Write the data back. If writing fails, log the error and exit.
if let Err(e) = socket.write_all(&buf[0..n]).await {
eprintln!("Failed to write to socket {}: {}", addr, e);
return;
}
}
Err(e) => {
eprintln!("Failed to read from socket {}: {}", addr, e);
return;
}
}
socket
.write_all(&buf[0..n])
.await
.expect("failed to write data to socket");
}
});
}