mysql-connector-python是MySQL官方提供的Python連接器,不依賴C語言標準函數庫,使用更便捷。本文介紹如何在Python中通過mysql-connector-python串連Lindorm寬表引擎。
前提條件
已安裝Python環境,且Python版本為3.8及以上版本。
已開通MySQL協議相容功能。如何開通,請參見開通MySQL協議相容功能。
已將用戶端IP添加至白名單,具體操作請參見設定白名單。
操作步驟
安裝8.0.11版本的mysql-connector-python。您也可以通過
pip install mysql-connector-python==8.0.11語句直接通過pip進行安裝。建立串連並配置串連參數。
connection = mysql.connector.connect(host='<MySQL相容地址>', port=33060, user='<使用者名稱>', passwd='<密碼>', database='<資料庫名>') cursor = connection.cursor(prepared=True)參數說明
參數
說明
host
Lindorm寬表引擎的Lindorm 宽表SQL地址,需去掉末尾的冒號及連接埠號碼
:33060。如何擷取,請參見查看串連地址。重要如果應用部署在ECS,且ECS與Lindorm執行個體部署在同一專用網路,建議您通過專用網路訪問Lindorm執行個體,否則請使用公網訪問。通過公網串連Lindorm前需在控制台開通公網地址,開通方式請參見開通步驟。
port
Lindorm寬表引擎MySQL協議的連接埠,固定為
33060。user
如果您忘記使用者密碼,可以通過Lindorm寬表引擎的叢集管理系統修改密碼。具體操作,請參見修改使用者密碼。
passwd
database
需要串連的資料庫名稱。預設串連default資料庫。
通過寬表SQL文法使用Lindorm寬表引擎。以建立表為例。
sql_create_table = ("create table if not exists test_python(c1 integer, c2 integer, c3 varchar, primary key(c1))") print(sql_create_table) cursor.execute(sql_create_table)
程式碼範例
在Python中使用mysql-connector-python串連Lindorm寬表引擎時,支援兩種模式:
直接連接模式:適用於單次操作或低頻訪問情境,每次操作需獨立建立和關閉串連。
串連池模式:適用於高頻訪問情境,通過複用串連提升效能並減少資源開銷。
直接連接模式
範例程式碼如下:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import json
import mysql.connector
# 開啟資料庫連接。
# host是Lindorm寬表引擎MySQL協議的串連地址。
# port是Lindorm寬表引擎MySQL協議的連接埠,一般為33060。
# user是Lindorm寬表引擎的使用者帳號。
# passwd是Lindorm寬表引擎的使用者帳號對應的密碼。
# database是Lindorm寬表引擎中的資料庫名。
connection = mysql.connector.connect(host='ld-bp1hn6yq0yb34****-proxy-sql-lindorm-public.lindorm.rds.aliyuncs.com', port=33060,user='root', passwd='test',database='default')
# 建立cursor,注意prepared=True
cursor = connection.cursor(prepared=True)
# 建立表
sql_create_table = ("create table if not exists test_python(c1 integer, c2 integer, c3 varchar, primary key(c1))")
print(sql_create_table)
cursor.execute(sql_create_table)
# 插入資料
sql_upsert = "upsert into test_python(c1, c2, c3) values(?, ?, ?)"
print(sql_upsert)
# 執行單條插入語句
cursor.execute(sql_upsert, (1, 1, '1'))
cursor.execute(sql_upsert, (2, 2, json.dumps({"key": "value2"})))
# 執行 batch 寫入, 一次寫入 2 行資料
sql_upsert_batch = ("upsert into test_python(c1, c2, c3) values(?, ?, ?), (?, ?, ?)")
cursor.execute(sql_upsert_batch, (3, 3, '3' , 4, 4, json.dumps({"key": "value4"})))
# 刪除資料
sql_delete = "delete from test_python where c1 = ?"
print(sql_delete)
cursor.execute(sql_delete, (3,))
# 修改資料
sql_update = "upsert into test_python(c1, c2, c3) values(?, ?, ?)"
print(sql_update)
cursor.execute(sql_update, (1, 2, '2'))
# 查詢特定資料
sql_select = "select * from test_python where c1 = ?"
print(sql_select)
cursor.execute(sql_select, (4,))
rows = cursor.fetchall()
print(rows)
# 查詢表中所有資料
sql_select_all = "select * from test_python"
print(sql_select_all)
cursor.execute(sql_select_all)
rows = cursor.fetchall()
print(rows)
# 關閉 cursor
cursor.close()
# 關閉串連
connection.close()如果寫入的字串中含有特殊字元(例如JSON字串中的雙引號),建議您通過prepared=True的方式寫入資料(上述範例程式碼所示方式),避免寫入Lindorm的資料被添加特殊的逸出字元。
串連池模式
範例程式碼如下:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import json
import mysql.connector
from mysql.connector import pooling
# 建立串連池
# pool_name是串連池的名稱
# pool_size是串連池的大小,即串連池中最多包含多少個串連,建議按照業務實際情況進行修改
# host是Lindorm寬表引擎MySQL協議的串連地址。
# port是Lindorm寬表引擎MySQL協議的連接埠,一般為33060。
# user是Lindorm寬表引擎的使用者帳號。
# passwd是Lindorm寬表引擎的使用者帳號對應的密碼。
# database是Lindorm寬表引擎中的資料庫名。
connection_pool = pooling.MySQLConnectionPool(
pool_name="mypool",
pool_size=20,
host='11.166.XX.X',
port=33060,
user='root',
password='root',
database='default',
)
# 從串連池中擷取串連
connection = connection_pool.get_connection()
# 建立 cursor,注意prepared=True
cursor = connection.cursor(prepared=True)
# 刪除表
sql_drop_table = "drop table if exists test_python"
print(sql_drop_table)
cursor.execute(sql_drop_table)
# 建立表
sql_create_table = ("create table test_python(c1 integer, c2 integer, c3 varchar, primary key(c1))")
print(sql_create_table)
cursor.execute(sql_create_table)
# 執行單條插入語句
sql_upsert = "insert into test_python(c1, c2, c3) values(?, ?, ?)"
print(sql_upsert)
cursor.execute(sql_upsert, (1, 1, '1'))
cursor.execute(sql_upsert, (2, 2, '2'))
# 執行 batch 寫入,一次寫入 3 行資料
sql_upsert_batch = "insert into test_python(c1, c2, c3) values(?, ?, ?), (?, ?, ?), (?, ?, ?)"
cursor.execute(sql_upsert_batch, (3, 3, '3', 4, 4, '4', 5, 5, '5'))
# 刪除資料
sql_delete = "delete from test_python where c1 = ?"
print(sql_delete)
cursor.execute(sql_delete, (3,))
# 修改資料
sql_update = "upsert into test_python(c1, c2, c3) values(?, ?, ?)"
print(sql_update)
cursor.execute(sql_update, (1, 2, '2'))
# 查詢特定資料
sql_select = "select * from test_python where c1 = ?"
print(sql_select)
cursor.execute(sql_select, (4,))
rows = cursor.fetchall()
print(rows)
# 查詢表中所有資料
sql_select_all = "select * from test_python"
print(sql_select_all)
cursor.execute(sql_select_all)
rows = cursor.fetchall()
print(rows)
# 關閉 cursor
cursor.close()
# 關閉串連,串連會返回到串連池中,而不是真正關閉
connection.close()