Rust实战教程:做一个UDP聊天软件
文章目录
- 目标任务
- 基础准备
- UdpApp
- 发送、打开和关闭
- 布局
本文所有代码均在文中,若想支持一下博主,可以下载这个:Rust UDP测试软件,内有完整的源代码和可执行文件。
可能需要的预备知识:Rust入门教程⚙️Rust所有权⚙️egui下载⚙️egui程序结构⚙️Rust多线程
目标任务
我们的目标是实现一个UDP测试软件,如下图所示

其左侧为收发窗口,下面是发送文本框,上方是接收文本框,前两行表示本机地址是【127.0.0.1:1233】,目标地址是【127.0.0.1:1234】,第三行表示本机发送一条Hello共5个字节给了【127.0.0.1:1234】;第四行表示从【127.0.0.1:1234】接收到了一个Hi。
右侧是控制窗口,最上方是本机IP、本机端口、目标IP、目标端口四个输入框,之后是一个用于开启和关闭服务的按钮,在下面是一个描述当前状态的标签,最下面是发送按钮。
从布局的角度来说,这个程序并不复杂,只涉及到按钮、文本框和标签,最多再加一个界面布局。但考虑到Udp收发功能,以及Rust语言对所有权的严格要求,我们不得不考虑下面几个问题:
- UDP服务的监听线程与egui窗口独立,如何将其监听到的数据发送给窗口?
- egui窗口如何发送指令以结束UDP的监听服务?
只要想通了这两个问题,那么对于更加复杂的GUI程序,也可以不在话下了。
基础准备
首先,用cargo创建一个新项目,并添加eframe
cargo new udp_tool
cd udp_tool
cargo add eframe
在配置文件【Cargo.toml】中添加【crossbeam-channel】依赖,用于后续的线程通信。
[dependencies]
eframe = "0.33.0"
crossbeam-channel = "0.5"
在main.rs中,需要引用如下模块,egui是图形界面;UdpSocket是UDP服务;sync用于线程控制;thread用于开启线程。在引用完这些东西之后,创建的两个常量分别是红色和绿色,用于显示当前状态。main函数则是egui的统一格式,其中调用的【UdpApp】,就是整个项目的主角,也是我们接下来需要费尽心思实现的主程序。
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
use eframe::egui;
use std::net::UdpSocket;
use egui::Color32;
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
use std::thread;const SOFT_GREEN: Color32 = Color32::from_rgb(70, 160, 90);
const SOFT_RED: Color32 = Color32::from_rgb(180, 60, 60);fn main() -> eframe::Result {let options = eframe::NativeOptions {viewport: egui::ViewportBuilder::default().with_inner_size([600.0, 300.0]),..Default::default()};eframe::run_native("UdpApp",options,Box::new(|_cc| Ok(Box::new(UdpApp::default()))), // app_creator)
}
UdpApp
【UdpApp】的成员和初始化函数如下。
struct UdpApp {socket: Option<UdpSocket>,local: String,target: String,local_ip: String, local_port: String,target_ip: String, target_port:String,rx_log: String,tx_buf: String,info: String,ui_ctx: Option<egui::Context>,rx_tx: crossbeam_channel::Sender<String>, // 给 UI 线程发日志rx_rx: crossbeam_channel::Receiver<String>,stop_flag: Arc<AtomicBool>, // 线程退出标志
}impl Default for UdpApp {fn default() -> Self {let (tx, rx) = crossbeam_channel::unbounded();Self {socket: None,target: String::new(),local: String::new(),local_ip: "127.0.0.1".to_owned(),local_port: "0".to_owned(),target_ip: "127.0.0.1".to_owned(), target_port:"0".to_owned(),rx_log: String::new(),tx_buf: String::new(),info: "waiting".to_owned(),ui_ctx: None,rx_tx: tx,rx_rx: rx,stop_flag: Arc::new(AtomicBool::new(false)),}}
}
其中,【socket】即用于UDP通信的套接字对象;local和target分别是字符串形式的本地与发送目标的地址。
从【local_ip】到【info】,均为布局相关的字符串,都可以在界面上找到,其中rx_log即接收文本框中的内容;tx_buf则是发送文本框中的内容;info为状态信息。
【ui_ctx】可以理解为是当前程序的一份克隆,之所以构造这个成员,目的是让Udp在接收到数据后,给窗口一个刷新的提示。
【rx_tx】和【rx_rx】是一对在线程间传递信息的通道,通过【crossbeam_channel::unbounded】创建,rx_tx将被放入Udp的监听函数中,将Udp接收到的数据,传递给egui窗口;rx_rx则在egui中驻足等待。
【stop_flag】为线程停止标志,当点击停止按钮后,用于销毁当前工作的监听线程。
监听程序
监听程序是本项目的核心,可以说,理解了监听过程,就能够写出一个Udp收发软件了,尽管这段代码并不长,但几乎每行都有些说法。
impl UdpApp {fn start_listener(&mut self, sock: UdpSocket) {let tx = self.rx_tx.clone();let ctx = self.ui_ctx.as_ref().unwrap().clone();self.stop_flag.store(false, Ordering::Relaxed);let flag = Arc::clone(&self.stop_flag);let _ = thread::spawn(move || {let mut buf = [0u8; 1024];loop {if flag.load(Ordering::Relaxed) { break; }match sock.recv_from(&mut buf) {Ok((n, src)) => {let txt = String::from_utf8_lossy(&buf[..n]);let _ = tx.send(format!("[{}]👉{}\n",src,txt.trim_end()));ctx.request_repaint(); // ← 立即通知 UI 刷新}Err(_) => break,}}});}
}
下面逐行解读
【tx】是【self.rx_tx】的克隆,如前文所说,self.rx_tx用于线程间的通信。在后面的死循环中,tx调用了【send】方法,将UDP接收到的内容发送给【self.rx_rx】。
【ctx】是self.ui_ctx的克隆,其目的是通知egui窗口刷新。在后面的死循环中,ctx调用了【request_repaint】方法,即刷新界面。
【flag】是self.stop_flag的克隆,且和tx,ctx不同,flag是Arc克隆。区别在于,无论tx还是ctx,在进入死循环之后,都将不受到外界的干扰;flag作为结束信号,则必须根据egui的指令随时发生变化。Arc克隆的作用,就是让线程外的变化穿透到线程内,只有这样,死循环中的【if flag.load(Ordering::Relaxed)】才能起到作用。
在克隆完tx, ctx以及flag之后,程序就进入了死循环。在死循环中,除了flag, tx以及ctx之外,唯一值得一提的也就是sock的接收过程了。sock是外部传入的UdpSocket对象,通过【recv_from】,可以阻塞式地接收外部传来的数据。
所谓阻塞式,就是说只要没收到,死循环就一直卡在recv_from这一步。所以这里就出现了一个坑点,要知道flag是监听函数结束的标记,但监听函数却一直卡在recv_from这一步。也就是说,如果主程序中将flag置为true,但只要没有新的数据传进来,监听函数就会一直卡在recv_from这里,从而没办法执行flag的判断语句,从而导致当前监听程序不会被释放。正因如此,窗口线程在将stop_flag置为true之后,还需要主动给当前UdpSocket发送一个数据,以跳过recv_from,来到flag的判断语句。
发送、打开和关闭
和监听程序相比,打开、关闭以及发送程序虽然冗长,但并没有太多知识点,首先,其发送代码如下
impl UdpApp {fn send_input(&mut self) {let data = self.tx_buf.trim();if data.is_empty() {return;}if let Some(sock) = &self.socket {match sock.send_to(data.as_bytes(), &self.target) {Ok(n) => self.rx_log.push_str(&format!("{}({} bytes)👉[{}]\n", data, n, self.target)),Err(e) => self.rx_log.push_str(&format!("↑ Error: {}\n", e)),}} else {self.rx_log.push_str("Please open UDP\n");}}
}
由于发送代码是窗口按钮直接调度的,所以可以没有任何顾虑地使用UdpApp的任何成员。其逻辑也不复杂,首先获取发送窗口中的字符串tx_buf,如果为空,就不发送,否则判断一下self.socket是否正常,如果正常,就将tx_buf整理一下并发送,否则(即self.socket是None),就提示打开UDP。
打开和关闭UDP的函数如下
impl UdpApp {fn open_socket(&mut self) {self.local = format!("{}:{}", self.local_ip, self.local_port);self.target = format!("{}:{}", self.target_ip, self.target_port);match UdpSocket::bind(&self.local) {Ok(sock) => {if let Ok(real) = sock.local_addr() {self.local = real.to_string();self.rx_log.push_str(&format!("local: {}\r\nTarget:{}\r\n", real, self.target));self.info = "working".to_owned();}let sock_clone = sock.try_clone().expect("socket clone failed");self.start_listener(sock_clone); // 启动线程self.socket = Some(sock);}Err(e) => {self.rx_log.push_str(&format!("Error: {}\n", e));self.info = "Error".to_owned()}}}fn close_socket(&mut self) {self.stop_flag.store(true, Ordering::Relaxed); // 通知线程退出self.socket = None;let _ = UdpSocket::bind("127.0.0.1:0").and_then(|s| s.send_to(b"quit", &self.local));self.info = "waiting".to_owned();self.rx_log.push_str("UDP Closed\n");}
}
open_socket的逻辑也很直接,首先读取本地和目标的IP和端口,并将self.local绑定给UdpSocket,然后更新一下界面,就没什么可说的了。唯一需要注意的,就是别忘了调用start_listener,打开监听程序。
close_socket更加简单,但这里面呼应了前文提到的坑点,再将stop_falg置为true,且将self.socket销毁之后,向当前正在活动的本地地址发送了一条信息。这条信息是什么并不重要,关键是让loop跳出recv_from的阻塞。
布局
最后是布局代码,即egui必不可少的update函数,这里面需要注意的是,除了三个布局面板之外,update函数有两个额外的功能。一是要判断self.ui_ctx是否已经创建,若未创建,则创建一下;二则是需要接收rx_tx传来的数据,即根据rx_rx接收到的内容,实时更改rx_log的内容。完整代码如下。
impl eframe::App for UdpApp {fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {egui::SidePanel::right("right_panel").resizable(true).default_width(150.0).width_range(80.0..=200.0).show(ctx, |ui| {ui.group(|ui| {ui.label("Local IP");ui.text_edit_singleline(&mut self.local_ip);ui.label("Local Port");ui.text_edit_singleline(&mut self.local_port);ui.separator();ui.label("Target IP");ui.text_edit_singleline(&mut self.target_ip);ui.label("Target Port");ui.text_edit_singleline(&mut self.target_port);ui.separator();let btn_text = if self.socket.is_some() {"Close"} else {"Open"};let open_btn = ui.add_sized([ui.available_width(), 20.0],egui::Button::new(btn_text),);if open_btn.clicked() {if self.socket.is_some() {self.close_socket(); // 关闭} else {self.open_socket(); // 打开}}ui.separator();let color = if self.socket.is_some() { SOFT_GREEN } else { SOFT_RED };ui.label(egui::RichText::new(format!("{}", self.info)).color(color));ui.separator();});let send_btn = ui.add_sized([ui.available_width(), ui.available_height()], // 宽 80,高占满面板高度egui::Button::new("Send"),);if send_btn.clicked() {self.send_input();}});egui::TopBottomPanel::bottom("botton_panel").min_height(80.0).show(ctx, |ui|{ui.add(egui::TextEdit::multiline(&mut self.tx_buf).desired_width(f32::INFINITY) // 横向撑满.desired_rows(5) // 纵向撑满.frame(false).hint_text("Input ..."),)});egui::CentralPanel::default().show(ctx, |ui| {// 1. 先放一个可滚动的区域,只让 y 方向能滚egui::ScrollArea::vertical().auto_shrink([false; 2]) // 别自动缩,撑满.show(ui, |ui| {// 2. 把 TextEdit 的高设成“父容器里还剩多少就占多少”let desired_height = ui.available_height();ui.add(egui::TextEdit::multiline(&mut self.rx_log).desired_width(f32::INFINITY).desired_rows((desired_height / ui.text_style_height(&egui::TextStyle::Body)).ceil() as usize).font(egui::TextStyle::Monospace).frame(false),);});});if self.ui_ctx.is_none() {self.ui_ctx = Some(ctx.clone());}while let Ok(line) = self.rx_rx.try_recv() {self.rx_log.push_str(&line);} }
}
