Go项目实战:打造高并发日志采集系统(三)

释放双眼,带上耳机,听听看~!

前文中已经完成了文件的监控,kafka信息读写,今天主要完成配置文件的读写以及热更新。并且规划一下系统的整体结构,然后将之前的功能串起来形成一套完整的日志采集系统。

前情提要

上一节我们完成了如下目标
1 完成kafka消息读写
2 借助tailf实现文件监控,并模拟测试实时写文件以及文件备份时功能无误。

本节目标

1 编写系统结构,在主函数中加载配置
2 管理配置文件,实现热更新

实现文件管理,支持热更新

golang中vipper库提供了配置文件的读取和监控功能,我们可以监控配置文件从而实现热更新。
先规划下日志采集系统的目录结构

logcatchsys为项目根目录,其下logcatchsys文件夹中main.go为系统启动的主函数,该文件加载配置,根据配置启动协程,监控指定目录的日志文件,当配置更新时,main做热更新,如果路径从配置中删除,则中止对应的监控协程。如果有新的路径添加到配置文件,则启动协程监控,如果路径有修改,则中止原路径协程,启动新的协程监听修改后的路径。
logconfig为配置存放的路径,logconfig.go主要负责配置的管理,包括监控。


1
2
3
4
5
6
7
8
1var onceLogConf sync.Once
2
3type ConfigData struct {
4   ConfigKey    string
5   ConfigValue  string
6   ConfigCancel context.CancelFunc
7}
8

在logconfig.go中定义了once操作的变量onceLogConf,该变量保证监控配置的协程退出后只执行一次析构。 

ConfigData结构体存储了配置文件中路径的信息,ConfigKey表示路径名,ConfigValue表示路径值,ConfigCancel存储上下文的CancelFunc,因为一个路径对应一个日志文件,监控日志文件就要开启协程,我是通过context管理监控日志的协程的。
在config.yaml中记录的路径信息如下:


1
2
3
4
5
6
7
1configpath:
2  logdir1: "../logdir1/log.txt"
3  logdir2: "../logdir2/log.txt"
4  logdir3: "../logdir3/log.txt"
5  logdir5: "../logdir3/log.txt"
6
7

logdir1对应ConfigKey 

../logdir1/log.txt对应ConfigValue
接下来在logconfig.go中我实现了配置文件的加载


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1func ReadConfig(v *viper.Viper) (interface{}, bool) {
2   //设置读取的配置文件
3   v.SetConfigName("config")
4   //添加读取的配置文件路径
5   _, filename, _, _ := runtime.Caller(0)
6   fmt.Println(filename)
7   fmt.Println(path.Dir(filename))
8   v.AddConfigPath(path.Dir(filename))
9   //设置配置文件类型
10  v.SetConfigType("yaml")
11  if err := v.ReadInConfig(); err != nil {
12      fmt.Printf("err:%s\n", err)
13      return nil, false
14  }
15
16  configPaths := v.Get("configpath")
17  if configPaths == nil {
18      return nil, false
19  }
20
21  return configPaths, true
22}
23
24

以及配置文件的监听 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1func WatchConfig(ctx context.Context, v *viper.Viper, pathChan chan interface{}) {
2
3   defer func() {
4       onceLogConf.Do(func() {
5           fmt.Println("watch config goroutine exit")
6           if err := recover(); err != nil {
7               fmt.Println("watch config goroutine panic ", err)
8           }
9           close(pathChan)
10      })
11  }()
12
13  //设置监听回调函数
14  v.OnConfigChange(func(e fsnotify.Event) {
15      //fmt.Printf("config is change :%s \n", e.String())
16      configPaths := v.Get("configpath")
17      if configPaths == nil {
18          return
19      }
20      pathChan <- configPaths
21  })
22  //开始监听
23  v.WatchConfig()
24  //信道不会主动关闭,可以主动调用cancel关闭
25  <-ctx.Done()
26}
27

当配置文件config.yaml有变动时,OnConfigChange传入的匿名函数会触发,从而将configpath节点的value传入chan中,这样main函数可以从外部获取最新的配置文件。 

ctx为上下文,当main函数执行上下文中止时,监控配置的协程会自动退出。

根据配置变动,实现热更新

在main.go中,定义了mainOnce控制主协程资源析构,并且通过ConstructMgr全局函数构造configMgr这样的map记录最新的配置信息。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1var mainOnce sync.Once
2var configMgr map[string]*logconfig.ConfigData
3
4func ConstructMgr(configPaths interface{}) {
5   configDatas := configPaths.(map[string]interface{})
6   for conkey, confval := range configDatas {
7       configData := new(logconfig.ConfigData)
8       configData.ConfigKey = conkey
9       configData.ConfigValue = confval.(string)
10      _, cancel := context.WithCancel(context.Background())
11      configData.ConfigCancel = cancel
12      configMgr[conkey] = configData
13  }
14}
15
16

接下来实现主函数 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1func main() {
2   v := viper.New()
3   configPaths, confres := logconfig.ReadConfig(v)
4   if configPaths == nil || !confres {
5       fmt.Println("read config failed")
6       return
7   }
8   configMgr = make(map[string]*logconfig.ConfigData)
9   ConstructMgr(configPaths)
10  ctx, cancel := context.WithCancel(context.Background())
11  pathChan := make(chan interface{})
12  go logconfig.WatchConfig(ctx, v, pathChan)
13  defer func() {
14      mainOnce.Do(func() {
15          if err := recover(); err != nil {
16              fmt.Println("main goroutine panic ", err) // 这里的err其实就是panic传入的内容
17          }
18          cancel()
19      })
20  }()
21
22

在主函数中读取配置,并且将配置的路径信息存储在configMgr中。接着启动了一个协程用来监控配置文件,并且我实现了主协程的资源回收。 

我们在main中继续添加接受监控协程的数据逻辑。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
1for {
2       select {
3       case pathData, ok := <-pathChan:
4           if !ok {
5               return
6           }
7           fmt.Println("main goroutine receive pathData")
8           fmt.Println(pathData)
9           pathDataNew := pathData.(map[string]interface{})
10
11          for oldkey, oldval := range configMgr {
12              _, ok := pathDataNew[oldkey]
13              if ok {
14                  continue
15              }
16              oldval.ConfigCancel()
17              delete(configMgr, oldkey)
18          }
19
20          for conkey, conval := range pathDataNew {
21              oldval, ok := configMgr[conkey]
22              if !ok {
23                  configData := new(logconfig.ConfigData)
24                  configData.ConfigKey = conkey
25                  configData.ConfigValue = conval.(string)
26                  _, cancel := context.WithCancel(context.Background())
27                  configData.ConfigCancel = cancel
28                  configMgr[conkey] = configData
29                  continue
30              }
31
32              if oldval.ConfigValue != conval.(string) {
33                  oldval.ConfigValue = conval.(string)
34                  oldval.ConfigCancel()
35                  _, cancel := context.WithCancel(context.Background())
36                  oldval.ConfigCancel = cancel
37                  continue
38              }
39
40          }
41
42          for mgrkey, mgrval := range configMgr {
43              fmt.Println(mgrkey)
44              fmt.Println(mgrval)
45          }
46      }
47  }
48}
49
50

主协程接受数据后对比新旧数据,将旧的配置中被删除的路径剔除,增加和修改新的路径。 

日志监控留给之后处理,这里打印下更新后的配置信息。
整体运行下main函数,然后我们手动修改config.yaml,将logdir4修改为logdir5,可以看到如下信息

 

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google Adsense老手经验

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索