麻烦先关注方便了解最新分享。
关于VC++实现使用Redis不同方法,有效达到 Redis 性能优化、防击穿。
一、性能优化核心方法
1. 使用连接池复用连接
// 连接池结构体
struct RedisPool {
hiredisContext** connections;
int size;
sem_t sem; // 信号量控制并发
};
// 初始化连接池
RedisPool* create_redis_pool(const char* host, int port, int pool_size) {
RedisPool* pool = (RedisPool*)malloc(sizeof(RedisPool));
pool->size = pool_size;
pool->connections = (hiredisContext**)malloc(pool_size * sizeof**iredisContext*));
sem_init(&pool->sem, 0, pool_size); // 初始化信号量
for (int i = 0; i < pool_size; i++) {
pool->connections[i] = redisConnect(host, port);
if (pool->connections[i]->err) {
// 处理连接失败
redisFree(pool->connections[i]);
exit(1);
}
}
return pool;
}
// 从连接池获取连接
hiredisContext* get_connection(RedisPool* pool) {
sem_wait(&pool->sem); // 等待可用连接
for (int i = 0; i < pool->size; i++) {
if (pool->connections[i]->err == 0) { // 检查连接健康状态
return pool->connections[i];
}
}
return NULL; // 理论上信号量控制下不会出现
}
原理:避免频繁创建/销毁连接(单次连接耗时约 1-3ms),连接池大小建议为 CPU核心数*2+1 。
2. Pipeline 批量操作
void pipeline_example(RedisPool* pool) {
hiredisContext* conn = get_connection(pool);
redisBuffer* buf = redisBufferCreate(); // 创建 Pipeline 缓冲区
// 批量添加命令
redisAppendCommand(buf, "SET key1 %s", "value1");
redisAppendCommand(buf, "SET key2 %s", "value2");
redisAppendCommand(buf, "GET key1");
// 一次性发送所有命令
size_t len = redisBufferLength(buf);
if (redisWrite(conn, buf->data, len) != REDIS_OK) {
// 处理写入失败
redisFree(conn);
return;
}
// 读取所有结果
redisReply* reply;
int count = 3; // 命令数量
for (int i = 0; i < count; i++) {
if (redisGetReply(conn, (void**)&reply) == REDIS_OK) {
// 处理结果
if (reply->type == REDIS_REPLY_STRING) {
printf("GET result: %s\n", reply->str);
}
freeReplyObject(reply);
}
}
redisBufferFree(buf);
sem_post(&pool->sem); // 归还连接
}
效果:1000 次单命令耗时约 80ms,Pipeline 批量处理仅需 2ms(减少网络往返开销)。
3. 使用异步 API(libevent 异步库)
#include <event2/event.h>
#include <hiredis/adapters/libevent.h>
void async_callback(redisAsyncContext* ctx, void* reply, void* privdata) {
redisReply* r = (redisReply*)reply;
if (r->type == REDIS_REPLY_STRING) {
printf("Async GET: %s\n", r->str);
}
}
void async_example() {
struct event_base* base = event_base_new();
redisAsyncContext* ctx = redisAsyncConnect("127.0.0.1", 6379);
if (ctx->err) {
// 处理连接失败
return;
}
// 绑定异步事件
redisLibeventAttach(ctx, base);
redisAsyncCommand(ctx, async_callback, NULL, "GET key1");
event_base_dispatch(base); // 进入事件循环
event_base_free(base);
redisAsyncFree(ctx);
}
场景:适用于非阻塞业务(如日志采集、消息队列消费),单线程可处理上万并发请求。
二、防击穿/雪崩核心方案
4. 布隆过滤器拦截无效请求
// 布隆过滤器(基于 Redis BitMap)
void bloom_filter_add(RedisPool* pool, const char* key) {
hiredisContext* conn = get_connection(pool);
// 使用 SHA-1 生成 2 个哈希值
unsigned char hash1[20], hash2[20];
SHA1((unsigned char*)key, strlen(key), hash1);
SHA1((unsigned char*)key, strlen(key), hash2);
// 计算两个 bit 位置(假设布隆长度 1e6,哈希数 2)
long bit1 = ((((uint64_t)hash1[0]) << 56) | ((uint64_t)hash1[1] << 48) | ...) % 1000000;
long bit2 = ((((uint64_t)hash2[0]) << 56) | ...) % 1000000;
// 设置 bit 位
redisCommand(conn, "SETBIT bloom_filter %ld 1", bit1);
redisCommand(conn, "SETBIT bloom_filter %ld 1", bit2);
sem_post(&pool->sem);
}
bool bloom_filter_check(RedisPool* pool, const char* key) {
// 类似 add 逻辑计算 bit1/bit2
// 检查两个 bit 位是否都为 1
long res1 = (long)redisCommand(conn, "GETBIT bloom_filter %ld", bit1);
long res2 = (long)redisCommand(conn, "GETBIT bloom_filter %ld", bit2);
return res1 == 1 && res2 == 1;
}
参数:误判率建议设为 0.1%,布隆长度 = (条目数 * -ln(误判率)) / (ln2)^2 。
5. 互斥锁(RedLock)防止缓存击穿
// RedLock 实现(简化版)
#define LOCK_EXPIRE 5000 // 锁过期时间(ms)
#define LOCK_RETRY 500 // 重试间隔(ms)
bool redlock_acquire(RedisPool* pool, const char* lock_key, const char* client_id) {
time_t start = time(NULL);
while (time(NULL) - start < LOCK_EXPIRE) {
// 尝试获取锁(NX+PX 保证原子性)
hiredisContext* conn = get_connection(pool);
redisReply* reply = (redisReply*)redisCommand(conn,
"SET %s %s NX PX %d", lock_key, client_id, LOCK_EXPIRE);
sem_post(&pool->sem);
if (reply && reply->type == REDIS_REPLY_STRING && strcmp(reply->str, "OK") == 0) {
freeReplyObject(reply);
return true;
}
freeReplyObject(reply);
usleep(LOCK_RETRY * 1000); // 等待重试
}
return false;
}
void redlock_release(RedisPool* pool, const char* lock_key, const char* client_id) {
hiredisContext* conn = get_connection(pool);
// 验证客户端 ID 再释放(避免误删其他锁)
redisCommand(conn, "if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('DEL', KEYS[1]) else return 0 end",
lock_key, client_id);
sem_post(&pool->sem);
}
注意:RedLock 需至少 3 个 Redis 实例保证分布式锁可靠性,获取锁需超过半数节点成功。
6. 热点数据本地缓存(如 LRU 机制)
// 基于 std::unordered_map 的 LRU 缓存
struct LRUItem {
std::string value;
time_t timestamp;
};
class LRUCache {
private:
std::unordered_map<std::string, LRUItem> cache;
size_t max_size;
time_t ttl; // 过期时间(秒)
public:
LRUCache(size_t max_size, time_t ttl) : max_size(max_size), ttl(ttl) {}
bool get(const std::string& key, std::string& value) {
auto it = cache.find(key);
if (it == cache.end()) return false;
if (time(NULL) - it->second.timestamp > ttl) {
cache.erase(it); // 过期淘汰
return false;
}
// 更新访问时间
it->second.timestamp = time(NULL);
value = it->second.value;
return true;
}
void set(const std::string& key, const std::string& value) {
if (cache.size() >= max_size) {
// 淘汰最久未使用项(简化实现,实际可用 list+map)
auto oldest = std::min_element(cache.begin(), cache.end(),
[](const auto& a, const auto& b) {
return a.second.timestamp < b.second.timestamp;
});
cache.erase(oldest);
}
cache[key] = {value, time(NULL)};
}
};
// 使用示例
LRUCache local_cache(1000, 300); // 最多存 1000 条,5 分钟过期
std::string value;
if (!local_cache.get("hot_key", value)) {
// 从 Redis 加载并写入本地缓存
hiredisContext* conn = get_connection(pool);
redisReply* reply = (redisReply*)redisCommand(conn, "GET hot_key");
if (reply && reply->type == REDIS_REPLY_STRING) {
value = reply->str;
local_cache.set("hot_key", value);
}
freeReplyObject(reply);
sem_post(&pool->sem);
}
场景:适合 QPS > 1w 的热点数据(如商品详情页),本地缓存命中率需维持在 80% 以上。
三、其他优化手段(附代码片段)
7. 避免大 Key(控制 Value 大小 < 100KB)
// 写入前校验 Value 大小
void check_big_value(const char* value, size_t max_size) {
size_t len = strlen(value);
if (len > max_size) {
throw std::runtime_error("Value exceeds 100KB limit");
}
}
8. 使用压缩编码(LZ4 压缩大 Value)
#include <lz4.h>
std::string compress_value(const std::string& raw) {
char* compressed = (char*)malloc(LZ4_compressBound(raw.size()));
int compressed_size = LZ4_compress_default(raw.data(), compressed, raw.size());
std::string result(compressed, compressed_size);
free(compressed);
return result;
}
std::string decompress_value(const std::string& compressed, size_t original_size) {
char* decompressed = (char*)malloc(original_size);
int decompressed_size = LZ4_decompress_safe(compressed.data(), decompressed, compressed.size(), original_size);
std::string result(decompressed, decompressed_size);
free(decompressed);
return result;
}
9. 异步淘汰过期 Key(定期删除 + 惰性删除)
// 定期删除任务(后台线程执行)
void async_expire_cleanup(RedisPool* pool) {
while (true) {
hiredisContext* conn = get_connection(pool);
redisReply* reply = (redisReply*)redisCommand(conn, "SCAN 0 MATCH key:* EXIT 1000"); // 每次扫描 1000 个 Key
if (reply->type == REDIS_REPLY_ARRAY && reply->elements == 2) {
redisReply* keys_reply = reply->element[1];
for (int i = 0; i < keys_reply->elements; i++) {
std::string key = keys_reply->element[i]->str;
redisReply* ttl_reply = (redisReply*)redisCommand(conn, "TTL %s", key.c_str());
if (ttl_reply->integer < 0) continue; // 无过期时间
if (ttl_reply->integer < 60) { // 临近过期时主动淘汰
redisCommand(conn, "DEL %s", key.c_str());
}
freeReplyObject(ttl_reply);
}
}
freeReplyObject(reply);
sem_post(&pool->sem);
sleep(60); // 每分钟执行一次
}
}
四、完整优化配置示例(redis.conf 对应项)
# 性能优化配置
tcp-backlog 511 # 优化 TCP 三次握手队列
tcp-keepalive 300 # 保持连接活性(秒)
hz 100 # 提高事件循环频率(默认 10,高负载设为 100)
aof-rewrite-incremental-fsync yes # AOF 重写期间优化 fsync
# 防击穿配置
maxmemory 2gb # 限制内存使用
maxmemory-policy allkeys-lru # 内存不足时淘汰策略
notify-keyspace-events Ex # 开启 Key 过期通知
关键指标监控代码
// 实时监控 Redis 状态
void monitor_redis(RedisPool* pool) {
hiredisContext* conn = get_connection(pool);
redisReply* reply = (redisReply*)redisCommand(conn, "INFO stats");
if (reply->type == REDIS_REPLY_STRING) {
std::string info = reply->str;
// 解析 QPS:
size_t pos = info.find("instantaneous_ops_per_sec:");
if (pos != std::string::npos) {
int qps = std::stoi(info.substr(pos + 21));
if (qps > 10000) { // 阈值告警
send_alert("Redis QPS 过高: " + std::to_string(qps));
}
}
}
freeReplyObject(reply);
sem_post(&pool->sem);
}
注意事项
1. 线程安全: hiredis 非线程安全,连接池需配合锁/信号量使用。
2. 序列化:复杂对象需先序列化为 JSON/Protocol Buffers 再存入 Redis。
3. 压测工具:用 redis-benchmark 验证优化效果(如单实例 QPS 从 5w 提升至 8w)。
如需某方案的深度优化或特定场景扩展(如 Redis Cluster 分布式锁)。