当前位置: 首页 > news >正文

Redis(二)——Redis协议与异步方式

redis协议

redis pipeline机制

对比

  1. 普通的redis交互(非pipeline):
    (1)客户端发送一条命令
    (2)redis服务器执行命令并返回结果
    (3)客户端收到结果后发送下一条命令

  2. pipeline:允许客户端将多条命令一次性发送给服务器,而不是一条一条,redis会依次执行这些命令并将结果一次性返回

    # 普通模式:
    Client → [SET k1 v1] → Server → [OK]
    Client → [SET k2 v2] → Server → [OK]
    ...# Pipeline 模式:
    Client → [SET k1 v1, SET k2 v2, SET k3 v3, ...] → Server
    Server → [OK, OK, OK, ...] → Client
    

优势

  1. 减少了系统调用;
  2. 减少了多次TCP数据包的发送;
  3. 减缓网络延迟;

注意:pipeline不具备事务性

redis事务

事务指的是将一系列数据库操作视为一个完整的逻辑处理单元,要么全部执行,要么全部不执行,是不可分割的工作单元

redis中的事务原语有四个

  1. multi:开启事务,单个命令是入队列操作,知道调用exec才会一起执行,相当于start transaction
  2. exec:提交事务,相当于commit
  3. discard:取消事务,相当于rollback
  4. watch:监听事务,检测key的变动,若在事务执行过程中key变动,则取消事务;

lua脚本

实际开发中,基本不会使用上面提到的四个事务原语,而是使用lua脚本实现原子性(redis内部嵌有lua虚拟机)

  1. eval:执行lua脚本

    1. 格式:EVAL script numkeys key [key ...] arg [arg ...]

      script是lua脚本
      numkeys是传入脚本的key数量
      key…是脚本中可访问的key
      arg…是脚本中可访问的参数

    2. 示例
      在这里插入图片描述

  1. evalsha:根据SHA1校验码执行缓存的lua脚本,避免重复传入脚本

    1. 格式script load s; evalsha sha numkeys ...
      首先通过script load加载脚本,这条命令会返回一个sha校验码,后面的evalsha命令就通过加载这个校验码来执行命令,后面的numkeys、key、arg和eval相同
    2. 示例
      在这里插入图片描述

实际项目redis中先将lua段批量编译后,用unordered_map缓存在内存中,未来有相关业务需求调用evalsha 指令执行相关业务

在这里插入图片描述

使用lua脚本时的acid特性

  1. 原子性(atomicity),完全支持
    1. redis保证lua脚本的执行是原子的;
    2. 整个脚本要么全部执行,要么玩全部执行;
    3. 在执行过程中不会背其他命令打断;
    4. 如果脚本运行过程中出错,已经执行的命令依然生效,但是不会回滚;
  2. 一致性(consistency),通常能保持一致性
    1. lua脚本在逻辑上执行原子命令序列,所以不会出现中间状态被外界读取的情况,如果脚本逻辑正确,redis会保证一致性;
    2. redis不会自动帮忙做数据约束,一致性由脚本逻辑本身保证;
  3. 隔离性(isolation),完全隔离
    1. redis是单线程执行,所有命令和lua脚本都在同一个事件循环中按顺序执行;
    2. lua脚本执行期间,其他客户端命令必须等待,不存在并发访问;
  4. 持久性(durability),取决于redis的持久化策略
    1. redis是内存数据库,所以持久性由redis配置决定

发布订阅

发布者使用publish向某个频道发送消息,订阅者使用subscribepsubscribe订阅一个或多个频道

命令

# 订阅频道
subscribe 频道
# 订阅模式频道
psubscribe 频道
# 取消订阅频道
unsubscribe 频道
# 取消订阅模式频道
punsubscribe 频道
# 发布具体频道或模式频道的内容
publish 频道 内容
# 客户端收到具体频道内容
message 具体频道 内容
# 客户端收到模式频道内容
pmessage 模式频道 具体频道 内容

注意:发布订阅模式中的消息是不会持久化的,所以当redis停机重启,所以没有接收到的消息也就不会接收到了,会直接丢弃;

使用c/c++与redis通信

同步

相关结构体

  1. redisContext,表示与redis服务器的一个连接的上下文

    typedef struct redisContext {int err;                // 错误码(0 表示无错误)char errstr[128];       // 错误字符串int fd;                 // 套接字文件描述符int flags;              // 标志位,连接状态char *obuf;             // 输出缓冲区(待发送的数据)redisReader *reader;    // hiredis 内部使用的协议解析器// ... 省略内部字段
    } redisContext;
    
  2. redisReply,redis服务器返回的响应结构体,代表命令执行结果

    typedef struct redisReply {int type;               // 回复类型(REDIS_REPLY_*)long long integer;      // 整数结果(如 INCR 返回)size_t len;             // 字符串长度char *str;              // 字符串结果size_t elements;        // 数组元素个数(如 MGET)struct redisReply **element; // 数组元素列表
    } redisReply;
    

    常见的回复类型(type):
    REDIS_REPLY_STATUS,表示字符串状态,如OK
    REDIS_REPLY_ERROR,表示错误信息,如ERR wrong type
    REDIS_REPLY_INTEGER,表示整数返回值,如1
    REDIS_REPLY_STRING,表示普通字符串,如hello
    REDIS_REPLY_ARRAY,表示数组
    REDIS_REPLY_NIL,表示空值

相关函数

  1. 建立TCP连接,redisConnect

    redisContext *redisConnect(const char *ip, int port);
    

    返回redisContext*,如果连接失败,返回值的err字段非零,errstr中有错误信息

  2. 发送命令并获取相应

    1. 直接传字符串

      redisReply *redisCommand(redisContext *c, const char *format, ...);
      

      例如redisReply *reply = (redisReply*)redisCommand(c, "SET %s %s", "foo", "bar");

    2. 传参数数组

      redisReply *redisCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen);
      
    3. 每次执行完后都需要释放reply的内存

      void freeReplyObject(void *reply);
      
  3. 关闭连接void redisFree(redisContext *c);

示例

#include <iostream>extern "C" {
#include <hiredis/hiredis.h>
}int main() {redisContext *c;redisReply *reply;c = redisConnect("127.0.0.1", 6379);// timeval timeout = { 1, 500000 };// c = redisConnectWithTimeout("127.0.0.1", 9999, timeout);if(c==nullptr || c->err) {if(!c) {printf("Connection error: can't allocate redis context\n");} else if(c->err) {printf("Connection error: %s\n", c->errstr);redisFree(c);}}int num = 1000;for(int i=0; i<num; ++i) {reply = (redisReply *)redisCommand(c, "incr counter");printf("counter: %lld\n", reply->integer);freeReplyObject(reply);}redisFree(c);return 0;
}

异步

主要结构体

  1. redisAsyncContext,定义在async.h中,是异步通信的核心结构体

    typedef struct redisAsyncContext {redisContext c;              // 基础同步上下文(包含socket、连接状态等)int err;                     // 错误码char errstr[128];            // 错误描述/* 回调函数集合 */struct {redisCallbackList replies;   // 正常命令回复的回调队列redisCallbackList invalid;   // 已经无效的回调(例如断开连接后)} callbacks;/* 连接事件回调 */void (*connectCallback)(const redisAsyncContext*, int status);void (*disconnectCallback)(const redisAsyncContext*, int status);/* 用户数据指针(常用于注册reactor) */void *data;/* 内部状态 */int flags;int refcount;/* 事件驱动相关接口 */void *ev;                       // 指向外部事件库的对象(例如libevent)void (*addRead)(void *privdata);void (*delRead)(void *privdata);void (*addWrite)(void *privdata);void (*delWrite)(void *privdata);void (*cleanup)(void *privdata);void *dataPriv;                 // 事件驱动库的私有数据
    } redisAsyncContext;

主要函数

  1. redisAsyncConnect,建立与redis的异步连接,返回一个redisAsyncContext*,该函数不会立即完成TCP握手,当真正建立连接之后会触发connectCallback

    redisAsyncContext *redisAsyncConnect(const char *ip, int port);
    
  2. redisAsyncConnectCallback,设置连接成功时的回调函数fn

    int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn);
    
  3. redisAsyncDisconnectCallback,设置连接断开时的回调函数

    int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn);
    
  4. redisAsyncCommand,向redis发送命令,但是不会等待执行结果,而是将命令写入到发送缓冲区中,由事件循环(epoll)检测到可写事件后真正发送,收到响应后调用回调函数fn

    int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...);
    
  5. redisAsyncHandleRead/redisAsyncHandleWrite,当socket可读/写时,由事件驱动调用handle函数,handle内部读取数据、解析响应、执行回调

    void redisAsyncHandleRead(redisAsyncContext *ac);
    void redisAsyncHandleWrite(redisAsyncContext *ac);
    
  6. 自定义的attach函数,用于将hiredis异步上下文与事件循环绑定,设置addReadaddWrite等函数指针

异步通信的执行时序

redisAsyncConnect()                 // 建立异步连接
redisAttach(ctx, reactor)           // 绑定事件循环
redisAsyncSetConnectCallback()      // 设置连接回调
redisAsyncSetDisconnectCallback()   // 设置断开回调
reactor_add_write()                 // 等待连接可写
↓
handle redis write → connect ok → connectCallback()
↓
redisAsyncCommand()                 // 发送命令(仅写入缓冲区)
redis add write                     // 注册写事件
↓
handle redis write → 实际发送
↓
redis add read                      // 注册读事件
↓
handle redis read → redisAsyncHandleRead() → 执行命令回调

示例

主函数中

// 建立异步连接
redisAsyncContext *ac = redisAsyncConnect("127.0.0.1", 6379);
if(ac->err) {printf("Error: %s\n", ac->errstr);return 1;
}// 创建事件循环
R = create_reactor();
// 绑定事件循环与redis异步上下文
redisAttach(ac, R);// 设置连接成功/断开连接的回调函数
// 两个回调函数我并没有实现什么内容
redisAsyncSetConnectCallback(ac, connectCallback);
redisAsyncSetDisconnectCallback(ac, disconnectCallback);num = 1000;
for(int i=0; i<num; ++i) {redisAsyncCommand(ac, getCallback, (void *)"count", "INCR counter");
}eventloop(R);

attach函数

static int redisAttach(redisAsyncContext *ac, reactor_t *r) {redisContext *rc = &(ac->c);redis_event_t *re;if(ac->ev.data != nullptr) return REDIS_ERR;re = (redis_event_t *)hi_malloc(sizeof(redis_event_t));if(!re) return REDIS_ERR;// redis_event_t是自己定义的结构体,用于将redis上下文与自己的事件循环建立关系re->ctx = ac;re->e.fd = rc->fd;re->e.r = r;re->e.in = nullptr;re->e.out = nullptr;// ac中的回调函数ac->ev.addRead = redisAddRead; // addRead中主要设置读回调函数,并将读事件注册到epoll中ac->ev.addWrite = redisAddWrite; // addWrite同理ac->ev.cleanup = redisCleanUp;ac->ev.delRead = redisDelRead;ac->ev.delWrite = redisDelWrite;// data是传给read、write一系列函数的参数ac->ev.data = re;return REDIS_OK;
}

addRead函数示例

// add read,将读事件加入到epoll中,顺便给事件绑定读回调函数,这是event_t设计的缘故
static void redisAddRead(void *privdata) {redis_event_t *re = (redis_event_t *)privdata;re->e.read_fn = redisReadHandler;redisEventUpdate(re, EPOLLIN, 0); // 将事件加入到epoll中
}// 这是event_t绑定的读回调事件,是epoll中监听到读事件后调用的
static void redisReadHandler(int fd, int events, void *privdata) {// 前两行的作用是防止未使用参数警告(void)fd;(void)events;event_t *e = (event_t *)privdata;redis_event_t *re = (redis_event_t *)(char *)e;redisAsyncHandleRead(re->ctx);
}

性能对比

差距明显

在这里插入图片描述

在这里插入图片描述

http://www.dtcms.com/a/457478.html

相关文章:

  • Lua迭代器与泛型for
  • 国外网站后台模板下载seo在线优化排名
  • 三亚中国检科院生物安全中心门户网站建设溧阳网站优化
  • 做的好的阅读类的网站有哪些免费素材app
  • dw怎么做打开网站跳出提示中国交建总承包公司官网
  • 12306网站开始是谁开发的雅思培训机构哪家好机构排名
  • 鸿蒙实现滴滴出行项目之线路规划图
  • 虚幻引擎5 GAS开发俯视角RPG游戏 P05-04 使用效果应用游戏标签
  • 浙江网站怎么做推广合肥企业网站
  • 广东华电建设股份有限公司网站网站 别名
  • 郑州微网站制作ui设计师作品集网站
  • 中GETS与Sc#ANF的深度与应用技巧
  • 大连市开发区网站建设公司备案号被取消 没有重新备案网站会被关闭吗
  • 【已解决】WPS反复报错“我们遇到了一个无法恢复的问题”的解决方法
  • Go语言入门(22)-通道 channel
  • 做期货看那个网站比较专业网站内容方案
  • 网站快速建设软件下载wordpress图片博客插件
  • 52Hz——STM32单片机学习记录——定时器
  • PID--微分项D
  • 如何配置 GitHub 远程仓库及本地 Git 环境
  • 旋转矩阵的推导+矩阵在3DGS中的应用
  • 泰山派无 eMMC 版:嘉立创 Linux 镜像 “大 SD 卡资源浪费” 问题解析与解决
  • 人物摄影网站济宁网站建设优惠
  • WebClient工具调用HTTP接口报错远程主机断开连接
  • 【C语言基础详细版】09. 文件操作完全指南:从基础到高级应用
  • 卡盟网站专用主机批量建wordpress
  • Java高并发常见架构、处理方式、api调优
  • 基于 Delphi 与 ICS 的 Mosquitto Broker 重构实现:架构创新与技术比较分析
  • rag的评估优化应用前景
  • 1.2 openEuler - 安装OpenStack云计算平台基础框架