Qdrant is a vector similarity search engine used to store, search, and manage vector data. You can migrate collection data from a self-managed Qdrant cluster to an AnalyticDB for PostgreSQL instance by using the Python programming language.
A Qdrant cluster is created.
Python is installed. We recommend that you use Python 3.8 or later.
The required Python libraries are installed.
pip install psycopg2
pip install qdrant-client==1.6.0
pip install pyaml
pip install tqdm
Step 1: Export data from a Qdrant cluster
Prepare the export.py script and the qdrant2csv.yaml configuration file for data export, and create an output directory. In this topic, output is used as the directory name. The export.py
# script contains the following content:
"""Export Qdrant collections to '|'-delimited text files for PostgreSQL COPY.

The script reads ./qdrant2csv.yaml, and for every collection listed under
export.collections it writes, below export.output_path/<collection>/:

* create_table.sql - a CREATE TABLE statement matching the exported columns
* 0000000000.csv, 0000000001.csv, ... - data files that each start with a
  header line and hold at most export.max_line_in_file rows
"""
import yaml
import json
from qdrant_client import QdrantClient
import os
from enum import IntEnum
from tqdm import tqdm

with open("./qdrant2csv.yaml", "r", encoding="utf-8") as f:
    config = yaml.safe_load(f)

# Echo the configuration for troubleshooting, but never print the API key.
printable_config = {**config, "qdrant": dict(config["qdrant"])}
if printable_config["qdrant"].get("api_key"):
    printable_config["qdrant"]["api_key"] = "******"
print("configuration:")
print(printable_config)

qdrant_config = config["qdrant"]


class DataType(IntEnum):
    """Kinds of exported columns; each kind has its own text encoding."""

    ID = 1
    FLOAT_VECTOR = 2
    JSON = 3


def data_convert_to_str(data, dtype, delimeter):
    """Render one cell as text for a delimiter-separated COPY data file.

    Args:
        data: The raw value (point id, sequence of floats, or JSON text).
        dtype: The DataType that selects the encoding.
        delimeter: The column delimiter; escaped when it occurs in JSON text.

    Returns:
        The encoded cell.

    Raises:
        ValueError: If dtype is not a supported DataType.
    """
    if dtype == DataType.ID:
        return str(data)
    if dtype == DataType.FLOAT_VECTOR:
        # PostgreSQL array literal, e.g. {0.1, 0.2}
        return "{" + ", ".join(str(x) for x in data) + "}"
    if dtype == DataType.JSON:
        # Backslashes must be escaped first: COPY text format interprets
        # sequences such as \n or \\, which would otherwise corrupt JSON
        # string escapes on import.
        return (
            str(data)
            .replace("\\", "\\\\")
            .replace(delimeter, f"\\{delimeter}")
            .replace("\"", "\\\"")
        )
    raise ValueError(f"Unsupported DataType {dtype}")


def csv_write_rows(datum, fd, fields_types, delimiter="|"):
    """Write every row of datum to fd, one delimiter-joined line per row.

    The rows themselves are left unmodified.

    Args:
        datum: Iterable of rows; each row is a sequence of raw cell values.
        fd: Writable text file object.
        fields_types: The DataType of each column, in column order.
        delimiter: The column delimiter.
    """
    for data in datum:
        cells = [
            data_convert_to_str(value, dtype, delimiter)
            for value, dtype in zip(data, fields_types)
        ]
        fd.write(delimiter.join(cells) + "\n")


def csv_write_header(headers, fd, delimiter="|"):
    """Write the header line made of the column names to fd."""
    fd.write(delimiter.join(headers) + "\n")


def dump_collection(collection_name: str):
    """Export one Qdrant collection to create_table.sql and numbered CSV files.

    Args:
        collection_name: Name of the collection; also used as the output
            sub-directory name and as the table name in create_table.sql.
    """
    results = []
    file_cnt = 0
    print("connecting to qdrant...")
    client = QdrantClient(**qdrant_config)
    export_config = config["export"]
    tmp_path = os.path.join(export_config["output_path"], collection_name)
    # Also creates output_path itself when it does not exist yet.
    os.makedirs(tmp_path, exist_ok=True)

    # fetch info of collection
    fields_meta_list = ["id bigint"]
    fields_types = [DataType.ID]
    headers = ["id"]
    collection = client.get_collection(collection_name)
    total_num = collection.points_count
    if isinstance(collection.config.params.vectors, dict):
        # multi vectors: one real[] column per named vector
        for vec_name in collection.config.params.vectors:
            fields_types.append(DataType.FLOAT_VECTOR)
            fields_meta_list.append(f"{vec_name} real[]")
            headers.append(vec_name)
    else:
        # single vector
        fields_types.append(DataType.FLOAT_VECTOR)
        fields_meta_list.append("vector real[]")
        headers.append("vector")

    fields_types.append(DataType.JSON)
    fields_meta_list.append("payload json")
    headers.append("payload")

    fields_meta_str = ','.join(fields_meta_list)
    create_table_sql = f"CREATE TABLE {collection_name} " \
                       f" ({fields_meta_str});"

    with open(os.path.join(tmp_path, "create_table.sql"), "w", encoding="utf-8") as f_d:
        f_d.write(create_table_sql)

    print(create_table_sql)

    def write_to_csv_file(col_names, data):
        """Flush the buffered rows to the next numbered CSV file."""
        nonlocal file_cnt
        if not results:
            return
        assert file_cnt <= 1e9
        output_file_name = os.path.join(tmp_path, f"{str(file_cnt).zfill(10)}.csv")
        # UTF-8 is required because payloads are dumped with ensure_ascii=False.
        with open(output_file_name, "w", newline="", encoding="utf-8") as csv_file:
            # write header
            csv_write_header(col_names, csv_file)
            # write data
            csv_write_rows(data, csv_file, fields_types)
        file_cnt += 1
        results.clear()

    offset_id = None
    with tqdm(total=total_num, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}") as pbar:
        while True:
            res = client.scroll(collection_name=collection_name,
                                limit=1000,
                                offset=offset_id,
                                with_payload=True,
                                with_vectors=True)
            records = res[0]
            for record in records:
                # append id
                record_list = [record.id]
                # append vectors
                if isinstance(record.vector, dict):
                    # multi vector: keep the column order fixed by the header
                    for vector_name in headers[1:-1]:
                        record_list.append(record.vector[vector_name])
                else:
                    # single vector
                    record_list.append(record.vector)
                # append payload
                record_list.append(json.dumps(record.payload, ensure_ascii=False))
                results.append(record_list)
                if len(results) >= export_config["max_line_in_file"]:
                    write_to_csv_file(headers, data=results)
                pbar.update(1)
            if len(res) == 0 or len(res[0]) == 0 or res[1] is None:
                # finished: no further page offset was returned
                break
            offset_id = res[1]
    # flush the last, partially filled file
    write_to_csv_file(headers, data=results)


for name in config["export"]["collections"]:
    dump_collection(name)
The qdrant2csv.yaml configuration file contains the following content:
qdrant: # The configuration items that are used to connect to the Qdrant cluster.
  host: 'localhost' # The host address of the Qdrant service.
  port: 6333 # The port number of the Qdrant service. Default value: 6333.
  grpc_port: 6434 # The port number of the gRPC interface. Default value: 6334.
  api_key: '' # The API key for authentication in Qdrant Cloud.
  url: '' # The hostname, or a string in the format "Optional[scheme], host, Optional[port], Optional[prefix]".
  location: '' # If you set this field to :memory:, the script connects to Qdrant in in-memory mode. If you enter a regular string, this operation is the same as specifying a complete URL in the url field. If you do not specify this field, the cluster is connected by using the host and port fields.
export:
  collections:
    - 'test_collection'
    - 'multi' # All Qdrant collections that you want to export.
  max_line_in_file: 40000 # The maximum number of lines that are contained in each output CSV file.
  output_path: './output' # The path to the directory in which the exported CSV files are stored.
Store the export.py script, the qdrant2csv.yaml configuration file, and the output directory in the same directory. Directory hierarchy:
├── export.py
├── qdrant2csv.yaml
└── output
Modify the configuration items in the qdrant2csv.yaml configuration file based on the information about the Qdrant cluster. Run the Python script and view the output.
python export.py
Sample output:
.
├── export.py
├── qdrant2csv.yaml
└── output
    ├── test_collection
    │   ├── 0000000000.csv
    │   ├── 0000000001.csv
    │   ├── 0000000002.csv
    │   └── create_table.sql
    └── multi
        ├── 0000000000.csv
        └── create_table.sql
Step 2: Import data to an AnalyticDB for PostgreSQL vector database
Prepare the following data for import: the import.py script, the csv2adbpg.yaml configuration file, and the output directory created in Step 1. The import.py
# script contains the following content:
"""Import the files exported from Qdrant into AnalyticDB for PostgreSQL.

The script reads csv2adbpg.yaml, connects to the database, makes sure the
target schema exists, and then, for every sub-directory of data_path, runs
its create_table.sql and loads its *.csv files with COPY ... FROM STDIN.
"""
import psycopg2
import yaml
import glob
import os

if __name__ == "__main__":
    with open('csv2adbpg.yaml', 'r', encoding='utf-8') as config_file:
        config = yaml.safe_load(config_file)

    # Echo the configuration for troubleshooting, but never print the password.
    printable_config = dict(config)
    printable_config['database'] = {**config['database'], 'password': '******'}
    print("current config:" + str(printable_config))

    db_host = config['database']['host']
    db_port = config['database']['port']
    db_name = config['database']['name']
    schema_name = config['database']['schema']
    db_user = config['database']['user']
    db_password = config['database']['password']
    data_path = config['data_path']

    conn = psycopg2.connect(
        host=db_host,
        port=db_port,
        database=db_name,
        user=db_user,
        password=db_password,
        options=f'-c search_path={schema_name},public'
    )

    # Release the connection and the cursor even if an import step fails.
    try:
        with conn.cursor() as cur:
            # check schema
            cur.execute("SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s", (schema_name,))
            existing_schema = cur.fetchone()
            if existing_schema:
                print(f"Schema {schema_name} already exists.")
            else:
                # create schema
                cur.execute(f"CREATE SCHEMA {schema_name}")
                print(f"Created schema: {schema_name}")
            # Commit now so that a later rollback (duplicate table) cannot
            # undo the schema creation.
            conn.commit()

            for table_name in sorted(os.listdir(data_path)):
                table_folder = os.path.join(data_path, table_name)
                # Only directories represent tables; ignore stray files.
                if not os.path.isdir(table_folder):
                    continue
                print(f"Begin Process table: {table_name}")

                create_table_file = os.path.join(table_folder, 'create_table.sql')
                if not os.path.isfile(create_table_file):
                    print(f"Skipped {table_name}: {create_table_file} not found")
                    continue
                with open(create_table_file, 'r', encoding='utf-8') as file:
                    create_table_sql = file.read()
                try:
                    cur.execute(create_table_sql)
                except psycopg2.errors.DuplicateTable as e:
                    # The table already exists: report it and skip this
                    # directory instead of loading the data a second time.
                    print(e)
                    conn.rollback()
                    continue
                print(f"Created table: {table_name}")

                cnt = 0
                # Sorted so that files are loaded in their numbered order.
                csv_files = sorted(glob.glob(os.path.join(table_folder, '*.csv')))
                copy_command = f"COPY {table_name} FROM STDIN DELIMITER '|' HEADER"
                for csv_file in csv_files:
                    with open(csv_file, 'r', encoding='utf-8') as file:
                        cur.copy_expert(copy_command, file)
                    cnt += 1
                    print(f"Imported data from: {csv_file} | {cnt}/{len(csv_files)} file(s) Done")
                # One transaction per table: table creation plus all its files.
                conn.commit()
                print(f"Finished import table: {table_name}")
                print('#'*60)
    finally:
        conn.close()
The csv2adbpg.yaml configuration file contains the following content:
database:
  host: "192.16.XX.XX" # The public endpoint of the AnalyticDB for PostgreSQL instance.
  port: 5432 # The port number of the AnalyticDB for PostgreSQL instance.
  name: "vector_database" # The name of the destination database.
  user: "username" # The database account of the AnalyticDB for PostgreSQL instance.
  password: "" # The password of the database account.
  schema: "public" # The name of the schema. If the schema does not exist, the schema is automatically created.
data_path: "./data" # The data source: the directory that contains the data exported in Step 1.
Store the import.py script, the csv2adbpg.yaml configuration file, and the data that you want to import in the same directory. Directory hierarchy:
.
├── csv2adbpg.yaml
├── data
│   ├── test_collection
│   │   ├── 0000000000.csv
│   │   ├── 0000000001.csv
│   │   ├── 0000000002.csv
│   │   └── create_table.sql
│   └── multi
│       ├── 0000000000.csv
│       └── create_table.sql
└── import.py
Modify the configuration items in the csv2adbpg.yaml configuration file based on the information about the AnalyticDB for PostgreSQL instance. Run the Python script.
python import.py
Check whether data is imported to the AnalyticDB for PostgreSQL vector database.
Rebuild the required indexes. For more information, see Create a vector index.
For more information about Qdrant, see Qdrant documentation.