HBase建立二级索引的一些解决方式

释放双眼,带上耳机,听听看~!

HBase的一级索引就是rowkey,我们仅仅能通过rowkey进行检索。

假设我们相对hbase里面列族的列列进行一些组合查询。就须要採用HBase的二级索引方案来进行多条件的查询。
常见的二级索引方案有下面几种:
1.MapReduce方案
2.ITHBASE方案
3.IHBASE方案
4.Coprocessor方案
5.Solr+hbase方案

MapReduce方案

IndexBuilder:利用MR的方式构建Index
长处:并发批量构建Index
缺点:不能实时构建Index

举例:
原表:


1
2
3
4
1row  1      f1:name  zhangsan
2row  2      f1:name  lisi
3row  3      f1:name  wangwu
4

索引表:


1
2
3
4
1row     zhangsan    f1:id   1
2row     lisi        f1:id   2
3row     wangwu      f1:id   3
4

Demo:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
1package IndexDouble;
2
3import java.io.IOException;
4import java.util.HashMap;
5import java.util.Map;
6import java.util.Set;
7
8import org.apache.commons.collections.map.HashedMap;
9import org.apache.hadoop.conf.Configuration;
10import org.apache.hadoop.hbase.HBaseConfiguration;
11import org.apache.hadoop.hbase.client.HConnection;
12import org.apache.hadoop.hbase.client.HConnectionManager;
13import org.apache.hadoop.hbase.client.Put;
14import org.apache.hadoop.hbase.client.Result;
15import org.apache.hadoop.hbase.client.Scan;
16import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
17import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
18import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
19import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
20import org.apache.hadoop.hbase.mapreduce.TableMapper;
21import org.apache.hadoop.hbase.util.Bytes;
22import org.apache.hadoop.mapreduce.Job;
23import org.apache.hadoop.util.GenericOptionsParser;
24
25
26public class IndexBuilder {
27    private String rootDir;
28    private String zkServer;
29    private String port;
30    private Configuration conf;
31    private HConnection hConn = null;
32
33    private IndexBuilder(String rootDir,String zkServer,String port) throws IOException{
34        this.rootDir = rootDir;
35        this.zkServer = zkServer;
36        this.port = port;
37
38        conf = HBaseConfiguration.create();
39        conf.set("hbase.rootdir", rootDir);
40        conf.set("hbase.zookeeper.quorum", zkServer);
41        conf.set("hbase.zookeeper.property.clientPort", port);
42
43        hConn = HConnectionManager.createConnection(conf);  
44    }
45
46    static class MyMapper extends TableMapper<ImmutableBytesWritable, Put>{
47
48        //记录了要进行索引的列
49        private Map<byte[], ImmutableBytesWritable> indexes = new
50                HashMap<byte[], ImmutableBytesWritable>();
51
52        private String familyName;
53
54        @Override
55        protected void map(ImmutableBytesWritable key, Result value,
56                Context context) throws IOException, InterruptedException {
57            //原始表列
58            Set<byte[]> keys = indexes.keySet();
59
60            //索引表的rowkey是原始表的列。索引表的列是原始表的rowkey
61
62            for (byte[] k : keys){
63
64                //获得新建索引表的表名
65                ImmutableBytesWritable indexTableName = indexes.get(k);
66
67                //Result存放的是原始表的数据
68                //查找到内容             依据列族 和 列 得到原始表的值
69                byte[] val = value.getValue(Bytes.toBytes(familyName), k);
70
71                if (val != null) {
72                    //索引表
73                    Put put = new Put(val);//索引表行键
74                    //列族  列   原始表的行键
75                    put.add(Bytes.toBytes("f1"),Bytes.toBytes("id"),key.get());
76                    context.write(indexTableName, put);
77                }
78            }
79
80        }
81
82        //真正运行Map之前运行一些处理。
83

@Override protected void setup(Context context) throws IOException, InterruptedException { //通过上下文得到配置 Configuration conf = context
.getConfiguration()
; //获得表名 String tableName = conf
.get(
"tableName")
; //String family = conf
.get(
"familyName")
; //获得列族 familyName = conf
.get(
"columnFamily")
; //获得列 String[] qualifiers = conf
.getStrings(
"qualifiers")
; for (String qualifier : qualifiers) { //建立一个映射,为每个列创建一个表,表的名字tableName+
"-"+qualifier //原始表的列 索引表新建表名 indexes
.put(Bytes
.toBytes(qualifier), new ImmutableBytesWritable(Bytes
.toBytes(tableName+
"-"+qualifier)))
; } } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { String rootDir =
"hdfs://hadoop1:8020/hbase"
; String zkServer =
"hadoop1"
; String port =
"2181"
; IndexBuilder conn = new IndexBuilder(rootDir,zkServer,port)
; String[] otherArgs = new GenericOptionsParser(conn
.conf, args)
.getRemainingArgs()
; //IndexBuilder: TableName,ColumnFamily,Qualifier if(otherArgs
.length<
3){ System
.exit(-
1)
; } //表名 String tableName = otherArgs[
0]
; //列族 String columnFamily = otherArgs[
1]
; conn
.conf
.set(
"tableName", tableName)
; conn
.conf
.set(
"columnFamily", columnFamily)
; //列 可能存在多个列 String[] qualifiers = new String[otherArgs
.length-
2]
; for (int i =
0
; i < qualifiers.length; i++) { qualifiers[i] = otherArgs[i+
2]
; } //设置列 conn
.conf
.setStrings(
"qualifiers", qualifiers)
; @SuppressWarnings(
"deprecation") Job job = new Job(conn
.conf,tableName)
; job
.setJarByClass(IndexBuilder
.class)
; job
.setMapperClass(MyMapper
.class)
; job
.setNumReduceTasks(
0)
;//因为不须要运行reduce阶段 job
.setInputFormatClass(TableInputFormat
.class)
; job
.setOutputFormatClass(MultiTableOutputFormat
.class)
; Scan scan = new Scan()
; TableMapReduceUtil
.initTableMapperJob(tableName,scan, MyMapper
.class, ImmutableBytesWritable
.class, Put
.class, job)
; job
.waitForCompletion(true)
; } }


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
1创建原始表
2hbase(main):002:0&gt; create &#x27;studentinfo&#x27;,&#x27;f1&#x27;
30 row(s) in 0.6520 seconds
4
5=&gt; Hbase::Table - studentinfo
6
7
8hbase(main):003:0&gt; put &#x27;studentinfo&#x27;,&#x27;1&#x27;,&#x27;f1:name&#x27;,&#x27;zhangsan&#x27;
90 row(s) in 0.1640 seconds
10
11hbase(main):004:0&gt; put &#x27;studentinfo&#x27;,&#x27;2&#x27;,&#x27;f1:name&#x27;,&#x27;lisi&#x27;
120 row(s) in 0.0240 seconds
13
14hbase(main):005:0&gt; put &#x27;studentinfo&#x27;,&#x27;3&#x27;,&#x27;f1:name&#x27;,&#x27;wangwu&#x27;
150 row(s) in 0.0290 seconds
16
17hbase(main):006:0&gt; scan &#x27;studentinfo&#x27;
18ROW                      COLUMN+CELL
19 1                       column=f1:name, timestamp=1436262175823, value=zhangsan
20 2                       column=f1:name, timestamp=1436262183922, value=lisi
21 3                       column=f1:name, timestamp=1436262189250, value=wangwu
223 row(s) in 0.0530 seconds
23

1
2
3
4
5
6
7
1创建索引表
2
3hbase(main):007:0&gt; create &#x27;studentinfo-name&#x27;,&#x27;f1&#x27;
40 row(s) in 0.7740 seconds
5
6=&gt; Hbase::Table - studentinfo-name
7

运行结果

ITHBASE方案

长处:ITHBase(Indexed Transactional HBase)是HBase的一个事物型的带索引的扩展。
缺点:须要重构hbase,几年没有更新。
http://github.com/hbase-trx/hbase-transactional-tableindexed

IHBASE方案

**长处:**IHBase(Indexed HBase)是HBase的一个扩展。用干支持更快的扫描。
**缺点:**须要重构hbase。
原理:在Memstore满了以后刷磁盘时。IHBase会进行拦截请求,并为这个memstore的数据构建索引。索引还有一个CF的方式存储在表内。scan的时候,IHBase会结合索引列中的标记。来加速scan。
http://github.com/ykulbak/ihbase

Coprocessor方案

HIndex–来自华为的HBase二级索引
http://github.com/Huawei-Hadoop/hindex

The solution is 100% Java, compatible with Apache HBase 0.94.8, and is open sourced under ASL.

Following capabilities are supported currently.
1.multiple indexes on table,
2.multi column index,
3.index based on part of a column value,
4.equals and range condition scans using index, and
5.bulk loading data to indexed table (Indexing done with bulk load).

Solr+hbase方案

Solr是一个独立的企业级搜索应用server,它对并提供相似干Web-service的API接口。用户能够通过http请求,向搜索引擎server提交一定格式的XML文件,生成索引。也能够通过Http Get操作提出查找请求,并得到XML格式的返回结果。

Solr是一个高性能。採用Java5开发。基干Lucene的全文搜索server。同一时候对其进行了扩展。提供了比Lucene更为丰富的查询语言,同一时候实现了可配置、可扩展并对查询性能进行了优化,而且提供了一个完好的功能节理界面。是一款非常优秀的全文搜索引擎。

HBase无可置疑拥有其优势,但其本身仅仅对rowkey支持毫秒级的高速检索,对于多字段的组合查询却无能为力。
基于Solr的HBase多条件查询原理非常easy。将HBase表中涉及条件过滤的字段和rowkey在Solr中建立索引,通过Solr的多条件查询高速获得符合过滤条件的rowkey值,拿到这些rowkey之后在HBASE中通过指定rowkey进行查询。

给TA打赏
共{{data.count}}人
人已打赏
安全运维

OpenSSH-8.7p1离线升级修复安全漏洞

2021-10-23 10:13:25

安全运维

设计模式的设计原则

2021-12-12 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索