前情回顾
前文我们完成了日志采集系统基本功能,包括日志监控,日志采集,配置热更新,协程动态启动和关闭,同时扩充支持了etcd管理文件路径。
本节目标
本节新增日志查询和检索功能。基本思路是将日志信息从kafka中读取,然后放到elasticsearch中,elasticsearch是一个分布式多用户能力
的全文搜索引擎,我们可以通过它提供的web接口访问和查询指定数据。另外,为了更方便的检索和查询,可以利用kibana配合elastic可视化
查询。Kibana 是为 Elasticsearch设计的开源分析和可视化平台。
源码实现
将日志从kafka中读取并解析写入elastic这部分功能,我们将其提炼到另外一个进程中,单独启动监控并处理kafka数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 1package main
2import (
3 "fmt"
4 kafconsumer "golang-/logcatchsys/kafconsumer"
5 "golang-/logcatchsys/logconfig"
6)
7
8func main() {
9 v := logconfig.InitVipper()
10 if v == nil {
11 fmt.Println("vipper init failed!")
12 return
13 }
14
15 kafconsumer.GetMsgFromKafka()
16}
17
18
主函数调用了我封装的kafconsumer包的读取消息函数GetMsgFromKafka。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 1func GetMsgFromKafka() {
2 fmt.Println("kafka consumer begin ...")
3 config := sarama.NewConfig()
4 config.Consumer.Return.Errors = true
5 var kafkaddr = "localhost:9092"
6 kafkaconf, _ := logconfig.ReadConfig(logconfig.InitVipper(), "kafkaconfig.kafkaaddr")
7 if kafkaconf != nil {
8 kafkaddr = kafkaconf.(string)
9 }
10 //创建消费者
11 consumer, err := sarama.NewConsumer([]string{kafkaddr}, config)
12 if err != nil {
13 fmt.Println("consumer create failed, error is ", err.Error())
14 return
15 }
16 defer func(consumer sarama.Consumer) {
17 if err := recover(); err != nil {
18 fmt.Println("consumer panic error ", err)
19 }
20 consumer.Close()
21 topicSet = nil
22 //回收所有协程
23 for _, val := range topicMap {
24 for _, valt := range val {
25 valt.Cancel()
26 }
27 }
28
29 topicMap = nil
30 }(consumer)
31 topicSetTmp := ConstructTopicSet()
32 if topicSetTmp == nil {
33 fmt.Println("construct topic set error ")
34 return
35 }
36 topicSet = topicSetTmp
37 ConsumeTopic(consumer)
38}
39
40
GetMsgFromKafka中创建了kafka消费者,然后根据配置调用ConstructTopicSet构造topic集合,topicSet集合其实是一个map,
保证了集合中的topic不重复。然后调用ConsumeTopic函数根据topic从kafka取出数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1func ConstructTopicSet() map[string]bool {
2 topicSetTmp := make(map[string]bool)
3 configtopics, _ := logconfig.ReadConfig(logconfig.InitVipper(), "collectlogs")
4 if configtopics == nil {
5 goto CONFTOPIC
6 }
7 for _, configtopic := range configtopics.([]interface{}) {
8 confmap := configtopic.(map[interface{}]interface{})
9 for key, val := range confmap {
10 if key.(string) == "logtopic" {
11 topicSetTmp[val.(string)] = true
12 }
13 }
14 }
15CONFTOPIC:
16 return topicSetTmp
17}
18
19
ConstructTopicSet读取配置中的topic列表,然后将这些topic放到map中返回。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 1func ConsumeTopic(consumer sarama.Consumer) {
2
3 for key, _ := range topicSet {
4 partitionList, err := consumer.Partitions(key)
5 if err != nil {
6 fmt.Println("get consumer partitions failed")
7 fmt.Println("error is ", err.Error())
8 continue
9 }
10
11 for partition := range partitionList {
12 pc, err := consumer.ConsumePartition(key, int32(partition), sarama.OffsetNewest)
13 if err != nil {
14 fmt.Println("consume partition error is ", err.Error())
15 continue
16 }
17 defer pc.AsyncClose()
18
19 topicData := new(TopicData)
20 topicData.Ctx, topicData.Cancel = context.WithCancel(context.Background())
21 topicData.KafConsumer = pc
22 topicData.TPartition = new(TopicPart)
23 topicData.TPartition.Partition = int32(partition)
24 topicData.TPartition.Topic = key
25 _, okm := topicMap[key]
26 if !okm {
27 topicMap[key] = make(map[int32]*TopicData)
28 }
29 topicMap[key][int32(partition)] = topicData
30 go ReadFromEtcd(topicData)
31
32 }
33 }
34 for {
35 select {
36 case topicpart := <-topicChan:
37 fmt.Printf("receive goroutine exited, topic is %s, partition is %d\n",
38 topicpart.Topic, topicpart.Partition)
39 //重启消费者读取数据的协程
40 val, ok := topicMap[topicpart.Topic]
41 if !ok {
42 continue
43 }
44 tp, ok := val[topicpart.Partition]
45 if !ok {
46 continue
47 }
48 tp.Ctx, tp.Cancel = context.WithCancel(context.Background())
49 go ReadFromEtcd(tp)
50 }
51
52 }
53}
54
55
ConsumeTopic实际是将topic集合中的topic遍历放到map中,然后启动协程调用ReadFromEtcd函数读取消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79 1func ReadFromEtcd(topicData *TopicData) {
2
3 fmt.Printf("kafka consumer begin to read message, topic is %s, part is %d\n", topicData.TPartition.Topic,
4 topicData.TPartition.Partition)
5
6 logger := log.New(os.Stdout, "LOGCAT", log.LstdFlags|log.Lshortfile)
7 elastiaddr, _ := logconfig.ReadConfig(logconfig.InitVipper(), "elasticconfig.elasticaddr")
8 if elastiaddr == nil {
9 elastiaddr = "localhost:9200"
10 }
11
12 esClient, err := elastic.NewClient(elastic.SetURL("http://"+elastiaddr.(string)),
13 elastic.SetErrorLog(logger))
14 if err != nil {
15 // Handle error
16 logger.Println("create elestic client error ", err.Error())
17 return
18 }
19
20 info, code, err := esClient.Ping("http://" + elastiaddr.(string)).Do(context.Background())
21 if err != nil {
22 logger.Println("elestic search ping error, ", err.Error())
23 esClient.Stop()
24 esClient = nil
25 return
26 }
27 fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)
28
29 esversion, err := esClient.ElasticsearchVersion("http://" + elastiaddr.(string))
30 if err != nil {
31 fmt.Println("elestic search version get failed, ", err.Error())
32 esClient.Stop()
33 esClient = nil
34 return
35 }
36 fmt.Printf("Elasticsearch version %s\n", esversion)
37
38 defer func(esClient *elastic.Client) {
39 if err := recover(); err != nil {
40 fmt.Printf("consumer message panic %s, topic is %s, part is %d\n", err,
41 topicData.TPartition.Topic, topicData.TPartition.Partition)
42 topicChan <- topicData.TPartition
43 }
44
45 }(esClient)
46
47 var typestr = "catlog"
48 typeconf, _ := logconfig.ReadConfig(logconfig.InitVipper(), "elasticconfig.typestr")
49 if typeconf != nil {
50 typestr = typeconf.(string)
51 }
52
53 for {
54 select {
55 case msg, ok := <-topicData.KafConsumer.Messages():
56 if !ok {
57 fmt.Println("etcd message chan closed ")
58 return
59 }
60 fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n",
61 msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
62 idstr := strconv.FormatInt(int64(msg.Partition), 10) + strconv.FormatInt(msg.Offset, 10)
63 logdata := &LogData{Topic: msg.Topic, Log: string(msg.Value), Id: idstr}
64 createIndex, err := esClient.Index().Index(msg.Topic).Type(typestr).Id(idstr).BodyJson(logdata).Do(context.Background())
65
66 if err != nil {
67 logger.Println("create index failed, ", err.Error())
68 continue
69 }
70 fmt.Println("create index success, ", createIndex)
71
72 case <-topicData.Ctx.Done():
73 fmt.Println("receive exited from parent goroutine !")
74 return
75 }
76 }
77}
78
79
ReadFromEtcd函数将kafka中读取的数据写入elastic中,同时如果协程崩溃向父协程发送通知,重启该协程。
效果展示
我们启动之前的日志监控程序,然后启动现在设计的信息处理程序。
可以看到日志不断被写入时,监控程序将日志的变化信息写入kafka。
同时,信息处理程序不断的从kafka中读取数据写入elastic。
我们通过kibana查询数据