一、使用 Express 操作 Redis 的完整示例
涵盖 Redis 各种核心数据类型(String、Hash、List、Set、Sorted Set)及常见操作
1️⃣ 概念介绍
Redis 是一个开源的高性能键值对数据库,支持多种数据结构,非常适合缓存、排行榜、会话管理等场景。
Redis 支持的数据类型包括:
类型 | 简介 |
---|---|
String | 最基本的数据类型,支持数字/字符串存取 |
Hash | 类似对象,适合存储用户信息、对象结构 |
List | 链表结构,可做消息队列 |
Set | 无序唯一集合,常用于去重 |
Sorted Set | 有序唯一集合,常用于排行榜(带分数排序) |
Node.js 下最常用的 Redis 客户端是 ioredis,支持连接池、集群等高级功能。
2️⃣ 安装依赖
npm install express ioredis
3️⃣ 初始化 Express + Redis 应用结构
project/
├── index.js
├── redisClient.js
└── package.json
4️⃣ 创建 Redis 客户端 redisClient.js
const Redis = require('ioredis');
// 默认连接本地 Redis,如果你有密码或远程地址可以传入配置对象
const redis = new Redis({
host: '127.0.0.1',
port: 6379,
// password: 'your-password', // 如果有密码
});
module.exports = redis;
5️⃣ Express 应用 index.js
:演示各种 Redis 数据类型操作
const express = require('express');
const redis = require('./redisClient');
const app = express();
app.use(express.json());
/**
* ============ String 类型 ============
*/
// 设置字符串
app.post('/string/set', async (req, res) => {
const { key, value } = req.body;
await redis.set(key, value);
res.send(`设置成功:${key}=${value}`);
});
// 获取字符串
app.get('/string/get/:key', async (req, res) => {
const value = await redis.get(req.params.key);
res.send(`值为:${value}`);
});
/**
* ============ Hash 类型 ============
*/
// 设置哈希字段
app.post('/hash/set', async (req, res) => {
const { key, field, value } = req.body;
await redis.hset(key, field, value);
res.send(`哈希字段设置成功:${key}.${field}=${value}`);
});
// 获取哈希字段
app.get('/hash/get', async (req, res) => {
const { key, field } = req.query;
const value = await redis.hget(key, field);
res.send(`哈希字段值为:${value}`);
});
// 获取整个哈希
app.get('/hash/all/:key', async (req, res) => {
const hash = await redis.hgetall(req.params.key);
res.json(hash);
});
/**
* ============ List 类型 ============
*/
// 从左侧推入列表
app.post('/list/leftpush', async (req, res) => {
const { key, value } = req.body;
await redis.lpush(key, value);
res.send(`左侧推入成功:${value}`);
});
// 从右侧弹出列表
app.get('/list/rightpop/:key', async (req, res) => {
const value = await redis.rpop(req.params.key);
res.send(`右侧弹出值:${value}`);
});
/**
* ============ Set 类型 ============
*/
// 添加到 Set 集合
app.post('/set/add', async (req, res) => {
const { key, value } = req.body;
await redis.sadd(key, value);
res.send(`添加到集合成功:${value}`);
});
// 获取 Set 所有成员
app.get('/set/members/:key', async (req, res) => {
const members = await redis.smembers(req.params.key);
res.json(members);
});
/**
* ============ Sorted Set 类型 ============
*/
// 添加成员和分数
app.post('/zset/add', async (req, res) => {
const { key, score, member } = req.body;
await redis.zadd(key, score, member);
res.send(`添加到有序集合成功:${member}(分数${score})`);
});
// 获取排行榜
app.get('/zset/range/:key', async (req, res) => {
const list = await redis.zrevrange(req.params.key, 0, 9, 'WITHSCORES');
res.json(list);
});
const PORT = 3000;
app.listen(PORT, () => {
console.log(`服务器启动:http://localhost:${PORT}`);
});
6️⃣ Postman 测试示例
使用 POST
和 GET
请求调用以下接口:
POST /string/set
+ JSON{ "key": "user:1:name", "value": "Alice" }
GET /string/get/user:1:name
POST /hash/set
+{ "key": "user:2", "field": "email", "value": "a@example.com" }
GET /hash/all/user:2
POST /list/leftpush
+{ "key": "messages", "value": "hi" }
GET /list/rightpop/messages
POST /set/add
+{ "key": "tags", "value": "web3" }
GET /set/members/tags
POST /zset/add
+{ "key": "ranking", "score": 98, "member": "Bob" }
GET /zset/range/ranking
✅ 总结
数据类型 | 操作方法(部分) | 使用场景 |
---|---|---|
String | set , get , incr |
缓存数据、计数器 |
Hash | hset , hget , hgetall |
用户信息、配置项 |
List | lpush , rpop , lrange |
消息队列、时间轴 |
Set | sadd , smembers , sismember |
去重、用户标签 |
Sorted Set | zadd , zrange , zrevrange |
排行榜、权重排序 |
如需进一步加上缓存失效时间(EX
)、分布式锁、订阅发布等高级操作,也可以继续告诉我,我可以补充完整!
二、Redis 在 Express 项目中实现 缓存失效时间(EX)、分布式锁 和 发布/订阅机制
1️⃣ 缓存失效时间(EX)
概念介绍
Redis 支持为 key 设置生存时间(TTL),超时后自动删除该键,常用于缓存数据。
示例代码
// 设置字符串并指定过期时间(单位:秒)
app.post('/cache/setex', async (req, res) => {
const { key, value, ttl } = req.body;
await redis.set(key, value, 'EX', ttl); // 'EX' 表示秒级过期时间
res.send(`设置缓存成功:${key}=${value}(过期时间:${ttl}秒)`);
});
// 查询 key 的剩余时间
app.get('/cache/ttl/:key', async (req, res) => {
const ttl = await redis.ttl(req.params.key);
res.send(`剩余时间:${ttl} 秒`);
});
2️⃣ 分布式锁
概念介绍
Redis 提供 SET key value NX EX ttl
命令可用于实现简易分布式锁。
NX
: 仅当 key 不存在时设置,保证只有一个客户端加锁成功。EX ttl
: 设置锁过期时间,防止死锁。
示例代码
// 尝试获取锁
app.post('/lock/acquire', async (req, res) => {
const { lockKey, lockValue, ttl } = req.body;
const success = await redis.set(lockKey, lockValue, 'NX', 'EX', ttl);
if (success) {
res.send('✅ 成功获取锁');
} else {
res.status(423).send('❌ 获取锁失败,资源已被占用');
}
});
// 释放锁(注意:必须验证持有者)
app.post('/lock/release', async (req, res) => {
const { lockKey, lockValue } = req.body;
// 用 Lua 脚本确保只有锁持有者可以释放
const luaScript = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;
const result = await redis.eval(luaScript, 1, lockKey, lockValue);
res.send(result === 1 ? '✅ 成功释放锁' : '⚠️ 无权释放锁');
});
3️⃣ 发布/订阅(Pub/Sub)
概念介绍
Redis 提供消息发布订阅功能,常用于:
服务间通信
实时通知
消息驱动架构
示例代码
消费端(订阅频道)
// 单独开一个 Redis 订阅连接
const Redis = require('ioredis');
const subClient = new Redis();
subClient.subscribe('news', () => {
console.log('📡 订阅了 news 频道');
});
subClient.on('message', (channel, message) => {
console.log(`📨 接收到频道 ${channel} 的消息: ${message}`);
});
生产端(发布消息)
// 发布消息到 news 频道
app.post('/publish/news', async (req, res) => {
const { message } = req.body;
await redis.publish('news', message);
res.send('📤 消息已发布');
});
✅ 总结表
功能 | 核心命令 | 应用场景 |
---|---|---|
缓存失效 | SET key value EX ttl |
缓存自动清理,热点数据缓存 |
分布式锁 | SET key val NX EX ttl + Lua |
控制资源访问,避免并发冲突 |
发布订阅 | PUBLISH/SUBSCRIBE |
实时通信、系统内事件通知 |
如需更高级用法(如 Redlock、Redis Stream 消息队列、异步任务持久化等),欢迎继续提问!也可以让我帮你把这些整合进一个生产级项目模板。
三、Express实现Redlock、Redis Stream 消息队列、异步任务持久化
下面详细讲解 Redlock 分布式锁算法、Redis Streams 消息队列、异步任务持久化 的概念、使用场景与详细代码示例(基于 Express + Node.js + ioredis):
1️⃣ Redlock 分布式锁算法
概念介绍
Redlock 是 Redis 官方推荐的分布式锁算法,适用于多 Redis 实例构建高可靠性锁。
使用多个 Redis 节点写入同一锁键;
获取大多数实例同意才算加锁成功;
自动设置超时,防止死锁。
示例代码(基于 redlock
库)
npm install redlock ioredis
const Redis = require('ioredis');
const Redlock = require('redlock');
const clients = [new Redis(), new Redis(), new Redis()]; // 模拟多个实例
const redlock = new Redlock(clients, {
retryCount: 3,
retryDelay: 200, // ms
});
app.post('/redlock', async (req, res) => {
try {
const lock = await redlock.acquire(['locks:resource'], 10000); // 10秒锁
console.log('✅ 获取Redlock成功');
setTimeout(async () => {
await lock.release();
console.log('🔓 Redlock释放成功');
}, 5000);
res.send('Redlock锁定成功');
} catch (err) {
res.status(500).send('🔒 Redlock锁获取失败');
}
});
2️⃣ Redis Streams 消息队列
概念介绍
Redis Stream 是 Redis 5.0+ 引入的持久化消息队列,支持:
消息追加(
XADD
)消费者组(
XGROUP
)消费与确认(
XREADGROUP
,XACK
)
适合实现 可靠队列、异步任务处理、事件流系统。
示例代码(任务生产+消费)
const streamKey = 'mystream';
const consumerGroup = 'mygroup';
const consumerName = 'consumer1';
(async () => {
try {
await redis.xgroup('CREATE', streamKey, consumerGroup, '0', 'MKSTREAM');
} catch (e) {
if (!e.message.includes('BUSYGROUP')) throw e;
}
})();
// 生产消息
app.post('/stream/produce', async (req, res) => {
const { orderId } = req.body;
await redis.xadd(streamKey, '*', 'orderId', orderId);
res.send('⏩ 消息已推入流');
});
// 消费消息(自动应答)
setInterval(async () => {
const result = await redis.xreadgroup(
'GROUP', consumerGroup, consumerName,
'BLOCK', 2000,
'COUNT', 1,
'STREAMS', streamKey, '>'
);
if (result) {
const [[stream, messages]] = result;
for (const [id, fields] of messages) {
const orderId = fields[1];
console.log('✅ 消费订单:', orderId);
await redis.xack(streamKey, consumerGroup, id); // 标记已处理
}
}
}, 3000);
3️⃣ 异步任务持久化(基于队列)
概念介绍
将任务加入 Redis 队列并异步处理,可实现:
防止阻塞主线程
保证任务持久化存储
支持失败重试、延时执行等
推荐使用 bull 或 bullmq
,内置持久化、并发控制、失败重试等机制。
示例代码(使用 bull
)
npm install bull
const Queue = require('bull');
const jobQueue = new Queue('emailQueue', { redis: { port: 6379, host: '127.0.0.1' } });
// 添加任务
app.post('/async/send-email', async (req, res) => {
const { to, content } = req.body;
await jobQueue.add({ to, content });
res.send('📨 已加入异步发送任务队列');
});
// 处理任务
jobQueue.process(async (job) => {
console.log(`发送邮件给:${job.data.to},内容:${job.data.content}`);
// 真实场景:集成 nodemailer、Mailgun、Sendgrid 等发送
});
// 失败重试配置
jobQueue.on('failed', (job, err) => {
console.error(`❌ 邮件发送失败:${job.id}`, err.message);
});
📌 总结对比表
技术 | 类型 | 特点与应用场景 |
---|---|---|
Redlock | 分布式锁 | 多节点锁协调,防止并发冲突与死锁 |
Redis Stream | 消息队列 | 原生持久化、消费者组、高吞吐异步任务 |
Bull/BullMQ | 队列框架 | 丰富功能,失败重试、延迟队列、状态可视化 |
如你需要我将三种方式整合成完整的 Express 微服务框架示例,也可以继续提问!
四、Express 使用布隆过滤器
下面是关于 Express 使用布隆过滤器(Bloom Filter) 的概念、详细代码示例和讲解,帮助你理解它在实际 Web 应用中的应用场景:
1️⃣ 概念介绍
布隆过滤器(Bloom Filter)是一种空间效率极高的概率型数据结构,用于判断一个元素是否存在于集合中:
优点:节省内存、查询速度快,适用于大规模去重、黑名单校验等场景。
缺点:可能误判为存在(假阳性),但不会漏判(不会漏掉存在的元素)。
常用于:
防止缓存穿透(查询不存在数据频繁打数据库)
用户是否已提交过表单、请求
黑名单过滤、URL 去重
2️⃣ 安装依赖
我们使用 bloom-filters 库:
npm install bloom-filters
3️⃣ 示例代码(Express + BloomFilter)
以下示例演示如何使用布隆过滤器拦截重复请求或缓存穿透:
// 引入布隆过滤器
const express = require('express');
const { BloomFilter } = require('bloom-filters');
const app = express();
app.use(express.json());
// 创建布隆过滤器,参数: 预计插入数量、可接受误差率
const bloom = new BloomFilter(1000, 0.01);
// 示例:用户请求时检查是否访问过该资源
app.post('/api/resource', (req, res) => {
const { userId } = req.body;
// 检查是否已经请求过
if (bloom.has(userId)) {
return res.status(429).json({ message: '该用户已处理过请求,拒绝重复操作。' });
}
// 第一次访问,插入布隆过滤器
bloom.add(userId);
// 模拟处理业务逻辑
console.log(`处理用户 ${userId} 的请求`);
res.json({ message: '请求处理成功' });
});
4️⃣ 与 Redis 配合使用(持久化布隆过滤器)
如需跨服务共享布隆过滤器状态,可使用 Redis 存储:
npm install redis
const redis = require('redis');
const { BloomFilter } = require('bloom-filters');
const client = redis.createClient();
// 将布隆过滤器序列化为 JSON 保存
const saveToRedis = async (key, filter) => {
const json = JSON.stringify(filter.saveAsJSON());
await client.set(key, json);
};
// 从 Redis 读取布隆过滤器
const loadFromRedis = async (key) => {
const json = await client.get(key);
if (json) {
return BloomFilter.fromJSON(JSON.parse(json));
}
return new BloomFilter(1000, 0.01);
};
5️⃣ 使用场景举例
使用场景 | 示例 |
---|---|
防止缓存穿透 | 数据库中没有的 key 被频繁查询 |
请求重复提交拦截 | 用户提交重复订单、重复点赞 |
URL 去重 | 爬虫系统中防止重复抓取 |
黑名单判断 | IP、邮箱、手机号是否在黑名单中 |
✅ 小结
布隆过滤器适合在高并发系统中快速过滤“不存在”的请求;
它不能代替数据库查重,但能大幅减少无效请求打到数据库/缓存;
结合 Redis 可以实现布隆过滤器的持久化与多节点共享。
五、Express 使用 RabbitMQ
以下是关于 Express 使用 RabbitMQ 的完整指南,包括:
概念介绍
安装依赖
完整代码示例(发送、消费、连接关闭、持久化、确认机制等)
实战讲解与注释
1️⃣ 概念介绍
RabbitMQ 是一个高性能的消息中间件(消息队列),用于实现异步通信、解耦模块、削峰填谷等。
它基于 AMQP(Advanced Message Queuing Protocol)协议,主要组件包括:
组件 | 说明 |
---|---|
Producer | 消息生产者,发送消息到队列 |
Queue | 消息队列,缓存待处理消息 |
Consumer | 消费者,监听并处理队列中的消息 |
Exchange | 交换机,决定消息路由到哪个队列(direct、fanout、topic、headers) |
Routing Key | 消息携带的路由标识 |
2️⃣ 安装依赖
我们使用 amqplib
模块:
npm install amqplib
3️⃣ Express 集成 RabbitMQ:发送与消费消息(全流程)
项目结构:
/rabbitmq-demo
├── producer.js # 发送消息
├── consumer.js # 消费消息
└── app.js # Express 接口入口
3.1 配置 RabbitMQ 地址
// config.js
module.exports = {
RABBITMQ_URL: 'amqp://localhost', // 默认端口5672
QUEUE_NAME: 'task_queue'
};
3.2 消息发送(Producer)
// producer.js
const amqp = require('amqplib');
const { RABBITMQ_URL, QUEUE_NAME } = require('./config');
async function sendToQueue(msg) {
const conn = await amqp.connect(RABBITMQ_URL);
const channel = await conn.createChannel();
// 保证队列存在(幂等操作)
await channel.assertQueue(QUEUE_NAME, { durable: true });
// 发送消息(Buffer)
channel.sendToQueue(QUEUE_NAME, Buffer.from(msg), { persistent: true });
console.log(`[x] 发送消息: ${msg}`);
// 延迟关闭连接(避免连接还没写完就关闭)
setTimeout(() => {
channel.close();
conn.close();
}, 500);
}
module.exports = sendToQueue;
3.3 消息消费(Consumer)
// consumer.js
const amqp = require('amqplib');
const { RABBITMQ_URL, QUEUE_NAME } = require('./config');
async function startConsumer() {
const conn = await amqp.connect(RABBITMQ_URL);
const channel = await conn.createChannel();
// 保证队列存在
await channel.assertQueue(QUEUE_NAME, { durable: true });
// 每次只处理一个消息(限流)
channel.prefetch(1);
console.log('[*] 等待接收消息...');
channel.consume(QUEUE_NAME, async (msg) => {
const content = msg.content.toString();
console.log(`[x] 收到消息: ${content}`);
// 模拟处理耗时
await new Promise(res => setTimeout(res, 2000));
// 确认消息处理完毕
channel.ack(msg);
console.log(`[✓] 处理完成: ${content}`);
});
}
// 启动消费者
startConsumer().catch(console.error);
3.4 Express 中调用 Producer
// app.js
const express = require('express');
const sendToQueue = require('./producer');
const app = express();
app.use(express.json());
app.post('/send', async (req, res) => {
const { message } = req.body;
if (!message) return res.status(400).json({ error: 'message is required' });
await sendToQueue(message);
res.json({ status: 'Message sent to queue' });
});
app.listen(3000, () => {
console.log('Server running at http://localhost:3000');
});
示例请求(用 Postman 或 curl):
curl -X POST http://localhost:3000/send -H "Content-Type: application/json" -d '{"message": "Hello, RabbitMQ!"}'
4️⃣ 进阶功能(操作补全)
✅ 持久化消息
使用
{ durable: true }
创建队列使用
{ persistent: true }
发送消息
可以防止 RabbitMQ 重启丢失消息。
✅ 消息确认机制(ack)
如果
channel.ack(msg)
不执行,消息会保留在队列,确保消息不丢失。
✅ 消费者限流(prefetch)
channel.prefetch(1); // 每次只处理一个消息,避免过载
✅ 多消费者场景(水平扩展)
可以开启多个 consumer.js
实例,共享队列并均衡消费消息。
5️⃣ 常见应用场景
场景 | 示例 |
---|---|
异步任务处理 | 用户上传图片,异步压缩处理 |
邮件通知 | 注册后异步发送邮件 |
流量削峰 | 高并发请求写入队列,逐步处理 |
订单系统 | 下单后异步库存扣减、发货处理 |
区块链事件队列 | DApp 收到链上事件后异步处理与持久化 |
✅ 总结
优点 | 说明 |
---|---|
解耦模块 | 发送方不关心处理逻辑 |
提升系统性能 | 异步处理、削峰填谷 |
提高可靠性 | 可持久化、失败重试、限流 |
适用于微服务架构 | 服务之间通过消息队列通信 |
如你想进一步实现 Topic 交换机、死信队列(DLX)、延迟队列等,也可以告诉我,我将继续补充对应场景。
六、Express实现Topic 交换机、死信队列(DLX)、延迟队列
下面是关于 RabbitMQ 的高级用法:Topic 交换机、死信队列(DLX)、延迟队列 的完整实现与讲解,包括 概念 + 示例代码(Node.js + amqplib)+ 使用方式:
🧩 1️⃣ Topic 交换机
🔷 概念介绍
Topic 交换机(topic
)可以根据通配符匹配路由键,适合复杂的消息路由规则。
*
:匹配一个单词#
:匹配零个或多个单词
示例路由键:
路由键 | 匹配规则 |
---|---|
order.created |
匹配 order.* |
order.us.created |
匹配 order.# |
user.deleted |
不匹配 order.* |
✅ 示例代码(Producer + Consumer)
Producer:发送带有路由键的消息
// topicProducer.js
const amqp = require('amqplib');
const exchange = 'topic_logs';
async function sendMessage(routingKey, msg) {
const conn = await amqp.connect('amqp://localhost');
const channel = await conn.createChannel();
await channel.assertExchange(exchange, 'topic', { durable: true });
channel.publish(exchange, routingKey, Buffer.from(msg));
console.log(`[x] Sent '${msg}' with key '${routingKey}'`);
setTimeout(() => {
channel.close();
conn.close();
}, 500);
}
sendMessage('order.created', '订单已创建');
Consumer:监听带通配符的 key
// topicConsumer.js
const amqp = require('amqplib');
const exchange = 'topic_logs';
async function startConsumer(pattern) {
const conn = await amqp.connect('amqp://localhost');
const channel = await conn.createChannel();
await channel.assertExchange(exchange, 'topic', { durable: true });
const q = await channel.assertQueue('', { exclusive: true });
await channel.bindQueue(q.queue, exchange, pattern);
console.log(`[x] Waiting for messages with pattern: ${pattern}`);
channel.consume(q.queue, msg => {
console.log(`[✓] Received (${msg.fields.routingKey}): ${msg.content.toString()}`);
}, { noAck: true });
}
startConsumer('order.#');
🧨 2️⃣ 死信队列(DLX)
🔷 概念介绍
死信队列(Dead Letter Exchange) 用于接收未被正常消费的消息,比如:
消费失败未
ack
队列 TTL 超时
队列满
拒绝(
nack/reject
且不 requeue)
✅ 示例代码(TTL + DLX)
Producer:声明带 TTL 的主队列 + DLX
// dlxProducer.js
const amqp = require('amqplib');
const DLX_EXCHANGE = 'dlx-ex';
const NORMAL_EXCHANGE = 'normal-ex';
const QUEUE = 'normal-queue';
const DLX_QUEUE = 'dead-letter-queue';
async function setup() {
const conn = await amqp.connect('amqp://localhost');
const ch = await conn.createChannel();
// 1. 声明死信交换机与队列
await ch.assertExchange(DLX_EXCHANGE, 'fanout', { durable: true });
await ch.assertQueue(DLX_QUEUE, { durable: true });
await ch.bindQueue(DLX_QUEUE, DLX_EXCHANGE, '');
// 2. 声明主交换机和带 DLX 属性的队列
await ch.assertExchange(NORMAL_EXCHANGE, 'direct', { durable: true });
await ch.assertQueue(QUEUE, {
durable: true,
deadLetterExchange: DLX_EXCHANGE, // 设置死信交换机
messageTtl: 5000 // 设置消息 TTL 为 5s
});
await ch.bindQueue(QUEUE, NORMAL_EXCHANGE, 'task');
// 3. 发送消息
ch.publish(NORMAL_EXCHANGE, 'task', Buffer.from('This will expire!'));
console.log('[x] Sent message with TTL');
setTimeout(() => {
ch.close();
conn.close();
}, 1000);
}
setup();
Consumer:不消费,让其过期 → 死信队列接收
// dlxConsumer.js
const amqp = require('amqplib');
async function startDLXConsumer() {
const conn = await amqp.connect('amqp://localhost');
const ch = await conn.createChannel();
await ch.assertQueue('dead-letter-queue', { durable: true });
ch.consume('dead-letter-queue', msg => {
console.log(`[DLX] Received expired message: ${msg.content.toString()}`);
}, { noAck: true });
}
startDLXConsumer();
⏳ 3️⃣ 延迟队列(基于 TTL + DLX 实现)
🔷 概念介绍
RabbitMQ 本身不支持“精确到某时间点的延迟消息”,但可以组合:
消息 TTL + DLX 死信交换机
模拟延迟队列。
✅ 延迟队列示例
// delayQueue.js
const amqp = require('amqplib');
const DELAY_EX = 'delay-ex';
const DELAY_QUEUE = 'delay-queue';
const TARGET_EX = 'real-task-ex';
const TARGET_QUEUE = 'real-task-queue';
async function setupDelayQueue() {
const conn = await amqp.connect('amqp://localhost');
const ch = await conn.createChannel();
// 1. 目标真实处理交换机/队列
await ch.assertExchange(TARGET_EX, 'fanout', { durable: true });
await ch.assertQueue(TARGET_QUEUE, { durable: true });
await ch.bindQueue(TARGET_QUEUE, TARGET_EX, '');
// 2. 延迟队列,消息过期后转发到目标交换机
await ch.assertExchange(DELAY_EX, 'direct', { durable: true });
await ch.assertQueue(DELAY_QUEUE, {
durable: true,
messageTtl: 10000, // 延迟10秒
deadLetterExchange: TARGET_EX
});
await ch.bindQueue(DELAY_QUEUE, DELAY_EX, 'delay');
// 3. 发送消息
ch.publish(DELAY_EX, 'delay', Buffer.from('Hello after 10s'));
console.log('[x] Message sent to delay queue');
setTimeout(() => {
ch.close();
conn.close();
}, 1000);
}
setupDelayQueue();
目标消费(真正业务执行)
// taskConsumer.js
const amqp = require('amqplib');
async function consumeRealTask() {
const conn = await amqp.connect('amqp://localhost');
const ch = await conn.createChannel();
await ch.assertQueue('real-task-queue', { durable: true });
ch.consume('real-task-queue', msg => {
console.log(`[✓] 延迟后收到任务: ${msg.content.toString()}`);
}, { noAck: true });
}
consumeRealTask();
🧠 总结对比
功能 | 用途 | 技术实现 |
---|---|---|
Topic 交换机 | 多维匹配路由键 | topic 类型交换机 + 通配符 |
死信队列 | 异常消息托底 | 设置 x-dead-letter-exchange |
延迟队列 | 定时消息、延时执行 | TTL + DLX 模拟,或用插件实现精确延时 |
如需:
✅ 多级死信队列(死信消息再次进入延迟)
✅ RabbitMQ 插件方式实现精确延迟(rabbitmq_delayed_message_exchange
)
✅ NestJS 集成 RabbitMQ 的完整封装
七、RabbitMQ 的可靠性投递 与 消息的幂等性设计
🧩 1️⃣ 可靠性投递(Reliability Delivery)
🟦 概念介绍
可靠投递的目标是:确保消息从生产者 → RabbitMQ → 消费者 三段都不丢失、不重复。
涉及三段关键机制:
阶段 | 机制/方法 |
---|---|
生产者 → RabbitMQ | 事务机制、Confirm 模式 |
RabbitMQ → 队列 | 消息持久化、队列持久化 |
队列 → 消费者 | ack 确认消费 + nack 补偿 |
✅ 关键配置与代码说明(生产者 Confirm 模式)
// producer_confirm.js
const amqp = require('amqplib');
async function sendReliableMessage() {
const conn = await amqp.connect('amqp://localhost');
const ch = await conn.createConfirmChannel(); // 👈 Confirm 模式
const exchange = 'reliable-ex';
await ch.assertExchange(exchange, 'direct', { durable: true });
const routingKey = 'reliable';
const message = 'Hello with confirm!';
ch.publish(exchange, routingKey, Buffer.from(message), { persistent: true }, (err, ok) => {
if (err) {
console.error('消息发送失败', err);
} else {
console.log('[✓] 消息成功投递到交换机');
}
ch.close();
conn.close();
});
}
sendReliableMessage();
📝
persistent: true
确保消息写入磁盘
📝createConfirmChannel()
可确认 RabbitMQ 是否真正收到了消息(比事务高效)
🧠 2️⃣ 消息的幂等性(Idempotency)
🟦 概念介绍
幂等性是指:无论接收同一条消息多少次,结果都不变,避免重复消费导致的数据错误。
✅ 常用方法
方法 | 说明 |
---|---|
全局唯一 ID(msgId ) |
每条消息带唯一 ID,消费前判断是否处理过 |
Redis Set/Hash 缓存 | 存储 msgId 已处理记录 |
数据库唯一约束 / 乐观锁 | 保证写入唯一性或控制版本 |
✅ 消费者示例(Redis 保证幂等性)
// consumer_idempotent.js
const amqp = require('amqplib');
const Redis = require('ioredis');
const redis = new Redis(); // 默认连接 127.0.0.1:6379
async function consumeWithIdempotency() {
const conn = await amqp.connect('amqp://localhost');
const ch = await conn.createChannel();
const queue = 'task-queue';
await ch.assertQueue(queue, { durable: true });
ch.consume(queue, async msg => {
const msgId = msg.properties.messageId; // 👈 消息唯一标识
const key = `processed:${msgId}`;
const alreadyProcessed = await redis.get(key);
if (alreadyProcessed) {
console.log(`[⚠️] 已处理跳过 msgId: ${msgId}`);
ch.ack(msg);
return;
}
const content = msg.content.toString();
console.log(`[✓] 处理消息: ${content}`);
// TODO: 执行业务逻辑...
// 标记为已处理,设置过期防止 Redis 爆炸
await redis.set(key, '1', 'EX', 86400); // 1天过期
ch.ack(msg);
}, { noAck: false });
}
consumeWithIdempotency();
✅ Producer 设置 messageId
// producer_with_id.js
const amqp = require('amqplib');
const { v4: uuidv4 } = require('uuid');
async function sendMessage() {
const conn = await amqp.connect('amqp://localhost');
const ch = await conn.createChannel();
await ch.assertQueue('task-queue', { durable: true });
const msg = 'Buy 1 BTC';
const msgId = uuidv4(); // 👈 唯一 ID
ch.sendToQueue('task-queue', Buffer.from(msg), {
persistent: true,
messageId: msgId
});
console.log(`[x] Sent with msgId: ${msgId}`);
ch.close();
conn.close();
}
sendMessage();
✅ 小结一览
项目 | 目的 | 推荐技术 |
---|---|---|
投递可靠性 | 消息不丢 | Confirm 模式 + 持久化 |
幂等性处理 | 消息不重复处理 | Redis + msgId 唯一标识 |
消费异常重试/补偿 | 消息不丢 & 不乱处理 | nack + 死信/重投机制 |
如果你需要结合 NestJS、BullMQ、Kafka 或使用 Redlock 进行分布式幂等处理,也可以继续问我,我可以为你生成分布式架构模板方案。