hadoop生态系统学习之路(五)hbase的简单使用

释放双眼,带上耳机,听听看~!

最近,参与了公司的一个大数据接口平台的开发,具体的处理过程是这样的。我们公司负责数据的入库,也就是一个etl过程,使用MR将数据入到hive里面,然后同步到impala,然后此接口平台提供查询接口,前台会将sql语句以参数传过来,然后接口平台通过调用impala提供的java api接口,将数据查询出来返回给用户。另外,如果查询的数据量很大,那么前台就会传一个taskId过来,第一次只需将数据查询出来,入到impala临时表,下次再查便将数据返回。那么,如何记录此任务的状态变化呢,这里我们就使用到了hbase,以taskId为row key,然后创建一个列簇记录状态信息。
下面,分以下几步对hbase进行介绍。

一、hbase的基本原理

HBase是一个构建在HDFS上的分布式列存储系统,主要用于海量结构化数据存储。
hbase的特点:

  1. 大,一个表可以有数十亿行,上百万列;
  2. 无模式,每行都有一个可排序的主键和任意多的列,列可以根据需要动态的增加,同一张表中不同的行可以有截然不同的列;
  3. 面向列,面向列(族)的存储和权限控制,列(族)独立检索;
  4. 稀疏,空(null)列并不占用存储空间,表可以设计的非常稀疏;
  5. 数据多版本,每个单元中的数据可以有多个版本,默认情况下版本号自动分配,是单元格插入时的时间戳;
  6. 数据类型单一,Hbase中的数据都是字符串,没有类型。

下面,再来看看hbase相关的组件:
Master:为Region server分配region,负责Region server的负载均衡,发现失效的Region server并重新分配其上的region,管理用户对table的增删改查操作。
RegionServer:Regionserver维护region,处理对这些region的IO请求,Regionserver负责切分在运行过程中变得过大的region。
Zookeeper:通过选举,保证任何时候,集群中只有一个master,Master与RegionServers 启动时会向ZooKeeper注册,存贮所有Region的寻址入口,实时监控Region server的上线和下线信息,并实时通知给Master,存储HBase的schema和table元数据,默认情况下,HBase 管理ZooKeeper 实例,比如, 启动或者停止ZooKeeper。Zookeeper的引入使得Master不再是单点故障。
大概的介绍下,关于hbase表结构,笔者下面再进行介绍。

二、hbase的常用命令

首先,我们可以执行hbase shell进入hbase命令行,如下:
然后,执行list,可以看到所有的表,如下:
,接下来,我们可以describe ‘表名’来查看表结构,如下:
可以看到,这个表有一个列族info。
然后,我们可以使用scan ‘表名’来查看,整张表的数据。
下面,我们使用get ‘result_info’,’test02’获取表中某个row key的所有列值,如下:
好了,就说这几个命令,还有很多,大家可以查阅下,多练练就熟了。

三、hbase 的java api基本操作

hbase包依赖,如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
1    <properties>
2      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
3      <hadoop.version>2.3.0-cdh5.0.0</hadoop.version>
4      <hbase.version>0.96.1.1-cdh5.0.0</hbase.version>
5      <hive.version>0.12.0-cdh5.0.0</hive.version>
6    </properties>
7
8        <!-- habase 相关jar-->
9        <dependency>
10          <groupId>org.apache.hbase</groupId>
11          <artifactId>hbase-client</artifactId>
12          <version>${hbase.version}</version>
13          <exclusions>
14              <exclusion>
15                  <artifactId>jdk.tools</artifactId>
16                  <groupId>jdk.tools</groupId>
17              </exclusion>
18          </exclusions>
19        </dependency>
20        <dependency>
21            <groupId>org.apache.hbase</groupId>
22            <artifactId>hbase-common</artifactId>
23            <version>${hbase.version}</version>
24        </dependency>
25
26        <dependency>
27            <groupId>org.apache.hbase</groupId>
28            <artifactId>hbase-server</artifactId>
29            <version>${hbase.version}</version>
30        </dependency>
31        <dependency>
32            <groupId>org.apache.hbase</groupId>
33            <artifactId>hbase-thrift</artifactId>
34            <version>${hbase.version}</version>
35        </dependency>
36        <dependency>
37            <groupId>org.apache.hbase</groupId>
38            <artifactId>hbase-testing-util</artifactId>
39            <version>${hbase.version}</version>
40            <scope>test</scope>
41        </dependency>
42

首先,我直接贴出代码,如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
1package org.hbase.demo;
2
3import java.io.IOException;
4
5import org.apache.hadoop.conf.Configuration;
6import org.apache.hadoop.hbase.HBaseConfiguration;
7import org.apache.hadoop.hbase.client.Get;
8import org.apache.hadoop.hbase.client.HTable;
9import org.apache.hadoop.hbase.client.Put;
10import org.apache.hadoop.hbase.client.Result;
11import org.apache.hadoop.hbase.util.Bytes;
12
13/**
14 * 关键点1_:将自动提交关闭,如果不关闭,每写一条数据都会进行提交,是导入数据较慢的做主要因素。
15 * 关键点2:设置缓存大小,当缓存大于设置值时,hbase会自动提交。此处可自己尝试大小,一般对大数据量,设置为5M即可,本文设置为3M。
16 * 关键点3:每一个分片结束后都进行flushCommits(),如果不执行,当hbase最后缓存小于上面设定值时,不会进行提交,导致数据丢失。
17 *
18 * @author qiyongkang
19 *
20 */
21public class Example {
22
23    /**
24     *
25     * insertBatch: 批量插入. <br/>
26     *
27     * @author qiyongkang
28     * @throws IOException
29     * @since JDK 1.6
30     */
31    public static void insertBatch() throws IOException {
32        Configuration config = HBaseConfiguration.create();
33        config.set("hbase.zookeeper.quorum", "172.31.25.8,172.31.25.2,172.31.25.3");
34
35        HTable htable = new HTable(config, "qyk_info");
36        htable.setAutoFlush(false, false); // 关键点1
37        htable.setWriteBufferSize(3 * 1024 * 1024); // 关键点2
38
39        int num = 1;
40        while (num <= 10) {
41            Put put = new Put(Bytes.toBytes(num + ""));
42            put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("18"));
43            put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("qyk" + num));
44            put.add(Bytes.toBytes("info"), Bytes.toBytes("id"), Bytes.toBytes(num + ""));
45            htable.put(put);
46
47            num++;
48
49            if (num % 100 == 0) {
50                System.out.println("..." + num);
51            }
52        }
53        htable.flushCommits();// 关键点3
54        htable.close();
55    }
56
57    /**
58     *
59     * insertSingle:单个插入. <br/>
60     *
61     * @author qiyongkang
62     * @throws IOException
63     * @since JDK 1.6
64     */
65    public static void insertSingle() throws IOException {
66        Configuration config = HBaseConfiguration.create();
67        config.set("hbase.zookeeper.quorum", "172.31.25.8,172.31.25.2,172.31.25.3");
68
69        HTable htable = new HTable(config, "qyk_info");
70        Put put = new Put(Bytes.toBytes("0"));
71        put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("18"));
72        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("qyk" + 0));
73        put.add(Bytes.toBytes("info"), Bytes.toBytes("id"), Bytes.toBytes("0"));
74        htable.put(put);
75
76        htable.close();
77    }
78
79    /**
80     *
81     * getData:根据row key获取列信息. <br/>
82     *
83     * @author qiyongkang
84     * @throws IOException
85     * @since JDK 1.6
86     */
87    public static void getData() throws IOException {
88        Configuration config = HBaseConfiguration.create();
89        config.set("hbase.zookeeper.quorum", "172.31.25.8,172.31.25.2,172.31.25.3");
90
91        HTable htable = new HTable(config, "qyk_info");
92
93        Get get = new Get(Bytes.toBytes("1"));
94        Result result = htable.get(get);
95
96        String age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")));
97        String name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
98        String id = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("id")));
99
100        System.out.println("age:" + age + ",name:" + name + ",id:" + id);
101
102        htable.close();
103    }
104
105    public static void main(String[] args) throws IOException {
106        //单个插入
107        insertSingle();
108
109        //批量插入
110        insertBatch();
111
112        //根据row key获取数据
113        getData();
114    }
115
116}
117
118

分别对应三个操作,首先我们在hbase命令行执行create ‘qyk_info’, ‘info’创建表和列族,然后,再执行程序,可以看到控制台如下:
然后,我们执行scan ‘qyk_info’可以看到,如下:
然后,我们使用单个插入,rowkey还是0,将id改为11,age改为19,执行单个插入。
然后,在命令行执行get ‘qyk_info’, ‘0’可以看到:
其实,这个就是更新操作,cell中的值会有一个时间戳,每次显示此列的最新值。
好了,关于hbase的基本使用就讲到这儿了,比较粗浅,希望给大家带来帮助!

给TA打赏
共{{data.count}}人
人已打赏
安全运维

MongoDB最简单的入门教程之二 使用nodejs访问MongoDB

2021-12-11 11:36:11

安全运维

Ubuntu上NFS的安装配置

2021-12-19 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索