(一)缓存和数据库间数据一致性问题
分布式环境下(单机就不用说了)非常容易出现缓存和数据库间的数据一致性问题,针对这一点的话,只能说,如果你的项目对缓存的要求是强一致性的,那么请不要使用缓存。我们只能采取合适的策略来降低缓存和数据库间数据不一致的概率,而无法保证两者间的强一致性。合适的策略包括 合适的缓存更新策略,更新数据库后要及时更新缓存、缓存失败时增加重试机制,例如MQ模式的消息队列。
解决方案
1、采用延时双删策略
在写库前后都进行redis.del(key)操作,并且设定合理的超时时间。
具体的步骤就是:先删除缓存、再写数据库、休眠500毫秒、再次删除缓存
那么,这个500毫秒怎么确定的,具体该休眠多久呢?
需要评估自己的项目的读数据业务逻辑的耗时。这么做的目的,就是确保读请求结束,写请求可以删除读请求造成的缓存脏数据。
当然这种策略还要考虑redis和数据库主从同步的耗时。最后的的写数据的休眠时间:则在读数据业务逻辑的耗时基础上,加几百ms即可。比如:休眠1秒。
设置缓存过期时间:从理论上来说,给缓存设置过期时间,是保证最终一致性的解决方案。所有的写操作以数据库为准,只要到达缓存过期时间,则后面的读请求自然会从数据库中读取新值然后回填缓存。
该方案的弊端:结合双删策略+缓存超时设置,这样最差的情况就是在超时时间内数据存在不一致,而且又增加了写请求的耗时。
2、异步更新缓存(基于订阅binlog的同步机制)
技术整体思路:
**1、**MySQL binlog增量订阅消费+消息队列+增量数据更新到redis
- 读Redis:热数据基本都在Redis
- 写MySQL:增删改都是操作MySQL
- 更新Redis数据:MySQ的数据操作binlog,来更新到Redis
2、Redis更新
**数据操作主要分为两大块:**一个是全量(将全部数据一次写入到redis)、一个是增量(实时更新)。这里说的是增量,指的是mysql的update、insert、delate变更数据。
**读取binlog后分析 ,利用消息队列,推送更新各台的redis缓存数据。**这样一旦MySQL中产生了新的写入、更新、删除等操作,就可以把binlog相关的消息推送至Redis,Redis再根据binlog中的记录,对Redis进行更新。其实这种机制,很类似MySQL的主从备份机制,因为MySQL的主备也是通过binlog来实现的数据一致性。
这里可以结合使用canal(阿里的一款开源框架),通过该框架可以对MySQL的binlog进行订阅,而canal正是模仿了mysql的slave数据库的备份请求,使得Redis的数据更新达到了相同的效果。
当然,这里的消息推送工具你也可以采用别的第三方:kafka、rabbitMQ等来实现推送更新Redis
(二)缓存穿透
缓存穿透是指查询一个一定不存在的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。在流量大时,可能DB就挂掉了,要是有人利用不存在的key频繁攻击我们的应用,这就是漏洞。
解决方案
1、接口层增加校验,如用户鉴权校验,id做基础校验,id<=0的直接拦截;
2、在缓存中设置key为null,缓存有效时间可以设置短点,如30秒(设置太长会导致正常情况也没法使用)。这样可以防止攻击用户反复用同一个id暴力攻击
3、布隆过滤器
有很多种方法可以有效地解决缓存穿透问题,最常见的则是采用布隆过滤器,将所有可能存在的数据哈希到一个足够大的bitmap中,一个一定不存在的数据会被 这个bitmap拦截掉,从而避免了对底层存储系统的查询压力。另外也有一个更为简单粗暴的方法(我们采用的就是这种),如果一个查询返回的数据为空(不管是数 据不存在,还是系统故障),我们仍然把这个空结果进行缓存,但它的过期时间会很短,最长不超过五分钟。
引入依赖 :https://github.com/RedisLabs/JReBloom
集群容器里添加
布隆过滤器lua脚本的写法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 1 function common.filter(key,val)
2 local redis_list = {}
3 local ip_addr = ngx.shared.redis_cluster_addr:get("redis_addr")
4 local ip_addr_table = ngx_re_split(ip_addr,",")
5 for key,value in ipairs(ip_addr_table) do
6 local ip_addr = ngx_re_split(value,":")
7 redis_list[key] = {ip=ip_addr[1],port=ip_addr[2]}
8 end
9 local config = {
10 name = "testCluster", --rediscluster name
11 serv_list = redis_list,
12 keepalive_timeout = 60000, --redis connection pool idle timeout
13 keepalive_cons = 1000, --redis connection pool size
14 connection_timout = 1000, --timeout while connecting
15 max_redirection = 5, --maximum retry attempts for redirection,
16 auth = "password" --set password while setting auth
17 }
18 local red_c = redis_cluster:new(config)
19 --在redis嵌入lua脚本
20 local res, err = red_c:eval([[
21 local k = KEYS[1] --第一个key
22 local v = ARGV[1] --第一个值
23 local res,err = redis.call('BF.EXISTS',k,v)
24 --业务逻辑
25 return res
26 ]],1,key,val)
27 if err then
28 ngx.log(ngx.ERR,"过滤器错误:",err)
29 return false
30 end
31 return res
32 end
33
(三)缓存雪崩
缓存雪崩是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。
解决方案
- 使用互斥锁排队。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 1//定义常量
2
3defined('WS_REDIS_VALUE') or define('WS_REDIS_VALUE', 'ws'); //ws存储redis键,作用:标识并发情况下ws协议的采集唯一锁机制
4
5defined('AFTER_TIME') or define('AFTER_TIME', '8000'); //ws锁机制回滚延迟时长
6
7protected function startCollection($ids=[])
8 {
9 $wathNum = self::$reids->get(WS_REDIS_VALUE);
10 if (empty($wathNum)) {
11 self::$reids->watch(WS_REDIS_VALUE); //利用redis的watch机制
12 self::$reids->multi();
13 self::$reids->set(WS_REDIS_VALUE, 1);
14 self::$reids->exec();
15 echo "==============开始执行=============\n";
16 $market_model = Market::run();
17 //此Timer为swoole的延迟定时处理
18 Timer::getInstance()->after(AFTER_TIME, function (){
19 self::$reids->set(WS_REDIS_VALUE, 0);
20 echo "执行完成~~~~释放锁机制\n";
21 });
22 $data = $market_model->where(["id"=>["IN",$ids]])->field("id,symbol,update_time")->select();
23 array_walk($data, function($item, $io) use ($data){
24 if (strtotime($item['update_time']) < (time() - 200)) {
25 $this->execCollection($item['symbol']);
26 }
27 });
28 } else {
29 echo "正在执行采集处理----------等待其他程序执行\n";
30 }
31 }
32
33
- 建立备份缓存,缓存A和缓存B,A设置超时时间,B不设值超时时间,先从A读缓存,A没有读B,并且更新A缓存和B缓存;
1
2
3
4
5
6
7
8
9
10
11
12 1public function getByKey($keyA,$keyB) {
2 $value = $reids->get($keyA);
3 if (empty($value)) {
4 $value = $reids->get($keyB);
5 //查询数据库
6 $newValue = getFromDbById();
7 $reids->set($keyA,$newValue,['nx', 'ex' => 10]);
8 $reids->set($keyB,$newValue);
9 }
10 return $value;
11}
12
- 设置缓存超时时间的时候加上一个随机的时间长度,比如这个缓存key的超时时间是固定的5分钟加上随机的2分钟,酱紫可从一定程度上避免雪崩问题
lua脚本的写法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42 1local mlcache = require "resty.mlcache"
2local common = require "resty.common"
3local key = ngx.re.match(ngx.var.request_uri,"/([0-9]+).html")
4--1、L1缓存使用lua_resty_lrucache
5local function fetch_shop(key)
6 --利用布隆过滤器过滤
7 if common.filter("shop_list",key) == 1 then --shop_list是通过命令比如bf.add shop_list 10添加的
8 local content = common.send("/index.php")
9 if content == nil then
10 return
11 end
12 return content
13 end
14
15end
16if type(key) == "table" then
17 --创建L1缓存
18 local cache,err = mlcache.new("cache_name","my_cache",{
19 lru_size = 500, -- 用于定义基础L1缓存(lua-resty-lrucache实例)的大小
20 ttl = 5, -- 指定缓存值的到期时间
21 neg_ttl = 6, -- 指定缓存的未命中的过期时间(当L3回调返回时nil)
22 ipc_shm = "ipc_shared_dict", --用于将L2的缓存设置到L1(与nginx配置文件中字典常量一致)
23 })
24 --如果没有值记录错误日志
25 if not cache then
26 ngx.log(ngx.ERR,"缓存创建失败",err)
27 end
28
29 local ok, err = cache:update()
30
31 --如果有值,获取值
32 local res,err,level = cache:get(key[1],nil,fetch_shop,key[1],cache)
33 -- 在L2缓存中设置一个值,并将事件广播到其他工作进程,
34 if level == 3 then
35 --随机种子,不让所有的key在同一时间失效
36 math.randomseed(tostring(os.time()))
37 local expire_time = math.random(1,6)
38 cache:set(key[1],{ttl=expire_time},res)
39 end
40end
41
42
(四)缓存击穿
缓存击穿表示恶意用户模拟请求很多缓存中不存在的数据,由于缓存中都没有,导致这些请求短时间内直接落在了数据库上,导致数据库异常。比如抢购活动、秒杀活动的接口API被大量的恶意用户刷,导致短时间内数据库超时等
解决方案
我们的目标是:尽量少的线程构建缓存(甚至是一个) + 数据一致性 + 较少的潜在危险,下面会介绍四种方法来解决这个问题:
- 使用互斥锁排队(mutex key):
就是只让一个线程构建缓存,其他线程等待构建缓存的线程执行完,重新从缓存获取数据就可以了(如下图)
2、接口限流与熔断、降级
重要的接口一定要做好限流策略,防止用户恶意刷接口,同时要降级准备,当接口中的某些服务不可用时候,进行熔断,失败快速返回机制。
3、布隆过滤器
bloomfilter就类似于一个hash set,用于快速判某个元素是否存在于集合中,其典型的应用场景就是快速判断一个key是否存在于某容器,不存在就直接返回。
4、 "永远不过期":
这里的“永远不过期”包含两层意思:
(1) 从redis上看,确实没有设置过期时间,这就保证了,不会出现热点key过期问题,也就是“物理”不过期。
(2) 从功能上看,如果不过期,那不就成静态的了吗?所以我们把过期时间存在key对应的value里,如果发现要过期了,通过一个后台的异步线程进行缓存的构建,也就是“逻辑”过期
从实战看,这种方法对于性能非常友好,唯一不足的就是构建缓存时候,其余线程(非构建缓存的线程)可能访问的是老数据,但是对于一般的互联网功能来说这个还是可以忍受。
5、 资源保护:
netflix的hystrix,可以做资源的隔离保护主线程池,如果把这个应用到缓存的构建也未尝不可。
四种方案对比:
作为一个并发量较大的互联网应用,我们的目标有3个:
1. 加快用户访问速度,提高用户体验。
2. 降低后端负载,保证系统平稳。
3. 保证数据“尽可能”及时更新(要不要完全一致,取决于业务,而不是技术)
解决方案 | 优点 | 缺点 |
简单分布式锁(Tim yang) | 1. 思路简单 2. 保证一致性 | 1. 代码复杂度增大 2. 存在死锁的风险 3. 存在线程池阻塞的风险 |
加另外一个过期时间(Tim yang) | 1. 保证一致性 | 同上 |
不过期(本文) | 1. 异步构建缓存,不会阻塞线程池 | 1. 不保证一致性。 2. 代码复杂度增大(每个value都要维护一个timekey)。 3. 占用一定的内存空间(每个value都要维护一个timekey)。 |
资源隔离组件hystrix(本文) | 1. hystrix技术成熟,有效保证后端。 2. hystrix监控强大。 | 1. 部分访问存在降级策略。 |
1 | 1 |
(五)缓存预热
缓存预热这个应该是一个比较常见的概念,相信很多小伙伴都应该可以很容易的理解,缓存预热就是系统上线后,将相关的缓存数据直接加载到缓存系统。这样就可以避免在用户请求的时候,先查询数据库,然后再将数据缓存的问题!用户直接查询事先被预热的缓存数据!
解决思路:
1、直接写个缓存刷新页面,上线时手工操作下;
2、数据量不大,可以在项目启动的时候自动进行加载;
3、定时刷新缓存;
(六)缓存更新
除了缓存服务器自带的缓存失效策略之外(Redis默认的有6中策略可供选择),我们还可以根据具体的业务需求进行自定义的缓存淘汰,常见的策略有两种:
(1)定时去清理过期的缓存;
(2)当有用户请求过来时,再判断这个请求所用到的缓存是否过期,过期的话就去底层系统得到新数据并更新缓存。
两者各有优劣,第一种的缺点是维护大量缓存的key是比较麻烦的,第二种的缺点就是每次用户请求过来都要判断缓存失效,逻辑相对比较复杂!具体用哪种方案,大家可以根据自己的应用场景来权衡。
解决方案
nginx+lua完成商品详情页访问流量实时上报kafka,在nginx这一层,接收到访问请求的时候,就把请求的流量上报发送给kafka
这样的话,storm才能去消费kafka中的实时的访问日志,然后去进行缓存热数据的统计,从lua脚本直接创建一个kafka producer,发送数据到kafka
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37 1git clone https://github.com/doujiang24/lua-resty-kafka.git
2
3cp -rf /usr/local/lua-resty-kafka/lib/resty /usr/local/openresty/lualib
4
5nginx -s reload
6
7local cjson = require("cjson")
8local producer = require("resty.kafka.producer")
9
10local broker_list = {
11 { host = "192.168.31.187", port = 9092 },
12 { host = "192.168.31.19", port = 9092 },
13 { host = "192.168.31.227", port = 9092 }
14}
15
16local log_json = {}
17log_json["headers"] = ngx.req.get_headers()
18log_json["uri_args"] = ngx.req.get_uri_args()
19log_json["body"] = ngx.req.read_body()
20log_json["http_version"] = ngx.req.http_version()
21log_json["method"] =ngx.req.get_method()
22log_json["raw_reader"] = ngx.req.raw_header()
23log_json["body_data"] = ngx.req.get_body_data()
24
25local message = cjson.encode(log_json);
26
27local productId = ngx.req.get_uri_args()["productId"]
28
29local async_producer = producer:new(broker_list, { producer_type = "async" })
30local ok, err = async_producer:send("access-log", productId, message)
31
32if not ok then
33 ngx.log(ngx.ERR, "kafka send err:", err)
34 return
35end
36
37
另两台机器上都这样做,才能统一上报流量到kafka
1
2
3
4
5 1bin/kafka-topics.sh --zookeeper 192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181 --topic access-log --replication-factor 1 --partitions 1 --create
2
3bin/kafka-console-consumer.sh --zookeeper 192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181 --topic access-log --from-beginning
4
5
(七)缓存降级
当访问量剧增、服务出现问题(如响应时间慢或不响应)或非核心服务影响到核心流程的性能时,仍然需要保证服务还是可用的,即使是有损服务。系统可以根据一些关键数据进行自动降级,也可以配置开关实现人工降级。
降级的最终目的是保证核心服务可用,即使是有损的。而且有些服务是无法降级的(如加入购物车、结算)。
在进行降级之前要对系统进行梳理,看看系统是不是可以丢卒保帅;从而梳理出哪些必须誓死保护,哪些可降级;比如可以参考日志级别设置预案:
(1)一般:比如有些服务偶尔因为网络抖动或者服务正在上线而超时,可以自动降级;
(2)警告:有些服务在一段时间内成功率有波动(如在95~100%之间),可以自动降级或人工降级,并发送告警;
(3)错误:比如可用率低于90%,或者数据库连接池被打爆了,或者访问量突然猛增到系统能承受的最大阀值,此时可以根据情况自动降级或者人工降级;
(4)严重错误:比如因为特殊原因数据错误了,此时需要紧急人工降级。