缓存穿透
缓存穿透,指某一请求需要读取的数据在缓存层没有,而我们的数据库中也没有,此时,该请求就穿过了缓存和数据库,而且此种请求无论来多少次,都是一样的效果,因为我们的数据库确实没有该数据。如果请求都是正常请求,还不算太糟糕,但是如果是被别人恶意使用此种请求攻击我们的服务,那么我们的缓存此时就没有任何效果,大量的请求会压到数据库,导致性能降低,响应变慢甚至服务被拖垮。解决缓存穿透, 我们可以将数据库不存在的数据缓存为空,这样请求在缓存中就可以命中空值而不用去数据库请求,或者使用过滤器(布隆过滤器)来提前判断该请求是否为恶意请求,布隆过滤器是由一个初值都为0bit 数组和 N个哈希函数组成,可以判断某个数据是否存在。当我们想标记某个数据存在时,使用N个哈希函数,分别计算这个数据的哈希值,得到 N 个哈希值。然后,我们把这 N 个哈希值对 bit 数组的长度取模,得到每个哈希值在数组中的对应位置,我们把对应位置的bi 位设置为 1,这就完成了在布隆过滤器中标记数据的操作。使用布隆过滤器的时候,我们应该将数据库存在的数据先进行bit标记,当恶意请求到达时,首先经过布隆过滤器,如果数据在数据库不存在,那么该数据对应的哈希函数取模的bit位理论上的的值仍然为 0,只有全为1才是我们标记过的存在的数据。但是由于哈希冲突的存在,布隆过滤器并不能完全避免恶意请求,哈希函数越多,过滤效果理论上会越好。
缓存击穿
缓存击穿,通俗的说,就是请求穿过了缓存,最终请求到了后端数据库。而如果该key是一个热点数据,那么就会导致大量的请求抵达数据库,从而导致数据库压力过大,响应变慢甚至数据库宕机的情况。
为了避免缓存击穿给数据库带来较大压力,我们可以用两种方式解决,一种是访问频繁的热点数据,可以不设置过期时间,这样数据请求就都可以在缓存层处理,不需要等到数据库层处理。另一种是一个热key可能是访问频率跟时间段关系密切,它不是任何时候访问频次都很高,我们可以在访问缓存层和数据库层这里添加一个分布式锁,大量请求到来时,获得锁的那一个请求经过缓存 --> MySQL之后将结果放入缓存,然后释放锁,这样其它的请求竞争到锁的时候就可以直接从缓存中获取结果,这样做的好处是大量请求同一时间只会有一个请求压到后端数据库,减轻数据库的压力,但是弊端就是每个请求都需要先获取一次分布式锁,降低了并发性。
缓存雪崩
缓存雪崩一般可以认为是key集中过期,或者Redis实例宕机,这种情况下,缓存层形同虚设,大量的请求都会压到数据库,造成服务不可用,甚至还可以引起服务级联雪崩。解决缓存雪崩,首先要避免Redis实例宕机,因此需要构建Redis集群,避免缓存层不可用的情况。还有就是避免key集中过期,可以在设置失效时间时添加一个额外的随机数从而避免缓存集中过期。
大key和热key
Redis工作线程是单线程,因此使用复杂度较高的命令或者操作较大的key会导致Redis延时增大,因此要尽量避免使用O(n)复杂度的命令操作集合类型,尤其是集合元素较多的时候,一方面避免命令复杂,另一方面也要避免集合过大,业务上尽量将较大的key拆分为几个小key,如果实在避免不了,可以使用scan命令来部分扫描元素,通过多次命令取代一次大key操作。热key是针对访问频次特别高,高到超过Redis的单机瓶颈,比如某些秒杀场景下,如果某个key的每秒读取次数超过10w,那么肯定是超过了Redis的瓶颈,此时如果Redis被压垮,那么缓存就会不可用,即使是切片集群,也会因为海量请求顺着切片转移导致切片集群被逐一压垮,这时候就要考虑就热key打散,比如键值对 'joy':'1',可以设置’joy1‘:'1',’joy2‘:'1',’joyN‘:'1'等多个键值对,分发到不同的切片上,这样我们可以不用将所有的请求都集中打到同一个切片,这种情况下单台Redis的平均请求量就是 请求总数/数据备份个数,而不至于打到Redis的瓶颈。
Redission分布式锁解决原理
首先按照Java基础思路,在执行并发操作之前,一般做如下操作
1.获取锁
2.执行业务
3.释放锁
而在分布式环境下JVM不是同一个,因此Java类库中的同步方法都不能保证线程安全,Redis提供了命令set key value NX PX millisecond,表示key不存在则设置值,并在一定时间内过期,这个特性刚好适用于分布式环境,简单来说还是三步,setNx设置key,value,执行业务,删除key。
boolean lock = false;唯一值 删除锁时判读此刻的锁是否为自己设置的值String uuid = UUID.randomUUID().toString();try {尝试设置锁lock = redisTemplate.opsForValue().setIfAbsent("key", uuid, 3000, TimeUnit.MILLISECONDS);if (lock) {上锁成功,执行业务操作accessDb();} else {争抢锁失败, 直接返回}} finally {如果成功上锁,尝试删除所if (lock) {String value = redisTemplate.opsForValue().get("key");/ 如果此时的锁的值是自己设置的 就删除if (uuid.equals(value)) {redisTemplate.delete("key");}}}
示例代码是一个不那么完善的分布式锁操作代码,这个过程中可能会有许多问题,比如争抢锁失败后就立刻失败,通常我们的业务需要争抢锁失败后可以持续一段时间不断的尝试。比如业务操作时间过长,Redis锁释放导致其它线程可以正常获得锁,造成了并发冲突和超长的客户端执行完业务后释放了别的客户端设置的锁,还有持有锁的Redis实例宕机,在该key被同步到其它主从之前,新的主库并没有该key,导致锁的失效,还有释放锁操作时候的判断锁和真正释放锁的操作并非是原子性的,因此还是会出现误释放的问题,还有著名的时钟跳跃问题,Redis服务器时间跳跃,导致过期时间不准而引发的锁失效问题等等众多问题。Redisson 是当前Redis做分布式锁的一个解决方案,它是怎么解决上述种种问题的呢?设置锁阶段,为了解决单点故障问题,引入一组独立的可写的Redis节点,客户端同时向多个Redis节点发送setNx命令,如果成功个数超过一半,就表示竞争锁成功,如果持有锁的线程执行任务时间过长,watch dog线程会不断探测当前线程持有的锁是否依然存在,存在的话就续约(10s一次)。如果竞争锁失败,会在设定的时间内不断重试,而在释放锁阶段,为了保证释放操作的原子性,采用的lua脚本实现。到此,加锁阶段和释放锁阶段都保证了安全线。这就是Redisson的大致思想。
分布式锁在工作中可能也很常见,且经常需要写,因此为了开发方便,可以写一个aop用于简化开发,以下示例代码是一个简单的例子。
切面
@Component@Aspectpublic class PublicPoint {@Resourceprivate Redisson redisson;@Around("@annotation(lockAnnotation)")public String lockAround(ProceedingJoinPoint joinPoint, LockAnnotation lockAnnotation) throws Throwable {String lockKey = lockAnnotation.lockKey();int leastTime = lockAnnotation.leastTime();int waitTime = lockAnnotation.waitTime();TimeUnit timeUnit = lockAnnotation.timeUnit();Object[] args = joinPoint.getArgs();Boolean success = false;RLock lock = null;try {lock = redisson.getLock(lockKey);success = lock.tryLock(leastTime, waitTime, timeUnit);if (success) {String returnObject = String.valueOf(joinPoint.proceed(args));return returnObject;} else {System.out.println("获取锁失败");}return "操作繁忙";} finally {if (success) {lock.unlock();}}}}
注解
@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.METHOD, ElementType.TYPE})public @interface LockAnnotation {/*** 加锁的key的值* @return*/String lockKey() default "";/*** 锁自动释放时间,单位s** @return*/int leastTime() default 3;/*** 获取锁的最大等待时间,单位s,默认不等待,0即为快速失败** @return*/int waitTime() default 0;/*** 时间单位 默认秒* @return*/TimeUnit timeUnit() default TimeUnit.SECONDS;}
Service
@Service("shareLockService")public class ShareLockServiceImpl {static Integer count = 0;@LockAnnotation(lockKey = "testKey", waitTime = 3, leastTime = 3, timeUnit = TimeUnit.SECONDS)public void numberCounter() {for (int i = 0; i < 10000; i++) {count++;}System.out.println(count);}}
Controller
@RestControllerpublic class RedisController {@Resourceprivate ShareLockServiceImpl shareLockService;@GetMapping("/redisTest")public Integer redisTest(String name, String age) throws InterruptedException {List<Thread> thread = new ArrayList<>();for (int i = 0; i < 10; i++) {thread.add(new Thread(() -> shareLockService.numberCounter()));}for (Thread t : thread) {System.out.println(1);t.start();}for (Thread t : thread) {System.out.println(2);t.join();}return ShareLockServiceImpl.count;}}
代码大意是起10个线程,每个线程将count变量增加10000,使用分布式锁自定义注解时候结果稳定是返回100000,不加注解的时候结果总是小于100000。





