HBase性能优化方法总结(4):读表操作

释放双眼,带上耳机,听听看~!

3. 读表操作

3.1 多HTable并发读

创建多个HTable客户端用于读操作,提高读数据的吞吐量,一个例子:


1
2
3
4
5
6
7
8
1static final Configuration conf = HBaseConfiguration.create();
2static final String table_log_name = “user_log”;
3rTableLog = new HTable[tableN];
4for (int i = 0; i < tableN; i++) {
5    rTableLog[i] = new HTable(conf, table_log_name);
6    rTableLog[i].setScannerCaching(50);
7}
8

3.2 HTable参数设置

3.2.1 Scanner Caching

hbase.client.scanner.caching配置项可以设置HBase scanner一次从服务端抓取的数据条数,默认情况下一次一条。通过将其设置成一个合理的值,可以减少scan过程中next()的时间开销,代价是scanner需要通过客户端的内存来维持这些被cache的行记录。

有三个地方可以进行配置:1)在HBase的conf配置文件中进行配置;2)通过调用HTable.setScannerCaching(int scannerCaching)进行配置;3)通过调用Scan.setCaching(int caching)进行配置。三者的优先级越来越高。

3.2.2 Scan Attribute Selection


1
2
1scan时指定需要的Column Family,可以减少网络传输数据量,否则默认scan操作会返回整行所有Column Family的数据。
2

3.2.3 Close ResultScanner


1
2
1通过scan取完数据后,记得要关闭ResultScanner,否则RegionServer可能会出现问题(对应的Server资源无法释放)。
2

3.3 批量读

通过调用HTable.get(Get)方法可以根据一个指定的row key获取一行记录,同样HBase提供了另一个方法:通过调用HTable.get(List<Get>)方法可以根据一个指定的row key列表,批量获取多行记录,这样做的好处是批量执行,只需要一次网络I/O开销,这对于对数据实时性要求高而且网络传输RTT高的情景下可能带来明显的性能提升。

3.4 多线程并发读

在客户端开启多个HTable读线程,每个读线程负责通过HTable对象进行get操作。下面是一个多线程并发读取HBase,获取店铺一天内各分钟PV值的例子:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
1public class DataReaderServer {
2     //获取店铺一天内各分钟PV值的入口函数
3     public static ConcurrentHashMap&lt;String, String&gt; getUnitMinutePV(long uid, long startStamp, long endStamp){
4         long min = startStamp;
5         int count = (int)((endStamp - startStamp) / (60*1000));
6         List&lt;String&gt; lst = new ArrayList&lt;String&gt;();
7         for (int i = 0; i &lt;= count; i++) {
8            min = startStamp + i * 60 * 1000;
9            lst.add(uid + &quot;_&quot; + min);
10         }
11         return parallelBatchMinutePV(lst);
12     }
13      //多线程并发查询,获取分钟PV值
14private static ConcurrentHashMap&lt;String, String&gt; parallelBatchMinutePV(List&lt;String&gt; lstKeys){
15        ConcurrentHashMap&lt;String, String&gt; hashRet = new ConcurrentHashMap&lt;String, String&gt;();
16        int parallel = 3;
17        List&lt;List&lt;String&gt;&gt; lstBatchKeys  = null;
18        if (lstKeys.size() &lt; parallel ){
19            lstBatchKeys  = new ArrayList&lt;List&lt;String&gt;&gt;(1);
20            lstBatchKeys.add(lstKeys);
21        }
22        else{
23            lstBatchKeys  = new ArrayList&lt;List&lt;String&gt;&gt;(parallel);
24            for(int i = 0; i &lt; parallel; i++  ){
25                List&lt;String&gt; lst = new ArrayList&lt;String&gt;();
26                lstBatchKeys.add(lst);
27            }
28
29            for(int i = 0 ; i &lt; lstKeys.size() ; i ++ ){
30                lstBatchKeys.get(i%parallel).add(lstKeys.get(i));
31            }
32        }
33        
34        List&lt;Future&lt; ConcurrentHashMap&lt;String, String&gt; &gt;&gt; futures = new ArrayList&lt;Future&lt; ConcurrentHashMap&lt;String, String&gt; &gt;&gt;(5);
35        
36        ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
37        builder.setNameFormat(&quot;ParallelBatchQuery&quot;);
38        ThreadFactory factory = builder.build();
39        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(lstBatchKeys.size(), factory);
40        
41        for(List&lt;String&gt; keys : lstBatchKeys){
42            Callable&lt; ConcurrentHashMap&lt;String, String&gt; &gt; callable = new BatchMinutePVCallable(keys);
43            FutureTask&lt; ConcurrentHashMap&lt;String, String&gt; &gt; future = (FutureTask&lt; ConcurrentHashMap&lt;String, String&gt; &gt;) executor.submit(callable);
44            futures.add(future);
45        }
46        executor.shutdown();
47        
48        // Wait for all the tasks to finish
49        try {
50          boolean stillRunning = !executor.awaitTermination(
51              5000000, TimeUnit.MILLISECONDS);
52          if (stillRunning) {
53            try {
54                executor.shutdownNow();
55            } catch (Exception e) {
56                // TODO Auto-generated catch block
57                e.printStackTrace();
58            }
59          }
60        } catch (InterruptedException e) {
61          try {
62              Thread.currentThread().interrupt();
63          } catch (Exception e1) {
64            // TODO Auto-generated catch block
65            e1.printStackTrace();
66          }
67        }
68        
69        // Look for any exception
70        for (Future f : futures) {
71          try {
72              if(f.get() != null)
73              {
74                  hashRet.putAll((ConcurrentHashMap&lt;String, String&gt;)f.get());
75              }
76          } catch (InterruptedException e) {
77            try {
78                 Thread.currentThread().interrupt();
79            } catch (Exception e1) {
80                // TODO Auto-generated catch block
81                e1.printStackTrace();
82            }
83          } catch (ExecutionException e) {
84            e.printStackTrace();
85          }
86        }
87        
88        return hashRet;
89    }
90     //一个线程批量查询,获取分钟PV值
91    protected static ConcurrentHashMap&lt;String, String&gt; getBatchMinutePV(List&lt;String&gt; lstKeys){
92        ConcurrentHashMap&lt;String, String&gt; hashRet = null;
93        List&lt;Get&gt; lstGet = new ArrayList&lt;Get&gt;();
94        String[] splitValue = null;
95        for (String s : lstKeys) {
96            splitValue = s.split(&quot;_&quot;);
97            long uid = Long.parseLong(splitValue[0]);
98            long min = Long.parseLong(splitValue[1]);
99            byte[] key = new byte[16];
100            Bytes.putLong(key, 0, uid);
101            Bytes.putLong(key, 8, min);
102            Get g = new Get(key);
103            g.addFamily(fp);
104            lstGet.add(g);
105        }
106        Result[] res = null;
107        try {
108            res = tableMinutePV[rand.nextInt(tableN)].get(lstGet);
109        } catch (IOException e1) {
110            logger.error(&quot;tableMinutePV exception, e=&quot; + e1.getStackTrace());
111        }
112
113        if (res != null &amp;&amp; res.length &gt; 0) {
114            hashRet = new ConcurrentHashMap&lt;String, String&gt;(res.length);
115            for (Result re : res) {
116                if (re != null &amp;&amp; !re.isEmpty()) {
117                    try {
118                        byte[] key = re.getRow();
119                        byte[] value = re.getValue(fp, cp);
120                        if (key != null &amp;&amp; value != null) {
121                            hashRet.put(String.valueOf(Bytes.toLong(key,
122                                    Bytes.SIZEOF_LONG)), String.valueOf(Bytes
123                                    .toLong(value)));
124                        }
125                    } catch (Exception e2) {
126                        logger.error(e2.getStackTrace());
127                    }
128                }
129            }
130        }
131
132        return hashRet;
133    }
134}
135//调用接口类,实现Callable接口
136class BatchMinutePVCallable implements Callable&lt;ConcurrentHashMap&lt;String, String&gt;&gt;{
137     private List&lt;String&gt; keys;
138
139     public BatchMinutePVCallable(List&lt;String&gt; lstKeys ) {
140         this.keys = lstKeys;
141     }
142
143     public ConcurrentHashMap&lt;String, String&gt; call() throws Exception {
144         return DataReadServer.getBatchMinutePV(keys);
145     }
146}
147

3.5 缓存查询结果

对于频繁查询HBase的应用场景,可以考虑在应用程序中做缓存,当有新的查询请求时,首先在缓存中查找,如果存在则直接返回,不再查询HBase;否则对HBase发起读请求查询,然后在应用程序中将查询结果缓存起来。至于缓存的替换策略,可以考虑LRU等常用的策略。

3.6 Blockcache

HBase上Regionserver的内存分为两个部分,一部分作为Memstore,主要用来写;另外一部分作为BlockCache,主要用于读。

写请求会先写入Memstore,Regionserver会给每个region提供一个Memstore,当Memstore满64MB以后,会启动 flush刷新到磁盘。当Memstore的总大小超过限制时(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),会强行启动flush进程,从最大的Memstore开始flush直到低于限制。

读请求先到Memstore中查数据,查不到就到BlockCache中查,再查不到就会到磁盘上读,并把读的结果放入BlockCache。由于BlockCache采用的是LRU策略,因此BlockCache达到上限(heapsize * hfile.block.cache.size * 0.85)后,会启动淘汰机制,淘汰掉最老的一批数据。

一个Regionserver上有一个BlockCache和N个Memstore,它们的大小之和不能大于等于heapsize * 0.8,否则HBase不能启动。默认BlockCache为0.2,而Memstore为0.4。对于注重读响应时间的系统,可以将 BlockCache设大些,比如设置BlockCache=0.4,Memstore=0.39,以加大缓存的命中率。

有关BlockCache机制,请参考这里:HBase的Block cacheHBase的blockcache机制hbase中的缓存的计算与使用

给TA打赏
共{{data.count}}人
人已打赏
安全运维

OpenSSH-8.7p1离线升级修复安全漏洞

2021-10-23 10:13:25

安全运维

设计模式的设计原则

2021-12-12 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索