Go项目实战:打造高并发日志采集系统(四)

释放双眼,带上耳机,听听看~!

前情回顾

前文我们完成了如下目标
1 项目架构整体编写
2 使框架支持热更新

本节目标

在前文的框架基础上,我们
1 将之前实现的日志监控功能整合到框架中。
2 一个日志对应一个监控协程,当配置热更新后根据新配置动态关闭和启动协程。
3 编写测试代码,模拟向文件中不断写入日志,并备份日志,观察监控功能是否健壮。

增加协程监控日志文件

我们将之前实现的日志监控功能整合到现有框架,文件结构如下
logdir为存储日志的文件夹,模拟不同系统记录的日志。实际生产中不同系统会自己记录日志并保存在指定文件夹中,logdir模拟的就是这些指定文件。
logtailf实现日志文件的监控功能。
writefile模拟不同系统向指定文件夹写入日志,用来测试日志写入和备份时,我们的采集系统是否健壮。
我们先看下logtailf.go


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
1func WatchLogFile(datapath string, ctx context.Context) {
2   fmt.Println("begin goroutine watch log file ", datapath)
3   tailFile, err := tail.TailFile(datapath, tail.Config{
4       //文件被移除或被打包,需要重新打开
5       ReOpen: true,
6       //实时跟踪
7       Follow: true,
8       //如果程序出现异常,保存上次读取的位置,避免重新读取
9       Location: &tail.SeekInfo{Offset: 0, Whence: 2},
10      //支持文件不存在
11      MustExist: false,
12      Poll:      true,
13  })
14
15  if err != nil {
16      fmt.Println("tail file err:", err)
17      return
18  }
19
20  for true {
21      select {
22      case msg, ok := <-tailFile.Lines:
23          if !ok {
24              fmt.Printf("tail file close reopen, filename: %s\n", tailFile.Filename)
25              time.Sleep(100 * time.Millisecond)
26              continue
27          }
28          //fmt.Println("msg:", msg)
29          //只打印text
30          fmt.Println("msg:", msg.Text)
31      case <-ctx.Done():
32          fmt.Println("receive main gouroutine exit msg")
33          fmt.Println("watch log file ", datapath, " goroutine exited")
34          return
35      }
36
37  }
38}
39

只有一个函数WatchLogFile用来监控指定路径的日志文件,两个参数分别为日志路径和上下文开关,开关用来关闭这个协程,整体功能和前几篇讲述的一样,不做赘述。
接下来我们在main.go中填加协程启动逻辑监控日志文件


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1func ConstructMgr(configPaths interface{}) {
2   configDatas := configPaths.(map[string]interface{})
3   for conkey, confval := range configDatas {
4       configData := new(logconfig.ConfigData)
5       configData.ConfigKey = conkey
6       configData.ConfigValue = confval.(string)
7       ctx, cancel := context.WithCancel(context.Background())
8       configData.ConfigCancel = cancel
9       configMgr[conkey] = configData
10      go logtailf.WatchLogFile(configData.ConfigValue,
11          ctx)
12  }
13}
14

ConstructMgr新增了协程启动逻辑,并且将协程的ctx保存在map中,这样主协程可以根据热更新启动和关闭这个协程。然后我完善了main函数中的析构函数


1
2
3
4
5
6
7
8
9
10
11
12
13
1defer func() {
2       mainOnce.Do(func() {
3           if err := recover(); err != nil {
4               fmt.Println("main goroutine panic ", err) // 这里的err其实就是panic传入的内容
5           }
6           cancel()
7           for _, oldval := range configMgr {
8               oldval.ConfigCancel()
9           }
10          configMgr = nil
11      })
12  }()
13

析构函数里遍历map,关闭所有监控协程,并且将map设置为nil回收资源。在main函数中添加热更新控制协程开关。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
1for oldkey, oldval := range configMgr {
2   _, ok := pathDataNew[oldkey]
3   if ok {
4           continue
5       }
6   oldval.ConfigCancel()
7   delete(configMgr, oldkey)
8}
9
10for conkey, conval := range pathDataNew {
11  oldval, ok := configMgr[conkey]
12  if !ok {
13      configData := new(logconfig.ConfigData)
14      configData.ConfigKey = conkey
15      configData.ConfigValue = conval.(string)
16      ctx, cancel := context.WithCancel(context.Background())
17      configData.ConfigCancel = cancel
18      configMgr[conkey] = configData
19      fmt.Println(conval.(string))
20      go logtailf.WatchLogFile(configData.ConfigValue,
21                  ctx)
22              continue
23  }
24
25  if oldval.ConfigValue != conval.(string) {
26      oldval.ConfigValue = conval.(string)
27      oldval.ConfigCancel()
28      ctx, cancel := context.WithCancel(context.Background())
29      oldval.ConfigCancel = cancel
30      go logtailf.WatchLogFile(conval.(string),
31                  ctx)
32      continue
33  }
34}
35

对比新旧配置,如果路径取消,则停止监控其的协程,如果路径新增或修改,则启动新的协程。
这样我们启动程序看到如下效果

编写测试文件,模拟日志写入和备份

我们在writefile.go中实现日志写入和备份,这部分内容前几篇有讲解,我只是将原来实现的功能搬过来


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
1func writeLog(datapath string, wg *sync.WaitGroup) {
2   filew, err := os.OpenFile(datapath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
3   if err != nil {
4       fmt.Println("open file error ", err.Error())
5       return
6   }
7   defer func() {
8       wg.Done()
9   }()
10  w := bufio.NewWriter(filew)
11  for i := 0; i < 20; i++ {
12      timeStr := time.Now().Format("2006-01-02 15:04:05")
13      fmt.Fprintln(w, "Hello current time is "+timeStr)
14      time.Sleep(time.Millisecond * 100)
15      w.Flush()
16  }
17  logBak := time.Now().Format("20060102150405") + ".txt"
18  logBak = path.Join(path.Dir(datapath), logBak)
19  filew.Close()
20  err = os.Rename(datapath, logBak)
21  if err != nil {
22      fmt.Println("Rename error ", err.Error())
23      return
24  }
25}
26
27func main() {
28  v := viper.New()
29  configPaths, confres := logconfig.ReadConfig(v)
30  if !confres {
31      fmt.Println("config read failed")
32      return
33  }
34  wg := &sync.WaitGroup{}
35
36  for _, confval := range configPaths.(map[string]interface{}) {
37      wg.Add(1)
38      go writeLog(confval.(string), wg)
39  }
40  wg.Wait()
41}
42

根据配置文件启动多个协程,向日志文件中不断写入日志,写入20条后备份日志,我们启动日志收集程序和这个测试程序,可以看到日志不断被写入,日志收集程序不断打印日志新增的内容
日志备份为不同的文件
可以看到日志收集系统在日志不断写入时,可以健壮运行

热更新控制监控协程打开和关闭

现在config.yaml文件中路径记录如下


1
2
3
4
5
1configpath:
2  logdir1: "D:/golangwork/src/golang-/logcatchsys/logdir1/log.txt"
3  logdir2: "D:/golangwork/src/golang-/logcatchsys/logdir2/log.txt"
4  logdir3: "D:/golangwork/src/golang-/logcatchsys/logdir3/log.txt"
5

我们启动日志收集程序,模拟热更新配置,将logdir3这条数据修改, 配置变为如下


1
2
3
4
5
1configpath:
2  logdir1: "D:/golangwork/src/golang-/logcatchsys/logdir1/log.txt"
3  logdir2: "D:/golangwork/src/golang-/logcatchsys/logdir2/log.txt"
4  logdir4: "D:/golangwork/src/golang-/logcatchsys/logdir4/log.txt"
5

我们可以看到程序作出检测,并关闭了曾经监控logdir3的协程,并启动了新的协程监控logdir4,而其他的日志监控协程不受影响,正常运行。

源码下载地址
https://github.com/secondtonone1/golang-/tree/master/logcatchsys
我的公众号

给TA打赏
共{{data.count}}人
人已打赏
安全经验

英文站如何做Google Adsense

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索