redis进阶

发布于:2025-08-29 ⋅ 阅读:(14) ⋅ 点赞:(0)

一、基本的事务操作

redis单条命令是保证原子性的,但是事务不保证原子性。

redis事务本质:是一组命令的集合。一个事务中的所有命令都会被序列化,在事务执行过程中,会按照顺序执行

一次性、顺序性、排他性。

没有原子性和隔离级别概念,所有的命令在事务中,并没有直接被执行。只有发起执行命令的时候才会执行。

事务:

  • 开启事务(multi)

  • 命令入队(...)

  • 执行事务(exec)

> multi
OK
> set k1 v2
QUEUED
> set k2 v2
QUEUED
> get k1
QUEUED
> exec

放弃事务

discard

编译型异常

如果命令本身有错,事务中所有命令都不会被执行

运行时异常

如果事务队列中存在语法性错误,那么执行命令的时候,其他命令是可以正常执行

二、redis实现乐观锁

乐观锁:更新数据的时候判断一下,在此期间是否有人修改过这个数据

  • 获取version

  • 更新的时候比较version

redis测监视测试

正常执行一个事务

> set money 100
OK
> set out 0
OK
> watch money
OK
> multi
OK
> decrby money 20
QUEUED
> incrby out 20
QUEUED
> exec

在一个事务执行中插入另一个事务操作,模拟

#one
> watch money
OK
> multi
OK
> decrby money 10
QUEUED
> incrby out 10
QUEUED

#two
> get money
"80"
> set money 1000

#one
> exec
nil

使用watch可以当作redis的乐观锁操作

如果事务执行失败,就先解锁

unwatch

再重新加锁,重新执行事务

三、通过Jedis操作redis

Jedis是redis推荐的java连接开发工具,使用java操作redis中间件。

1.创建一个空项目,并在其中创建一个maven的module

2.添加对应依赖

  
  <dependencies>
<!--        导入jedis-->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
    </dependencies>

3.编码测试:

public class TestPing {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        System.out.println(jedis.ping());
        jedis.flushAll();
        jedis.close();
    }
}

四、springboot集成redis

jedis替换为lettuce:jedis采用直连,多个线程操作不安全,要想避免不安全,需要使用jedis pool连接池

lettuce采用netty,实例可以在多个线程中进行共享,不存在线程不安全的情况,可以减少线程数据。

测试

1.创建一个springboot项目

2.导入依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </dependency>

2.配置参数

spring.redis.host=127.0.0.1
spring.redis.port=6379

3.测试

@Test
    public void contextLoads() {
        redisTemplate.getExpire("one");
    }

五、自定义Template

创建一个User对象

@Component
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {
    private String name;
    private int age;
}

进行测试

@Test
    public void UserTest() throws JsonProcessingException {
        User user = new User("sstx", 3);
        String jsonUser = new ObjectMapper().writeValueAsString(user);
        redisTemplate.opsForValue().set("user", jsonUser);
        Object user1 = redisTemplate.opsForValue().get("user");
        System.out.println(user1);
    }

如果直接存入对象:

@Test
    public void UserTest() throws JsonProcessingException {
        User user = new User("sstx", 3);
//        String jsonUser = new ObjectMapper().writeValueAsString(user);
        redisTemplate.opsForValue().set("user", user);
        Object user1 = redisTemplate.opsForValue().get("user");
        System.out.println(user1);
    }

会报错没有序列化。serializer.SerializationException: Cannot serialize

序列化:

public class User implements Serializable {
RedisConfig

package com.sstx.springbootredistest1.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.net.UnknownHostException;

@Configuration
public class RedisConfig {

    @Bean
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);

//配置自己的序列化方式
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
//        //key采用string的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        //hash的key也采用string序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        //value序列化方式采用jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //hash的value序列化方式也采用jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();


        return template;
    }
}
RedisUtil

package com.sstx.springbootredistest1.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Component
public final class RedisUtils {

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    /*
    指定缓存失效时间
    @param key 键
    @param time 时间(秒)
    @return
     */
    public boolean expire(String key,long time){
        try {
            if (time > 0){
                redisTemplate.expire(key,time, TimeUnit.SECONDS);
            }
            return true;
        }catch (Exception e){
            return false;
        }
    }
    /*
    根据key 获取过期时间
    @param key 键 不能为null
    @return 时间(秒)返回0代表永久有效
     */
    public long getExpire(String key){
        return redisTemplate.getExpire(key,TimeUnit.SECONDS);
    }
    /*
    判断key 是否存在
    @param key 键
    @return ture 存在 false不存在
     */
    public boolean hasKey(String key){
        try {
            return redisTemplate.hasKey(key);
        }catch (Exception e){
            return  false;
        }
    }
    /*
    删除缓存
    @param key 可以传一个值或者多个值
     */
    @SuppressWarnings("unchecked")
    public void del(String... key){
        if (key != null && key.length > 0){
            if (key.length == 1){
                redisTemplate.delete(key[0]);
            }else {
                redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
            }
        }
    }
    //=========================String==================================
    /*
    普通缓存获取
    @param key 键
    @return 值
     */
    public Object get(String key){
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }
    /*
    普通缓存放入
    @param key 键
    @param value 值
    @return turn 成功 false 失败
     */
    public boolean set(String key,Object value){
        try {
            redisTemplate.opsForValue().set(key,value);
            return true;
        }catch (Exception e){
            return false;
        }
    }
    /*
    普通缓存放入并设置时间
    @param key 键
    @param value 值
    @param time 时间(秒) time要大于0 如果time小于0,将设置成无限制
    @return turn 成功 false 失败
     */
    public  boolean set(String key,Object value,long time){
        try {
            if (time > 0){
                redisTemplate.opsForValue().set(key,value,time,TimeUnit.SECONDS);
            }else {
                redisTemplate.opsForValue().set(key,value);
            }
            return true;
        }catch (Exception e){
            return false;
        }
    }
    /*
    递增
    @param key 键
    @param delta 值 要增加几(大于0)
    @return
     */
    public long incr(String key,long delta){
        if (delta < 0){
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key,delta);
    }
    /*
    递减
    @param key 键
    @param delta 值 要减少几(大于0)
    @return
     */
    public long decr(String key,long delta){
        if (delta < 0){
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().decrement(key,delta);
    }
    // ================================Map=================================
    /**
     * HashGet
     * @param key 键 不能为null
     * @param item 项 不能为null
     * @return 值
     */
    public Object hget(String key, String item) {
        return redisTemplate.opsForHash().get(key, item);
    }
    /**
     * 获取hashKey对应的所有键值
     * @param key 键
     * @return 对应的多个键值
     */
    public Map<Object, Object> hmget(String key) {
        return redisTemplate.opsForHash().entries(key);
    }
    /**
     * HashSet
     * @param key 键
     * @param map 对应多个键值
     * @return true 成功 false 失败
     */
    public boolean hmset(String key, Map<String, Object> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * HashSet 并设置时间
     * @param key 键
     * @param map 对应多个键值
     * @param time 时间(秒)
     * @return true成功 false失败
     */
    public boolean hmset(String key, Map<String, Object> map, long time) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 向一张hash表中放入数据,如果不存在将创建
     * @param key 键
     * @param item 项
     * @param value 值
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 向一张hash表中放入数据,如果不存在将创建
     * @param key 键
     * @param item 项
     * @param value 值
     * @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value, long time) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 删除hash表中的值
     * @param key 键 不能为null
     * @param item 项 可以使多个 不能为null
     */
    public void hdel(String key, Object... item) {
        redisTemplate.opsForHash().delete(key, item);
    }
    /**
     * 判断hash表中是否有该项的值
     * @param key 键 不能为null
     * @param item 项 不能为null
     * @return true 存在 false不存在
     */
    public boolean hHasKey(String key, String item) {
        return redisTemplate.opsForHash().hasKey(key, item);
    }
    /**
     * hash递增 如果不存在,就会创建一个 并把新增后的值返回
     * @param key 键
     * @param item 项
     * @param by 要增加几(大于0)
     * @return
     */
    public double hincr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, by);
    }
    /**
     * hash递减
     * @param key 键
     * @param item 项
     * @param by 要减少记(小于0)
     * @return
     */
    public double hdecr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, -by);
    }
    // ============================set=============================
    /**
     * 根据key获取Set中的所有值
     * @param key 键
     * @return
     */
    public Set<Object> sGet(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
    /**
     * 根据value从一个set中查询,是否存在
     * @param key 键
     * @param value 值
     * @return true 存在 false不存在
     */
    public boolean sHasKey(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 将数据放入set缓存
     * @param key 键
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSet(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
    /**
     * 将set数据放入缓存
     * @param key 键
     * @param time 时间(秒)
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSetAndTime(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0)
                expire(key, time);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
    /**
     * 获取set缓存的长度
     * @param key 键
     * @return
     */
    public long sGetSetSize(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
    /**
     * 移除值为value的
     * @param key 键
     * @param values 值 可以是多个
     * @return 移除的个数
     */
    public long setRemove(String key, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
    // ===============================list=================================
    /**
     * 获取list缓存的内容
     * @param key 键
     * @param start 开始
     * @param end 结束 0 到 -1代表所有值
     * @return
     */
    public List<Object> lGet(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
    /**
     * 获取list缓存的长度
     * @param key 键
     * @return
     */
    public long lGetListSize(String key) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
    /**
     * 通过索引 获取list中的值
     * @param key 键
     * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
     * @return
     */
    public Object lGetIndex(String key, long index) {
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
    /**
     * 将list放入缓存
     * @param key 键
     * @param value 值
     * @return
     */
    public boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 将list放入缓存
     * @param key 键
     * @param value 值
     * @param time 时间(秒)
     * @return
     */
    public boolean lSet(String key, Object value, long time) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 将list放入缓存
     * @param key 键
     * @param value 值
     * @return
     */
    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 将list放入缓存
     *
     * @param key 键
     * @param value 值
     * @param time 时间(秒)
     * @return
     */
    public boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 根据索引修改list中的某条数据
     * @param key 键
     * @param index 索引
     * @param value 值
     * @return
     */
    public boolean lUpdateIndex(String key, long index, Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 移除N个值为value
     * @param key 键
     * @param count 移除多少个
     * @param value 值
     * @return 移除的个数
     */
    public long lRemove(String key, long count, Object value) {
        try {
            Long remove = redisTemplate.opsForList().remove(key, count, value);
            return remove;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
}

六、redis配置文件

单位

# 1k => 1000 bytes
# 1kb => 1024 bytes
# 1m => 1000000 bytes
# 1mb => 1024*1024 bytes
# 1g => 1000000000 bytes
# 1gb => 1024*1024*1024 bytes
#
# units are case insensitive so 1GB 1Gb 1gB are all the same.

1.配置文件units单位对大小写不敏感

包含

# Include one or more other config files here.  This is useful if you
# have a standard template that goes to all Redis servers but also need
# to customize a few per-server settings.  Include files can include
# other files, so use this wisely.
#
# Note that option "include" won't be rewritten by command "CONFIG REWRITE"
# from admin or Redis Sentinel. Since Redis always uses the last processed
# line as value of a configuration directive, you'd better put includes
# at the beginning of this file to avoid overwriting config change at runtime.
#
# If instead you are interested in using includes to override configuration
# options, it is better to use include as the last line.
#
# Included paths may contain wildcards. All files matching the wildcards will
# be included in alphabetical order.
# Note that if an include path contains a wildcards but no files match it when
# the server is started, the include statement will be ignored and no error will
# be emitted.  It is safe, therefore, to include wildcard files from empty
# directories.
#
# include /path/to/local.conf
# include /path/to/other.conf
# include /path/to/fragments/*.conf

通过include把其他配置文件导进来

网络

bind 127.0.0.1 -::1  #绑定ip
protected-mode yes   #保护模式开启
port 6379            #端口

通用

daemonize no  #守护进程,默认为no,需自己开启为yes
pidfile /var/run/redis_6379.pid   #如果以后台方式运行,就需要指定一个pid文件
#日志
# Specify the server verbosity level.
# This can be one of:
# debug (a lot of information, useful for development/testing)
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably)
# warning (only very important / critical messages are logged)
loglevel notice

logfile ""  #日志的文件输出名

databases 16    #默认的数据库数量
always-show-logo no  #是否总是显示logo

快照

save 3600 1 
save 300 100 
save 60 10000
#如果在xx秒内,至少有YY个key进行了修改,就进行持久化

stop-writes-on-bgsave-error yes #如果持久化出错是否继续工作

rdbcompression yes   #是否压缩rdb文件,会压缩CPU资源
rdbchecksum yes     #保存rdb文件时进行校验

dir ./              #rdb文件保存目录
持久化中用到,在规定时间内,执行多少次操作,则持久化到文件

redis是内存数据库,如果没有持久化,就会断电即失。

REPLICATION 复制

SECURITTY

密码设置

# requirepass foobared

命令设置

config get requirepass:获取密码
config set requirepass "111"
设置后登录前需验证
auth "111"

客户端限制CLIENTS

# maxclients 10000

MEMORY MANAGEMENT

# maxmemory <bytes> #最大内存

# maxmemory-policy noeviction  #内存达到上限后的处理策略
noeviction: 不删除策略, 达到最大内存限制时, 如果需要更多内存, 直接返回错误信息。(默认值)
allkeys-lru: 所有key通用; 优先删除最近最少使用(less recently used ,LRU) 的 key。
volatile-lru: 只限于设置了 expire 的部分; 优先删除最近最少使用(less recently used ,LRU) 的 key。
allkeys-random: 所有key通用; 随机删除一部分 key。
volatile-random: 只限于设置了 expire 的部分; 随机删除一部分 key。
volatile-ttl: 只限于设置了 expire 的部分; 优先删除剩余时间(time to live,TTL) 短的key。

APPEND ONLY模式 AOF配置

#默认不开启,默认使用RDB
appendonly no
appendfilename "appendonly.aof"    #文件名

#执行策略
# appendfsync always   #每次修改都会同步,消耗性能
appendfsync everysec   #可能丢失1秒数据
# appendfsync no       #不执行,操作系统自己同步

七、RDB操作

在指定的时间间隔内将内存中的数据集快照写入磁盘,即snapshot快照。它的恢复是将快照文件直接读到内存中。

redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的,确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加高效,RDB缺点是最后一次持久化后的数据可能丢失。

rdb保存的文件是dump.rdb

1.修改redis.conf,配置保存规则

save 60 5

2.向redis中添加数据

set k1 v1
set k2 v2
set k3 v3
set k4 v4
set k5 v5

3.关闭redis

shutdown

4.退出redis

exit

5.查看redis服务是否已关闭

ps -ef|grep redis

6.查看dump.rdb文件是否生成

/usr/local/bin下:

ls

7.开启服务

redis-server kconfig/redis.conf

8.进入redis

redis-cli -p 6379

9.取数据,验证是否持久化

get k1

触发规则

1.save规则满足,自动触发rdb规则

2.flushall之后会默认产生一个rdb文件

3.退出redis,也会产生rdb文件

备份会自动生成一个dump.rdb

如何恢复rdb文件

1.只需要将rdb文件放在redis启动目录就可以,redis启动时会自动检查dump.rdb恢复其中的数据

2.查看需要放置的目录位置

config get dir
1) "dir"
2) "/usr/local/bin"

优点:

1.适合大规模数据恢复,如果对数据完整性要求不高。

缺点:

1.需要一定的时间间隔进行操作,如果redis意外宕机,最后一次修改数据就丢失了

2.fork进程时会占用一定的内存空间

八、AOF操作

将我们的操作全部记录下来。以日志的形式记录每个写操作,将redis执行过的所有指令记录下来,读操作不吉利,只许追加文件但不可以改文件,redis启动之初会读取改文件重新构建数据。

保存的是appendonly.aof文件

1.默认不开启,要现在配置文件中开启

appendonly yes

2.关闭redis后,即生成appendonlydir文件夹

如果append.aof文件有问题,可以执行redis-check-aof 来进行检查

redis-check-aof --fix appendonly.aof

优点:

1.每一次修改都同步,文件完整性会更好

2.如果设置每秒同步一次,可能会丢失一秒数据

缺点:

1.相对于数据文件,aof远比rdb大,修复速度也比rdb慢

2.aof运行效率也比rdb慢

九、redis订阅发布

发布订阅(pub/sub)是一种消息通信模式,发送者发送消息,订阅者接收消息。

redis客户端可以订阅任意数量的频道。

测试

订阅端:订阅一个频道

subscribe sstx

发送端:发送消息

publish sstx "hello,moring"

redis是通过C实现的,通过publish,subscribe,psubscribe等命令实现发布和订阅功能。通过subscribe命令订阅某频道后,redis-server里维护了一个字典,字典的键即使channel,而字典的值是一个链表,链表中保存了所有订阅这个channel的客户端。subscribe命令的关键,就是将客户端添加到给定channel的订阅链表中。

pub/sub从字面理解就是发布和订阅,在redis中,可以设定对某一个key值进行消息发布及消息订阅,当一个key值进行了消息发布后,所有订阅的客户端都会收到响应的消息。实时消息系统

十、redis主从复制

概念

主从复制,是指将一台redis服务器的数据,复制到其他的redis服务器。前者称为主节点(master/leader),后者称为从节点(slave/follower)。数据的复制是单向的,只能从主节点到从节点。master以写为主,slave以读为主。

默认情况下,每台redis服务器都是主节点,且一个主节点可以有多个从节点,或没有从节点,但一个从节点只能有一个主节点。

主从复制的作用主要包括:

1.数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。

2.故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复,实际是一种服务的冗余。

3.负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务,即写redis数据时应用连接主节点,读redis数据时应用连接从节点,分担服务器负载。尤其在写少读多的场景下,通过多个从节点分担读负载,可以大大提供redis服务器的并发量。

4.高可用基石:主从复制还是哨兵和集群能够实施的基础,因此说主从复制是redis高可用的基础。

一般来说,要将redis运用于工程项目中,只使用一台redis是不可能的,原因如下:

1.从结构上,单个redis服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较大。

2.从容量上,单个redis服务器内存容量有限,就算一台redis服务器内存容量为256g,也不能将所有内存用作redis存储内存,一般来说,单台redis最大使用内存不超过20G。

十一、redis集群环境搭建

环境配置

只配置从库,不用配置主库

info replication
#查看当前库的信息

搭建

1.在/usr/local/bin/kconfig下创建多个配置文件

cp redis.conf redis79.conf
cp redis.conf redis80.conf
cp redis.conf redis81.conf

2.修改配置文件

port 6379
pidfile /var/run/redis_6379.pid
logfile "6379.log"
dbfilename dump6379.rdb

配置一主二从

slaveof host port

真实的主从配置应该是在配置文件中配置

replicaof <masterip> <masterport>

细节

主机负责写,从机负责读

主机断开连接后,从机依然是连接到主机的。主机恢复后,从机依旧可以从主机获取值。

如果是使用命令行配置的主从,从机断开后再启动,就不会维持主从关系。但只要再次设置为从机,就会获取主机的数据。

复制原理

slave启动成功后连接到master,会发送一个sync同步命令

master接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集的命令,在后台进程执行完毕之后,master将传送整个数据文件到slave,并完成一次完全同步。

全量复制:slave服务在接收到数据库文件数据后,将其存盘并加载到内存中

增量复制:master继续将新的所有收集到的修改命令一次传给slave,完成同步

只要重新连接master,一次全量复制将被自动执行。

十二、宕机后手动配置主机

贪吃蛇模型

一个从机连接一个从机,这时也能完成数据复制

主机宕机后,下一个节点的从机执行命令,手动完成变为主节点的操作

slaveof no one

十三、哨兵模式

哨兵模式是一种特殊的模式,首先redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等到redis服务器响应,从而监控运行的多个redis实例。

一个哨兵进程对redis服务器进行监控,可能会出现问题。因此,使用多个哨兵进行监控,各个哨兵之间也进行监控,这就是多哨兵模式。

假设主服务器宕机,哨兵1先检测到这个结果,系统不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象称为主观下线。当后面的哨兵也检测到主服务器不可用,且数量达到一定值时,哨兵之间会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操作。切换成功后,通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线。

测试

目前的状态是一主二从。

1.配置。在/usr/local/bin/kconfig下创建sentinel.conf文件

                 主机名称  监控谁         代表如果主机挂掉,就投票 
sentinel monitor myredis 127.0.0.1 6379 1

2.启动哨兵

redis-sentinel kconfig/sentinel.conf

如果master断掉,哨兵会检测到,然后进行投票选择一个新的主机

如果原主机恢复,只能归并到新的主机下,成为一个从机

优点:

1.哨兵集群,基于主从复制模式,所有的主从配置优点都有

2.主从可以切换,故障可以转移,系统可用性会更好

3.哨兵模式是主从模式的升级,自动,更加健壮

缺点:

1.redis不好在线扩容,集群容量达到上限,在线扩容就十分麻烦

2.实现哨兵模式的配置其实很麻烦,配置选择很多

十四、缓存穿透和雪崩

redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一些问题。其中最重要的问题,就是数据一致性问题。严格来讲,这个问题无解。如果对数据的一致性要求很高,就不能使用缓存。

缓存穿透

缓存穿透的概念很简单,用户想要查询一个数据,发现redis内存数据库没有,也就是缓存没有命中,于是向持久层数据库查询,发现也没有,于是本次查询失败。当用户很多的时候,缓存都没有命中,于是都去请求了持久层数据库,这会给持久层数据库造成很大的压力,这就相当于出现了缓存穿透。

解决方案

布隆过滤器

布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力。

缓存空对象

当存储层不命中后,即使返回的空对象也将其缓存起来,同时设置一个过期时间,之后再访问这个数据将会从缓存中获取,保护了后端数据源。

但是这样会有两个问题:

1.如果控制能够被缓存起来,意味着缓存需要更多地空间存储更多的键,因为这当中可能会有很多的空值的键。

2.即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响

缓存击穿

缓存击穿是指一个key非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就会穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。

当某个key在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据。由于缓存过期,会同时访问数据库来查询最新数据,并且回写缓存,导致数据库瞬间压力过大。

解决方案

设置热点数据永不过期

从缓存层面看,没有设置过期时间,就不会出现热点key过期后产生的问题

加互斥锁

分布式锁:使用分布式锁,保证对于每一个key,同时只有一个线程去查询后端服务,其他线程没有获得分布式锁的权限,因此只需要等待即可。这种方法将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。

缓存雪崩

缓存雪崩,是指在某一个时间段,缓存集中过期失效

产生雪崩的原因之一,就是如果缓存设置时间为一小时,一小时后,缓存过期,再来查询就会对数据库产生周期性的压力波峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。

解决方案

redis高可用

搭建集群

限流降级

在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。

数据预热

数据加热的含义是在正式部署前,把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。

原视频地址:https://www.bilibili.com/video/BV1S54y1R7SB?p=36&share_source=copy_web