Redis使用简明教程

发布于:2025-08-30 ⋅ 阅读:(17) ⋅ 点赞:(0)

Redis 是一款高性能的键值对存储数据库,支持丰富的数据结构和高级特性。下面结合 Python SDK(redis-py 库),分别介绍其通用用法和高级用法,并附具体代码示例。

一、通用用法(基础数据结构操作)

Redis 提供了 5 种核心数据结构,适用于大多数日常场景(如缓存、计数器、简单队列等)。

1. 字符串(String)

最基础的键值对类型,适用于存储文本、数字等,支持原子操作。

import redis

# 连接 Redis(默认 host=localhost, port=6379, db=0)
r = redis.Redis(decode_responses=True)  # decode_responses=True 自动将 bytes 转为字符串

# 设置键值(ex 为过期时间,单位秒)
r.set("name", "Alice", ex=3600)  # 1小时后过期
r.set("age", 25)

# 获取值
print(r.get("name"))  # 输出: Alice
print(r.get("age"))   # 输出: 25

# 原子递增/递减(适用于计数器)
r.incr("age")  # age 变为 26
r.decr("age")  # age 变回 25
2. 哈希(Hash)

适用于存储对象(如用户信息、商品属性),键值对的嵌套结构。

# 存储用户信息(hset 支持映射或键值对)
r.hset("user:100", mapping={
    "name": "Bob",
    "age": 30,
    "city": "Shanghai"
})

# 获取单个字段
print(r.hget("user:100", "name"))  # 输出: Bob

# 获取所有字段和值
print(r.hgetall("user:100"))  # 输出: {'name': 'Bob', 'age': '30', 'city': 'Shanghai'}

# 检查字段是否存在
print(r.hexists("user:100", "email"))  # 输出: False
3. 列表(List)

有序可重复的元素集合,适用于实现队列、栈、最新消息列表等。

# 左侧插入元素(lpush 左进,rpush 右进)
r.lpush("tasks", "task1", "task2")  # 列表变为: [task2, task1]

# 右侧插入元素
r.rpush("tasks", "task3")  # 列表变为: [task2, task1, task3]

# 获取指定范围元素(0 为开头,-1 为结尾)
print(r.lrange("tasks", 0, -1))  # 输出: ['task2', 'task1', 'task3']

# 弹出元素(lpop 左出,rpop 右出)
print(r.lpop("tasks"))  # 输出: task2(列表剩余: [task1, task3])
4. 集合(Set)

无序不可重复的元素集合,适用于去重、交集/并集计算(如共同好友)。

# 添加元素
r.sadd("tags", "python", "redis", "database")

# 获取所有元素
print(r.smembers("tags"))  # 输出: {'python', 'redis', 'database'}(无序)

# 检查元素是否存在
print(r.sismember("tags", "java"))  # 输出: False

# 交集计算(如用户1和用户2的共同关注)
r.sadd("user:1:follows", "A", "B", "C")
r.sadd("user:2:follows", "B", "C", "D")
print(r.sinter("user:1:follows", "user:2:follows"))  # 输出: {'B', 'C'}
5. 有序集合(Sorted Set)

有序且不可重复的元素集合,每个元素关联一个分数(score),适用于排行榜、优先级队列等。

# 添加元素(值 + 分数)
r.zadd("leaderboard", {
    "user1": 90,
    "user2": 100,
    "user3": 80
})

# 按分数升序获取元素(0 为最低分,-1 为最高分)
print(r.zrange("leaderboard", 0, -1, withscores=True))  
# 输出: [('user3', 80.0), ('user1', 90.0), ('user2', 100.0)]

# 按分数降序获取 Top 2
print(r.zrevrange("leaderboard", 0, 1, withscores=True))  
# 输出: [('user2', 100.0), ('user1', 90.0)]

# 统计分数在 80-100 之间的元素数量
print(r.zcount("leaderboard", 80, 100))  # 输出: 3

二、高级用法(进阶特性)

Redis 提供了多种高级特性,适用于性能优化、复杂业务场景(如分布式锁、消息队列、实时统计等)。

1. 事务(Transaction)

通过 MULTI/EXEC 保证一系列操作的原子性(要么全执行,要么全不执行)。

# 示例:转账操作(A 减 100,B 加 100,需原子执行)
with r.pipeline() as pipe:  # pipeline 用于事务和批量操作
    try:
        # 监听键,若期间被修改则事务中断
        pipe.watch("balance:A", "balance:B")
        
        # 开始事务
        pipe.multi()
        pipe.decrby("balance:A", 100)
        pipe.incrby("balance:B", 100)
        
        # 执行事务
        pipe.execute()
        print("转账成功")
    except redis.WatchError:
        # 键被修改,事务失败
        print("转账失败,数据已变更")
2. 管道(Pipeline)

批量发送命令,减少网络往返次数,提升性能(事务依赖管道实现)。

# 批量执行多个命令
with r.pipeline() as pipe:
    # 链式调用多个命令
    pipe.set("a", 1).set("b", 2).get("a").get("b")
    # 一次性执行,返回结果列表
    results = pipe.execute()
    print(results)  # 输出: [True, True, '1', '2']
3. 发布订阅(Pub/Sub)

实现消息通信模式(生产者发布消息,消费者订阅频道接收消息)。

# 订阅者(持续监听消息)
def subscriber():
    pubsub = r.pubsub()
    pubsub.subscribe("news")  # 订阅 "news" 频道
    
    for message in pubsub.listen():
        # 过滤非消息类型(如订阅确认)
        if message["type"] == "message":
            print(f"收到消息: {message['data']}")

# 发布者(发送消息)
def publisher():
    r.publish("news", "Redis 发布订阅示例")  # 向 "news" 频道发布消息

# 实际使用时需多线程/进程运行(订阅者阻塞)
import threading
threading.Thread(target=subscriber, daemon=True).start()
publisher()  # 输出: 收到消息: Redis 发布订阅示例
4. Lua 脚本

通过 Lua 脚本执行复杂逻辑,保证原子性(脚本内的操作不会被其他命令中断)。

# 示例:判断键是否存在,存在则返回值,否则设置默认值
script = """
local key = KEYS[1]
local default = ARGV[1]
if redis.call('exists', key) == 1 then
    return redis.call('get', key)
else
    redis.call('set', key, default)
    return default
end
"""

# 执行脚本(1 个键,1 个参数)
result = r.eval(script, 1, "config", "default_value")
print(result)  # 首次执行输出: default_value(键不存在时)
5. Stream(消息队列)

比 Pub/Sub 更强大的消息队列,支持消息持久化、消费组(避免重复消费)。

# 生产者:添加消息到 Stream(id="*" 表示自动生成ID)
r.xadd("order_stream", {"order_id": "1001", "user": "Alice"}, id="*")
r.xadd("order_stream", {"order_id": "1002", "user": "Bob"}, id="*")

# 消费者组:创建消费组(首次需初始化)
r.xgroup_create("order_stream", "group1", id="$")  # "$" 表示从最新消息开始消费

# 消费者:从组内获取消息(count=2 表示最多2条)
messages = r.xreadgroup(
    groupname="group1",
    consumername="consumer1",
    streams={"order_stream": ">"},  # ">" 表示未被消费的消息
    count=2
)
print(messages)  # 输出消息列表

# 确认消息已处理(避免重复消费)
for msg_id in [msg[0] for msg in messages[0][1]]:
    r.xack("order_stream", "group1", msg_id)
6. 地理空间(GEO)

存储地理位置信息,支持距离计算、附近点查询(如“附近的门店”)。

# 添加地理位置(经度, 纬度, 名称)
r.geoadd("cities", 
    (116.4074, 39.9042, "beijing"),  # 北京
    (121.4737, 31.2304, "shanghai")  # 上海
)

# 计算两地距离(单位 km)
distance = r.geodist("cities", "beijing", "shanghai", unit="km")
print(f"北京到上海距离: {distance:.2f} km")  # 输出约 1318.39 km

# 查询指定坐标附近的点(半径 2000 km,最多 10 个)
nearby = r.georadius("cities", 116.4074, 39.9042, 2000, "km", count=10, withdist=True)
print(nearby)  # 输出: [('beijing', 0.0), ('shanghai', 1318.3933)]
7. 位图(Bitmap)

用位存储数据,适用于高效统计(如用户签到、活跃用户计数)。

# 记录用户签到(用户ID=10086 在第 5 天签到)
r.setbit("user:sign:10086", 5, 1)  # 第 5 位设为 1(从 0 开始)

# 检查用户是否在第 5 天签到
print(r.getbit("user:sign:10086", 5))  # 输出: 1

# 统计用户总签到天数(位为 1 的数量)
print(r.bitcount("user:sign:10086"))  # 输出: 1
8. HyperLogLog

用于超大数据量的基数统计(如独立访客 UV 计数),占用内存极小。

# 添加访问用户
r.pfadd("uv:20231001", "user1", "user2", "user3")
r.pfadd("uv:20231001", "user3", "user4", "user5")  # 去重统计

# 估算独立用户数(误差约 0.8%)
print(r.pfcount("uv:20231001"))  # 输出: 5

# 合并多天的 UV 统计
r.pfmerge("uv:202310", "uv:20231001", "uv:20231002")

总结

  • 通用用法:基于 5 种核心数据结构,满足缓存、计数、简单队列等基础需求。
  • 高级用法:通过事务、管道、Lua 脚本等特性,解决分布式场景、性能优化、复杂业务逻辑等问题。

实际开发中,需根据场景选择合适的用法,例如:缓存用 String/Hash,排行榜用 Sorted Set,消息队列用 Stream,独立统计用 HyperLogLog 等。


网站公告

今日签到

点亮在社区的每一天
去签到