このトピックでは、AnalyticDB for PostgreSQLクライアントSDKを使用して、API操作を呼び出してAnalyticDB for PostgreSQLにデータを書き込む方法について説明します。
AnalyticDB for PostgreSQL Client SDKは、データ書き込みプログラムのカスタム開発と統合をサポートしています。 AnalyticDB for PostgreSQL Client SDKを使用して開発されたカスタムデータ書き込みプログラムは、データ書き込みプロセスを簡素化し、並列処理などの内部メカニズムを提供します。 このメソッドの書き込みパフォーマンスは、COPYおよびINSERTステートメントの書き込みパフォーマンスよりも数倍高くなります。 接続プールやキャッシュの問題について心配する必要はありません。
AnalyticDB for PostgreSQL Client SDKは、データを効率的に書き込むように設計されています。 生データの読み取りや処理は行いません。
Mavenリポジトリ
Mavenを使用して、AnalyticDB for PostgreSQLクライアントSDKのバージョンを設定できます。 サンプルコード:
<dependency>
<groupId>com.alibaba.cloud.analyticdb</groupId>
<artifactId>adb4pgclient</artifactId>
<version>1.0.4</version>
</dependency>
AnalyticDB for PostgreSQLクライアントSDKは、druid(1.1.17) 、postgresql(jdbc 42.2.5) 、commons-lang3(3.4) 、slf4j-api(1.7.24) 、およびslf4j-log4j12(1.7.24) のパッケージに依存します。
システムがAnalyticDB for PostgreSQL Client SDKとバージョンが競合するとプロンプトした場合は、これらのパッケージの正しいバージョンを使用していることを確認します。
AnalyticDB for PostgreSQLクライアントSDKのJARパッケージ: adb4pgclient-1.0.4.jar
関連する API 操作
表 1 DatabaseConfigクラスの操作
API 操作 | 説明 |
setHost (String adbHost) | AnalyticDB for PostgreSQLデータベースへの接続に使用されるエンドポイントを指定します。 |
setPort(int port) | AnalyticDB for PostgreSQLデータベースへの接続に使用するポート番号を指定します。 デフォルト値: 5432 |
setDatabase (String データベース) | AnalyticDB for PostgreSQLデータベースの名前を指定します。 |
setUser (String ユーザー名) | AnalyticDB for PostgreSQLデータベースへの接続に使用するデータベースアカウントを指定します。 |
setPassword(String pwd) | AnalyticDB for PostgreSQLデータベースへの接続に使用するパスワードを指定します。 |
addTable(List<String> テーブル、Stringスキーマ) | データを挿入するテーブルを指定します。 この操作は複数回呼び出すことができます。 この操作を呼び出すたびに、同じスキーマを使用するテーブルを指定できます。 DatabaseConfigクラスを使用してAdb4pgClientオブジェクトを作成すると、この操作は無効になります。 |
setColumns(List<String> columns、String tableName、String schemaName) | テーブルに挿入する列を指定します。 すべての列をテーブルに挿入する場合は、columnsパラメーターを |
setInsertIgnore (boolean insertIgnore) | 主キーが競合する行を無視するかどうかを指定します。 この操作は、指定されたすべてのテーブルに対して有効になります。 デフォルト値:false この場合、競合する主キーを持つ行は上書きされます。 |
setEmptyAsNull (boolean emptyAsNull) | 空の値をnullに設定するかどうかを指定します。 この操作は、指定されたすべてのテーブルに対して有効になります。 デフォルト値:false |
setParallelNumber(int parallelNumber) | AnalyticDB for PostgreSQLデータベースにデータを書き込むために使用される同時スレッドの最大数を指定します。 この操作は、指定されたすべてのテーブルに対して有効になります。 デフォルト値: 4。 デフォルト値のままにすることを推奨します。 |
setLogger (Logger Logger) | Adb4pgClientオブジェクトによって使用されるロガーを指定します。 AnalyticDB for PostgreSQLでは、slf4j.Loggerが使用されます。 |
setRetryTimes(int retryTimes) | エラー発生時にデータをコミットする最大リトライ回数を指定します。 デフォルト値: 3。 |
setRetryIntervalTime(long retryIntervalTime) | 2回の再試行の間隔を指定します。 単位:ミリ秒。 デフォルト値は 1000 です。 |
setCommitSize (long commitSize) | 自動的にコミットされるデータの量を指定します。 単位はバイトです。 デフォルト値: 10,000,000 デフォルト値のままにすることを推奨します。 |
表 2. Rowクラスの操作
API 操作 | 説明 |
setColumn(int index, Object value) | 行の列の値を指定します。 この操作は、行内の列のシーケンスに基づいて複数回呼び出して、複数の列を指定できます。 この操作では、行オブジェクトの再利用はサポートされません。 各データレコードは、特定の行オブジェクトに関連付ける必要があります。 |
setColumnValues (<Object> values) | 行の複数の列の値を指定します。 |
updateColumn(int index, Object value) | 行の列の値を更新します。 この操作では、データを更新して行オブジェクトを再利用できます。 |
表 3. Adb4pgClientクラスの操作
API 操作 | 説明 |
addRow(Row row, String tableName, String schemaName) / addRows(List<Row> rows, String tableName, String schemaName) | 1つ以上の行形式のデータレコードをテーブルに挿入します。 データレコードは、レコードがコミットされるまで、AnalyticDB for PostgreSQL Client SDKのバッファに格納されます。 バッファーに格納されているデータレコードの量がcommitSizeパラメーターの値に達すると、addRowまたはaddRows操作を呼び出すと、システムは自動的にデータレコードをコミットします。 自動コミットが失敗した場合、システムは失敗したデータレコードを含むエラーを報告します。 エラー情報に基づいてエラーを処理する必要があります。 |
addMap(Map<String, String> dataMap,String tableName, String schemaName) / addMaps(List<Map<String, String>> dataMaps, String tableName, String schemaName) | 1つ以上のマップ形式のデータレコードをテーブルに挿入します。 データレコードは、AnalyticDB for PostgreSQL Client SDKのバッファーに保存されます。 バッファーに格納されているデータレコードの量がcommitSizeパラメーターの値に達すると、addMapまたはaddMaps操作を呼び出すと、システムは自動的にデータレコードをコミットします。 自動コミットが失敗した場合、システムは失敗したデータレコードを含むエラーを報告します。 エラー情報に基づいてエラーを処理する必要があります。 |
commit() | AnalyticDB for PostgreSQL Client SDKのバッファーに保存されているデータレコードをコミットします。 コミットが失敗した場合、システムは失敗したステートメントを含むエラーを報告します。 エラー情報に基づいてエラーを処理する必要があります。 |
TableInfo getTableInfo(String tableName, String schemaName) | テーブルのスキーマを取得します。 |
List<ColumnInfo> getColumnInfo(String tableName, String schemaName) | テーブルの列を取得します。 |
stop() | オブジェクトを使用した後、Adb4pgClientオブジェクトによって使用される内部スレッドプールとリソースを解放します。 この操作を呼び出したときにAnalyticDB for PostgreSQL Client SDKのバッファ内のデータレコードがコミットされない場合、システムはエラーを報告します。 内部スレッドのプールとリソースを強制的に解放するには、 |
forceStop() | Adb4pgClientオブジェクトによって使用される内部スレッドプールとリソースを強制的に解放します。 この操作を呼び出した後、AnalyticDB for PostgreSQL Client SDKのバッファにコミットされていないデータレコードは失われます。 必要な場合を除き、この操作を呼び出さないことを推奨します。 |
Connection getConnection() throws SQLException | Adb4pgClientオブジェクトの接続プールからAnalyticDB for PostgreSQLデータベースへの接続を取得します。 取得された接続は、Java Database Connectivity (JDBC) 接続と同じ方法で機能します。 取得した接続を使用して、COPY以外の書き込み操作を実行できます。 説明 オブジェクトを使用した後、Adb4pgClientオブジェクトによって使用されるResultSet、Statement、Connectionなどのリソースを解放する必要があります。 |
表 4. ColumnInfoクラスの操作
API 操作 | 説明 |
boolean isNullable() | ColumnInfoクラスをnullにできるかどうかを指定します。 |
エラーコード | HTTPステータスコード | 説明 |
COMMIT_ERROR_DATA_LIST | 101 | 複数のデータレコードがコミットに失敗したため、エラーが返されました。 説明
|
COMMIT_ERROR_OTHER | 102 | コミット失敗以外の例外が発生した場合に返されるエラー。 |
ADD_DATA_ERROR | 103 | データレコードの追加に失敗したため、エラーが返されました。 |
CREATE_CONNECTION_ERROR | 104 | 接続の確立に失敗した場合に返されるエラー。 |
CLOSE_CONNECTION_ERROR | 105 | 接続のクローズに失敗した場合に返されるエラー。 |
CONFIG_ERROR | 106 | DatabaseConfigクラスの設定に失敗した場合に返されるエラー。 |
STOP_ERROR | 107 | Adb4pgClientオブジェクトの停止に失敗した場合に返されるエラー。 |
OTHER | 999 | 返されるデフォルトのエラー。 |
サンプルコード
public class Adb4pgClientUsage {
public void demo() {
DatabaseConfig databaseConfig = new DatabaseConfig();
// Should set your database real host or url
databaseConfig.setHost("100.100.100.100");
// Should set your database real port
databaseConfig.setPort(8888);
// Specify the database account that is used to connect to your AnalyticDB for PostgreSQL database.
databaseConfig.setUser("your user name");
// Specify the password that is used to connect to your AnalyticDB for PostgreSQL database.
databaseConfig.setPassword("your password");
// Specify the name of your AnalyticDB for PostgreSQL database.
databaseConfig.setDatabase("your database name");
// Specify the tables into which you want to insert data.
List<String> tables = new ArrayList<String>();
tables.add("your table name 1");
tables.add("your table name 2");
// You can call the addTable operation to specify tables that use the same schema. After you use the DatabaseConfig class to create an Adb4pgClient object, the specified tables cannot be changed. /
// If you set the table schema name parameter to null, the public schema is used.
databaseConfig.addTable(tables, "table schema name");
// Specify the columns that you want to insert into a table.
List<String> columns = new ArrayList<String>();
columns.add("column1");
columns.add("column2");
// If you want to insert all columns into a table, set the columns parameter to columns.add("*").
databaseConfig.setColumns(columns, "your table name 1", "table schema name");
databaseConfig.setColumns(Collections.singletonList("*"),"your table name 2", "table schema name");
// If the value of column is empty, set null
databaseConfig.setEmptyAsNull(false);
// Specify whether to ignore the rows that have conflicting primary keys.
databaseConfig.setInsertIgnore(true);
// If data fails to be committed into AnalyticDB for PostgreSQL, you can retry to commit the data up to three times.
databaseConfig.setRetryTimes(3);
// Specify a 1000-millisecond retry interval.
databaseConfig.setRetryIntervalTime(1000);
// Initialize the Adb4pgClient object. After you initialize the Adb4pgClient object, you cannot modify the configuration of the DatabaseConfig class.
Adb4pgClient adbClient = new Adb4pgClient(databaseConfig);
// Store data records to the buffer of AnalyticDB for PostgreSQL Client SDK and then commit the records at a time. For more information, see the "Usage notes" section of this topic.
for (int i = 0; i < 10; i++) {
// Add row(s) to buffer. One instance for one record
Row row = new Row(columns.size());
// Set column
// the column index must be same as the sequence of columns
// the column value can be any type, internally it will be formatted according to column type
row.setColumn(0, i); // Number value
row.setColumn(1, "string value"); // String value
// If the amount of data records in the buffer reaches the upper limit, the system automatically commits the records when you call the addRow or addMap operation.
// If the automatic commit fails, the system returns the COMMIT_ERROR_DATA_LIST error.
adbClient.addRow(row, "your table name 1", "table schema name");
}
Row row = new Row();
row.setColumn(0, 10); // Number value
row.setColumn(1, "2018-01-01 08:00:00"); // Date/Timestamp/Time value
adbClient.addRow(row, "your table name 1", "table schema name");
// Update columns. A Row object can be reused.
row.updateColumn(0, 11);
row.updateColumn(1, "2018-01-02 08:00:00");
adbClient.addRow(row, "your table name 1", "table schema name");
// Add map(s) to buffer
Map<String, String> rowMap = new HashMap<String, String>();
rowMap.put("t1", "12");
rowMap.put("t2", "string value");
// If you want to commit multiple data records, we recommend that you store the records to the buffer of AnalyticDB for PostgreSQL Client SDK and then commit the records at a time.
adbClient.addMap(rowMap, "your table name 2", "table schema name");
// Commit buffer to ADS
// Buffer is cleaned after successfully commit to ADS
try {
adbClient.commit();
} catch (Exception e) {
// TODO: Handle exception here
} finally {
adbClient.stop();
}
}
}
使用上の注意
AnalyticDB for PostgreSQLクライアントSDKはスレッドセーフではありません。 複数のスレッドを呼び出す場合は、各スレッドが独自のAdb4pgClientオブジェクトを保持していることを確認してください。
重要複数のスレッドが同じAdb4pgClientオブジェクトを共有すると、セキュリティの問題が発生し、データ書き込みパフォーマンスが低下します。
データレコードは、レコードがコミットされた後にのみAnalyticDB for PostgreSQLに書き込まれたと見なされます。
Adb4pgClientオブジェクトがエラーを返した場合は、エラー情報に基づいてエラーを処理する必要があります。 データレコードをコミットできない場合は、レコードのコミットを再試行できます。 レコードをメモした後、失敗したデータレコードをスキップすることもできます。
AnalyticDB for PostgreSQLにデータを書き込むと、データレコードがバッファに格納され、バッチがコミットされる場合があります。 これにより、CPU利用率が増加する。 適切な数のスレッドを維持し、アプリケーションのガベージコレクションのステータスを監視することを推奨します。
一度に10,000のデータレコードをコミットすることを推奨します。
Adb4pgClientオブジェクトを作成する前に、DatabaseConfigクラスの設定を完了する必要があります。 Adb4pgClientオブジェクトを作成した後、DatabaseConfigクラスの構成を変更することはできません。
AnalyticDB for PostgreSQLクライアントSDKは、データ書き込み (INSERTステートメント) の最適化を目指しています。 他のSQL文の実行を最適化するには、
getConnection()
操作を呼び出してJDBC接続を確立します。