AnalyticDB for PostgreSQL クライアントソフトウェア開発キット (SDK) は、API を使用して AnalyticDB for PostgreSQL にデータをコピーするためのパフォーマンス専有型メソッドを提供します。
AnalyticDB for PostgreSQL クライアント SDK を使用して、データを書き込むためのカスタムプログラムを開発できます。この SDK は、AnalyticDB for PostgreSQL へのデータ書き込みプロセスを簡素化します。接続プールやキャッシュを管理する必要はありません。並列処理などの内部メカニズムにより、直接的な COPY や INSERT の書き込み操作と比較して、パフォーマンスが大幅に向上します。
AnalyticDB for PostgreSQL クライアント SDK の主な機能は、提供されたデータを効率的に書き込むことです。この SDK は生データの読み取りや処理は行いません。

Maven リポジトリ
Maven を使用して SDK のバージョンを管理および構成できます。以下は Maven の構成です。
<dependency>
<groupId>com.alibaba.cloud.analyticdb</groupId>
<artifactId>adb4pgclient</artifactId>
<version>1.0.16</version>
</dependency>AnalyticDB for PostgreSQL クライアント SDK は、次のパッケージに依存します: druid 1.1.17、postgresql-jdbc 42.3.9、commons-lang3 3.4、slf4j-api 1.7.24、および slf4j-log4j12 1.7.24。
バージョン競合が発生した場合は、これらのパッケージのバージョンを確認し、競合を解決してください。
次のリンクからオフライン JAR パッケージをダウンロードできます: adb4pgclient-1.0.16.jar および adb4pgclient-1.0.16-jar-with-dependencies.jar。
メソッド
表 1. DatabaseConfig クラス
API 名 | 説明 |
setHost(String adbHost) | 接続する AnalyticDB for PostgreSQL インスタンスのエンドポイント。 |
setPort(int port) | AnalyticDB for PostgreSQL インスタンスのポート。デフォルト値は 5432 です。 |
setDatabase(String database) | 接続する AnalyticDB for PostgreSQL データベースの名前。 |
setUser(String username) | AnalyticDB for PostgreSQL データベース接続用のユーザー名。 |
setPassword(String pwd) | AnalyticDB for PostgreSQL データベース接続用のパスワード。 |
addTable(List<String> table, String schema) | 書き込み対象のテーブルのリスト。スキーマに基づいてテーブルを個別に追加します。このメソッドは複数回呼び出すことができます。DatabaseConfig オブジェクトを使用して Adb4PGClient オブジェクトを構築した後は、このメソッドは無効になります。 |
setColumns(List<String> columns, String tableName, String schemaName) | データを挿入するテーブルの列。すべての列にデータを挿入するには、 |
setInsertIgnore(boolean insertIgnore) | プライマリキーの競合を引き起こすデータ行を無視するかどうかを指定します。この設定は、構成されているすべてのテーブルに適用されます。デフォルト値は false で、競合する行は上書きされることを意味します。 |
setEmptyAsNull(boolean emptyAsNull) | 空のデータを null に設定するかどうかを指定します。この設定は、構成されているすべてのテーブルに適用されます。デフォルト値は false です。 |
setParallelNumber(int parallelNumber) | AnalyticDB for PostgreSQL にデータを書き込むための同時実行スレッド数。この設定は、構成されているすべてのテーブルに適用されます。デフォルト値は 4 です。ほとんどの場合、この値を変更することは推奨されません。 |
setLogger(Logger logger) | クライアントで使用されるロガーオブジェクト。slf4j.Logger を使用します。 |
setRetryTimes(int retryTimes) | AnalyticDB for PostgreSQL へのコミット中に例外が発生した場合のリトライ回数。デフォルト値は 3 です。 |
setRetryIntervalTime(long retryIntervalTime) | 再試行間隔。単位はミリ秒 (ms) です。デフォルト値は 1000 ms です。 |
setCommitSize(long commitSize) | 自動コミットのデータボリューム (バイト単位)。デフォルト値は 10 MB です。この設定を変更することは推奨されません。 |
setEnablePreHash(boolean enablePreHash) | 宛先テーブルが Beam ストレージを使用し、書き込みプロセス中にデッドロックが発生した場合、このスイッチを有効にすると書き込みパフォーマンスが向上します。 |
setMetaDataSchedulerInterval(int metaDataSchedulerInterval) | AnalyticDB for PostgreSQL クライアント SDK の書き込みテーブルのメタデータ更新間隔を調整します。デフォルト値は 1 分です。単位は分 (min) です。 |
表 2. Row クラス
API 名 | 説明 |
setColumn(int index, Object value) | 列の値を設定します。列は順番に設定する必要があります。このメソッドを使用すると、Row インスタンスは再利用できません。各データレコードには個別の Row インスタンスが必要です。 |
setColumnValues(List<Object> values) | List 形式のデータ行を Row に直接書き込みます。 |
updateColumn(int index, Object value) | 列の値を更新します。このメソッドを使用すると、Row インスタンスを再利用できます。Row インスタンス内のデータを更新するだけです。 |
表 3. Adb4pgClient クラス
API 名 | 説明 |
addRow(Row row, String tableName, String schemaName) / addRows(List<Row> rows, String tableName, String schemaName) | 行形式でデータレコードを挿入します。データは SDK バッファーに格納され、コミットを待ちます。データボリュームが `commitSize` 値を超えると、新しいデータが追加される前に `addRow` または `addRows` の呼び出し中に自動コミットがトリガーされます。自動コミットが失敗した場合、失敗したデータレコードのリストを含む例外を処理する必要があります。 |
addMap(Map<String, String> dataMap,String tableName, String schemaName) / addMaps(List<Map<String, String>> dataMaps, String tableName, String schemaName) | マップ形式でデータレコードを挿入します。このメソッドは `addRow` に似ています。バッファーがいっぱいになると、新しいデータが追加される前に `addMap` または `addMaps` の呼び出し中に自動コミットがトリガーされます。自動コミットが失敗した場合、失敗したデータレコードのリストを含む例外を処理する必要があります。 |
commit() | キャッシュされたデータを AnalyticDB for PostgreSQL にコミットします。コミットが失敗した場合、失敗した文を含む例外がスローされます。この例外を処理する必要があります。 |
TableInfo getTableInfo(String tableName, String schemaName) | 指定されたテーブルの構造情報を取得します。 |
List<ColumnInfo> getColumnInfo(String tableName, String schemaName) | 指定されたテーブルの列のリストを取得します。列は `ColumnInfo` オブジェクトとして返されます。 |
stop() | インスタンスの使用が終了したら、`stop` メソッドを呼び出して、内部のスレッドプールとリソースを解放します。メモリ内にコミットされていないデータが存在する場合、例外がスローされます。強制的に停止するには、 |
forceStop() | 内部のスレッドプールとリソースを強制的に解放します。メモリにキャッシュされているコミットされていないデータはすべて失われます。このメソッドは、一般的な使用には推奨されません。 |
Connection getConnection() throws SQLException | クライアント接続プールからデータベースの `Connection` オブジェクトを取得します。取得した `Connection` オジェクトは、COPY 以外の操作に使用できます。このオブジェクトは、標準の Java Database Connectivity (JDBC) 接続と同じ方法で使用されます。 説明 接続の使用が終了したら、`ResultSet`、`Statement`、`Connection` オブジェクトなど、対応するリソースを解放する必要があります。 |
表 4. ColumnInfo クラス
API 名 | 説明 |
boolean isNullable() | 列が null にできるかどうかを確認します。 |
エラーコード | エラーコード | 説明 |
COMMIT_ERROR_DATA_LIST | 101 | コミット中に一部のデータで例外が発生しました。失敗したデータが返されます。 説明
|
COMMIT_ERROR_OTHER | 102 | コミット中に他の例外が発生しました。 |
ADD_DATA_ERROR | 103 | データの追加中に例外が発生しました。 |
CREATE_CONNECTION_ERROR | 104 | 接続の作成中に例外が発生しました。 |
CLOSE_CONNECTION_ERROR | 105 | 接続のクローズ中に例外が発生しました。 |
CONFIG_ERROR | 106 | DatabaseConfig で構成エラーが発生しました。 |
STOP_ERROR | 107 | インスタンスの停止中にエラーが発生しました。 |
OTHER | 999 | デフォルトの例外エラーコード。 |
コード例
public class Adb4pgClientUsage {
public void demo() {
DatabaseConfig databaseConfig = new DatabaseConfig();
// データベースの実際のホストまたは URL を設定する必要があります
databaseConfig.setHost("100.100.100.100");
// データベースの実際のポートを設定する必要があります
databaseConfig.setPort(8888);
// データベース接続用のユーザー名。
databaseConfig.setUser("ご利用のユーザー名");
// データベース接続用のパスワード。
databaseConfig.setPassword("ご利用のパスワード");
// 接続するデータベース。
databaseConfig.setDatabase("ご利用のデータベース名");
// 書き込み対象のテーブルのリストを設定します。
List<String> tables = new ArrayList<String>();
tables.add("ご利用のテーブル名 1");
tables.add("ご利用のテーブル名 2");
// 異なるスキーマのテーブルに対して addTable を個別に呼び出すことができます。ただし、DatabaseConfig から Client インスタンスが作成された後は、テーブル構成を変更することはできません。 /
// スキーマに null を渡すと、デフォルトのスキーマ 'public' が使用されます。
databaseConfig.addTable(tables, "テーブルスキーマ名");
// 書き込み対象のテーブルの列を設定します。
List<String> columns = new ArrayList<String>();
columns.add("column1");
columns.add("column2");
// すべての列に書き込むには、columns.add("*") を使用します。
databaseConfig.setColumns(columns, "ご利用のテーブル名 1", "テーブルスキーマ名");
databaseConfig.setColumns(Collections.singletonList("*"),"ご利用のテーブル名 2", "テーブルスキーマ名");
// 列の値が空の場合は、null を設定します
databaseConfig.setEmptyAsNull(false);
// 挿入には 'insert ignore' メソッドを使用します。
databaseConfig.setInsertIgnore(true);
// コミット中にデータベースへの書き込みで例外が発生した場合、3 回リトライします。
databaseConfig.setRetryTimes(3);
// 再試行間隔は 1 秒です。単位は ms です。
databaseConfig.setRetryIntervalTime(1000);
// AdbClient を初期化します。インスタンスが初期化された後は、DatabaseConfig の設定を変更することはできません。
Adb4pgClient adbClient = new Adb4pgClient(databaseConfig);
// データをバッチ処理します。複数回追加してからコミットします。バッチサイズの詳細については、「注意事項」セクションをご参照ください。
for (int i = 0; i < 10; i++) {
// 行をバッファーに追加します。1 レコードにつき 1 インスタンスです
Row row = new Row(columns.size());
// 列を設定
// 列のインデックスは、列のシーケンスと同じでなければなりません
// 列の値は任意の型にできます。内部的には列の型に応じてフォーマットされます
row.setColumn(0, i); // 数値
row.setColumn(1, "string value"); // 文字列値
// バッファーがいっぱいになると、addRow または addMap の呼び出し中に自動コミットがトリガーされます。
// コミットが失敗した場合、エラーコード COMMIT_ERROR_DATA_LIST を持つ AdbClientException が返されます。
adbClient.addRow(row, "ご利用のテーブル名 1", "テーブルスキーマ名");
}
Row row = new Row();
row.setColumn(0, 10); // 数値
row.setColumn(1, "2018-01-01 08:00:00"); // 日付/タイムスタンプ/時刻の値
adbClient.addRow(row, "ご利用のテーブル名 1", "テーブルスキーマ名");
// 列を更新します。Row インスタンスは再利用できます。
row.updateColumn(0, 11);
row.updateColumn(1, "2018-01-02 08:00:00");
adbClient.addRow(row, "ご利用のテーブル名 1", "テーブルスキーマ名");
// マップをバッファーに追加
Map<String, String> rowMap = new HashMap<String, String>();
rowMap.put("t1", "12");
rowMap.put("t2", "string value");
// ここではバッチ処理が必要です。複数回の add 呼び出しの後にコミットすることを推奨します。
adbClient.addMap(rowMap, "ご利用のテーブル名 2", "テーブルスキーマ名");
// バッファーをデータベースにコミットします
// データベースへのコミットが成功するとバッファーはクリアされます
try {
adbClient.commit();
} catch (Exception e) {
// TODO: ここで例外を処理します
} finally {
adbClient.stop();
}
}
}注意事項
AnalyticDB for PostgreSQL クライアント SDK はスレッドセーフではありません。マルチスレッド環境では、各スレッドが独自の `Client` オブジェクトを維持する必要があります。
重要複数のスレッド間で SDK インスタンスを共有することは推奨されません。スレッドセーフの問題に加えて、共有クライアントは書き込みパフォーマンスのボトルネックになりやすいためです。
データが AnalyticDB for PostgreSQL に正常に書き込まれたと見なされるのは、`commit` の呼び出しが成功した後のみです。
クライアントが例外をスローした場合、エラーコードに基づいてその処理方法を決定する必要があります。データ書き込みエラーが発生した場合は、データを再送信するか、問題のあるデータをログに記録してからスキップすることができます。
書き込みスレッドを増やしても、必ずしもパフォーマンスが向上するとは限りません。アプリケーションはデータをバッチ処理することが多いため、メモリ消費量が高くなる可能性があります。したがって、アプリケーションのガベージコレクション (GC) の状態を注意深くモニターする必要があります。
バッチサイズを小さくしすぎないでください。そうしないと、バッチ書き込みの利点が失われます。可能であれば、データをコミットする前に少なくとも 10,000 レコードを追加することを推奨します。
クライアントオブジェクトが初期化される前に、すべての `DatabaseConfig` 設定項目を設定する必要があります。これらの設定は、クライアントオブジェクトがインスタンス化された後は変更できません。
クライアント SDK は、書き込み (INSERT) パフォーマンスを最適化するように設計されています。他の SQL 操作については、
getConnection()メソッドを使用して JDBC 接続を取得し、標準の JDBC インターフェイスを介して操作を処理してください。