全部產品
Search
文件中心

Hologres:Python

更新時間:Oct 23, 2024

Psycopg是Python用於操作PostgreSQL的庫。Hologres相容PostgreSQL 11,因此您可以通過psycopg訪問Hologres。本文將指導您使用psycopg2訪問Hologres,樣本使用的作業環境為基於CentOS 7系統的Python 3.8版本。

安裝Python3.8

您可以基於Miniconda、Anaconda安裝Python 3.8環境。如下內容以CentOS 7系統為例,安裝Python 3.8版本。

  1. 安裝Python 3.8。

    您可以下載對應版本的Python,執行如下命令進行安裝。

    # yum install centos-release-scl
    # yum install rh-python38
    # scl enable rh-python38 bash
    # python --version
    Python 3.8.6
  2. 安裝psycopg2模組。

    執行如下命令安裝psycopg2模組。

     # pip install psycopg2-binary

串連Hologres

Python3.8環境和psycopg2安裝完成之後,您可以執行如下操作並串連Hologres。

  1. 載入psycopg2。

    如果需要使用psycopg2,您可以執行命令import psycopg2載入安裝的psycopg2。

  2. 建立資料庫連接。

    您可以使用psycopg2.connect()函數串連Hologres,具體文法和參數說明如下所示。

    conn = psycopg2.connect(host="<Endpoint>",
                            port=<Port>,
                            dbname="<databases>",
                            user="<Access ID>",
                            password="<Access Key>",
                            keepalives=<keepalives>,
                            keepalives_idle=<keepalives_idle>,
                            keepalives_interval=<keepalives_interval>,
                            keepalives_count=<keepalives_count>,
                            application_name="<Application Name>"
    
                           )
    

    參數

    描述

    host

    Hologres執行個體的網路地址。

    進入Hologres管理主控台的執行個體詳情頁,從網路資訊擷取網路地址。

    port

    Hologres的執行個體連接埠。

    您可以進入Hologres管理主控台的執行個體詳情頁,從網路資訊擷取連接埠。

    dbname

    Hologres建立的資料庫名稱。

    user

    當前阿里雲帳號的AccessKey ID。

    您可以單擊AccessKey 管理,擷取AccessKey ID。

    password

    當前阿里雲帳號的AccessKey Secret。

    您可以單擊AccessKey 管理,擷取AccessKey Secret。

    application_name

    可選,應用程式名稱,用於記錄查詢日誌時識別SQL代表的應用含義。

    說明

    配置該參數,有助於您在慢Query清單中根據Application Name快速定位您的發起請求的應用。

    keepalives

    可選(推薦使用),串連方式:

    • 1表示使用長串連。

    • 0表示非長串連。

    keepalives_idle

    空閑時,保持串連連通的時間間隔,單位秒(s)。

    keepalives_interval

    沒得到回應時,等待重新嘗試保持連通的時間間隔,單位秒(s)。

    keepalives_count

    嘗試重新保持連通最大次數。

    程式碼範例如下。

    conn = psycopg2.connect(host="<Endpoint>",
                            port=<Port>,
                            dbname="<databases>",
                            user="<Access ID>",
                            password="<Access Key>",
                            keepalives=1,  # 保持串連
                            keepalives_idle=130,  # 空閑時,每130秒保持串連連通
                            keepalives_interval=10,   # 沒得到回應時,等待10秒重新嘗試保持連通
                            keepalives_count=15,   # 嘗試最多15次重新保持連通
                            application_name="<Application Name>"
                           )
    
    

使用Hologres

當您成功串連Hologres資料庫之後,即可通過psycopg2進行資料開發操作。如下內容將指導您建立表、插入資料、查詢和釋放資源等操作。如果需要使用Fixed Plan能力實現更高效能的讀寫操作,需要配置相關GUC參數,請參見Fixed Plan加速SQL執行

  1. 建立遊標。

    在進行資料開發之前,您需要執行命令cur = conn.cursor()來建立串連的遊標。

  2. 資料開發。

    1. 建立表

      您可以執行如下命令,建立一個表holo_test並定義表的資料類型為integer。您也可以根據業務需求定義表名稱和資料類型。

      cur.execute("CREATE TABLE holo_test (num integer);")
    2. 插入資料

      您可以執行如下命令,為建立的表holo_test插入資料1~1000。

      cur.execute("INSERT INTO holo_test SELECT generate_series(%s, %s)", (1, 1000))
    3. 查詢資料

      cur.execute("SELECT sum(num) FROM holo_test;")
      cur.fetchone()
  3. 提交事務。

    在查詢資料的命令之後,您需要執行命令conn.commit()提交事務,此操作可以確保操作已經提交。也可以把autocommit參數設定為true,實現SQL命令的自動認可。

  4. 釋放資源。

    為避免影響後續的操作,當操作執行完成後,您需要執行如下命令關閉遊標並斷開資料庫連接。

    cur.close()
    conn.close()

Pandas DataFrame快速寫入Hologres最佳實務

使用Python時,經常會使用Pandas將資料轉換為DataFrame,並對DataFrame進行處理,最終將DataFrame匯入Hologres,此時希望將DataFrame快速匯入Hologres。匯入時候常用to_sql函數,詳情請參見Pandas

需要Pandas為V1.4.2及以上版本,您可以執行如下命令強制安裝V1.5.1版本的Pandas庫。

# pip install Pandas==1.5.1

推薦使用to_sql函數的callable方式,使用copy方式匯入資料,範例的Python代碼如下。

# 載入依賴
import pandas as pd
import psycopg2

# 產生連接字串
host="hgpostcn-cn-xxxxxx-cn-hangzhou.hologres.aliyuncs.com"
port=80
dbname="demo"
user="LTAI5xxxxx"
password="fa8Kdgxxxxx"
application_name="Python Test"
conn = "postgresql+psycopg2://{}:{}@{}:{}/{}?application_name={}".format(user, password, host, port, dbname,application_name)
print(conn)

# 產生dataframe
data = [('1','1','1'),('2','2','2')]
cols = ('col1','col2','col3')
pd_data = pd.DataFrame(data, columns=cols)

# 定義callable函數
import csv
from io import StringIO

def psql_insert_copy(table, conn, keys, data_iter):
    """
    Execute SQL statement inserting data

    Parameters
    ----------
    table : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    """
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name

        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
            table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)

# 匯入資料
pd_data.to_sql(
    name="pd_data",
    con=conn,
    if_exists="append",
    index=False,
    method=psql_insert_copy
)

查看歷史查詢,驗證已經使用COPY方式寫入資料至Hologres。歷史慢Query