hadoop生态系统学习之路(十一)Zookeeper的简单使用

释放双眼,带上耳机,听听看~!

今天来给大家介绍下zookeeper的简单使用。之前使用的hbase就依赖zookeeper,选举master。
下面,笔者将分以下几个步骤进行介绍:

一、zookeeper的基本原理

数据模型,如下:
ZooKeeper数据模型的结构与Unix文件系统很类似,整体上可以看作是一棵树,每个节点称做一个ZNode。每个ZNode都可以通过其路径唯一标识,比如上图中第三层的第一个ZNode, 它的路径是/app1/c1。在每个ZNode上可存储少量数据(默认是1M, 可以通过配置修改, 通常不建议在ZNode上存储大量的数据),这个特性非常有用。另外,每个ZNode上还存储了其Acl信息,这里需要注意,虽说ZNode的树形结构跟Unix文件系统很类似,但是其Acl与Unix文件系统是完全不同的,每个ZNode的Acl的独立的,子结点不会继承父结点的。
ZooKeeper特性:
1、读、写(更新)模式
在ZooKeeper集群中,读可以从任意一个ZooKeeper Server读,这一点是保证ZooKeeper比较好的读性能的关键;写的请求会先Forwarder到Leader,然后由Leader来通过ZooKeeper中的原子广播协议,将请求广播给所有的Follower,Leader收到一半以上的写成功的Ack后,就认为该写成功了,就会将该写进行持久化,并告诉客户端写成功了。

2、 WAL和Snapshot
和大多数分布式系统一样,ZooKeeper也有WAL(Write-Ahead-Log),对于每一个更新操作,ZooKeeper都会先写WAL, 然后再对内存中的数据做更新,然后向Client通知更新结果。另外,ZooKeeper还会定期将内存中的目录树进行Snapshot,落地到磁盘上,这个跟HDFS中的FSImage是比较类似的。这么做的主要目的,一当然是数据的持久化,二是加快重启之后的恢复速度,如果全部通过Replay WAL的形式恢复的话,会比较慢。

3、FIFO
对于每一个ZooKeeper客户端而言,所有的操作都是遵循FIFO顺序的,这一特性是由下面两个基本特性来保证的:一是ZooKeeper Client与Server之间的网络通信是基于TCP,TCP保证了Client/Server之间传输包的顺序;二是ZooKeeper Server执行客户端请求也是严格按照FIFO顺序的。

4、 Linearizability
在ZooKeeper中,所有的更新操作都有严格的偏序关系,更新操作都是串行执行的,这一点是保证ZooKeeper功能正确性的关键。

二、zookeeper的常用命令

我们可以执行zookeeper-client或者执行/opt/cloudera/parcels/CDH-5.0.0-1.cdh5.0.0.p0.47/lib/zookeeper/bin/zkCli.sh -server localhost,进入zookeeper命令行,如下:
然后,执行ls /可以看到:
然后,我们可以执行create /qyktest ‘qyktest’创建一个节点,如下:
然后,我们执行get /qyktest获取节点值,如下:
然后,我们可以执行set /qyktest ‘111’修改节点的值,如下:
最后,我们执行delete /qyktest便可删除此节点。
另外,我们还可以在qyktest此节点下继续创建子节点。
好了,几个基本命令就讲到这人啦,其它的命令还有很多,大家可以去查阅下资料。

三、zookeeper的java api操作

关于java api操作zookeeper比较简单,笔者直接贴出代码,如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
1/**
2 * Project Name:hive-demo
3 * File Name:ZookeeperClient.java
4 * Package Name:org.zookeeper.demo
5 * Date:2016年4月11日下午9:14:18
6 * Copyright (c) 2016, CANNIKIN(http://http://code.taobao.org/p/cannikin/src/) All Rights Reserved.
7 *
8*/
9
10package org.zookeeper.demo;
11/**
12 * ClassName:ZookeeperClient <br/>
13 * Function: TODO ADD FUNCTION. <br/>
14 * Reason:   TODO ADD REASON. <br/>
15 * Date:     2016年4月11日 下午9:14:18 <br/>
16 * @author   qiyongkang
17 * @version  
18 * @since    JDK 1.6
19 * @see      
20 */
21
22import java.io.IOException;
23import java.util.concurrent.CountDownLatch;
24
25import org.apache.zookeeper.CreateMode;
26import org.apache.zookeeper.KeeperException;
27import org.apache.zookeeper.WatchedEvent;
28import org.apache.zookeeper.Watcher;
29import org.apache.zookeeper.Watcher.Event.KeeperState;
30import org.apache.zookeeper.ZooDefs.Ids;
31import org.apache.zookeeper.ZooKeeper;
32
33/**
34 *
35 * ClassName: ZookeeperClient <br/>
36 * Function: zookeeper api <br/>
37 * date: 2016年4月12日 下午8:05:43 <br/>
38 *
39 * @author qiyongkang
40 * @version
41 * @since JDK 1.6
42 */
43public class ZookeeperClient implements Watcher {
44    //连接超时时间,10s
45    private static final int SESSION_TIMEOUT = 10000;
46
47    //连接的zookeeper server
48    private static final String CONNECTION_STRING = "172.31.25.8:2181";
49
50    private static final String ZK_PATH = "/qyktest";
51
52    private ZooKeeper zk = null;
53
54    private CountDownLatch connectedSemaphore = new CountDownLatch(1);
55
56    /**
57     * 创建ZK连接
58     *
59     * @param connectString
60     *            ZK服务器地址列表
61     * @param sessionTimeout
62     *            Session超时时间
63     */
64    public void createConnection(String connectString, int sessionTimeout) {
65        this.releaseConnection();
66        try {
67            zk = new ZooKeeper(connectString, sessionTimeout, this);
68            connectedSemaphore.await();
69        } catch (InterruptedException e) {
70            System.out.println("连接创建失败,发生 InterruptedException");
71            e.printStackTrace();
72        } catch (IOException e) {
73            System.out.println("连接创建失败,发生 IOException");
74            e.printStackTrace();
75        }
76    }
77
78    /**
79     * 关闭ZK连接
80     */
81    public void releaseConnection() {
82        if (this.zk != null) {
83            try {
84                this.zk.close();
85            } catch (InterruptedException e) {
86                e.printStackTrace();
87            }
88        }
89    }
90
91    /**
92     * 创建节点
93     *
94     * @param path
95     *            节点path
96     * @param data
97     *            初始数据内容
98     * @return
99     */
100    public boolean createPath(String path, String data) {
101        try {
102            /**
103             * 此处采用的是CreateMode是PERSISTENT  表示The znode will not be automatically deleted upon client's disconnect.
104             * EPHEMERAL 表示The znode will be deleted upon the client's disconnect.
105             */
106            String result = this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
107            System.out.println("节点创建成功, Path: " + result + ", content: " + data);
108        } catch (KeeperException e) {
109            System.out.println("节点创建失败,发生KeeperException");
110            e.printStackTrace();
111        } catch (InterruptedException e) {
112            System.out.println("节点创建失败,发生 InterruptedException");
113            e.printStackTrace();
114        }
115        return true;
116    }
117
118    /**
119     * 读取指定节点数据内容
120     *
121     * @param path
122     *            节点path
123     * @return
124     */
125    public String readData(String path) {
126        try {
127            System.out.println("获取数据成功,path:" + path);
128            return new String(this.zk.getData(path, false, null));
129        } catch (KeeperException e) {
130            System.out.println("读取数据失败,发生KeeperException,path: " + path);
131            e.printStackTrace();
132            return "";
133        } catch (InterruptedException e) {
134            System.out.println("读取数据失败,发生 InterruptedException,path: " + path);
135            e.printStackTrace();
136            return "";
137        }
138    }
139
140    /**
141     * 更新指定节点数据内容
142     *
143     * @param path
144     *            节点path
145     * @param data
146     *            数据内容
147     * @return
148     */
149    public boolean writeData(String path, String data) {
150        try {
151            System.out.println("更新数据成功,path:" + path + ", stat: " + this.zk.setData(path, data.getBytes(), -1));
152        } catch (KeeperException e) {
153            System.out.println("更新数据失败,发生KeeperException,path: " + path);
154            e.printStackTrace();
155        } catch (InterruptedException e) {
156            System.out.println("更新数据失败,发生 InterruptedException,path: " + path);
157            e.printStackTrace();
158        }
159        return false;
160    }
161
162    /**
163     * 删除指定节点
164     *
165     * @param path
166     *            节点path
167     */
168    public void deleteNode(String path) {
169        try {
170            this.zk.delete(path, -1);
171            System.out.println("删除节点成功,path:" + path);
172        } catch (KeeperException e) {
173            System.out.println("删除节点失败,发生KeeperException,path: " + path);
174            e.printStackTrace();
175        } catch (InterruptedException e) {
176            System.out.println("删除节点失败,发生 InterruptedException,path: " + path);
177            e.printStackTrace();
178        }
179    }
180
181    public static void main(String[] args) {
182        ZookeeperClient sample = new ZookeeperClient();
183        //获取连接
184        sample.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);
185
186        //读数据
187        String qyk = sample.readData("/qyktest");
188        System.out.println("qyk:" + qyk);
189
190        String url = sample.readData("/qyk/db/url");
191        System.out.println("url" + url);
192
193        String driver = sample.readData("/qyk/db/driver");
194        System.out.println("driver" + driver);
195
196        String userName = sample.readData("/qyk/db/userName");
197        System.out.println("userName" + userName);
198
199        String password = sample.readData("/qyk/db/password");
200        System.out.println("password" + password);
201
202        //创建节点
203        sample.createPath(ZK_PATH, "我是节点初始内容");
204        System.out.println("数据内容: " + sample.readData(ZK_PATH) + "\n");
205
206        //更新节点
207        sample.writeData(ZK_PATH, "更新后的数据");
208        System.out.println("数据内容: " + sample.readData(ZK_PATH) + "\n");
209
210        //删除节点
211        sample.deleteNode(ZK_PATH);
212
213        //释放连接
214        sample.releaseConnection();
215    }
216
217    /**
218     * 收到来自Server的Watcher通知后的处理。
219     */
220    @Override
221    public void process(WatchedEvent event) {
222        System.out.println("收到事件通知:" + event.getState() + "\n");
223        if (KeeperState.SyncConnected == event.getState()) {
224            connectedSemaphore.countDown();
225        }
226
227    }
228
229}
230
231

然后,执行可以看到,控制台输出如下:

所以,像一些公用的配置,我们可以存到zookeeper里面,之后其它的服务就可以使用了。
好了,关于zookeeper的基本使用就说到这儿啦。

给TA打赏
共{{data.count}}人
人已打赏
安全运维

MongoDB最简单的入门教程之二 使用nodejs访问MongoDB

2021-12-11 11:36:11

安全运维

Ubuntu上NFS的安装配置

2021-12-19 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索