全部產品
Search
文件中心

AnalyticDB:基於Client SDK資料寫入

更新時間:Feb 05, 2024

AnalyticDB PostgreSQL版Client SDK旨在通過API方式提供高效能COPY資料到AnalyticDB PostgreSQL版的方式。

AnalyticDB PostgreSQL版Client SDK支援使用者定製化開發或對接寫入程式。通過SDK開發寫入程式,可簡化在AnalyticDB PostgreSQL版中寫入資料的流程,無需擔心串連池、緩衝等問題,相比較直接COPY/INSERT寫入,通過並行化等內部機制有幾倍效能提升。

說明

AnalyticDB PostgreSQL版Client SDK主要職責是將您傳入的資料高效地寫入,不負責未經處理資料的讀取、處理等工作。

Maven repositories

您可以通過Maven管理配置新SDK的版本。Maven的配置資訊如下:

<dependency>
  <groupId>com.alibaba.cloud.analyticdb</groupId>
  <artifactId>adb4pgclient</artifactId>
  <version>1.0.4</version>
</dependency>
說明

離線Jar包下載連結:adb4pgclient-1.0.4.jar

介面列表

表 1. DatabaseConfig類

介面名

描述

setHost(String adbHost)

需要串連的AnalyticDB PostgreSQL版的串連地址。

setPort(int port)

需要串連的AnalyticDB PostgreSQL版的連接埠,預設為5432。

setDatabase(String database)

需要串連的AnalyticDB PostgreSQL版資料庫名稱。

setUser(String username)

需要串連的AnalyticDB PostgreSQL版使用的使用者名稱。

setPassword(String pwd)

設定串連的AnalyticDB PostgreSQL版使用的密碼。

addTable(List<String> table, String schema)

需要寫入的表名List,請按照表所屬schema分別添加。該方法可調用多次,但在使用DatabaseConfig構造Adb4PGClient對象之後再調用不再生效。

setColumns(List<String> columns, String tableName, String schemaName)

需要插入表的欄位名(若是全欄位插入,columnList.add("*")即可,table列表中的所有表都需要設定欄位名,否則檢查不會通過。

setInsertIgnore(boolean insertIgnore)

設定是否忽略發生主鍵衝突錯誤的資料行,要根據業務的使用情境進行判斷,針對配置的所有表。預設為false,表示不忽略發生主鍵衝突錯誤的資料行(即會覆蓋寫發生衝突的行)。

setEmptyAsNull(boolean emptyAsNull)

設定empty資料設定為null,預設false,針對配置的所有表。

setParallelNumber(int parallelNumber)

設定寫入AnalyticDB PostgreSQL版時的並發線程數,預設4,針對配置的所有表,一般情況不建議修改。

setLogger(Logger logger)

設定client中使用的logger對象,此處使用slf4j.Logger。

setRetryTimes(int retryTimes)

設定commit時,寫入AnalyticDB PostgreSQL版出現異常時重試的次數,預設為3。

setRetryIntervalTime(long retryIntervalTime)

設定稍候再試的時間,單位是ms,預設為 1000 ms。

setCommitSize(long commitSize)

設定自動認可的資料量(單位Byte),預設為10MB,一般不建議設定。

表 2. Row類

介面名稱

描述

setColumn(int index, Object value)

設定Row欄位列表的值,要求必須按照欄位的順序(此種方式,Row執行個體不可複用,每條資料必須單獨的Row執行個體)。

setColumnValues(List<Object> values)

直接將List格式資料行寫入Row中。

updateColumn(int index, Object value)

更新Row欄位列表的值,注意更新的欄位資料(此方法,Row執行個體可以複用,只需更新Row執行個體中的資料即可。

表 3. Adb4pgClient 類

介面名稱

描述

addRow(Row row, String tableName, String schemaName) / addRows(List<Row> rows, String tableName, String schemaName)

插入對應表的Row格式化的資料,即一條記錄,資料會儲存在SDK的緩衝區中,等待commit。如果資料量超過commitSize會在addRow/addRows的時候做一次自動commit,然後將最新的資料add進來;如果在自動commit失敗的時候失敗,調用方需要處理此異常,並且會在異常中得到失敗的資料list。

addMap(Map<String, String> dataMap,String tableName, String schemaName) / addMaps(List<Map<String, String>> dataMaps, String tableName, String schemaName)

對應於addRow,支援map格式資料的寫入,如果資料量滿了會在addMap/addMaps的時候做一次自動commit,然後將最新的資料add進來;如果在自動commit失敗的時候失敗,調用方需要處理此異常,並且會在異常中得到失敗的資料list。

commit()

將緩衝的資料進行提交,寫入AnalyticDB PostgreSQL版中,若commit失敗,會把執行錯誤的語句放在異常中拋出,調用方需要對此異常進行處理。

TableInfo getTableInfo(String tableName, String schemaName)

擷取對應table的結構資訊。

List<ColumnInfo> getColumnInfo(String tableName, String schemaName)

擷取對應table的欄位列表資訊,欄位類是ColumnInfo,可以通過columnInfo.isNullable()擷取該欄位是否能為null。

stop()

執行個體使用完之後,stop釋放內部線程池及資源,如果記憶體中有資料未commit,則會拋Exception,若需要強行stop,請使用forceStop()

forceStop()

強行釋放內部線程池及資源,會丟失掉緩衝在記憶體中未commit的資料,一般不推薦使用。

Connection getConnection() throws SQLException

從client串連池擷取資料庫Connection串連,調用方可以使用獲得的Connection做非copy操作,使用方式和jdbc的串連使用方式一致。

說明

使用結束後一定要釋放掉相應的資源(如ResultSet、Statement、Connection)。

表 4. ColumnInfo類

介面名稱

描述

boolean isNullable()

判斷該欄位是否能為null。

錯誤碼名

錯誤碼值

描述

COMMIT_ERROR_DATA_LIST

101

commit中某些資料出現異常,會返回異常的資料。

說明

通過e.getErrData()即可獲得異常資料List<String>,此錯誤碼在addMap(s)addRow(s)commit操作的時候都可能會發生,因此在這些操作的時候需要單獨處理此錯誤碼的異常

COMMIT_ERROR_OTHER

102

commit中的其他異常。

ADD_DATA_ERROR

103

add資料過程中出現的異常。

CREATE_CONNECTION_ERROR

104

建立串連出現異常。

CLOSE_CONNECTION_ERROR

105

關閉串連出現異常。

CONFIG_ERROR

106

配置DatabaseConfig出現配置錯誤。

STOP_ERROR

107

停止執行個體時的報錯。

OTHER

999

預設異常錯誤碼。

程式碼範例

public class Adb4pgClientUsage {
    public void demo() {
        DatabaseConfig databaseConfig = new DatabaseConfig();
        // Should set your database real host or url
        databaseConfig.setHost("100.100.100.100");
        // Should set your database real port
        databaseConfig.setPort(8888);
        // 串連資料庫的使用者名稱。
        databaseConfig.setUser("your user name");
        // 串連資料庫的密碼。
        databaseConfig.setPassword("your password");
      // 需要串連的database。
        databaseConfig.setDatabase("your database name");
        // 設定需要寫入的表名列表。
        List<String> tables = new ArrayList<String>();
        tables.add("your table name 1");
        tables.add("your table name 2");

        // 不同schema下的表可分別addTable,但是一旦使用databseconfig 創造Client執行個體之後,table配置是不可修改的。/
        // schema傳入null, 則預設schema為public。
        databaseConfig.addTable(tables, "table schema name");

        // 設定需要寫入的表欄位。
        List<String> columns = new ArrayList<String>();
        columns.add("column1");
        columns.add("column2");
        // 如果是所有欄位,欄位列表使用 columns.add("*") 即可。
        databaseConfig.setColumns(columns, "your table name 1", "table schema name");
        databaseConfig.setColumns(Collections.singletonList("*"),"your table name 2", "table schema name");


        // If the value of column is empty, set null
        databaseConfig.setEmptyAsNull(false);
        // 使用insert ignore into方式進行插入。
        databaseConfig.setInsertIgnore(true);
        // commit時,寫入資料庫出現異常時重試的3次。
        databaseConfig.setRetryTimes(3);
        // 稍候再試的時間為1s,單位是ms。
        databaseConfig.setRetryIntervalTime(1000);
        // Initialize AdbClient,初始化執行個體之後,databaseConfig的配置資訊不能再修改。
        Adb4pgClient adbClient = new Adb4pgClient(databaseConfig);

        // 資料需要攢批,多次add,再commit,具體攢批數量見"注意事項"。
        for (int i = 0; i < 10; i++) {
            // Add row(s) to buffer. One instance for one record
            Row row = new Row(columns.size());
            // Set column
            // the column index must be same as the sequence of columns
            // the column value can be any type, internally it will be formatted according to column type
            row.setColumn(0, i); // Number value
            row.setColumn(1, "string value"); // String value
            // 如果sql長度滿了會在addRow或者addMap的時候會進行一次自動認可。
            // 如果提交失敗會返回AdbClientException異常,錯誤碼為COMMIT_ERROR_DATA_LIST。
            adbClient.addRow(row, "your table name 1", "table schema name");
        }

        Row row = new Row();
        row.setColumn(0, 10); // Number value
        row.setColumn(1, "2018-01-01 08:00:00"); // Date/Timestamp/Time value
        adbClient.addRow(row, "your table name 1", "table schema name");
        // Update column. Row執行個體可複用。
        row.updateColumn(0, 11);
        row.updateColumn(1, "2018-01-02 08:00:00");
        adbClient.addRow(row, "your table name 1", "table schema name");

        // Add map(s) to buffer
        Map<String, String> rowMap = new HashMap<String, String>();
        rowMap.put("t1", "12");
        rowMap.put("t2", "string value");
        // 這邊需要攢批的,建議多次add之後在進行commit。
        adbClient.addMap(rowMap, "your table name 2", "table schema name");

        // Commit buffer to ADS
        // Buffer is cleaned after successfully commit to ADS
        try {
            adbClient.commit();
        } catch (Exception e) {
            // TODO: Handle exception here
        } finally {
            adbClient.stop();
        }
    }

}

注意事項

  • AnalyticDB PostgreSQL版Client SDK是非安全執行緒的,所以如果多線程調用的情況,需要每個線程維護自己的Client對象。

    重要

    不建議多線程共用SDK執行個體,除了安全執行緒問題外,也容易讓Client成為寫入效能的瓶頸。

  • 資料必須在調用commit成功後才能認為是寫入AnalyticDB PostgreSQL版成功的。

  • 針對Client拋出的異常,調用方要根據錯誤碼的意義自行判斷如何處理,如果是資料寫入有問題,可以重複提交或者記錄下有問題的資料後跳過。

  • 很多時候寫入線程並不是越多越好,因為業務程式會涉及到攢資料的情境,對記憶體的消耗是比較明顯的,所以業務調用方一定要多多關注應用程式的GC情況。

  • 資料攢批數量不要太小,如果太小,攢批寫入意義就不大了,條件允許的情況下可以add 10000條進行一次commit。

  • DatabaseConfig配置在執行個體化client對象成功之後是不能再修改的,所有配置項必須在client對象初始化之前完成配置。

  • Client SDK目的是對寫入(INSERT)提供效能最佳化,對於其他SQL操作,可以通過getConnection()獲得JDBC串連,通過標準JDBC介面進行處理。