缓存穿透,缓存击穿,缓存雪崩解决方案分析

释放双眼,带上耳机,听听看~!

(一)缓存和数据库间数据一致性问题

分布式环境下(单机就不用说了)非常容易出现缓存和数据库间的数据一致性问题,针对这一点的话,只能说,如果你的项目对缓存的要求是强一致性的,那么请不要使用缓存。我们只能采取合适的策略来降低缓存和数据库间数据不一致的概率,而无法保证两者间的强一致性。合适的策略包括 合适的缓存更新策略,更新数据库后要及时更新缓存、缓存失败时增加重试机制,例如MQ模式的消息队列。

解决方案

1、采用延时双删策略

在写库前后都进行redis.del(key)操作,并且设定合理的超时时间。

具体的步骤就是:先删除缓存、再写数据库、休眠500毫秒、再次删除缓存

那么,这个500毫秒怎么确定的,具体该休眠多久呢?

需要评估自己的项目的读数据业务逻辑的耗时。这么做的目的,就是确保读请求结束,写请求可以删除读请求造成的缓存脏数据。

当然这种策略还要考虑redis和数据库主从同步的耗时。最后的的写数据的休眠时间:则在读数据业务逻辑的耗时基础上,加几百ms即可。比如:休眠1秒。

设置缓存过期时间:从理论上来说,给缓存设置过期时间,是保证最终一致性的解决方案。所有的写操作以数据库为准,只要到达缓存过期时间,则后面的读请求自然会从数据库中读取新值然后回填缓存。

该方案的弊端:结合双删策略+缓存超时设置,这样最差的情况就是在超时时间内数据存在不一致,而且又增加了写请求的耗时。

2、异步更新缓存(基于订阅binlog的同步机制)

技术整体思路:

**1、**MySQL binlog增量订阅消费+消息队列+增量数据更新到redis

  • 读Redis:热数据基本都在Redis
  • 写MySQL:增删改都是操作MySQL
  • 更新Redis数据:MySQ的数据操作binlog,来更新到Redis

2、Redis更新

**数据操作主要分为两大块:**一个是全量(将全部数据一次写入到redis)、一个是增量(实时更新)。这里说的是增量,指的是mysql的update、insert、delate变更数据。

**读取binlog后分析 ,利用消息队列,推送更新各台的redis缓存数据。**这样一旦MySQL中产生了新的写入、更新、删除等操作,就可以把binlog相关的消息推送至Redis,Redis再根据binlog中的记录,对Redis进行更新。其实这种机制,很类似MySQL的主从备份机制,因为MySQL的主备也是通过binlog来实现的数据一致性。

这里可以结合使用canal(阿里的一款开源框架),通过该框架可以对MySQL的binlog进行订阅,而canal正是模仿了mysql的slave数据库的备份请求,使得Redis的数据更新达到了相同的效果。

当然,这里的消息推送工具你也可以采用别的第三方:kafka、rabbitMQ等来实现推送更新Redis

(二)缓存穿透

缓存穿透是指查询一个一定不存在的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。在流量大时,可能DB就挂掉了,要是有人利用不存在的key频繁攻击我们的应用,这就是漏洞。

解决方案

1、接口层增加校验,如用户鉴权校验,id做基础校验,id<=0的直接拦截;

2、在缓存中设置key为null,缓存有效时间可以设置短点,如30秒(设置太长会导致正常情况也没法使用)。这样可以防止攻击用户反复用同一个id暴力攻击

3、布隆过滤器

有很多种方法可以有效地解决缓存穿透问题,最常见的则是采用布隆过滤器,将所有可能存在的数据哈希到一个足够大的bitmap中,一个一定不存在的数据会被 这个bitmap拦截掉,从而避免了对底层存储系统的查询压力。另外也有一个更为简单粗暴的方法(我们采用的就是这种),如果一个查询返回的数据为空(不管是数 据不存在,还是系统故障),我们仍然把这个空结果进行缓存,但它的过期时间会很短,最长不超过五分钟。

引入依赖 :https://github.com/RedisLabs/JReBloom

缓存穿透,缓存击穿,缓存雪崩解决方案分析

集群容器里添加

缓存穿透,缓存击穿,缓存雪崩解决方案分析

布隆过滤器lua脚本的写法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
1 function common.filter(key,val)
2        local redis_list = {}
3        local ip_addr = ngx.shared.redis_cluster_addr:get(&quot;redis_addr&quot;)
4        local ip_addr_table = ngx_re_split(ip_addr,&quot;,&quot;)
5        for key,value in ipairs(ip_addr_table) do
6            local ip_addr = ngx_re_split(value,&quot;:&quot;)
7            redis_list[key] = {ip=ip_addr[1],port=ip_addr[2]}
8        end
9        local config = {
10            name = &quot;testCluster&quot;,                   --rediscluster name
11            serv_list = redis_list,
12            keepalive_timeout = 60000,              --redis connection pool idle timeout
13            keepalive_cons = 1000,                  --redis connection pool size
14            connection_timout = 1000,               --timeout while connecting
15            max_redirection = 5,                    --maximum retry attempts for redirection,
16            auth = &quot;password&quot;                       --set password while setting auth
17        }
18        local red_c = redis_cluster:new(config)
19        --在redis嵌入lua脚本
20        local res, err = red_c:eval([[
21            local k = KEYS[1] --第一个key
22            local v = ARGV[1] --第一个值
23            local res,err = redis.call(&#x27;BF.EXISTS&#x27;,k,v)
24            --业务逻辑
25            return res
26        ]],1,key,val)
27        if err then
28           ngx.log(ngx.ERR,&quot;过滤器错误:&quot;,err)
29           return false
30        end
31        return res
32    end
33

(三)缓存雪崩

缓存雪崩是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。

解决方案

  1. 使用互斥锁排队。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
1//定义常量
2
3defined(&#x27;WS_REDIS_VALUE&#x27;) or define(&#x27;WS_REDIS_VALUE&#x27;, &#x27;ws&#x27;); //ws存储redis键,作用:标识并发情况下ws协议的采集唯一锁机制
4
5defined(&#x27;AFTER_TIME&#x27;) or define(&#x27;AFTER_TIME&#x27;, &#x27;8000&#x27;); //ws锁机制回滚延迟时长
6
7protected function startCollection($ids=[])
8    {
9        $wathNum = self::$reids-&gt;get(WS_REDIS_VALUE);
10        if (empty($wathNum)) {
11            self::$reids-&gt;watch(WS_REDIS_VALUE); //利用redis的watch机制
12            self::$reids-&gt;multi();
13            self::$reids-&gt;set(WS_REDIS_VALUE, 1);
14            self::$reids-&gt;exec();
15            echo &quot;==============开始执行=============\n&quot;;
16            $market_model = Market::run();
17            //此Timer为swoole的延迟定时处理
18            Timer::getInstance()-&gt;after(AFTER_TIME, function (){
19                self::$reids-&gt;set(WS_REDIS_VALUE, 0);
20                echo &quot;执行完成~~~~释放锁机制\n&quot;;
21            });
22            $data = $market_model-&gt;where([&quot;id&quot;=&gt;[&quot;IN&quot;,$ids]])-&gt;field(&quot;id,symbol,update_time&quot;)-&gt;select();
23            array_walk($data, function($item, $io) use ($data){
24                if (strtotime($item[&#x27;update_time&#x27;]) &lt; (time() - 200)) {
25                    $this-&gt;execCollection($item[&#x27;symbol&#x27;]);
26                }
27            });
28        } else {
29            echo &quot;正在执行采集处理----------等待其他程序执行\n&quot;;
30        }
31    }
32
33

 

  1. 建立备份缓存,缓存A和缓存B,A设置超时时间,B不设值超时时间,先从A读缓存,A没有读B,并且更新A缓存和B缓存;

1
2
3
4
5
6
7
8
9
10
11
12
1public function getByKey($keyA,$keyB) {
2    $value = $reids-&gt;get($keyA);
3    if (empty($value)) {
4        $value = $reids-&gt;get($keyB);
5        //查询数据库
6        $newValue = getFromDbById();
7        $reids-&gt;set($keyA,$newValue,[&#x27;nx&#x27;, &#x27;ex&#x27; =&gt; 10]);
8        $reids-&gt;set($keyB,$newValue);
9    }
10    return $value;
11}
12

 

  1. 设置缓存超时时间的时候加上一个随机的时间长度,比如这个缓存key的超时时间是固定的5分钟加上随机的2分钟,酱紫可从一定程度上避免雪崩问题

lua脚本的写法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
1local mlcache = require &quot;resty.mlcache&quot;
2local common = require &quot;resty.common&quot;
3local key = ngx.re.match(ngx.var.request_uri,&quot;/([0-9]+).html&quot;)
4--1、L1缓存使用lua_resty_lrucache
5local function fetch_shop(key)
6    --利用布隆过滤器过滤
7   if common.filter(&quot;shop_list&quot;,key) == 1 then  --shop_list是通过命令比如bf.add shop_list 10添加的
8       local content = common.send(&quot;/index.php&quot;)
9       if content == nil then
10            return
11       end
12       return content
13   end
14
15end
16if type(key) == &quot;table&quot; then
17    --创建L1缓存
18   local cache,err = mlcache.new(&quot;cache_name&quot;,&quot;my_cache&quot;,{
19            lru_size = 500,    -- 用于定义基础L1缓存(lua-resty-lrucache实例)的大小
20            ttl      = 5,   -- 指定缓存值的到期时间
21            neg_ttl  = 6,     -- 指定缓存的未命中的过期时间(当L3回调返回时nil)
22            ipc_shm  = &quot;ipc_shared_dict&quot;, --用于将L2的缓存设置到L1(与nginx配置文件中字典常量一致)
23    })
24    --如果没有值记录错误日志
25    if not cache then
26         ngx.log(ngx.ERR,&quot;缓存创建失败&quot;,err)
27    end
28
29    local ok, err = cache:update()
30
31    --如果有值,获取值
32    local res,err,level = cache:get(key[1],nil,fetch_shop,key[1],cache)
33    -- 在L2缓存中设置一个值,并将事件广播到其他工作进程,
34    if level == 3 then
35        --随机种子,不让所有的key在同一时间失效
36        math.randomseed(tostring(os.time()))
37        local expire_time = math.random(1,6)
38        cache:set(key[1],{ttl=expire_time},res)
39    end
40end
41
42

 

(四)缓存击穿

缓存击穿表示恶意用户模拟请求很多缓存中不存在的数据,由于缓存中都没有,导致这些请求短时间内直接落在了数据库上,导致数据库异常。比如抢购活动、秒杀活动的接口API被大量的恶意用户刷,导致短时间内数据库超时等

解决方案

我们的目标是:尽量少的线程构建缓存(甚至是一个) + 数据一致性 + 较少的潜在危险,下面会介绍四种方法来解决这个问题:

 

  1. 使用互斥锁排队(mutex key):

就是只让一个线程构建缓存,其他线程等待构建缓存的线程执行完,重新从缓存获取数据就可以了(如下图)

 

2、接口限流与熔断、降级

重要的接口一定要做好限流策略,防止用户恶意刷接口,同时要降级准备,当接口中的某些服务不可用时候,进行熔断,失败快速返回机制。

3、布隆过滤器

bloomfilter就类似于一个hash set,用于快速判某个元素是否存在于集合中,其典型的应用场景就是快速判断一个key是否存在于某容器,不存在就直接返回。

4、 "永远不过期":

    这里的“永远不过期”包含两层意思:

    (1) 从redis上看,确实没有设置过期时间,这就保证了,不会出现热点key过期问题,也就是“物理”不过期。

    (2) 从功能上看,如果不过期,那不就成静态的了吗?所以我们把过期时间存在key对应的value里,如果发现要过期了,通过一个后台的异步线程进行缓存的构建,也就是“逻辑”过期

   

    从实战看,这种方法对于性能非常友好,唯一不足的就是构建缓存时候,其余线程(非构建缓存的线程)可能访问的是老数据,但是对于一般的互联网功能来说这个还是可以忍受。

   

5、 资源保护:

       netflix的hystrix,可以做资源的隔离保护主线程池,如果把这个应用到缓存的构建也未尝不可。

四种方案对比:

      作为一个并发量较大的互联网应用,我们的目标有3个:

      1. 加快用户访问速度,提高用户体验。

      2. 降低后端负载,保证系统平稳。

      3. 保证数据“尽可能”及时更新(要不要完全一致,取决于业务,而不是技术) 

解决方案 优点 缺点
简单分布式锁(Tim yang)  1. 思路简单 2. 保证一致性 1. 代码复杂度增大 2. 存在死锁的风险 3. 存在线程池阻塞的风险
加另外一个过期时间(Tim yang)  1. 保证一致性 同上 
不过期(本文) 1. 异步构建缓存,不会阻塞线程池 1. 不保证一致性。 2. 代码复杂度增大(每个value都要维护一个timekey)。 3. 占用一定的内存空间(每个value都要维护一个timekey)。
资源隔离组件hystrix(本文) 1. hystrix技术成熟,有效保证后端。 2. hystrix监控强大。     1. 部分访问存在降级策略。

1
1

(五)缓存预热

缓存预热这个应该是一个比较常见的概念,相信很多小伙伴都应该可以很容易的理解,缓存预热就是系统上线后,将相关的缓存数据直接加载到缓存系统。这样就可以避免在用户请求的时候,先查询数据库,然后再将数据缓存的问题!用户直接查询事先被预热的缓存数据!

解决思路:

1、直接写个缓存刷新页面,上线时手工操作下;

2、数据量不大,可以在项目启动的时候自动进行加载;

3、定时刷新缓存;

(六)缓存更新

除了缓存服务器自带的缓存失效策略之外(Redis默认的有6中策略可供选择),我们还可以根据具体的业务需求进行自定义的缓存淘汰,常见的策略有两种:

(1)定时去清理过期的缓存;

(2)当有用户请求过来时,再判断这个请求所用到的缓存是否过期,过期的话就去底层系统得到新数据并更新缓存。

两者各有优劣,第一种的缺点是维护大量缓存的key是比较麻烦的,第二种的缺点就是每次用户请求过来都要判断缓存失效,逻辑相对比较复杂!具体用哪种方案,大家可以根据自己的应用场景来权衡。

解决方案

nginx+lua完成商品详情页访问流量实时上报kafka,在nginx这一层,接收到访问请求的时候,就把请求的流量上报发送给kafka

这样的话,storm才能去消费kafka中的实时的访问日志,然后去进行缓存热数据的统计,从lua脚本直接创建一个kafka producer,发送数据到kafka


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
1git clone https://github.com/doujiang24/lua-resty-kafka.git
2
3cp -rf /usr/local/lua-resty-kafka/lib/resty /usr/local/openresty/lualib
4
5nginx -s reload
6
7local cjson = require(&quot;cjson&quot;)  
8local producer = require(&quot;resty.kafka.producer&quot;)  
9
10local broker_list = {  
11    { host = &quot;192.168.31.187&quot;, port = 9092 },  
12    { host = &quot;192.168.31.19&quot;, port = 9092 },  
13    { host = &quot;192.168.31.227&quot;, port = 9092 }
14}
15
16local log_json = {}  
17log_json[&quot;headers&quot;] = ngx.req.get_headers()  
18log_json[&quot;uri_args&quot;] = ngx.req.get_uri_args()  
19log_json[&quot;body&quot;] = ngx.req.read_body()  
20log_json[&quot;http_version&quot;] = ngx.req.http_version()  
21log_json[&quot;method&quot;] =ngx.req.get_method()
22log_json[&quot;raw_reader&quot;] = ngx.req.raw_header()  
23log_json[&quot;body_data&quot;] = ngx.req.get_body_data()  
24
25local message = cjson.encode(log_json);  
26
27local productId = ngx.req.get_uri_args()[&quot;productId&quot;]
28
29local async_producer = producer:new(broker_list, { producer_type = &quot;async&quot; })  
30local ok, err = async_producer:send(&quot;access-log&quot;, productId, message)  
31
32if not ok then  
33    ngx.log(ngx.ERR, &quot;kafka send err:&quot;, err)  
34    return  
35end
36
37

另两台机器上都这样做,才能统一上报流量到kafka


1
2
3
4
5
1bin/kafka-topics.sh --zookeeper 192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181 --topic access-log --replication-factor 1 --partitions 1 --create
2
3bin/kafka-console-consumer.sh --zookeeper 192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181 --topic access-log --from-beginning
4
5

 

(七)缓存降级

当访问量剧增、服务出现问题(如响应时间慢或不响应)或非核心服务影响到核心流程的性能时,仍然需要保证服务还是可用的,即使是有损服务。系统可以根据一些关键数据进行自动降级,也可以配置开关实现人工降级。

降级的最终目的是保证核心服务可用,即使是有损的。而且有些服务是无法降级的(如加入购物车、结算)。

在进行降级之前要对系统进行梳理,看看系统是不是可以丢卒保帅;从而梳理出哪些必须誓死保护,哪些可降级;比如可以参考日志级别设置预案:

(1)一般:比如有些服务偶尔因为网络抖动或者服务正在上线而超时,可以自动降级;

(2)警告:有些服务在一段时间内成功率有波动(如在95~100%之间),可以自动降级或人工降级,并发送告警;

(3)错误:比如可用率低于90%,或者数据库连接池被打爆了,或者访问量突然猛增到系统能承受的最大阀值,此时可以根据情况自动降级或者人工降级;

(4)严重错误:比如因为特殊原因数据错误了,此时需要紧急人工降级。

给TA打赏
共{{data.count}}人
人已打赏
安全经验

英文站如何做Google Adsense

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索