HBase 系列(六)——HBase Java API 的基本使用

释放双眼,带上耳机,听听看~!

一、简述

截至到目前(2019.04),HBase 有两个主要的版本,分别是1.x 和 2.x ,两个版本的Java API有所不同,1.x 中某些方法在2.x中被标识为@deprecated过时。所以下面关于API的样例,我会分别给出1.x和2.x两个版本。完整的代码见本仓库:

  • Java API 1.x Examples
  • Java API 2.x Examples

同时你使用的客户端的版本必须与服务端版本保持一致,如果用2.x版本的客户端代码去连接1.x版本的服务端,会抛出NoSuchColumnFamilyException等异常。

二、Java API 1.x 基本使用

2.1 新建Maven工程,导入项目依赖

要使用Java API 操作HBase,需要引入hbase-client。这里选取的HBase Client的版本为1.2.0。


1
2
3
4
5
6
7
1<dependency>
2    <groupId>org.apache.hbase</groupId>
3    <artifactId>hbase-client</artifactId>
4    <version>1.2.0</version>
5</dependency>
6
7

2.2 API 基本使用


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
1public class HBaseUtils {
2
3    private static Connection connection;
4
5    static {
6        Configuration configuration = HBaseConfiguration.create();
7        configuration.set("hbase.zookeeper.property.clientPort", "2181");
8        // 如果是集群 则主机名用逗号分隔
9        configuration.set("hbase.zookeeper.quorum", "hadoop001");
10        try {
11            connection = ConnectionFactory.createConnection(configuration);
12        } catch (IOException e) {
13            e.printStackTrace();
14        }
15    }
16
17    /**
18     * 创建HBase表
19     *
20     * @param tableName      表名
21     * @param columnFamilies 列族的数组
22     */
23    public static boolean createTable(String tableName, List<String> columnFamilies) {
24        try {
25            HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
26            if (admin.tableExists(tableName)) {
27                return false;
28            }
29            HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
30            columnFamilies.forEach(columnFamily -> {
31                HColumnDescriptor columnDescriptor = new HColumnDescriptor(columnFamily);
32                columnDescriptor.setMaxVersions(1);
33                tableDescriptor.addFamily(columnDescriptor);
34            });
35            admin.createTable(tableDescriptor);
36        } catch (IOException e) {
37            e.printStackTrace();
38        }
39        return true;
40    }
41
42
43    /**
44     * 删除hBase表
45     *
46     * @param tableName 表名
47     */
48    public static boolean deleteTable(String tableName) {
49        try {
50            HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
51            // 删除表前需要先禁用表
52            admin.disableTable(tableName);
53            admin.deleteTable(tableName);
54        } catch (Exception e) {
55            e.printStackTrace();
56        }
57        return true;
58    }
59
60    /**
61     * 插入数据
62     *
63     * @param tableName        表名
64     * @param rowKey           唯一标识
65     * @param columnFamilyName 列族名
66     * @param qualifier        列标识
67     * @param value            数据
68     */
69    public static boolean putRow(String tableName, String rowKey, String columnFamilyName, String qualifier,
70                                 String value) {
71        try {
72            Table table = connection.getTable(TableName.valueOf(tableName));
73            Put put = new Put(Bytes.toBytes(rowKey));
74            put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value));
75            table.put(put);
76            table.close();
77        } catch (IOException e) {
78            e.printStackTrace();
79        }
80        return true;
81    }
82
83
84    /**
85     * 插入数据
86     *
87     * @param tableName        表名
88     * @param rowKey           唯一标识
89     * @param columnFamilyName 列族名
90     * @param pairList         列标识和值的集合
91     */
92    public static boolean putRow(String tableName, String rowKey, String columnFamilyName, List<Pair<String, String>> pairList) {
93        try {
94            Table table = connection.getTable(TableName.valueOf(tableName));
95            Put put = new Put(Bytes.toBytes(rowKey));
96            pairList.forEach(pair -> put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(pair.getKey()), Bytes.toBytes(pair.getValue())));
97            table.put(put);
98            table.close();
99        } catch (IOException e) {
100            e.printStackTrace();
101        }
102        return true;
103    }
104
105
106    /**
107     * 根据rowKey获取指定行的数据
108     *
109     * @param tableName 表名
110     * @param rowKey    唯一标识
111     */
112    public static Result getRow(String tableName, String rowKey) {
113        try {
114            Table table = connection.getTable(TableName.valueOf(tableName));
115            Get get = new Get(Bytes.toBytes(rowKey));
116            return table.get(get);
117        } catch (IOException e) {
118            e.printStackTrace();
119        }
120        return null;
121    }
122
123
124    /**
125     * 获取指定行指定列(cell)的最新版本的数据
126     *
127     * @param tableName    表名
128     * @param rowKey       唯一标识
129     * @param columnFamily 列族
130     * @param qualifier    列标识
131     */
132    public static String getCell(String tableName, String rowKey, String columnFamily, String qualifier) {
133        try {
134            Table table = connection.getTable(TableName.valueOf(tableName));
135            Get get = new Get(Bytes.toBytes(rowKey));
136            if (!get.isCheckExistenceOnly()) {
137                get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
138                Result result = table.get(get);
139                byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
140                return Bytes.toString(resultValue);
141            } else {
142                return null;
143            }
144
145        } catch (IOException e) {
146            e.printStackTrace();
147        }
148        return null;
149    }
150
151
152    /**
153     * 检索全表
154     *
155     * @param tableName 表名
156     */
157    public static ResultScanner getScanner(String tableName) {
158        try {
159            Table table = connection.getTable(TableName.valueOf(tableName));
160            Scan scan = new Scan();
161            return table.getScanner(scan);
162        } catch (IOException e) {
163            e.printStackTrace();
164        }
165        return null;
166    }
167
168
169    /**
170     * 检索表中指定数据
171     *
172     * @param tableName  表名
173     * @param filterList 过滤器
174     */
175
176    public static ResultScanner getScanner(String tableName, FilterList filterList) {
177        try {
178            Table table = connection.getTable(TableName.valueOf(tableName));
179            Scan scan = new Scan();
180            scan.setFilter(filterList);
181            return table.getScanner(scan);
182        } catch (IOException e) {
183            e.printStackTrace();
184        }
185        return null;
186    }
187
188    /**
189     * 检索表中指定数据
190     *
191     * @param tableName   表名
192     * @param startRowKey 起始RowKey
193     * @param endRowKey   终止RowKey
194     * @param filterList  过滤器
195     */
196
197    public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey,
198                                           FilterList filterList) {
199        try {
200            Table table = connection.getTable(TableName.valueOf(tableName));
201            Scan scan = new Scan();
202            scan.setStartRow(Bytes.toBytes(startRowKey));
203            scan.setStopRow(Bytes.toBytes(endRowKey));
204            scan.setFilter(filterList);
205            return table.getScanner(scan);
206        } catch (IOException e) {
207            e.printStackTrace();
208        }
209        return null;
210    }
211
212    /**
213     * 删除指定行记录
214     *
215     * @param tableName 表名
216     * @param rowKey    唯一标识
217     */
218    public static boolean deleteRow(String tableName, String rowKey) {
219        try {
220            Table table = connection.getTable(TableName.valueOf(tableName));
221            Delete delete = new Delete(Bytes.toBytes(rowKey));
222            table.delete(delete);
223        } catch (IOException e) {
224            e.printStackTrace();
225        }
226        return true;
227    }
228
229
230    /**
231     * 删除指定行的指定列
232     *
233     * @param tableName  表名
234     * @param rowKey     唯一标识
235     * @param familyName 列族
236     * @param qualifier  列标识
237     */
238    public static boolean deleteColumn(String tableName, String rowKey, String familyName,
239                                          String qualifier) {
240        try {
241            Table table = connection.getTable(TableName.valueOf(tableName));
242            Delete delete = new Delete(Bytes.toBytes(rowKey));
243            delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier));
244            table.delete(delete);
245            table.close();
246        } catch (IOException e) {
247            e.printStackTrace();
248        }
249        return true;
250    }
251
252}
253
254

2.3 单元测试

以单元测试的方式对上面封装的API进行测试。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
1public class HBaseUtilsTest {
2
3    private static final String TABLE_NAME = "class";
4    private static final String TEACHER = "teacher";
5    private static final String STUDENT = "student";
6
7    @Test
8    public void createTable() {
9        // 新建表
10        List<String> columnFamilies = Arrays.asList(TEACHER, STUDENT);
11        boolean table = HBaseUtils.createTable(TABLE_NAME, columnFamilies);
12        System.out.println("表创建结果:" + table);
13    }
14
15    @Test
16    public void insertData() {
17        List<Pair<String, String>> pairs1 = Arrays.asList(new Pair<>("name", "Tom"),
18                new Pair<>("age", "22"),
19                new Pair<>("gender", "1"));
20        HBaseUtils.putRow(TABLE_NAME, "rowKey1", STUDENT, pairs1);
21
22        List<Pair<String, String>> pairs2 = Arrays.asList(new Pair<>("name", "Jack"),
23                new Pair<>("age", "33"),
24                new Pair<>("gender", "2"));
25        HBaseUtils.putRow(TABLE_NAME, "rowKey2", STUDENT, pairs2);
26
27        List<Pair<String, String>> pairs3 = Arrays.asList(new Pair<>("name", "Mike"),
28                new Pair<>("age", "44"),
29                new Pair<>("gender", "1"));
30        HBaseUtils.putRow(TABLE_NAME, "rowKey3", STUDENT, pairs3);
31    }
32
33
34    @Test
35    public void getRow() {
36        Result result = HBaseUtils.getRow(TABLE_NAME, "rowKey1");
37        if (result != null) {
38            System.out.println(Bytes
39                    .toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name"))));
40        }
41
42    }
43
44    @Test
45    public void getCell() {
46        String cell = HBaseUtils.getCell(TABLE_NAME, "rowKey2", STUDENT, "age");
47        System.out.println("cell age :" + cell);
48
49    }
50
51    @Test
52    public void getScanner() {
53        ResultScanner scanner = HBaseUtils.getScanner(TABLE_NAME);
54        if (scanner != null) {
55            scanner.forEach(result -> System.out.println(Bytes.toString(result.getRow()) + "->" + Bytes
56                    .toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name")))));
57            scanner.close();
58        }
59    }
60
61
62    @Test
63    public void getScannerWithFilter() {
64        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
65        SingleColumnValueFilter nameFilter = new SingleColumnValueFilter(Bytes.toBytes(STUDENT),
66                Bytes.toBytes("name"), CompareOperator.EQUAL, Bytes.toBytes("Jack"));
67        filterList.addFilter(nameFilter);
68        ResultScanner scanner = HBaseUtils.getScanner(TABLE_NAME, filterList);
69        if (scanner != null) {
70            scanner.forEach(result -> System.out.println(Bytes.toString(result.getRow()) + "->" + Bytes
71                    .toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name")))));
72            scanner.close();
73        }
74    }
75
76    @Test
77    public void deleteColumn() {
78        boolean b = HBaseUtils.deleteColumn(TABLE_NAME, "rowKey2", STUDENT, "age");
79        System.out.println("删除结果: " + b);
80    }
81
82    @Test
83    public void deleteRow() {
84        boolean b = HBaseUtils.deleteRow(TABLE_NAME, "rowKey2");
85        System.out.println("删除结果: " + b);
86    }
87
88    @Test
89    public void deleteTable() {
90        boolean b = HBaseUtils.deleteTable(TABLE_NAME);
91        System.out.println("删除结果: " + b);
92    }
93}
94
95

三、Java API 2.x 基本使用

3.1 新建Maven工程,导入项目依赖

这里选取的HBase Client的版本为最新的2.1.4。


1
2
3
4
5
6
7
1<dependency>
2    <groupId>org.apache.hbase</groupId>
3    <artifactId>hbase-client</artifactId>
4    <version>2.1.4</version>
5</dependency>
6
7

3.2 API 的基本使用

2.x 版本相比于1.x 废弃了一部分方法,关于废弃的方法在源码中都会指明新的替代方法,比如,在2.x中创建表时:HTableDescriptor和HColumnDescriptor等类都标识为废弃,取而代之的是使用TableDescriptorBuilder和ColumnFamilyDescriptorBuilder来定义表和列族。

以下为HBase 2.x 版本Java API的使用示例:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
1public class HBaseUtils {
2
3    private static Connection connection;
4
5    static {
6        Configuration configuration = HBaseConfiguration.create();
7        configuration.set("hbase.zookeeper.property.clientPort", "2181");
8        // 如果是集群 则主机名用逗号分隔
9        configuration.set("hbase.zookeeper.quorum", "hadoop001");
10        try {
11            connection = ConnectionFactory.createConnection(configuration);
12        } catch (IOException e) {
13            e.printStackTrace();
14        }
15    }
16
17    /**
18     * 创建HBase表
19     *
20     * @param tableName      表名
21     * @param columnFamilies 列族的数组
22     */
23    public static boolean createTable(String tableName, List<String> columnFamilies) {
24        try {
25            HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
26            if (admin.tableExists(TableName.valueOf(tableName))) {
27                return false;
28            }
29            TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
30            columnFamilies.forEach(columnFamily -> {
31                ColumnFamilyDescriptorBuilder cfDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily));
32                cfDescriptorBuilder.setMaxVersions(1);
33                ColumnFamilyDescriptor familyDescriptor = cfDescriptorBuilder.build();
34                tableDescriptor.setColumnFamily(familyDescriptor);
35            });
36            admin.createTable(tableDescriptor.build());
37        } catch (IOException e) {
38            e.printStackTrace();
39        }
40        return true;
41    }
42
43
44    /**
45     * 删除hBase表
46     *
47     * @param tableName 表名
48     */
49    public static boolean deleteTable(String tableName) {
50        try {
51            HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
52            // 删除表前需要先禁用表
53            admin.disableTable(TableName.valueOf(tableName));
54            admin.deleteTable(TableName.valueOf(tableName));
55        } catch (Exception e) {
56            e.printStackTrace();
57        }
58        return true;
59    }
60
61    /**
62     * 插入数据
63     *
64     * @param tableName        表名
65     * @param rowKey           唯一标识
66     * @param columnFamilyName 列族名
67     * @param qualifier        列标识
68     * @param value            数据
69     */
70    public static boolean putRow(String tableName, String rowKey, String columnFamilyName, String qualifier,
71                                 String value) {
72        try {
73            Table table = connection.getTable(TableName.valueOf(tableName));
74            Put put = new Put(Bytes.toBytes(rowKey));
75            put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value));
76            table.put(put);
77            table.close();
78        } catch (IOException e) {
79            e.printStackTrace();
80        }
81        return true;
82    }
83
84
85    /**
86     * 插入数据
87     *
88     * @param tableName        表名
89     * @param rowKey           唯一标识
90     * @param columnFamilyName 列族名
91     * @param pairList         列标识和值的集合
92     */
93    public static boolean putRow(String tableName, String rowKey, String columnFamilyName, List<Pair<String, String>> pairList) {
94        try {
95            Table table = connection.getTable(TableName.valueOf(tableName));
96            Put put = new Put(Bytes.toBytes(rowKey));
97            pairList.forEach(pair -> put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(pair.getKey()), Bytes.toBytes(pair.getValue())));
98            table.put(put);
99            table.close();
100        } catch (IOException e) {
101            e.printStackTrace();
102        }
103        return true;
104    }
105
106
107    /**
108     * 根据rowKey获取指定行的数据
109     *
110     * @param tableName 表名
111     * @param rowKey    唯一标识
112     */
113    public static Result getRow(String tableName, String rowKey) {
114        try {
115            Table table = connection.getTable(TableName.valueOf(tableName));
116            Get get = new Get(Bytes.toBytes(rowKey));
117            return table.get(get);
118        } catch (IOException e) {
119            e.printStackTrace();
120        }
121        return null;
122    }
123
124
125    /**
126     * 获取指定行指定列(cell)的最新版本的数据
127     *
128     * @param tableName    表名
129     * @param rowKey       唯一标识
130     * @param columnFamily 列族
131     * @param qualifier    列标识
132     */
133    public static String getCell(String tableName, String rowKey, String columnFamily, String qualifier) {
134        try {
135            Table table = connection.getTable(TableName.valueOf(tableName));
136            Get get = new Get(Bytes.toBytes(rowKey));
137            if (!get.isCheckExistenceOnly()) {
138                get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
139                Result result = table.get(get);
140                byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
141                return Bytes.toString(resultValue);
142            } else {
143                return null;
144            }
145
146        } catch (IOException e) {
147            e.printStackTrace();
148        }
149        return null;
150    }
151
152
153    /**
154     * 检索全表
155     *
156     * @param tableName 表名
157     */
158    public static ResultScanner getScanner(String tableName) {
159        try {
160            Table table = connection.getTable(TableName.valueOf(tableName));
161            Scan scan = new Scan();
162            return table.getScanner(scan);
163        } catch (IOException e) {
164            e.printStackTrace();
165        }
166        return null;
167    }
168
169
170    /**
171     * 检索表中指定数据
172     *
173     * @param tableName  表名
174     * @param filterList 过滤器
175     */
176
177    public static ResultScanner getScanner(String tableName, FilterList filterList) {
178        try {
179            Table table = connection.getTable(TableName.valueOf(tableName));
180            Scan scan = new Scan();
181            scan.setFilter(filterList);
182            return table.getScanner(scan);
183        } catch (IOException e) {
184            e.printStackTrace();
185        }
186        return null;
187    }
188
189    /**
190     * 检索表中指定数据
191     *
192     * @param tableName   表名
193     * @param startRowKey 起始RowKey
194     * @param endRowKey   终止RowKey
195     * @param filterList  过滤器
196     */
197
198    public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey,
199                                           FilterList filterList) {
200        try {
201            Table table = connection.getTable(TableName.valueOf(tableName));
202            Scan scan = new Scan();
203            scan.withStartRow(Bytes.toBytes(startRowKey));
204            scan.withStopRow(Bytes.toBytes(endRowKey));
205            scan.setFilter(filterList);
206            return table.getScanner(scan);
207        } catch (IOException e) {
208            e.printStackTrace();
209        }
210        return null;
211    }
212
213    /**
214     * 删除指定行记录
215     *
216     * @param tableName 表名
217     * @param rowKey    唯一标识
218     */
219    public static boolean deleteRow(String tableName, String rowKey) {
220        try {
221            Table table = connection.getTable(TableName.valueOf(tableName));
222            Delete delete = new Delete(Bytes.toBytes(rowKey));
223            table.delete(delete);
224        } catch (IOException e) {
225            e.printStackTrace();
226        }
227        return true;
228    }
229
230
231    /**
232     * 删除指定行指定列
233     *
234     * @param tableName  表名
235     * @param rowKey     唯一标识
236     * @param familyName 列族
237     * @param qualifier  列标识
238     */
239    public static boolean deleteColumn(String tableName, String rowKey, String familyName,
240                                          String qualifier) {
241        try {
242            Table table = connection.getTable(TableName.valueOf(tableName));
243            Delete delete = new Delete(Bytes.toBytes(rowKey));
244            delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier));
245            table.delete(delete);
246            table.close();
247        } catch (IOException e) {
248            e.printStackTrace();
249        }
250        return true;
251    }
252
253}
254
255

四、正确连接Hbase

在上面的代码中,在类加载时就初始化了Connection连接,并且之后的方法都是复用这个Connection,这时我们可能会考虑是否可以使用自定义连接池来获取更好的性能表现?实际上这是没有必要的。

首先官方对于Connection的使用说明如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1Connection Pooling For applications which require high-end multithreaded  
2access (e.g., web-servers or  application servers  that may serve many  
3application threads in a single JVM), you can pre-create a Connection,  
4as shown in the following example:
5
6对于高并发多线程访问的应用程序(例如,在单个JVM中存在的为多个线程服务的Web服务器或应用程序服务器),  
7您只需要预先创建一个Connection。例子如下:
8
9// Create a connection to the cluster.
10Configuration conf = HBaseConfiguration.create();
11try (Connection connection = ConnectionFactory.createConnection(conf);
12     Table table = connection.getTable(TableName.valueOf(tablename))) {
13  // use table as needed, the table returned is lightweight
14}
15
16

之所以能这样使用,这是因为Connection并不是一个简单的socket连接,接口文档中对Connection的表述是:


1
2
3
4
5
6
7
8
9
1A cluster connection encapsulating lower level individual connections to actual servers and a  
2connection to zookeeper.  Connections are instantiated through the ConnectionFactory class.  
3The lifecycle of the connection is managed by the caller,  who has to close() the connection  
4to release the resources.
5
6Connection是一个集群连接,封装了与多台服务器(Matser/Region Server)的底层连接以及与zookeeper的连接。  
7连接通过ConnectionFactory  类实例化。连接的生命周期由调用者管理,调用者必须使用close()关闭连接以释放资源。
8
9

之所以封装这些连接,是因为HBase客户端需要连接三个不同的服务角色:

  • Zookeeper :主要用于获取meta表的位置信息,Master的信息;
  • HBase Master :主要用于执行HBaseAdmin接口的一些操作,例如建表等;
  • HBase RegionServer :用于读、写数据。

Connection对象和实际的Socket连接之间的对应关系如下图:

上面两张图片引用自博客:连接HBase的正确姿势

在HBase客户端代码中,真正对应Socket连接的是RpcConnection对象。HBase使用PoolMap这种数据结构来存储客户端到HBase服务器之间的连接。PoolMap的内部有一个ConcurrentHashMap实例,其key是ConnectionId(封装了服务器地址和用户ticket),value是一个RpcConnection对象的资源池。当HBase需要连接一个服务器时,首先会根据ConnectionId找到对应的连接池,然后从连接池中取出一个连接对象。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1@InterfaceAudience.Private
2public class PoolMap<K, V> implements Map<K, V> {
3  private PoolType poolType;
4
5  private int poolMaxSize;
6
7  private Map<K, Pool<V>> pools = new ConcurrentHashMap<>();
8
9  public PoolMap(PoolType poolType) {
10    this.poolType = poolType;
11  }
12  .....
13
14

HBase中提供了三种资源池的实现,分别是Reusable,RoundRobin和ThreadLocal。具体实现可以通hbase.client.ipc.pool.type配置项指定,默认为Reusable。连接池的大小也可以通过hbase.client.ipc.pool.size配置项指定,默认为1,即每个Server 1个连接。也可以通过修改配置实现:


1
2
3
4
5
1config.set("hbase.client.ipc.pool.type",...);
2config.set("hbase.client.ipc.pool.size",...);
3connection = ConnectionFactory.createConnection(config);
4
5

由此可以看出HBase中Connection类已经实现了对连接的管理功能,所以我们不必在Connection上在做额外的管理。

另外,Connection是线程安全的,但Table和Admin却不是线程安全的,因此正确的做法是一个进程共用一个Connection对象,而在不同的线程中使用单独的Table和Admin对象。Table和Admin的获取操作getTable()和getAdmin()都是轻量级,所以不必担心性能的消耗,同时建议在使用完成后显示的调用close()方法来关闭它们。

参考资料

  1. 连接HBase的正确姿势
  2. Apache HBase ™ Reference Guide

更多大数据系列文章可以参见个人 GitHub 开源项目: 程序员大数据入门指南

给TA打赏
共{{data.count}}人
人已打赏
安全运维

OpenSSH-8.7p1离线升级修复安全漏洞

2021-10-23 10:13:25

安全运维

设计模式的设计原则

2021-12-12 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索