# 以下是我的 Flume 配置文件,其中重要的配置项都加了注释。
############################################
# producer config
############################################
# Agent section: names the source, channels and sinks of the "producer" agent.
producer.sources = s
producer.channels = c c1
producer.sinks = r r1

# Source section: exec source that streams the nginx access log.
# -n+1 starts output at the first line of the file. -F (instead of -f) follows
# the file by name and reopens it, so tailing keeps working after the log is
# rotated; with -f the source would stay attached to the rotated-away file.
producer.sources.s.type = exec
producer.sources.s.command = tail -F -n+1 /var/log/nginx/access.log
# The source is attached to both channels, c and c1.
producer.sources.s.channels = c c1
# Kafka sink r1: reads from channel c1 and publishes through the KafkaSink plugin.
producer.sinks.r1.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r1.channel = c1
# Plugin-specific ("custom.") settings: target topic and message encoding.
producer.sinks.r1.custom.topic.name = test
producer.sinks.r1.custom.encoding = UTF-8
# Broker address, partitioning and serialization.
producer.sinks.r1.metadata.broker.list = 192.168.166.31:9092
producer.sinks.r1.partition.key = 0
producer.sinks.r1.partitioner.class = org.apache.flume.plugins.SinglePartition
producer.sinks.r1.serializer.class = kafka.serializer.StringEncoder
# Delivery settings: synchronous producer, acks = 0, message size limit.
producer.sinks.r1.producer.type = sync
producer.sinks.r1.request.required.acks = 0
producer.sinks.r1.max.message.size = 1000000
# HDFS sink r: reads from channel c and writes into per-day directories on HDFS.
producer.sinks.r.type = hdfs
producer.sinks.r.channel = c
producer.sinks.r.hdfs.path = hdfs://hadoop001:8020/flume/%y-%m-%d
# Resolve the %y-%m-%d escapes in hdfs.path from the agent's local time rather
# than from a timestamp header on the event.
producer.sinks.r.hdfs.useLocalTimeStamp = true
# Write events as a plain, uncompressed text stream.
producer.sinks.r.hdfs.fileType = DataStream
producer.sinks.r.hdfs.writeFormat = Text
# Rolling policy: rollInterval = 0 and rollCount = 0 disable time-based and
# event-count-based rolling, leaving size-based rolling at rollSize bytes
# (128,000,000 bytes, roughly 128 MB).
producer.sinks.r.hdfs.rollInterval = 0
producer.sinks.r.hdfs.rollSize = 128000000
producer.sinks.r.hdfs.rollCount = 0
# Number of events (not files) written to a file before it is flushed to HDFS.
producer.sinks.r.hdfs.batchSize = 1000
# If nothing is written for 60 seconds the current file is closed; the next
# write creates a new file.
producer.sinks.r.hdfs.idleTimeout = 60
# Suffix appended to each log file name.
producer.sinks.r.hdfs.fileSuffix = .log
# Prefix for each log file name; commented out, so the sink's default applies.
#producer.sinks.r.hdfs.filePrefix = events
# Channel section: one in-memory channel per sink
# (c feeds HDFS sink r, c1 feeds Kafka sink r1).
producer.channels.c.type = memory
producer.channels.c.capacity = 500000
# transactionCapacity caps the events moved in a single transaction, so it must
# be at least the batch size of the sink draining the channel. HDFS sink r takes
# hdfs.batchSize = 1000 events per transaction, which the previous value of 600
# could not hold.
producer.channels.c.transactionCapacity = 1000
producer.channels.c1.type = memory
producer.channels.c1.capacity = 500000
producer.channels.c1.transactionCapacity = 600
############################################
# consumer config
# The settings below are all default settings.
############################################
# Agent section: names the source, channel and sink of the "consumer" agent.
consumer.sources = s
consumer.channels = c
consumer.sinks = r

# Source: KafkaSource plugin reading topic "test" into channel c.
# The type is declared exactly once; an earlier duplicate "type = seq" line was
# overridden by this value and has been dropped.
consumer.sources.s.type = org.apache.flume.plugins.KafkaSource
consumer.sources.s.channels = c
consumer.sources.s.zookeeper.connect = 127.0.0.1:2181
consumer.sources.s.group.id = testGroup
consumer.sources.s.zookeeper.session.timeout.ms = 400
consumer.sources.s.zookeeper.sync.time.ms = 200
consumer.sources.s.auto.commit.interval.ms = 1000
consumer.sources.s.custom.topic.name = test
consumer.sources.s.custom.thread.per.consumer = 4

# Sink: logger sink fed from channel c.
consumer.sinks.r.type = logger
consumer.sinks.r.channel = c

# Channel: in-memory channel holding up to 100 events.
consumer.channels.c.type = memory
consumer.channels.c.capacity = 100