Observer協(xié)處理器通常在一個(gè)特定的事件(諸如Get或Put)之前或之后發(fā)生,相當(dāng)于RDBMS中的觸發(fā)器。Endpoint協(xié)處理器則類似于RDBMS中的存儲(chǔ)過程,因?yàn)樗梢宰屇阍赗egionServer上對數(shù)據(jù)執(zhí)行自定義計(jì)算,而不是在客戶端上執(zhí)行計(jì)算。
1 協(xié)處理器簡介
如果要統(tǒng)計(jì)HBase中的數(shù)據(jù),比如統(tǒng)計(jì)某個(gè)字段的最大值、統(tǒng)計(jì)滿足某種條件的記錄數(shù)、統(tǒng)計(jì)各種記錄的特點(diǎn)并按照記錄特點(diǎn)分類等等,常規(guī)的做法是把HBase中整個(gè)表的數(shù)據(jù)Scan出來,或者加一個(gè)Filter,進(jìn)行一些初步的過濾,然后在客戶端進(jìn)行統(tǒng)計(jì)處理。但是這么做會(huì)有很大的副作用,比如占用大量的網(wǎng)絡(luò)帶寬(大數(shù)據(jù)量尤為明顯),RPC的壓力也是不容小覷的。
HBase作為列式數(shù)據(jù)庫最經(jīng)常被人詬病的特性包括:無法輕易建立“二級索引”,難以執(zhí)行求和、計(jì)數(shù)、排序等操作。比如,在舊版本的(《0.92)HBase中,統(tǒng)計(jì)數(shù)據(jù)表的總行數(shù),需要使用Counter方法,執(zhí)行一次MapReduce Job才能得到。雖然HBase在數(shù)據(jù)存儲(chǔ)層中集成了MapReduce,能夠有效進(jìn)行數(shù)據(jù)表的分布式計(jì)算,然而在很多情況下,做一些簡單的相加或者聚合計(jì)算的時(shí)候,如果直接將計(jì)算過程放置在server端,能夠減少網(wǎng)絡(luò)開銷,從而獲得很好的性能提升。于是,HBase在0.92之后引入了協(xié)處理器(coprocessors),實(shí)現(xiàn)了一些激動(dòng)人心的新特性:能夠輕易建立二次索引、復(fù)雜過濾器以及訪問控制等。
簡單理解來說,協(xié)處理器是HBase讓用戶的部分邏輯在數(shù)據(jù)存放端即HBase服務(wù)端進(jìn)行計(jì)算的機(jī)制,它允許用戶在HBase服務(wù)端運(yùn)行自己的代碼。
2 協(xié)處理器的分類
協(xié)處理器分為兩種類型:系統(tǒng)協(xié)處理器可以全局導(dǎo)入Region Server上的所有數(shù)據(jù)表,表協(xié)處理器是用戶可以指定一張表使用的協(xié)處理器。協(xié)處理器框架為了更好支持其行為的靈活性,提供了兩個(gè)不同方面的插件。一個(gè)是觀察者(Observer),類似于關(guān)系數(shù)據(jù)庫的觸發(fā)器。另一個(gè)是終端(Endpoint),動(dòng)態(tài)的終端有點(diǎn)像存儲(chǔ)過程。
Observer的設(shè)計(jì)意圖是允許用戶通過插入代碼來重載協(xié)處理器框架的upcall方法,而具體的事件觸發(fā)的callback方法由HBase的核心代碼來執(zhí)行。協(xié)處理器框架處理所有的callback調(diào)用細(xì)節(jié),協(xié)處理器自身只需要插入添加或者改變的功能。
Endpoint是動(dòng)態(tài)RPC插件的接口,它的實(shí)現(xiàn)代碼被安裝在服務(wù)器端,從而能夠通過HBase RPC喚醒??蛻舳祟悗焯峁┝朔浅7奖愕姆椒▉碚{(diào)用這些動(dòng)態(tài)接口,它們可以在任意時(shí)候調(diào)用一個(gè)終端,它們的實(shí)現(xiàn)代碼會(huì)被目標(biāo)Region遠(yuǎn)程執(zhí)行,結(jié)果會(huì)返回到終端。用戶可以結(jié)合使用這些強(qiáng)大的插件接口,為HBase添加全新的特性。
3 Protocol Buffer的使用
由于下面的Endpoint編碼示例使用了Google公司的混合語言數(shù)據(jù)標(biāo)準(zhǔn)Protocol Buffer,所以首先了解一下這個(gè)常用于RPC系統(tǒng)的工具。
3.1 ProtocolBuffer介紹
Protocol Buffer是一種輕便高效的結(jié)構(gòu)化數(shù)據(jù)存儲(chǔ)格式,可以用于結(jié)構(gòu)化數(shù)據(jù)串行化,很適合做數(shù)據(jù)存儲(chǔ)或RPC數(shù)據(jù)交換格式。它可用于通訊協(xié)議、數(shù)據(jù)存儲(chǔ)等領(lǐng)域的語言無關(guān)、平臺(tái)無關(guān)、可擴(kuò)展的序列化結(jié)構(gòu)數(shù)據(jù)格式。目前提供了C++、Java、Python三種語言的 API。
為什么要使用Protocol Buffer呢?先看一個(gè)在實(shí)際開發(fā)中經(jīng)常會(huì)遇到的系統(tǒng)場景:我們的客戶端程序是使用Java開發(fā)的,可能運(yùn)行自不同的平臺(tái),如Linux、Windows或者是Android,而我們的服務(wù)器程序通常是基于Linux平臺(tái)并使用C++開發(fā)完成的。在這兩種程序之間進(jìn)行數(shù)據(jù)通訊時(shí)存在多種方式用于設(shè)計(jì)消息格式,如:
1、直接傳遞C/C++語言中字節(jié)對齊的結(jié)構(gòu)體數(shù)據(jù),只要結(jié)構(gòu)體的聲明為定長格式,那么該方式對于C/C++程序而言就非常方便了,僅需將接收到的數(shù)據(jù)按照結(jié)構(gòu)體類型強(qiáng)行轉(zhuǎn)換即可。事實(shí)上對于變長結(jié)構(gòu)體也不會(huì)非常麻煩。在發(fā)送數(shù)據(jù)時(shí),也只需定義一個(gè)結(jié)構(gòu)體變量并設(shè)置各個(gè)成員變量的值之后,再以char*的方式將該二進(jìn)制數(shù)據(jù)發(fā)送到遠(yuǎn)端。反之,該方式對于Java開發(fā)者而言就會(huì)非常繁瑣,首先需要將接收到的數(shù)據(jù)存于ByteBuffer之中,再根據(jù)約定的字節(jié)序逐個(gè)讀取每個(gè)字段,并將讀取后的值再賦值給另外一個(gè)值對象中的域變量,以便于程序中其他代碼邏輯的編寫。對于該類型程序而言,聯(lián)調(diào)的基準(zhǔn)是必須客戶端和服務(wù)器雙方均完成了消息報(bào)文構(gòu)建程序的編寫后才能展開,而該設(shè)計(jì)方式將會(huì)直接導(dǎo)致Java程序開發(fā)的進(jìn)度過慢。即便是Debug階段,也會(huì)經(jīng)常遇到Java程序中出現(xiàn)各種域字段拼接的小錯(cuò)誤。
2、使用SOAP協(xié)議(WebService)作為消息報(bào)文的格式載體,由該方式生成的報(bào)文是基于文本格式的,同時(shí)還存在大量的XML描述信息,因此將會(huì)大大增加網(wǎng)絡(luò)IO的負(fù)擔(dān)。又由于XML解析的復(fù)雜性,這也會(huì)大幅降低報(bào)文解析的性能。總之,使用該設(shè)計(jì)方式將會(huì)使系統(tǒng)的整體運(yùn)行性能明顯下降。
對于以上兩種方式所產(chǎn)生的問題,Protocol Buffer均可以很好的解決,不僅如此,Protocol Buffer還有一個(gè)非常重要的優(yōu)點(diǎn)就是可以保證同一消息報(bào)文新舊版本之間的兼容性。
3.2 安裝Protocol Buffer
// 在https://developers.google.com/protocol-buffers/docs/downloads下載protobuf-2.6.1.tar.gz后解壓至指定目錄
$ tar -xvf protobuf-2.6.1.tar.gz -C app/
// 刪除壓縮包
$ rm protobuf-2.6.1.tar.gz
// 安裝c++編譯器相關(guān)包
$ sudo apt-get install g++
// 編譯安裝protobuf
$ cd app/protobuf-2.6.1/
$ 。/configure
$ make
$ make check
$ sudo make install
// 添加到lib
$ vim ~/.bashrc
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib
$ source ~/.bashrc
// 驗(yàn)證是否安裝成功
$ protoc --version
3.3 編寫proto文件
首先需要編寫一個(gè) proto 文件,定義程序中需要處理的結(jié)構(gòu)化數(shù)據(jù)。proto 文件非常類似java或者C語言的數(shù)據(jù)定義。如下代碼給出了示例中定義RPC接口的 endpoint.proto文件內(nèi)容:
[plain] view plain copy// 定義常用選項(xiàng)
option java_package = “com.hbase.demo.endpoint”; //指定生成Java代碼的包名
option java_outer_classname = “Sum”; //指定生成Java代碼的外部類名稱
option java_generic_services = true; //基于服務(wù)定義產(chǎn)生抽象服務(wù)代碼
option optimize_for = SPEED; //指定優(yōu)化級別
// 定義請求包
message SumRequest {
required string family = 1; //列族
required string column = 2; //列名
}
// 定義回復(fù)包
message SumResponse {
required int64 sum = 1 [default = 0]; //求和結(jié)果
}
// 定義RPC服務(wù)
service SumService {
//獲取求和結(jié)果
rpc getSum(SumRequest)
returns (SumResponse);
}
3.4 編譯proto文件
// 將proto文件編譯生成java代碼
$ protoc endpoint.proto --java_out=。/
// 生成的文件Sum.java如下圖所示:
4 Endpoint編碼示例
業(yè)務(wù)邏輯如求和、排序等功能放在服務(wù)端,在服務(wù)端完成計(jì)算后將結(jié)果發(fā)送給客戶端,可以減少數(shù)據(jù)的傳輸量。下面的示例將在HBase的服務(wù)端生成一個(gè)RPC服務(wù),即在服務(wù)端對指定表的指定列值進(jìn)行求和計(jì)算,并將計(jì)算結(jié)果返回給客戶端??蛻舳苏{(diào)用該RPC服務(wù),獲取響應(yīng)結(jié)果后輸出。
4.1 服務(wù)端代碼
首先,將通過Protocol Buffer生成的RPC接口文件Sum.java導(dǎo)入項(xiàng)目,然后在項(xiàng)目中新建類SumEndPoint編寫服務(wù)端代碼:
[java] view plain copypackage com.hbase.demo.endpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.hbase.demo.endpoint.Sum.SumRequest;
import com.hbase.demo.endpoint.Sum.SumResponse;
import com.hbase.demo.endpoint.Sum.SumService;
/**
* @author developer
* 說明:hbase協(xié)處理器endpooint的服務(wù)端代碼
* 功能:繼承通過protocol buffer生成的rpc接口,在服務(wù)端獲取指定列的數(shù)據(jù)后進(jìn)行求和操作,最后將結(jié)果返回客戶端
*/
public class SumEndPoint extends SumService implements Coprocessor,CoprocessorService {
private RegionCoprocessorEnvironment env; // 定義環(huán)境
@Override
public Service getService() {
return this;
}
@Override
public void getSum(RpcController controller, SumRequest request, RpcCallback《SumResponse》 done) {
// 定義變量
SumResponse response = null;
InternalScanner scanner = null;
// 設(shè)置掃描對象
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(request.getFamily()));
scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
// 掃描每個(gè)region,取值后求和
try {
scanner = env.getRegion().getScanner(scan);
List《Cell》 results = new ArrayList《Cell》();
boolean hasMore = false;
Long sum = 0L;
do {
hasMore = scanner.next(results);
for (Cell cell : results) {
sum += Long.parseLong(new String(CellUtil.cloneValue(cell)));
}
results.clear();
} while (hasMore);
// 設(shè)置返回結(jié)果
response = SumResponse.newBuilder().setSum(sum).build();
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 將rpc結(jié)果返回給客戶端
done.run(response);
}
// 協(xié)處理器初始化時(shí)調(diào)用的方法
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment)env;
} else {
throw new CoprocessorException(“no load region”);
}
}
// 協(xié)處理器結(jié)束時(shí)調(diào)用的方法
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
}
}
4.2 客戶端代碼
在項(xiàng)目中新建類SumClient作為調(diào)用RPC服務(wù)的客戶端測試程序,代碼如下:
[java] view plain copypackage com.hbase.demo.endpoint;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import com.google.protobuf.ServiceException;
import com.hbase.demo.endpoint.Sum.SumRequest;
import com.hbase.demo.endpoint.Sum.SumResponse;
import com.hbase.demo.endpoint.Sum.SumService;
/**
* @author developer
* 說明:hbase協(xié)處理器endpooint的客戶端代碼
* 功能:從服務(wù)端獲取對hbase表指定列的數(shù)據(jù)的求和結(jié)果
*/
public class SumClient {
public static void main(String[] args) throws ServiceException, Throwable {
long sum = 0L;
// 配置HBse
Configuration conf = HBaseConfiguration.create();
conf.set(“hbase.zookeeper.quorum”, “l(fā)ocalhost”);
conf.set(“hbase.zookeeper.property.clientPort”, “2222”);
// 建立一個(gè)數(shù)據(jù)庫的連接
Connection conn = ConnectionFactory.createConnection(conf);
// 獲取表
HTable table = (HTable) conn.getTable(TableName.valueOf(“sum_table”));
// 設(shè)置請求對象
final SumRequest request = SumRequest.newBuilder().setFamily(“info”).setColumn(“score”).build();
// 獲得返回值
Map《byte[], Long》 result = table.coprocessorService(SumService.class, null, null,
new Batch.Call《SumService, Long》() {
@Override
public Long call(SumService service) throws IOException {
BlockingRpcCallback《SumResponse》 rpcCallback = new BlockingRpcCallback《SumResponse》();
service.getSum(null, request, rpcCallback);
SumResponse response = (SumResponse) rpcCallback.get();
return response.hasSum() ? response.getSum() : 0L;
}
});
// 將返回值進(jìn)行迭代相加
for (Long v : result.values()) {
sum += v;
}
// 結(jié)果輸出
System.out.println(“sum: ” + sum);
// 關(guān)閉資源
table.close();
conn.close();
}
}
4.3 加載Endpoint
// 將Sum類和SumEndPoint類打包后上傳到HDFS
$ hadoopfs -put endpoint_sum.jar /input
// 修改hbase配置文件,添加配置
$ vimapp/hbase-1.2.0-cdh5.7.1/conf/hbase-site.xml
[html] view plain copy《property》
《name》hbase.table.sanity.checks《/name》
《value》false《/value》
《/property》
// 重啟hbase
$stop-hbase.sh
$start-hbase.sh
// 啟動(dòng)hbase shell
$hbase shell
// 創(chuàng)建表sum_table
》 create‘sum_table’,‘info’
// 插入測試數(shù)據(jù)
》 put‘sum_table’,‘rowkey01’,‘info:score’,‘95’
》 put‘sum_table’,‘rowkey02’,‘info:score’,‘98’
》 put‘sum_table’,‘rowkey02’,‘info:age’,‘20’
// 查看數(shù)據(jù)
》 scan‘sum_table’
// 加載協(xié)處理器
》disable ‘sum_table’
》 alter‘sum_table’,METHOD =》‘table_att’,‘coprocessor’ =》‘hdfs://localhost:9000/input/endpoint_sum.jar|com.hbase.demo.endpoint.SumEndPoint|100’
》enable ‘sum_table’
// 如果要卸載協(xié)處理器,可以先查看表中協(xié)處理器名,然后通過命令卸載
》disable ‘sum_table’
》 describe‘sum_table’
》 alter‘sum_table’,METHOD =》‘table_att_unset’,NAME=》‘coprocessor$1’
》 enable‘sum_table’
4.4 測試
在eclipse中運(yùn)行客戶端程序SumClient,輸出結(jié)果為193,正好符合預(yù)期,如下圖所示:
5 Observer編碼示例
一般來說,對數(shù)據(jù)庫建立索引,往往需要單獨(dú)的數(shù)據(jù)結(jié)構(gòu)來存儲(chǔ)索引的數(shù)據(jù)。在hbase表中,除了使用rowkey索引數(shù)據(jù)外,還可以另外建立一張索引表,查詢時(shí)先查詢索引表,然后用查詢結(jié)果查詢數(shù)據(jù)表。下面這個(gè)示例演示如何使用Observer協(xié)處理器生成HBase表的二級索引:將數(shù)據(jù)表ob_table中列info:name的值作為索引表index_ob_table的rowkey,將數(shù)據(jù)表ob_table中列info:score的值作為索引表index_ob_table中列info:score的值建立二級索引,當(dāng)用戶向數(shù)據(jù)表中插入數(shù)據(jù)時(shí),索引表將自動(dòng)插入二級索引,從而為查詢業(yè)務(wù)數(shù)據(jù)提供了便利。
5.1 代碼
在項(xiàng)目中新建類PutObserver作為Observer協(xié)處理器應(yīng)用邏輯類,代碼如下:
[java] view plain copypackage com.hbase.demo.observer;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
/**
* @author developer
* 說明:hbase協(xié)處理器observer的應(yīng)用邏輯代碼
* 功能:在應(yīng)用了該observer的hbase表中,所有的put操作,都會(huì)將每行數(shù)據(jù)的info:name列值作為rowkey、info:score列值作為value
* 寫入另一張二級索引表index_ob_table,可以提高對于特定字段的查詢效率
*/
@SuppressWarnings(“deprecation”)
public class PutObserver extends BaseRegionObserver{
@Override
public void postPut(ObserverContext《RegionCoprocessorEnvironment》 e,
Put put, WALEdit edit, Durability durability) throws IOException {
// 獲取二級索引表
HTableInterface table = e.getEnvironment().getTable(TableName.valueOf(“index_ob_table”));
// 獲取值
List《Cell》 cellList1 = put.get(Bytes.toBytes(“info”), Bytes.toBytes(“name”));
List《Cell》 cellList2 = put.get(Bytes.toBytes(“info”), Bytes.toBytes(“score”));
// 將數(shù)據(jù)插入二級索引表
for (Cell cell1 : cellList1) {
// 列info:name的值作為二級索引表的rowkey
Put indexPut = new Put(CellUtil.cloneValue(cell1));
for (Cell cell2 : cellList2) {
// 列info:score的值作為二級索引表中列info:score的值
indexPut.add(Bytes.toBytes(“info”), Bytes.toBytes(“score”), CellUtil.cloneValue(cell2));
}
// 數(shù)據(jù)插入二級索引表
table.put(indexPut);
}
// 關(guān)閉資源
table.close();
}
}
5.2 加載Observer
// 將PutObserver類打包后上傳到HDFS
$ hadoopfs -put ovserver_put.jar /input
// 啟動(dòng)hbase shell
$hbase shell
// 創(chuàng)建數(shù)據(jù)表ob_table
》 create‘ob_table’,‘info’
// 創(chuàng)建二級索引表ob_table
》 create‘index_ob_table’,‘info’
// 加載協(xié)處理器
》disable ‘ob_table’
》 alter‘ob_table’,METHOD =》‘table_att’,‘coprocessor’ =》‘hdfs://localhost:9000/input/observer_put.jar|com.hbase.demo.observer.PutObserver|100’
》 enable‘ob_table’
// 查看數(shù)據(jù)表ob_table
》 describe‘ob_table’
5.3 測試
// 在eclipse項(xiàng)目中編寫一個(gè)客戶端,向數(shù)據(jù)表ob_table中插入測試數(shù)據(jù)
[java] view plain copypackage com.hbase.demo.observer;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
public class Test {
public static void main(String[] args) throws IOException {
// 配置HBse
Configuration conf = HBaseConfiguration.create();
conf.set(“hbase.zookeeper.quorum”, “l(fā)ocalhost”);
conf.set(“hbase.zookeeper.property.clientPort”, “2222”);
// 建立一個(gè)數(shù)據(jù)庫的連接
Connection conn = ConnectionFactory.createConnection(conf);
// 獲取表
HTable table = (HTable) conn.getTable(TableName.valueOf(“ob_table”));
// 插入測試數(shù)據(jù)
Put put = new Put(Bytes.toBytes(“rowkey01”));
put.addColumn(Bytes.toBytes(“info”), Bytes.toBytes(“name”), Bytes.toBytes(“carl”));
put.addColumn(Bytes.toBytes(“info”), Bytes.toBytes(“score”), Bytes.toBytes(“92”));
table.put(put);
// 關(guān)閉資源
table.close();
conn.close();
}
}
// 插入數(shù)據(jù)后,在hbase shell中查看數(shù)據(jù)表ob_table中的數(shù)據(jù)
$hbase shell
》 scan‘ob_table’
//在hbase shell中查看二級索引表index_ob_table中的數(shù)據(jù)
》 scan‘index_ob_table’
評論
查看更多