Go项目实战:打造高并发日志采集系统(二)

释放双眼,带上耳机,听听看~!

日志统计系统的整体思路就是监控各个文件夹下的日志,实时获取日志写入内容并写入kafka队列,写入kafka队列可以在高并发时排队,而且达到了逻辑解耦合的目的。然后从kafka队列中读出数据,根据实际需求显示网页上或者控制台等。

前情提要

上一节我们完成了如下目标
1 配置kafka,并启动消息队列。
2 编写代码向kafka录入消息,并且从kafka读取消息。

本节目标

1 写代码从kafka中读取消息,保证kafka消息读写功能无误。
2 借助tailf实现文件监控,并模拟测试事实写文件以及文件备份时功能无误。
3 本系列文章开发语言使用Go

从kafka中读取消息


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
1func main(){
2   fmt.Println("consumer begin...")
3   config := sarama.NewConfig()
4   config.Consumer.Return.Errors = true
5   wg  :=sync.WaitGroup{}
6   //创建消费者
7   consumer, err := sarama.NewConsumer([]string{"localhost:9092"},config)
8   if err != nil {
9       fmt.Println("consumer create failed, error is ", err.Error())
10      return
11  }
12  defer consumer.Close()
13 
14  //Partitions(topic):该方法返回了该topic的所有分区id
15    partitionList, err := consumer.Partitions("test")
16    if err != nil {
17      fmt.Println("get consumer partitions failed")
18      fmt.Println("error is ", err.Error())
19      return
20    }
21
22  for partition := range partitionList {
23      //ConsumePartition方法根据主题,
24      //分区和给定的偏移量创建创建了相应的分区消费者
25      //如果该分区消费者已经消费了该信息将会返回error
26      //OffsetNewest消费最新数据
27        pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)
28        if err != nil {
29            panic(err)
30      }
31      //异步关闭,保证数据落盘
32        defer pc.AsyncClose()
33        wg.Add(1)
34        go func(sarama.PartitionConsumer) {
35            defer wg.Done()
36            //Messages()该方法返回一个消费消息类型的只读通道,由代理产生
37            for msg := range pc.Messages() {
38              fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n",
39              msg.Topic,msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
40            }
41        }(pc)
42    }
43    wg.Wait()
44    consumer.Close()
45 
46}
47

这样我们启动zookeeper和kafka后,分别运行前文实现的向kafka中写入数据的代码,以及现在的从kafka中消费的代码,看到如下效果

实现文件监控

实现文件监控,主要是在文件中有内容写入时,程序可以及时获取写入的内容,类似于Linux命令中的tailf -f 某个文件的功能。
golang 中提供了tail库,我们借助这个库完成指定文件的监控,我的文件组织如下

 logdir文件夹下的log.txt记录的是不断增加的日志文件
tailf文件夹下logtailf.go实现log.txt监控功能。
writefile文件夹下writefile.go实现的是向log.txt文件写日志并备份的功能。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1func main() {
2   logrelative := `../logdir/log.txt`
3   _, filename, _, _ := runtime.Caller(0)
4   fmt.Println(filename)
5   datapath := path.Join(path.Dir(filename), logrelative)
6   fmt.Println(datapath)
7   tailFile, err := tail.TailFile(datapath, tail.Config{
8       //文件被移除或被打包,需要重新打开
9       ReOpen: true,
10      //实时跟踪
11      Follow: true,
12      //如果程序出现异常,保存上次读取的位置,避免重新读取
13      Location: &tail.SeekInfo{Offset: 0, Whence: 2},
14      //支持文件不存在
15      MustExist: false,
16      Poll:      true,
17  })
18
19  if err != nil {
20      fmt.Println("tail file err:", err)
21      return
22  }
23
24  for true {
25      msg, ok := <-tailFile.Lines
26      if !ok {
27          fmt.Printf("tail file close reopen, filename: %s\n", tailFile.Filename)
28          time.Sleep(100 * time.Millisecond)
29          continue
30      }
31      //fmt.Println("msg:", msg)
32      //只打印text
33      fmt.Println("msg:", msg.Text)
34  }
35}
36

为了测试监控的功能。我们实现向log.txt中每隔0.1s写入一行”Hello+时间戳”的日志。当写入20条内容后我们将log.txt备份重命名。
然后创建新的log.txt继续写入。
在writefile.go实现一个函数定时写入,并且备份功能


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1func writeLog(datapath string) {
2   filew, err := os.OpenFile(datapath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
3   if err != nil {
4       fmt.Println("open file error ", err.Error())
5       return
6   }
7
8   w := bufio.NewWriter(filew)
9   for i := 0; i < 20; i++ {
10      timeStr := time.Now().Format("2006-01-02 15:04:05")
11      fmt.Fprintln(w, "Hello current time is "+timeStr)
12      time.Sleep(time.Millisecond * 100)
13      w.Flush()
14  }
15  logBak := time.Now().Format("20060102150405") + ".txt"
16  logBak = path.Join(path.Dir(datapath), logBak)
17  filew.Close()
18  err = os.Rename(datapath, logBak)
19  if err != nil {
20      fmt.Println("Rename error ", err.Error())
21      return
22  }
23}
24

然后我们实现main函数,调用三次writeLog,这样会产生三个备份文件


1
2
3
4
5
6
7
8
9
10
1func main() {
2   logrelative := `../logdir/log.txt`
3   _, filename, _, _ := runtime.Caller(0)
4   fmt.Println(filename)
5   datapath := path.Join(path.Dir(filename), logrelative)
6   for i := 0; i < 3; i++ {
7       writeLog(datapath)
8   }
9}
10

我们分别启动文件监控和文件写入程序,效果如下
可以看到,当log.txt有内容写入时,logtailf.go实现了动态监控,而且当文件备份时,logtailf.go提示了文件被重命名备份。
最终我们看到产生三个备份文件

总结

目前我们已经完成了kafka消息读写,文件监控,动态写入和备份等功能,接下来我们实现项目的配置化和统筹代码。

 

给TA打赏
共{{data.count}}人
人已打赏
安全经验

如何避免Adsense违规封号

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索