一、事务

Redis事务:redis可以保证一条命令原子性,但是不保证多条命令的原子性

Redis隔离级别:不存在隔离级别的概念!

Redis事务的操作:

  • 开启事务
  • 命令入队
  • 执行事务
# example
127.0.0.1:6379> multi  # 开启事务
OK
127.0.0.1:6379> mset k1 v1 k2 v2 # 设置多个值
QUEUED   # 进入队列
127.0.0.1:6379> set k3 v3  # 设置一个值
QUEUED
127.0.0.1:6379> get k2  # 获取一个值
QUEUED
127.0.0.1:6379> set k4 v4
QUEUED
127.0.0.1:6379> exec  # 执行这个事务
1) OK  # 设置成功
2) OK # 设置成功
3) "v2"  # 获取到了k2的值
4) OK  # 设置成功

###################################################

# 取消事务
127.0.0.1:6379> flushdb  # 清空redis 
OK
127.0.0.1:6379> multi  # 开启事务
OK
127.0.0.1:6379> set key1 value1
QUEUED
127.0.0.1:6379> mset k2 v2 k3 v3 k4 v4 # 设置值
QUEUED
127.0.0.1:6379> DISCARD  # 取消事务
OK
127.0.0.1:6379> get key1  # 因为事务的取消  导致set设置的值没有
(nil)


###################################################
#  Redis事务(多条语句下)不支持原子性
127.0.0.1:6379> set k1 "v1"   # 设置一个字符串
OK
127.0.0.1:6379> multi # 开启事务
OK
127.0.0.1:6379> incr k1 # 对字符串进行自增,明显会报错
QUEUED
127.0.0.1:6379> set k2 c2  # 设置一个k2
QUEUED
127.0.0.1:6379> set k4 v5 # 设置k4
QUEUED
127.0.0.1:6379> exec  # 执行事务
1) (error) ERR value is not an integer or out of range
2) OK
3) OK
127.0.0.1:6379> get k4  # 即使自增报错,依旧可以拿到后面设置的值
"v5"


Redis多线程下乐观锁实现

# 线程1
127.0.0.1:6379> set flower 100  # 设置一个值
OK
127.0.0.1:6379> get flower   # 查看
"100"
127.0.0.1:6379> watch flower  # 监视这个key
OK
127.0.0.1:6379> multi   # 开启事务
OK
127.0.0.1:6379> DECRBY flower 10  # 对flower总体进行减法操作
QUEUED
127.0.0.1:6379> INCRBY out 10     # 既然flower减法操作了,那么就对应有加法操作
QUEUED
127.0.0.1:6379> exec    # 执行事务
(nil)


# 线程2
127.0.0.1:6379> get flower  # 在线程1还未执行事务的时候,就抢占了资源,先查看了flower
"100"
127.0.0.1:6379> set flower 1000  # 再对flower进行了操作, 那么线程1的flower值肯定不对了,这就导致了乐观锁的失败!!
OK

解决方案

# 线程1
127.0.0.1:6379> exec
1) (integer) 700
2) (integer) 300
127.0.0.1:6379> unwatch   # 放弃监视
OK
127.0.0.1:6379> watch flower  # 再次监视
OK
127.0.0.1:6379> multi  
OK
127.0.0.1:6379> INCRBY flower 300  # 加上300
QUEUED
127.0.0.1:6379> DECRBY out 300   # 减少300
QUEUED
127.0.0.1:6379> exec
1) (integer) 1000
2) (integer) 0

# 线程2 只获取了key,并没有修改,而线程1就不断的尝试,只要线程2不对数据进行修改就可以达到成功!!
127.0.0.1:6379> get flower
"700"
127.0.0.1:6379> get flower
"700"
127.0.0.1:6379> get flower
"1000"
  • 乐观锁:乐观锁是一种不实际加锁的过程,在运行过程中,就以为它就是不会改变,在redis中乐观锁就可以用watch和unwatch实现,通过watch监视,当乐观锁失败时,就使用unwatch取消监视然后又再次watch监视,再执行相同的操作,可以自己设置重试的次数,当某一次在线程1执行事务中,未被线程2抢占时间片去修改,那么线程1的事务就可以执行成功!

Redis 多线程

二、Springboot集成Redis

  • spring-boot-starter-data-redis版本在2.xxx之后采用的是lettuce作为redis的操作类型,它很好的支持了多线程并发下的线程安全问题(底层通信采用了netty),而原来的jedis在多线程情况下会出现线程不安全的情况

导入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <version>2.5.5</version>
        </dependency>

RedisTemplate

  • 如果不自定义RedisTemplate,springboot就会自动使用他装配的默认RedisTemplate
@Configuration(
    proxyBeanMethods = false
)
@ConditionalOnClass({RedisOperations.class})
@EnableConfigurationProperties({RedisProperties.class})
@Import({LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class})
public class RedisAutoConfiguration {
    public RedisAutoConfiguration() {
    }

    @Bean
    @ConditionalOnMissingBean(
        name = {"redisTemplate"}
    )
    
    // ConditionalOnSingleCandidate表示如果没有自定义RedisTemplate这个类那么springboot自带的RedisTemplate就会生效!
    @ConditionalOnSingleCandidate(RedisConnectionFactory.class)
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<Object, Object> template = new RedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnSingleCandidate(RedisConnectionFactory.class)
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }
}
  • 自定义RedisTemplate

RedisTemplate需要进行序列化,不然存储的对象或者中文都无法进行正常的读取,可能会出现乱码行为

  /**
     * RedisTemplate通用模板
     * @param factory
     * @return
     */
    @Bean(name = "redisTemplate")
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        //(1)Json序列化配置
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        //过期方法
        //om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        /**
         * Jackson ObjectMapper 中的 enableDefaultTyping 方法由于安全原因,
         * 从 2.10.0 开始标记为过期,建议用 activateDefaultTyping 方法代替,
         * 所以如果继续使用 enableDefaultTyping 会有警告出现,我们现在要消除这个警告,
         * 去看一下 enableDefaultTyping 源码
         */
        om.activateDefaultTyping(
                LaissezFaireSubTypeValidator.instance,
                ObjectMapper.DefaultTyping.NON_FINAL,
                JsonTypeInfo.As.WRAPPER_ARRAY);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        //(2)String的序列化
        RedisSerializer<String> stringRedisSerializer = new StringRedisSerializer();

        //序列化设置,这样为了存储操作对象时正常显示的数据,也能正常存储和获取
        //key采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        //hash的key采用String序列化
        template.setHashKeySerializer(stringRedisSerializer);
        //value采用Json序列化
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //hash的value采用json序列化
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

自定义RedisUtil实现对RedisTemplate的再次封装

  • 使用RedisTemplate<String, Object> 底层api比较繁琐,可以自定义一个Redis工具类来实现并达到像操作redis命令行的操作!
// 通用模板RedisTemplate<String, Object>

package com.example.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Connection;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * @author XiaZh
 * @version v1.0
 * @Description :
 * Created on 2021/10/11.
 */
@Component
public class RedisUtil {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;


    /**
     * 设置过期时间
     * @param key
     * @param time
     * @param unit
     * @return
     */
    public Boolean expire(String key, Long time, TimeUnit unit) {
        return redisTemplate.expire(key, time, unit);
    }

    /**
     * 删除一个key
     * @param key
     */
    public void delete(String key) {
        redisTemplate.delete(key);
    }

    /**
     * 批量删除key
     * @param keys
     */
    public void delete(Collection<String> keys) {
        redisTemplate.delete(keys);
    }

    /**
     * 序列化key
     * @param key
     * @return
     */
    public byte[] dump(String key) {
        return redisTemplate.dump(key);
    }


    /**
     * 是否存在key
     *
     * @param key
     * @return
     */
    public Boolean hasKey(String key) {
        return redisTemplate.hasKey(key);
    }

    /**
     * 设置过期时间
     *
     * @param key
     * @param timeout
     * @param unit
     * @return
     */
    public Boolean expire(String key, long timeout, TimeUnit unit) {
        return redisTemplate.expire(key, timeout, unit);
    }

    /**
     * 设置过期时间
     *
     * @param key
     * @param date
     * @return
     */
    public Boolean expireAt(String key, Date date) {
        return redisTemplate.expireAt(key, date);
    }

    /**
     * 查找匹配的key
     *
     * @param pattern
     * @return
     */
    public Set<String> keys(String pattern) {
        return redisTemplate.keys(pattern);
    }

    /**
     * 将当前数据库的 key 移动到给定的数据库 db 当中
     *
     * @param key
     * @param dbIndex
     * @return
     */
    public Boolean move(String key, int dbIndex) {
        return redisTemplate.move(key, dbIndex);
    }

    /**
     * 移除 key 的过期时间,key 将持久保持
     *
     * @param key
     * @return
     */
    public Boolean persist(String key) {
        return redisTemplate.persist(key);
    }

    /**
     * 返回 key 的剩余的过期时间
     *
     * @param key
     * @param unit
     * @return
     */
    public Long getExpire(String key, TimeUnit unit) {
        return redisTemplate.getExpire(key, unit);
    }

    /**
     * 返回 key 的剩余的过期时间
     *
     * @param key
     * @return
     */
    public Long getExpire(String key) {
        return redisTemplate.getExpire(key);
    }

    /**
     * 从当前数据库中随机返回一个 key
     *
     * @return
     */
    public String randomKey() {
        return redisTemplate.randomKey();
    }

    /**
     * 修改 key 的名称
     *
     * @param oldKey
     * @param newKey
     */
    public void rename(String oldKey, String newKey) {
        redisTemplate.rename(oldKey, newKey);
    }

    /**
     * 仅当 newkey 不存在时,将 oldKey 改名为 newkey
     *
     * @param oldKey
     * @param newKey
     * @return
     */
    public Boolean renameIfAbsent(String oldKey, String newKey) {
        return redisTemplate.renameIfAbsent(oldKey, newKey);
    }

    /**
     * 返回 key 所储存的值的类型
     *
     * @param key
     * @return
     */
    public DataType type(String key) {
        return redisTemplate.type(key);
    }

    /** -------------------string相关操作--------------------- */

    /**
     * 设置指定 key 的值
     * @param key
     * @param value
     */
    public void set(String key, Object value) {
        redisTemplate.opsForValue().set(key, value);
    }

    /**
     * 获取指定 key 的值
     * @param key
     * @return
     */
    public Object get(String key) {
        return redisTemplate.opsForValue().get(key);
    }

    /**
     * 返回 key 中字符串值的子字符
     * @param key
     * @param start
     * @param end
     * @return
     */
    public String getRange(String key, long start, long end) {
        return redisTemplate.opsForValue().get(key, start, end);
    }

    /**
     * 将给定 key 的值设为 value ,并返回 key 的旧值(old value)
     *
     * @param key
     * @param value
     * @return
     */
    public Object getAndSet(String key, Object value) {
        return redisTemplate.opsForValue().getAndSet(key, value);
    }

    /**
     * 对 key 所储存的字符串值,获取指定偏移量上的位(bit)
     *
     * @param key
     * @param offset
     * @return
     */
    public Boolean getBit(String key, long offset) {
        return redisTemplate.opsForValue().getBit(key, offset);
    }

    /**
     * 批量获取
     *
     * @param keys
     * @return
     */
    public List<Object> multiGet(Collection<String> keys) {
        return redisTemplate.opsForValue().multiGet(keys);
    }

    /**
     * 设置ASCII码, 字符串'a'的ASCII码是97, 转为二进制是'01100001', 此方法是将二进制第offset位值变为value
     *
     * @param key
     * @param value
     *            值,true为1, false为0
     * @return
     */
    public boolean setBit(String key, long offset, boolean value) {
        return redisTemplate.opsForValue().setBit(key, offset, value);
    }

    /**
     * 将值 value 关联到 key ,并将 key 的过期时间设为 timeout
     *
     * @param key
     * @param value
     * @param timeout
     *            过期时间
     * @param unit
     *            时间单位, 天:TimeUnit.DAYS 小时:TimeUnit.HOURS 分钟:TimeUnit.MINUTES
     *            秒:TimeUnit.SECONDS 毫秒:TimeUnit.MILLISECONDS
     */
    public void setEx(String key, Object value, long timeout, TimeUnit unit) {
        redisTemplate.opsForValue().set(key, value, timeout, unit);
    }

    /**
     * 只有在 key 不存在时设置 key 的值
     *
     * @param key
     * @param value
     * @return 之前已经存在返回false,不存在返回true
     */
    public boolean setIfAbsent(String key, Object value) {
        return redisTemplate.opsForValue().setIfAbsent(key, value);
    }

    /**
     * 用 value 参数覆写给定 key 所储存的字符串值,从偏移量 offset 开始
     *
     * @param key
     * @param value
     * @param offset
     *            从指定位置开始覆写
     */
    public void setRange(String key, String value, long offset) {
        redisTemplate.opsForValue().set(key, value, offset);
    }

    /**
     * 获取字符串的长度
     *
     * @param key
     * @return
     */
    public Long size(String key) {
        return redisTemplate.opsForValue().size(key);
    }

    /**
     * 批量添加
     *
     * @param maps
     */
    public void multiSet(Map<String, String> maps) {
        redisTemplate.opsForValue().multiSet(maps);
    }

    /**
     * 同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在
     *
     * @param maps
     * @return 之前已经存在返回false,不存在返回true
     */
    public boolean multiSetIfAbsent(Map<String, String> maps) {
        return redisTemplate.opsForValue().multiSetIfAbsent(maps);
    }

    /**
     * 增加(自增长), 负数则为自减
     *
     * @param key
     * @return
     */
    public Long incrBy(String key, long increment) {
        return redisTemplate.opsForValue().increment(key, increment);
    }

    /**
     * 增加(自增长), 负数则为自减
     * @param key
     * @return
     */
    public Double incrByFloat(String key, double increment) {
        return redisTemplate.opsForValue().increment(key, increment);
    }

    /**
     * 追加到末尾
     *
     * @param key
     * @param value
     * @return
     */
    public Integer append(String key, String value) {
        return redisTemplate.opsForValue().append(key, value);
    }

    /** -------------------hash相关操作------------------------- */

    /**
     * 获取存储在哈希表中指定字段的值
     *
     * @param key
     * @param field
     * @return
     */
    public Object hGet(String key, String field) {
        return redisTemplate.opsForHash().get(key, field);
    }

    /**
     * 获取所有给定字段的值
     *
     * @param key
     * @return
     */
    public Map<Object, Object> hGetAll(String key) {
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * 获取所有给定字段的值
     *
     * @param key
     * @param fields
     * @return
     */
    public List<Object> hMultiGet(String key, Collection<Object> fields) {
        return redisTemplate.opsForHash().multiGet(key, fields);
    }

    public void hPut(String key, String hashKey, String value) {
        redisTemplate.opsForHash().put(key, hashKey, value);
    }

    public void hPutAll(String key, Map<String, String> maps) {
        redisTemplate.opsForHash().putAll(key, maps);
    }

    /**
     * 仅当hashKey不存在时才设置
     *
     * @param key
     * @param hashKey
     * @param value
     * @return
     */
    public Boolean hPutIfAbsent(String key, String hashKey, String value) {
        return redisTemplate.opsForHash().putIfAbsent(key, hashKey, value);
    }

    /**
     * 删除一个或多个哈希表字段
     *
     * @param key
     * @param fields
     * @return
     */
    public Long hDelete(String key, Object... fields) {
        return redisTemplate.opsForHash().delete(key, fields);
    }

    /**
     * 查看哈希表 key 中,指定的字段是否存在
     *
     * @param key
     * @param field
     * @return
     */
    public boolean hExists(String key, String field) {
        return redisTemplate.opsForHash().hasKey(key, field);
    }

    /**
     * 为哈希表 key 中的指定字段的整数值加上增量 increment
     *
     * @param key
     * @param field
     * @param increment
     * @return
     */
    public Long hIncrBy(String key, Object field, long increment) {
        return redisTemplate.opsForHash().increment(key, field, increment);
    }

    /**
     * 为哈希表 key 中的指定字段的整数值加上增量 increment
     *
     * @param key
     * @param field
     * @param delta
     * @return
     */
    public Double hIncrByFloat(String key, Object field, double delta) {
        return redisTemplate.opsForHash().increment(key, field, delta);
    }

    /**
     * 获取所有哈希表中的字段
     *
     * @param key
     * @return
     */
    public Set<Object> hKeys(String key) {
        return redisTemplate.opsForHash().keys(key);
    }

    /**
     * 获取哈希表中字段的数量
     *
     * @param key
     * @return
     */
    public Long hSize(String key) {
        return redisTemplate.opsForHash().size(key);
    }

    /**
     * 获取哈希表中所有值
     *
     * @param key
     * @return
     */
    public List<Object> hValues(String key) {
        return redisTemplate.opsForHash().values(key);
    }

    /**
     * 迭代哈希表中的键值对
     *
     * @param key
     * @param options
     * @return
     */
    public Cursor<Map.Entry<Object, Object>> hScan(String key, ScanOptions options) {
        return redisTemplate.opsForHash().scan(key, options);
    }

    /** ------------------------list相关操作---------------------------- */

    /**
     * 通过索引获取列表中的元素
     *
     * @param key
     * @param index
     * @return
     */
    public Object lIndex(String key, long index) {
        return redisTemplate.opsForList().index(key, index);
    }

    /**
     * 获取列表指定范围内的元素
     *
     * @param key
     * @param start
     *            开始位置, 0是开始位置
     * @param end
     *            结束位置, -1返回所有
     * @return
     */
    public List<Object> lRange(String key, long start, long end) {
        return redisTemplate.opsForList().range(key, start, end);
    }

    /**
     * 存储在list头部
     *
     * @param key
     * @param value
     * @return
     */
    public Long lLeftPush(String key, Object value) {
        return redisTemplate.opsForList().leftPush(key, value);
    }

    /**
     *
     * @param key
     * @param value
     * @return
     */
    public Long lLeftPushAll(String key, Object... value) {
        return redisTemplate.opsForList().leftPushAll(key, value);
    }

    /**
     *
     * @param key
     * @param value
     * @return
     */
    public Long lLeftPushAll(String key, Collection<Object> value) {
        return redisTemplate.opsForList().leftPushAll(key, value);
    }

    /**
     * 当list存在的时候才加入
     *
     * @param key
     * @param value
     * @return
     */
    public Long lLeftPushIfPresent(String key, Object value) {
        return redisTemplate.opsForList().leftPushIfPresent(key, value);
    }

    /**
     * 如果pivot存在,再pivot前面添加
     *
     * @param key
     * @param pivot
     * @param value
     * @return
     */
    public Long lLeftPush(String key, String pivot, Object value) {
        return redisTemplate.opsForList().leftPush(key, pivot, value);
    }

    /**
     *
     * @param key
     * @param value
     * @return
     */
    public Long lRightPush(String key, Object value) {
        return redisTemplate.opsForList().rightPush(key, value);
    }

    /**
     *
     * @param key
     * @param value
     * @return
     */
    public Long lRightPushAll(String key, Object... value) {
        return redisTemplate.opsForList().rightPushAll(key, value);
    }

    /**
     *
     * @param key
     * @param value
     * @return
     */
    public Long lRightPushAll(String key, Collection<Object> value) {
        return redisTemplate.opsForList().rightPushAll(key, value);
    }

    /**
     * 为已存在的列表添加值
     *
     * @param key
     * @param value
     * @return
     */
    public Long lRightPushIfPresent(String key, Object value) {
        return redisTemplate.opsForList().rightPushIfPresent(key, value);
    }

    /**
     * 在pivot元素的右边添加值
     *
     * @param key
     * @param pivot
     * @param value
     * @return
     */
    public Long lRightPush(String key, String pivot, Object value) {
        return redisTemplate.opsForList().rightPush(key, pivot, value);
    }

    /**
     * 通过索引设置列表元素的值
     *
     * @param key
     * @param index
     *            位置
     * @param value
     */
    public void lSet(String key, long index, Object value) {
        redisTemplate.opsForList().set(key, index, value);
    }

    /**
     * 移出并获取列表的第一个元素
     *
     * @param key
     * @return 删除的元素
     */
    public Object lLeftPop(String key) {
        return redisTemplate.opsForList().leftPop(key);
    }

    /**
     * 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
     *
     * @param key
     * @param timeout
     *            等待时间
     * @param unit
     *            时间单位
     * @return
     */
    public Object lBLeftPop(String key, long timeout, TimeUnit unit) {
        return redisTemplate.opsForList().leftPop(key, timeout, unit);
    }

    /**
     * 移除并获取列表最后一个元素
     *
     * @param key
     * @return 删除的元素
     */
    public Object lRightPop(String key) {
        return redisTemplate.opsForList().rightPop(key);
    }

    /**
     * 移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
     *
     * @param key
     * @param timeout
     *            等待时间
     * @param unit
     *            时间单位
     * @return
     */
    public Object lBRightPop(String key, long timeout, TimeUnit unit) {
        return redisTemplate.opsForList().rightPop(key, timeout, unit);
    }

    /**
     * 移除列表的最后一个元素,并将该元素添加到另一个列表并返回
     *
     * @param sourceKey
     * @param destinationKey
     * @return
     */
    public Object lRightPopAndLeftPush(String sourceKey, String destinationKey) {
        return redisTemplate.opsForList().rightPopAndLeftPush(sourceKey,
                destinationKey);
    }

    /**
     * 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
     *
     * @param sourceKey
     * @param destinationKey
     * @param timeout
     * @param unit
     * @return
     */
    public Object lBRightPopAndLeftPush(String sourceKey, String destinationKey,
                                        long timeout, TimeUnit unit) {
        return redisTemplate.opsForList().rightPopAndLeftPush(sourceKey,
                destinationKey, timeout, unit);
    }

    /**
     * 删除集合中值等于value得元素
     *
     * @param key
     * @param index
     *            index=0, 删除所有值等于value的元素; index>0, 从头部开始删除第一个值等于value的元素;
     *            index<0, 从尾部开始删除第一个值等于value的元素;
     * @param value
     * @return
     */
    public Long lRemove(String key, long index, Object value) {
        return redisTemplate.opsForList().remove(key, index, value);
    }

    /**
     * 裁剪list
     *
     * @param key
     * @param start
     * @param end
     */
    public void lTrim(String key, long start, long end) {
        redisTemplate.opsForList().trim(key, start, end);
    }

    /**
     * 获取列表长度
     *
     * @param key
     * @return
     */
    public Long lLen(String key) {
        return redisTemplate.opsForList().size(key);
    }

    /** --------------------set相关操作-------------------------- */

    /**
     * set添加元素
     *
     * @param key
     * @param values
     * @return
     */
    public Long sAdd(String key, Object... values) {
        return redisTemplate.opsForSet().add(key, values);
    }

    /**
     * set移除元素
     *
     * @param key
     * @param values
     * @return
     */
    public Long sRemove(String key, Object... values) {
        return redisTemplate.opsForSet().remove(key, values);
    }

    /**
     * 移除并返回集合的一个随机元素
     *
     * @param key
     * @return
     */
    public Object sPop(String key) {
        return redisTemplate.opsForSet().pop(key);
    }

    /**
     * 将元素value从一个集合移到另一个集合
     *
     * @param key
     * @param value
     * @param destKey
     * @return
     */
    public Boolean sMove(String key, Object value, String destKey) {
        return redisTemplate.opsForSet().move(key, value, destKey);
    }

    /**
     * 获取集合的大小
     *
     * @param key
     * @return
     */
    public Long sSize(String key) {
        return redisTemplate.opsForSet().size(key);
    }

    /**
     * 判断集合是否包含value
     *
     * @param key
     * @param value
     * @return
     */
    public Boolean sIsMember(String key, Object value) {
        return redisTemplate.opsForSet().isMember(key, value);
    }

    /**
     * 获取两个集合的交集
     *
     * @param key
     * @param otherKey
     * @return
     */
    public Set<Object> sIntersect(String key, String otherKey) {
        return redisTemplate.opsForSet().intersect(key, otherKey);
    }

    /**
     * 获取key集合与多个集合的交集
     *
     * @param key
     * @param otherKeys
     * @return
     */
    public Set<Object> sIntersect(String key, Collection<String> otherKeys) {
        return redisTemplate.opsForSet().intersect(key, otherKeys);
    }

    /**
     * key集合与otherKey集合的交集存储到destKey集合中
     *
     * @param key
     * @param otherKey
     * @param destKey
     * @return
     */
    public Long sIntersectAndStore(String key, String otherKey, String destKey) {
        return redisTemplate.opsForSet().intersectAndStore(key, otherKey,
                destKey);
    }

    /**
     * key集合与多个集合的交集存储到destKey集合中
     *
     * @param key
     * @param otherKeys
     * @param destKey
     * @return
     */
    public Long sIntersectAndStore(String key, Collection<String> otherKeys,
                                   String destKey) {
        return redisTemplate.opsForSet().intersectAndStore(key, otherKeys,
                destKey);
    }

    /**
     * 获取两个集合的并集
     *
     * @param key
     * @param otherKeys
     * @return
     */
    public Set<Object> sUnion(String key, String otherKeys) {
        return redisTemplate.opsForSet().union(key, otherKeys);
    }

    /**
     * 获取key集合与多个集合的并集
     *
     * @param key
     * @param otherKeys
     * @return
     */
    public Set<Object> sUnion(String key, Collection<String> otherKeys) {
        return redisTemplate.opsForSet().union(key, otherKeys);
    }

    /**
     * key集合与otherKey集合的并集存储到destKey中
     *
     * @param key
     * @param otherKey
     * @param destKey
     * @return
     */
    public Long sUnionAndStore(String key, String otherKey, String destKey) {
        return redisTemplate.opsForSet().unionAndStore(key, otherKey, destKey);
    }

    /**
     * key集合与多个集合的并集存储到destKey中
     *
     * @param key
     * @param otherKeys
     * @param destKey
     * @return
     */
    public Long sUnionAndStore(String key, Collection<String> otherKeys,
                               String destKey) {
        return redisTemplate.opsForSet().unionAndStore(key, otherKeys, destKey);
    }

    /**
     * 获取两个集合的差集
     *
     * @param key
     * @param otherKey
     * @return
     */
    public Set<Object> sDifference(String key, String otherKey) {
        return redisTemplate.opsForSet().difference(key, otherKey);
    }

    /**
     * 获取key集合与多个集合的差集
     *
     * @param key
     * @param otherKeys
     * @return
     */
    public Set<Object> sDifference(String key, Collection<String> otherKeys) {
        return redisTemplate.opsForSet().difference(key, otherKeys);
    }

    /**
     * key集合与otherKey集合的差集存储到destKey中
     *
     * @param key
     * @param otherKey
     * @param destKey
     * @return
     */
    public Long sDifference(String key, String otherKey, String destKey) {
        return redisTemplate.opsForSet().differenceAndStore(key, otherKey,
                destKey);
    }

    /**
     * key集合与多个集合的差集存储到destKey中
     *
     * @param key
     * @param otherKeys
     * @param destKey
     * @return
     */
    public Long sDifference(String key, Collection<String> otherKeys,
                            String destKey) {
        return redisTemplate.opsForSet().differenceAndStore(key, otherKeys,
                destKey);
    }

    /**
     * 获取集合所有元素
     *
     * @param key
     * @return
     */
    public Set<Object> setMembers(String key) {
        return redisTemplate.opsForSet().members(key);
    }

    /**
     * 随机获取集合中的一个元素
     *
     * @param key
     * @return
     */
    public Object sRandomMember(String key) {
        return redisTemplate.opsForSet().randomMember(key);
    }

    /**
     * 随机获取集合中count个元素
     *
     * @param key
     * @param count
     * @return
     */
    public List<Object> sRandomMembers(String key, long count) {
        return redisTemplate.opsForSet().randomMembers(key, count);
    }

    /**
     * 随机获取集合中count个元素并且去除重复的
     *
     * @param key
     * @param count
     * @return
     */
    public Set<Object> sDistinctRandomMembers(String key, long count) {
        return redisTemplate.opsForSet().distinctRandomMembers(key, count);
    }

    /**
     *
     * @param key
     * @param options
     * @return
     */
    public Cursor<Object> sScan(String key, ScanOptions options) {
        return redisTemplate.opsForSet().scan(key, options);
    }

    /**------------------zSet相关操作--------------------------------*/

    /**
     * 添加元素,有序集合是按照元素的score值由小到大排列
     *
     * @param key
     * @param value
     * @param score
     * @return
     */
    public Boolean zAdd(String key, String value, double score) {
        return redisTemplate.opsForZSet().add(key, value, score);
    }

    /**
     *
     * @param key
     * @param values
     * @return
     */
    public Long zAdd(String key, Set<ZSetOperations.TypedTuple<Object>> values) {
        return redisTemplate.opsForZSet().add(key, values);
    }

    /**
     *
     * @param key
     * @param values
     * @return
     */
    public Long zRemove(String key, Object... values) {
        return redisTemplate.opsForZSet().remove(key, values);
    }

    /**
     * 增加元素的score值,并返回增加后的值
     *
     * @param key
     * @param value
     * @param delta
     * @return
     */
    public Double zIncrementScore(String key, Object value, double delta) {
        return redisTemplate.opsForZSet().incrementScore(key, value, delta);
    }

    /**
     * 返回元素在集合的排名,有序集合是按照元素的score值由小到大排列
     *
     * @param key
     * @param value
     * @return 0表示第一位
     */
    public Long zRank(String key, Object value) {
        return redisTemplate.opsForZSet().rank(key, value);
    }

    /**
     * 返回元素在集合的排名,按元素的score值由大到小排列
     *
     * @param key
     * @param value
     * @return
     */
    public Long zReverseRank(String key, Object value) {
        return redisTemplate.opsForZSet().reverseRank(key, value);
    }

    /**
     * 获取集合的元素, 从小到大排序
     *
     * @param key
     * @param start
     *            开始位置
     * @param end
     *            结束位置, -1查询所有
     * @return
     */
    public Set<Object> zRange(String key, long start, long end) {
        return redisTemplate.opsForZSet().range(key, start, end);
    }

    /**
     * 获取集合元素, 并且把score值也获取
     *
     * @param key
     * @param start
     * @param end
     * @return
     */
    public Set<ZSetOperations.TypedTuple<Object>> zRangeWithScores(String key, long start,
                                                                   long end) {
        return redisTemplate.opsForZSet().rangeWithScores(key, start, end);
    }

    /**
     * 根据Score值查询集合元素
     *
     * @param key
     * @param min
     *            最小值
     * @param max
     *            最大值
     * @return
     */
    public Set<Object> zRangeByScore(String key, double min, double max) {
        return redisTemplate.opsForZSet().rangeByScore(key, min, max);
    }

    /**
     * 根据Score值查询集合元素, 从小到大排序
     *
     * @param key
     * @param min
     *            最小值
     * @param max
     *            最大值
     * @return
     */
    public Set<ZSetOperations.TypedTuple<Object>> zRangeByScoreWithScores(String key,
                                                                          double min, double max) {
        return redisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max);
    }

    /**
     *
     * @param key
     * @param min
     * @param max
     * @param start
     * @param end
     * @return
     */
    public Set<ZSetOperations.TypedTuple<Object>> zRangeByScoreWithScores(String key,
                                                                          double min, double max, long start, long end) {
        return redisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max,
                start, end);
    }

    /**
     * 获取集合的元素, 从大到小排序
     *
     * @param key
     * @param start
     * @param end
     * @return
     */
    public Set<Object> zReverseRange(String key, long start, long end) {
        return redisTemplate.opsForZSet().reverseRange(key, start, end);
    }

    /**
     * 获取集合的元素, 从大到小排序, 并返回score值
     *
     * @param key
     * @param start
     * @param end
     * @return
     */
    public Set<ZSetOperations.TypedTuple<Object>> zReverseRangeWithScores(String key,
                                                                          long start, long end) {
        return redisTemplate.opsForZSet().reverseRangeWithScores(key, start,
                end);
    }

    /**
     * 根据Score值查询集合元素, 从大到小排序
     *
     * @param key
     * @param min
     * @param max
     * @return
     */
    public Set<Object> zReverseRangeByScore(String key, double min,
                                            double max) {
        return redisTemplate.opsForZSet().reverseRangeByScore(key, min, max);
    }

    /**
     * 根据Score值查询集合元素, 从大到小排序
     *
     * @param key
     * @param min
     * @param max
     * @return
     */
    public Set<ZSetOperations.TypedTuple<Object>> zReverseRangeByScoreWithScores(
            String key, double min, double max) {
        return redisTemplate.opsForZSet().reverseRangeByScoreWithScores(key,
                min, max);
    }

    /**
     *
     * @param key
     * @param min
     * @param max
     * @param start
     * @param end
     * @return
     */
    public Set<Object> zReverseRangeByScore(String key, double min,
                                            double max, long start, long end) {
        return redisTemplate.opsForZSet().reverseRangeByScore(key, min, max,
                start, end);
    }

    /**
     * 根据score值获取集合元素数量
     *
     * @param key
     * @param min
     * @param max
     * @return
     */
    public Long zCount(String key, double min, double max) {
        return redisTemplate.opsForZSet().count(key, min, max);
    }

    /**
     * 获取集合大小
     *
     * @param key
     * @return
     */
    public Long zSize(String key) {
        return redisTemplate.opsForZSet().size(key);
    }

    /**
     * 获取集合大小
     *
     * @param key
     * @return
     */
    public Long zZCard(String key) {
        return redisTemplate.opsForZSet().zCard(key);
    }

    /**
     * 获取集合中value元素的score值
     *
     * @param key
     * @param value
     * @return
     */
    public Double zScore(String key, Object value) {
        return redisTemplate.opsForZSet().score(key, value);
    }

    /**
     * 移除指定索引位置的成员
     *
     * @param key
     * @param start
     * @param end
     * @return
     */
    public Long zRemoveRange(String key, long start, long end) {
        return redisTemplate.opsForZSet().removeRange(key, start, end);
    }

    /**
     * 根据指定的score值的范围来移除成员
     *
     * @param key
     * @param min
     * @param max
     * @return
     */
    public Long zRemoveRangeByScore(String key, double min, double max) {
        return redisTemplate.opsForZSet().removeRangeByScore(key, min, max);
    }

    /**
     * 获取key和otherKey的并集并存储在destKey中
     *
     * @param key
     * @param otherKey
     * @param destKey
     * @return
     */
    public Long zUnionAndStore(String key, String otherKey, String destKey) {
        return redisTemplate.opsForZSet().unionAndStore(key, otherKey, destKey);
    }

    /**
     *
     * @param key
     * @param otherKeys
     * @param destKey
     * @return
     */
    public Long zUnionAndStore(String key, Collection<String> otherKeys,
                               String destKey) {
        return redisTemplate.opsForZSet()
                .unionAndStore(key, otherKeys, destKey);
    }

    /**
     * 交集
     *
     * @param key
     * @param otherKey
     * @param destKey
     * @return
     */
    public Long zIntersectAndStore(String key, String otherKey,
                                   String destKey) {
        return redisTemplate.opsForZSet().intersectAndStore(key, otherKey,
                destKey);
    }

    /**
     * 交集
     *
     * @param key
     * @param otherKeys
     * @param destKey
     * @return
     */
    public Long zIntersectAndStore(String key, Collection<String> otherKeys,
                                   String destKey) {
        return redisTemplate.opsForZSet().intersectAndStore(key, otherKeys,
                destKey);
    }

    /**
     *
     * @param key
     * @param options
     * @return
     */
    public Cursor<ZSetOperations.TypedTuple<Object>> zScan(String key, ScanOptions options) {
        return redisTemplate.opsForZSet().scan(key, options);
    }

}

三、Redis.config

网络

  • bind 127.0.0.1 // 绑定的ip地址
  • protected-mode yes // 保护模式
  • port 6379 // 端口设置

通用的GENERAL

  • daemonize // 以守护进程的方式运行 ,默认为no,需要手动开启(如果不开启的话,客户端退出 就会导致断开连接)

  • pidfile /var/run/redis_6379.pid // 后台自动运行,须指定一个pid文件

  • loglevel notice // 日志级别 (默认生产环境)

  • logfile “ ” // 日志的文件位置

  • database 16 // 数据库的个数

  • always-show-logo yes // 总是显示logo

快照

  • 持久化 -> 在规定的时间内,执行了多少次操作,会持久化到.rdb .aof
save 300 1   # 如果在300s内进行了1次操作,就会执行持久化
save 100 5   # 如果在100s内进行了5次操作,就会执行持久化
save 60 10   # 如果在60s内进行了10次操作,就会执行持久化

stop-writes-on-bgsave-error  yes # 持久化如果出错 设置为yes就是继续工作,改为no就取消工作

rdbcompression yes  # 是否压缩rdb文件,需要消耗cpu资源
rdbchecksum yes # 保存rdb文件 并进行错误校验
dir ./ # rdb默认保存的目录(当前目录)

Security 密码设置

127.0.0.1:6379> config get requirepass # 获取密码,默认为空1) "requirepass"2) ""127.0.0.1:6379> config set requirepass "ahui"  # 设置密码OK127.0.0.1:6379> ping  # 当设置密码 再去ping就需要进行登录校验(error) NOAUTH Authentication required.127.0.0.1:6379> auth ahui  # 登录校验成功OK127.0.0.1:6379> ping # 再次ping就会显示为通的PONG127.0.0.1:6379> config get requirepass  # 获取redis登录密码1) "requirepass"2) "ahui"

append only 模式 aof配置

  • appendonly no // 默认不开启aof模式,默认是rdb方式持久化

  • appendfilename "appendonly.aof" // 持久化的文件名字

  • appendfsync always // 每次修改都会sync 消耗性能

  • appendfsync everysec // 每秒执行一次sync 可能会丢失这1s的数据 (默认)

  • appendfsync no //不执行sync,这个时候操作系统自己同步数据,速度最快

四、Redis持久化

RDB理论理解:

rdb的持久化方式:在一定的时间间隔内将数据集的内存快照(snapshotting)保存到硬盘中去(下面的RDB触发规则)。

rdb的原理:他其实是通过fork一个子进程来专门进行持久化,会先将数据写入到一个临时文件里面,然后待持久化全部结束之后,再将这个临时文件去替换上次已经持久化好的文件(dump.rdb)。这个全程他的服务进程是不用去操作的,所以启动效率很高,性能也比较好,但是会在redis突然挂掉的时候,会出现最后一次数据丢失的问题。至于原因下面会讲到

参考大佬博客

RDB 触发规则

  • 1、save
    • save 300 1 # 如果在300s内进行了1次操作,就会执行持久化
      save 100 5 # 如果在100s内进行了5次操作,就会执行持久化
      save 60 10 # 如果在60s内进行了10次操作,就会执行持久化
  • 2、执行flushall命令,也会触发rdb规则
  • 3、退出redis(shutdown),也会触发rdb规则
  • 4、bgsave(bgsave命令会创建一个子进程,由子进程来负责创建RDB文件,父进程(即Redis主进程)则继续处理请求)
bgsave命令执行过程中,只有fork子进程时会阻塞服务器,而对于save命令,整个过程都会阻塞服务器,因此save已基本被废弃,线上环境要杜绝save的使用;后文中也将只介绍bgsave命令。此外,在自动触发RDB持久化时,Redis也会选择bgsave而不是save来进行持久化;下面介绍自动触发RDB持久化的条件。

恢复rdb文件

  • 只需要将rdb文件放在redis的启动目录下,redis启动时会自动检查并恢复数据

  • 查看需要存放的位置

    config get dir
    

只使用redis的缓存功能,而不使用redis 的持久化功能

redis-cli  config set save " "

AOF理论理解:

AOF默认是不开启的,是需要在redis.config中进行开启

Redis的AOF会像log日志一样的格式在.aof文件中进行追加操作的命令

Redis先将写命令追加到缓冲区,而不是直接写入文件,主要是为了避免每次有写命令都直接写入硬盘,导致硬盘IO成为Redis负载的瓶颈。
命令追加的格式是Redis命令请求的协议格式,它是一种纯文本格式,具有兼容性好、可读性强、容易处理、操作简单避免二次开销等优点;具体格式略。在AOF文件中,除了用于指定数据库的select命令(如select 0 为选中0号数据库)是由Redis添加的,其他都是客户端发送来的写命令。

五、发布订阅

使用场景:实时聊天系统.. 订阅关注系统....

模型:

栗子:

发布方

127.0.0.1:6379> PUBLISH ahui  hello  # 发布的频道及内容
(integer) 1

127.0.0.1:6379> PUBLISH ahui  sayredis!  
(integer) 1

127.0.0.1:6379> PUBLISH ahui test!!!
(integer) 1

订阅方(一个或多个)

127.0.0.1:6379> SUBSCRIBE ahui
Reading messages... (press Ctrl-C to quit)
1) "subscribe"  # 订阅此频道ahui
2) "ahui"      
3) (integer) 1
1) "message"   # 当发布方发送数据到队列中时,可以及时收到
2) "ahui"
3) "hello"

1) "message"
2) "ahui"
3) "sayredis!"

1) "message"
2) "ahui"
3) "test!!!"

命令用法

SUBSCRIBE ahui   # 订阅方订阅频道

PUBLISH ahui  hello [...channel]  # 发布方发布频道及内容(可以同时订阅多个)

PUBSUB channels   # 查看订阅与发布系统状态

PUNSUBSCRIBE ahui [...channel]   #  退订所以给定模式的频道信息
1) "punsubscribe"
2) "ahui"
3) (integer) 0

UNSUBSCRIBE ahui   # 退订指定的频道
1) "unsubscribe"
2) "ahui"
3) (integer) 0

六、主从复制

概述

主从复制:在大数据的信息量下,单台数据库的读写压力很大,以至于到现在推出的读写分离模式,即使用主从复制实现读写分离,在Redis中一般超过20G的话,redis的读写压力就很大了,需要做读写分离来实现对数据库的优化,主从复制就可以很好的实现读写分离,主机主要用于写的操作,而从机就可以实现读取的功能!

模型

主从配置命令(暂时性)

replicaof <masterip> <masterport>  # 在从机中配置主机ip + port 

info replication  # 查看主从机复制的信息

修改主从配置

  • 在.conf 中有一个replicaof,默认是注释的,可以进行打开,然后配置主机的相关信息

主从复制注意事项

  • 在主机宕机的情况下,这个主从复制就没有了写入的操作,但是还可以进行读取。
  • 在主从复制中,从机如果进行写的操作,就会报错
  • 在主机宕机后,修复好再次启动并进行写,从机依旧可以拿到主机的写入操作

复制方式及原理

  • 主库 master 和从库 slave 之间通过复制 id 进行匹配,避免 slave 挂到错误的 master
  • slave启动后,会向主机master发出sync同步命令
  • master会将传送整个数据文件传输到slave从机,这就完成了一次同步
  • 全量复制:slave第一次连接会进行全量复制,把master的数据全部同步,并加载到内存中
  • 增量复制:master会将写入的操作,依次传递到从机slave上,完成同步
  • master 在分发写请求时,同时会将写指令复制一份存入复制积压缓冲,这样当 slave 短时间断开重连时

主从复制的优缺点

  • 优点
    • 高可靠性:可以保证数据的完整性,也就是主机宕机了会有从机保证数据,从机宕机也可以采用sync同步
    • 读写分离策略:主机主要负责数据的写入操作,而从机负责主机的同步,以及读取操作。
  • 缺点
    • 故障恢复复杂,如果主机宕机了且没有哨兵模式的话,就需要手动去重新选举一个主机来替代原来的主机。
    • 主机的写入能力受限于单机操作,当写入操作较多时,主机可以考虑分片操作
    • 主机的存储能力受限于单机操作,当存储较多时,可以采用Pika

主机宕机,手动设置更改从机为主机

  • slaveof no one # 在主节点宕机后,从机使用此命令可以让自己升级为主机,当原来主节点修复好了,再次连接,它也不再是主节点!!!

七、哨兵模式

概念:哨兵模式就是来解决主机宕机后,主机的自动重新选举!!

具体参考

主从复制中单哨兵模式的应用

#配置端口
port 76379
#以守护进程模式启动
daemonize yes
#日志文件名
logfile "sentinel_76379.log"
#存放备份文件以及日志等文件的目录
dir "/opt/redis/data"
#监控的IP 端口号 名称 sentinel通过投票后认为mater宕机的数量,此处为至少2个
sentinel monitor mymaster 192.168.14.101 6379 2
#30秒ping不通主节点的信息,主观认为master宕机
sentinel down-after-milliseconds mymaster 30000
#故障转移后重新主从复制,1表示串行,>1并行
sentinel parallel-syncs mymaster 1
#故障转移开始,三分钟内没有完成,则认为转移失败
sentinel failover-timeout mymaster 180000

Linux上启动哨兵

redis-sentinel sentinel.conf

单哨兵弊端

  • 如果哨兵宕机了,也就代表着主机宕机的话,无法再自动的选取从机取代主机

哨兵集群

  • 主节点参数
port 6377
daemonize yes
logfile "6377.log"
dbfilename "dump-6377.rdb"

  • 从节点1参数
port 6379
daemonize yes
logfile "6379.log"
dbfilename "dump-6379.rdb"
slaveof 127.0.0.1 6377
  • 从节点2参数
port 6380
daemonize yes
logfile "6380.log"
dbfilename "dump-6380.rdb"
slaveof 127.0.0.1 6377

多哨兵模式缺点

  • 在多哨兵模式下,效率和安全得到了很大的提升,但是多哨兵模式的配置就很繁琐,主哨兵宕机也会像主从复制机制那样,重新选举一个主哨兵来进行监控

八、缓存穿透和雪崩(大面积查询且查不到)

正常查询数据库

导致缓存穿透

原因

  • 1、并发情况下,10000+用户不断请求缓存,但缓存都没有这个数据
  • 2、用户又不断去请求mysql数据库,mysql也没有这个数据就导致了缓存穿透

缓存击穿

  • 并发用户同时请求同一个地址,而缓存又刚好过期!缓存过期,就会查询mysql数据库,而此时Mysql就会承受大量的查询请求,就会导致缓存击穿

缓存击穿解决方案

  • 设置热点数据永不过期, 这就有点浪费资源了
  • 加分布式锁,在缓存和mysql之间加分布式锁,同一时间只能有一个线程进入mysql查询

缓存雪崩

原因:

  • 断电情况下,服务器正在写的操作,突然终止,就可能导致缓存雪崩
  • 服务器宕机
  • 缓存失效时间到,且没有加上过滤器等措施

雪崩解决方案

  • Redis高可用:采用类似集群模式的Redis服务器,一台宕机,另一台接上
  • 限流降级:在缓存失效后,采用加锁或者过滤等措施来保证同一时间只有一个线程操作
  • 数据预热:先测试数据到缓存中,防止大量数据一下加载到缓存,并设置不同的过期时间,让缓存失效的时间均匀分配

具体视频参考狂神说!