Redis事务与驱动的学习(一)

发布于:2025-06-15 ⋅ 阅读:(17) ⋅ 点赞:(0)

一、redis pipeline

1.1、客户端同时发出多个请求,redis服务端依次执行,最后一次性返回结果。(节约往返时间)

之前是一请求一回应同步模式,CS之间往返次数较多,使用pipeline可以减少往返次数。
在这里插入图片描述

使用pipeline之后,提前告诉服务端,我要发几条命令,服务端一次性执行完之后返回结果。
在这里插入图片描述

1.2、redis 事务

是用户定义的,将多条命令打包,在服务端执行,要么全部执行,要么全部不执行。

  • 相关命令:
    • 1、开启事务
      multi
      
    • 2、检测key的变动,在未来,如果有其他连接修改了该key,则取消事务
      WATCH key
      
    • 3、EXEC 提交事务
    • 4、DISCARD 取消事务
  • 事务有什么用?
    以A,B两个客户端为例,连接redis服务器,同时操作同一个key,如果不使用事务,那么结果就可能被修改
    在这里插入图片描述

A客户端想要一个k1为100的值,但B客户端想要一个k1为200的值,此时A再使用k1时,发现值已经发生了变化,并不是A所想要的。

  • 使用事务之后
    在这里插入图片描述

分别执行提交事务两次,其中一个成功返回OK,另一个返回nil,这是因为使用watch命令,在提交事务之前,如果其他客户端修改了key的值,那么此次事务就会取消。实质上,redis采用的是乐观锁,不同于传统数据库MySQL,并不是悲观锁。

1.3 ACID特性

  • 原子性(Atomocity):事务是一个不可分割的工作单位,事务中的操作要么全部成功,要么全部失败。
  • 一致性(Consistency):事务应确保数据从一个正确的状态转移到另一个正确的状态。注意:此处特指数据库层面的正确性,而非应用层面的正确性–逻辑上的一致性。
  • 隔离性(Isolation):事务的执行不应该被其他事务干扰;redis 是单线程执行,天然具备隔离性。
  • 持久性(Durability):一旦事务提交,则其所做的修改将会永远保存到数据库(磁盘)中;redis 只有在
    aof 持久化策略的时候,并且需要在 appendfsync=always 才具备持久性;

二、使用Lua与redis进行互动

实际开发中,使用lua来实现事务,保证原子性,很少使用上面这些命令

2.1、为什么使用Lua?

  • redis底层是用C语言写的,而Lua是用C语言写的,两者可以互相调用。
  • redis中加载了一个lua 虚拟机;用来执行redis lua 脚本。
  • Lua在执行的时候,具备原子性
  • 减少tcp网络流量的浪费,将多个命令封装成一个脚本执行,减少网络往返次数。
  • 复用性
  • 灵活性
  • 热更新

2.2、使用Lua

eval 'local key = KEYS[1]; local value = redis.call("get", key); return value' 1 k1

在这里插入图片描述

这是简单的lua脚本,KEYS[1]是redis传过来的参数,如果脚本代码过多这样就不方便。

2.3、加载Lua脚本

# 从文件中读取 lua脚本内容
cat test.lua | redis-cli script load --pipe
#获取脚本的 sha1 散列值==========>推荐使用
redis-cli -a pwd script load "$(cat test.lua)"

# 检查脚本缓存中,是否有该 sha1 散列值的lua脚本
script exists "b8059ba43af6ffe8bed3db65bac35d452f8115d8"

# 清除所有脚本缓存
script flush

# 如果当前脚本运行时间过长(死循环),可以通过 script kill 杀死当前运行的脚本
script kill

优化lua代码,减少TCP流量的浪费

-- 简单的加法运算
local key = KEYS[1];
local increment = tonumber(ARGV[1] or 1);
local current = tonumber(redis.call('get', key) or '0');

if not increment or not current then
    return nil;
end;

local new_value = current + increment;

redis.call('set', key, new_value);
return new_value;

在这里插入图片描述

那lua是否具备一致性?

  • 不具有一致性
  • set k1 100
    lpush k1 1
    get k1
    #代码就只执行到第一句,第二句出错,就不执行了。(以redis命令为例,lua同理)
    

2.4、小结

  • 1、使用lua脚本,可以保证原子性。
  • 2、减少网络往返次数。
  • 3、复用性好。
  • 4、灵活性高。
  • 5、热更新。
  • 6、lua具备原子性,但不具备一致性。

三、服务器与Redis进行交互

3.1、前置条件,生成库文件

cd redis-xxx/deps/hiredis
make
make install
# 使用cmake
mkdir build && cd build
cmake ..
make install

3.1、同步连接

作者给出hiredis文件中,已经实现,已经留出了接口,可以直接使用。

//连接redis服务器
redisContext *redisConnectWithOptions(const redisOptions *options);
redisContext *redisConnect(const char *ip, int port);
redisContext *redisConnectWithTimeout(const char *ip, int port, const struct timeval tv);
redisContext *redisConnectNonBlock(const char *ip, int port);
redisContext *redisConnectBindNonBlock(const char *ip, int port,
                                       const char *source_addr);
redisContext *redisConnectBindNonBlockWithReuse(const char *ip, int port,
                                                const char *source_addr);
redisContext *redisConnectUnix(const char *path);
redisContext *redisConnectUnixWithTimeout(const char *path, const struct timeval tv);
redisContext *redisConnectUnixNonBlock(const char *path);
redisContext *redisConnectFd(redisFD fd);

//执行redis命令
void *redisvCommand(redisContext *c, const char *format, va_list ap);
void *redisCommand(redisContext *c, const char *format, ...);
void *redisCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen);

//断开连接
void freeReplyObject(void *reply);
void redisFree(redisContext *c);
    unsigned int j, isunix = 0;
    redisContext *c;
    redisReply *reply;
    const char *redis_password = "123456";

    // 定义连接超时时间
    struct timeval timeout = { 1, 500000 }; // 1.5 秒

    // 初始化连接选项
    redisOptions options;
    memset(&options, 0, sizeof(redisOptions));

    // 设置 Redis 服务器的主机名和端口
    const char *hostname = "127.0.0.1";
    const int port = 6379;
    REDIS_OPTIONS_SET_TCP(&options, hostname, port);

    // 设置连接超时和命令超时
    options.connect_timeout = &timeout;
    options.command_timeout = &timeout;

    // 尝试建立连接
    c = redisConnectWithOptions(&options);
    if (c == NULL || c->err) {
        if (c) {
            printf("Connection error: %s\n", c->errstr);
            redisFree(c);
        } else {
            printf("Connection error: can not allocate redis context\n");
        }
        exit(1);
    }

    //认证密码,如果有密码的话,需要先认证一下,没有密码那就注释掉这行代码。
    reply = redisCommand(c, "AUTH %s", redis_password);
    if (reply->type == REDIS_REPLY_ERROR) {
        printf("Redis认证失败!\n");
    }
    else
    {
        printf("Redis认证成功!\n");
    }

    // 执行 Redis 命令
    int roleid = 10086;

    // 检查键是否存在
    reply = redisCommand(c, "EXISTS role:%d", roleid);
    if (reply == NULL) {
        printf("Command error: %s\n", c->errstr);
        redisFree(c);
        exit(1);
    }

    if (reply->type == REDIS_REPLY_INTEGER && reply->integer == 0) {
        printf("Key does not exist.\n");
        freeReplyObject(reply);
    } else {
        freeReplyObject(reply);

        // 执行 hgetall 命令
        reply = redisCommand(c, "hgetall role:%d", roleid);
        if (reply == NULL) {
            printf("Command error: %s\n", c->errstr);
            redisFree(c);
            exit(1);
        }

        // 处理命令回复
        if (reply->type != REDIS_REPLY_ARRAY) {
            if (reply->type == REDIS_REPLY_NIL) {
                printf("Key does not exist or hash is empty.\n");
            } else {
                printf("Reply error: expected array, got %d\n", reply->type);
            }
        } else {
            // 处理数组回复的逻辑
            printf("Reply: number of elements=%lu\n", reply->elements);
            for (size_t i = 0; i < reply->elements; i += 2) {
                printf("\t%s: %s\n", reply->element[i]->str, reply->element[i + 1]->str);
            }
        }

        // 释放回复对象
        freeReplyObject(reply);
    }

    // 释放连接上下文
    redisFree(c);

在这里插入图片描述

因为此时数据库是空的,所以没有查到数据。
在这里插入图片描述

在这里插入图片描述

3.2、异步连接,基于当前的网络模块-recator,实现驱动

  • 核心思路:
    • reactor是一种基于事件驱动的模式,所以在redis的异步连接中,我们也需要使用事件驱动的方式来处理网络IO操作。
    • 需要做的就是构建事件对象,redis事件对象,reactor事件对象
    • 适配事件控制,复用reactor事件控制
  • 具体操作
    • hiredis已经提供了事件操作的接口,我们只需要适配这些事件操作的接口
redis_event_t *re;
re = (redis_event_t*)hi_malloc(sizeof(*re));
//这些是 hiredis 要求你实现的“注册函数”。每当 Redis 需要监听某个 FD 的读/写事件,就会调用这些函数
ac->ev.addRead = redisAddRead;
ac->ev.delRead = redisDelRead;
ac->ev.addRead = redisAddWrite;
ac->ev.delWrite = redisDelWrite;
ac->ev.cleanup = redisCleanup;//清理函数必不可少
ac->ev.data = re;
  • 核心代码
/*在hiredis/async.h中,作者提供了许多异步执行的接口*/
int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap);
int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...);
int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen);
int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len);
/*===========================================================================
================================================================================*/
// 1. 创建 event loop
R = create_reactor();

// 2. 连接redis
redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
if (c->err)
{
    /* Let *c leak for now... */
    redisAsyncFree(c);
    printf("Error: %s\n", c->errstr);
    return 1;
}
// 3. 将 redisAsyncContext 附加到 event loop 上,主要是注册回调函数,填写相关事件
redisAttach(R, c);

// 4.设置连接成功/断开回调函数
redisAsyncSetConnectCallback(c, connectCallback);
redisAsyncSetDisconnectCallback(c, disconnectCallback);

eventloop(R);

release_reactor(R);

在这里插入图片描述

在这里插入图片描述

四、总结

    1. redis事务是为了改变之前一请求一回应的模式,让多个命令可以并发执行;并且要么全部执行成功,要么全部执行失败
    1. redis底层内置Lua虚拟机,使用Lua脚本语言,亲和性会更高
    1. 使用lua脚本,可以保证原子性,但不具备一致性
    1. Lua脚本的优势:减少网络往返次数,复用性好,灵活性高,热更新等
    1. 服务器与redis进行交互,可以使用同步和异步两种方式,同步方式简单直接,但效率较低;而异步方式则可以充分利用网络IO的并发性,提高性能。
    1. 同步连接方案采用阻塞 io 来实现;优点是代码书写是同步的,业务逻辑没有割裂;缺点是阻塞当前线程,直至 redis 返回结果;通常用多个线程来实现线程池来解决效率问题;
    1. 异步连接方案采用非阻塞 io 来实现;优点是没有阻塞当前线程

五、问题

  • 5.1、什么情况下探讨事务?

    多条命令并发执行
  • 什么情况下探讨原子操作操作?
    多核情况下
  • 乐观锁和悲观锁的区别?
    乐观锁:假定不会发生并发冲突,只在提交操作时检查是否违反数据完整性。在提交事务前不锁定,而是在提交事务时检查访问权限。
    悲观锁:假定会发生并发冲突,屏蔽一切可能违反数据完整性的操作。在访问资源之前就先将其锁定。

Code
Ovoice·Github