C++负载均衡远程调用学习之Agent代理模块基础构建
目录
1.课程复习
2.Lars-lbAgentV0.1-udpserver启动
3.Lars-lbAgentV0.1-dns-reporter-client-thread启动
4.Lars-lbAgentV0.1-dns-client实现
5.Lars-lbAgentV0.1-dns-client编译错误修正
6.Lars-lbAgentV0.1-reporter_client实现
1.课程复习
### 11.2 完成Lars Reactor V0.9开发
 
 > serv.conf
 
 ```conf
 [reactor]
 ip = 127.0.0.1
 port = 7777
 maxConn = 1024
 threadNum = 5
 ```
 
 
 
 > server.cpp
 
 ```cpp
 #include "tcp_server.h"
 #include <string>
 #include <string.h>
 #include "config_file.h"
 
 
 //回显业务的回调函数
 void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
 {
     printf("callback_busi ...\n");
     //直接回显
     conn->send_message(data, len, msgid);
 }
 
 // Print handler: dumps the received payload, message id and length to stdout.
 //   data / len - payload bytes and their count; the bytes are not assumed to
 //                be NUL-terminated
 //   msgid      - id of the incoming message
 //   conn       - connection the message arrived on (unused)
 //   user_data  - not used by this handler
 void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
 {
     // Bound the print by len: the payload is a raw byte range, so "%s" could
     // read past the end of the message.
     printf("recv client: [%.*s]\n", static_cast<int>(len), data);
     printf("msgid: [%d]\n", msgid);
     // len is unsigned, so print it with %u rather than %d.
     printf("len: [%u]\n", static_cast<unsigned int>(len));
 }
 
 
 // Connection-established hook: greets the new client with a fixed welcome
 // text sent under message id 101.
 //   conn - the connection that was just created
 //   args - not used by this hook
 void on_client_build(net_connection *conn, void *args)
 {
     const char *greeting = "welcome! you online..";
     const int greeting_id = 101;

     conn->send_message(greeting, strlen(greeting), greeting_id);
 }
 
 // Connection-closed hook: only logs that a connection went away.
 //   conn - the connection being torn down (unused)
 //   args - not used by this hook
 void on_client_lost(net_connection *conn, void *args)
 {
     fputs("connection is lost !\n", stdout);
 }
 
 
 int main() 
 {
     event_loop loop;
 
     //加载配置文件
     config_file::setPath("./serv.conf");
     std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
     short port = config_file::instance()->GetNumber("reactor", "port", 8888);
 
     printf("ip = %s, port = %d\n", ip.c_str(), port);
 
     tcp_server server(&loop, ip.c_str(), port);
 
     //注册消息业务路由
     server.add_msg_router(1, callback_busi);
     server.add_msg_router(2, print_busi);
 
     //注册链接hook回调
     server.set_conn_start(on_client_build);
     server.set_conn_close(on_client_lost);
 
     loop.event_process();
 
     return 0;
 }
 ```
2.Lars-lbAgentV0.1-udpserver启动
## 12) udp服务与客户端
 
         接下来,为了让Reactor框架的功能更加丰富,我们在已有功能的基础上再增加udp server的服务接口。udp部分暂时不考虑引入线程池,只采用单线程的处理方式。
3.Lars-lbAgentV0.1-dns-reporter-client-thread启动
> lars_reactor/src/udp_server.cpp
 
 ```cpp
 #include <signal.h>
 #include <unistd.h>
 #include <strings.h>
 #include <sys/socket.h>
 #include <sys/types.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <stdio.h>
 #include <string.h>
 #include "udp_server.h"
 
 
 // Read-event callback handed to the event loop for the UDP socket.
 //   loop - event loop that fired the event (unused)
 //   fd   - descriptor that became readable (unused)
 //   args - the udp_server that registered this callback
 void read_callback(event_loop *loop, int fd, void *args)
 {
     // Hand the readable socket over to the server's own read routine.
     static_cast<udp_server*>(args)->do_read();
 }
 
 void udp_server::do_read()
 {
     while (true) {
         int pkg_len = recvfrom(_sockfd, _read_buf, sizeof(_read_buf), 0, (struct sockaddr *)&_client_addr, &_client_addrlen);
 
         if (pkg_len == -1) {
             if (errno == EINTR) {
                 continue;
             }
             else if (errno == EAGAIN) {
                 break;
             }
             else {
                 perror("recvfrom\n");
                 break;
             }
         }
 
         //处理数据
         msg_head head; 
         memcpy(&head, _read_buf, MESSAGE_HEAD_LEN);
         if (head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0 || head.msglen + MESSAGE_HEAD_LEN != pkg_len) {
             //报文格式有问题
             fprintf(stderr, "do_read, data error, msgid = %d, msglen = %d, pkg_len = %d\n", head.msgid, head.msglen, pkg_len);
             continue;
         }
 
         //调用注册的路由业务
         _router.call(head.msgid, head.msglen, _read_buf+MESSAGE_HEAD_LEN, this);
     }
 }
4.Lars-lbAgentV0.1-dns-client实现
udp_server::udp_server(event_loop *loop, const char *ip, uint16_t port)
 {
     //1 忽略一些信号
     if (signal(SIGHUP, SIG_IGN) == SIG_ERR) {
         perror("signal ignore SIGHUB");
         exit(1);
     }
     if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
         perror("signal ignore SIGPIPE");
         exit(1);
     }
     
     //2 创建套接字
     //SOCK_CLOEXEC在execl中使用该socket则关闭,在fork中使用该socket不关闭
     _sockfd = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_UDP);
     if (_sockfd == -1) {
         perror("create udp socket");
         exit(1);
     }
 
     //3 设置服务ip+port
     struct sockaddr_in servaddr;
     bzero(&servaddr, sizeof(servaddr));
     servaddr.sin_family = AF_INET;
     inet_aton(ip, &servaddr.sin_addr);//设置ip
     servaddr.sin_port = htons(port);//设置端口
 
     //4 绑定
     bind(_sockfd, (const struct sockaddr*)&servaddr, sizeof(servaddr));
     
     //3 添加读业务事件
     _loop = loop;
 
     bzero(&_client_addr, sizeof(_client_addr));
     _client_addrlen = sizeof(_client_addr);
     
 
     printf("server on %s:%u is running...\n", ip, port);
 
     _loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
     
 }
 
 int udp_server::send_message(const char *data, int msglen, int msgid)
 {
     if (msglen > MESSAGE_LENGTH_LIMIT) {
         fprintf(stderr, "too large message to send\n");
         return -1;
     }
 
     msg_head head;
     head.msglen = msglen;
     head.msgid = msgid;
 
     memcpy(_write_buf,  &head, MESSAGE_HEAD_LEN);
     memcpy(_write_buf + MESSAGE_HEAD_LEN, data, msglen);
 
     int ret = sendto(_sockfd, _write_buf, msglen + MESSAGE_HEAD_LEN, 0, (struct sockaddr*)&_client_addr, _client_addrlen);
     if (ret == -1) {
         perror("sendto()..");
         return -1;
     }
 
     return ret;
 }
5.Lars-lbAgentV0.1-dns-client编译错误修正
//注册消息路由回调函数
 void udp_server::add_msg_router(int msgid, msg_callback* cb, void *user_data)
 {
     _router.register_msg_router(msgid, cb, user_data);
 }
 
 // Tears the server down: removes the socket from the event loop first, then
 // closes the descriptor.
 udp_server::~udp_server()
 {
     this->_loop->del_io_event(this->_sockfd);
     ::close(this->_sockfd);
 }
 ```
 
 
 
         这里的实现方式和tcp_server几乎一样。需要注意的是,udp的socket编程不需要listen,也不需要accept;每次调用recvfrom都会同时返回该数据包的来源地址,因此可以得知每个包对应的客户端是谁,然后把消息回执给对应的客户端即可。由于没有连接,所有处理都以包为单位,一个包一个包地处理,相邻的两个包可能来自不同的客户端。
6.Lars-lbAgentV0.1-reporter_client实现
### 12.2 udp_client客户端功能实现
 
 > lars_reactor/include/udp_client.h
 
 ```cpp
 #pragma once
 
 #include "net_connection.h"
 #include "message.h"
 #include "event_loop.h"
 
 // UDP client built on the reactor's event_loop. It derives from
 // net_connection and provides that interface's send_message.
 class udp_client: public net_connection
 {
 public:
     // loop: event loop this client is attached to.
     // ip / port: presumably the address of the remote server - the
     // implementation is not visible here, confirm in udp_client.cpp.
     udp_client(event_loop *loop, const char *ip, uint16_t port);
     ~udp_client();
 
     // Registers cb (with optional user_data) as the handler for msgid.
     void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL);
 
     // Sends a message of msglen bytes tagged with msgid.
     virtual int send_message(const char *data, int msglen, int msgid);
 
     // Handles incoming messages
     void do_read();
 
 private:
 
     // Socket descriptor used by this client
     int _sockfd;
 
     // Fixed-size receive and send buffers, MESSAGE_LENGTH_LIMIT bytes each
     char _read_buf[MESSAGE_LENGTH_LIMIT];
     char _write_buf[MESSAGE_LENGTH_LIMIT];
 
     // Event loop that triggers events for this client
     event_loop *_loop;
 
     // Message routing / dispatch
     msg_router _router;
 };
 ```
 
 
 
 > lars_reactor/src/udp_client.cpp
 
 ```cpp
 #include "udp_client.h"
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <arpa/inet.h>
 #include <unistd.h>
 #include <strings.h>
 #include <string.h>
 #include <stdio.h>
 
 
 // Read-event callback for the UDP client socket.
 //   loop - event loop that fired the event (unused)
 //   fd   - descriptor that became readable (unused)
 //   args - pointer to the udp_client whose do_read should run
 void read_callback(event_loop *loop, int fd, void *args)
 {
     static_cast<udp_client*>(args)->do_read();
 }
