AnalyticDB for PostgreSQL allows you to import vector data by using the flink-adbpg-connector. This topic describes how to import vector data to AnalyticDB for PostgreSQL. In this example, data from ApsaraMQ for Kafka is used.
Prerequisites
An AnalyticDB for PostgreSQL instance is created. For more information, see Create an instance.
A fully managed Flink workspace is created. The Flink workspace resides in the same virtual private cloud (VPC) as the AnalyticDB for PostgreSQL instance. For more information, see Activate fully managed Flink.
If you use open source self-managed Flink, make sure that flink-adbpg-connector is installed in the $FLINK_HOME/lib directory. If you use fully managed Flink, no operations are required.
The vector retrieval extension FastANN is installed in the AnalyticDB for PostgreSQL database.
You can run the \dx fastann command on the psql client to check whether the FastANN extension is installed. If information about the extension is returned, the extension is installed. If no information is returned, submit a ticket to install the extension.
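Alternatively, you can check for the extension with a standard catalog query. The following is a minimal sketch that queries the pg_extension system catalog; it is an illustration, not part of the original procedure:

-- One row is returned if FastANN is installed; zero rows otherwise.
SELECT extname, extversion
FROM pg_extension
WHERE extname = 'fastann';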
An ApsaraMQ for Kafka instance is purchased and deployed. The instance resides in the same VPC as the AnalyticDB for PostgreSQL instance. For more information, see Purchase and deploy an Internet- and VPC-connected instance.
The CIDR blocks of the Flink workspace and the ApsaraMQ for Kafka instance are added to an IP address whitelist of the AnalyticDB for PostgreSQL instance. For more information, see Configure an IP address whitelist.
Test data
To facilitate your test, AnalyticDB for PostgreSQL provides a test data file named vector_sample_data.csv.
The following table describes the schema of the file.
| Field | Type | Description |
| --- | --- | --- |
| id | bigint | The serial number of the car. |
| market_time | timestamp | The time when the car was launched to the market. |
| color | varchar(10) | The color of the car. |
| price | int | The price of the car. |
| feature | float4[] | The feature vector of the car image. |
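For reference, a data row in the file is shaped like the following hypothetical example; the actual values in vector_sample_data.csv differ. The fields are tab-separated, which matches the csv.field-delimiter setting of the Flink mapping table later in this topic, and the feature field is assumed to be a 10-dimensional float4 array literal:

1	2017-05-05 11:12:13	red	28899	{0.88,0.21,0.36,0.51,0.64,0.72,0.18,0.45,0.93,0.27}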
In a Linux system, you can run the following command to download the test data:
wget https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230606/uzkx/vector_sample_data.csv
Procedure
Create structured indexes and a vector index
Connect to the AnalyticDB for PostgreSQL database. In this example, the psql client is used to connect to the instance. For more information, see the "psql" section of the Use client tools to connect to an instance topic.
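A typical psql invocation looks like the following sketch, where <your_instance_url> and <your_username> are placeholders for the endpoint and account of your instance:

psql -h <your_instance_url> -p 5432 -U <your_username> -d postgres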
Create and switch to a test database.
CREATE DATABASE adbpg_test;
\c adbpg_test
Create a destination table.
CREATE SCHEMA IF NOT EXISTS vector_test;

CREATE TABLE IF NOT EXISTS vector_test.car_info (
    id bigint NOT NULL,
    market_time timestamp,
    color varchar(10),
    price int,
    feature float4[],
    PRIMARY KEY(id)
) DISTRIBUTED BY(id);
Create structured indexes and a vector index.
-- Change the storage format of the vector column to PLAIN.
ALTER TABLE vector_test.car_info ALTER COLUMN feature SET STORAGE PLAIN;

-- Create structured indexes.
CREATE INDEX ON vector_test.car_info(market_time);
CREATE INDEX ON vector_test.car_info(color);
CREATE INDEX ON vector_test.car_info(price);

-- Create a vector index.
CREATE INDEX ON vector_test.car_info USING ann(feature) WITH (dim='10', pq_enable='0');
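After data is imported, you can run an approximate nearest neighbor query against the feature column to confirm that the vector index works. The following sketch assumes the l2-distance operator <-> provided by FastANN and uses a hypothetical 10-dimensional query vector:

-- Return the 10 cars whose image features are closest to the query vector.
SELECT id, color, price
FROM vector_test.car_info
ORDER BY feature <-> array[0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1.0]::float4[]
LIMIT 10;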
Write the vector test data to an ApsaraMQ for Kafka topic
Create an ApsaraMQ for Kafka topic.
bin/kafka-topics.sh --create --topic vector_ingest --partitions 1 --bootstrap-server <your_broker_list>
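Optionally, verify that the topic was created. The following command uses the same Kafka CLI and broker placeholder as the command above:

bin/kafka-topics.sh --describe --topic vector_ingest --bootstrap-server <your_broker_list>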
Write the vector test data to the ApsaraMQ for Kafka topic.
bin/kafka-console-producer.sh --bootstrap-server <your_broker_list> --topic vector_ingest < ../vector_sample_data.csv
<your_broker_list>: the endpoint of the ApsaraMQ for Kafka instance. You can go to the ApsaraMQ for Kafka console and view the endpoint of the instance in the Endpoint Information section of the Instance Details page.
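To confirm that the messages were written, you can consume a few records from the beginning of the topic:

bin/kafka-console-consumer.sh --bootstrap-server <your_broker_list> --topic vector_ingest --from-beginning --max-messages 5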
Create mapping tables and import data
Create a Flink draft.
Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane, click SQL Editor. In the upper-left corner of the SQL Editor page, click New. In the New Draft dialog box, click Blank Stream Draft on the SQL Scripts tab and click Next.
In the New Draft dialog box, configure the parameters of the draft. The following table describes the parameters.
| Parameter | Description | Example |
| --- | --- | --- |
| Name | The name of the draft that you want to create. Note: The draft name must be unique in the current project. | adbpg-test |
| Location | The folder in which the code file of the draft is stored. You can click the icon to the right of a folder to create a subfolder. | Draft |
| Engine Version | The engine version of Flink that is used by the draft. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version. | vvr-6.0.6-flink-1.15 |
Create an AnalyticDB for PostgreSQL mapping table.
CREATE TABLE vector_ingest (
    id INT,
    market_time TIMESTAMP,
    color VARCHAR(10),
    price INT,
    feature VARCHAR
) WITH (
    'connector' = 'adbpg-nightly-1.13',
    'url' = 'jdbc:postgresql://<your_instance_url>:5432/adbpg_test',
    'tablename' = 'car_info',
    'username' = '<your_username>',
    'password' = '<your_password>',
    'targetschema' = 'vector_test',
    'maxretrytimes' = '2',
    'batchsize' = '3000',
    'batchwritetimeoutms' = '10000',
    'connectionmaxactive' = '20',
    'conflictmode' = 'ignore',
    'exceptionmode' = 'ignore',
    'casesensitive' = '0',
    'writemode' = '1',
    'retrywaittime' = '200'
);
For more information about the parameters, see Use Realtime Compute for Apache Flink to write data to AnalyticDB for PostgreSQL.
Create an ApsaraMQ for Kafka mapping table.
CREATE TABLE vector_kafka (
    id INT,
    market_time TIMESTAMP,
    color VARCHAR(10),
    price INT,
    feature STRING
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '<your_broker_list>',
    'topic' = 'vector_ingest',
    'format' = 'csv',
    'csv.field-delimiter' = '\t',
    'scan.startup.mode' = 'earliest-offset'
);
The following table describes the parameters.
| Parameter | Required | Description |
| --- | --- | --- |
| connector | Yes | The name of the connector. Set the value to kafka. |
| properties.bootstrap.servers | Yes | The endpoint of the ApsaraMQ for Kafka instance. You can go to the ApsaraMQ for Kafka console and view the endpoint of the instance in the Endpoint Information section of the Instance Details page. |
| topic | Yes | The name of the topic that contains ApsaraMQ for Kafka messages. |
| format | Yes | The format that is used to parse the value fields of ApsaraMQ for Kafka messages. Valid values: csv, json, avro, debezium-json, canal-json, maxwell-json, avro-confluent, and raw. |
| csv.field-delimiter | Yes | The delimiter of CSV fields. |
| scan.startup.mode | Yes | The start offset from which data is read from the ApsaraMQ for Kafka instance. Valid values: earliest-offset (data is read from the earliest offset of each partition) and latest-offset (data is read from the latest offset of each partition). |
Create an import task.
INSERT INTO vector_ingest SELECT * FROM vector_kafka;
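After the deployment runs, you can connect to the database with psql and check that the rows arrived. This verification query is an addition for illustration, not part of the original procedure:

-- Count the rows imported into the destination table.
SELECT count(*) FROM vector_test.car_info;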