Go项目实战:打造高并发日志采集系统(六)

释放双眼,带上耳机,听听看~!

前情回顾

前文我们完成了日志采集系统的日志文件监控,配置文件热更新,协程异常检测和保活机制。

本节目标

本节加入kafka消息队列,kafka前文也介绍过了,可以对消息进行排队,解耦合和流量控制的作用,为什么一定要用kafka呢?主要原因就是在日志高并发读取后,如果直接将消息发给前端或者写入数据库,会造成崩溃或者卡死。kafka可以对消息进行排队和减轻压力,这样无论以后将这些消息录入数据库也好,传给前端分析也好,都能保证系统稳定性。代码我们也写过和测试了,只需要将之前写好的kafka读写消息代码整合过来即可。

主函数创建kafka生产者

在主函数中创建kafkaProducer,然后在defer中回收该资源。我们将该producer传递给每个监控日志的协程中,当日志有修改,就通过producer将修改的信息写入kafka,用kafka排队和缓存,可以提高稳定性,减少流量高峰。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
1func main() {
2   //省略...
3   kafkaProducer := &kafkaqueue.ProducerKaf{Producer: producer}
4   configMgr = make(map[string]*logconfig.ConfigData)
5   keyChan := make(chan string, KEYCHANSIZE)
6   ConstructMgr(configPaths, keyChan, kafkaProducer)
7  
8   defer func() {
9       mainOnce.Do(func() {
10            //省略...
11          kafkaProducer.Producer.Close()
12      })
13  }()
14
15  for {
16      select {
17      case pathData, ok := <-pathChan:
18          if !ok {
19              return
20          }
21           //省略...
22          for conkey, conval := range pathDataNew {
23              oldval, ok := configMgr[conkey]
24              if !ok {
25                  //省略...
26                  go logtailf.WatchLogFile(configData.ConfigKey, configData.ConfigValue,
27                      ctx, keyChan, kafkaProducer)
28                  continue
29              }
30                
31                if oldval.ConfigValue != conval.(string) {
32                  //省略...
33                  go logtailf.WatchLogFile(conkey, conval.(string),
34                      ctx, keyChan, kafkaProducer)
35                  continue
36              }
37
38          }  
39        
40        case keystr := <-keyChan:
41          val, ok := configMgr[keystr]
42          if !ok {
43              continue
44          }
45           //省略...
46          go logtailf.WatchLogFile(keystr, val.ConfigValue,
47              ctxcover, keyChan, kafkaProducer)
48      }
49  }
50}
51
52

WatchLogFile函数携带了该producer。有人会问多个协程共享producer是否会出问题?我查看了Producer发送消息的源码

 

 

红框中使用了chan传递数据,所以在多个协程调用producer的发送函数是没问题的。

监控协程写入kafka消息

当日志新增时,我们在监控日志的协程向kafka写入消息


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1func WatchLogFile(pathkey string, datapath string, ctx context.Context, keychan chan<- string, kafProducer *kafkaqueue.ProducerKaf) {
2    //省略逻辑...
3    for true {
4       select {
5       case msg, ok := <-tailFile.Lines:
6           //省略逻辑...
7           kafProducer.PutIntoKafka(pathkey, msg.Text)
8       case <-ctx.Done():
9           fmt.Println("receive main gouroutine exit msg")
10          fmt.Println("watch log file ", pathkey, " goroutine exited")
11          return
12      }
13
14  }
15}
16
17

封装kafkaProducer

上述代码中调用的kafkaProducer是我自己封装的,其实就是组合了原生的kafka生产者,并且封装了发送函数


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1func CreateKafkaProducer() (sarama.SyncProducer, error) {
2   config := sarama.NewConfig()
3
4   // 等待服务器所有副本都保存成功后的响应
5   config.Producer.RequiredAcks = sarama.WaitForAll
6   // 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区
7   config.Producer.Partitioner = sarama.NewRandomPartitioner
8   // 是否等待成功和失败后的响应
9   config.Producer.Return.Successes = true
10
11  // 使用给定代理地址和配置创建一个同步生产者
12  producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
13  if err != nil {
14      fmt.Println("create producer failed, ", err.Error())
15      return nil, err
16  }
17  fmt.Println("create kafka producer success")
18
19  return producer, nil
20}
21
22

上面的函数返回了原生的kafka生产者接口,接下来我们封装这个原生接口,然后编写了写入kafka的方法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1type ProducerKaf struct {
2   Producer sarama.SyncProducer
3}
4
5func (p *ProducerKaf) PutIntoKafka(keystr string, valstr string) {
6   //构建发送的消息,
7   msg := &sarama.ProducerMessage{
8       Topic: "logcatchsys",
9       Key:   sarama.StringEncoder(keystr),
10      Value: sarama.StringEncoder(valstr),
11  }
12  partition, offset, err := p.Producer.SendMessage(msg)
13
14  if err != nil {
15      fmt.Println("Send message Fail")
16      fmt.Println(err.Error())
17  }
18  fmt.Printf("Partition = %d, offset=%d, msgvalue=%s \n", partition, offset, valstr)
19
20}
21
22

启动kafka测试

我们先启动zookeeper和kafka
zookeeper进入bin文件夹点击zkServer.cmd即可启动
kafka启动使用如下命令


1
2
3
1.\bin\windows\kafka-server-start.bat .\config\server.properties
2
3

然后我们创建主题logcatchsys


1
2
3
1.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 16 --topic logcatchsys
2
3

这样我们为主题logcatchsys创建了16个分区。
接下来我们启动消费者


1
2
3
1.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic logcatchsys --from-beginning
2
3

然后我们启动我们的采集系统和测死脚本,看到如下

 

可以看到当日志文件不断被写入时,我们的采集系统会将修改的内容实时监控并写入kafka队列,然后kafka消费者从队列中取出这些消息。

总结

Go项目实战:打造高并发日志采集系统(六)

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google Adsense老手经验

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索