Redis 是一款高性能的键值对存储数据库,支持丰富的数据结构和高级特性。下面结合 Python SDK(redis-py
库),分别介绍其通用用法和高级用法,并附具体代码示例。
一、通用用法(基础数据结构操作)
Redis 提供了 5 种核心数据结构,适用于大多数日常场景(如缓存、计数器、简单队列等)。
1. 字符串(String)
最基础的键值对类型,适用于存储文本、数字等,支持原子操作。
import redis
# 连接 Redis(默认 host=localhost, port=6379, db=0)
r = redis.Redis(decode_responses=True) # decode_responses=True 自动将 bytes 转为字符串
# 设置键值(ex 为过期时间,单位秒)
r.set("name", "Alice", ex=3600) # 1小时后过期
r.set("age", 25)
# 获取值
print(r.get("name")) # 输出: Alice
print(r.get("age")) # 输出: 25
# 原子递增/递减(适用于计数器)
r.incr("age") # age 变为 26
r.decr("age") # age 变回 25
2. 哈希(Hash)
适用于存储对象(如用户信息、商品属性),键值对的嵌套结构。
# 存储用户信息(hset 支持映射或键值对)
r.hset("user:100", mapping={
"name": "Bob",
"age": 30,
"city": "Shanghai"
})
# 获取单个字段
print(r.hget("user:100", "name")) # 输出: Bob
# 获取所有字段和值
print(r.hgetall("user:100")) # 输出: {'name': 'Bob', 'age': '30', 'city': 'Shanghai'}
# 检查字段是否存在
print(r.hexists("user:100", "email")) # 输出: False
3. 列表(List)
有序可重复的元素集合,适用于实现队列、栈、最新消息列表等。
# 左侧插入元素(lpush 左进,rpush 右进)
r.lpush("tasks", "task1", "task2") # 列表变为: [task2, task1]
# 右侧插入元素
r.rpush("tasks", "task3") # 列表变为: [task2, task1, task3]
# 获取指定范围元素(0 为开头,-1 为结尾)
print(r.lrange("tasks", 0, -1)) # 输出: ['task2', 'task1', 'task3']
# 弹出元素(lpop 左出,rpop 右出)
print(r.lpop("tasks")) # 输出: task2(列表剩余: [task1, task3])
4. 集合(Set)
无序不可重复的元素集合,适用于去重、交集/并集计算(如共同好友)。
# 添加元素
r.sadd("tags", "python", "redis", "database")
# 获取所有元素
print(r.smembers("tags")) # 输出: {'python', 'redis', 'database'}(无序)
# 检查元素是否存在
print(r.sismember("tags", "java")) # 输出: False
# 交集计算(如用户1和用户2的共同关注)
r.sadd("user:1:follows", "A", "B", "C")
r.sadd("user:2:follows", "B", "C", "D")
print(r.sinter("user:1:follows", "user:2:follows")) # 输出: {'B', 'C'}
5. 有序集合(Sorted Set)
有序且不可重复的元素集合,每个元素关联一个分数(score),适用于排行榜、优先级队列等。
# 添加元素(值 + 分数)
r.zadd("leaderboard", {
"user1": 90,
"user2": 100,
"user3": 80
})
# 按分数升序获取元素(0 为最低分,-1 为最高分)
print(r.zrange("leaderboard", 0, -1, withscores=True))
# 输出: [('user3', 80.0), ('user1', 90.0), ('user2', 100.0)]
# 按分数降序获取 Top 2
print(r.zrevrange("leaderboard", 0, 1, withscores=True))
# 输出: [('user2', 100.0), ('user1', 90.0)]
# 统计分数在 80-100 之间的元素数量
print(r.zcount("leaderboard", 80, 100)) # 输出: 3
二、高级用法(进阶特性)
Redis 提供了多种高级特性,适用于性能优化、复杂业务场景(如分布式锁、消息队列、实时统计等)。
1. 事务(Transaction)
通过 MULTI
/EXEC
保证一系列操作的原子性(要么全执行,要么全不执行)。
# 示例:转账操作(A 减 100,B 加 100,需原子执行)
with r.pipeline() as pipe: # pipeline 用于事务和批量操作
try:
# 监听键,若期间被修改则事务中断
pipe.watch("balance:A", "balance:B")
# 开始事务
pipe.multi()
pipe.decrby("balance:A", 100)
pipe.incrby("balance:B", 100)
# 执行事务
pipe.execute()
print("转账成功")
except redis.WatchError:
# 键被修改,事务失败
print("转账失败,数据已变更")
2. 管道(Pipeline)
批量发送命令,减少网络往返次数,提升性能(事务依赖管道实现)。
# 批量执行多个命令
with r.pipeline() as pipe:
# 链式调用多个命令
pipe.set("a", 1).set("b", 2).get("a").get("b")
# 一次性执行,返回结果列表
results = pipe.execute()
print(results) # 输出: [True, True, '1', '2']
3. 发布订阅(Pub/Sub)
实现消息通信模式(生产者发布消息,消费者订阅频道接收消息)。
# 订阅者(持续监听消息)
def subscriber():
pubsub = r.pubsub()
pubsub.subscribe("news") # 订阅 "news" 频道
for message in pubsub.listen():
# 过滤非消息类型(如订阅确认)
if message["type"] == "message":
print(f"收到消息: {message['data']}")
# 发布者(发送消息)
def publisher():
r.publish("news", "Redis 发布订阅示例") # 向 "news" 频道发布消息
# 实际使用时需多线程/进程运行(订阅者阻塞)
import threading
threading.Thread(target=subscriber, daemon=True).start()
publisher() # 输出: 收到消息: Redis 发布订阅示例
4. Lua 脚本
通过 Lua 脚本执行复杂逻辑,保证原子性(脚本内的操作不会被其他命令中断)。
# 示例:判断键是否存在,存在则返回值,否则设置默认值
script = """
local key = KEYS[1]
local default = ARGV[1]
if redis.call('exists', key) == 1 then
return redis.call('get', key)
else
redis.call('set', key, default)
return default
end
"""
# 执行脚本(1 个键,1 个参数)
result = r.eval(script, 1, "config", "default_value")
print(result) # 首次执行输出: default_value(键不存在时)
5. Stream(消息队列)
比 Pub/Sub 更强大的消息队列,支持消息持久化、消费组(避免重复消费)。
# 生产者:添加消息到 Stream(id="*" 表示自动生成ID)
r.xadd("order_stream", {"order_id": "1001", "user": "Alice"}, id="*")
r.xadd("order_stream", {"order_id": "1002", "user": "Bob"}, id="*")
# 消费者组:创建消费组(首次需初始化)
r.xgroup_create("order_stream", "group1", id="$") # "$" 表示从最新消息开始消费
# 消费者:从组内获取消息(count=2 表示最多2条)
messages = r.xreadgroup(
groupname="group1",
consumername="consumer1",
streams={"order_stream": ">"}, # ">" 表示未被消费的消息
count=2
)
print(messages) # 输出消息列表
# 确认消息已处理(避免重复消费)
for msg_id in [msg[0] for msg in messages[0][1]]:
r.xack("order_stream", "group1", msg_id)
6. 地理空间(GEO)
存储地理位置信息,支持距离计算、附近点查询(如“附近的门店”)。
# 添加地理位置(经度, 纬度, 名称)
r.geoadd("cities",
(116.4074, 39.9042, "beijing"), # 北京
(121.4737, 31.2304, "shanghai") # 上海
)
# 计算两地距离(单位 km)
distance = r.geodist("cities", "beijing", "shanghai", unit="km")
print(f"北京到上海距离: {distance:.2f} km") # 输出约 1318.39 km
# 查询指定坐标附近的点(半径 2000 km,最多 10 个)
nearby = r.georadius("cities", 116.4074, 39.9042, 2000, "km", count=10, withdist=True)
print(nearby) # 输出: [('beijing', 0.0), ('shanghai', 1318.3933)]
7. 位图(Bitmap)
用位存储数据,适用于高效统计(如用户签到、活跃用户计数)。
# 记录用户签到(用户ID=10086 在第 5 天签到)
r.setbit("user:sign:10086", 5, 1) # 第 5 位设为 1(从 0 开始)
# 检查用户是否在第 5 天签到
print(r.getbit("user:sign:10086", 5)) # 输出: 1
# 统计用户总签到天数(位为 1 的数量)
print(r.bitcount("user:sign:10086")) # 输出: 1
8. HyperLogLog
用于超大数据量的基数统计(如独立访客 UV 计数),占用内存极小。
# 添加访问用户
r.pfadd("uv:20231001", "user1", "user2", "user3")
r.pfadd("uv:20231001", "user3", "user4", "user5") # 去重统计
# 估算独立用户数(误差约 0.8%)
print(r.pfcount("uv:20231001")) # 输出: 5
# 合并多天的 UV 统计
r.pfmerge("uv:202310", "uv:20231001", "uv:20231002")
总结
- 通用用法:基于 5 种核心数据结构,满足缓存、计数、简单队列等基础需求。
- 高级用法:通过事务、管道、Lua 脚本等特性,解决分布式场景、性能优化、复杂业务逻辑等问题。
实际开发中,需根据场景选择合适的用法,例如:缓存用 String/Hash,排行榜用 Sorted Set,消息队列用 Stream,独立统计用 HyperLogLog 等。