etcd封装
- etcd框架
- etcd::Client类
- 一、构造函数:客户端初始化
- 二、核心功能函数:etcd 操作接口
- 1. 键值基础操作:CRUD
- (1)读取键值:`get`/`ls`
- (2)写入键值:`set`/`add`/`put`/`modify`
- (3)删除键值:`rm`/`rmdir`
- 2. 租约操作:`leasegrant`/`leaserevoke`/`leasekeepalive`
- 3. 监听操作:`watch`
- 4. 精简化参考源代码
- etcd::Response类
- 1. 静态创建函数(create 模板系列)
- 2. 响应状态查询函数
- 3. 响应数据获取函数(基本信息)
- 4. 响应数据获取函数(键值相关)
- 5. 响应数据获取函数(键列表相关)
- 6. 响应数据获取函数(高级功能相关)
- 7. 响应数据获取函数(集群信息)
- 8. 构造函数与拷贝构造函数
- 9. 精简化参考源代码
- etcd::Value类与Event类
- 1. etcd::Value 类(键值对数据载体)
- 2. etcd::Event 类(监听事件载体)
- 3. 精简化参考源代码
- etcd::Watcher
- 1. etcd::Watcher 核心函数与参数表格
- 2. 精简化参考源代码
- 3. 参考使用
- etcd::KeepAlive
- 1. etcd::KeepAlive 核心函数与参数表格
- 2. 精简版 etcd::KeepAlive 头文件(初学者友好)
- 3. 给初学者的关键说明
- pplx::task
- 1. 模板类 `task<_ReturnType>` 核心函数与参数表格
etcd框架
etcd::Client类
一、构造函数:客户端初始化
构造函数的核心作用是建立与 etcd 集群的连接,支持普通连接、认证(用户名密码)、SSL 加密、自定义 gRPC 参数等场景,同时提供静态工厂方法(WithXXX
)简化初始化。
1. 基础构造:无认证、普通连接
构造函数签名 | 核心参数 | 功能说明 |
---|
Client(std::string const & etcd_url, std::string const & load_balancer = "round_robin") | - etcd_url :etcd 集群地址,支持多个地址(用 , 或 ; 分隔,如 "http://127.0.0.1:2379,http://127.0.0.1:2380" )- load_balancer :负载均衡策略,默认 round_robin (轮询),可选值:round_robin (轮询)、pick_first (优先第一个)、grpclb 、xds | 初始化无认证的异步客户端,指定集群地址和负载均衡策略 |
Client(std::string const & etcd_url, grpc::ChannelArguments const & arguments) | - arguments :gRPC 通道自定义参数(如超时、最大重试次数等) | 支持通过 gRPC 原生参数精细化配置连接(如设置 TLS 选项、通道缓存大小等) |
静态工厂方法 static Client* WithUrl(...) | 同上述构造函数参数 | |
二、核心功能函数:etcd 操作接口
在学习函数前,先明确 2 个基础概念:
- 异步任务(pplx::task):所有核心函数的返回值都是
pplx::task<Response>
,这是微软 PPL 库的异步任务类型 —— 函数调用后不会 “阻塞等待结果”,而是返回一个 “任务对象”,通过 .then()
或 .get()
(阻塞)获取最终结果。 - Response 对象:存储 etcd 操作的结果(成功 / 失败、键值数据、版本号等),常用方法如
IsSuccess()
(判断是否成功)、value()
(获取键对应的值)、index()
(获取操作的版本号)。
1. 键值基础操作:CRUD
(1)读取键值:get
/ls
函数签名 | 核心参数 | 功能说明 |
---|
pplx::task<Response> get(std::string const & key) | - key :要读取的键(如 /config/db/host ) | 读取单个键的 value 和元数据(版本号、租约 ID 等) |
pplx::task<Response> ls(std::string const & key) | - key :目录键(如 /config/db ) | 列出目录下的所有子键(类似文件系统 ls ) |
pplx::task<Response> ls(std::string const & key, size_t const limit) | - limit :结果数量限制 | 分页列出目录下的子键,避免结果过多 |
pplx::task<Response> ls(std::string const & key, std::string const & range_end) | - range_end :键范围结束(左闭右开 [key, range_end) ) | |
(2)写入键值:set
/add
/put
/modify
函数签名 | 核心参数 | 功能说明 |
---|
pplx::task<Response> set(std::string const & key, std::string const & value, int ttl = 0) | - key /value :键 / 值- ttl :键的有效期(秒,0 表示永久) | 写入键值:键存在则更新,不存在则创建(覆盖式写入) |
pplx::task<Response> set(std::string const & key, std::string const & value, int64_t leaseId) | - leaseId :租约 ID | 绑定租约写入:租约过期后键自动删除 |
pplx::task<Response> add(std::string const & key, std::string const & value, int ttl = 0) | 同 set | 仅创建键:键已存在则操作失败(原子性 “新增”) |
pplx::task<Response> put(std::string const & key, std::string const & value) | 同 set (无 TTL / 租约) | 简化版写入(仅键值,无过期逻辑),等价于 set(key, value, 0) |
pplx::task<Response> modify(std::string const & key, std::string const & value, int ttl = 0) | 同 set | 仅更新键:键不存在则操作失败(原子性 “更新”) |
pplx::task<Response> modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl = 0) | - old_value :预期的旧值 | 条件更新:仅当键的当前值等于 old_value 时更新(避免并发覆盖) |
pplx::task<Response> modify_if(std::string const & key, std::string const & value, int64_t old_index, int ttl = 0) | - old_index :预期的旧版本号(mod_revision ) | 条件更新:仅当键的当前版本号等于 old_index 时更新(更严谨的并发控制) |
(3)删除键值:rm
/rmdir
函数签名 | 核心参数 | 功能说明 |
---|
pplx::task<Response> rm(std::string const & key) | - key :要删除的单个键 | 删除非目录键,键不存在则操作失败 |
pplx::task<Response> rm_if(std::string const & key, std::string const & old_value) | - old_value :预期的旧值 | 条件删除:仅当键的当前值等于 old_value 时删除 |
pplx::task<Response> rm_if(std::string const & key, int64_t old_index) | - old_index :预期的旧版本号 | 条件删除:仅当键的当前版本号等于 old_index 时删除 |
pplx::task<Response> rmdir(std::string const & key, bool recursive = false) | - key :目录键- recursive :是否递归删除(true 删整个子树,false 仅删空目录) | 删除目录:类似文件系统 rmdir |
pplx::task<Response> rmdir(std::string const & key, std::string const & range_end) | - range_end :键范围结束 | 删除指定范围的键(左闭右开 [key, range_end) ),批量删除场景 |
2. 租约操作:leasegrant
/leaserevoke
/leasekeepalive
etcd 租约用于管理键的生命周期:租约过期后,所有绑定该租约的键自动删除。
函数签名 | 核心参数 | 功能说明 |
---|
pplx::task<Response> leasegrant(int ttl) | - ttl :租约有效期(秒) | 申请新租约,返回租约 ID(lease_id ) |
pplx::task<std::shared_ptr<KeepAlive>> leasekeepalive(int ttl) | - ttl :租约有效期 | 申请租约并自动续期(返回 KeepAlive 实例,销毁时停止续期) |
pplx::task<Response> leaserevoke(int64_t lease_id) | - lease_id :要注销的租约 ID | 手动注销租约,绑定该租约的键立即删除 |
pplx::task<Response> leasetimetolive(int64_t lease_id) | - lease_id :租约 ID | 查询租约剩余有效期 |
3. 监听操作:watch
监听键或目录的变化(新增、更新、删除),异步获取变化事件。
函数签名 | 核心参数 | 功能说明 |
---|
pplx::task<Response> watch(std::string const & key, bool recursive = false) | - key :监听的键 / 目录- recursive :是否递归监听目录(true 监听子键变化) | 实时监听键 / 目录的变化,首次调用从当前版本开始 |
pplx::task<Response> watch(std::string const & key, int64_t fromIndex, bool recursive = false) | - fromIndex :起始版本号 | 从指定版本号开始监听(支持 “回溯” 监听历史变化) |
pplx::task<Response> watch(std::string const & key, std::string const & range_end) | - range_end :键范围结束 | |
4. 精简化参考源代码
#ifndef __ETCD_CLIENT_CORE_HPP__
#define __ETCD_CLIENT_CORE_HPP__
#include <chrono>
#include <memory>
#include <string>
#include "pplx/pplxtasks.h"
#include "etcd/Response.hpp"
#include "etcd/SyncClient.hpp"
#include "etcd/v3/action_constants.hpp"namespace etcd
{class Client{public:Client(SyncClient *client);static Client* WithClient(SyncClient *client);Client(std::string const & etcd_url,std::string const & load_balancer = "round_robin");static Client *WithUrl(std::string const & etcd_url,std::string const & load_balancer = "round_robin");Client(std::string const & etcd_url,std::string const & username,std::string const & password,int const auth_token_ttl = 300,std::string const & load_balancer = "round_robin");static Client *WithUser(std::string const & etcd_url,std::string const & username,std::string const & password,int const auth_token_ttl = 300,std::string const & load_balancer = "round_robin");Client(std::string const & etcd_url,std::string const & ca,std::string const & cert = "",std::string const & privkey = "",std::string const & target_name_override = "",std::string const & load_balancer = "round_robin");static Client *WithSSL(std::string const & etcd_url,std::string const & ca,std::string const & cert = "",std::string const & privkey = "",std::string const & target_name_override = "",std::string const & load_balancer = "round_robin");~Client();pplx::task<Response> head();pplx::task<Response> get(std::string const & key);pplx::task<Response> set(std::string const & key, std::string const & value, int ttl = 0);pplx::task<Response> set(std::string const & key, std::string const & value, int64_t leaseId);pplx::task<Response> add(std::string const & key, std::string const & value, int ttl = 0);pplx::task<Response> rm(std::string const & key);pplx::task<Response> ls(std::string const & key);pplx::task<Response> ls(std::string const & key, std::string const &range_end);pplx::task<Response> rmdir(std::string const & key, bool recursive = false);pplx::task<Response> rmdir(std::string const & key, std::string const &range_end);pplx::task<Response> watch(std::string const & key, bool recursive = false);pplx::task<Response> watch(std::string const & key, int64_t fromIndex, bool recursive = false);pplx::task<Response> watch(std::string const & key, std::string const &range_end);pplx::task<Response> leasegrant(int ttl);pplx::task<std::shared_ptr<KeepAlive>> leasekeepalive(int ttl);pplx::task<Response> leaserevoke(int64_t lease_id);pplx::task<Response> lock(std::string const &key);pplx::task<Response> lock(std::string const &key, int lease_ttl);pplx::task<Response> unlock(std::string const &lock_key);template <typename Rep = std::micro>void set_grpc_timeout(std::chrono::duration<Rep> const &timeout) {this->client->set_grpc_timeout(timeout);}std::chrono::microseconds get_grpc_timeout() const {return this->client->get_grpc_timeout();}SyncClient* sync_client() const;private:bool own_client = true; SyncClient *client = nullptr; };
}#endif
etcd::Response类
1. 静态创建函数(create 模板系列)
函数原型 | 参数解释 | 作用 |
---|
template <typename T> static etcd::Response create(std::unique_ptr<T> call) | call :unique_ptr 包装的异步操作对象 | 等待异步操作完成,解析响应并创建 Response 对象 |
template <typename T> static etcd::Response create(std::shared_ptr<T> call) | call :shared_ptr 包装的异步操作对象 | 等待异步操作完成,解析响应并创建 Response 对象 |
template <typename T> static etcd::Response create(std::unique_ptr<T> call, std::function<void(Response)> callback) | call :unique_ptr 包装的异步操作对象callback :响应处理完成后的回调函数 | 等待异步操作完成,执行回调后解析响应并创建 Response 对象 |
template <typename T> static etcd::Response create(std::function<std::unique_ptr<T>()> callfn) | callfn :返回 unique_ptr<T> 的函数对象 | 延迟创建异步操作对象,等待完成后解析响应并创建 Response 对象 |
template <typename T> static etcd::Response create(std::function<std::shared_ptr<T>()> callfn) | callfn :返回 shared_ptr<T> 的函数对象 | 延迟创建异步操作对象,等待完成后解析响应并创建 Response 对象 |
2. 响应状态查询函数
函数原型 | 参数解释 | 作用 |
---|
bool is_ok() const | 无参数 | 判断请求是否成功(成功返回 true ) |
bool is_network_unavailable() const | 无参数 | 判断错误是否为网络不可用(是则返回 true ) |
int error_code() const | 无参数 | 返回错误代码(0 表示成功) |
bool is_grpc_timeout() const | 无参数 | 判断是否为 gRPC 超时错误(是则返回 true ) |
std::string const & error_message() const | 无参数 | 返回错误信息的字符串描述 |
3. 响应数据获取函数(基本信息)
函数原型 | 参数解释 | 作用 |
---|
std::string const & action() const | 无参数 | 返回操作类型(如 get/set/delete 等) |
int64_t index() const | 无参数 | 返回 etcd 当前的索引值 |
std::chrono::microseconds const & duration() const | 无参数 | 返回请求执行耗时(单位:微秒) |
4. 响应数据获取函数(键值相关)
函数原型 | 参数解释 | 作用 |
---|
Value const & value() const | 无参数 | 返回当前值对象(适用于 get/set/modify 操作) |
Value const & prev_value() const | 无参数 | 返回修改前的值对象(适用于 set/modify/rm 操作) |
Value const & value(int index) const | index :值在列表中的索引 | 返回指定索引的值(适用于目录列表操作) |
Values const & values() const | 无参数 | 返回值列表(适用于目录操作) |
5. 响应数据获取函数(键列表相关)
函数原型 | 参数解释 | 作用 |
---|
Keys const & keys() const | 无参数 | 返回键列表(适用于目录操作) |
std::string const & key(int index) const | index :键在列表中的索引 | 返回指定索引的键 |
6. 响应数据获取函数(高级功能相关)
函数原型 | 参数解释 | 作用 |
---|
int64_t compact_revision() const | 无参数 | 返回压缩版本号(用于 watch 操作取消场景,-1 表示未初始化) |
std::string const & lock_key() const | 参数 | 返回锁的键(用于锁操作) |
std::string const & name() const | 无参数 | 返回名称(用于选举操作) |
std::vector<Event> const & events() const | 无参数 | 返回 watch 到的事件列表 |
7. 响应数据获取函数(集群信息)
函数原型 | 参数解释 | 作用 |
---|
uint64_t cluster_id() const | 无参数 | 返回集群 ID |
uint64_t member_id() const | 无参数 | 返回成员 ID |
uint64_t raft_term() const | 无参数 | 返回当前 raft 任期 |
8. 构造函数与拷贝构造函数
函数原型 | 参数解释 | 作用 |
---|
Response() | 无参数 | 默认构造函数,初始化空响应 |
Response(const Response &) | 无参数(隐式传入被拷贝对象) | 拷贝构造函数,复制已有响应对象 |
Response(const etcdv3::V3Response& response, std::chrono::microseconds const& duration) | response :etcd v3 响应对象duration :请求执行耗时 | 从 v3 响应构建 Response 对象(受保护) |
Response(int error_code, char const * error_message) | error_code :错误代码error_message :错误信息 | 直接通过错误码和信息构建 Response 对象(受保护) |
9. 精简化参考源代码
#ifndef __ETCD_RESPONSE_CORE_HPP__
#define __ETCD_RESPONSE_CORE_HPP__
#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <vector>
#include "etcd/Value.hpp"
namespace etcdv3 {class AsyncWatchAction;class AsyncLeaseKeepAliveAction;class AsyncObserveAction;class V3Response;
}namespace etcd
{typedef std::vector<std::string> Keys;class Response{public:template <typename T>static etcd::Response create(std::unique_ptr<T> call);template <typename T>static etcd::Response create(std::shared_ptr<T> call);template <typename T>static etcd::Response create(std::unique_ptr<T> call,std::function<void(Response)> callback);template <typename T>static etcd::Response create(std::function<std::unique_ptr<T>()> callfn);template <typename T>static etcd::Response create(std::function<std::shared_ptr<T>()> callfn);Response();Response(const Response &);bool is_ok() const;bool is_network_unavailable() const;int error_code() const;bool is_grpc_timeout() const;std::string const & error_message() const;std::string const & action() const;int64_t index() const;std::chrono::microseconds const & duration() const;Value const & value() const;Value const & prev_value() const;Value const & value(int index) const;Values const & values() const;Keys const & keys() const;std::string const & key(int index) const;int64_t compact_revision() const;std::string const & lock_key() const;std::string const & name() const;std::vector<Event> const & events() const;uint64_t cluster_id() const;uint64_t member_id() const;uint64_t raft_term() const;protected:Response(const etcdv3::V3Response& response, std::chrono::microseconds const& duration);Response(int error_code, char const * error_message);int _error_code; std::string _error_message; int64_t _index; std::string _action; Value _value; Value _prev_value; Values _values; Keys _keys; int64_t _compact_revision = -1;std::string _lock_key; std::string _name; std::vector<Event> _events; std::chrono::microseconds _duration; uint64_t _cluster_id; uint64_t _member_id; uint64_t _raft_term; friend class Client;friend class SyncClient;friend class etcdv3::AsyncWatchAction;friend class etcdv3::AsyncLeaseKeepAliveAction;friend class etcdv3::AsyncObserveAction;};
}#endif
etcd::Value类与Event类
1. etcd::Value 类(键值对数据载体)
函数原型 | 参数解释 | 作用 |
---|
bool is_dir() const | 无参数 | 判断当前键是否为目录(true 表示目录,false 表示普通键) |
std::string const & key() const | 无参数 | 返回键的完整绝对路径(如 /config/db/host ) |
std::string const & as_string() const | 无参数 | 返回键的字符串形式值(仅普通键有效,目录键返回空) |
int64_t created_index() const | 无参数 | 返回键的创建索引(etcd 全局递增的版本号,标记创建时间) |
int64_t modified_index() const | 无参数 | 返回键的最后修改索引(标记最近一次更新的时间) |
int64_t version() const | 无参数 | 返回键的版本号(创建时为 1,每次更新递增 1) |
int ttl() const | 无参数 | 返回键的过期时间(TTL,单位:秒,0 表示永久有效) |
int64_t lease() const | 无参数 | 返回键绑定的租约 ID(0 表示未绑定租约,租约过期后键自动删除) |
2. etcd::Event 类(监听事件载体)
函数原型 | 参数解释 | 作用 |
---|
enum EventType event_type() const | 无参数 | 返回事件类型:- PUT :键新增或修改- DELETE_ :键删除- INVALID :无效事件 |
bool has_kv() const | 无参数 | 判断事件是否包含变更后的键值(PUT 事件为 true ,DELETE_ 事件为 false ) |
bool has_prev_kv() const | 无参数 | 判断事件是否包含变更前的键值(修改 / 删除事件可能为 true ,新增事件为 false ) |
const Value &kv() const | 无参数 | 返回变更后的键值对象(仅 has_kv() 为 true 时有效) |
const Value &prev_kv() const | 无参数 | 返回变更前的键值对象(仅 has_prev_kv() 为 true 时有效) |
补充说明
- Values 类型:
typedef std::vector<Value> Values
,用于存储多个键值对(如目录下的所有键)。 - Events 类型:
typedef std::vector<Event> Events
,用于存储多个监听事件(如一次 watch 操作触发的所有变更)。 - 这两个类的对象通常通过
Response
类的接口(如 values()
、events()
)获取,不直接由用户创建。
3. 精简化参考源代码
#ifndef __ETCD_VECTOR_CORE_HPP__
#define __ETCD_VECTOR_CORE_HPP__
#include <string>
#include <vector>
namespace etcdv3 { class KeyValue; }
namespace mvccpb { class KeyValue; class Event; }namespace etcd
{class Client;class SyncClient;class Response;class Value{public:bool is_dir() const;std::string const & key() const;std::string const & as_string() const;int64_t created_index() const;int64_t modified_index() const;int64_t version() const;int ttl() const;int64_t lease() const;protected:friend class Client;friend class SyncClient;friend class Response;friend class Event;Value(); Value(etcdv3::KeyValue const & kvs); Value(mvccpb::KeyValue const & kvs); std::string _key; bool dir; std::string value; int64_t created; int64_t modified; int64_t _version; int _ttl; int64_t leaseId; };typedef std::vector<Value> Values;class Event{public:enum class EventType {PUT, DELETE_, INVALID };enum EventType event_type() const;bool has_kv() const;bool has_prev_kv() const;const Value &kv() const;const Value &prev_kv() const;protected:friend class Response;Event(mvccpb::Event const & event); private:enum EventType event_type_; Value _kv; Value _prev_kv; bool _has_kv; bool _has_prev_kv; };typedef std::vector<Event> Events;
}#endif
etcd::Watcher
1. etcd::Watcher 核心函数与参数表格
函数原型 | 参数解释 | 作用 |
---|
Watcher(Client const &client, std::string const & key, std::function<void(Response)> callback, bool recursive=false) | - client :etcd 客户端实例- key :监听的键- callback :事件触发时的回调函数- recursive :是否递归监听子树(默认 false) | 基于客户端监听单个键(非递归) |
Watcher(SyncClient const &client, std::string const & key, std::function<void(Response)> callback, bool recursive=false) | 参数同上,客户端为同步类型 SyncClient | 基于同步客户端监听单个键 |
Watcher(Client const &client, std::string const & key, std::string const &range_end, std::function<void(Response)> callback) | - range_end :监听范围的结束键([key, range_end)) | 监听键范围 [key, range_end) |
Watcher(Client const &client, std::string const & key, int64_t fromIndex, std::function<void(Response)> callback, bool recursive=false) | - fromIndex :起始版本号(从该版本开始监听) | 从指定版本开始监听单个键 |
Watcher(Client const &client, std::string const & key, std::string const &range_end, int64_t fromIndex, std::function<void(Response)> callback) | 结合键范围和起始版本 | 从指定版本开始监听键范围 |
| | |
其他构造函数 | 包含地址、用户名密码、SSL 证书等参数 | 直接通过连接信息初始化监听器(无需提前创建客户端) |
bool Wait() | 无参数 | 阻塞等待监听器停止(正常取消返回 true,异常停止返回 false) |
void Wait(std::function<void(bool)> callback) | - callback :监听器停止后的回调(参数为是否正常取消) | 异步等待监听器停止 |
bool Cancel() | 无参数 | 主动停止监听(成功返回 true) |
bool Cancelled() const | 无参数 | 判断监听器是否已停止(返回 true 表示已停止) |
~Watcher() | 无参数 | 析构函数,释放监听资源 |
2. 精简化参考源代码
#ifndef __ETCD_WATCHER_SIMPLE_HPP__
#define __ETCD_WATCHER_SIMPLE_HPP__#include <atomic>
#include <functional>
#include <string>
#include <thread>
#include <memory>
#include "etcd/Response.hpp"
#include "etcd/Client.hpp"
#include "etcd/SyncClient.hpp"namespace etcd
{class Watcher{public:Watcher(Client const &client, std::string const & key,std::function<void(Response)> callback, bool recursive = false);Watcher(SyncClient const &client, std::string const & key,std::function<void(Response)> callback, bool recursive = false);Watcher(Client const &client, std::string const & key,std::string const &range_end,std::function<void(Response)> callback);Watcher(Client const &client, std::string const & key,int64_t fromIndex,std::function<void(Response)> callback, bool recursive = false);Watcher(std::string const & address, std::string const & key,std::function<void(Response)> callback, bool recursive = false);Watcher(std::string const & address,std::string const & username, std::string const & password,std::string const & key,std::function<void(Response)> callback, bool recursive = false);Watcher(Watcher const &) = delete;Watcher(Watcher &&) = delete;bool Wait();void Wait(std::function<void(bool)> callback);bool Cancel();bool Cancelled() const;~Watcher();protected:void doWatch(std::string const & key,std::string const & range_end,std::string const & auth_token,std::function<void(Response)> callback);std::function<void(Response)> callback; std::function<void(bool)> wait_callback; std::thread task_; struct EtcdServerStubs;struct EtcdServerStubsDeleter {void operator()(EtcdServerStubs *stubs);};std::unique_ptr<EtcdServerStubs, EtcdServerStubsDeleter> stubs;private:int64_t fromIndex{0}; bool recursive{false}; std::atomic_bool cancelled{false}; };
}#endif
3. 参考使用
-
核心作用:Watcher
是 etcd 的 “监听器”,用于实时监控键的变化,当键被新增、修改或删除时,会通过回调函数通知你。
-
使用流程:
etcd::Client client("http://127.0.0.1:2379");
auto callback = [](etcd::Response const& resp) {if (resp.is_ok()) {for (auto const& event : resp.events()) {std::cout << "事件类型: " << (event.event_type() == etcd::Event::EventType::PUT ? "修改/新增" : "删除") << std::endl;std::cout << "键: " << event.kv().key() << std::endl;}}
};
etcd::Watcher watcher(client, "/test", callback, true);
watcher.Wait();
-
关键接口:
- 构造函数:决定监听 “哪个键”“从哪个版本开始”“是否递归”。
Cancel()
:主动停止监听(必须调用,否则线程可能泄漏)。- 回调函数:处理实际的键变化事件(核心业务逻辑在这里实现)。
etcd::KeepAlive
1. etcd::KeepAlive 核心函数与参数表格
函数原型 | 参数解释 | 作用 |
---|
构造函数系列 | | 初始化租约保活器,绑定租约并设置自动续期 |
KeepAlive(Client const &client, int ttl, int64_t lease_id = 0) | - client :etcd 客户端实例- ttl :租约有效期(秒)- lease_id :租约 ID(0 表示自动生成) | 基于客户端创建租约保活器 |
KeepAlive(SyncClient const &client, int ttl, int64_t lease_id = 0) | 参数同上,客户端为同步类型 SyncClient | 基于同步客户端创建保活器 |
KeepAlive(std::string const & address, int ttl, int64_t lease_id = 0) | - address :etcd 服务地址 | 直接通过地址创建保活器(无需提前创建客户端) |
KeepAlive(Client const &client, std::function<void (std::exception_ptr)> const &handler, int ttl, int64_t lease_id = 0) | - handler :异常回调函数(保活失败时触发) | 带异常处理的保活器创建 |
其他构造函数 | 包含用户名密码、SSL 证书等参数 | 带认证 / 加密的保活器创建 |
int64_t Lease() const | 无参数 | 返回当前保活的租约 ID |
void Cancel() | 无参数 | 停止租约保活(租约将在 TTL 后过期) |
void Check() | 无参数 | 检查保活状态(异常时抛出错误) |
template <typename Rep = std::micro> void set_grpc_timeout(std::chrono::duration<Rep> const &timeout) | - timeout :gRPC 操作超时时间 | 设置保活请求的超时时间 |
std::chrono::microseconds get_grpc_timeout() const | 无参数 | 获取当前 gRPC 超时时间 |
~KeepAlive() | 无参数 | 析构函数,自动停止保活 |
2. 精简版 etcd::KeepAlive 头文件(初学者友好)
#ifndef __ETCD_KEEPALIVE_SIMPLE_HPP__
#define __ETCD_KEEPALIVE_SIMPLE_HPP__#include <atomic>
#include <chrono>
#include <exception>
#include <functional>
#include <string>
#include <thread>
#include <memory>
#include "etcd/Client.hpp"
#include "etcd/SyncClient.hpp"
#include <boost/config.hpp>
#if BOOST_VERSION >= 106600
#include <boost/asio/io_context.hpp>
#else
#include <boost/asio/io_service.hpp>
#endif
#include <boost/asio/steady_timer.hpp>namespace etcd
{class KeepAlive{public:KeepAlive(Client const &client, int ttl, int64_t lease_id = 0);KeepAlive(SyncClient const &client, int ttl, int64_t lease_id = 0);KeepAlive(std::string const & address, int ttl, int64_t lease_id = 0);KeepAlive(Client const &client,std::function<void (std::exception_ptr)> const &handler,int ttl, int64_t lease_id = 0);KeepAlive(std::string const & address,std::string const & username, std::string const & password,int ttl, int64_t lease_id = 0);KeepAlive(KeepAlive const &) = delete;KeepAlive(KeepAlive &&) = delete;int64_t Lease() const { return lease_id; }void Cancel();void Check();template <typename Rep = std::micro>void set_grpc_timeout(std::chrono::duration<Rep> const &timeout) {this->grpc_timeout = std::chrono::duration_cast<std::chrono::microseconds>(timeout);}std::chrono::microseconds get_grpc_timeout() const {return this->grpc_timeout;}~KeepAlive();protected:void refresh();struct EtcdServerStubs;struct EtcdServerStubsDeleter {void operator()(EtcdServerStubs *stubs);};std::unique_ptr<EtcdServerStubs, EtcdServerStubsDeleter> stubs;private:std::exception_ptr eptr_; std::function<void (std::exception_ptr)> handler_; std::thread task_; int ttl; int64_t lease_id; std::atomic_bool continue_next{true}; std::chrono::microseconds grpc_timeout{0};
#if BOOST_VERSION >= 106600boost::asio::io_context context;
#elseboost::asio::io_service context;
#endifstd::unique_ptr<boost::asio::steady_timer> keepalive_timer_;};
}#endif
3. 给初学者的关键说明
-
核心作用:KeepAlive
是 etcd 租约的 “续命器”。当你为键绑定租约后,需要定期向 etcd 发送续期请求才能维持租约有效,KeepAlive
会自动完成这个过程,避免键被自动删除。
-
使用流程:
etcd::Client client("http://127.0.0.1:2379");
auto keepalive = client.leasekeepalive(30);
int64_t lease_id = keepalive->Lease();
client.set("/test/key", "value", lease_id);
-
关键接口:
- 构造函数:指定租约有效期(TTL),保活器会每 TTL/3 左右自动续期一次。
Lease()
:获取租约 ID,用于将键绑定到该租约。Cancel()
:停止续期(必须调用,否则租约会一直有效)。- 异常回调:保活失败(如网络中断)时触发,可用于错误处理或重连。
pplx::task
1. 模板类 task<_ReturnType>
核心函数与参数表格
函数原型(简化) | 核心参数解释 | 作用 |
---|
构造函数 | | 创建异步任务对象,承载异步逻辑或任务状态 |
task() | 无参数 | 默认构造(空任务),不可直接使用(需赋值后才能调用 wait() /get() ) |
task(_Ty _Param) | _Param :可调用对象(lambda / 函数 / 函数对象),或任务完成事件 | 基于 “异步逻辑” 创建任务(如 lambda 中的耗时操作) |
task(_Ty _Param, const task_options& _TaskOptions) | _Param :同上;_TaskOptions :任务配置(取消令牌、调度器等) | 带配置的任务创建(如指定任务取消规则) |
task(const task& _Other) | _Other :其他 task 对象 | 拷贝构造(任务共享底层状态,类似智能指针) |
task(task&& _Other) | _Other :其他 task 对象(右值) | 移动构造(转移底层状态,避免拷贝开销) |
赋值运算符 | | 替换任务的底层状态 |
task& operator=(const task& _Other) | _Other :其他 task 对象 | 拷贝赋值(共享目标任务状态) |
task& operator=(task&& _Other) | _Other :其他 task 对象(右值) | 移动赋值(转移目标任务状态) |
任务延续(核心) | | 任务完成后自动执行的后续逻辑(避免回调嵌套) |
auto then(_Function&& _Func) const | _Func :延续函数(参数为当前任务结果 / 自身,返回值为新任务结果) | 为当前任务绑定延续任务(如 “任务 A 完成后执行任务 B”) |
auto then(_Function&& _Func, task_options _TaskOptions) const | _Func :同上;_TaskOptions :延续任务配置 | 带配置的延续任务(如指定延续任务的调度器) |
任务等待与结果获取 | | 同步获取任务状态或结果 |
task_status wait() const | 无参数 | 阻塞等待任务完成(返回 completed /canceled ,异常时抛出) |
_ReturnType get() const | 无参数 | 阻塞等待并获取任务结果(任务取消 / 异常时抛出错误) |
任务状态查询 | | 检查任务是否完成 |
bool is_done() const | 无参数 | 判断任务是否进入 “终态”(完成 / 取消 / 异常,返回 true /false ) |
任务调度器相关 | | 管理任务的执行调度器 |
scheduler_ptr scheduler() const | 无参数 | 获取当前任务使用的调度器(控制任务在哪个线程执行) |
任务比较 | | 判断两个任务是否指向同一底层状态 |
bool operator==(const task& _Rhs) const | _Rhs :另一个 task 对象 | 相等则表示共享同一底层任务 |
bool operator!=(const task& _Rhs) const | _Rhs :另一个 task 对象 | |