HBase データソースは、HBase からの読み取りおよび HBase への書き込みの両方をサポートします。本トピックでは、DataWorks の HBase データソースにおけるデータ同期機能について説明します。
サポートされるバージョン
HBase プラグインには、標準 HBase プラグインと HBase{xx}xsql プラグインの 2 種類があります。HBase{xx}xsql プラグインを使用するには、HBase および Phoenix の両方が必要です。
HBase プラグイン:
このプラグインは
HBase0.94.x、HBase1.1.x、およびHBase2.xをサポートします。ウィザードモードおよびスクリプトモードの両方をサポートします。hbaseVersionパラメーターを使用して、バージョンを指定します。HBase0.94.xを使用している場合、Reader プラグインおよび Writer プラグインの両方でhbaseVersionを094xに設定します。"reader": { "hbaseVersion": "094x" }"writer": { "hbaseVersion": "094x" }HBase1.1.xまたはHBase2.xを使用している場合、Reader プラグインおよび Writer プラグインの両方でhbaseVersionを11xに設定します。"reader": { "hbaseVersion": "11x" }"writer": { "hbaseVersion": "11x" }HBase 1.1.x プラグインは HBase 2.0 と互換性があります。
HBase{xx}xsql プラグイン:
HBase20xsql プラグイン:
HBase2.xおよびPhoenix5.xをサポートします。スクリプトモードのみをサポートします。HBase11xsql プラグイン:
HBase1.1.xおよびPhoenix5.xをサポートします。スクリプトモードのみをサポートします。HBase{xx}xsql Writer プラグインを使用すると、HBase 内の SQL テーブル(Phoenix)へ大量のデータを一括インポートできます。Phoenix は行キー(rowkey)に対してデータエンコーディングを適用します。HBase API を直接使用してデータを書き込む場合、手動でのデータ変換が必要となり、複雑かつエラーが発生しやすくなります。HBase{xx}xsql Writer プラグインはこのプロセスを簡素化し、SQL テーブルへのデータインポートを容易に実現します。
説明このプラグインは Phoenix JDBC ドライバーを使用して UPSERT 文を実行し、データをバッチ単位でテーブルに書き込みます。この高レベルインターフェイスを介して動作するため、対応するインデックステーブルも同期的に更新されます。
制限事項
HBase Reader | HBase20xsql Reader | HBase11xsql Writer |
|
|
|
サポートされる機能
HBase Reader
HBase Reader は normal モードおよび multiVersionFixedColumn モードをサポートします。
normalモード: HBase テーブルを標準的な 2 次元テーブルとして扱い、最新バージョンのデータを取得します。hbase(main):017:0> scan 'users' ROW COLUMN+CELL lisi column=address:city, timestamp=1457101972764, value=beijing lisi column=address:contry, timestamp=1457102773908, value=china lisi column=address:province, timestamp=1457101972736, value=beijing lisi column=info:age, timestamp=1457101972548, value=27 lisi column=info:birthday, timestamp=1457101972604, value=1987-06-17 lisi column=info:company, timestamp=1457101972653, value=baidu xiaoming column=address:city, timestamp=1457082196082, value=hangzhou xiaoming column=address:contry, timestamp=1457082195729, value=china xiaoming column=address:province, timestamp=1457082195773, value=zhejiang xiaoming column=info:age, timestamp=1457082218735, value=29 xiaoming column=info:birthday, timestamp=1457082186830, value=1987-06-17 xiaoming column=info:company, timestamp=1457082189826, value=alibaba 2 row(s) in 0.0580 seconds以下の表に、出力結果を示します。
rowKey
address:city
address:contry
address:province
info:age
info:birthday
info:company
lisi
beijing
china
beijing
27
1987-06-17
baidu
xiaoming
hangzhou
china
zhejiang
29
1987-06-17
alibaba
multiVersionFixedColumnモード: HBase テーブルを縦方向のテーブルとして扱います。各レコードはrowKey、family:qualifier、timestamp、およびvalueの 4 つの列で構成されます。読み取る列を明示的に指定する必要があります。このモードでは、各セルの値を個別のレコードとして扱います。セルに複数のバージョンがある場合、各バージョンに対して個別のレコードが生成されます。hbase(main):018:0> scan 'users',{VERSIONS=>5} ROW COLUMN+CELL lisi column=address:city, timestamp=1457101972764, value=beijing lisi column=address:contry, timestamp=1457102773908, value=china lisi column=address:province, timestamp=1457101972736, value=beijing lisi column=info:age, timestamp=1457101972548, value=27 lisi column=info:birthday, timestamp=1457101972604, value=1987-06-17 lisi column=info:company, timestamp=1457101972653, value=baidu xiaoming column=address:city, timestamp=1457082196082, value=hangzhou xiaoming column=address:contry, timestamp=1457082195729, value=china xiaoming column=address:province, timestamp=1457082195773, value=zhejiang xiaoming column=info:age, timestamp=1457082218735, value=29 xiaoming column=info:age, timestamp=1457082178630, value=24 xiaoming column=info:birthday, timestamp=1457082186830, value=1987-06-17 xiaoming column=info:company, timestamp=1457082189826, value=alibaba 2 row(s) in 0.0260 seconds以下の表に、出力結果を示します。
rowKey
column:qualifier
timestamp
Value
lisi
address:city
1457101972764
beijing
lisi
address:contry
1457102773908
china
lisi
address:province
1457101972736
beijing
lisi
info:age
1457101972548
27
lisi
info:birthday
1457101972604
1987-06-17
lisi
info:company
1457101972653
baidu
xiaoming
address:city
1457082196082
hangzhou
xiaoming
address:contry
1457082195729
china
xiaoming
address:province
1457082195773
zhejiang
xiaoming
info:age
1457082218735
29
xiaoming
info:age
1457082178630
24
xiaoming
info:birthday
1457082186830
1987-06-17
xiaoming
info:company
1457082189826
alibaba
HBase Writer
HBase Writer は、ソースの複数の列を連結することで
rowKeyを生成できます。HBase Writer は、以下の方法でデータの
version(タイムスタンプ)を設定できます:現在時刻を使用する。
ソース列の値を使用する。
ユーザーが指定した時刻を使用する。
サポートされるフィールドの型
バッチ読み取り
以下の表に、HBase Reader のデータ型マッピングを示します。
型
Data Integration の列型
データベースのデータ型
整数
long
short、int、および long
浮動小数点数
double
float および double
文字列
string
binary_string および string
日付および時刻
date
date
バイト
bytes
bytes
ブール値
boolean
boolean
HBase20xsql Reader は、Phoenix のデータ型のうち、ほとんどの型をサポートしていますが、すべての型をサポートしているわけではありません。使用前に、対象のデータ型がサポートされていることを確認してください。
以下の表に、HBase20xsql Reader が Phoenix のデータ型に対して使用する型マッピングを示します。
DataX 内部型
Phoenix のデータ型
long
INTEGER、TINYINT、SMALLINT、および BIGINT
double
FLOAT、DECIMAL、および DOUBLE
string
CHAR および VARCHAR
date
DATE、TIME、および TIMESTAMP
bytes
BINARY および VARBINARY
boolean
BOOLEAN
バッチ書き込み
以下の表に、HBase Writer のデータ型マッピングを示します。
column設定が、HBase テーブル内の対応する列型と一致していることを確認してください。以下の表に記載されているデータ型のみがサポートされます。
型 | データベースのデータ型 |
整数 | INT、LONG、および SHORT |
浮動小数点数 | FLOAT および DOUBLE |
ブール値 | BOOLEAN |
文字列 | STRING |
考慮事項
接続性テスト時にエラーメッセージ "tried to access method com.google.common.base.Stopwatch" が表示された場合、データソースの構成に hbaseVersion プロパティを追加し、HBase のバージョンを指定してください。
データソースの追加
DataWorks で同期タスクを開発する前に、データソース管理 の手順に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールで パラメーターの説明を表示することで、各パラメーターの意味を確認できます。
データ同期タスク
同期タスクの設定に関するエントリポイントおよび手順については、以下の設定ガイドをご参照ください。
単一テーブルのオフライン同期タスク
設定手順については、「コードレス UI によるタスクの設定」および「コードエディタによるタスクの設定」をご参照ください。
デフォルトでは、HBase はスキーマレスのデータソースであるため、ウィザードモードでは [フィールドマッピング] セクションが表示されません。そのため、フィールドマッピングを手動で設定する必要があります:
HBase をデータソースとして使用する場合、ソースフィールド を次の形式で設定します:
data_type|column_family:column_name。HBase をデータ宛先として使用する場合、宛先フィールド および rowkey の両方を設定します。宛先フィールド には、
source_field_index|data_type|column_family:column_nameの形式を使用します。rowkey には、source_primary_key_index|data_typeの形式を使用します。
説明各フィールドは別行に記述する必要があります。
スクリプトモードにおけるパラメーターの完全な一覧およびスクリプトのサンプルについては、「付録:スクリプトのデモおよびパラメーター」をご参照ください。
よくある質問
Q: 推奨される同時実行数の設定はどれですか。インポートが遅い場合、同時実行数を増やすと改善しますか?
A: データインポートプロセスのデフォルト JVM ヒープサイズは 2 GB です。同時実行数(チャンネル数)はマルチスレッドで実装されています。ただし、過剰なスレッドを作成しても、必ずしもインポート速度が向上するわけではなく、頻繁なガーベジコレクション(GC)によりパフォーマンスが低下する可能性があります。ベストプラクティスとして、同時実行数(チャンネル数)を 5~10 に設定することを推奨します。
Q: 推奨される
batchSizeの設定はどれですか?A: デフォルト値は 256 ですが、平均行サイズに基づいて最適な
batchSizeを算出する必要があります。ベストプラクティスとして、1 バッチあたりの合計データサイズを 2 MB~4 MB にすることを推奨します。この目標サイズを平均行サイズで割ることで、適切なbatchSizeを算出できます。
付録:スクリプトのデモおよびパラメーター
コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプトフォーマット要件に従って、スクリプト内で関連するパラメーターを設定する必要があります。詳細については、「コードエディタによるタスクの設定」をご参照ください。以下に、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要のあるパラメーターについて説明します。
HBase Reader スクリプトのデモ
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"stepType":"hbase",// プラグイン名。
"parameter":{
"mode":"normal",// HBase からデータを読み取るモード。有効な値:normal および multiVersionFixedColumn。
"scanCacheSize":"256",// RPC ごとにクライアントがサーバーから読み取る行数。
"scanBatchSize":"100",// RPC ごとにクライアントがサーバーから読み取る列数。
"hbaseVersion":"094x/11x",// HBase のバージョン。
"column":[// 読み取る列。
{
"name":"rowkey",// 列名。
"type":"string"// データ型。
},
{
"name":"columnFamilyName1:columnName1",
"type":"string"
},
{
"name":"columnFamilyName2:columnName2",
"format":"yyyy-MM-dd",
"type":"date"
},
{
"name":"columnFamilyName3:columnName3",
"type":"long"
}
],
"range":{// HBase Reader の rowkey 範囲。
"endRowkey":"",// 終了 rowkey。
"isBinaryRowkey":true,// startRowkey および endRowkey をバイト配列に変換する方法を指定します。デフォルト値は false です。
"startRowkey":""// 開始 rowkey。
},
"maxVersion":"",// マルチバージョンモードで読み取るバージョン数。
"encoding":"UTF-8",// エンコード形式。
"table":"",// テーブル名。
"hbaseConfig":{// HBase クラスターへの接続構成(JSON 形式)。
"hbase.zookeeper.quorum":"hostname",
"hbase.rootdir":"hdfs://ip:port/database",
"hbase.cluster.distributed":"true"
}
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// 許容されるエラー レコードの最大数。
},
"speed":{
"throttle":true,// 速度制限を有効化します。true の場合、mbps 値に基づいて速度制限が有効化されます。false の場合、mbps パラメーターは無視されます。
"concurrent":1,// ジョブの同時実行タスク数。
"mbps":"12"// 速度制限率。この例では、1 mbps は 1 MB/s に相当します。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}HBase Reader のパラメーター
パラメーター | 説明 | 必須 | デフォルト |
haveKerberos | HBase クラスターで Kerberos 認証が必要かどうかを指定します。 説明
| いいえ | false |
hbaseConfig | HBase クラスターへの接続構成(JSON 形式)。HBase クラスターの ZooKeeper(ZK)アドレスを指定する hbase.zookeeper.quorum プロパティは必須です。サーバーとのインタラクションを最適化するために、スキャンキャッシュやバッチ設定などのその他の HBase クライアント構成を追加できます。 説明 ApsaraDB for HBase データベースに接続する場合は、その非公開アドレスを使用してください。 | はい | なし |
mode | HBase からデータを読み取るモード。有効な値: normal および multiVersionFixedColumn。 | はい | なし |
table | 読み取る HBase テーブルの名前。このパラメーターは大文字小文字を区別します。 | はい | なし |
encoding | HBase のバイナリ byte[] 配列を文字列に変換する際に使用するエンコード形式。有効な値:UTF-8 および GBK。 | いいえ | UTF-8 |
column | HBase から読み取る列。このパラメーターは normal モードおよび multiVersionFixedColumn モードの両方で必須です。
| はい | なし |
maxVersion | HBase Reader がマルチバージョンモードで読み取るバージョン数。すべてのバージョンを読み取る場合は -1 を設定し、それ以外の場合は 1 より大きい整数を設定します。 |
| なし |
range | HBase Reader の rowkey 範囲を指定します。
| いいえ | なし |
scanCacheSize | HBase Reader が RPC ごとにサーバーからフェッチする行数。 | いいえ | 256 |
scanBatchSize | HBase Reader が RPC ごとにサーバーからフェッチする列数。-1 を指定すると、すべての列が返されます。 説明 潜在的なデータ品質の問題を回避するため、scanBatchSize を実際の列数より大きい値に設定してください。 | いいえ | 100 |
HBase Writer スクリプトのデモ
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"hbase",// プラグイン名。
"parameter":{
"mode":"normal",// HBase へのデータ書き込みモード。
"walFlag":"false",// Write-Ahead Log(WAL)への書き込みを有効化するかどうかを指定します。false を指定すると無効化されます。
"hbaseVersion":"094x",// HBase のバージョン。
"rowkeyColumn":[// rowkey の生成に使用する列。
{
"index":"0",// シリアル番号。
"type":"string"// データ型。
},
{
"index":"-1",
"type":"string",
"value":"_"
}
],
"nullMode":"skip",// null 値の処理方法を指定します。
"column":[// 書き込む HBase 列。
{
"name":"columnFamilyName1:columnName1",// 列名。
"index":"0",// インデックス番号。
"type":"string"// データ型。
},
{
"name":"columnFamilyName2:columnName2",
"index":"1",
"type":"string"
},
{
"name":"columnFamilyName3:columnName3",
"index":"2",
"type":"string"
}
],
"encoding":"utf-8",// エンコード形式。
"table":"",// テーブル名。
"hbaseConfig":{// HBase クラスターへの接続構成(JSON 形式)。
"hbase.zookeeper.quorum":"hostname",
"hbase.rootdir":"hdfs://ip:port/database",
"hbase.cluster.distributed":"true"
}
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// 許容されるエラー レコードの最大数。
},
"speed":{
"throttle":true,// 速度制限を有効化します。true の場合、mbps 値に基づいて速度制限が有効化されます。false の場合、mbps パラメーターは無視されます。
"concurrent":1, // ジョブの同時実行タスク数。
"mbps":"12"// 速度制限率。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}HBase Writer のパラメーター
パラメーター | 説明 | 必須 | デフォルト |
haveKerberos | HBase クラスターで Kerberos 認証が必要かどうかを指定します。 説明
| いいえ | false |
hbaseConfig | HBase クラスターへの接続構成(JSON 形式)。HBase クラスターの ZooKeeper(ZK)アドレスを指定する hbase.zookeeper.quorum プロパティは必須です。サーバーとのインタラクションを最適化するために、スキャンキャッシュやバッチ設定などのその他の HBase クライアント構成を追加できます。 説明 ApsaraDB for HBase データベースに接続する場合は、その非公開アドレスを使用してください。 | はい | なし |
mode | HBase へのデータ書き込みモード。現在は normal モードのみがサポートされています。 | はい | なし |
table | ターゲット HBase テーブルの名前。このパラメーターは大文字小文字を区別します。 | はい | なし |
encoding | STRING を HBase byte[] 配列に変換する際に使用するエンコード形式。有効な値:UTF-8 および GBK。 | いいえ | UTF-8 |
column | HBase に書き込む列:
| はい | なし |
rowkeyColumn | HBase へのデータ書き込み時に rowkey を構築するために使用する列を指定します:
構成は以下のとおりです: | はい | なし |
versionColumn | HBase に書き込まれるデータのタイムスタンプを指定します。現在のシステム時刻、ソース列の値、または固定値のいずれかを使用できます。このパラメーターが設定されていない場合、デフォルトで現在時刻が使用されます。
構成は以下のとおりです:
| いいえ | なし |
nullMode | ソースデータの null 値をどのように処理するかを指定します:
| いいえ | skip |
walFlag | クライアントが Put または Delete 操作を送信すると、データは MemStore に格納される前に、まず Write-Ahead Log(WAL)に書き込まれます。このプロセスにより、データの耐久性が確保されます。 このパラメーターを | いいえ | false |
writeBufferSize | HBase クライアントの書き込みバッファーのサイズ(バイト単位)。このバッファーは、デフォルトで無効化されているクライアントの
| いいえ | 8 MB |
fileSystemUsername | Ranger 権限の問題によりデータ同期タスクが失敗した場合、スクリプトモードに切り替えて、このパラメーターを HBase アクセス権限を持つユーザー名に設定します。DataWorks は、このユーザー名を使用して接続を行います。 | いいえ | なし |
HBase20xsql Reader スクリプトのデモ
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"stepType":"hbase20xsql",// プラグイン名。
"parameter":{
"queryServerAddress": "http://127.0.0.1:8765", // Phoenix QueryServer のアドレス。
"serialization": "PROTOBUF", // QueryServer のシリアル化形式。
"table": "TEST", // 読み取るテーブル。
"column": ["ID", "NAME"], // 読み取る列。
"splitKey": "ID" // 分割対象の列。これはテーブルのプライマリキーである必要があります。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// 許容されるエラー レコードの最大数。
},
"speed":{
"throttle":true,// 速度制限を有効化します。true の場合、mbps 値に基づいて速度制限が有効化されます。false の場合、mbps パラメーターは無視されます。
"concurrent":1,// ジョブの同時実行タスク数。
"mbps":"12"// 速度制限率。この例では、1 mbps は 1 MB/s に相当します。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}HBase20xsql Reader のパラメーター
パラメーター | 説明 | 必須 | デフォルト |
queryServerAddress | HBase20xsql Reader は、Phoenix の軽量クライアントを使用して Phoenix QueryServer に接続します。QueryServer のアドレスをここで指定します。ApsaraDB for HBase 強化版(Lindorm)をご利用の場合、 | はい | なし |
serialization | QueryServer で使用されるシリアル化プロトコル。 | いいえ | PROTOBUF |
table | 読み取るテーブルの名前。このパラメーターは大文字小文字を区別します。 | はい | なし |
schema | テーブルを含むスキーマ。 | いいえ | なし |
column | 同期対象の列名を含む JSON 配列。空欄の場合は、すべての列が読み取られます。 | いいえ | すべての列 |
splitKey | データシャーディングに使用する列を指定します。splitKey を指定すると、並列データ同期が可能になり、パフォーマンスが向上します。2 種類の分割方法が利用可能です。splitPoint が空の場合、Method 1 に基づいて自動的にデータ分割が行われます:
| はい | なし |
splitPoints | 列の最小値および最大値に基づく分割は、データホットスポットを引き起こす可能性があります。そのため、startkey および endkey を基準に分割ポイントを設定することを推奨します。regions のこのアプローチにより、各クエリが単一のリージョンと一致し、ホットスポットを防止できます。 | いいえ | なし |
where | フィルター条件。テーブルクエリにフィルターを追加できます。HBase20xsql Reader は、指定された column、table、および where 条件に基づいて SQL クエリを構築し、そのクエリに基づいてデータを抽出します。 | いいえ | なし |
querySql | 一部のユースケースでは、where パラメーターでは目的のフィルター条件を十分に記述できない場合があります。このパラメーターを使用して、カスタムフィルター SQL クエリを定義できます。 | いいえ | なし |
HBase11xsql Writer スクリプトのデモ
{
"type": "job",
"version": "1.0",
"configuration": {
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle":true,// 速度制限を有効化します。true の場合、mbps 値に基づいて速度制限が有効化されます。false の場合、mbps パラメーターは無視されます。
"concurrent":1, // ジョブの同時実行タスク数。
"mbps":"1"// 速度制限率。この例では、1 mbps は 1 MB/s に相当します。
}
},
"reader": {
"plugin": "odps",
"parameter": {
"datasource": "",
"table": "",
"column": [],
"partition": ""
}
},
"plugin": "hbase11xsql",
"parameter": {
"table": "ターゲット HBase テーブル名(大文字小文字を区別)。",
"hbaseConfig": {
"hbase.zookeeper.quorum": "ターゲット HBase クラスターの ZooKeeper サーバー アドレス。",
"zookeeper.znode.parent": "ターゲット HBase クラスターの znode。"
},
"column": [
"columnName"
],
"batchSize": 256,
"nullMode": "skip"
}
}
}HBase11xsql Writer のパラメーター
パラメーター | 説明 | 必須 | デフォルト |
plugin | プラグインの名前。必ず | はい | なし |
table | データインポートのターゲット Phoenix テーブルの名前。このパラメーターは大文字小文字を区別します。Phoenix テーブル名は通常大文字で記述されます。 | はい | なし |
column | 列名。このパラメーターは大文字小文字を区別します。Phoenix 列名は通常大文字で記述されます。 説明
| はい | なし |
hbaseConfig | HBase クラスターのアドレス。ZooKeeper クォーラムは必須です。形式: 説明
| はい | なし |
batchSize | バッチ書き込み操作における最大行数。 | いいえ | 256 |
nullMode | ソースデータの null 値をどのように処理するかを指定します:
| いいえ | skip |