Milvus is a vector database designed to handle queries over input vectors and can index vectors at the trillion scale. You can migrate data from a self-managed Milvus cluster to an AnalyticDB for PostgreSQL instance by using Python.
Prerequisites
A Milvus cluster of v2.3.x or later is created.
Python 3.8 or later is installed.
The required Python libraries are installed.
pip install psycopg2
pip install pymilvus==2.3.0
pip install pyaml
pip install tqdm
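If you want to confirm that the dependencies are available before you start, you can run a quick check such as the following sketch. It only imports the libraries that the migration scripts use and prints their versions; the file name check_env.py is an arbitrary choice for this example.

# check_env.py: optional sanity check for the migration dependencies
import psycopg2
import pymilvus
import yaml
import tqdm

print("psycopg2:", psycopg2.__version__)
print("pymilvus:", pymilvus.__version__)
print("pyyaml:", yaml.__version__)
print("tqdm:", tqdm.__version__)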
Procedure
Step 1: Export data from a Milvus cluster
Prepare the export.py script and the milvus2csv.yaml configuration file for data export, and create an output directory. In this topic, output is used as the directory name.

The export.py script contains the following content:

import yaml
import json
import os

from pymilvus import (
    connections,
    DataType,
    Collection,
)
from tqdm import tqdm

with open("./milvus2csv.yaml", "r") as f:
    config = yaml.safe_load(f)

print("configuration:")
print(config)

milvus_config = config["milvus"]

# Map Milvus data types to AnalyticDB for PostgreSQL column types.
milvus_type_to_adbpg_type = {
    DataType.BOOL: "bool",
    DataType.INT8: "smallint",
    DataType.INT16: "smallint",
    DataType.INT32: "integer",
    DataType.INT64: "bigint",
    DataType.FLOAT: "real",
    DataType.DOUBLE: "double precision",
    DataType.STRING: "text",
    DataType.VARCHAR: "varchar",
    DataType.JSON: "json",
    DataType.BINARY_VECTOR: "bit[]",
    DataType.FLOAT_VECTOR: "real[]",
}


def convert_to_binary(binary_data):
    decimal_value = int.from_bytes(binary_data, byteorder='big')
    binary_string = bin(decimal_value)[2:].zfill(len(binary_data) * 8)
    return ','.join(list(binary_string))


def data_convert_to_str(data, dtype, delimiter):
    if dtype == DataType.BOOL:
        return "1" if data else "0"
    elif dtype in [DataType.INT8, DataType.INT16, DataType.INT32, DataType.INT64,
                   DataType.FLOAT, DataType.DOUBLE]:
        return str(data)
    elif dtype in [DataType.STRING, DataType.VARCHAR]:
        return str(data).replace(delimiter, f"\\{delimiter}").replace("\"", "\\\"")
    elif dtype == DataType.JSON:
        return str(data).replace(delimiter, f"\\{delimiter}").replace("\"", "\\\"")
    elif dtype == DataType.BINARY_VECTOR:
        return "{" + ','.join([convert_to_binary(d) for d in data]) + "}"
    elif dtype == DataType.FLOAT_VECTOR:
        return data
    raise Exception(f"Unsupported DataType {dtype}")


def csv_write_rows(datum, fd, fields_types, delimiter="|"):
    for data in datum:
        for i in range(len(data)):
            ftype = fields_types[i]
            data[i] = data_convert_to_str(data[i], ftype, delimiter)
        fd.write(delimiter.join(data) + "\n")


def csv_write_header(headers, fd, delimiter="|"):
    fd.write(delimiter.join(headers) + "\n")


def dump_collection(collection_name: str):
    results = []
    file_cnt = 0
    print("connecting to milvus...")
    connections.connect("default", **milvus_config)

    export_config = config["export"]
    collection = Collection(collection_name)
    collection.load()
    tmp_path = os.path.join(export_config["output_path"], collection_name)
    if not os.path.exists(tmp_path):
        os.mkdir(tmp_path)

    fields_meta_str = ""
    fields_types = []
    headers = []
    for schema in collection.schema.fields:
        print(schema)
        fields_types.append(schema.dtype)
        headers.append(schema.name)
        if len(fields_meta_str) != 0:
            fields_meta_str += ","
        fields_meta_str += f"{schema.name} {milvus_type_to_adbpg_type[schema.dtype]}"
        if schema.dtype == DataType.VARCHAR and "max_length" in schema.params.keys():
            fields_meta_str += f"({schema.params['max_length']})"
        if schema.is_primary:
            fields_meta_str += " PRIMARY KEY"

    create_table_sql = f"CREATE TABLE {collection.name} " \
                       f" ({fields_meta_str});"

    with open(os.path.join(export_config["output_path"], collection_name, "create_table.sql"), "w") as f:
        f.write(create_table_sql)

    print(create_table_sql)
    print(headers)

    total_num = collection.num_entities
    collection.load()
    query_iterator = collection.query_iterator(batch_size=1000, expr="", output_fields=headers)

    def write_to_csv_file(col_names, data):
        if len(results) == 0:
            return
        nonlocal file_cnt
        assert(file_cnt <= 1e9)
        output_file_name = os.path.join(export_config["output_path"], collection_name,
                                        f"{str(file_cnt).zfill(10)}.csv")
        with open(output_file_name, "w", newline="") as csv_file:
            # write header
            csv_write_header(col_names, csv_file)
            # write data
            csv_write_rows(data, csv_file, fields_types)
        file_cnt += 1
        results.clear()

    with tqdm(total=total_num, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}") as pbar:
        while True:
            res = query_iterator.next()
            if len(res) == 0:
                print("query iteration finished, close")
                # close the iterator
                query_iterator.close()
                break
            for row in res:
                row_list = []
                for i in range(len(headers)):
                    field = row[headers[i]]
                    if isinstance(field, list) and fields_types[i] != DataType.BINARY_VECTOR:
                        row_list.append("{" + ", ".join(str(x) for x in field) + "}")
                    elif isinstance(field, dict):
                        row_list.append(json.dumps(field, ensure_ascii=False))
                    else:
                        row_list.append(field)
                results.append(row_list)
                if len(results) >= export_config["max_line_in_file"]:
                    write_to_csv_file(headers, data=results)
                pbar.update(1)

    write_to_csv_file(headers, data=results)


if __name__ == "__main__":
    for name in config["export"]["collections"]:
        dump_collection(name)
The milvus2csv.yaml configuration file contains the following content:

milvus:
  host: '<localhost>'             # The host address of the Milvus service.
  port: 19530                     # The port number of the Milvus service.
  user: '<user_name>'             # The username.
  password: '<password>'          # The password.
  db_name: '<database_name>'      # The name of the database.
  token: '<token_id>'             # The access token.

export:
  collections:
    - 'test'
    - 'medium_articles_with_json'
    # - 'hello_milvus'
    # - 'car'
    # - 'medium_articles_with_dynamic'
    # Specify the names of all collections that you want to export.
  max_line_in_file: 40000         # The maximum number of lines that are contained in each output CSV file.
  output_path: './output'         # The path to the directory in which the exported CSV files are stored. In this example, ./output is used.
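If you are not sure which collection names to list under collections, you can query them from the cluster first. The following sketch uses the pymilvus utility module and assumes the same connection parameters as milvus2csv.yaml; replace the placeholders with your own values.

# list_collections.py: print the collections that exist in the Milvus cluster
from pymilvus import connections, utility

connections.connect(
    "default",
    host="<localhost>",        # same values as in milvus2csv.yaml
    port=19530,
    user="<user_name>",
    password="<password>",
    db_name="<database_name>",
)

for name in utility.list_collections():
    print(name)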
Store the export.py script, the milvus2csv.yaml configuration file, and the output directory in the same directory. Directory hierarchy:

├── export.py
├── milvus2csv.yaml
└── output
Modify the configuration items in the milvus2csv.yaml configuration file based on the information about the Milvus cluster. Then, run the Python script and view the output:
python export.py
Sample output:
.
├── export.py
├── milvus2csv.yaml
└── output
    ├── medium_articles_with_json
    │   ├── 0000000000.csv
    │   ├── 0000000001.csv
    │   ├── 0000000002.csv
    │   └── create_table.sql
    └── test
        ├── 0000000000.csv
        └── create_table.sql
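Before you import the data, you may want to spot-check the exported files. The following optional sketch assumes the ./output directory shown above; it prints the header and the number of data rows of every CSV file.

# inspect_output.py: optional spot check of the exported CSV files
import glob
import os

OUTPUT_DIR = "./output"   # the output_path used in milvus2csv.yaml

for csv_path in sorted(glob.glob(os.path.join(OUTPUT_DIR, "*", "*.csv"))):
    with open(csv_path, "r") as f:
        header = f.readline().rstrip("\n")
        row_count = sum(1 for _ in f)
    print(f"{csv_path}: {row_count} row(s), columns: {header}")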
Step 2: Import data to an AnalyticDB for PostgreSQL vector database
Prepare the following data for import: the import.py script, the csv2adbpg.yaml configuration file, and the output directory created in Step 1.

The import.py script contains the following content:

import psycopg2
import yaml
import glob
import os

if __name__ == "__main__":
    with open('csv2adbpg.yaml', 'r') as config_file:
        config = yaml.safe_load(config_file)
        print("current config:" + str(config))

    db_host = config['database']['host']
    db_port = config['database']['port']
    db_name = config['database']['name']
    schema_name = config['database']['schema']
    db_user = config['database']['user']
    db_password = config['database']['password']
    data_path = config['data_path']

    conn = psycopg2.connect(
        host=db_host,
        port=db_port,
        database=db_name,
        user=db_user,
        password=db_password,
        options=f'-c search_path={schema_name},public'
    )
    cur = conn.cursor()

    # check schema
    cur.execute("SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s", (schema_name,))
    existing_schema = cur.fetchone()
    if existing_schema:
        print(f"Schema {schema_name} already exists.")
    else:
        # create schema
        cur.execute(f"CREATE SCHEMA {schema_name}")
        print(f"Created schema: {schema_name}")

    for table_name in os.listdir(data_path):
        table_folder = os.path.join(data_path, table_name)
        print(f"Begin Process table: {table_name}")
        if os.path.isdir(table_folder):
            create_table_file = os.path.join(table_folder, 'create_table.sql')
            with open(create_table_file, 'r') as file:
                create_table_sql = file.read()
                try:
                    cur.execute(create_table_sql)
                except psycopg2.errors.DuplicateTable as e:
                    print(e)
                    conn.rollback()
                    continue
                print(f"Created table: {table_name}")

            cnt = 0
            csv_files = glob.glob(os.path.join(table_folder, '*.csv'))
            for csv_file in csv_files:
                with open(csv_file, 'r') as file:
                    copy_command = f"COPY {table_name} FROM STDIN DELIMITER '|' HEADER"
                    cur.copy_expert(copy_command, file)
                    cnt += 1
                    print(f"Imported data from: {csv_file} | {cnt}/{len(csv_files)} file(s) Done")
            conn.commit()
            print(f"Finished import table: {table_name}")
        print(' # ' * 60)

    cur.close()
    conn.close()
The csv2adbpg.yaml configuration file contains the following content:

database:
  host: "192.16.XX.XX"       # The public endpoint of the AnalyticDB for PostgreSQL instance.
  port: 5432                 # The port number of the AnalyticDB for PostgreSQL instance.
  name: "vector_database"    # The name of the destination database.
  user: "username"           # The database account of the AnalyticDB for PostgreSQL instance.
  password: ""               # The password of the database account.
  schema: "public"           # The name of the schema. If the schema does not exist, the schema is automatically created.

data_path: "./data"          # The data source.
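Optionally, you can verify that the instance is reachable with the values in csv2adbpg.yaml before you run the full import. The following sketch reads the same configuration file and only runs a trivial query; the file name check_connection.py is an arbitrary choice for this example.

# check_connection.py: optional connectivity test for the AnalyticDB for PostgreSQL instance
import psycopg2
import yaml

with open("csv2adbpg.yaml", "r") as f:
    config = yaml.safe_load(f)

conn = psycopg2.connect(
    host=config["database"]["host"],
    port=config["database"]["port"],
    database=config["database"]["name"],
    user=config["database"]["user"],
    password=config["database"]["password"],
)
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    print(cur.fetchone()[0])
conn.close()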
Store the import.py script, the csv2adbpg.yaml configuration file, and the data that you want to import in the same directory. In this example, the collection directories exported in Step 1 are placed in a directory named data, which matches the data_path setting in csv2adbpg.yaml. Directory hierarchy:

.
├── csv2adbpg.yaml
├── data
│   ├── medium_articles_with_json
│   │   ├── 0000000000.csv
│   │   ├── 0000000001.csv
│   │   ├── 0000000002.csv
│   │   └── create_table.sql
│   └── test
│       ├── 0000000000.csv
│       └── create_table.sql
└── import.py
Modify the configuration items in the csv2adbpg.yaml configuration file based on the information about the AnalyticDB for PostgreSQL instance. Then, run the Python script:
python import.py
Check whether the data is imported into the AnalyticDB for PostgreSQL vector database.
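One simple check is to compare the number of rows in each destination table with the number of data rows in the exported CSV files. The following optional sketch assumes the same csv2adbpg.yaml and data directory as above. Note that the line-count comparison is only a rough check; it assumes that exported text fields do not contain embedded line breaks.

# verify_import.py: optional row-count comparison between the CSV files and the destination tables
import glob
import os

import psycopg2
import yaml

with open("csv2adbpg.yaml", "r") as f:
    config = yaml.safe_load(f)

conn = psycopg2.connect(
    host=config["database"]["host"],
    port=config["database"]["port"],
    database=config["database"]["name"],
    user=config["database"]["user"],
    password=config["database"]["password"],
    options=f"-c search_path={config['database']['schema']},public",
)

with conn.cursor() as cur:
    for table_name in sorted(os.listdir(config["data_path"])):
        table_folder = os.path.join(config["data_path"], table_name)
        if not os.path.isdir(table_folder):
            continue
        # count data rows in the CSV files, excluding the header line of each file
        csv_rows = 0
        for csv_file in glob.glob(os.path.join(table_folder, "*.csv")):
            with open(csv_file, "r") as f:
                csv_rows += max(sum(1 for _ in f) - 1, 0)
        cur.execute(f"SELECT count(*) FROM {table_name}")
        db_rows = cur.fetchone()[0]
        print(f"{table_name}: {csv_rows} CSV row(s), {db_rows} table row(s)")

conn.close()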
Rebuild the required indexes. For more information, see Create a vector index.
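As an illustration only, the following sketch shows how an index-creation statement could be submitted from Python after the import. The table name, vector column name, dimension, and the index options (the ann access method with distancemeasure, hnsw_m, and pq_enable) are assumptions for this example; confirm the exact syntax and parameters that apply to your instance version in the Create a vector index topic before running anything similar.

# create_index_example.py: illustrative only; verify the index syntax in the
# "Create a vector index" topic for your AnalyticDB for PostgreSQL version.
import psycopg2

conn = psycopg2.connect(
    host="192.16.XX.XX",
    port=5432,
    database="vector_database",
    user="username",
    password="",
)
conn.autocommit = True

# Hypothetical example: a vector index on a real[] column named "vector" of the
# table "medium_articles_with_json"; dim must match the vector dimension.
index_sql = """
CREATE INDEX ON medium_articles_with_json
USING ann (vector)
WITH (dim=768, distancemeasure=l2, hnsw_m=64, pq_enable=1);
"""

with conn.cursor() as cur:
    cur.execute(index_sql)
conn.close()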
References
For more information about Milvus, see Milvus documentation.