当前位置: 首页 > news >正文

基于脚手架微服务的视频点播系统-脚手架开发部分-FFmpeg,Etcd-SDK的简单使用与二次封装

基于脚手架微服务的视频点播系统-脚手架开发部分-FFmpeg,Etcd-SDK的简单使用与二次封装

  • 一.FFmpeg
    • 1.1HLS协议
    • 1.2libffmpeg 实现视频分片
    • 1.3开发包的安装
    • 1.4使用过程中的关键头文件
    • 1.5关键结构
    • 1.6关键接口
      • 视频相关操作
      • 时间基转换
      • 字典选项
        • HLS分片的相关选项
    • 1.7简单使用样例
    • 1.8二次封装
      • 使用样例
  • 二.Etcd-SDK
    • 2.1Etcd是什么
    • 2.2安装Etcd服务
    • 2.3客户端SDK
      • 2.3.1依赖安装
    • 2.4开发过程中需要使用到的头文件
    • 2.5接口
    • 2.6简单使用样例
      • 2.6.1普通键值对
      • 2.6.2目录型键值对+保活与监听机制
      • 2.6.3makefile
    • 2.7二次封装
      • 2.7.1设计
      • 2.7.2服务的注册
      • 2.7.3服务的发现
      • 2.7.4具体实现
    • 2.8简单使用样例
      • makefile
    • 2.9 etcd-brpc实现远程服务监控与调用
      • makefile

源码链接

一.FFmpeg

1.1HLS协议

HLS协议是 Apple 提出的基于 HTTP 的流媒体传输协议。它已被业界⼴泛采纳,成为主要的流媒体传输⽅案之⼀(尤其是在点播、直播和时移回看领域)。
其工作流程大致可以分为以下三步:

  1. 编码与分⽚ :
    原始视频被转码为多码率版本,并切割为 TS 分⽚(通常 2~10 秒)。
  2. ⽣成 M3U8 播放列表 :
    每个码率对应⼀个 M3U8 ⽂件,主播放列表(Master Playlist)描述所有可⽤码率。
  3. 客⼾端拉取 :
    播放器根据网络状况选择合适码率,按顺序下载分⽚并播放。
    在这里插入图片描述
    所以接下来就是我们之前客户端提到过的m3u8文件了。我们来看一个样例m3u8文件:
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-PLAYLIST-TYPE:VOD
#EXT-X-TARGETDURATION:10
#EXT-X-MEDIA-SEQUENCE:0
#EXTINF:10.0,
http://192.168.65.130:9000/segment0.ts
#EXTINF:8.5,
segment1.ts
#EXTINF:9.2,
segment2.ts
#EXT-X-ENDLIST

其中各个标签对应的含义如下:

标签说明
#EXTM3U ⽂件头,标识 M3U8 格式
#EXT-X-VERSION指定 HLS 协议版本(如 3、6、7)
#EXT-X-PLAYLIST-TYPE播放类型, VOD 表⽰当前视频为点播类型
#EXT-X-TARGETDURATION所有分⽚的最⼤时⻓(单位:秒)
#EXT-X-MEDIA-SEQUENCE第⼀个分⽚的序列号(⽤于直播流滑动窗⼝)
#EXTINF分⽚时⻓和路径
#EXT-X-ENDLIST标识点播流结束(直播流⽆此标签)

我们这里将以HLS分⽚处理作为样例讲解 libffmpeg 的简单使⽤流程。

1.2libffmpeg 实现视频分片

FFmpeg 与 libffmpeg 的关系

  • FFmpeg :开源⾳视频处理⼯具,⽀持编解码、转封装、流媒体处理等。
  • libffmpeg :FFmpeg的编程接⼝库,可直接集成到 C/C++ 项⽬中。
库名功能
libavcodec编解码核⼼库
libavformat封装与解封装
libavutil通⽤⼯具(如时间计算、⽇志)
libswscale图像缩放与⾊彩空间转换

1.3开发包的安装

sudo apt install libavcodec-dev libavformat-dev libavutil-dev libswscale-dev

1.4使用过程中的关键头文件

#include <libavformat/avformat.h>
#include <libavcodec/avcodec.h>
#include <libavutil/opt.h>
#include <libavutil/error.h>

1.5关键结构

typedef struct AVFormatContext
{struct AVInputFormat *iformat;  // 输⼊格式化对象struct AVOutputFormat *oformat; // 输出格式化对象unsigned int nb_streams;        // 媒体流的数量(如视频流、⾳频流、字幕流)AVStream **streams;             // 指向所有流的指针数组,通过索引访问int64_t start_time;             // 媒体⽂件的起始时间戳int64_t duration;               // 媒体⽂件的总时⻓int64_t bit_rate;               // 全局码率....
};
struct AVStream
{int id;                      // 流的唯⼀标识符int64_t start_time;          // 流的起始时间戳int64_t duration;            // 流的总时⻓int64_t nb_frames;           // 流的总帧数AVRational time_base; //时间基(时间戳单位),如 {1, 1000} 表⽰毫秒AVRational avg_frame_rate;   // 平均帧率(如 {25, 1} 表⽰ 25 FPS)AVRational r_frame_rate;     // 标称帧率⽤于关键帧间隔计算。AVCodecParameters *codecpar; // 编解码参数(如编码格式、分辨率、采样率等)...
} 
struct AVCodecParameters
{enum AVMediaType codec_type; // 媒体类型:如 AVMEDIA_TYPE_VIDEO、AVMEDIA_TYPE_AUDIOenum AVCodecID codec_id; // 编解码器ID:如AV_CODEC_ID_H264、AV_CODEC_ID_AACuint32_t codec_tag;      // 格式特定的编解码标签int format;              // 数据格式:视频/⾳频int64_t bit_rate;        // 码率(单位:bps),若未知则为 0int width;               // 视频分辨率(单位:像素)int height;...
}
typedef struct AVPacket
{int64_t pts;      // 显⽰时间戳,单位:stream->time_baseint64_t dts;      // 解码时间戳,单位:stream->time_baseint size;         // 数据包⼤⼩(字节数)int stream_index; // 所属流的索引int64_t duration; // 数据包持续时间int64_t pos;      // 数据包在输⼊⽂件中的字节偏移量...
} 
typedef struct AVDictionaryEntry
{char *key;char *value;
} AVDictionaryEntry

1.6关键接口

视频相关操作

/* 功能:打开输⼊⽂件/流,初始化 AVFormatContext。
参数:
ps:指向 AVFormatContext 指针的指针,函数内部分配并填充该结构体。
url:输⼊⽂件路径或 URL(如 "input.mp4"、"rtmp://example.com/live")。
fmt:强制指定输⼊格式(如 AVInputFormat),通常设为 NULL ⾃动探测。
options:附加选项(如设置超时、协议参数),可为 NULL返回值:<0表⽰失败*/
int avformat_open_input(AVFormatContext **ps,const char *url,AVInputFormat *fmt,AVDictionary **options);
/* 功能:读取输⼊⽂件的流信息(如编码参数、帧率、时⻓)。
参数:
ic:已初始化的 AVFormatContext。
options:流探测的附加选项(如限制探测时⻓),可为 NULL
返回值:<0表⽰失败*/
int avformat_find_stream_info(AVFormatContext *ic, AVDictionary **options);
/* 功能:分配输出格式的上下⽂(AVFormatContext)。
参数:
ctx:输出 AVFormatContext 的双重指针。
oformat:强制指定输出格式,设为 NULL 时由 format_name 或 filename 推断。
format_name:输出格式短名称(如 "mp4"、"hls")。
filename:输出⽂件名或 URL,⽤于辅助推断格式。 对于hls输出⽂件名: xx.m3u8
返回值:<0表⽰失败*/
int avformat_alloc_output_context2(AVFormatContext **ctx,AVOutputFormat *oformat,const char *format_name,const char *filename);
/* 功能:释放由 avformat_alloc_output_context2 分配的 AVFormatContext。
参数:
s:待释放的 AVFormatContext 指针。*/
void avformat_free_context(AVFormatContext *s);
/* 功能:关闭输⼊⽂件并释放 AVFormatContext 相关资源。
参数:
s:指向 AVFormatContext 指针的指针,调⽤后指针会被置为 NULL*/
void avformat_close_input(AVFormatContext **s);
/* 功能:为输出上下⽂创建新的流。
参数:
s:输出格式的 AVFormatContext。
c:流的编解码器(可为 NULL,后续⼿动设置编码参数)。
返回值:NULL-失败*/
AVStream *avformat_new_stream(AVFormatContext *s, const AVCodec *c);
/* 功能:复制编解码参数(从输⼊流到输出流)。
参数:
dst:⽬标流的 AVCodecParameters。
src:源流的 AVCodecParameters
返回值:<0表⽰失败*/
int avcodec_parameters_copy(AVCodecParameters *dst, const AVCodecParameters*src);
/* 功能:写⼊输出⽂件的头部信息。
参数:
s:输出的 AVFormatContext。options: 配置字典选项,根据字典选项决定写⼊头部内容*/
int avformat_write_header(AVFormatContext *s, AVDictionary **options);
// 返回值:<0表⽰失败
/* 功能:从输⼊⽂件中读取⼀个数据包(AVPacket)。
参数:
s:输⼊的 AVFormatContext。
pkt:指向 AVPacket 的指针,⽤于存储读取的数据
返回值:<0表⽰失败或读取结束*/
int av_read_frame(AVFormatContext *s, AVPacket *pkt);
/* 功能:释放 AVPacket 中的资源(如数据缓冲区)。
参数:
pkt:待释放的 AVPacket。
注意:每次调⽤ av_read_frame 后必须调⽤此函数,避免内存泄漏*/
void av_packet_unref(AVPacket *pkt);
/* 功能:将数据包按时间戳顺序写⼊输出⽂件(⾃动处理交织)。
参数:
s:输出的 AVFormatContext。
pkt:待写⼊的 AVPacket。
返回值:<0表⽰失败*/
int av_interleaved_write_frame(AVFormatContext *s, AVPacket *pkt);
/* 功能:写⼊输出⽂件的尾部信息(如 MP4 的 moov 原⼦)。
参数:
s:输出的 AVFormatContext。*/
int av_write_trailer(AVFormatContext *s);
/*
功能:获取每个接⼝执⾏失败的原因,也就是前边各个接⼝的返回值
*/
int av_strerror(int errnum, char *errbuf, size_t errbuf_size);

时间基转换

/* 功能:将时间戳从⼀个时间基转换到另⼀个时间基。
公式: result=a×bq/cq
参数:
a:原始时间戳。
bq:原时间基。
cq:⽬标时间基。
返回值:转换后的时间戳
*/
int64_t av_rescale_q(int64_t a, AVRational bq, AVRational cq)
/* 功能:批量转换数据包的 pts、dts 和 duration 到新时间基。
参数:
pkt:待处理的 AVPacket。
tb_src:原时间基。
tb_dst:⽬标时间基。
*/
void av_packet_rescale_ts(AVPacket *pkt, AVRational tb_src, AVRational tb_dst);
/*
AV_TIME_BASE 是 FFmpeg 内部使⽤的时间基准(Time Base)常量,定义在 libavutil 库
中。
它代表了 FFmpeg 内部计算时间和持续时间时使⽤的基本时间单位
在视频处理(特别是在FFmpeg等多媒体框架中),时间基准(Time Base) 是最核⼼的底层时间
度量单位,
其作⽤是定义时间戳(PTS/DTS)和持续时间的数值如何映射到真实时间。
*/
AVRational AV_TIME_BASE_Q = {1, AV_TIME_BASE};

字典选项

/* 功能:设置字典选项
参数:
pm:字典对象指针,若为*pm为NULL,则会申请空间并添加字典数据
key:字典字段key
value:为key字段要设置的数据
flags: 默认为0,函数内部会⾃动复制键名(key)和转换后的值字符串,调⽤者⽆需管理
内存
*/
int av_dict_set_int(AVDictionary **pm, const char *key, int64_t value, int flags);
int av_dict_set(AVDictionary **pm, const char *key, const char *value, int flags);
/* 释放字典对象资源*/
void av_dict_free(AVDictionary **m);
HLS分片的相关选项
选项功能
hls_time指定每个 TS 分⽚的时⻓(单位:秒)
hls_base_url设置 M3U8 ⽂件中分⽚ URL 的基础路径
hls_list_size控制播放列表中保留的分⽚数量,0表⽰不做限制
hls_playlist_type定义播放列表类型,影响 M3U8 结构。
• vod :点播模式,添加 EXT-X-ENDLIST ,列表不再更新。
• event :事件流,列表持续更新但不删除旧分⽚
hls_flags通过标志位启⽤⾼级功能(多选时⽤ + 分隔)
• split_by_time :强制按 hls_time 分⽚,忽略关键帧间隔。
• independent_segments :标记分⽚可独⽴解码,提升兼容性。
• delete_segments :⾃动删除已被播放列表移除的旧分⽚(直播节省空间)
• append_list :在已有 M3U8 ⽂件末尾追加新分⽚(重启时不覆盖)
hls_segment_filename⾃定义分⽚⽂件名格式(⽀持 %d 占位符),例如:“segment_%03d.ts”
hls_key_info_file指定加密分⽚的密钥信息⽂件路径(启⽤ AES-128 加密)
hls_allow_cache控制客⼾端是否缓存分⽚(通过 EXT-X-ALLOW-CACHE 标签)
• 1 (允许缓存)或 0 (禁⽤缓存)
hls_init_time设置初始分⽚的时⻓(⾸个分⽚可能较短,⽤于快速起播)
hls_ts_options传递额外参数给 TS 分⽚编码器(如设置视频参数)

1.7简单使用样例

简单样例使用流程

  1. 打开输⼊⽂件,并初始化输⼊格式化对象
  2. 查找输⼊视频参数
  3. 申请输出格式化对象
  4. 遍历输⼊媒体流
    a. 为输出对象申请媒体流
    b. 从输⼊媒体流复制解码器参数到输出媒体流中
  5. 设置分⽚字典选项
  6. 通过输出格式化对象,输出媒体头部信息
  7. 遍历输⼊流中的数据帧
    a. 将数据包中的时间戳,从输⼊流的时间基转换为输出流的时间基的时间戳
    i. 若数据帧的显⽰时间戳⽆效(AV_NOPTS_VALUE),则默认为从0开始的默认时间基
    ii. 将从0开始的默认时间基转换为输⼊流时间基,重新进⾏a操作
    b. 将数据帧写⼊输出格式化对象中
    c. 释放帧结构资源
  8. 通过输出格式化对象,输出媒体尾部信息
  9. 释放资源:字典选项,输⼊格式化对象,输出格式化对象
extern "C"{
#include <libavformat/avformat.h>
#include <libavcodec/avcodec.h>
#include <libavutil/opt.h>
#include <libavutil/error.h>
}
#include <iostream>const char* mavError(int err_code) {static char errmsg[256];av_strerror(err_code, errmsg, 255);return errmsg;
}//编写一个简单的mp4转hls的程序,主要参考ffmpeg官网的示例代码
int main(int argc, char* argv[]) {if(argc != 3){std::cout << "Usage: " << argv[0] << " input.mp4 output.m3u8" << std::endl;return -1;}//1. 打开输⼊⽂件,并初始化输⼊格式化对象AVFormatContext* inputContext = nullptr, *outputContext = nullptr;int ret = avformat_open_input(&inputContext, argv[1], nullptr, nullptr);if(ret < 0){std::cout << "打开目标输入文件失败:" << argv[1] << "错误原因:" << mavError(ret) << std::endl;return -1;}//2. 通过输入格式化上下文对象解析视频文件元信息,并获取视频流信息(如编解码器参数、帧率、时⻓)ret = avformat_find_stream_info(inputContext, nullptr);if(ret < 0){std::cout << "解析输入文件元信息失败:" << mavError(ret) << std::endl;return -1;}//3. 申请创建输出格式化上下文对象,并且设定输出格式 hlsret = avformat_alloc_output_context2(&outputContext, nullptr, "hls", argv[2]);if(ret < 0){std::cout << "创建输出格式化上下文对象失败:" << mavError(ret) << std::endl;return -1;}//4. 遍历输入格式化上下文中的媒体流信息,为输出格式化上下文对象创建媒体流,并复制编解码器参数for(int i = 0;i < inputContext->nb_streams;i++){AVStream * inputStream = inputContext->streams[i];AVStream * outputStream = avformat_new_stream(outputContext, nullptr);//创建输出媒体流avcodec_parameters_copy(outputStream->codecpar, inputStream->codecpar);//复制编解码器参数outputStream->avg_frame_rate = inputStream->avg_frame_rate;//复制帧率outputStream->r_frame_rate = inputStream->r_frame_rate;//标称帧率⽤于关键帧间隔计算}//5. 设置HLS转码的各项细节参数:播放类型-点播vod,分片时间, 路径前缀 http://192.168.30.128/video/AVDictionary * options = nullptr;av_dict_set(&options, "hls_time", "10", 0);//分片时间-分片时不一定严格按照10s,取决于关键帧间隔av_dict_set(&options, "hls_base_url", "http://192.168.30.128:9000/video/", 0);//路径前缀av_dict_set(&options, "hls_playlist_type", "vod", 0);//播放类型-点播av_dict_set(&options, "hls_flags", "independent_segments", 0);//设置HLS转码参数-标识分片可独立解码//6. 通过输出格式化上下文,向输出文件写入头部信息ret = avformat_write_header(outputContext, &options);if(ret < 0){std::cout << "写入输出文件头部信息失败:" << mavError(ret) << std::endl;return -1;}//7. 遍历输入格式化上下文中的数据帧,并将其写入输出文件中AVPacket pkt;while(av_read_frame(inputContext, &pkt) >= 0){AVStream *inputStream = inputContext->streams[pkt.stream_index];AVStream *outputStream = outputContext->streams[pkt.stream_index];//1. 将数据包中的时间戳,从输入流的时间基转换为输出流的时间基if (pkt.pts == AV_NOPTS_VALUE) {// 若当前数据帧显示时间戳无效,则将时间戳设置为从低0s开始的时间戳pkt.pts = av_rescale_q(0, AV_TIME_BASE_Q, inputStream->time_base);pkt.dts = pkt.pts;}av_packet_rescale_ts(&pkt, inputStream->time_base, outputStream->time_base);//2. 将数据帧通过输出格式化上下文对象,写入输出文件中ret = av_interleaved_write_frame(outputContext, &pkt);if (ret < 0) {std::cout << "输出数据帧失败:" << mavError(ret) << std::endl;return -1;}//3. 释放数据帧av_packet_unref(&pkt);}//8. 向输出文件写入文件尾部信息ret = av_write_trailer(outputContext);if (ret < 0) {std::cout << "输出尾部信息失败:" << mavError(ret) << std::endl;return -1;}//9. 转码完成,释放资源:参数字典对象,输入格式化上下文对象,输出格式化上下文对象av_dict_free(&options);avformat_close_input(&inputContext);//关闭输入文件-注意需要使用的是avformat_close_input而不是avformat_free_contextavformat_free_context(outputContext);return 0;
}

我们可以写一个简单服务端然后使用该服务端通过浏览器下载我们的分片文件,如果能正常播放下载下来的分片文件就说明简单使用没有问题:

/*1.创建server对象2.设置监听端口3.设置路由4.启动服务
*/
#include <httplib.h>int main(){httplib::Server svr;//设置静态路径以便访问静态文件svr.set_mount_point("/", "./wwwroot/");svr.listen("0.0.0.0", 9000);return 0;
}

1.8二次封装

在视频转码这⾥,⽬前我们的需求有两个:

  1. 针对视频⽂件进⾏转码,⽣成转码后的视频⽂件
  2. 解析⽣成M3U8⽂件内容,获取内容结构,并能够⽣成新的M3U8⽂件
    针对这两个需求,我们封装两个类即可:
    M3U8信息类
    成员变量
    1.M3U8⽂件名称
    头部字段列表(有序)
    分⽚信息列表(有序)
    2.成员⽅法
    M3U8⽂件解析
    M3U8⽂件⽣成
    获取分⽚信息列表
    获取头部字段列表
    HLS转码类
    1.成员变量
    HLS分⽚参数对象
    2.成员⽅法
    视频⽂件转码
//limeffmpeg.h
#pragma once
extern "C"
{
#include <libavformat/avformat.h>
#include <libavcodec/avcodec.h>
#include <libavutil/opt.h>
#include <libavutil/error.h>
}
#include <iostream>
#include <string>
#include <vector>
#include "limelog.h"
#include "limeutil.h"namespace limeffmpeg
{const std::string HLS_EXTM3U = "#EXTM3U";const std::string HLS_VERSION = "#EXT-X-VERSION:";const std::string HLS_TARGETDURATION = "#EXT-X-TARGETDURATION:";const std::string HLS_SEQUENCE = "EXT-X-MEDIA-SEQUENCE:";const std::string HLS_PLAYLIST_TYPE = "EXT-X-PLAYLIST-TYPE";const std::string HLS_INDEPENDENT_SEGMENTS = "#EXT-X-INDEPENDENT-SEGMENTS";const std::string HLS_ENDLIST = "#EXT-X-ENDLIST";const std::string HLS_EXTINF = "#EXTINF:";class M3U8InFo{public:using StrPair = std::pair<std::string, std::string>;M3U8InFo(const std::string &filename);bool write();bool parse();const std::string &getFilename() const;std::vector<std::string> &getHeaders();std::vector<StrPair> &getPieces();private:std::string _filename;std::vector<std::string> _headers;std::vector<StrPair> _pieces;};struct trans_settings{int hls_time;//单个切片的时长std::string hls_base_url;//基准URLstd::string hls_playlist_type;//播放类型};class HLSTranscoder{public:static bool transcode(const std::string &input_file,const std::string &output_file,const trans_settings &ts);private:static std::string mavError(int error_code);};
} // namespace limeffmpeg
//limeffmpeg.cc
#include "limeffmpeg.h"namespace limeffmpeg
{M3U8InFo::M3U8InFo(const std::string &filename): _filename(filename){}bool M3U8InFo::write(){// 将M3U8InFo类的信息写入到磁盘//创建输入流std::stringstream ss;//写入文件头for(auto &header : _headers){ss << header << "\n";}//写入分片信息for(auto &piece : _pieces){ss << piece.first << "\n";ss << piece.second << "\n";}//写入文件尾ss << HLS_ENDLIST;//写入文件bool ret = limeutil::LimeFile::write(_filename, ss.str());if (!ret){ERR("写入M3U8文件失败:{}", _filename);return false;}return true;}bool M3U8InFo::parse(){// 解析M3U8文件的信息到当前M3U8InFo类中std::string content;bool ret = limeutil::LimeFile::read(_filename, content);if (!ret){ERR("读取M3U8文件失败:{}", _filename);return false;}//提取m3u8文件信息std::vector<std::string> lines;int count = limeutil::LimeSTR::split(content, "\n", lines);//提取文件头信息//提取分片信息for(int i = 0;i < count;i++){//到达末尾时结束提取if(lines[i] == HLS_ENDLIST){break;}if(lines[i].find(HLS_EXTINF) != std::string::npos){//分片信息_pieces.emplace_back(lines[i], lines[i+1]);_pieces.back().second = "hello.ts";i++;continue;}//文件头信息_headers.emplace_back(lines[i]);}return true;}const std::string &M3U8InFo::getFilename() const{return _filename;}std::vector<std::string> &M3U8InFo::getHeaders(){return _headers;}std::vector<M3U8InFo::StrPair> &M3U8InFo::getPieces(){return _pieces;}std::string HLSTranscoder::mavError(int err_code){static char errmsg[256];av_strerror(err_code, errmsg, 255);return errmsg;}bool HLSTranscoder::transcode(const std::string &input_file, const std::string &output_file, const trans_settings &ts){// 1. 打开输⼊⽂件,并初始化输⼊格式化对象AVFormatContext *inputContext = nullptr, *outputContext = nullptr;int ret = avformat_open_input(&inputContext, input_file.c_str(), nullptr, nullptr);if (ret < 0){ERR("打开输入文件失败:{},错误原因:{}", input_file.c_str(), mavError(ret));return false;}// 2. 通过输入格式化上下文对象解析视频文件元信息,并获取视频流信息(如编解码器参数、帧率、时⻓)ret = avformat_find_stream_info(inputContext, nullptr);if (ret < 0){ERR("解析输入文件元信息失败:{},错误原因:{}", input_file.c_str(), mavError(ret));// 确保资源释放avformat_close_input(&inputContext); // 关闭输入文件-注意需要使用的是avformat_close_input而不是avformat_free_contextreturn false;}// 3. 申请创建输出格式化上下文对象,并且设定输出格式 hlsret = avformat_alloc_output_context2(&outputContext, nullptr, "hls", output_file.c_str());if (ret < 0){ERR("创建输出格式化上下文对象失败:{},错误原因:{}", output_file.c_str(), mavError(ret));// 确保资源释放avformat_close_input(&inputContext); // 关闭输入文件-注意需要使用的是avformat_close_input而不是avformat_free_contextreturn false;}// 4. 遍历输入格式化上下文中的媒体流信息,为输出格式化上下文对象创建媒体流,并复制编解码器参数for (int i = 0; i < inputContext->nb_streams; i++){AVStream *inputStream = inputContext->streams[i];AVStream *outputStream = avformat_new_stream(outputContext, nullptr);   // 创建输出媒体流avcodec_parameters_copy(outputStream->codecpar, inputStream->codecpar); // 复制编解码器参数outputStream->avg_frame_rate = inputStream->avg_frame_rate;             // 复制帧率outputStream->r_frame_rate = inputStream->r_frame_rate;                 // 标称帧率⽤于关键帧间隔计算}// 5. 设置HLS转码的各项细节参数:播放类型-点播vod,分片时间, 路径前缀 http://192.168.30.128/video/AVDictionary *options = nullptr;av_dict_set(&options, "hls_time", std::to_string(ts.hls_time).c_str(), 0);// 分片时间-分片时不一定严格按照10s,取决于关键帧间隔av_dict_set(&options, "hls_base_url", ts.hls_base_url.c_str(), 0); // 路径前缀av_dict_set(&options, "hls_playlist_type", ts.hls_playlist_type.c_str(), 0);// 播放类型-点播av_dict_set(&options, "hls_flags", "independent_segments", 0);// 设置HLS转码参数-标识分片可独立解码// 6. 通过输出格式化上下文,向输出文件写入头部信息ret = avformat_write_header(outputContext, &options);if (ret < 0){ERR("写入输出文件头部信息失败:{},错误原因:{}", output_file.c_str(), mavError(ret));// 确保资源释放av_dict_free(&options);avformat_close_input(&inputContext); // 关闭输入文件-注意需要使用的是avformat_close_input而不是avformat_free_contextavformat_free_context(outputContext);return false;}// 7. 遍历输入格式化上下文中的数据帧,并将其写入输出文件中AVPacket pkt;while (av_read_frame(inputContext, &pkt) >= 0){AVStream *inputStream = inputContext->streams[pkt.stream_index];AVStream *outputStream = outputContext->streams[pkt.stream_index];// 1. 将数据包中的时间戳,从输入流的时间基转换为输出流的时间基if (pkt.pts == AV_NOPTS_VALUE){// 若当前数据帧显示时间戳无效,则将时间戳设置为从低0s开始的时间戳pkt.pts = av_rescale_q(0, AV_TIME_BASE_Q, inputStream->time_base);pkt.dts = pkt.pts;}av_packet_rescale_ts(&pkt, inputStream->time_base, outputStream->time_base);// 2. 将数据帧通过输出格式化上下文对象,写入输出文件中ret = av_interleaved_write_frame(outputContext, &pkt);if (ret < 0){ERR("输出数据帧失败:{},错误原因:{}", output_file.c_str(), mavError(ret));return false;}// 3. 释放数据帧av_packet_unref(&pkt);}// 8. 向输出文件写入文件尾部信息ret = av_write_trailer(outputContext);if (ret < 0){ERR("输出尾部信息失败:{},错误原因:{}", output_file.c_str(), mavError(ret));// 确保资源释放av_dict_free(&options);avformat_close_input(&inputContext); // 关闭输入文件-注意需要使用的是avformat_close_input而不是avformat_free_contextavformat_free_context(outputContext);return false;}// 9. 转码完成,释放资源:参数字典对象,输入格式化上下文对象,输出格式化上下文对象av_dict_free(&options);avformat_close_input(&inputContext); // 关闭输入文件-注意需要使用的是avformat_close_input而不是avformat_free_contextavformat_free_context(outputContext);return true;}
} // namespace limeffmpeg

使用样例

#include "../../source/limeffmpeg.h"bool test_ffmpeg(){std::string input_file = "./Cutlery.mp4";std::string output_file = "./wwwroot/Cutlery.m3u8";limeffmpeg::trans_settings settings{.hls_time = 10,.hls_base_url = "http://192.168.30.128:9000/wwwroot/",.hls_playlist_type = "vod"};//对目标文件进行转码bool ret = limeffmpeg::HLSTranscoder::transcode(input_file, output_file, settings);if(!ret){ERR("转码失败,输入文件为{}", input_file);return false;}return true;
}void parse_test(){std::string input_file = "./wwwroot/Cutlery.m3u8";limeffmpeg::M3U8InFo m3u8_info(input_file);m3u8_info.parse();for(auto &header : m3u8_info.getHeaders()){std::cout << "[" << header << "]" <<std::endl;}for(auto &piece : m3u8_info.getPieces()){std::cout << "[" << piece.first << "]" <<std::endl;std::cout << "[" << piece.second << "]" <<std::endl;}m3u8_info.write();
}int main(){limelog::limelog_init();bool ret = test_ffmpeg();if(ret)parse_test();return 0;
}

makefile:

hls:ffmpeg_test.cc ../../source/limeffmpeg.cc ../../source/limelog.cc ../../source/limeutil.ccg++ $^ -o $@ -std=c++17 -lavcodec -lavformat -lavutil -lspdlog -lgflags -lfmt -lpthread -ljsoncpp
clean:rm -f hls

二.Etcd-SDK

2.1Etcd是什么

Etcd是⼀个golang编写的分布式、⾼可⽤的⼀致性键值存储系统,⽤于配置共享和服务发现等。它使用Raft⼀致性算法来保持集群数据的⼀致性,且客⼾端通过⻓连接watch功能,能够及时收到数据变化通知,相较于Zookeeper框架更加轻量化。

2.2安装Etcd服务

  1. 安装Etcdsudo apt-get install etcd
  2. 启动Etcd服务sudo systemctl start etcd
  3. 如有需要可以设置Etcd为开机自启sudo systemctl enable etcd

2.3客户端SDK

由于Etcd官方并没有提供c++开发的相关接口,所以我们这里使用非官方的SDK进行etcd的二次封装。etcd-cpp-apiv3是⼀个etcd的C++版本客⼾端API。它依赖于mipsasm, boost, protobuf, gRPC, cpprestsdk等库。
其github地址为etcd-cpp-apiv3。

2.3.1依赖安装

sudo apt-get install libboost-all-dev libssl-dev
sudo apt-get install libprotobuf-dev protobuf-compiler-grpc
sudo apt-get install libgrpc-dev libgrpc++-dev
sudo apt-get install libcpprest-dev

api框架安装

git clone https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3.git
cd etcd-cpp-apiv3
mkdir build && cd build
cmake .. -DCMAKE_INSTALL_PREFIX=/usr
make -j$(nproc) && sudo make install

2.4开发过程中需要使用到的头文件

#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/Response.hpp>
#include <etcd/Value.hpp>

2.5接口

namespace etcd
{class Value{bool is_dir();                  // 判断是否是⼀个⽬录std::string const &key();       // 键值对的key值std::string const &as_string(); // 键值对的val值int64_t lease();                // ⽤于创建租约的响应中,返回租约ID};// etcd会监控所管理的数据的变化,⼀旦数据产⽣变化会通知客⼾端// 在通知客⼾端的时候,会返回改变前的数据和改变后的数据class Event{enum class EventType{PUT,     // 键值对新增或数据发⽣改变DELETE_, // 键值对被删除INVALID,};enum EventType event_type();const Value &kv();      // 当前键值对的数据const Value &prev_kv(); // 改变前键值对的数据};class Response{bool is_ok()std::string const &error_message();Value const &value();            // 当前的数值 或者 ⼀个请求的处理结果Value const &prev_value();          // 之前的数值Value const &value(int index);      //std::vector<Event> const &events(); // 触发的事件using Values = std::vector<Value>;Values const &values() const; // 多组数据的响应结果--针对⽬录};
}// pplx::task 并⾏库异步结果对象
// 阻塞⽅式 get(): 阻塞直到任务执⾏完成,并获取任务结果
// ⾮阻塞⽅式 wait(): 等待任务到达终⽌状态,然后返回任务状态
namespace etcd
{class Client{// etcd_url: "http://127.0.0.1:2379"Client(std::string const &etcd_url,std::string const &load_balancer = "round_robin");// Put a new key-value pair 新增⼀个键值对pplx::task<Response> put(std::string const &key,std::string const &value);// 新增带有租约的键值对 (⼀定时间后,如果没有续租,数据⾃动删除)pplx::task<Response> put(std::string const &key,std::string const &value,const int64_t leaseId);pplx::task<Response> get(std::string const &key);// 获取⼀个指定key⽬录下的数据列表pplx::task<Response> ls(std::string const &key);// 创建并获取⼀个存活ttl时间的租约pplx::task<Response> leasegrant(int ttl);// 获取⼀个租约保活对象,其参数ttl表⽰租约有效时间pplx::task<std::shared_ptr<KeepAlive>> leasekeepalive(int ttl);// Returns etcdv3::ERROR_KEY_NOT_FOUND if the key does not existpplx::task<Response> rm(std::string const &key);// Returns etcdv3::ERROR_KEY_NOT_FOUND if the no key been deleted.pplx::task<Response> rmdir(std::string const &key, bool recursive =false);// Watches for changes of a key or a subtreepplx::task<Response> watch(std::string const& key, bool recursive =false);// 撤销⼀个指定的租约pplx::task<Response> leaserevoke(int64_t lease_id);// 数据锁pplx::task<Response> lock(std::string const &key);pplx::task<Response> unlock(std::string const &lock_key);// Execute a etcd transactionpplx::task<Response> txn(etcdv3::Transaction const &txn);}; class KeepAlive{KeepAlive(Client const &client, int ttl, int64_t lease_id = 0);KeepAlive(std::string const &address,std::function<void(std::exception_ptr)> const &handler,int ttl,int64_t lease_id = 0);// 返回租约IDint64_t Lease();// 停⽌保活动作void Cancel();}; class Watcher{Watcher(Client const &client,std::string const &key,                 // 要监控的键值对keystd::function<void(Response)> callback, // 发⽣改变后的回调bool recursive = false);                // 是否递归监控⽬录下的所有数据改变Watcher(std::string const &address,std::string const &key,std::function<void(Response)> callback,bool recursive = false);// 阻塞等待,直到监控任务被停⽌bool Wait();// 异步⾮阻塞,设置任务停⽌时的回调函数,当任务被主动取消则传⼊truebool Wait(std::function<void(bool)> callback);bool Cancel();};
}

2.6简单使用样例

2.6.1普通键值对

//simple_get.cc
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/Response.hpp>
#include <etcd/Value.hpp>int main(){std::string url = "http://192.168.30.128:2379";//1.创建etcd客户端etcd::Client etcd(url);//2.样例get一个键值对auto resp = etcd.get("hello").get();if(resp.is_ok() == false){std::cout << "get数据失败!" << std::endl;return -1;}std::cout << "get数据成功!" << std::endl;std::cout << "key:" << resp.value().key() << std::endl;std::cout << "value:" << resp.value().as_string() << std::endl;return 0;
}
//simple_put.cc
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/Response.hpp>
#include <etcd/Value.hpp>int main() {std::string url = "http://192.168.30.128:2379";//1.创建etcd客户端etcd::Client etcd(url);//2.put一个样例键值对auto resp = etcd.put("hello", "world").get();if(resp.is_ok() == false){std::cout << "put数据失败!" << std::endl;return -1;}std::cout << "put数据成功!" << std::endl;return 0;
}

2.6.2目录型键值对+保活与监听机制

//dir_put.cc
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/Response.hpp>
#include <etcd/Value.hpp>void put(shared_ptr<etcd::KeepAlive>& KeepAlive,const std::string& url ,const std::string& key, const std::string& value)
{etcd::Client etcd(url);//2.创建租约auto leasegrant_resp = etcd.leasegrant(3).get();//3秒保活时间,get获取原始responseif(leasegrant_resp.is_ok() == false){std::cout << "创建租约失败:" << leasegrant_resp.error_message() << std::endl;return;}//3.获取租约Idint64_t lease_id = leasegrant_resp.value().lease();//4.put一个带租约对象auto resp = etcd.put(key, value, lease_id).get();if(resp.is_ok() == false){std::cout << "put数据失败:" << resp.error_message() << std::endl;return;}auto handler = [&](const std::exception_ptr& eptr){//异常处理->重新进行put操作put(KeepAlive,url,key,value);       };KeepAlive.reset(new etcd::KeepAlive(url,handler,3, lease_id));
}int main() {//1.创建etcd客户端std::string url = "http://192.168.30.128:2379";std::string key = "/114514/231";std::string value = "1919810";etcd::Client etcd(url);//5.创建租约保活对象,每隔3秒发送一次保活请求shared_ptr<etcd::KeepAlive> KeepAlive;put(KeepAlive,url,key,value);//6.回车结束保活std::cout << "put数据成功!按回车结束保活!" << std::endl;getchar();return 0;
}
//dir_get.cc
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/Response.hpp>
#include <etcd/Value.hpp>void handle_response(etcd::Response const& response) {if(response.is_ok() == false){std::cout << "监听出错!" << std::endl;return;}//当键值对状态变化时,会调用该回调函数auto events = response.events();for(auto& event : events){if(event.event_type() == etcd::Event::EventType::PUT){std::cout << event.kv().key() << "数据改变:" << event.prev_kv().as_string() << "->" << event.kv().as_string() << std::endl;}else if(event.event_type() == etcd::Event::EventType::DELETE_){std::cout << event.prev_kv().key() << "数据删除!" << std::endl;}else{std::cout << "未知事件!" << std::endl;}}
}void get(std::shared_ptr<etcd::Watcher>& watcher,const std::string& url ,const std::string& key)
{etcd::Client etcd(url);//2.get键值对auto response = etcd.ls(key).get();if(response.is_ok() == true){//3.打印键值对for(auto& kv : response.values()){std::cout << kv.key() << "->" << kv.as_string() << std::endl;}}std::function<void(bool)> hanlder = [&](bool cond){//如果为非正常退出则重新调用get函数if(cond == false)get(watcher,url,key);     };//4.阻塞监听所有键值对变化watcher.reset(new etcd::Watcher(url,"/", handle_response, true));watcher->Wait(hanlder);
}int main() {//1.创建etcd客户端std::string url = "http://192.168.30.128:2379";std::string key = "/114514";//5.监听键值对变化std::shared_ptr<etcd::Watcher> watcher;get(watcher, url, key);std::cout << "按回车键退出!" << std::endl;getchar();return 0;
}

2.6.3makefile

all: simple_get simple_put dir_get dir_put
simple_get:simple_get.ccg++ $^ -o $@ -std=c++17 -lcpprest -letcd-cpp-api
simple_put:simple_put.ccg++ $^ -o $@ -std=c++17 -lcpprest -letcd-cpp-api
dir_get:dir_get.ccg++ $^ -o $@ -std=c++17 -lcpprest -letcd-cpp-api
dir_put:dir_put.ccg++ $^ -o $@ -std=c++17 -lcpprest -letcd-cpp-api
clean:rm -f simple_get simple_put dir_get dir_put

2.7二次封装

使⽤Etcd作为服务注册发现中⼼,需要定义服务的注册和发现逻辑。这通常涉及到以下⼏个操作:

  1. 服务注册:服务启动时,向Etcd注册⾃⼰的服务名称和访问地址信息的键值对。
  2. 服务发现:客⼾端通过Etcd获取服务的访问地址信息,⽤于连接服务器进⾏远程调⽤。
  3. 健康检查:服务定期向Etcd发送⼼跳,以维持其注册信息的有效性。

2.7.1设计

服务管理的封装中需要注意以下两点:

  • 服务的注册: 向etcd服务器中,添加服务数据;例如: put /user/myid 192.168.30.86:9090
  • 服务的发现: 从etcd服务器中,获取服务数据;例如: ls /user
    通过从etcd服务器上获取的数据,可以解析获知,当前有哪个主机可以提供 /user 服务
    在这里插入图片描述
    封装思想:
  • 将服务注册和服务发现分别封装类,⽅便外部能够通过实例化的对象简便实现服务发现/注册
  • 服务注册类,添加 {/服务名称, 访问地址} 的键值对。
  • 服务发现类,通过ls以及监视 / ⽬录来获取 / 下的所有键值对变动。

2.7.2服务的注册

封装服务注册类,将客⼾端请求与租约保活部分封装起来,向外提供⼀个接⼝能够实现数据的新增即可,通过实例化的对象可以⽅便快捷的实现服务注册功能。
成员:

  • 注册中⼼地址
  • 当前节点标识
  • 要注册的服务信息(服务名称&节点地址)
  • 租约保活器
    接口:
  • 服务注册接口

2.7.3服务的发现

封装服务发现类,将客⼾端请求与路径监视部分封装起来,通过实例化的对象可以⽅便快捷的实现服务发现功能,并针对发现的服务进⾏对应处理。
为了能够与其他功能进⾏解耦,因此这⾥封装的时候由外部传⼊针对服务上线和下线所进⾏处理的接口进⾏回调处理,当前模块部分本⾝并不关注具体事件该如何处理。

成员:

  • 注册中⼼地址
  • 针对服务上线&下线的处理回调对象
  • 监视器
    接口:
  • 服务监视接⼝

2.7.4具体实现

//limeetcd.h
#pragma once
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/Response.hpp>
#include <etcd/Value.hpp>namespace limeetcd {extern void wait_for_connection(etcd::Client &client);class SvcProvider {public:using ptr = std::shared_ptr<SvcProvider>;SvcProvider(const std::string &reg_center_addr, const std::string &svc_name, const std::string &svc_addr);bool registry();private:std::string make_key();private:std::string _reg_center_addr; //注册中心地址std::string _instance_id; //当前节点的标识std::string _svc_name;  // 节点能够提供的服务名称std::string _svc_addr;  // 节点地址std::shared_ptr<etcd::KeepAlive> _keepalive; //租约保活对象};class SvcWatcher {public:using ptr = std::shared_ptr<SvcWatcher>;//在产生事件时,传入两个数据:1-服务名称,2-节点地址using ModCallbck = std::function<void(std::string, std::string)>;SvcWatcher(const std::string &reg_center_addr, ModCallbck &&online_callback, ModCallbck &&offline_callback);//数据监控/目录下的所有数据bool watch();private:void callback(const etcd::Response &resp);std::string parse_key(const std::string &key);//从key中获取实际服务名称private:std::string _reg_center_addr; //注册中心地址ModCallbck _online_callback;  //服务上线回调函数ModCallbck _offline_callback; //服务下线回调函数std::shared_ptr<etcd::Watcher> _watcher; // 服务上下线监控对象};
}//namespace limeetcd
//limeetcd.cc
#include "limeetcd.h"
#include "limelog.h"
#include "limeutil.h"namespace limeetcd
{void wait_for_connection(etcd::Client &client){// wait until the client connects to etcd serverwhile (!client.head().get().is_ok()){WRN("连接etcd服务器失败, 1秒后重试...");std::this_thread::sleep_for(std::chrono::seconds(1));}INF("成功连接etcd服务器!");}std::string SvcProvider::make_key(){// key格式: /svc_name/instance_idreturn "/" + _svc_name + "/" + _instance_id;}SvcProvider::SvcProvider(const std::string &reg_center_addr, const std::string &svc_name, const std::string &svc_addr): _reg_center_addr(reg_center_addr),_svc_name(svc_name),_svc_addr(svc_addr),_instance_id(limeutil::LimeRandom::code()){}bool SvcProvider::registry(){// 1.创建etcd客户端etcd::Client etcd(_reg_center_addr);wait_for_connection(etcd); // 等待客户端连接成功// 2.创建租约auto leasegrant_resp = etcd.leasegrant(3).get(); // 3秒保活时间if (leasegrant_resp.is_ok() == false){WRN("创建租约失败:{}", leasegrant_resp.error_message());return false;}// 3.获取租约Idint64_t lease_id = leasegrant_resp.value().lease();// 4.put一个带租约对象auto resp = etcd.put(make_key(), _svc_addr, lease_id).get();if (resp.is_ok() == false){WRN("注册服务失败:{}", resp.error_message());return false;}// 异常处理函数auto handler = [this](const std::exception_ptr &eptr){this->registry();};// 5.创建租约保活对象,每隔3秒发送一次保活请求_keepalive.reset(new etcd::KeepAlive(_reg_center_addr, handler, 3, lease_id));return true;}std::string SvcWatcher::parse_key(const std::string &key){// key格式: /svc_name/instance_idstd::vector<std::string> dst;int count = limeutil::LimeSTR::split(key, "/", dst);if (count < 2){return ""; // 格式错误}return dst[0]; // 返回服务名称}SvcWatcher::SvcWatcher(const std::string &reg_center_addr, ModCallbck &&online_callback, ModCallbck &&offline_callback): _reg_center_addr(reg_center_addr),_online_callback(std::move(online_callback)),_offline_callback(std::move(offline_callback)){}bool SvcWatcher::watch(){// 1.创建etcd客户端etcd::Client etcd(_reg_center_addr);wait_for_connection(etcd); // 等待客户端连接成功auto resp = etcd.ls("/").get();if(resp.is_ok() == true){auto values = resp.values();for(auto &value : values){std::string key = value.key();std::string value_str = value.as_string();if(_online_callback) _online_callback(parse_key(key), value_str);}}std::function<void(bool)> hanlder = [this](bool cond){// 如果为非正常退出则重新调用get函数if (cond == false)this->watch();};// 2.阻塞监听所有键值对变化auto cb = std::bind(&SvcWatcher::callback, this, std::placeholders::_1);_watcher.reset(new etcd::Watcher(_reg_center_addr, "/", cb, true));_watcher->Wait(hanlder);return true;}void SvcWatcher::callback(const etcd::Response &resp){if (resp.is_ok() == false){std::cout << "监听出错!" << std::endl;return;}// 当键值对状态变化时,会调用该回调函数auto events = resp.events();for (auto &event : events){if (event.event_type() == etcd::Event::EventType::PUT){INF("{}服务上线,地址:{}", event.kv().key(), event.kv().as_string());//服务上线if(_online_callback) _online_callback(parse_key(event.kv().key()), event.kv().as_string());}else if (event.event_type() == etcd::Event::EventType::DELETE_){INF("{}服务下线!",event.prev_kv().key());//服务下线if(_offline_callback) _offline_callback(parse_key(event.prev_kv().key()), event.prev_kv().as_string());}else{WRN("未知事件!");}}}
} // namespace limeetcd

2.8简单使用样例

//provider.cc
#include "../../source/limeetcd.h"
#include "../../source/limelog.h"int main(){limelog::limelog_init();std::string url = "http://192.168.30.128:2379";std::string svc_name = "user";std::string svc_addr = "192.168.30.128:9000";//进行服务注册并保活limeetcd::SvcProvider::ptr provider(new limeetcd::SvcProvider(url, svc_name, svc_addr));provider->registry();std::cout << "服务注册成功, 按回车键退出!" << std::endl;getchar();return 0;
}
//watcher.cc
#include "../../source/limeetcd.h"
#include "../../source/limelog.h"int main(){limelog::limelog_init();std::string url = "http://192.168.30.128:2379";auto online_callback = [](std::string svc_name, std::string svc_addr){std::cout << "服务上线: " << svc_name << " 地址: " << svc_addr << std::endl;};auto offline_callback = [](std::string svc_name, std::string svc_addr){std::cout << "服务下线: " << svc_name << " 地址: " << svc_addr << std::endl;};//进行服务监听limeetcd::SvcWatcher::ptr watcher(new limeetcd::SvcWatcher(url, online_callback, offline_callback));watcher->watch();//等待退出std::cout << "服务监听中, 按回车键退出!" << std::endl;getchar();return 0;
}

makefile

all: provider watcher
provider:provider.cc ../../source/limeetcd.cc ../../source/limeutil.cc ../../source/limelog.ccg++ $^ -o $@ -std=c++17 -lcpprest -letcd-cpp-api -lspdlog -lfmt -ljsoncpp -lpthread
watcher:watcher.cc ../../source/limeetcd.cc ../../source/limeutil.cc ../../source/limelog.ccg++ $^ -o $@ -std=c++17 -lcpprest -letcd-cpp-api -lspdlog -lfmt -ljsoncpp -lpthread
clean:rm -f provider watcher

2.9 etcd-brpc实现远程服务监控与调用

//cal.proto
syntax = "proto3";//声明语法版本
package cal;//定义包名
option cc_generic_services = true;//是否启用rpc服务message AddRequest {//定义请求消息int32 a = 1;//第一个参数int32 b = 2;//第二个参数
}message AddResponse {//定义响应消息int32 c = 1;//结果
}//这是一个http请求,不需要任何字段
message HelloRequest {
}//这是一个http响应,不需要任何字段
message HelloResponse {}service Calculator {//定义服务rpc Add(AddRequest) returns (AddResponse);//定义rpc方法rpc Hello(HelloRequest) returns (HelloResponse);//定义另一个rpc方法
}
//rpc_client.cc
#include "../../source/limerpc.h"
#include "../../source/limeetcd.h"
#include "cal.pb.h"int main() {//初始化日志limelog::limelog_init();//创建服务管理类limerpc::SvcRpcChannels svc_rpc_channels;std::string svc_name = "Calculator";//添加服务关心svc_rpc_channels.set_match(svc_name);//通过etcd模拟服务发现-自动添加服务关心的结点std::string reg_center_addr = "http://192.168.30.128:2379";auto online_callback = std::bind(&limerpc::SvcRpcChannels::add_node, &svc_rpc_channels, std::placeholders::_1, std::placeholders::_2);auto offline_callback = std::bind(&limerpc::SvcRpcChannels::remove_node, &svc_rpc_channels, std::placeholders::_1, std::placeholders::_2);limeetcd::SvcWatcher svc_watcher(reg_center_addr, online_callback, offline_callback);svc_watcher.watch();//获取服务信道limerpc::ChannelPtr channel;while(!channel){WRN("无合适的服务信道,等待服务上线...");std::this_thread::sleep_for(std::chrono::seconds(1));channel = svc_rpc_channels.get_channel(svc_name);}//3.创建stub对象-用于发起rpc调用cal::Calculator_Stub stub(channel.get());//4.创建请求对象-用于设置rpc调用参数cal::AddRequest* request = new cal::AddRequest();//需要new,否则会有生命周期问题request->set_a(10);request->set_b(20);//5.发起rpc调用-异步调用cal::AddResponse* response = new cal::AddResponse();brpc::Controller* controller = new brpc::Controller();//补充:设置Controller的timeout时间,默认是3秒controller->set_timeout_ms(4000);//设置回调函数auto done = limerpc::ClosureFactory::create([controller,request,response](){std::unique_ptr<brpc::Controller> cntl_guard(controller);std::unique_ptr<cal::AddRequest> req_guard(request);std::unique_ptr<cal::AddResponse> res_guard(response);if (cntl_guard->Failed()) {std::cerr << "rpc远程调用失败: " << cntl_guard->ErrorText() << std::endl;return;}//打印rpc调用的结果std::cout << "a+b=" << response->c() << std::endl;});stub.Add(controller, request, response, done);//设置回调函数表示异步rpc调用std::cout << "rpc调用已发出,继续干其他事情..." << std::endl;//等待rpc调用结果-键盘按下回车键退出程序getchar();return 0;
}
//rpcserver.cc
#include "../../source/limerpc.h"
#include "cal.pb.h"
#include "../../source/limeetcd.h"
#include <thread>//异步简易rpc服务端
class CalculatorService : public cal::Calculator{
public:CalculatorService(){};~CalculatorService(){};void Add(google::protobuf::RpcController* controller,const cal::AddRequest* request,cal::AddResponse* response,google::protobuf::Closure* done) override{//使用多线程进行异步处理std::thread thr([=](){//当业务逻辑比较复杂时返回时机可能不同,所以这里需要使用done_guard来确保done在add函数执行完毕后自动被调用brpc::ClosureGuard done_guard(done);int result = request->a() + request->b();response->set_c(result);//模拟业务处理时间std::this_thread::sleep_for(std::chrono::seconds(3));DBG("ADD服务响应完成");});thr.detach();}
};int main()
{std::string reg_center_addr = "http://192.168.30.128:2379";std::string svc_name = "Calculator";std::string svc_addr = "192.168.30.128:9000";limelog::limelog_init();//定义计算服务CalculatorService* service = new CalculatorService();//通过服务器工厂类获取一个服务器实例auto server = limerpc::RpcServer::create(9000, service);//注册服务到etcdlimeetcd::SvcProvider provider(reg_center_addr, svc_name, svc_addr);provider.registry();//等待服务器退出server->RunUntilAskedToQuit();return 0;
}

makefile

.PHONY: all clean
all: server clientserver: rpc_server.cc cal.pb.cc ../../source/limerpc.cc ../../source/limelog.cc ../../source/limeetcd.cc ../../source/limeutil.ccg++ $^ -o $@ -lprotobuf -lpthread -lbrpc -ldl -lleveldb -lssl -lcrypto -lgflags -lspdlog -lfmt -lcpprest -letcd-cpp-api -ljsoncpp -std=c++17
client: rpc_client.cc cal.pb.cc ../../source/limerpc.cc ../../source/limelog.cc ../../source/limeetcd.cc ../../source/limeutil.ccg++ $^ -o $@ -lprotobuf -lpthread -lbrpc -ldl -lleveldb -lssl -lcrypto -lgflags -lspdlog -lfmt -lcpprest -letcd-cpp-api -ljsoncpp -std=c++17
%.pb.cc: %.protoprotoc --cpp_out=. $<
clean:rm -f server client
http://www.dtcms.com/a/528291.html

相关文章:

  • 【教学类-120-01】20251025旋转数字
  • 制作网站多少钱一个有哪些做企业点评的网站
  • 网站会员营销上海注册公司哪家好
  • 【深度学习新浪潮】深入理解Seed3D模型:参数化驱动的下一代3D内容生成技术
  • GitHub等平台形成的开源文化正在重塑和人家
  • 免费网站收录入口有了域名空间服务器怎么做网站
  • 5.go-zero集成gorm 和 go-redis
  • Linux系统入门:System V进程间通信
  • 第一章 蓝图篇 - 全景认知与项目设计
  • mormot.net.server.pas源代码分析
  • 丹阳网站建设价位php网站搭建
  • 【工具分享】另一个免费开源的远程桌面服务-Apache Guacamole
  • RabbitMQ TTL机制详解
  • XSL-FO 对象:深度解析与实际应用
  • 在JavaScript / Node.js / 抖音小游戏中,使用tt.request通信
  • 两学一做网站源码wordpress 柚子皮下载
  • Go slog 日志打印最佳实践指南
  • Go的垃圾回收
  • 珠海网站管理公司国际公司名字
  • 自动化模型学习器——autoGluon
  • 长沙网站建设招聘外贸做那种网站有哪些
  • 浏览器卡顿内存高?傲游浏览器三核加速,网页加载效率提升60%
  • 研发部门验收流程
  • 贪心算法 with Gemini
  • 掌握 Rust:从内存安全到高性能服务的完整技术图谱
  • [Java]重学Java-Java平台
  • Bash Shell 脚本编程入门详解
  • 打造高清3D虚拟世界|零基础学习Unity HDRP高清渲染管线(第七天)
  • 营销型网站建立费用手机端网站开发页
  • 网页模板免费资源搜索引擎排名优化技术