Go项目实战:打造高并发日志采集系统(八)

释放双眼,带上耳机,听听看~!

前情回顾

前文我们完成了日志采集系统基本功能,包括日志监控,日志采集,配置热更新,协程动态启动和关闭,同时扩充支持了etcd管理文件路径。

本节目标

本节新增日志查询和检索功能。基本思路是将日志信息从kafka中读取,然后放到elasticsearch中,elasticsearch是一个分布式多用户能力
的全文搜索引擎,我们可以通过它提供的web接口访问和查询指定数据。另外,为了更方便的检索和查询,可以利用kibana配合elastic可视化
查询。Kibana 是为 Elasticsearch设计的开源分析和可视化平台。

源码实现

将日志从kafka中读取并解析写入elastic这部分功能,我们将其提炼到另外一个进程中,单独启动监控并处理kafka数据。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1package main
2import (
3   "fmt"
4   kafconsumer "golang-/logcatchsys/kafconsumer"
5   "golang-/logcatchsys/logconfig"
6)
7
8func main() {
9   v := logconfig.InitVipper()
10  if v == nil {
11      fmt.Println("vipper init failed!")
12      return
13  }
14
15  kafconsumer.GetMsgFromKafka()
16}
17
18

主函数调用了我封装的kafconsumer包的读取消息函数GetMsgFromKafka。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
1func GetMsgFromKafka() {
2   fmt.Println("kafka consumer begin ...")
3   config := sarama.NewConfig()
4   config.Consumer.Return.Errors = true
5   var kafkaddr = "localhost:9092"
6   kafkaconf, _ := logconfig.ReadConfig(logconfig.InitVipper(), "kafkaconfig.kafkaaddr")
7   if kafkaconf != nil {
8       kafkaddr = kafkaconf.(string)
9   }
10  //创建消费者
11  consumer, err := sarama.NewConsumer([]string{kafkaddr}, config)
12  if err != nil {
13      fmt.Println("consumer create failed, error is ", err.Error())
14      return
15  }
16  defer func(consumer sarama.Consumer) {
17      if err := recover(); err != nil {
18          fmt.Println("consumer panic error ", err)
19      }
20      consumer.Close()
21      topicSet = nil
22      //回收所有协程
23      for _, val := range topicMap {
24          for _, valt := range val {
25              valt.Cancel()
26          }
27      }
28
29      topicMap = nil
30  }(consumer)
31  topicSetTmp := ConstructTopicSet()
32  if topicSetTmp == nil {
33      fmt.Println("construct topic set error ")
34      return
35  }
36  topicSet = topicSetTmp
37  ConsumeTopic(consumer)
38}
39
40

GetMsgFromKafka中创建了kafka消费者,然后根据配置调用ConstructTopicSet构造topic集合,topicSet集合其实是一个map,
保证了集合中的topic不重复。然后调用ConsumeTopic函数根据topic从kafka取出数据。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1func ConstructTopicSet() map[string]bool {
2   topicSetTmp := make(map[string]bool)
3   configtopics, _ := logconfig.ReadConfig(logconfig.InitVipper(), "collectlogs")
4   if configtopics == nil {
5       goto CONFTOPIC
6   }
7   for _, configtopic := range configtopics.([]interface{}) {
8       confmap := configtopic.(map[interface{}]interface{})
9       for key, val := range confmap {
10          if key.(string) == "logtopic" {
11              topicSetTmp[val.(string)] = true
12          }
13      }
14  }
15CONFTOPIC:
16  return topicSetTmp
17}
18
19

ConstructTopicSet读取配置中的topic列表,然后将这些topic放到map中返回。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
1func ConsumeTopic(consumer sarama.Consumer) {
2
3   for key, _ := range topicSet {
4       partitionList, err := consumer.Partitions(key)
5       if err != nil {
6           fmt.Println("get consumer partitions failed")
7           fmt.Println("error is ", err.Error())
8           continue
9       }
10
11      for partition := range partitionList {
12          pc, err := consumer.ConsumePartition(key, int32(partition), sarama.OffsetNewest)
13          if err != nil {
14              fmt.Println("consume partition error is ", err.Error())
15              continue
16          }
17          defer pc.AsyncClose()
18
19          topicData := new(TopicData)
20          topicData.Ctx, topicData.Cancel = context.WithCancel(context.Background())
21          topicData.KafConsumer = pc
22          topicData.TPartition = new(TopicPart)
23          topicData.TPartition.Partition = int32(partition)
24          topicData.TPartition.Topic = key
25          _, okm := topicMap[key]
26          if !okm {
27              topicMap[key] = make(map[int32]*TopicData)
28          }
29          topicMap[key][int32(partition)] = topicData
30          go ReadFromEtcd(topicData)
31
32      }
33  }
34  for {
35      select {
36      case topicpart := <-topicChan:
37          fmt.Printf("receive goroutine exited, topic is %s, partition is %d\n",
38              topicpart.Topic, topicpart.Partition)
39          //重启消费者读取数据的协程
40          val, ok := topicMap[topicpart.Topic]
41          if !ok {
42              continue
43          }
44          tp, ok := val[topicpart.Partition]
45          if !ok {
46              continue
47          }
48          tp.Ctx, tp.Cancel = context.WithCancel(context.Background())
49          go ReadFromEtcd(tp)
50      }
51
52  }
53}
54
55

ConsumeTopic实际是将topic集合中的topic遍历放到map中,然后启动协程调用ReadFromEtcd函数读取消息。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
1func ReadFromEtcd(topicData *TopicData) {
2
3   fmt.Printf("kafka consumer begin to read message, topic is %s, part is %d\n", topicData.TPartition.Topic,
4       topicData.TPartition.Partition)
5
6   logger := log.New(os.Stdout, "LOGCAT", log.LstdFlags|log.Lshortfile)
7   elastiaddr, _ := logconfig.ReadConfig(logconfig.InitVipper(), "elasticconfig.elasticaddr")
8   if elastiaddr == nil {
9       elastiaddr = "localhost:9200"
10  }
11
12  esClient, err := elastic.NewClient(elastic.SetURL("http://"+elastiaddr.(string)),
13      elastic.SetErrorLog(logger))
14  if err != nil {
15      // Handle error
16      logger.Println("create elestic client error ", err.Error())
17      return
18  }
19
20  info, code, err := esClient.Ping("http://" + elastiaddr.(string)).Do(context.Background())
21  if err != nil {
22      logger.Println("elestic search ping error, ", err.Error())
23      esClient.Stop()
24      esClient = nil
25      return
26  }
27  fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)
28
29  esversion, err := esClient.ElasticsearchVersion("http://" + elastiaddr.(string))
30  if err != nil {
31      fmt.Println("elestic search version get failed, ", err.Error())
32      esClient.Stop()
33      esClient = nil
34      return
35  }
36  fmt.Printf("Elasticsearch version %s\n", esversion)
37
38  defer func(esClient *elastic.Client) {
39      if err := recover(); err != nil {
40          fmt.Printf("consumer message panic %s, topic is %s, part is %d\n", err,
41              topicData.TPartition.Topic, topicData.TPartition.Partition)
42          topicChan <- topicData.TPartition
43      }
44
45  }(esClient)
46
47  var typestr = "catlog"
48  typeconf, _ := logconfig.ReadConfig(logconfig.InitVipper(), "elasticconfig.typestr")
49  if typeconf != nil {
50      typestr = typeconf.(string)
51  }
52
53  for {
54      select {
55      case msg, ok := <-topicData.KafConsumer.Messages():
56          if !ok {
57              fmt.Println("etcd message chan closed ")
58              return
59          }
60          fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n",
61              msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
62          idstr := strconv.FormatInt(int64(msg.Partition), 10) + strconv.FormatInt(msg.Offset, 10)
63          logdata := &LogData{Topic: msg.Topic, Log: string(msg.Value), Id: idstr}
64          createIndex, err := esClient.Index().Index(msg.Topic).Type(typestr).Id(idstr).BodyJson(logdata).Do(context.Background())
65
66          if err != nil {
67              logger.Println("create index failed, ", err.Error())
68              continue
69          }
70          fmt.Println("create index success, ", createIndex)
71
72      case <-topicData.Ctx.Done():
73          fmt.Println("receive exited from parent goroutine !")
74          return
75      }
76  }
77}
78
79

ReadFromEtcd函数将kafka中读取的数据写入elastic中,同时如果协程崩溃向父协程发送通知,重启该协程。

效果展示

我们启动之前的日志监控程序,然后启动现在设计的信息处理程序。
可以看到日志不断被写入时,监控程序将日志的变化信息写入kafka。
同时,信息处理程序不断的从kafka中读取数据写入elastic。
我们通过kibana查询数据

源码下载

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Git 版本回退

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索