代码 ---
public class Util {
public static String getRegNo(String callerId , String callTime){
//区域00-99
int hash = (callerId + callTime.substring(0, 6)).hashCode();
hash =(hash & Integer.MAX_VALUE) % 100;
//hash区域号
DecimalFormat df = new DecimalFormat();
df.applyPattern("00");
String regNo = df.format(hash);
return regNo ;
}
public class CalleeLogRegionObserver extends BaseRegionObserver{
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
FileWriter fw = new FileWriter("/home/centos/kkk.txt",true);
super.postPut(e, put, edit, durability);
//
String tableName0 = TableName.valueOf("ns1:calllogs").getNameAsString();
//得到当前的TableName对象
String tableName1 = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
fw.write(tableName1 + "\r\n");
if(!tableName0.equals(tableName1)){
return ;
}
//得到主叫的rowkey
//xx , callerid , time , direction, calleid ,duration
//被叫:calleid,time,
String rowkey = Bytes.toString(put.getRow());
String[] arr = rowkey.split(",");
if(arr[3].equals("1")){
return ;
}
String hash = Util.getRegNo(arr[4], arr[2]);
//hash
String newRowKey = hash + "," + arr[4] + "," + arr[2] + ",1," + arr[1] + "," + arr[5];
Put newPut = new Put(Bytes.toBytes(newRowKey));
newPut.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("dummy"), Bytes.toBytes("no"));
TableName tn = TableName.valueOf("ns1:calllogs");
Table t = e.getEnvironment().getTable(tn);
//
fw.write(t.getName().getNameAsString() + "\r\n");
t.put(newPut);
fw.close();
}
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException {
super.preGetOp(e, get, results);
}
-- 配置
<!-- 启用完全分布式 -->
hbase.cluster.distributed
true
<!-- 指定hbase数据在hdfs上的存放路径 -->
<property>
<name>hbase.rootdir</name>
<value>hdfs://mycluster/hbase</value>
</property>
<!-- 配置zk地址 -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>s101:2181,s102:2181,s103:2181</value>
</property>
<!-- zk的本地目录 -->
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/home/centos/zookeeper</value>
</property>
<!--协处理器 -->
<property>
<name>hbase.coprocessor.region.classes</name>
<value>com.tjx.hbasedemo.coprocessor.CalleeLogRegionObserver</value>
</property>
我再重启hbase 的时候没有在主目录下 生成 kkk.txt 文件,也没有报错,就是调用不了协处理器器
------------------------------------------------------------插入代码
@Test
public void testPut() throws Exception{
//创建conf对象
Configuration conf = HBaseConfiguration.create();
//通过连接工厂创建连接对象
Connection conn = ConnectionFactory.createConnection(conf);
//通过连接查询tableName对象
TableName tname = TableName.valueOf("ns1:calllogs");
//获得table
Table table = conn.getTable(tname);
//主叫
String callerId = "13845456767";
//被叫
String calleeId = "13989898787";
SimpleDateFormat sdf = new SimpleDateFormat();
sdf.applyPattern("yyyyMMddHHmmss");
//通话时间
String callTime = sdf.format(new Date());
//通话时长
int duration = 100 ;
DecimalFormat dff = new DecimalFormat();
dff.applyPattern("00000");
String durStr = dff.format(duration);
// hash 区域00 -- 99
int hash = (calleeId + callTime.substring(0,6)).hashCode();
hash = hash & Integer.MAX_VALUE % 100;
DecimalFormat df = new DecimalFormat();
df.applyPattern("00");
String regNo = df.format(hash);
// 拼接rowkey
String rowkey = regNo +","+callerId+","+callTime+","+"0"+","+calleeId+","+durStr ;
byte[] rowid = Bytes.toBytes(rowkey);
//创建put对象
Put put = new Put(rowid);
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callerPos"),Bytes.toBytes("河北"));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("calleePos"),Bytes.toBytes("河南"));
//执行插入
table.put(put);
System.out.println("over");
}
(一)Hbase协处理器的前世今生
Hbase是仿照Google的BigTable设计的,而其协处理器也是仿照BigTable的协处理实现完成的,具体链接可
参考:http://research.google.com/people/jeff/SOCC2010-keynote-slides.pdf
(二)什么是Hbase协处理器(Coprocessors )?
Hbase的协处理器在Hbase中属于高级的应用功能,它可以让开发者自定义的代码在服务器端执行,来完成特定的一些功能。
(三)为什么要用协处理器?
Hbase是一款高效的基于KV的NOSQL数据库,它有非常多的优点,但是也有不少缺点,hbase的设计全在rowkey上,所有能够高效的查询全是基于rowkey的,除了rowkey的设计之外,我们可能还有
一些其他的功能,如
(1)访问权限控制
(2)引用完整性,基于外键检验数据,
(3)给hbase设计二级索引,从而提高基于列过滤时的查询性能,
(4)像监控MySQL的binlog一样,监控hbase的wal预写log
(5)服务端自定义实现一些聚合函数的功能
(6).......
这样额外的功能,使用hbase的协处理来处理是非常方便的
(四)Hbase中协处理器的分类
在Hbase里面有两类Coprocessors :
1,基于Observer的Coprocessors ,类似于关系型数据库的触发器,可用来实现上面提到的功能中的1,2,3,4功能
常用的Observer:
RegionServerObserver 能够切面监测rowkey的数据的访问与删除
BaseMasterAndRegionObserver 能够切面监测hbase表的创建,删除,sheml修改
BaseWALObserver 能够切面监测hbase的wal的log写入
2,基于Endpoint的Coprocessors ,类似于关系型数据库的存储过程,可用来实现上面提到的功能中的5功能
在hbase0.96之后,采用probuff序列化通信的RPC数据,使用endpoint,需要同过protoc生成相关的service接口的java类
然后继承自己的生成的Service类并且实现hbase的Coprocessor, CoprocessorService接口,从而重写其中的业务方法构建一个
自定义的Endpoint的Coprocessors
协处理器的编程概念与Spring AOP的理念很相似,它也像MapReduce的数据运算方式,与本地local的数据产生计算,而不是远程读取数据再计算,
通过local计算的方式与RegionServer绑定,从而能提升数据计算的效率。
(5)协处理器的安装使用
Java代码
hbase.coprocessor.region.classes for RegionObservers and Endpoints.
hbase.coprocessor.wal.classes for WALObservers.
hbase.coprocessor.master.classes for MasterObservers.
在hbase官网文档中,介绍了两种使用方式:
静态方式(系统级),使用配置文件:
1,编写协处理器,并打成一个jar包,加入hbase/lib目录下,或者在hbase-env.sh里面配置相对应的jar,以及依赖的jar的路径
2,加入静态的配置,在hbase-site.xml里配置主类
Xml代码
hbase.coprocessor.region.classes
org.myname.hbase.coprocessor.endpoint.SumEndPoint
3,把依赖的jar分发到每一个regionserver上,然后重启hbase,
协处理生效,是系统级的协处理器
动态方式(表级别),使用Hbase shell:
1,编写协处理器,打成一个jar包,上传至HDFS,将依赖的jar拷贝到hbase的lib下,配置hbase-env.sh指定依赖jar的
2,建立表:
Java代码
create 'c', NAME=>'cf'
3,禁用表
Java代码
disable 'c'
4,指定协处理器的jar
Java代码
alter 'c', METHOD => 'table_att', 'coprocessor'=>'hdfs:///user/hbase_solr/hbase-increment-index.jar|com.hbase.easy.index.HbaseSolrIndexCoprocesser|1001|'
5,激活表
Java代码
enable 'c'
6,删除协处理jar
如果有多个协处理器,按照$1 $2 $n删除指定的jar配置
Java代码
alter 'c',METHOD => 'table_att_unset',NAME =>'coprocessor$1'
最后说一下,hbase的官方文档指出动态级别的协处理器,可以做到不重启hbase,更新协处理,做法就是
禁用表,卸载协处理器,重新指定协处理器, 激活表,即可,但实际测试发现
动态加载无效,是hbase的一个bug,看这个链接:
https://issues.apache.org/jira/browse/HBASE-8445
因为协处理器,已经被JVM加载,即使删除jar也不能重新load的jar,因为cache里面的hdfs的jar路径,没有变化,所以动态更新无效
,除非重启JVM,那样就意味着,需要重启RegionServer,
里面的小伙伴们指出了两种办法,使协处理器加载生效:
(1)滚动重启regionserver,避免停掉所有的节点
(2)改变协处理器的jar的类名字或者hdfs加载路径,以方便有新的ClassLoad去加载它
但总体来看,第2种方法,比较安全,第一种风险太大,一般情况下没有人会随便滚动重启线上的服务器的,这只在hbase升级的时候使用