Redis中间件(二):Redis协议与异步方式
文章目录
- 一、什么是redis事务?
- 1. redis pipeline机制
- 2. redis事务
- 事务原语(MULTI | EXEC | DISCARD | WATCH)
- 乐观锁和悲观锁
- 事务原语的应用
- 3. lua脚本
- 4. ACID特性分析(分析redis事务)
- A: 原子性
- C: 一致性
- I: 隔离性
- D: 持久性
- 5. 发布订阅
- 二、实现server端与redis通信
- 1. 同步连接 (用redis默认驱动)
- 2. 异步连接 (基于当前网络模块实现驱动)
一、什么是redis事务?
1. redis pipeline机制
- redis pipline是一个客户端提供的机制,不是服务端提供;通过一次发送多次请求命令,从而减少网络传输的时间。
- 注意:pipeline不具备事务性;事务性是指一组操作要么全部成功执行,要么全部失败回滚,中间不能只执行一部分。这种特性通常用于数据库或数据操作中,保证数据的一致性和可靠性,
2. redis事务
事务是指 用户定于一系列数据库操作,这些操作视为一个完整的逻辑处理单元,要么全部执行,要么全部不执行,是不可分割的工作单元。
+多核情况 探讨原子操作(避免其他线程执行)
+多条并发连接 探讨事务
事务原语(MULTI | EXEC | DISCARD | WATCH)
- MULTI 开启事务 (begin / strat transaction)
MULTI开启事务,事务执行过程中,单个命令是入队列操作,直到
调用EXEC才会一起执行;乐观锁实现,所以失败需要重试; - EXEC 提交事务 (commit)
提交事务 - DISCARD 取消事务 (rollback)
取消事务 - WATCH 监听事务
检测key的变动,若在事务执行中,key变动则取消事务;在事务开启前调用,乐观锁(cas)实现;若被取消则事务返回nil
乐观锁和悲观锁
乐观锁(redis):
“我假设你不会碰我操作的变量,冲突很少,出问题我再处理。”
悲观锁(mysql):
“我假设你一定会碰,所以我提前上锁,防止你碰。”
对比项 | 乐观锁(Optimistic Lock) | 悲观锁(Pessimistic Lock) |
---|---|---|
核心假设 | 不会发生冲突 | 一定会发生冲突 |
操作方式 | 不加锁,操作前后做数据校验 | 操作前加锁,阻塞其他操作 |
并发性能 | 性能好,冲突少时效率高 | 并发低,阻塞多时效率低 |
实现方式 | 版本号(version)、时间戳等 | 数据库 select … for update、行锁等 |
失败处理 | 操作失败后重试 | 先加锁,保证别人不能动 |
典型场景 | 数据读多写少,如缓存、排行榜更新 | 数据写多读多,如账户转账、订单处理 |
事务原语的应用
【1. 事务实现 zpop】
WATCH zset
element = ZRANGE zset 0 0
MULTI
ZREM zset element
EXEC
-----------------------------------
【2. 事务实现 加倍操作】
WATCH score:10001
val = GET score:10001
MULTI
SET score:10001 val*2
EXEC
实操
3. lua脚本
redis内部嵌有lua虚拟机,实际开发不用四个事务,而是用lua脚本实现原子性。
【1.调用方式】
redis.call("命令",key1,key2,...,arg1,arg2,...)//call:命令失败抛出异常
//pcall:命令失败不抛异常,返回错误对象
--------------------------------------
【2.EVALSHA 来代替 EVAL】
//EVAL是Redis执行一段Lua脚本的命令,支持多key、多参数传入,原子执行。
//使用EVALSHA来代替EVAL时,相当于只传递一个hash(SHA1)值到Redis,而不传整段Lua脚本:+ 网络传输更小
+ 安全(不暴露源码)
+ 快速查找(Redis 内部用哈希表管理脚本)
实际项目redis中先将lua段批量编译后,用unordered_map缓存在内存中,未来有相关业务需求调用evalsha 指令执行相关业务。
4. ACID特性分析(分析redis事务)
lua脚本满足原子性和隔离性, 一致性和持久性不满足
A: 原子性
事务是一个不可分割的工作单位,事务中的操作要么全部成功,要么全部失败;redis不支持回滚;
即使事务队列中的某个命令在执行期间出现了错误,整个事务也会继续执行下去,直到将事务队列中的所有命令都执行完毕为止。
C: 一致性
执行事务后,数据从一个一致状态过渡到另一个一致状态(满足约束、逻辑正确); 一个是逻辑上一致性,一个是数据库一致性(完整约束)
I: 隔离性
各个事务之间互相影响的程度;redis是单线程执行,天然具备隔离性;
D: 持久性
一旦事务执行成功,结果会被永久保存,即使系统崩溃也能恢复;
redis只有在aof持久化策略时候,才具备持久性,实际项目中几乎不会使用aof 持久化策略;
5. 发布订阅
消息推送、实时通信、微服务解耦、事件驱动等场景。
(1)工作原理
发布者 使用 PUBLISH 向某个频道发送消息;
订阅者 使用 SUBSCRIBE、PSUBSCRIBE 订阅一个或多个频道;
一旦有新消息,所有订阅者会立即收到消息;是即时通信模型,不做持久化,也不记录历史消息。
(2)命令
# 订阅频道
subscribe 频道
# 订阅模式频道
psubscribe 频道
# 取消订阅频道
unsubscribe 频道
# 取消订阅模式频道
punsubscribe 频道
# 发布具体频道或模式频道的内容
publish 频道 内容
# 客户端收到具体频道内容
message 具体频道 内容
# 客户端收到模式频道内容
pmessage 模式频道 具体频道 内容
发布订阅功能一般要区别命令连接重新开启一个连接。因为命令连接严格遵循请求回应模式,而 pubsub 能收到 redis 主动推送的内容;所以实际项目中如果支持 pubsub 的话,需要另开一条连接 用于处理发布订阅;
注意:redis停机重启,pubsub的消息是不会持久化的,所有的消息被直接丢弃;
二、实现server端与redis通信
1.先适配事件对象
2.适配事件函数,才可以使用redis接口.
1. 同步连接 (用redis默认驱动)
先引入lhiredis库文件【01:20:00】,再进行连接操作。
2. 异步连接 (基于当前网络模块实现驱动)
(1)思想
hiredis 异步客户端接入自定义的 reactor 事件驱动系统的适配器,核心作用是桥接Redis的异步事件自定义事件循环机制,实现了一套hiredis的IO多路复用抽象接口。
(2) 源码
hiredis实现了:协议解析、读写事件、缓冲区操作、协议加密
我们适配的文件实现了:适配事件对象 以及 事件操作函数
static int redisAttach(reactor_t *r, redisAsyncContext *ac) {redisContext *c = &(ac->c);redis_event_t *re;/* Nothing should be attached when something is already attached */if (ac->ev.data != NULL)return REDIS_ERR;/* Create container for ctx and r/w events */re = (redis_event_t*)hi_malloc(sizeof(*re));if (re == NULL)return REDIS_ERR;re->ctx = ac;re->e.fd = c->fd;re->e.r = r;// dont use event buffer, using hiredis's bufferre->e.in = NULL;re->e.out = NULL;re->mask = 0;//这些是 hiredis 要求你实现的“注册函数”。每当 Redis 需要监听某个 FD 的读/写事件,就会调用这些函数ac->ev.addRead = redisAddRead;ac->ev.delRead = redisDelRead;ac->ev.addRead = redisAddWrite;ac->ev.delWrite = redisDelWrite;ac->ev.cleanup = redisCleanup;//清理函数必不可少ac->ev.data = re;return REDIS_OK;
}int main() {// 1. 创建 event loopreactor_t *r = reactor_create();// 2. 连接 Redis 异步客户端redisAsyncContext *ac = redisAsyncConnect("127.0.0.1", 6379);if (ac->err) {printf("Redis error: %s\n", ac->errstr);return -1;}// 3. 接入自己的 reactorif (redisAttach(r, ac) != REDIS_OK) {printf("Failed to attach redis context to reactor\n");return -1;}// 4. 设置连接成功/断开回调(可选)redisAsyncSetConnectCallback(ac, connectCallback);redisAsyncSetDisconnectCallback(ac, disconnectCallback);// 5. 发送异步命令redisAsyncCommand(ac, commandCallback, NULL, "SET foo bar");// 6. 启动事件循环reactor_run(r);return 0;
}-----------------------------------------
【1】redisAttach(reactor_t *r, redisAsyncContext *ac)+ 1.创建一个redis_event_t对象(包含 event_t 和 Redis 上下文);
+ 2.设置该对象的addRead、delRead、addWrite、delWrite、cleanup函数;
+ 3.将这些函数注册进redisAsyncContext的 ev成员;【2】ac->ev.addRead = redisAddRead;ac->ev.addRead = redisAddWrite;
这两个去调用redisReadHandler/redisWriteHandler,去使用hiredis提供的读写处理【3】redisEventUpdate
这是事件变化的调度器,核心逻辑是根据 mask 来:+ 新增事件:调用 add_event()
+ 删除事件:调用 del_event()
+ 修改事件:调用 enable_event() 切换读写状态
异步连接具体实现 reactor.h
- 事件对象的封装:struct event_s
创建事件对象: new_event( )
事件操作函数:add_event| del_event| enable_event来适配实现4个api
异步和同步连接redis性能测试【01:35:00】
异步连接性能高,适合业务开发,不会阻塞业务逻辑线程,抛给数据库进行检测,执行回调函数。
redis-test-async.c
redis-test-sync.c
优秀笔记:
1. Redis协议与异步方式
2. Redis协议与异步方式(二)
参考学习:https://github.com/0voice