kafka集群一键安装、启动、停止脚本

释放双眼,带上耳机,听听看~!

 kafka集群一键安装脚本


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
1#!/bin/bash
2#配置KAFKA的安装目录 修改的地方1 脚本可以自己创建
3currentTime=$(date '+%Y-%m-%d %H:%M:%S')
4echo -e "请输入kafka的安装目录,不存在脚本自动创建,最后一个/不要写 /bigdata/install"
5read kafkainstallpath
6
7#创建ES安装的目录
8if [ ! -d $kafkainstallpath ]; then
9   mkdir -p $kafkainstallpath
10fi
11if [ ! -d $kafkainstallpath ]; then
12  echo "创建目录$kafkainstallpath失败!请检查目录是否有权限"
13  exit
14fi
15
16#解压tar包
17currentdir=$(cd $(dirname $0); pwd)
18ls | grep 'kafka.*[gz]$'
19if [ $? -ne 0 ]; then
20   #当前目录没有kafka的压缩包
21   echo "在$currentdir下没有发现kafka*.tar.gz,请自行上传!"
22   exit
23else
24   #解压
25   tar -zxvf $currentdir/$(ls | grep 'kafka.*[gz]$') -C $kafkainstallpath
26fi
27
28kafkabanben=`ls $kafkainstallpath| grep 'kafka.*'`
29
30
31confpath=$kafkainstallpath/$kafkabanben/config
32#修改配置文件
33
34echo -e "请输入kafka节点id:唯一值 例如 1"
35read kafkanodename
36sed -i "s/^broker.id=0/broker.id=${kafkanodename}/g" $confpath/server.properties
37
38sed -i 's/^#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/:9092/g' $confpath/server.properties
39
40echo -e "请输入kafka日志存储目录:例如 /bigdata/data/kafka"
41read kafkalogspath
42
43#创建KAFKA日志存储目录
44if [ ! -d $kafkalogspath ]; then
45   mkdir -p $kafkalogspath
46fi
47if [ ! -d $kafkalogspath ]; then
48  echo "创建目录$kafkalogspath失败!请检查目录是否有权限"
49  exit
50fi
51
52bak_dir='log.dirs=/tmp/kafka-logs'
53new_dir='log.dirs='$kafkalogspath
54sed -i "s!${bak_dir}!${new_dir}!g" $confpath/server.properties
55
56echo -e '请输入zookeeper集群的所有节点:(严格按照示例格式) 例如cdh01:2181,cdh02:2181,cdh03:2181'
57read allhosts
58sed -i "s/^zookeeper.connect=localhost:2181/zookeeper.connect=$allhosts/g" $confpath/server.properties
59
60
61sed -i 's/^#delete.topic.enable=true/delete.topic.enable=true/g' $confpath/server.properties
62
63echo "log.cleanup.policy=delete" >>$confpath/server.properties
64
65sed -i 's/^#log.retention.bytes=1073741824/log.retention.bytes=1073741824/g' $confpath/server.properties
66
67#kafka参数优化
68
69sed -i 's/^log.retention.hours=16/log.retention.hours=72/g' $confpath/server.properties
70
71sed -i 's/^#log.flush.interval.messages=10000/log.flush.interval.messages=10000/g' $confpath/server.properties
72
73sed -i 's/^#log.flush.interval.ms=1000/log.flush.interval.ms=1000/g' $confpath/server.properties
74
75param=`cat /proc/cpuinfo | grep "cpu cores"| uniq`
76
77bak_count="num.network.threads=3"
78new_count="num.network.threads="$((${param:0-1:1}+1))
79sed -i "s!${bak_count}!${new_count}!g" $confpath/server.properties
80
81bak_io="num.network.threads=3"
82new_io="num.network.threads="$((${param:0-1:1}+${param:0-1:1}))
83sed -i "s!${bak_io}!${new_io}!g" $confpath/server.properties
84
85
86#PATH设置
87#末行插入
88echo "">>~/.bash_profile
89echo "#KAFKA $currentTime">>~/.bash_profile
90echo "export KAFKA_HOME=$kafkainstallpath/$kafkabanben">>~/.bash_profile
91echo 'export PATH=$PATH:$KAFKA_HOME/bin'>>~/.bash_profile
92source ~/.bash_profile
93
94
95echo -e "是否远程复制 请输入y/n"
96read flag
97if [[ $flag == "y" ]]; then
98
99#修改并分发安装文件
100espath=$kafkainstallpath/$kafkabanben
101espathtemp=$kafkainstallpath/$kafkabanben-temp
102cp -r $espath $espathtemp
103
104echo "以下输入的节点必须做免密登录"
105echo -e '请输入除当前之外的节点(当前节点cdh01),严格符合以下格式IP:kafkaID,空格隔开, cdh02:2 cdh03:3 cdh04:4 cdh05:5'
106read allnodes
107user=`whoami`
108array2=(${allnodes// / })
109for allnode in ${array2[@]}
110do
111 array3=(${allnode//:/ })
112 ip=${array3[0]}
113 esid=${array3[1]}
114 echo ======= $ip  =======
115
116 #修改文件
117 ssh $ip "rm -rf $espath"
118 ssh $ip "mkdir -p $espath"
119
120 bak_dir="broker.id=$kafkanodename"
121 new_dir="broker.id=$esid"
122
123 sed -i "s!${bak_dir}!${new_dir}!g" $espathtemp/config/server.properties
124
125 scp -r $espathtemp/* ${user}@$ip:$espath/
126
127 ssh $ip "echo ''>>~/.bash_profile"
128 ssh $ip "echo '#KAFKA $currentTime'>>~/.bash_profile"
129 ssh $ip "echo 'export KAFKA_HOME=$kafkainstallpath/$kafkabanben'>>~/.bash_profile"
130 ssh $ip 'echo "export PATH=\$PATH:\$KAFKA_HOME/bin">>~/.bash_profile'
131 ssh $ip "source ~/.bash_profile"
132
133
134 #再次修改回来 防止修改错误
135 new_dir="broker.id=$kafkanodename"
136 bak_dir="broker.id=$esid"
137 sed -i "s!${bak_dir}!${new_dir}!g" $espathtemp/config/server.properties
138
139 echo ======= $ip 远程复制完成  =======
140done
141
142#删除临时文件
143rm -rf $espathtemp
144
145fi
146
147

 

kafka集群一键启动脚本


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1#!/bin/bash
2kafkaServers='cdh01 cdh02 cdh03 cdh04 cdh05'
3#启动所有的kafka
4for kafka in $kafkaServers
5do
6    ssh -T $kafka <<EOF
7    source ~/.bash_profile
8    nohup kafka-server-start.sh /bigdata/kafka/config/server.properties 1>/dev/null 2>&1 &
9EOF
10echo 从节点 $kafka 启动kafka...[ done ]
11sleep 5
12done
13
14

 

kafka集群一键停止


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1#!/bin/bash
2kafkaServers='cdh01 cdh02 cdh03 cdh04 cdh05'
3#停止所有的kafka
4for kafka in $kafkaServers
5do
6    ssh -T $kafka <<EOF
7    source ~/.bash_profile
8    cd /bigdata/kafka
9    bin/kafka-server-stop.sh
10EOF
11echo 从节点 $kafka 停止kafka...[ done ]
12sleep 5
13done
14

 

给TA打赏
共{{data.count}}人
人已打赏
安全技术安全运维

Windows服务器如何发现被黑

2018-5-20 12:24:31

安全技术

用node.js从零开始去写一个简单的爬虫

2021-12-21 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索