Python Redis 教程
Redis 是一个开源的内存数据结构存储系统,常用作数据库、缓存和消息代理。本教程将介绍如何在 Python 中使用 Redis。
安装 Python Redis 客户端
pip install redis
连接 Redis
基本连接
import redis# 连接到本地 Redis 服务器
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)# 测试连接
try:r.ping()print("成功连接到 Redis")
except redis.ConnectionError:print("无法连接到 Redis")
连接池
import redis# 创建连接池
pool = redis.ConnectionPool(host='localhost',port=6379,db=0,max_connections=10,decode_responses=True
)# 使用连接池
r = redis.Redis(connection_pool=pool)
基本数据类型操作
字符串 (String)
# 设置键值对
r.set('name', 'Alice')
r.setex('temp_key', 60, '临时值') # 60秒后过期# 获取值
name = r.get('name')
print(f"Name: {name}")# 批量操作
r.mset({'key1': 'value1', 'key2': 'value2'})
values = r.mget('key1', 'key2')
print(f"Values: {values}")# 自增操作
r.set('counter', 0)
r.incr('counter')
r.incrby('counter', 5)
print(f"Counter: {r.get('counter')}")
列表 (List)
# 添加元素
r.lpush('mylist', 'item1')
r.rpush('mylist', 'item2', 'item3')# 获取元素
first_item = r.lpop('mylist')
last_item = r.rpop('mylist')
all_items = r.lrange('mylist', 0, -1)print(f"First: {first_item}, Last: {last_item}")
print(f"All items: {all_items}")# 列表长度
length = r.llen('mylist')
print(f"List length: {length}")
集合 (Set)
# 添加元素
r.sadd('myset', 'member1', 'member2', 'member3')# 获取所有成员
members = r.smembers('myset')
print(f"Set members: {members}")# 检查成员是否存在
is_member = r.sismember('myset', 'member1')
print(f"Is member1 in set: {is_member}")# 集合运算
r.sadd('set1', 'a', 'b', 'c')
r.sadd('set2', 'b', 'c', 'd')intersection = r.sinter('set1', 'set2')
union = r.sunion('set1', 'set2')
difference = r.sdiff('set1', 'set2')print(f"Intersection: {intersection}")
print(f"Union: {union}")
print(f"Difference: {difference}")
哈希 (Hash)
# 设置字段值
r.hset('user:1000', mapping={'name': 'John Doe','email': 'john@example.com','age': '30'
})# 获取字段值
name = r.hget('user:1000', 'name')
all_fields = r.hgetall('user:1000')print(f"Name: {name}")
print(f"All fields: {all_fields}")# 批量操作
r.hmset('user:1001', {'name': 'Jane', 'age': '25'})# 删除字段
r.hdel('user:1000', 'age')
有序集合 (Sorted Set)
# 添加带分数的成员
r.zadd('leaderboard', {'player1': 1000,'player2': 1500,'player3': 1200
})# 获取排名
top_players = r.zrevrange('leaderboard', 0, 2, withscores=True)
player_rank = r.zrevrank('leaderboard', 'player1')
player_score = r.zscore('leaderboard', 'player1')print(f"Top players: {top_players}")
print(f"Player1 rank: {player_rank}, score: {player_score}")
高级功能
事务处理
# 使用管道执行事务
pipe = r.pipeline()try:pipe.watch('balance')current_balance = int(pipe.get('balance') or 0)if current_balance >= 100:pipe.multi()pipe.decrby('balance', 100)pipe.incrby('savings', 100)pipe.execute()print("转账成功")else:print("余额不足")except redis.WatchError:print("数据已被修改,操作取消")
finally:pipe.reset()
发布/订阅
import threading
import timedef subscriber():pubsub = r.pubsub()pubsub.subscribe('news')for message in pubsub.listen():if message['type'] == 'message':print(f"收到消息: {message['data']}")# 启动订阅者线程
thread = threading.Thread(target=subscriber)
thread.daemon = True
thread.start()# 发布消息
time.sleep(1) # 等待订阅者准备好
r.publish('news', 'Hello, World!')
r.publish('news', '这是第二条消息')
键操作和过期时间
# 设置过期时间
r.set('session:user123', 'user_data')
r.expire('session:user123', 3600) # 1小时后过期# 检查键是否存在
exists = r.exists('session:user123')
print(f"Key exists: {exists}")# 获取所有键
keys = r.keys('session:*')
print(f"Session keys: {keys}")# 删除键
r.delete('session:user123')
实战示例
缓存示例
import time
import jsondef get_user_data(user_id, use_cache=True):cache_key = f"user:{user_id}"if use_cache:# 尝试从缓存获取cached_data = r.get(cache_key)if cached_data:print("从缓存获取数据")return json.loads(cached_data)# 模拟数据库查询print("从数据库查询数据")time.sleep(2) # 模拟耗时操作user_data = {'id': user_id,'name': f'User {user_id}','email': f'user{user_id}@example.com'}# 存入缓存,设置5分钟过期if use_cache:r.setex(cache_key, 300, json.dumps(user_data))return user_data# 测试缓存
user = get_user_data(1, use_cache=True) # 第一次从数据库获取
print(f"User: {user}")user = get_user_data(1, use_cache=True) # 第二次从缓存获取
print(f"User: {user}")
频率限制器
def rate_limiter(user_id, action, limit=10, window=60):key = f"rate_limit:{user_id}:{action}"# 使用管道确保原子操作pipe = r.pipeline()pipe.incr(key)pipe.expire(key, window)current_count, _ = pipe.execute()if current_count > limit:return False # 超过限制return True # 在限制内# 测试频率限制
for i in range(15):allowed = rate_limiter('user123', 'login', limit=10, window=60)print(f"请求 {i+1}: {'允许' if allowed else '拒绝'}")
会话管理
class SessionManager:def __init__(self, redis_client, prefix='session:'):self.redis = redis_clientself.prefix = prefixdef create_session(self, user_id, data, ttl=3600):session_id = f"{user_id}_{int(time.time())}"key = self.prefix + session_idsession_data = {'user_id': user_id,'created_at': time.time(),'data': data}self.redis.setex(key, ttl, json.dumps(session_data))return session_iddef get_session(self, session_id):key = self.prefix + session_iddata = self.redis.get(key)if data:return json.loads(data)return Nonedef delete_session(self, session_id):key = self.prefix + session_idself.redis.delete(key)# 使用会话管理器
session_mgr = SessionManager(r)# 创建会话
session_id = session_mgr.create_session('user123', {'theme': 'dark', 'language': 'zh'})
print(f"Session ID: {session_id}")# 获取会话
session = session_mgr.get_session(session_id)
print(f"Session data: {session}")# 删除会话
session_mgr.delete_session(session_id)
最佳实践
连接管理: 使用连接池避免频繁创建连接
错误处理: 总是处理连接异常和操作异常
序列化: 复杂对象使用 JSON 序列化存储
键命名: 使用有意义的键名,如
user:123:profile
内存管理: 设置合理的过期时间,避免内存泄漏
监控: 监控 Redis 内存使用情况和性能指标
总结
本教程介绍了 Python 中 Redis 的基本用法,包括:
连接 Redis 服务器
各种数据类型的操作
高级功能如事务、发布订阅
实际应用场景示例