Go项目实战:打造高并发日志采集系统(七)

释放双眼,带上耳机,听听看~!

前情回顾

前文我们完成了日志采集系统基本功能,包括日志监控,日志采集,配置热更新,协程动态启动和关闭等。

本节目标

前文我们是通过将要监控的日志路径配置在配置文件中,根据配置文件热更新动态监控日志。
本节将etcd服务加入系统中,可以将要监控的日志文件路径和主题序列化为字符串保存到etcd中,这样系统可以监控etcd中该值得变化,从而动态启动协程和关闭协程监控指定日志。
这样可以通过etcd和配置文件两种方式监控日志。

图解系统监控日志流程

前文的日志系统流程如下
扩充后的日志系统流程如下
可见改进后的系统支持etcd服务检测日志路径变化,从而启动协程进行监控。

设计思路

我们在配置文件中添加几个key值,用来记录etcd存储的key。
1 我们的系统读取配置文件中的etcd的key值,启动协程A读取etcd中key的value,进而启动子协程B监控value记录的日志路径。
2 协程A还要监控etcd中的key对应的value是否有变化,如果value中有日志路径新增,则启动新的协程B监控新增日志。
如果value中有日志路径减少,关闭旧的协程B。
3 当配置文件中的key有变化,我们关闭原来的协程A及其子协程B,启动新的协程A,这样协程A继续启动新的子协程B监控日志。
而且新的协程A还要监控etcd的value是否有变化。

代码实现

简单阐述下部分代码,完整代码在文末。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1type EtcdLogConf struct {
2   Path          string                  `json:"path"`
3   Topic         string                  `json:"topic"`
4   Ctx           context.Context         `json:"-"`
5   Cancel        context.CancelFunc      `json:"-"`
6   KeyChan       chan string             `json:"-"`
7   KafkaProducer *kafkaqueue.ProducerKaf `json:"-"`
8}
9
10type EtcdLogMgr struct {
11  Ctx           context.Context
12  Cancel        context.CancelFunc
13  KeyChan       chan string
14  KafkaProducer *kafkaqueue.ProducerKaf
15  EtcdKey       string
16  EtcdClient    *clientv3.Client
17  EtcdConfMap   map[string]*EtcdLogConf
18}
19

定义了两个结构,EtcdLogMgr用来管理协程A,EtcdLogConf用来管理协程B。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
1func ConstructEtcd(etcdDatas interface{}, keyChan chan string,
2   kafkaProducer *kafkaqueue.ProducerKaf, etcdaddr interface{}) map[string]*EtcdLogMgr {
3   etcdMgr := make(map[string]*EtcdLogMgr)
4   if etcdDatas == nil {
5       return etcdMgr
6   }
7   logkeys := etcdDatas.([]interface{})
8   for _, logkey := range logkeys {
9       clientv3 := InitEtcdClient(etcdaddr)
10      if clientv3 == nil {
11          continue
12      }
13      etcdData := new(EtcdLogMgr)
14      ctx, cancel := context.WithCancel(context.Background())
15      etcdData.Ctx = ctx
16      etcdData.Cancel = cancel
17      etcdData.KafkaProducer = kafkaProducer
18      etcdData.KeyChan = keyChan
19      etcdData.EtcdKey = logkey.(string)
20      etcdData.EtcdClient = clientv3
21      etcdMgr[logkey.(string)] = etcdData
22      fmt.Println(etcdData.EtcdKey, " init success ")
23  }
24  return etcdMgr
25}
26

ConstructEtcd根据参数构造了一个map返回,这个map主要是管理A类型的协程。
这个map在主函数检测到配置文件中的etcd的key值变化会动态修改map中key对应的value,并且启动或关闭对应的协程A。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
1//根据etcd中的日志监控信息启动和关闭协程
2func UpdateEtcdGoroutine(etcdMgr map[string]*EtcdLogMgr, etcdlogData interface{},
3    kafkaProducer *kafkaqueue.ProducerKaf,keyChan chan string, etcdaddr interface{}) {
4   if etcdlogData == nil {
5       return
6   }
7   logkeys := etcdlogData.([]interface{})
8   newkeyMap := make(map[string]bool)
9   for _, logkey := range logkeys {
10      fmt.Println("update key is ", logkey.(string))
11      newkeyMap[logkey.(string)] = true
12  }
13
14  for oldkey, oldval := range etcdMgr {
15      if _, ok := newkeyMap[oldkey]; !ok {
16          oldval.Cancel()
17          delete(etcdMgr, oldkey)
18      }
19  }
20
21  for newkey, _ := range newkeyMap {
22      if _, ok := etcdMgr[newkey]; !ok {
23          clientv3 := InitEtcdClient(etcdaddr)
24          if clientv3 == nil {
25              continue
26          }
27          etcdData := new(EtcdLogMgr)
28          ctx, cancel := context.WithCancel(context.Background())
29          etcdData.Ctx = ctx
30          etcdData.Cancel = cancel
31          etcdData.KafkaProducer = kafkaProducer
32          etcdData.KeyChan = keyChan
33          etcdData.EtcdKey = newkey
34          etcdData.EtcdClient = clientv3
35          etcdMgr[newkey] = etcdData
36          fmt.Println(etcdData.EtcdKey, " init success ")
37          go WatchEtcdKeys(etcdData)
38      }
39  }
40}
41

UpdateEtcdGoroutine功能就是通过config.yaml中etcd的key变化而动态启动和关闭协程。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
1func WatchEtcdKeys(etcdMgr *EtcdLogMgr) {
2
3   defer func() {
4       if erreco := recover(); erreco != nil {
5           etcdMgr.KeyChan <- etcdMgr.EtcdKey
6           fmt.Println("watch etcd panic, exited")
7           goto CLEARLOG_GOROUTINE
8       }
9       fmt.Println("watch etcd  exit")
10      etcdMgr.EtcdClient.Close()
11  CLEARLOG_GOROUTINE:
12      for _, val := range etcdMgr.EtcdConfMap {
13          val.Cancel()
14      }
15      etcdMgr.EtcdConfMap = nil
16  }()
17  etcdMgr.EtcdConfMap = make(map[string]*EtcdLogConf)
18  ctxtime, cancel := context.WithTimeout(context.Background(), time.Second)
19  resp, err := etcdMgr.EtcdClient.Get(ctxtime, etcdMgr.EtcdKey)
20  cancel()
21  if err != nil {
22      fmt.Println("get failed, err:", err)
23      return
24  }
25
26  for _, ev := range resp.Kvs {
27      fmt.Printf("%s : %s ...\n", ev.Key, ev.Value)
28      etcdLogConf := make([]*EtcdLogConf, 0, 20)
29      unmarsherr := json.Unmarshal(ev.Value, &etcdLogConf)
30      if unmarsherr != nil {
31          fmt.Println("unmarshal error !, error is ", unmarsherr)
32          continue
33      }
34
35      for _, etcdval := range etcdLogConf {
36          etcdMgr.EtcdConfMap[etcdval.Topic] = etcdval
37          etcdval.Ctx, etcdval.Cancel = context.WithCancel(context.Background())
38          etcdval.KeyChan = logConfChan
39          etcdval.KafkaProducer = etcdMgr.KafkaProducer
40          go WatchEtcdFile(etcdval)
41      }
42      fmt.Println(etcdMgr.EtcdConfMap)
43  }
44  watchChan := etcdMgr.EtcdClient.Watch(etcdMgr.Ctx, etcdMgr.EtcdKey)
45  for {
46      select {
47      case wresp, ok := <-watchChan:
48          if !ok {
49              fmt.Println("watch etcd key  receive parent goroutine exited")
50              return
51          }
52          UpdateEtcdFile(etcdMgr, &wresp)
53      case logConfKey := <-logConfChan:
54          etcdvalt, ok := etcdMgr.EtcdConfMap[logConfKey]
55          if !ok {
56              continue
57          }
58          //重启日志监控协程
59          go WatchEtcdFile(etcdvalt)
60      }
61  }
62
63}
64

WatchEtcdKeys里实现了协程A从etcd中读取key对应的value,并且序列化出日志路径和主题。
然后启动子协程B执行WatchEtcdFile操作,WatchEtcdFile就是之前我们实现的监控指定路径的日志逻辑。


1
2
3
4
5
1func WatchEtcdFile(etcdFile *EtcdLogConf) {
2   logtailf.WatchLogFile(etcdFile.Topic, etcdFile.Path, etcdFile.Ctx,
3       etcdFile.KeyChan, etcdFile.KafkaProducer)
4}
5

WatchEtcdFile内部调用了logtailf包的WatchLogFile,这个是之前我们实现的日志监控逻辑


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
1func UpdateEtcdFile(etcdMgr *EtcdLogMgr, wresp *clientv3.WatchResponse) {
2   etcdNewMap := make(map[string]*EtcdLogConf)
3   for _, ev := range wresp.Events {
4       fmt.Printf("%s %q:%q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
5       if ev.Type == mvccpb.DELETE {
6           continue
7       }
8       //panic("test panic")
9       etcdLogConfTmp := make([]*EtcdLogConf, 0, 20)
10      unmarsherr := json.Unmarshal(ev.Kv.Value, &etcdLogConfTmp)
11      if unmarsherr != nil {
12          fmt.Println("unmarshal error !, error is ", unmarsherr)
13          continue
14      }
15      for _, logslice := range etcdLogConfTmp {
16          etcdNewMap[logslice.Topic] = logslice
17      }
18  }
19
20  for oldkey, oldval := range etcdMgr.EtcdConfMap {
21      _, ok := etcdNewMap[oldkey]
22      if !ok {
23          //该日志文件取消监控
24          oldval.Cancel()
25          delete(etcdMgr.EtcdConfMap, oldkey)
26      }
27  }
28
29  for newkey, newval := range etcdNewMap {
30      oldval, ok := etcdMgr.EtcdConfMap[newkey]
31      if !ok {
32          //新增日志文件,启动协程监控
33          etcdMgr.EtcdConfMap[newval.Topic] = newval
34          newval.Ctx, newval.Cancel = context.WithCancel(context.Background())
35          newval.KeyChan = logConfChan
36          newval.KafkaProducer = etcdMgr.KafkaProducer
37          go WatchEtcdFile(newval)
38          continue
39      }
40
41      //判断val是否修改
42      if newval.Path != oldval.Path {
43          oldval.Cancel()
44          delete(etcdMgr.EtcdConfMap, oldval.Topic)
45          etcdMgr.EtcdConfMap[newval.Topic] = newval
46          newval.Ctx, newval.Cancel = context.WithCancel(context.Background())
47          newval.KeyChan = logConfChan
48          newval.KafkaProducer = etcdMgr.KafkaProducer
49          go WatchEtcdFile(newval)
50          continue
51      }
52  }
53}
54

UpdateEtcdFile实现了当etcd中的val有变化时,该函数对比之前的数据,启动新的协程监控新日志,
如果日志路径删除,则关闭监控该日志的协程

测试

在配置文件config.yaml中添加


1
2
3
4
5
6
7
8
1etcdkeys:
2  - "collectlogkey1"
3  - "collectlogkey2"
4etcdconfig:
5  - "localhost:2379"
6  - "localhost:3379"
7  - "localhost:4379"
8

etcdkeys为etcd服务中记录的key, etcdconfig为etcd服务的地址列表,我们启动的是个集群。
安装etcd服务后,启动etcd集群,这个百度一下就知道了。
然后我们启动日志采集系统,看到如下

Go项目实战:打造高并发日志采集系统(七)

 

 因为etcd服务中没有collectlogkey1和collectlogkey2,所以我们的采集系统没有输出监控信息。
但是采集系统已经启动协程A监控这两个key了,当etcd中这两个key有value或改变,协程A会启动协程B监控日志。
我们通过测试程序向etcd写入日志路径信息

Go项目实战:打造高并发日志采集系统(七)

 

 我们向collectlogkey1写入了两个日志路径,可以看到采集系统协程A检测到etcd变化从而启动了协程B监控这两个日志

Go项目实战:打造高并发日志采集系统(七)

 

 如果我们修改etcd中collectlogkey1的值,改为只监控一个日志,日志采集系统会动态关闭无用的协程B

Go项目实战:打造高并发日志采集系统(七)

 

 如果我们将collectlogkey1从etcd中删除, 那么采集系统会自动关闭监控collectlogkey1的协程A的所有子协程B。

Go项目实战:打造高并发日志采集系统(七)

 

 

总结

通过增加etcd服务监控,使系统的功能更全面。通过配置文件中etcd的多个key,主协程启动并管理多个协程A,
而每个协程A根据etcd中key对应的value启动多个协程B,协程B监控指定日志的变化。
这种协程嵌套使用一定要注意协程异常退出和正常退出这两种情况,以及资源回收问题。
当协程A异常退出时,主协程重启协程A,当协程B异常退出时,协程A重启协程B。
当协程A正常退出时,通知其下所有子协程B正常退出。
源码下载
https://github.com/secondtonone1/golang-/tree/master/logcatchsys
感谢关注公众号

 

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google AdSense 全面解析(申请+操作+作弊+忠告)

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索