雲原生資料倉儲AnalyticDB PostgreSQL版支援通過flink-adbpg-connector整合向量化資料。本文以將Kafka資料匯入至AnalyticDB PostgreSQL版為例,介紹如何將向量資料匯入AnalyticDB PostgreSQL版。
前提條件
已建立AnalyticDB PostgreSQL版執行個體。具體操作,請參見建立執行個體。
已建立Flink全託管工作空間,且與AnalyticDB PostgreSQL版執行個體位於同一VPC下。具體操作,請參見開通Realtime ComputeFlink版。
若Realtime ComputeFlink端使用開源自建版,請確保flink-adbpg-connector已安裝在
$FLINK_HOME/lib目錄下。若使用公用雲端託管版,則無任何操作。
AnalyticDB PostgreSQL版資料庫已安裝向量檢索外掛程式FastANN。
您可以在psql用戶端通過
\dx fastann命令查看是否安裝。如果返回FastANN外掛程式的相關資訊,表示已安裝。
如果沒有返回任何資訊,請提交工單聯絡支援人員進行安裝。
已購買並部署Kafka執行個體,且Kafka執行個體與AnalyticDB PostgreSQL版執行個體位於同一VPC下。具體操作,請參見購買和部署執行個體。
已將Flink工作空間和Kafka執行個體所屬的網段加入AnalyticDB PostgreSQL版的白名單。具體操作,請參見設定白名單。
測試資料
為方便測試,AnalyticDB PostgreSQL版提供了測試資料。下載連結,請參見vector_sample_data.csv。
測試資料的表結構如下。
欄位 | 類型 | 說明 |
id | bigint | 編號。 |
market_time | timestamp | 汽車上市時間。 |
color | varchar(10) | 汽車的顏色。 |
price | int | 汽車的價格。 |
feature | float4[] | 汽車照片的特徵向量。 |
操作流程
建立結構化索引和向量化索引
串連AnalyticDB PostgreSQL版資料庫。本文以通過psql用戶端串連資料庫為例,詳情請參見psql串連資料庫。
執行以下命令,建立測試庫並切換至測試庫。
CREATE DATABASE adbpg_test; \c adbpg_test執行以下命令,建立目標表。
CREATE SCHEMA IF NOT EXISTS vector_test; CREATE TABLE IF NOT EXISTS vector_test.car_info ( id bigint NOT NULL, market_time timestamp, color varchar(10), price int, feature float4[], PRIMARY KEY(id) ) DISTRIBUTED BY(id);執行以下命令,建立結構化索引和向量化索引。
-- 修改向量列的儲存格式為PLAIN。 ALTER TABLE vector_test.car_info ALTER COLUMN feature SET STORAGE PLAIN; -- 建立結構化索引。 CREATE INDEX ON vector_test.car_info(market_time); CREATE INDEX ON vector_test.car_info(color); CREATE INDEX ON vector_test.car_info(price); -- 建立向量索引。 CREATE INDEX ON vector_test.car_info USING ann(feature) WITH (dim='10', pq_enable='0');
將向量化測試資料寫入Kafka Topic
執行以下命令,建立Kafka Topic。
bin/kafka-topics.sh --create --topic vector_ingest --partitions 1\ --bootstrap-server <your_broker_list>執行以下命令,將向量測試資料寫入Kafka Topic。
bin/kafka-console-producer.sh\ --bootstrap-server <your_broker_list>\ --topic vector_ingest < ../vector_sample_data.csv
<your_broker_list>:存取點資訊。您可在雲訊息佇列Kafka版控制台的執行個體詳情頁面的存取點資訊地區擷取。
建立映射表並匯入資料
建立Flink作業。
登入Realtime Compute控制台,在Flink全託管頁簽,單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊SQL開發,單擊建立,選擇空白的流作業草稿,單擊下一步。
在新增作業草稿對話方塊,填寫作業配置資訊。
作業參數
說明
樣本
檔案名稱
作業的名稱。
說明作業名稱在當前專案中必須保持唯一。
adbpg-test
儲存位置
指定該作業的代碼檔案所屬的檔案夾。
您還可以在現有檔案夾右側,單擊
表徵圖,建立子檔案夾。作業草稿
引擎版本
當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹。
vvr-6.0.6-flink-1.15
執行以下命令,建立AnalyticDB PostgreSQL版映射表。
CREATE TABLE vector_ingest ( id INT, market_time TIMESTAMP, color VARCHAR(10), price int, feature VARCHAR )WITH ( 'connector' = 'adbpg-nightly-1.13', 'url' = 'jdbc:postgresql://<your_instance_url>:5432/adbpg_test', 'tablename' = 'car_info', 'username' = '<your_username>', 'password' = '<your_password>', 'targetschema' = 'vector_test', 'maxretrytimes' = '2', 'batchsize' = '3000', 'batchwritetimeoutms' = '10000', 'connectionmaxactive' = '20', 'conflictmode' = 'ignore', 'exceptionmode' = 'ignore', 'casesensitive' = '0', 'writemode' = '1', 'retrywaittime' = '200' );參數說明,請參見寫入資料到AnalyticDB PostgreSQL版。
執行以下命令,建立Kafka映射表。
CREATE TABLE vector_kafka ( id INT, market_time TIMESTAMP, color VARCHAR(10), price int, feature string ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = '<your_broker_list>', 'topic' = 'vector_ingest', 'format' = 'csv', 'csv.field-delimiter' = '\t', 'scan.startup.mode' = 'earliest-offset' );參數說明如下。
參數
是否必填
說明
connector
是
連接器名。固定值為Kafka。
properties.bootstrap.servers
是
存取點資訊。您可在雲訊息佇列Kafka版控制台的執行個體詳情頁面的存取點資訊地區擷取。
topic
是
Kafka訊息所在的Topic名稱。
format
是
寫入Kafka訊息Value部分時使用的格式。支援的格式:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
csv.field-delimiter
是
CSV欄位分隔符號。
scan.startup.mode
是
Kafka讀取資料的啟動位點。取值如下:
earliest-offset:從Kafka最早分區開始讀取。
latest-offset:從Kafka最新位點開始讀取。
執行以下命令,建立匯入任務。
INSERT INTO vector_ingest SELECT * FROM vector_kafka;