HBase的简单学习四

发布于:2024-04-27 ⋅ 阅读:(24) ⋅ 点赞:(0)

一 HBase的进阶

1.1 hbase的写流程

Hbase读取数据的流程:
1)是由客户端发起读取数据的请求,首先会与zookeeper建立连接
2)从zookeeper中获取一个hbase:meta表位置信息,被哪一个regionserver所管理着
     hbase:meta表:hbase的元数据表,在这个表中存储了自定义表相关的元数据,包括表名,表有哪些列簇,表有哪些region,每个region存储的位置,每个region被哪个regionserver所管理,这个表也是存储在某一个region上的,并且这个meta表只会被一个regionserver所管理。这个表的位置信息只有zookeeper知道。
3)连接这个meta表对应的regionserver,从meta表中获取当前你要读取的这个表对应的regionsever是谁。
     当一个表多个region怎么办呢?
     如果我们获取数据是以get的方式,只会返回一个regionserver
     如果我们获取数据是以scan的方式,会将所有的region对应的regionserver的地址全部返回。
4)连接要读取表的对应的regionserver,从regionserver上的开始读取数据:
       读取顺序:memstore-->blockcache-->storefile-->Hfile中
       注意:如果是scan操作,就不仅仅去blockcache了,而是所有都会去找。

1.2 hbase的写流程

Hbase的写入数据流程:
1)由客户端发起写数据请求,首先会与zookeeper建立连接
2)从zookeeper中获取hbase:meta表被哪一个regionserver所管理
3)连接hbase:meta表中获取对应的regionserver地址 (从meta表中获取当前要写入数据的表对应的region所管理的regionserver) 只会返回一个regionserver地址
4)与要写入数据的regionserver建立连接,然后开始写入数据,将数据首先会写入到HLog,然后将数据写入到对应store模块中的memstore中
(可能会写多个),当这两个地方都写入完成之后,表示数据写入完成。


-------------------------后面的步骤是服务器内部的操作-----------------
异步操作
5)随着客户端不断地写入数据,memstore中的数据会越来多,当内存中的数据达到阈值(128M/1h)的时候,放入到blockchache中,生成新的memstore接收用户过来的数据,然后当blockcache的大小达到一定阈值(0.85)的时候,开始触发flush机制,将数据最终刷新到HDFS中形成小的Hfile文件。

6)随着不断地刷新,storefile不断地在HDFS上生成小HFIle文件,当小的HFile文件达到阈值的时候(3个及3个以上),就会触发Compaction机制,将小的HFile合并成一个大的HFile.

7)随着不断地合并,大的HFile文件会越来越大,当达到一定阈值(2.0版本之后最终10G)的时候,会触发分裂机制(split),将大的HFile文件进行一分为二,同时管理这个大的HFile的region也会被一分为二,形成两个新的region和两个新的HFile文件,一对一的进行管理,将原来旧的region和分裂之前大的HFile文件慢慢地就会下线处理。

1.3 Region的分裂策略

region中存储的是一张表的数据,当region中的数据条数过多的时候,会直接影响查询效率。当region过大的时候,region会被拆分为两个region,HMaster会将分裂的region分配到不同的regionserver上,这样可以让请求分散到不同的RegionServer上,已达到负载均衡 , 这也是HBase的一个优点 。

  • ConstantSizeRegionSplitPolicy

    0.94版本前,HBase region的默认切分策略

    当region中最大的store大小超过某个阈值(hbase.hregion.max.filesize=10G)之后就会触发切分,一个region等分为2个region。

    但是在生产线上这种切分策略却有相当大的弊端(切分策略对于大表和小表没有明显的区分):

    • 阈值(hbase.hregion.max.filesize)设置较大对大表比较友好,但是小表就有可能不会触发分裂,极端情况下可能就1个,形成热点,这对业务来说并不是什么好事。

    • 如果设置较小则对小表友好,但一个大表就会在整个集群产生大量的region,这对于集群的管理、资源使用、failover来说都不是一件好事。

  • IncreasingToUpperBoundRegionSplitPolicy

    0.94版本~2.0版本默认切分策略

    总体看和ConstantSizeRegionSplitPolicy思路相同,一个region中最大的store大小大于设置阈值就会触发切分。 但是这个阈值并不像ConstantSizeRegionSplitPolicy是一个固定的值,而是会在一定条件下不断调整,调整规则和region所属表在当前regionserver上的region个数有关系.

    region split阈值的计算公式是:

    • 设regioncount:是region所属表在当前regionserver上的region的个数

    • 阈值 = regioncount^3 * 128M * 2,当然阈值并不会无限增长,最大不超过MaxRegionFileSize(10G),当region中最大的store的大小达到该阈值的时候进行region split

    例如:

    • 第一次split阈值 = 1^3 * 256 = 256MB

    • 第二次split阈值 = 2^3 * 256 = 2048MB

    • 第三次split阈值 = 3^3 * 256 = 6912MB

    • 第四次split阈值 = 4^3 * 256 = 16384MB > 10GB,因此取较小的值10GB

    • 后面每次split的size都是10GB了

    特点

    • 相比ConstantSizeRegionSplitPolicy,可以自适应大表、小表;

    • 在集群规模比较大的情况下,对大表的表现比较优秀

    • 对小表不友好,小表可能产生大量的小region,分散在各regionserver上

    • 小表达不到多次切分条件,导致每个split都很小,所以分散在各个regionServer上

  • SteppingSplitPolicy

    2.0版本默认切分策略

    相比 IncreasingToUpperBoundRegionSplitPolicy 简单了一些 ​ region切分的阈值依然和待分裂region所属表在当前regionserver上的region个数有关系

    • 如果region个数等于1,切分阈值为flush size 128M

    • 否则为MaxRegionFileSize。

    这种切分策略对于大集群中的大表、小表会比 IncreasingToUpperBoundRegionSplitPolicy 更加友好,小表不会再产生大量的小region,而是适可而止。

  • KeyPrefixRegionSplitPolicy

    根据rowKey的前缀对数据进行分区,这里是指定rowKey的前多少位作为前缀,比如rowKey都是16位的,指定前5位是前缀,那么前5位相同的rowKey在相同的region中。

  • DelimitedKeyPrefixRegionSplitPolicy

    保证相同前缀的数据在同一个region中,例如rowKey的格式为:userid_eventtype_eventid,指定的delimiter为 _ ,则split的的时候会确保userid相同的数据在同一个region中。 按照分隔符进行切分,而KeyPrefixRegionSplitPolicy是按照指定位数切分。

  • BusyRegionSplitPolicy

    按照一定的策略判断Region是不是Busy状态,如果是即进行切分

    如果你的系统常常会出现热点Region,而你对性能有很高的追求,那么这种策略可能会比较适合你。它会通过拆分热点Region来缓解热点Region的压力,但是根据热点来拆分Region也会带来很多不确定性因素,因为你也不知道下一个被拆分的Region是哪个。

  • DisabledRegionSplitPolicy

    不启用自动拆分, 需要指定手动拆分

1.4  Compaction操作

 Minor Compaction:

  • 指选取一些小的、相邻的StoreFile将他们合并成一个更大的StoreFile,在这个过程中不会处理已经Deleted或Expired的Cell。一次 Minor Compaction 的结果是更少并且更大的StoreFile。

Major Compaction:

  • 指将所有的StoreFile合并成一个StoreFile,这个过程会清理三类没有意义的数据:被删除的数据TTL过期数据版本号超过设定版本号的数据。另外,一般情况下,major compaction时间会持续比较长,整个过程会消耗大量系统资源,对上层业务有比较大的影响。因此线上业务都会将关闭自动触发major compaction功能,改为手动在业务低峰期触发。

1.5 经典面试题

HBase适合存储PB级别的海量数据(百亿千亿量级条记录),如果根据记录主键Rowkey来查询,能在几十到百毫秒内返回数据。

那么HBase是如何做到的呢?

接下来,简单阐述一下数据的查询思路和过程。

查询过程

第1步:

项目有100亿业务数据,存储在一个HBase集群上(由多个服务器数据节点构成),每个数据节点上有若干个Region(区域),每个Region实际上就是HBase中一批数据的集合(一段连续范围rowkey的数据)。

我们现在开始根据主键RowKey来查询对应的记录,通过meta表可以帮我们迅速定位到该记录所在的数据节点,以及数据节点中的Region,目前我们有100亿条记录,占空间10TB。所有记录被切分成5000个Region,那么现在,每个Region就是2G。

由于记录在1个Region中,所以现在我们只要查询这2G的记录文件,就能找到对应记录。

第2步:

由于HBase存储数据是按照列族存储的。比如一条记录有400个字段,前100个字段是人员信息相关,这是一个列簇(列的集合);中间100个字段是公司信息相关,是一个列簇。另外100个字段是人员交易信息相关,也是一个列簇;最后还有100个字段是其他信息,也是一个列簇

这四个列簇是分开存储的,这时,假设2G的Region文件中,分为4个列族,那么每个列族就是500M。

到这里,我们只需要遍历这500M的列簇就可以找到对应的记录。

第3步:

如果要查询的记录在其中1个列族上,1个列族在HDFS中会包含1个或者多个HFile。

如果一个HFile一般的大小为100M,那么该列族包含5个HFile在磁盘上或内存中。

由于HBase的内存进入磁盘中的数据是排好序(字典顺序)的,要查询的记录有可能在最前面,也有可能在最后面,按平均来算,我们只需遍历2.5个HFile共250M,即可找到对应的记录。

第4步:

每个HFile中,是以键值对(key/value)方式存储,只要遍历文件中的key位置并判断符合条件即可

一般key是有限的长度,假设key/value比是1:24,最终只需要10M的数据量,就可获取的对应的记录。

如果数据在机械磁盘上,按其访问速度100M/S,只需0.1秒即可查到。

如果是SSD的话,0.01秒即可查到。

当然,扫描HFile时还可以通过布隆过滤器快速定位到对应的HFile,以及HBase是有内存缓存机制的,如果数据在内存中,效率会更高。

 二 HBase与Hive的集成

2.1 相关概念

HBase与Hive的对比

hive:

数据仓库:Hive的本质其实就相当于将HDFS中已经存储的文件在Mysql中做了一个双射关系,以方便使用HQL去管理查询。

用于数据分析、清洗:Hive适用于离线的数据分析和清洗,延迟较高。

基于HDFS、MapReduce:Hive存储的数据依旧在DataNode上,编写的HQL语句终将是转换为MapReduce代码执行。

HBase

数据库:是一种面向列族存储的非关系型数据库。

用于存储结构化和非结构化的数据:适用于单表非关系型数据的存储,不适合做关联查询,类似JOIN等操作。

基于HDFS:数据持久化存储的体现形式是HFile,存放于DataNode中,被ResionServer以region的形式进行管理。

延迟较低,接入在线业务使用:面对大量的企业数据,HBase可以直线单表大量数据的存储,同时提供了高效的数据访问速度。

2.1  配置相关文件

1.配置hive-site.xml 文件

	<property>
        <name>hive.zookeeper.quorum</name>
        <value>master,node1,node2</value>
    </property>

    <property>
        <name>hive.zookeeper.client.port</name>
        <value>2181</value>
    </property>

2.3 关联

1.

HBase中已经存储了某一张表,在Hive中创建一个外部表来关联HBase中的这张表

建立外部表的字段名要和hbase中的列名一致

前提是hbase中已经有表了

 示例,以后模仿这个

create external table students_hbase
(
id string,
name string,
age string,
gender string, 
clazz string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ("hbase.columns.mapping" = "
:key,
info:name,
info:age,
info:gender,
info:clazz
")
tblproperties("hbase.table.name" = "default:students");

三 Phoenix

3.1 安装

1.准备好文件,上传到Linux中

2.解压 tar -zxvf phoenix-hbase-2.2-5.1.3-bin.tar.gz -C ../

3.2 配置文件

1.配置环境变量

/etc/profile

2.将phoenix-server-hbase-2.2-5.1.3.jar复制到所有节点的hbase lib目录下

3.3 启动

1.先要启动hbase

2.sqlline.py master,node1,node2

3.4 常用语法

1.建表语句

必须要设置主键

CREATE TABLE IF NOT EXISTS student_p1 (
 id VARCHAR NOT NULL PRIMARY KEY, 
 name VARCHAR,
 age BIGINT, 
 gender VARCHAR ,
 clazz VARCHAR
);

2.插入语句

语法 upsert into 表名 values(内容)

upsert into STUDENT values('1500100004','葛德曜',24,'男','理科三班');
upsert into STUDENT values('1500100005','宣谷芹',24,'男','理科六班');
upsert into STUDENT values('1500100006','羿彦昌',24,'女','理科三班');

3.查询语句

跟sql语句差不多

4.删除数据

语法 delete from 表名 where 主键名=主键值;

delete from student_p1 where id='1500100004';

5. 删除表

drop table student_p1;

6.退出

!quit

3.5 注意事项

1.在phoenix中创建表之后,我们可以使用 !table或者show table 查看所有的表,并以大写的形式展现给我们,我们使用sql语句查询的时候不分表的大小写。

2.在phoenix中创建表之后,可以在hbase中可以通过scan加大写的表名查看phoenix中的表,但是在hbase中创建的表,在phoenix就无法查看。如果想查看,则需要在phoenix中进行表的映射。映射方式有两种:视图映射和表映射

3、如何在phoneix中使用hbase原本的数据表呢?
        视图映射:视图并不是真正意义上的表,而是在phoneix创建一个映射关系,以表的形式将hbase中原本数据映射过来,可以在基础之上编写sql语句进行分析,需要注意的是,我们在视图上sql分析的时候,表名和列名需要加双引号。删除视图不会影响原本hbase中的数据,视图无法做修改,只能查询,视图在phoneix中被看作成一个只读表。
        表映射:建表的语法来说与视图映射相差一个单词,其他的没啥区别。使用上,表映射可以直接在phoneix中对表数据进行增删改查。将phoneix中表映射删了,原来hbase中的表也对应删除。
        
    4、映射查询的时候,主键可以不用加双引号,非主键的列必须加双引号

3.7映射

3.7.1 视图映射

1.Phoenix创建的视图是只读的,所以只能用来做查询,无法通过视图对源数据进行修改等操作

2.在phoenix创建视图, primary key 对应到hbase中的rowkey

3.创建的映射要与想要映射的hbase中的表名一致,且视图要被双引号包起来。

4.视图映射相关建表案例

创建的是映射,所以是view

CREATE view "students" (
 id VARCHAR NOT NULL PRIMARY KEY, 
 "info"."name" VARCHAR,
 "info"."age" VARCHAR, 
 "info"."gender" VARCHAR,
 "info"."clazz" VARCHAR
) column_encoded_bytes=0;

5.查询相关事项

a.映射创建好之后,想要查看映射内容,必须使用sql语句的时候将视图名用双引号包起来,才能查看

b.映射查询的时候,主键可以不用加双引号,非主键的列必须加双引号

c.删除视图不会影响原本hbase中的数据,视图无法做修改,只能查询,视图在phoneix中被看作成一个只读表。

drop view "视图名";

3.7.2 表映射

1.建表映射

1) 当HBase中已经存在表时,可以以类似创建视图的方式创建关联表,只需要将create view改为create table即可。

2)当HBase中不存在表时,可以直接使用create table指令创建需要的表,并且在创建指令中可以根据需要对HBase表结构进行显示的说明。

3)创建的映射要与想要映射的hbase中的表名一致,且表要被双引号包起来。

CREATE table "students" (
 id VARCHAR NOT NULL PRIMARY KEY, 
 "info"."name" VARCHAR,
 "info"."age" VARCHAR, 
 "info"."gender" VARCHAR,
 "info"."clazz" VARCHAR
) column_encoded_bytes=0;

2.使用create table创建的关联表,如果对表进行了修改,源数据也会改变,同时如果关联表被删除,源表也会被删除。但是视图就不会,如果删除视图,源数据不会发生改变。

3.8 sql案例

1.过滤出文科一班的学生总分前10的学生信息

过滤出文科一班的学生
select ID as id,"name" as name,"clazz" as clazz from "students" where "clazz"='文科一班';

将成绩表做切分转换
select regexp_split(ID,'-')[1] as student_id,regexp_split(ID,'-')[2] as subject_id,"score" as score from "scores";

在第二步的基础之上求每个学生的总分
select t1.student_id as student_id,sum(to_number(t1.score)) as sum_score from (select regexp_split(ID,'-')[1] as student_id,regexp_split(ID,'-')[2] as subject_id,"score" as score from "scores") t1 group by t1.student_id;

与步骤1的文科学生进行关联
select b1.id as student_id,b1.name as name,b1.clazz as clazz,to_char(b2.sum_score) as sum_score from (select ID as id,"name" as name,"clazz" as clazz from "students" where "clazz"='文科一班') b1 join (select t1.student_id as student_id,sum(to_number(t1.score)) as sum_score from (select regexp_split(ID,'-')[1] as student_id,regexp_split(ID,'-')[2] as subject_id,"score" as score from "scores") t1 group by t1.student_id) b2 on (b1.id=b2.student_id) order by b2.sum_score desc limit 10;

2.注意事项

通过这个例子遇到的注意点:
1、切分字符串的函数不是split,而是regexp_split
2、Phoenix中,数组的索引是从1开始的
3、给字段起别名之后的嵌套查询,就不需要再加双引号了,主键本身就可以不用加
4、sum函数中的数据类型必须是数值类型,如果是10的整数倍,会以科学计数法进行标识 580->5.8E+2(5.8*10^2)
5、to_number()转数值  to_char()转字符串

四  bulkLoad实现批量导入

4.1相关概念

1.优点:

  1. 如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源, 一个比较高效便捷的方法就是使用 “Bulk Loading”方法,即HBase提供的HFileOutputFormat类。

  2. 它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。

2.限制:

  1. 仅适合初次数据导入,即表内数据为空,或者每次入库表内都无数据的情况。

  2. HBase集群与Hadoop集群为同一集群,即HBase所基于的HDFS为生成HFile的MR的集群

4.2 编写代码

代码编写:

提前在Hbase中创建好表

生成Hfile基本流程:

  1. 设置Mapper的输出KV类型:

    K: ImmutableBytesWritable(代表行键)

    V: KeyValue (代表cell)

2. 开发Mapper

读取你的原始数据,按你的需求做处理

输出rowkey作为K,输出一些KeyValue(Put)作为V

3. 配置job参数

a. Zookeeper的连接地址

b. 配置输出的OutputFormat为HFileOutputFormat2,并为其设置参数

4. 提交job

导入HFile到RegionServer的流程

构建一个表描述对象

构建一个region定位工具

然后用LoadIncrementalHFiles来doBulkload操作

package com.shujia.jinjie;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.FileInputStream;
import java.io.IOException;

class MyBulkLoadMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,KeyValue>{
       /*
    ==> tl_hefei_shushan_503.txt <==
    D55433A437AEC8D8D3DB2BCA56E9E64392A9D93C	117210031795040	83401	8340104	301	20180503190539	20180503233517	20180503
    8827F3196977C6F752680505FEC0C7D3A18D4DFC	\N	\N	\N	\N	\N	\N	\N
     */

    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        //处理脏数据
        if (!(line.contains("shushan")) && !(line.contains("\\N"))){
            String[] strings = line.split("\t");
            String phoneNum = strings[0];
            String wg = strings[1];
            String city = strings[2];
            String qx = strings[3];
            String stayTime = strings[4];
            String startTime = strings[5];
            String endTime = strings[6];
            String date = strings[7];

            //因为手机号代表一个人,一个人可能会出现在多个区域,不能直接将手机号作为rk,为了数据的完整性
            //可以将手机号和进入网格的时间拼接
            byte[] rk = Bytes.toBytes(phoneNum + "-" + startTime);
            byte[] family = Bytes.toBytes("info");

            //创建输出key对象
//            ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(rk);
//            ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(rk);
            ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(rk);
            //将其余的6列封装成单元格
            KeyValue keyValue1 = new KeyValue(rk, family, Bytes.toBytes("wg"), Bytes.toBytes(wg));
            context.write(immutableBytesWritable,keyValue1);

            KeyValue keyValue2 = new KeyValue(rk, family, Bytes.toBytes("city"), Bytes.toBytes(city));
            context.write(immutableBytesWritable,keyValue2);

            KeyValue keyValue3 = new KeyValue(rk, family, Bytes.toBytes("qx"), Bytes.toBytes(qx));
            context.write(immutableBytesWritable,keyValue3);

            KeyValue keyValue4 = new KeyValue(rk, family, Bytes.toBytes("stayTime"), Bytes.toBytes(stayTime));
            context.write(immutableBytesWritable,keyValue4);

            KeyValue keyValue5 = new KeyValue(rk, family, Bytes.toBytes("endTime"), Bytes.toBytes(endTime));
            context.write(immutableBytesWritable,keyValue5);

            KeyValue keyValue6 = new KeyValue(rk, family, Bytes.toBytes("date"), Bytes.toBytes(date));
            context.write(immutableBytesWritable,keyValue6);

        }

    }
}

public class BulkLoadingDemo {
    public static void main(String[] args) throws Exception{
        //获取集群配置文件,可以是hdfs集群也可以是hbase集群
        Configuration conf = HBaseConfiguration.create();
        //设置zookeeper集群信息
        conf.set("hbase.zookeeper.quorum","master:2181,node1:2181,node2:2181");

        //创建Job作业
        Job job = Job.getInstance(conf);

        //给job作业起一个名字
        job.setJobName("hbase使用bulkloading方式批量加载数据作业");

        //设置主类
        job.setJarByClass(BulkLoadingDemo.class);
        //设置Map类
        job.setMapperClass(MyBulkLoadMapper.class);

        //无需设置自定义reduce类,使用hbase中的reduce类
        job.setOutputFormatClass(HFileOutputFormat2.class);

        //设置map的输出key和value类型
        //key -- 行键
        //value -- 单元格(列)
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        //设置region中字典排序的过程的类
        job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
        job.setReducerClass(KeyValueSortReducer.class);

        //设置文件的输入输出路径
        FileInputFormat.setInputPaths(job,new Path("/bigdata29/data/dianxin_data"));
        //设置hfile文件
        FileOutputFormat.setOutputPath(job,new Path("/bigdata29/hbase/out2"));

        //获取数据库连接对象
        Connection conn = ConnectionFactory.createConnection(conf);
        //获取数据库操作对象
        Admin admin = conn.getAdmin();
        TableName tn = TableName.valueOf("dianxin_data_bulk");
        Table table = conn.getTable(tn);
        RegionLocator regionLocator = conn.getRegionLocator(tn);

        //使用HFileOutputFormat2类创建Hfile文件
        HFileOutputFormat2.configureIncrementalLoad(job,table,regionLocator);

        //提交作业并执行
        boolean b = job.waitForCompletion(true);
        if(b){
            System.out.println("====================== Hfile文件生成成功!! 在/bigdata29/hbase/out2目录下 ================================");
        }else {
            System.out.println("============= Hfile文件生成失败!! ==================");
        }



    }






}

 这个命令在Linux运行

hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles /bigdata29/hbase/out2 dianxin_data_bulk

五 HBase的rowKey设计

5.1.相关概念

HBase是三维有序存储的,通过rowkey(行键),column key(column family和qualifier)和TimeStamp(时间戳)这个三个维度可以对HBase中的数据进行快速定位。

HBase中rowkey可以唯一标识一行记录,在HBase查询的时候,有两种方式:

通过get方式,指定rowkey获取唯一一条记录

通过scan方式,设置startRow和stopRow参数进行范围匹配

全表扫描,即直接扫描整张表中所有行记录

5.2.相关原则

1.长度不宜过长

rowkey是一个二进制码流,可以是任意字符串,最大长度 64kb ,实际应用中一般为10-100bytes,以 byte[] 形式保存,一般设计成定长。

建议越短越好,不要超过16个字节,原因如下:

数据的持久化文件HFile中是按照KeyValue存储的,如果rowkey过长,比如超过100字节,1000w行数据,光rowkey就要占用100*1000w=10亿个字节,将近1G数据,这样会极大影响HFile的存储效率;

MemStore将缓存部分数据到内存,如果rowkey字段过长,内存的有效利用率就会降低,系统不能缓存更多的数据,这样会降低检索效率。

目前操作系统都是64位系统,内存8字节对齐,控制在16个字节,8字节的整数倍利用了操作系统的最佳特性。

2.散列性

如果rowkey按照时间戳的方式递增,不要将时间放在二进制码的前面,建议将rowkey的高位作为散列字段,由程序随机生成,低位放时间字段,这样将提高数据均衡分布在每个RegionServer,以实现负载均衡的几率。如果没有散列字段,首字段直接是时间信息,所有的数据都会集中在一个RegionServer上,这样在数据检索的时候负载会集中在个别的RegionServer上,造成热点问题,会降低查询效率。

3.唯一性

必须在设计上保证其唯一性,rowkey是按照字典顺序排序存储的,因此,设计rowkey的时候,要充分利用这个排序的特点,将经常读取的数据存储到一块,将最近可能会被访问的数据放到一块。

5.3 设计原则

1.什么是热点

HBase中的行是按照rowkey的字典顺序排序的,这种设计优化了scan操作,可以将相关的行以及会被一起读取的行存取在临近位置,便于scan。然而糟糕的rowkey设计是热点的源头。 热点发生在大量的client直接访问集群的一个或极少数个节点(访问可能是读,写或者其他操作)。大量访问会使热点region所在的单个机器超出自身承受能力,引起性能下降甚至region不可用,这也会影响同一个RegionServer上的其他region,由于主机无法服务其他region的请求。 设计良好的数据访问模式以使集群被充分,均衡的利用。

为了避免写热点,设计rowkey使得不同行在同一个region,但是在更多数据情况下,数据应该被写入集群的多个region,而不是一个。

下面是一些常见的避免热点的方法以及它们的优缺点:

加盐

这里所说的加盐不是密码学中的加盐,而是在rowkey的前面增加随机数,具体就是给rowkey分配一个随机前缀以使得它和之前的rowkey的开头不同。分配的前缀种类数量应该和你想使用数据分散到不同的region的数量一致。加盐之后的rowkey就会根据随机生成的前缀分散到各个region上,以避免热点。

哈希

哈希会使同一行永远用一个前缀加盐。哈希也可以使负载分散到整个集群,但是读却是可以预测的。使用确定的哈希可以让客户端重构完整的rowkey,可以使用get操作准确获取某一个行数据

反转

第三种防止热点的方法时反转固定长度或者数字格式的rowkey。这样可以使得rowkey中经常改变的部分(最没有意义的部分)放在前面。这样可以有效的随机rowkey,但是牺牲了rowkey的有序性。

反转rowkey的例子以手机号为rowkey,可以将手机号反转后的字符串作为rowkey,这样的就避免了以手机号那样比较固定开头导致热点问题

时间戳反转

一个常见的数据处理问题是快速获取数据的最近版本,使用反转的时间戳作为rowkey的一部分对这个问题十分有用,可以用 Long.Max_Value - timestamp 追加到key的末尾,例如 [key]reverse_timestamp , [key] 的最新值可以通过scan [key]获得[key]的第一条记录,因为HBase中rowkey是有序的,第一条记录是最后录入的数据。

示例

# 原数据:以时间戳_user_id作为rowkey
# 时间戳高位变化不大,太连续,最终可能会导致热点问题
1638584124_user_id
1638584135_user_id
1638584146_user_id
1638584157_user_id
1638584168_user_id
1638584179_user_id
...

# 解决方案:加盐、反转、哈希

# 加盐
# 加上随即前缀,随机的打散
# 该过程无法预测 前缀时随机的
00_1638584124_user_id
05_1638584135_user_id
03_1638584146_user_id
04_1638584157_user_id
02_1638584168_user_id
06_1638584179_user_id

# 反转
# 适用于高位变化不大,低位变化大的rowkey
4214858361_user_id
5314858361_user_id
6414858361_user_id
7514858361_user_id
8614858361_user_id
9714858361_user_id

# 散列 md5、sha1、sha256......
25531D7065AE158AAB6FA53379523979_user_id
60F9A0072C0BD06C92D768DACF2DFDC3_user_id
D2EFD883A6C0198DA3AF4FD8F82DEB57_user_id
A9A4C265D61E0801D163927DE1299C79_user_id
3F41251355E092D7D8A50130441B58A5_user_id
5E6043C773DA4CF991B389D200B77379_user_id

# 时间戳"反转"
# rowkey:时间戳_user_id
# rowkey是字典升序的,那么越新的记录会被排在最后面,不容易被获取到
# 需求:让最新的记录排在最前面

# 大数:9999999999
# 大数-小数

1638584124_user_id => 8361415875_user_id
1638584135_user_id => 8361415864_user_id
1638584146_user_id => 8361415853_user_id
1638584157_user_id => 8361415842_user_id
1638584168_user_id => 8361415831_user_id
1638584179_user_id => 8361415820_user_id

1638586193_user_id => 8361413806_user_id

5.4实战案例

电信案例

要求

手机号,网格编号,城市编号,区县编号,停留时间,进入时间,离开时间,时间分区
D55433A437AEC8D8D3DB2BCA56E9E64392A9D93C,117210031795040,83401,8340104,301,20180503190539,20180503233517,20180503


将用户位置数据保存到hbase
    查询需求
        1、通过手机号查询用户最近10条位置记录

        2、获取用户某一天在一个城市中的所有位置

    怎么设计hbase表
        1、rowkey
        2、时间戳

代码

package com.shujia.jinjie;


import com.shujia.utils.HBaseUtils;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Scanner;

/**
 * 查询需求
 *         1、通过手机号查询用户最近10条位置记录
 *
 *         2、获取用户某一天在一个城市中的所有位置
 */
public class DianXinRowKeyDemo {

    //创建数据库连接对象
    //创建数据库操作对象
    private static Connection conn = HBaseUtils.CONNECTION;
    private static Admin admin = HBaseUtils.ADMIN;
    public static void main(String[] args) throws Exception{

        //建表
//        HBaseUtils.createOneTable("dianxin_tb1","info");

        //读取文件数据,添加到hbase中的dianxin_tb1表中
//        putsData();

        //通过手机号查询用户最近10条位置记录
        Scanner sc = new Scanner(System.in);
        System.out.println("请输入要查询的手机号:");
        String phoneNum = sc.next();
//        findUser(phoneNum);

        //2、获取用户某一天在一个城市中的所有位置
        System.out.println("请输入要查询的城市编号:");
        String city = sc.next();
        findUserPosition(phoneNum,city);


    }

    public static void findUserPosition(String number,String city) throws Exception{
        //将表封装成一个TableName对象
        TableName tn = TableName.valueOf("dianxin_tb1");

        //获取表对象
        Table table = conn.getTable(tn);

        //scan
        Scan scan = new Scan();
        //创建一个行键前缀过滤器对象
        //public PrefixFilter(byte[] prefix)
        PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes(number));

        //过滤城市
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("city"),
                CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes(city)));

        //创建过滤器集合
        FilterList filterList = new FilterList();
        filterList.addFilter(prefixFilter);
        filterList.addFilter(singleColumnValueFilter);

        scan.setFilter(filterList);
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            String rk = Bytes.toString(result.getRow());
            //47BE1E866CFC071DB19D5E1C056BE28AE24C16E7-9496768070
            String[] strings = rk.split("-");
            String phoneNum = strings[0];
            long startTime = 20190000000000L - Long.parseLong(strings[1]);
            String wg = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("wg")));
            String city2 = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("city")));
            String qx = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("qx")));
            String stayTime = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("stayTime")));
            String endTime = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("endTime")));
            String date = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("date")));

            System.out.println("手机号:"+phoneNum+",进入网格时间:"+startTime+",离开网格时间:"+endTime
                    +",网格编号:"+wg+",城市:"
                    +city2+",区县:"+qx+",停留时间:"
                    +stayTime+",日期:"+date);
        }

    }

    //查询函数
    public static void findUser(String number) throws Exception{
        //将表封装成一个TableName对象
        TableName tn = TableName.valueOf("dianxin_tb1");

        //获取表对象
        Table table = conn.getTable(tn);

        //scan
        Scan scan = new Scan();
        //创建一个行键前缀过滤器对象
        //public PrefixFilter(byte[] prefix)
        PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes(number));
        //设置过滤器
        //public Scan setFilter(Filter filter)
        scan.setFilter(prefixFilter);
        scan.setLimit(10);
        //创建结果对象
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            String rk = Bytes.toString(result.getRow());
            //47BE1E866CFC071DB19D5E1C056BE28AE24C16E7-9496768070
            String[] strings = rk.split("-");
            String phoneNum = strings[0];
            long startTime = 20190000000000L - Long.parseLong(strings[1]);
            String wg = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("wg")));
            String city = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("city")));
            String qx = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("qx")));
            String stayTime = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("stayTime")));
            String endTime = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("endTime")));
            String date = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("date")));

            System.out.println("手机号:"+phoneNum+",进入网格时间:"+startTime+",离开网格时间:"+endTime
                    +",网格编号:"+wg+",城市:"
                    +city+",区县:"+qx+",停留时间:"
                    +stayTime+",日期:"+date);
        }

    }



    //添加函数
    public static void putsData() throws Exception{


        //将表封装成一个TableName对象
        TableName tn = TableName.valueOf("dianxin_tb1");

        //获取表对象
        Table table = conn.getTable(tn);

        //获取IO对象
        BufferedReader br = new BufferedReader(new FileReader("hbase/data/dianxin_data"));
        String line=null;
        while ((line= br.readLine())!=null){
            if (!(line.contains("shushan")) && !(line.contains("\\N"))){
                String[] strings = line.split("\t");
                String phoneNum = strings[0];
                String wg = strings[1];
                String city = strings[2];
                String qx = strings[3];
                String stayTime = strings[4];
                String startTime = strings[5];
                String endTime = strings[6];
                String date = strings[7];

                //我们为了需求的查询更加方便,以手机号与进入网格的时间进行拼接
                //因为要查询最近的,可以使用时间戳反转对rowkey进行设计
                //20180503173254
                //20190000000000
                long new_time = 20190000000000L - Long.parseLong(startTime);
                String rk = phoneNum + "-" + new_time;
                HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","wg",wg);
                HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","city",city);
                HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","qx",qx);
                HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","stayTime",stayTime);
                HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","endTime",endTime);
                HBaseUtils.putOneDataToTable("dianxin_tb1",rk,"info","date",date);


            }

        }


    }
}

六 二级索引

6.1相关概念

二级索引的本质就是建立各列值与行键之间的映射关系

Hbase的局限性:

  HBase本身只提供基于行键和全表扫描的查询,而行键索引单一,对于多维度的查询困难。

6.2 常见的二级索引

HBase的一级索引就是rowkey,我们只能通过rowkey进行检索。如果我们相对hbase里面列族的列列进行一些组合查询,就需要采用HBase的二级索引方案来进行多条件的查询。

  1. MapReduce方案

  2. ITHBASE(Indexed-Transanctional HBase)方案

  3. IHBASE(Index HBase)方案

  4. Hbase Coprocessor(协处理器)方案

  5. Solr+hbase方案 redis+hbase 方案

  6. CCIndex(complementalclustering index)方案

6.3 二级索引的种类

1、创建单列索引

  2、同时创建多个单列索引

  3、创建联合索引(最多同时支持3个列)

  4、只根据rowkey创建索引

6.4 Mapreduce 创建二级索引

使用整合MapReduce的方式创建hbase索引。主要的流程如下:

1.1扫描输入表,使用hbase继承类TableMapper

1.2获取rowkey和指定字段名称和字段值

1.3创建Put实例, value=” “, rowkey=班级,column=学号

1.4使用TableReducer将数据写入索引表

首先二级索引表必须先要存在

该Map继承的是TableMapper

该Reducer继承的是TableReducer

代码如下

package com.shujia.jinjie;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

//因为我们现在要读取的数据来自于hbase中的hfile文件,与hdfs上普通的block块文件有所区别,不能直接继承Mapper类
//要继承hbase读取数据专属的Mapper类     TableMapper
//public abstract class TableMapper<KEYOUT, VALUEOUT> extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT>

class MyIndexMapper extends TableMapper<Text,NullWritable>{
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        //ImmutableBytesWritable key --相当于是读取到一行的行键
        //Result value --相当于读取到一行多列的封装
        //获取行键,

        String id = Bytes.toString(key.get());
        //获取姓名列
        // public byte[] getValue(byte[] family, byte[] qualifier)
        String name = Bytes.toString(value.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
        //将姓名与学号连接起来给到reduce
        context.write(new Text(id+"-"+name),NullWritable.get());


    }
}


//public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation>
class MyIndexReducer extends TableReducer<Text,NullWritable,NullWritable>{
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, NullWritable, Mutation>.Context context) throws IOException, InterruptedException {
        String string = key.toString();
        String id = string.split("-")[0];
        String name = string.split("-")[1];

        //将要添加的数据封装成Put类的对象
        Put put = new Put(Bytes.toBytes(name));
        //public Put addImmutable(byte[] family, byte[] qualifier, byte[] value)
        put.addColumn(Bytes.toBytes("info"),Bytes.toBytes(id),Bytes.toBytes(""));
        context.write(NullWritable.get(),put);

    }
}

public class HBaseIndexDemo1 {
    public static void main(String[] args) throws Exception {
        //获取集群配置文件,可以是hdfs集群也可以是hbase集群
        Configuration conf = HBaseConfiguration.create();
        //设置zookeeper集群信息
        conf.set("hbase.zookeeper.quorum","master:2181,node1:2181,node2:2181");
        //创建job对象
        Job job = Job.getInstance(conf);
        //给job任务取名字
        job.setJobName("给学生表创建二级索引表");

        //设置主类
        job.setJarByClass(HBaseIndexDemo1.class);

        //因为索引表的构建是建立列值与行键的映射关系,要获取所有的数据
        //scan扫描全表数据
        Scan scan = new Scan();
        //告诉输入的列值来自于哪一个列簇
        scan.addFamily(Bytes.toBytes("info"));




        /**
        * @param table  The table name to read from.
                * @param scan  The scan instance with the columns, time range etc.
         * @param mapper  The mapper class to use.
         * @param outputKeyClass  The class of the output key.
         * @param outputValueClass  The class of the output value.
         * @param job  The current job to adjust.  Make sure the passed job is
         * carrying all necessary HBase configuration.
                * @throws IOException When setting up the details fails.
                *public static void initTableMapperJob
         * (String table,Scan scan,Class<? extends TableMapper> mapper,Class<?> outputKeyClass,Class<?> outputValueClass,Job job)
         */
        //初始化Map任务
        TableMapReduceUtil.initTableMapperJob("students2",scan,MyIndexMapper.class, Text.class, NullWritable.class,job);
        //初始化Reduce任务
        TableMapReduceUtil.initTableReducerJob("students2_index",MyIndexReducer.class,job);
        //提交作业到集群中允许
        boolean b = job.waitForCompletion(true);
        if (b) {
            System.out.println("================== students2索引表构建成功!!!============================");
        } else {
            System.out.println("================== students2索引表构建失败!!!============================");
        }

    }
}

6.5 Redis创建二级索引

1.指定redis端口号

Jedis jedis = new Jedis("192.168.73.100", 12346);

2.在redis中构建映射关系(性别:学号)因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储

    public static void buildIndexInRedis() throws Exception {

        //获取要构建索引的原表
        Table students2 = conn.getTable(TableName.valueOf("students2"));

        Scan scan = new Scan();
        //获取男生的学号,放入到redis中
        //创建列值过滤器
        ValueFilter filter1 = new ValueFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("男")));
        scan.setFilter(filter1);
        ResultScanner resultScanner = students2.getScanner(scan);
        for (Result result : resultScanner) {
            //获取每一行的行键即可
            String id = Bytes.toString(result.getRow());
            //将学号以值的方式添加到redis键对应的值中
            //因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储
            jedis.sadd("性别:男", id);
        }

        //获取男生的学号,放入到redis中
        //创建列值过滤器
        ValueFilter filter2 = new ValueFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("女")));
        scan.setFilter(filter2);
        ResultScanner resultScanner2 = students2.getScanner(scan);
        for (Result result : resultScanner2) {
            //获取每一行的行键即可
            String id = Bytes.toString(result.getRow());
            //将学号以值的方式添加到redis键对应的值中
            //因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储
            jedis.sadd("性别:女", id);
        }

3.先通过查询redis中性别对应的学号,拿着学号去hbase原表中查询获取结果

package com.shujia.jinjie;

import com.shujia.utils.HBaseUtils;
import com.shujia.utils.HBaseUtils;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import redis.clients.jedis.Jedis;

import java.util.Scanner;
import java.util.Set;

/*
    使用redis第三方的存储工具存储hbase索引(本质依旧是列值与行键产生映射关系)
 */
public class HBaseWithRedisIndex {
    //1、获取hbase数据库连接对象和操作对象
    static Connection conn = HBaseUtils.CONNECTION;
    static Admin admin = HBaseUtils.ADMIN;

    //获取redis连接对象
    static Jedis jedis = new Jedis("192.168.73.100", 12346);

    public static void main(String[] args) throws Exception {
        //步骤1:在redis中构建映射关系(性别:学号)
//        buildIndexInRedis();

      //使用:先通过查询redis中性别对应的学号,拿着学号去hbase原表中查询获取结果
        Scanner sc = new Scanner(System.in);
        System.out.println("请输入您要查询的性别:");
        String gender = sc.next();
        selectGenderFromHbase(gender);
    }

    public static void selectGenderFromHbase(String gender) throws Exception {
        if ("男".equals(gender)) {
            selectIdFromRedis(gender);
        } else if ("女".equals(gender)) {
            selectIdFromRedis(gender);
        } else {
            System.out.println("没有该性别");
        }
    }

    //单独编写一个方法查询redis
    public static void selectIdFromRedis(String gender) throws Exception {
        Table students2 = conn.getTable(TableName.valueOf("students2"));
        //redis中set查询值的方法
        Set<String> ids = jedis.smembers("性别:"+gender);
        for (String id : ids) {
//            Result result = students2.get(new Get(Bytes.toBytes(id)).addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")));
            Get get = new Get(Bytes.toBytes(id));
            //获取那一列的所有信息封装成一个结果对象
            get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
            Result result = students2.get(get);

            String name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
            System.out.println("学号:" + id + ",姓名:" + name);
        }
    }

    public static void buildIndexInRedis() throws Exception {

        //获取要构建索引的原表
        Table students2 = conn.getTable(TableName.valueOf("students2"));

        Scan scan = new Scan();
        //获取男生的学号,放入到redis中
        //创建列值过滤器
        ValueFilter filter1 = new ValueFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("男")));
        scan.setFilter(filter1);
        ResultScanner resultScanner = students2.getScanner(scan);
        for (Result result : resultScanner) {
            //获取每一行的行键即可
            String id = Bytes.toString(result.getRow());
            //将学号以值的方式添加到redis键对应的值中
            //因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储
            jedis.sadd("性别:男", id);
        }

        //获取男生的学号,放入到redis中
        //创建列值过滤器
        ValueFilter filter2 = new ValueFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("女")));
        scan.setFilter(filter2);
        ResultScanner resultScanner2 = students2.getScanner(scan);
        for (Result result : resultScanner2) {
            //获取每一行的行键即可
            String id = Bytes.toString(result.getRow());
            //将学号以值的方式添加到redis键对应的值中
            //因为男生的学号有很多个,且不重复,所以我们在redis中采用set的数据类型存储
            jedis.sadd("性别:女", id);
        }


    }
}

6.6 Phoenix二级索引

对于Hbase,如果想精确定位到某行记录,唯一的办法就是通过rowkey查询。如果不通过rowkey查找数据,就必须逐行比较每一行的值,对于较大的表,全表扫描的代价是不可接受的。

1.开启索引,配置相关文件

# 关闭hbase集群
stop-hbase.sh

# 在/usr/local/soft/hbase-1.4.6/conf/hbase-site.xml中增加如下配置

<property>
  <name>hbase.regionserver.wal.codec</name>
  <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>
<property>
    <name>hbase.rpc.timeout</name>
    <value>60000000</value>
</property>
<property>
    <name>hbase.client.scanner.timeout.period</name>
    <value>60000000</value>
</property>
<property>
    <name>phoenix.query.timeoutMs</name>
    <value>60000000</value>
</property>


# 同步到所有节点
scp hbase-site.xml node1:`pwd`
scp hbase-site.xml node2:`pwd`

# 修改phoenix目录下的bin目录中的hbase-site.xml
<property>
    <name>hbase.rpc.timeout</name>
    <value>60000000</value>
</property>
<property>
    <name>hbase.client.scanner.timeout.period</name>
    <value>60000000</value>
</property>
<property>
    <name>phoenix.query.timeoutMs</name>
    <value>60000000</value>
</property>


# 启动hbase
start-hbase.sh
# 重新进入phoenix客户端
sqlline.py master,node1,node2

6.6.1 全局索引

全局索引适合读多写少的场景。如果使用全局索引,读数据基本不损耗性能,所有的性能损耗都来源于写数据。数据表的添加、删除和修改都会更新相关的索引表(数据删除了,索引表中的数据也会删除;数据增加了,索引表的数据也会增加)

注意: 对于全局索引在默认情况下,在查询语句中检索的列如果不在索引表中,Phoenix不会使用索引表将,除非使用hint。

1.导入数据

首先创建一个shell命令,里面有建表语句,取名为DIANXIN.sql

CREATE TABLE IF NOT EXISTS DIANXIN (
     mdn VARCHAR ,
     start_date VARCHAR ,
     end_date VARCHAR ,
     county VARCHAR,
     x DOUBLE ,
     y  DOUBLE,
     bsid VARCHAR,
     grid_id  VARCHAR,
     biz_type VARCHAR, 
     event_type VARCHAR , 
     data_source VARCHAR ,
     CONSTRAINT PK PRIMARY KEY (mdn,start_date)
) column_encoded_bytes=0;

2.准备数据

DIANXIN.csv

3.输入命令

# 导入数据,Linux命令
psql.py master,node1,node2 DIANXIN.sql DIANXIN.csv

4.导入数据好的截图

5.创建全局索引
CREATE INDEX DIANXIN_INDEX ON DIANXIN ( end_date );

6 查询数据 ( 索引未生效)
select * from DIANXIN where end_date = '20180503154014';

很明显查询需要2秒多,

7  强制使用索引 (索引生效) hint  语法糖

语法糖:/*+ INDEX(DIANXIN DIANXIN_INDEX) */
select /*+ INDEX(DIANXIN DIANXIN_INDEX) */  * from DIANXIN where end_date = '20180503154014';

很明显查询零点几秒,加了语法糖,查询速度加快

8 取索引列,(索引生效)

取我们设置的单级索引列,速度一样快
select end_date from DIANXIN where end_date = '20180503154014';

9.#创建多列索引
CREATE INDEX DIANXIN_INDEX1 ON DIANXIN ( end_date,COUNTY );

取我们多级索引的某一个索引列单独查询速度很快的

10 查询所有列 (索引未生效)
select  * from DIANXIN where end_date = '20180503154014'  and COUNTY = '8340104';

这个语法糖写错了,就当没有写,查询需要1秒多

11 查询所有列 (索引生效)
select /*+ INDEX(DIANXIN DIANXIN_INDEX1) */ * from DIANXIN where end_date = '20180503154014' and COUNTY = '8340104';

加了语法糖,只要零点几秒

12  单条件  (索引未生效)
select end_date from DIANXIN where  COUNTY = '8340103';
# 单条件  (索引生效) end_date 在前
select COUNTY from DIANXIN where end_date = '20180503154014';

因为创建多级索引是 CREATE INDEX DIANXIN_INDEX1 ON DIANXIN ( end_date,COUNTY )

end_date 在前面

13 删除索引
drop index 索引名 on DIANXIN;
drop index DIANXIN_INDEX on DIANXIN;

6.6.2 本地索引

1.相关概念

本地索引适合写多读少的场景,或者存储空间有限的场景。和全局索引一样,Phoenix也会在查询的时候自动选择是否使用本地索引。本地索引因为索引数据和原数据存储在同一台机器上,避免网络数据传输的开销,所以更适合写多的场景。由于无法提前确定数据在哪个Region上,所以在读数据的时候,需要检查每个Region上的数据从而带来一些性能损耗。

注意:对于本地索引,查询中无论是否指定hint或者是查询的列是否都在索引表中,都会使用索引表。

 2.创建本地索引
CREATE LOCAL INDEX DIANXIN_LOCAL_IDEX ON DIANXIN(grid_id);

3. 索引生效
select grid_id from dianxin where grid_id='117285031820040';

索引生效
select * from dianxin where grid_id='117285031820040';

6.6.3 覆盖索引

1.相关概念

覆盖索引是把原数据存储在索引数据表中,这样在查询时不需要再去HBase的原表获取数据就,直接返回查询结果。

注意:查询是 select 的列和 where 的列都需要在索引中出现。

2.创建覆盖索引
CREATE INDEX DIANXIN_INDEX_COVER ON DIANXIN ( x,y ) INCLUDE ( county );

3.

# 查询所有列 (索引未生效)
select * from DIANXIN where x=117.288 and y =31.822;

# 强制使用索引 (索引生效)
select /*+ INDEX(DIANXIN DIANXIN_INDEX_COVER) */ * from DIANXIN where x=117.288 and y =31.822;

# 查询索引中的列 (索引生效) mdn是DIANXIN表的RowKey中的一部分
select x,y,county from DIANXIN where x=117.288 and y =31.822;
select mdn,x,y,county from DIANXIN where x=117.288 and y =31.822;

# 查询条件必须放在索引中  select 中的列可以放在INCLUDE (将数据保存在索引中)
select /*+ INDEX(DIANXIN DIANXIN_INDEX_COVER) */ x,y,count(*) from DIANXIN group by x,y;

6.7 Phoenix JCBC

1.导入依赖

        <!-- https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-hbase-compat-2.2.5 -->
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-hbase-compat-2.2.5</artifactId>
            <version>5.1.3</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-core -->
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>5.1.3</version>
        </dependency>

        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.2</version>
        </dependency>

代码

package com.shujia.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class PhoenixJDBC {
    public static void main(String[] args) throws Exception {
        //注册驱动
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");

        //创建与phoenix连接对象
        Connection conn = DriverManager.getConnection("jdbc:phoenix:master,node1,node2:2181");

        PreparedStatement prep = conn.prepareStatement("select /*+ INDEX(DIANXIN DIANXIN_INDEX) */  * from DIANXIN where end_date = ? and start_date = ?");

        prep.setString(1,"20180503154014");
        prep.setString(2,"20180503154614");

        ResultSet resultSet = prep.executeQuery();
        while (resultSet.next()){
            /*
                 mdn VARCHAR ,
                 start_date VARCHAR ,
                 end_date VARCHAR ,
                 county VARCHAR,
                 x DOUBLE ,
                 y  DOUBLE,
                 bsid VARCHAR,
                 grid_id  VARCHAR,
                 biz_type VARCHAR,
                 event_type VARCHAR ,
                 data_source VARCHAR ,
             */
            String mdn = resultSet.getString("mdn");
            String start_date = resultSet.getString("start_date");
            String end_date = resultSet.getString("end_date");
            String county = resultSet.getString("county");
            double x = resultSet.getDouble("x");
            double  y = resultSet.getDouble("y");
            String bsid = resultSet.getString("bsid");
            String grid_id = resultSet.getString("grid_id");
            String biz_type = resultSet.getString("biz_type");
            String event_type = resultSet.getString("event_type");
            String data_source = resultSet.getString("data_source");

            System.out.println(mdn+","+start_date+","+end_date+","+county+","+x+","+y+","+bsid
                    +","+grid_id+","+biz_type+","+event_type+","+data_source);
        }

        //释放资源
        prep.close();
        conn.close();

    }
}

七 HBase的调优