【Linux网络】实现一个简单的聊天室

本篇将介绍如何设计一个简单的聊天室,Socket编程UDP协议。UDP协议⽀持全双⼯,⼀个sockfd,既可以读取,⼜可以写⼊,对于客户端和服务端同样如此。
获取到信息的server,把这个信息当作一个任务推送给线程池,然后由线程池把消息转发给所有的client。

在上图的右半部分,其中的server就可以看作一个生产者,整个转发消息的服务器就是一个生产者消费者模型。
1.路由功能的设计
首先这个路由就是给在线用户转发消息的,所以首先要记录下当前所有的用户,我们没有设计登陆注册的功能,所以这里首次发消息就等同于登录。
1.1 新增用户
记录用户就是记录一个InetAddr的类,记录用户的容器这里就选择vector,新增用户之前肯定要判断这个用户是否存在,而判断用户是否存在就需要InetAddr类重载==。
// InetAddr.hpp文件
#pragma once
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>class InetAddr
{
public:InetAddr(struct sockaddr_in &addr) : _addr(addr){_port = ntohs(addr.sin_port); // 网络序列转主机序列_ip = inet_ntoa(addr.sin_addr); // 4字节网络序列转点分十进制q}const struct sockaddr_in &NetAddr() { return _addr; } // 网络地址std::string Ip() { return _ip; } // 主机ipuint16_t Port() { return _port; } // 主机portstd::string StringAddr() { return "[" + _ip + ":" + std::to_string(_port) + "]"; }bool operator==(InetAddr &i){return i.Ip() == _ip && i.Port() == _port;}~InetAddr() {}private:struct sockaddr_in _addr; // 网络序列std::string _ip;uint16_t _port;
};
遍历vector,判断用户是否存在。如果用户不存在,就把这个用户加进来。并且添加用户的时候还可以打印一下用户的信息。
// Route.hpp文件
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "InetAddr.hpp"
#include "MyLog.hpp"using namespace MyLog;class Route
{
private:bool IsExist(InetAddr &peer) // 判断用户是否存在{for (auto &user : _online_user){if (user == peer)return true;}return false;}void AddUser(InetAddr &peer){LOG(LogLevel::INFO) << "新增一个在线用户: " << peer.StringAddr();_online_user.push_back(peer);}public:Route() {}~Route() {}private:std::vector<InetAddr> _online_user; // 记录在线用户
};
1.2 发消息
发消息之前看看这个用户是否被记录,如果没有就新增。
要把消息发送给包括自己在内的所有人,所以要遍历所有用户,发消息用到的接口是sendto,这个接口要sockfd,以及要发送的消息内容,所以MessageRoute的参数列表还要添加。
而发消息的时候发出去的不止消息本身,应该还要有这个消息是谁发的相关的信息,所以这里再对这个消息做一个处理。
void MessageRoute(int sockfd, const std::string &message, InetAddr &peer){if (!IsExist(peer))AddUser(peer);std::string send_message = peer.StringAddr() + "# " + message;for (auto &user : _online_user){sendto(sockfd, send_message.c_str(), send_message.size(), 0,(struct sockaddr *)&user.NetAddr(), sizeof(user.NetAddr()));}}
后面两个参数就直接调用InetAddr类里的NetAddr接口,这个就是网络序列,直接用就行。
然后我们修改UdpServer.hpp和UdpServer.cc的代码,进行一下测试。
// UdpServer.hpp文件
#pragma once
#include <iostream>
#include <string>
#include <strings.h>
#include <functional>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "MyLog.hpp"
#include "InetAddr.hpp"using namespace MyLog;
// 返回值为void, 参数包括sockfd, 要发送的消息,InetAddr类
using func_t = std::function<void(int &, const std::string &, InetAddr &)>;
class UdpServer
{
public:UdpServer(uint16_t &port, func_t func): _sockfd(-1),_port(port),_isrunning(false),_func(func){}void Init(){// 1.创建套接字_sockfd = socket(AF_INET, SOCK_DGRAM, 0);if (_sockfd == -1){LOG(LogLevel::FATAL) << "创建套接字失败";exit(1);}LOG(LogLevel::INFO) << "creat socket success, sockfd: " << _sockfd;struct sockaddr_in local;bzero(&local, sizeof(local)); // 先清0local.sin_family = AF_INET;local.sin_port = htons(_port); // 主机序列转网络序列local.sin_addr.s_addr = INADDR_ANY;int n = bind(_sockfd, (struct sockaddr *)&local, sizeof(local));if (n < 0){LOG(LogLevel::FATAL) << "bind失败";exit(2);}LOG(LogLevel::INFO) << "bind success";}void Start(){if (_isrunning)return; // 不要重复启动_isrunning = true;while (true){// server只需要收消息,不用发消息char buffer[1024];struct sockaddr_in peer;socklen_t len = sizeof(peer); // 大小要是socklen_t类型size_t n = recvfrom(_sockfd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&peer, &len);if (n > 0){buffer[n] = 0; // 手动添加字符串结束标志// 网络序列转主机序列InetAddr client(peer); // 传peer结构体过去_func(_sockfd, buffer, client); // 把client类对象传过去}}}~UdpServer() {}private:int _sockfd;uint16_t _port;bool _isrunning;func_t _func; // 服务器的回调函数
};
// UdpServer.cc文件
#include "UdpServer.hpp"
#include "Route.hpp"
#include <memory>// 格式为:udpserver port
int main(int argc, char *argv[])
{if (argc != 2){std::cout << "Usage:" << argv[0] << " port" << std::endl;return 1;}uint16_t port = std::stoi(argv[1]);// 路由对象提供路由功能Route r;// 网络服务器对象提供通信功能Refresh_Log_To_Console();std::unique_ptr<UdpServer> server = std::make_unique<UdpServer>(port, [&r](int sockfd, const std::string &message, InetAddr &peer){ r.MessageRoute(sockfd, message, peer); });server->Init();server->Start();return 0;
}
打开两个Xshell,用不同的ip地址发消息,在发消息的同时,用户就被添加进来了。

但是172地址的client端不能立即收到127.0.0.1的消息,要按一下回车才能收到前一条消息。

会出现这个情况的原因是我们的客户端此时的代码逻辑是要求先发消息才能收消息,不发消息就会被阻塞住。

而在群聊中,哪怕自己不说话也应该收到别人发的消息。
2.修改客户端
客户端也要是多线程的,一个线程负责发消息,一个线程负责收消息。此时我们需要把我们的线程给引入进来。线程相关讲解在:【Linux】多线程创建及封装
为了减少传参,这里选择把sockfd,ip和port定义为全局的。
// UdpClient.cc文件
#include <iostream>
#include <string>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Thread.hpp"using namespace MyThread;std::string server_ip;
uint16_t server_port;
int sockfd;void Send()
{// 填写服务器信息struct sockaddr_in server;memset(&server, 0, sizeof(server)); // 先请0server.sin_family = AF_INET;server.sin_port = htons(server_port); // 主机转网络server.sin_addr.s_addr = inet_addr(server_ip.c_str()); // ip转4字节,4字节转网络序列// 发消息while (true){std::string input;std::getline(std::cin, input); // 从输入流获取到input里sendto(sockfd, input.c_str(), input.size(), 0, (struct sockaddr *)&server, sizeof(server));}
}void Recv()
{while (true){// 收消息char buffer[1024];struct sockaddr_in peer;socklen_t len = sizeof(peer);int m = recvfrom(sockfd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&peer, &len);if (m > 0){buffer[m] = 0;std::cout << buffer << std::endl;}}
}// 格式为:udpclient server_ip server_port
int main(int argc, char *argv[])
{if (argc != 3){std::cout << "Usage:" << argv[0] << " server_ip server_port" << std::endl;return 1;}server_ip = argv[1];server_port = std::stoi(argv[2]);// 1.创建套接字sockfd = socket(AF_INET, SOCK_DGRAM, 0);if (sockfd < 0){std::cout << "client创建套接字失败" << std::endl;return 2;}// 2.bind,但不需要显示的bind// 3.创建线程,主线程只要join就行Thread sender(Send); // 此线程负责发消息Thread Receiver(Recv); // 此线程负责收消息sender.Start();Receiver.Start();sender.Join();Receiver.Join();return 0;
}
此时发消息和收消息就可以同时进行了。

但是此时我们发的消息和就和收到的消息混在一起了。
3.消息分区
对于收消息来说,只需要输出,他的输出是通过recvfrom从网络里来的。

我们就可以让它往标准错误cerr里输出,文件描述符为2。
std::cerr << buffer << std::endl; // 往标准错误里输出
对于发送消息来说,既有标准输出又有标准输入。标准输入的文件描述符的0,标准输出的文件描述符为1。这里暂时不做修改。

确认自己的设备。
ls /dev/pts

然后一个一个测试当前的Xshell在哪个pts下。

当前是pts/8。
再打开一个Xshell,同样的方法测试新开的在哪个pts下。

新打开的是pts/9。
现在确定设备之后我们将标准错误重定向到另一个pts上。
./udpclient 127.0.0.1 8080 2>/dev/pts/9

现在发消息的显示就在pts/8,而收到的消息就显示在pts/9上(自己发的消息自己也要收到)。
4.用户下线
假设消息内容为QUIT时,就证明这个用户想退出。而用户退出后,他退出的消息也要告知别人。
// Route.hpp文件
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "InetAddr.hpp"
#include "MyLog.hpp"using namespace MyLog;class Route
{
private:bool IsExist(InetAddr &peer) // 判断用户是否存在{for (auto &user : _online_user){if (user == peer)return true;}return false;}void AddUser(InetAddr &peer){LOG(LogLevel::INFO) << "新增一个在线用户: " << peer.StringAddr();_online_user.push_back(peer);}void DeleteUser(InetAddr &peer){for (auto i = _online_user.begin(); i != _online_user.end(); i++) // 迭代器遍历{if (*i == peer){LOG(LogLevel::INFO) << "删除一个在线用户" << peer.StringAddr();_online_user.erase(i);break;}}}public:Route() {}void MessageRoute(int sockfd, const std::string &message, InetAddr &peer){if (!IsExist(peer))AddUser(peer);// 发消息std::string send_message = peer.StringAddr() + "# " + message;for (auto &user : _online_user){sendto(sockfd, send_message.c_str(), send_message.size(), 0,(struct sockaddr *)&user.NetAddr(), sizeof(user.NetAddr()));}// 退出的时候,退出消息也要先发给所有人,再退出if (message == "QUIT"){DeleteUser(peer);}}~Route() {}private:std::vector<InetAddr> _online_user; // 记录在线用户
};
用户被删除后相应的线程也要被cancel,在发消息的函数内cancel掉收消息的线程,然后自己退出循环,自己的线程也就结束了,两个子线程就等待主线程的join。
因为cancel线程要得到线程的id,所以这里就简单粗暴的定义一个全局的pthread变量,赋值为收消息的线程id。
// UdpClient.cc文件
#include <iostream>
#include <string>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Thread.hpp"using namespace MyThread;std::string server_ip;
uint16_t server_port;
int sockfd;
pthread_t recv_id;void Send()
{// 填写服务器信息struct sockaddr_in server;memset(&server, 0, sizeof(server)); // 先请0server.sin_family = AF_INET;server.sin_port = htons(server_port); // 主机转网络server.sin_addr.s_addr = inet_addr(server_ip.c_str()); // ip转4字节,4字节转网络序列// 发消息while (true){std::string input;std::cout << "Please Enter# ";std::getline(std::cin, input); // 从输入流获取到input里sendto(sockfd, input.c_str(), input.size(), 0, (struct sockaddr *)&server, sizeof(server));if(input == "QUIT"){pthread_cancel(recv_id); // 把收消息的进程cancelbreak; // 然后自己退出}}
}void Recv()
{while (true){// 收消息char buffer[1024];struct sockaddr_in peer;socklen_t len = sizeof(peer);int m = recvfrom(sockfd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&peer, &len);if (m > 0){buffer[m] = 0;//std::cout << buffer << std::endl;std::cerr << buffer << std::endl;}}
}// 格式为:udpclient server_ip server_port
int main(int argc, char *argv[])
{if (argc != 3){std::cout << "Usage:" << argv[0] << " server_ip server_port" << std::endl;return 1;}server_ip = argv[1];server_port = std::stoi(argv[2]);// 1.创建套接字sockfd = socket(AF_INET, SOCK_DGRAM, 0);if (sockfd < 0){std::cout << "client创建套接字失败" << std::endl;return 2;}// 2.bind,但不需要显示的bind// 3.创建线程,主线程只要join就行Thread sender(Send); // 此线程负责发消息Thread Receiver(Recv); // 此线程负责收消息sender.Start();Receiver.Start();recv_id = Receiver.Id(); // 获得收消息线程的idsender.Join();Receiver.Join();return 0;
}

我们运行程序之后会有一点点小问题,因为我们是直接把线程cancel了,所以自己看不到自己的退出消息,但是别人可以看到。


5.加入线程池
现在我们的消息是由server调用对应的函数然后进行消息转发的。

我们要server做的事情就是把消息转发当成一个任务给到线程池,让线程池里的线程负责转发消息,而不是server自己直接调用消息转发的函数。
线程池的详细讲解在:【Linux】线程池
线程池初始化的时候要传一个方法过去,这里先定义一个返回值和参数都是void的函数类型,传给线程池做初始化,之前的线程池已经实现成了单例模式,所以这里直接获取单例。
// UdpServer.cc文件
#include "UdpServer.hpp"
#include "Route.hpp"
#include "ThreadPool.hpp"
#include <memory>using namespace MyThreadPool;
using task_t = std::function<void()>; // 格式为:udpserver port
int main(int argc, char *argv[])
{if (argc != 2){std::cout << "Usage:" << argv[0] << " port" << std::endl;return 1;}uint16_t port = std::stoi(argv[1]);Refresh_Log_To_Console();// 1.路由对象提供路由功能Route r;// 2.线程池auto tp = ThreadPool<task_t>::GetInstance(); // 获取单例// 3.网络服务器对象提供通信功能return 0;
}
服务器就要向线程池里入任务。服务器对象对象初始化的时候第一个参数是端口号和方法,这里的方法还是用lambda表达式,捕捉列表捕捉r和tp,参数列表依旧是sockfd,消息和InetAddr对象。
// 3.网络服务器对象提供通信功能std::unique_ptr<UdpServer> server = std::make_unique<UdpServer>(port,[&r, &tp](int sockfd, const std::string &message, InetAddr &peer){
});
在lambda表达式的函数体部分,我们需要任务,会用到bind函数,然后将任务入队列。
// UdpServer.cc文件
using namespace MyThreadPool;
using task_t = std::function<void()>;// 格式为:udpserver port
int main(int argc, char *argv[])
{if (argc != 2){std::cout << "Usage:" << argv[0] << " port" << std::endl;return 1;}uint16_t port = std::stoi(argv[1]);Refresh_Log_To_Console();// 1.路由对象提供路由功能Route r;// 2.线程池auto tp = ThreadPool<task_t>::GetInstance(); // 获取单例// 3.网络服务器对象提供通信功能std::unique_ptr<UdpServer> server = std::make_unique<UdpServer>(port, [&r, &tp](int sockfd, const std::string &message, InetAddr &peer){task_t t = std::bind(&Route::MessageRoute, &r, sockfd, message, peer); // 布置任务tp->Equeue(t); // 入队列});server->Init();server->Start();return 0;
}
然后运行server和client程序。

client端发一次消息,就会唤醒一个线程,有线程负责消息转发。
但是此时我们的代码是线程不安全的,主要的原因是可能会有多个线程访问vector,而STL容器并不是线程安全的,将来我们需要定制协议,现在就简单粗暴的上个锁。
// Route.hpp文件
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "InetAddr.hpp"
#include "MyLog.hpp"
#include "Mutex.hpp"using namespace MyLog;
using namespace MyMutex;
class Route
{
private:bool IsExist(InetAddr &peer) // 判断用户是否存在{for (auto &user : _online_user){if (user == peer)return true;}return false;}void AddUser(InetAddr &peer){LOG(LogLevel::INFO) << "新增一个在线用户: " << peer.StringAddr();_online_user.push_back(peer);}void DeleteUser(InetAddr &peer){for (auto i = _online_user.begin(); i != _online_user.end(); i++) // 迭代器遍历{if (*i == peer){LOG(LogLevel::INFO) << "删除一个在线用户" << peer.StringAddr();_online_user.erase(i);break;}}}public:Route() {}void MessageRoute(int sockfd, const std::string &message, InetAddr &peer){LockGuard lg(&_mutex);if (!IsExist(peer))AddUser(peer);// 发消息std::string send_message = peer.StringAddr() + "# " + message;for (auto &user : _online_user){sendto(sockfd, send_message.c_str(), send_message.size(), 0,(struct sockaddr *)&user.NetAddr(), sizeof(user.NetAddr()));}// 退出的时候,退出消息也要先发给所有人,再退出if (message == "QUIT"){DeleteUser(peer);}}~Route() {}private:std::vector<InetAddr> _online_user; // 记录在线用户Mutex _mutex;
};
6.补充知识
本节只介绍基于IPv4的socket⽹络编程,sockaddr_in中的成员struct in_addr sin_addr表⽰32位 的IP
地址但是我们通常⽤点分⼗进制的字符串表⽰IP 地址,以下函数可以在字符串表⽰ 和in_addr表⽰之间转换。
字符串转in_addr的函数:
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>int inet_aton(const char *cp, struct in_addr *inp);
in_addr_t inet_addr(const char *cp);
int inet_pton(int af, const char *src, void *dst);
inet_pton就是把一个字符串ip转成四字节ip,第一个参数就是协议家族,我们用这个接口比之前用的inet_addr安全一些。
我们之前的InetAddr类初始化的时候不传网络地址初始化,而是用本机ip和port初始化。
class InetAddr
{
public:InetAddr(struct sockaddr_in &addr) : _addr(addr) // 传参为网络地址{// 网络转主机_port = ntohs(addr.sin_port); // 网络序列转主机序列_ip = inet_ntoa(addr.sin_addr); // 4字节网络序列转点分十进制q}InetAddr(const std::string ip, uint16_t port) : _ip(ip), _port(port) // 传参为主机地址{//主机转网络inet_pton(AF_INET, _ip.c_str(), &_addr.sin_addr);}//...
private:struct sockaddr_in _addr; // 网络序列std::string _ip;uint16_t _port;
};
将字符串格式的IP (_ip)转为4字节的网络序列,第三个参数相当于是一个缓冲区,要把转好的四字节ip写入到哪里?就写入到网络序列的sin_addr里,而_addr就是网络序列的结构体,就等同于UdpServer的sin_addr设置,这里就不需要了。

其他的设置也可以在构造函数里填充好。
InetAddr(const std::string ip, uint16_t port) : _ip(ip), _port(port) // 传参为主机地址{//主机转网络memset(&_addr, 0, sizeof(_addr)); // 清0_addr.sin_family = AF_INET;_addr.sin_port = htons(_port);inet_pton(AF_INET, _ip.c_str(), &_addr.sin_addr);}
UdpServer的相应设置就可以都不要了,换成结构体定义对象。
InetAddr local("0", _port); // 直接就将主机转网络
local.NetAddr(); // 获得网络地址

in_addr转字符串的函数:
char *inet_ntoa(struct in_addr in);
const char *inet_ntop(int af, const void *src, char *dst, socklen_t size);
inet_ntoa这个函数返回了⼀个字符串的地址char*, 那么这个字符串本身在哪里?很显然是这个函数⾃⼰在内部为我们申请了⼀块内存来保存ip的结果,那么是否需要调⽤者⼿动释放呢?

man⼿册上说, inet_ntoa函数, 是把这个返回结果放到了静态存储区. 这个时候不需要我们⼿动进⾏释放。
那么问题来了, 如果我们调⽤多次这个函数, 会有什么样的效果呢? 参⻅如下代码:


因为inet_ntoa把结果放到⾃⼰内部的⼀个静态存储区, 这样第⼆次调⽤时的结果会覆盖掉上⼀次的结果。结论就是这个函数存在线程安全问题,因为它使用了静态空间。
所以我们更推荐使用inet_ntop函数,第一个参数是协议家族;第二个参数是const参数,只能输入型,我们要传4字节网络序列的ip过去;第三个参数是输出型参数,把转换好的ip带出去;第四个参数就是缓冲区的长度。
InetAddr(struct sockaddr_in &addr) : _addr(addr) // 传参为网络地址{// 网络转主机_port = ntohs(addr.sin_port); // 网络序列转主机序列// 4字节网络序列转点分十进制q//_ip = inet_ntoa(addr.sin_addr); char ip_buffer[64];inet_ntop(AF_INET, &_addr.sin_addr, ip_buffer, sizeof(ip_buffer));_ip = ip_buffer;}
此时我们自己定义了缓冲区,不会互相冲突,比inet_ntoa更安全。
本篇分享就到这里,我们下篇见~

