本文介紹如何使用自訂SelectDB連接器寫入資料至雲資料庫SelectDB版。
背景資訊
雲資料庫 SelectDB 版是新一代即時資料倉庫SelectDB在阿里雲上的全託管服務,100%相容Apache Doris。您可以在阿里雲上便捷地購買SelectDB數倉服務,滿足海量資料分析需求,具體的產品優勢和應用情境請參見什麼是雲資料庫SelectDB版。
自訂SelectDB連接器支援的資訊如下:
類別 | 詳情 |
支援類型 | 結果表 |
運行模式 | 流模式和批模式 |
資料格式 | JSON和CSV |
特有監控指標 | 無 |
API種類 | DataStream和SQL |
是否支援更新/刪除 | 是 |
特色功能
支援整庫資料同步。
SelectDB連接器提供Exactly-Once語義,保證資料不重複也不丟失。
相容1.0及以上Apache Doris,可以使用Flink SelectDB自訂連接器同步資料至Apache Doris。
注意事項
使用方法
單擊JAR包擷取SelectDB自訂連接器(需要為1.15~1.17)。
在Realtime Compute開發控制台上,上傳SelectDB自訂連接器,詳情請參見管理自訂連接器。
在SQL作業中使用SelectDB自訂連接器,作業開發詳情請參見SQL作業開發。
具體的文法結構如下。
CREATE TABLE selectdb_sink ( emp_no INT , birth_date DATE, first_name STRING, last_name STRING, gender STRING, hire_date DATE ) WITH ( 'connector' = 'doris', 'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080', 'table.identifier' = 'test.employees', 'username' = 'admin', 'password' = '****', 'sink.enable-delete' = 'true' );
connector為表類型,固定值為
doris
。SelectDB自訂連接器結果表參數配置詳情請參見Sink配置項。
類型映射
使用樣本
本文以MySQL資料寫入SelectDB為例為您詳細介紹如何使用SelectDB自訂連接器。
準備工作。
建立Flink工作空間、MySQL和SelectDB執行個體,詳情請參見開通Realtime ComputeFlink版、第一步:快捷建立RDS MySQL執行個體與設定資料庫和建立執行個體。
在MySQL中建立名稱為order_dw_mysql的資料庫和名稱為orders的表並匯入測試資料。
CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee decimal(20,2) not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
通過DMS串連雲資料庫SelectDB版執行個體後建立名稱為selectdb的資料庫和名稱為selecttable的表。
CREATE DATABASE selectdb; CREATE TABLE `selecttable` ( order_id bigint, user_id varchar(50), shop_id bigint, product_id bigint, buy_fee DECIMAL, create_time DATETIME, update_time DATETIME, state int )DISTRIBUTED BY HASH(order_id) BUCKETS 10;
將Realtime ComputeFlink版的虛擬交換器的網段資訊添加到SelectDB的白名單中,詳情請參見空間管理與操作和設定白名單。
在 Realtime Compute開發控制台上建立Flink SQL作業並啟動。
建立名稱為mysqlcatalog的MySQL Catalog,詳情請參見管理MySQL Catalog。
單擊JAR包擷取SelectDB自訂連接器(需要為1.15~1.17),註冊SelectDB自訂連接器,詳情請參見管理自訂連接器。
在
新增作業草稿,程式碼範例如下。CREATE TEMPORARY TABLE selectdb_sink ( order_id BIGINT, user_id STRING, shop_id BIGINT, product_id BIGINT, buy_fee DECIMAL, create_time TIMESTAMP(6), update_time TIMESTAMP(6), state int ) WITH ( 'connector' = 'doris', 'fenodes' = 'selectdb-cn-jfj3z******.selectdbfe.rds.aliyuncs.com:8080', 'table.identifier' = 'selectdb.selecttable', 'username' = 'admin', 'password' = '${secret_values.selectdb}', 'sink.enable-delete' = 'true' ); INSERT INTO selectdb_sink SELECT * FROM `mysqlcatalog`.`order_dw_mysql`.`orders`;
通過DMS串連雲資料庫SelectDB版執行個體後,查詢名稱為selecttable的表資料。
SELECT * FROM `selecttable` ;