AnalyticDB PostgreSQL版Client SDK旨在通過API方式提供高效能COPY資料到AnalyticDB PostgreSQL版的方式。
AnalyticDB PostgreSQL版Client SDK支援使用者定製化開發或對接寫入程式。通過SDK開發寫入程式,可簡化在AnalyticDB PostgreSQL版中寫入資料的流程,無需擔心串連池、緩衝等問題,相比較直接COPY/INSERT寫入,通過並行化等內部機制有幾倍效能提升。
AnalyticDB PostgreSQL版Client SDK主要職責是將您傳入的資料高效地寫入,不負責未經處理資料的讀取、處理等工作。
Maven repositories
您可以通過Maven管理配置新SDK的版本。Maven的配置資訊如下:
<dependency>
<groupId>com.alibaba.cloud.analyticdb</groupId>
<artifactId>adb4pgclient</artifactId>
<version>1.0.4</version>
</dependency>
AnalyticDB PostgreSQL版Client SDK本身依賴druid(1.1.17)、postgresql(jdbc 42.2.5)、commons-lang3(3.4)、slf4j-api(1.7.24)、slf4j-log4j12(1.7.24)。
如果在使用過程中出現版本衝突,請檢查這幾個包的版本並解決衝突。
離線Jar包下載連結:adb4pgclient-1.0.4.jar。
介面列表
介面名 | 描述 |
setHost(String adbHost) | 需要串連的AnalyticDB PostgreSQL版的串連地址。 |
setPort(int port) | 需要串連的AnalyticDB PostgreSQL版的連接埠,預設為5432。 |
setDatabase(String database) | 需要串連的AnalyticDB PostgreSQL版資料庫名稱。 |
setUser(String username) | 需要串連的AnalyticDB PostgreSQL版使用的使用者名稱。 |
setPassword(String pwd) | 設定串連的AnalyticDB PostgreSQL版使用的密碼。 |
addTable(List<String> table, String schema) | 需要寫入的表名List,請按照表所屬schema分別添加。該方法可調用多次,但在使用DatabaseConfig構造Adb4PGClient對象之後再調用不再生效。 |
setColumns(List<String> columns, String tableName, String schemaName) | 需要插入表的欄位名(若是全欄位插入, |
setInsertIgnore(boolean insertIgnore) | 設定是否忽略發生主鍵衝突錯誤的資料行,要根據業務的使用情境進行判斷,針對配置的所有表。預設為false,表示不忽略發生主鍵衝突錯誤的資料行(即會覆蓋寫發生衝突的行)。 |
setEmptyAsNull(boolean emptyAsNull) | 設定empty資料設定為null,預設false,針對配置的所有表。 |
setParallelNumber(int parallelNumber) | 設定寫入AnalyticDB PostgreSQL版時的並發線程數,預設4,針對配置的所有表,一般情況不建議修改。 |
setLogger(Logger logger) | 設定client中使用的logger對象,此處使用slf4j.Logger。 |
setRetryTimes(int retryTimes) | 設定commit時,寫入AnalyticDB PostgreSQL版出現異常時重試的次數,預設為3。 |
setRetryIntervalTime(long retryIntervalTime) | 設定稍候再試的時間,單位是ms,預設為 1000 ms。 |
setCommitSize(long commitSize) | 設定自動認可的資料量(單位Byte),預設為10MB,一般不建議設定。 |
介面名稱 | 描述 |
setColumn(int index, Object value) | 設定Row欄位列表的值,要求必須按照欄位的順序(此種方式,Row執行個體不可複用,每條資料必須單獨的Row執行個體)。 |
setColumnValues(List<Object> values) | 直接將List格式資料行寫入Row中。 |
updateColumn(int index, Object value) | 更新Row欄位列表的值,注意更新的欄位資料(此方法,Row執行個體可以複用,只需更新Row執行個體中的資料即可。 |
介面名稱 | 描述 |
addRow(Row row, String tableName, String schemaName) / addRows(List<Row> rows, String tableName, String schemaName) | 插入對應表的Row格式化的資料,即一條記錄,資料會儲存在SDK的緩衝區中,等待commit。如果資料量超過commitSize會在addRow/addRows的時候做一次自動commit,然後將最新的資料add進來;如果在自動commit失敗的時候失敗,調用方需要處理此異常,並且會在異常中得到失敗的資料list。 |
addMap(Map<String, String> dataMap,String tableName, String schemaName) / addMaps(List<Map<String, String>> dataMaps, String tableName, String schemaName) | 對應於addRow,支援map格式資料的寫入,如果資料量滿了會在addMap/addMaps的時候做一次自動commit,然後將最新的資料add進來;如果在自動commit失敗的時候失敗,調用方需要處理此異常,並且會在異常中得到失敗的資料list。 |
commit() | 將緩衝的資料進行提交,寫入AnalyticDB PostgreSQL版中,若commit失敗,會把執行錯誤的語句放在異常中拋出,調用方需要對此異常進行處理。 |
TableInfo getTableInfo(String tableName, String schemaName) | 擷取對應table的結構資訊。 |
List<ColumnInfo> getColumnInfo(String tableName, String schemaName) | 擷取對應table的欄位列表資訊,欄位類是ColumnInfo,可以通過 |
stop() | 執行個體使用完之後,stop釋放內部線程池及資源,如果記憶體中有資料未commit,則會拋Exception,若需要強行stop,請使用 |
forceStop() | 強行釋放內部線程池及資源,會丟失掉緩衝在記憶體中未commit的資料,一般不推薦使用。 |
Connection getConnection() throws SQLException | 從client串連池擷取資料庫Connection串連,調用方可以使用獲得的Connection做非copy操作,使用方式和jdbc的串連使用方式一致。 說明 使用結束後一定要釋放掉相應的資源(如ResultSet、Statement、Connection)。 |
介面名稱 | 描述 |
boolean isNullable() | 判斷該欄位是否能為null。 |
錯誤碼名 | 錯誤碼值 | 描述 |
COMMIT_ERROR_DATA_LIST | 101 | commit中某些資料出現異常,會返回異常的資料。 說明 通過 |
COMMIT_ERROR_OTHER | 102 | commit中的其他異常。 |
ADD_DATA_ERROR | 103 | add資料過程中出現的異常。 |
CREATE_CONNECTION_ERROR | 104 | 建立串連出現異常。 |
CLOSE_CONNECTION_ERROR | 105 | 關閉串連出現異常。 |
CONFIG_ERROR | 106 | 配置DatabaseConfig出現配置錯誤。 |
STOP_ERROR | 107 | 停止執行個體時的報錯。 |
OTHER | 999 | 預設異常錯誤碼。 |
程式碼範例
public class Adb4pgClientUsage {
public void demo() {
DatabaseConfig databaseConfig = new DatabaseConfig();
// Should set your database real host or url
databaseConfig.setHost("100.100.100.100");
// Should set your database real port
databaseConfig.setPort(8888);
// 串連資料庫的使用者名稱。
databaseConfig.setUser("your user name");
// 串連資料庫的密碼。
databaseConfig.setPassword("your password");
// 需要串連的database。
databaseConfig.setDatabase("your database name");
// 設定需要寫入的表名列表。
List<String> tables = new ArrayList<String>();
tables.add("your table name 1");
tables.add("your table name 2");
// 不同schema下的表可分別addTable,但是一旦使用databseconfig 創造Client執行個體之後,table配置是不可修改的。/
// schema傳入null, 則預設schema為public。
databaseConfig.addTable(tables, "table schema name");
// 設定需要寫入的表欄位。
List<String> columns = new ArrayList<String>();
columns.add("column1");
columns.add("column2");
// 如果是所有欄位,欄位列表使用 columns.add("*") 即可。
databaseConfig.setColumns(columns, "your table name 1", "table schema name");
databaseConfig.setColumns(Collections.singletonList("*"),"your table name 2", "table schema name");
// If the value of column is empty, set null
databaseConfig.setEmptyAsNull(false);
// 使用insert ignore into方式進行插入。
databaseConfig.setInsertIgnore(true);
// commit時,寫入資料庫出現異常時重試的3次。
databaseConfig.setRetryTimes(3);
// 稍候再試的時間為1s,單位是ms。
databaseConfig.setRetryIntervalTime(1000);
// Initialize AdbClient,初始化執行個體之後,databaseConfig的配置資訊不能再修改。
Adb4pgClient adbClient = new Adb4pgClient(databaseConfig);
// 資料需要攢批,多次add,再commit,具體攢批數量見"注意事項"。
for (int i = 0; i < 10; i++) {
// Add row(s) to buffer. One instance for one record
Row row = new Row(columns.size());
// Set column
// the column index must be same as the sequence of columns
// the column value can be any type, internally it will be formatted according to column type
row.setColumn(0, i); // Number value
row.setColumn(1, "string value"); // String value
// 如果sql長度滿了會在addRow或者addMap的時候會進行一次自動認可。
// 如果提交失敗會返回AdbClientException異常,錯誤碼為COMMIT_ERROR_DATA_LIST。
adbClient.addRow(row, "your table name 1", "table schema name");
}
Row row = new Row();
row.setColumn(0, 10); // Number value
row.setColumn(1, "2018-01-01 08:00:00"); // Date/Timestamp/Time value
adbClient.addRow(row, "your table name 1", "table schema name");
// Update column. Row執行個體可複用。
row.updateColumn(0, 11);
row.updateColumn(1, "2018-01-02 08:00:00");
adbClient.addRow(row, "your table name 1", "table schema name");
// Add map(s) to buffer
Map<String, String> rowMap = new HashMap<String, String>();
rowMap.put("t1", "12");
rowMap.put("t2", "string value");
// 這邊需要攢批的,建議多次add之後在進行commit。
adbClient.addMap(rowMap, "your table name 2", "table schema name");
// Commit buffer to ADS
// Buffer is cleaned after successfully commit to ADS
try {
adbClient.commit();
} catch (Exception e) {
// TODO: Handle exception here
} finally {
adbClient.stop();
}
}
}
注意事項
AnalyticDB PostgreSQL版Client SDK是非安全執行緒的,所以如果多線程調用的情況,需要每個線程維護自己的Client對象。
重要不建議多線程共用SDK執行個體,除了安全執行緒問題外,也容易讓Client成為寫入效能的瓶頸。
資料必須在調用commit成功後才能認為是寫入AnalyticDB PostgreSQL版成功的。
針對Client拋出的異常,調用方要根據錯誤碼的意義自行判斷如何處理,如果是資料寫入有問題,可以重複提交或者記錄下有問題的資料後跳過。
很多時候寫入線程並不是越多越好,因為業務程式會涉及到攢資料的情境,對記憶體的消耗是比較明顯的,所以業務調用方一定要多多關注應用程式的GC情況。
資料攢批數量不要太小,如果太小,攢批寫入意義就不大了,條件允許的情況下可以add 10000條進行一次commit。
DatabaseConfig配置在執行個體化client對象成功之後是不能再修改的,所有配置項必須在client對象初始化之前完成配置。
Client SDK目的是對寫入(INSERT)提供效能最佳化,對於其他SQL操作,可以通過
getConnection()
獲得JDBC串連,通過標準JDBC介面進行處理。