全部產品
Search
文件中心

AnalyticDB:通過Realtime ComputeFlink整合向量資料

更新時間:Mar 06, 2025

雲原生資料倉儲AnalyticDB PostgreSQL版支援通過flink-adbpg-connector整合向量化資料。本文以將Kafka資料匯入至AnalyticDB PostgreSQL版為例,介紹如何將向量資料匯入AnalyticDB PostgreSQL版

前提條件

  • 已建立AnalyticDB PostgreSQL版執行個體。具體操作,請參見建立執行個體

  • 已建立Flink全託管工作空間,且與AnalyticDB PostgreSQL版執行個體位於同一VPC下。具體操作,請參見開通Realtime ComputeFlink版

  • AnalyticDB PostgreSQL版資料庫已安裝向量檢索外掛程式FastANN。

    您可以在psql用戶端通過\dx fastann命令查看是否安裝。

    • 如果返回FastANN外掛程式的相關資訊,表示已安裝。

    • 如果沒有返回任何資訊,請提交工單聯絡支援人員進行安裝。

  • 已購買並部署Kafka執行個體,且Kafka執行個體與AnalyticDB PostgreSQL版執行個體位於同一VPC下。具體操作,請參見購買和部署執行個體

  • 已將Flink工作空間和Kafka執行個體所屬的網段加入AnalyticDB PostgreSQL版的白名單。具體操作,請參見設定白名單

測試資料

為方便測試,AnalyticDB PostgreSQL版提供了測試資料。下載連結,請參見vector_sample_data.csv

測試資料的表結構如下。

欄位

類型

說明

id

bigint

編號。

market_time

timestamp

汽車上市時間。

color

varchar(10)

汽車的顏色。

price

int

汽車的價格。

feature

float4[]

汽車照片的特徵向量。

操作流程

  1. 建立結構化索引和向量化索引

  2. 將向量化測試資料寫入Kafka Topic

  3. 建立映射表並匯入資料

建立結構化索引和向量化索引

  1. 串連AnalyticDB PostgreSQL版資料庫。本文以通過psql用戶端串連資料庫為例,詳情請參見psql串連資料庫

  2. 執行以下命令,建立測試庫並切換至測試庫。

    CREATE DATABASE adbpg_test;
    \c adbpg_test
  3. 執行以下命令,建立目標表。

    CREATE SCHEMA IF NOT EXISTS vector_test;
    CREATE TABLE IF NOT EXISTS vector_test.car_info
    (
      id bigint NOT NULL,
      market_time timestamp,
      color varchar(10),
      price int,
      feature float4[],
      PRIMARY KEY(id)
    ) DISTRIBUTED BY(id);
  4. 執行以下命令,建立結構化索引和向量化索引。

    -- 修改向量列的儲存格式為PLAIN。
    ALTER TABLE vector_test.car_info ALTER COLUMN feature SET STORAGE PLAIN;
    
    -- 建立結構化索引。
    CREATE INDEX ON vector_test.car_info(market_time);
    CREATE INDEX ON vector_test.car_info(color);
    CREATE INDEX ON vector_test.car_info(price);
    
    -- 建立向量索引。
    CREATE INDEX ON vector_test.car_info USING ann(feature) 
    WITH (dim='10', pq_enable='0');

將向量化測試資料寫入Kafka Topic

  1. 執行以下命令,建立Kafka Topic。

    bin/kafka-topics.sh --create --topic vector_ingest --partitions 1\ 
    --bootstrap-server <your_broker_list>
  2. 執行以下命令,將向量測試資料寫入Kafka Topic。

    bin/kafka-console-producer.sh\
    --bootstrap-server <your_broker_list>\
    --topic vector_ingest < ../vector_sample_data.csv

<your_broker_list>:存取點資訊。您可在雲訊息佇列Kafka版控制台執行個體詳情頁面的存取點資訊地區擷取。

建立映射表並匯入資料

  1. 建立Flink作業。

    1. 登入Realtime Compute控制台,在Flink全託管頁簽,單擊目標工作空間操作列下的控制台

    2. 在左側導覽列,單擊SQL開發,單擊建立,選擇空白的流作業草稿,單擊下一步

    3. 新增作業草稿對話方塊,填寫作業配置資訊。

      作業參數

      說明

      樣本

      檔案名稱

      作業的名稱。

      說明

      作業名稱在當前專案中必須保持唯一。

      adbpg-test

      儲存位置

      指定該作業的代碼檔案所屬的檔案夾。

      您還可以在現有檔案夾右側,單擊建立檔案夾表徵圖,建立子檔案夾。

      作業草稿

      引擎版本

      當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹

      vvr-6.0.6-flink-1.15

  2. 執行以下命令,建立AnalyticDB PostgreSQL版映射表。

    CREATE TABLE vector_ingest (
      id INT,
      market_time TIMESTAMP,
      color VARCHAR(10),
      price int,
      feature VARCHAR
    )WITH (
       'connector' = 'adbpg-nightly-1.13',
       'url' = 'jdbc:postgresql://<your_instance_url>:5432/adbpg_test',
       'tablename' = 'car_info',
       'username' = '<your_username>',
       'password' = '<your_password>',
       'targetschema' = 'vector_test',
       'maxretrytimes' = '2',
       'batchsize' = '3000',
       'batchwritetimeoutms' = '10000',
       'connectionmaxactive' = '20',
       'conflictmode' = 'ignore',
       'exceptionmode' = 'ignore',
       'casesensitive' = '0',
       'writemode' = '1',
       'retrywaittime' = '200'
    );

    參數說明,請參見寫入資料到AnalyticDB PostgreSQL版

  3. 執行以下命令,建立Kafka映射表。

    CREATE TABLE vector_kafka (
      id INT,
      market_time TIMESTAMP,
      color VARCHAR(10),
      price int,
      feature string
    ) 
    WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = '<your_broker_list>',
        'topic' = 'vector_ingest',
        'format' = 'csv',
        'csv.field-delimiter' = '\t',
        'scan.startup.mode' = 'earliest-offset'
    );

    參數說明如下。

    參數

    是否必填

    說明

    connector

    連接器名。固定值為Kafka。

    properties.bootstrap.servers

    存取點資訊。您可在雲訊息佇列Kafka版控制台的執行個體詳情頁面的存取點資訊地區擷取。

    topic

    Kafka訊息所在的Topic名稱。

    format

    寫入Kafka訊息Value部分時使用的格式。支援的格式:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    csv.field-delimiter

    CSV欄位分隔符號。

    scan.startup.mode

    Kafka讀取資料的啟動位點。取值如下:

    • earliest-offset:從Kafka最早分區開始讀取。

    • latest-offset:從Kafka最新位點開始讀取。

  4. 執行以下命令,建立匯入任務。

    INSERT INTO vector_ingest SELECT * FROM vector_kafka;