全部产品
Search
文档中心

实时数仓Hologres:Python

更新时间:Oct 22, 2024

Psycopg是Python用于操作PostgreSQL的库。Hologres兼容PostgreSQL 11,因此您可以通过psycopg访问Hologres。本文将指导您使用psycopg2访问Hologres,示例使用的操作环境为基于CentOS 7系统的Python 3.8版本。

安装Python3.8

您可以基于Miniconda、Anaconda安装Python 3.8环境。如下内容以CentOS 7系统为例,安装Python 3.8版本。

  1. 安装Python 3.8。

    您可以下载对应版本的Python,执行如下命令进行安装。

    # yum install centos-release-scl
    # yum install rh-python38
    # scl enable rh-python38 bash
    # python --version
    Python 3.8.6
  2. 安装psycopg2模块。

    执行如下命令安装psycopg2模块。

     # pip install psycopg2-binary

连接Hologres

Python3.8环境和psycopg2安装完成之后,您可以执行如下操作并连接Hologres。

  1. 加载psycopg2。

    如果需要使用psycopg2,您可以执行命令import psycopg2加载安装的psycopg2。

  2. 创建数据库连接。

    您可以使用psycopg2.connect()函数连接Hologres,具体语法和参数说明如下所示。

    conn = psycopg2.connect(host="<Endpoint>",
                            port=<Port>,
                            dbname="<databases>",
                            user="<Access ID>",
                            password="<Access Key>",
                            keepalives=<keepalives>,
                            keepalives_idle=<keepalives_idle>,
                            keepalives_interval=<keepalives_interval>,
                            keepalives_count=<keepalives_count>,
                            application_name="<Application Name>"
    
                           )
    

    参数

    描述

    host

    Hologres实例的网络地址。

    进入Hologres管理控制台的实例详情页,从网络信息获取网络地址。

    port

    Hologres的实例端口。

    您可以进入Hologres管理控制台的实例详情页,从网络信息获取端口。

    dbname

    Hologres创建的数据库名称。

    user

    当前阿里云账号的AccessKey ID。

    您可以单击AccessKey 管理,获取AccessKey ID。

    password

    当前阿里云账号的AccessKey Secret。

    您可以单击AccessKey 管理,获取AccessKey Secret。

    application_name

    可选,应用名,用于记录查询日志时识别SQL代表的应用含义。

    说明

    配置该参数,有助于您在慢Query清单中根据Application Name快速定位您的发起请求的应用。

    keepalives

    可选(推荐使用),连接方式:

    • 1表示使用长连接。

    • 0表示非长连接。

    keepalives_idle

    空闲时,保持连接连通的时间间隔,单位秒(s)。

    keepalives_interval

    没得到回应时,等待重新尝试保持连通的时间间隔,单位秒(s)。

    keepalives_count

    尝试重新保持连通最大次数。

    代码示例如下。

    conn = psycopg2.connect(host="<Endpoint>",
                            port=<Port>,
                            dbname="<databases>",
                            user="<Access ID>",
                            password="<Access Key>",
                            keepalives=1,  # 保持连接
                            keepalives_idle=130,  # 空闲时,每130秒保持连接连通
                            keepalives_interval=10,   # 没得到回应时,等待10秒重新尝试保持连通
                            keepalives_count=15,   # 尝试最多15次重新保持连通
                            application_name="<Application Name>"
                           )
    
    

使用Hologres

当您成功连接Hologres数据库之后,即可通过psycopg2进行数据开发操作。如下内容将指导您创建表、插入数据、查询和释放资源等操作。如果需要使用Fixed Plan能力实现更高性能的读写操作,需要配置相关GUC参数,请参见Fixed Plan加速SQL执行

  1. 创建游标。

    在进行数据开发之前,您需要执行命令cur = conn.cursor()来创建连接的游标。

  2. 数据开发。

    1. 创建表

      您可以执行如下命令,创建一个表holo_test并定义表的数据类型为integer。您也可以根据业务需求定义表名称和数据类型。

      cur.execute("CREATE TABLE holo_test (num integer);")
    2. 插入数据

      您可以执行如下命令,为创建的表holo_test插入数据1~1000。

      cur.execute("INSERT INTO holo_test SELECT generate_series(%s, %s)", (1, 1000))
    3. 查询数据

      cur.execute("SELECT sum(num) FROM holo_test;")
      cur.fetchone()
  3. 提交事务。

    在查询数据的命令之后,您需要执行命令conn.commit()提交事务,此操作可以确保操作已经提交。也可以把autocommit参数设置为true,实现SQL命令的自动提交。

  4. 释放资源。

    为避免影响后续的操作,当操作执行完成后,您需要执行如下命令关闭游标并断开数据库连接。

    cur.close()
    conn.close()

Pandas DataFrame快速写入Hologres最佳实践

使用Python时,经常会使用Pandas将数据转换为DataFrame,并对DataFrame进行处理,最终将DataFrame导入Hologres,此时希望将DataFrame快速导入Hologres。导入时候常用to_sql函数,详情请参见Pandas

需要Pandas为V1.4.2及以上版本,您可以执行如下命令强制安装V1.5.1版本的Pandas库。

# pip install Pandas==1.5.1

推荐使用to_sql函数的callable方式,使用copy方式导入数据,样例的Python代码如下。

# 加载依赖
import pandas as pd
import psycopg2

# 生成连接字符串
host="hgpostcn-cn-xxxxxx-cn-hangzhou.hologres.aliyuncs.com"
port=80
dbname="demo"
user="LTAI5xxxxx"
password="fa8Kdgxxxxx"
application_name="Python Test"
conn = "postgresql+psycopg2://{}:{}@{}:{}/{}?application_name={}".format(user, password, host, port, dbname,application_name)
print(conn)

# 生成dataframe
data = [('1','1','1'),('2','2','2')]
cols = ('col1','col2','col3')
pd_data = pd.DataFrame(data, columns=cols)

# 定义callable函数
import csv
from io import StringIO

def psql_insert_copy(table, conn, keys, data_iter):
    """
    Execute SQL statement inserting data

    Parameters
    ----------
    table : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    """
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name

        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
            table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)

# 导入数据
pd_data.to_sql(
    name="pd_data",
    con=conn,
    if_exists="append",
    index=False,
    method=psql_insert_copy
)

查看历史查询,验证已经使用COPY方式写入数据至Hologres。历史慢Query