全部产品
Search
文档中心

云原生数据仓库AnalyticDB:通过实时计算Flink集成向量数据

更新时间:Nov 16, 2023

云原生数据仓库AnalyticDB PostgreSQL版支持通过flink-adbpg-connector集成向量化数据。本文以将Kafka数据导入至AnalyticDB PostgreSQL版为例,介绍如何将向量数据导入AnalyticDB PostgreSQL版

前提条件

  • 已创建AnalyticDB PostgreSQL版实例。具体操作,请参见创建实例

  • 已创建Flink全托管工作空间,且与AnalyticDB PostgreSQL版实例位于同一VPC下。具体操作,请参见开通Flink全托管

  • AnalyticDB PostgreSQL版数据库已安装向量检索插件FastANN。

    您可以在psql客户端通过\dx fastann命令查看是否安装。

    • 如果返回FastANN插件的相关信息,表示已安装。

    • 如果没有返回任何信息,请提交工单联系技术支持进行安装。

  • 已购买并部署Kafka实例,且Kafka实例与AnalyticDB PostgreSQL版实例位于同一VPC下。具体操作,请参见购买和部署实例

  • 已将Flink工作空间和Kafka实例所属的网段加入AnalyticDB PostgreSQL版的白名单。具体操作,请参见设置白名单

测试数据

为方便测试,AnalyticDB PostgreSQL版提供了测试数据。下载链接,请参见vector_sample_data.csv

测试数据的表结构如下。

字段

类型

说明

id

bigint

编号。

market_time

timestamp

汽车上市时间。

color

varchar(10)

汽车的颜色。

price

int

汽车的价格。

feature

float4[]

汽车照片的特征向量。

操作流程

  1. 创建结构化索引和向量化索引

  2. 将向量化测试数据写入Kafka Topic

  3. 创建映射表并导入数据

创建结构化索引和向量化索引

  1. 连接AnalyticDB PostgreSQL版数据库。本文以通过psql客户端连接数据库为例,详情请参见psql连接数据库

  2. 执行以下命令,创建测试库并切换至测试库。

    CREATE DATABASE adbpg_test;
    \c adbpg_test
  3. 执行以下命令,创建目标表。

    CREATE SCHEMA IF NOT EXISTS vector_test;
    CREATE TABLE IF NOT EXISTS vector_test.car_info
    (
      id bigint NOT NULL,
      market_time timestamp,
      color varchar(10),
      price int,
      feature float4[],
      PRIMARY KEY(id)
    ) DISTRIBUTED BY(id);
  4. 执行以下命令,创建结构化索引和向量化索引。

    -- 修改向量列的存储格式为PLAIN。
    ALTER TABLE vector_test.car_info ALTER COLUMN feature SET STORAGE PLAIN;
    
    -- 创建结构化索引。
    CREATE INDEX ON vector_test.car_info(market_time);
    CREATE INDEX ON vector_test.car_info(color);
    CREATE INDEX ON vector_test.car_info(price);
    
    -- 创建向量索引。
    CREATE INDEX ON vector_test.car_info USING ann(feature) 
    WITH (dim='10', pq_enable='0');

将向量化测试数据写入Kafka Topic

  1. 执行以下命令,创建Kafka Topic。

    bin/kafka-topics.sh --create --topic vector_ingest --partitions 1\ 
    --bootstrap-server <your_broker_list>
  2. 执行以下命令,将向量测试数据写入Kafka Topic。

    bin/kafka-console-producer.sh\
    --bootstrap-server <your_broker_list>\
    --topic vector_ingest < ../vector_sample_data.csv

<your_broker_list>:接入点信息。您可在云消息队列Kafka版控制台实例详情页面的接入点信息区域获取。

创建映射表并导入数据

  1. 创建Flink作业。

    1. 登录实时计算控制台,在Flink全托管页签,单击目标工作空间操作列下的控制台

    2. 在左侧导航栏,单击SQL开发,单击新建,选择空白的流作业草稿,单击下一步

    3. 新建作业草稿对话框,填写作业配置信息。

      作业参数

      说明

      示例

      文件名称

      作业的名称。

      说明

      作业名称在当前项目中必须保持唯一。

      adbpg-test

      存储位置

      指定该作业的代码文件所属的文件夹。

      您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

      作业草稿

      引擎版本

      当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍

      vvr-6.0.6-flink-1.15

  2. 执行以下命令,创建AnalyticDB PostgreSQL版映射表。

    CREATE TABLE vector_ingest (
      id INT,
      market_time TIMESTAMP,
      color VARCHAR(10),
      price int,
      feature VARCHAR
    )WITH (
       'connector' = 'adbpg-nightly-1.13',
       'url' = 'jdbc:postgresql://<your_instance_url>:5432/adbpg_test',
       'tablename' = 'car_info',
       'username' = '<your_username>',
       'password' = '<your_password>',
       'targetschema' = 'vector_test',
       'maxretrytimes' = '2',
       'batchsize' = '3000',
       'batchwritetimeoutms' = '10000',
       'connectionmaxactive' = '20',
       'conflictmode' = 'ignore',
       'exceptionmode' = 'ignore',
       'casesensitive' = '0',
       'writemode' = '1',
       'retrywaittime' = '200'
    );

    参数说明,请参见写入数据到AnalyticDB PostgreSQL版

  3. 执行以下命令,创建Kafka映射表。

    CREATE TABLE vector_kafka (
      id INT,
      market_time TIMESTAMP,
      color VARCHAR(10),
      price int,
      feature string
    ) 
    WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = '<your_broker_list>',
        'topic' = 'vector_ingest',
        'format' = 'csv',
        'csv.field-delimiter' = '\t',
        'scan.startup.mode' = 'earliest-offset'
    );

    参数说明如下。

    参数

    是否必填

    说明

    connector

    连接器名。固定值为Kafka。

    properties.bootstrap.servers

    接入点信息。您可在云消息队列Kafka版控制台的实例详情页面的接入点信息区域获取。

    topic

    Kafka消息所在的Topic名称。

    format

    写入Kafka消息Value部分时使用的格式。支持的格式:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    csv.field-delimiter

    CSV字段分隔符。

    scan.startup.mode

    Kafka读取数据的启动位点。取值如下:

    • earliest-offset:从Kafka最早分区开始读取。

    • latest-offset:从Kafka最新位点开始读取。

  4. 执行以下命令,创建导入任务。

    INSERT INTO vector_ingest SELECT * FROM vector_kafka;