PyODPS支援對MaxCompute表的基本操作,包括建立表、建立表的Schema、同步表更新、擷取表資料、刪除表、表分區操作以及如何將錶轉換為DataFrame對象。
背景資訊
PyODPS提供對MaxCompute表的基本操作方法。
操作 | 說明 |
列出專案空間下的所有表、判斷表是否存在、擷取表等基本操作。 | |
使用PyODPS建立表的Schema。 | |
使用PyODPS建立表。 | |
使用PyODPS同步表更新。 | |
使用PyODPS向表中寫入資料。 | |
使用PyODPS向表中插入一行記錄。 | |
使用PyODPS擷取表中資料。 | |
使用PyODPS刪除表。 | |
使用PyODPS轉換表為DataFrame。 | |
使用PyODPS判斷是否為分區表、遍曆表全部分區、判斷分區是否存在、建立分區等。 | |
使用PyODPS操作Tunnel向MaxCompute中上傳或者下載資料。 |
更多PyODPS方法說明,請參見Python SDK方法說明。
前提條件:準備運行環境
- 使用DataWorks:建立好PyODPS 2節點或PyODPS 3節點,詳情請參見通過DataWorks使用PyODPS。
- 使用本地PC環境:安裝好PyODPS並初始化ODPS入口對象。
基本操作
當前專案內的表操作
列出專案空間下的所有表:
o.list_tables()
方法可以列出專案空間下的所有表。for table in o.list_tables(): print(table)
可以通過
prefix
參數只列舉給定首碼的表:for table in o.list_tables(prefix="table_prefix"): print(table.name)
通過該方法擷取的 Table 對象不會自動載入表名以外的屬性,此時擷取這些屬性(例如
table_schema
或者creation_time
)可能導致額外的請求並造成額外的時間開銷。如果需要在列舉表的同時讀取這些屬性,在 PyODPS 0.11.5 及後續版本中,可以為list_tables
添加extended=True
參數:for table in o.list_tables(extended=True): print(table.name, table.creation_time)
如果需要按類型列舉表,可以指定
type
參數。不同類型的表列舉方法如下:managed_tables = list(o.list_tables(type="managed_table")) # 列舉內建表 external_tables = list(o.list_tables(type="external_table")) # 列舉外表 virtual_views = list(o.list_tables(type="virtual_view")) # 列舉視圖 materialized_views = list(o.list_tables(type="materialized_view")) # 列舉物化視圖
判斷表是否存在:
o.exist_table()
方法可以判斷表是否存在。print(o.exist_table('pyodps_iris')) # 返回True表示表pyodps_iris存在。
擷取表:
入口對象的
o.get_table()
方法可以擷取表。擷取表的schema資訊。
t = o.get_table('pyodps_iris') print(t.schema) # 擷取表pyodps_iris的schema
傳回值樣本如下。
odps.Schema { sepallength double # 片長度(cm) sepalwidth double # 片寬度(cm) petallength double # 瓣長度(cm) petalwidth double # 瓣寬度(cm) name string # 種類 }
擷取表列資訊。
t = o.get_table('pyodps_iris') print(t.schema.columns) # 擷取表pyodps_iris的schema中的列資訊
傳回值樣本如下。
[<column sepallength, type double>, <column sepalwidth, type double>, <column petallength, type double>, <column petalwidth, type double>, <column name, type string>]
擷取表的某個列資訊。
t = o.get_table('pyodps_iris') print(t.schema['sepallength']) # 擷取表pyodps_iris的sepallength列資訊
傳回值樣本如下。
<column sepallength, type double>
擷取表的某個列的備忘資訊。
t = o.get_table('pyodps_iris') print(t.schema['sepallength'].comment) # 擷取表pyodps_iris的sepallength列的備忘資訊
返回樣本如下。
片長度(cm)
擷取表的生命週期。
t = o.get_table('pyodps_iris') print(t.lifecycle) # 擷取表pyodps_iris的生命週期
傳回值樣本如下。
-1
擷取表的建立時間。
t = o.get_table('pyodps_iris') print(t.creation_time) # 擷取表pyodps_iris的建立時間
擷取表是否是虛擬視圖。
t = o.get_table('pyodps_iris') print(t.is_virtual_view) # 擷取表pyodps_iris是否是虛擬視圖,返回False,表示不是。
與上述樣本類似,您也可以通過
t.size
、t.comment
來擷取表的大小、表備忘等資訊。跨專案的表操作
您可以通過
project
參數,跨專案擷取表。t = o.get_table('table_name', project='other_project')
其中other_project為所跨的專案,table_name為跨專案擷取的表名稱。
建立表的Schema
初始化方法有如下兩種:
通過表的列以及可選的分區進行初始化。
from odps.models import Schema, Column, Partition columns = [ Column(name='num', type='bigint', comment='the column'), Column(name='num2', type='double', comment='the column2'), ] partitions = [Partition(name='pt', type='string', comment='the partition')] schema = Schema(columns=columns, partitions=partitions)
初始化後,您可擷取欄位資訊、分區資訊等。
擷取所有欄位資訊。
print(schema.columns)
返回樣本如下。
[<column num, type bigint>, <column num2, type double>, <partition pt, type string>]
擷取分區欄位。
print(schema.partitions)
返回樣本如下。
[<partition pt, type string>]
擷取非分區欄位名稱。
print(schema.names)
返回樣本如下。
['num', 'num2']
擷取非分區欄位類型。
print(schema.types)
返回樣本如下。
[bigint, double]
使用
Schema.from_lists()
方法。該方法更容易調用,但無法直接設定列和分區的注釋。from odps.models import Schema schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string']) print(schema.columns)
傳回值樣本如下。
[<column num, type bigint>, <column num2, type double>, <partition pt, type string>]
建立表
您可以使用o.create_table()
方法建立表,使用方式有兩種:使用表Schema方式、使用欄位名和欄位類型方式。同時建立表時表欄位的資料類型有一定的限制條件,詳情如下。
使用表Schema建立表
使用表Schema建立表時,您需要先建立表的Schema,然後通過Schema建立表。
#建立表的schema
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
#通過schema建立表
table = o.create_table('my_new_table', schema)
#只有不存在表時,才建立表。
table = o.create_table('my_new_table', schema, if_not_exists=True)
#設定生命週期。
table = o.create_table('my_new_table', schema, lifecycle=7)
表建立完成後,您可以通過print(o.exist_table('my_new_table'))
驗證表是否建立成功,返回True
表示表建立成功。
使用欄位名及欄位類型建立表
#建立分區表my_new_table,可傳入(表欄位列表,分區欄位列表)。
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)
#建立非分區表my_new_table02。
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)
表建立完成後,您可以通過print(o.exist_table('my_new_table'))
驗證表是否建立成功,返回True
表示表建立成功。
使用欄位名及欄位類型建立表:新資料類型
未開啟新資料類型開關時(預設關閉),建立表的資料類型只允許為BIGINT、DOUBLE、DECIMAL、STRING、DATETIME、BOOLEAN、MAP和ARRAY類型。如果您需要建立TINYINT和STRUCT等新資料類型欄位的表,可以開啟options.sql.use_odps2_extension = True
開關,樣本如下。
from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')
同步表更新
當一個表被其他程式更新,例如改變了Schema,可以調用reload()
方法同步表的更新。
#表schema變更
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
#通過reload()同步表更新
table = o.create_table('my_new_table', schema)
table.reload()
寫入表資料
使用入口對象的
write_table()
方法寫入資料。重要對於分區表,如果分區不存在,可以使用create_partition參數指定建立分區。
records = [[111, 1.0], # 此處可以是list。 [222, 2.0], [333, 3.0], [444, 4.0]] o.write_table('my_new_table', records, partition='pt=test', create_partition=True) #建立pt=test分區並寫入資料
說明每次調用
write_table()
方法,MaxCompute都會在服務端產生一個檔案。該操作耗時較長,同時檔案過多會降低後續的查詢效率。因此,建議您在使用此方法時,一次性寫入多組資料,或者傳入一個產生器對象。調用
write_table()
方法向表中寫入資料時會追加到原有資料中。PyODPS不提供覆蓋資料的選項,如果需要覆蓋資料,請手動清除原有資料。對於非分區表,需要調用table.truncate()
方法;對於分區表,需要刪除分區後再建立新的分區。
對錶對象調用
open_writer()
方法寫入資料。t = o.get_table('my_new_table') with t.open_writer(partition='pt=test02', create_partition=True) as writer: #建立pt=test02分區並寫入資料 records = [[1, 1.0], # 此處可以是List。 [2, 2.0], [3, 3.0], [4, 4.0]] writer.write(records) # 這裡Records可以是可迭代對象。
如果是多級分區表,寫入樣本如下。
t = o.get_table('test_table') with t.open_writer(partition='pt1=test1,pt2=test2') as writer: # 多級分區寫法。 records = [t.new_record([111, 'aaa', True]), # 也可以是Record對象。 t.new_record([222, 'bbb', False]), t.new_record([333, 'ccc', True]), t.new_record([444, '中文', False])] writer.write(records)
使用多進程並行寫資料。
每個進程寫資料時共用同一個Session_ID,但是有不同的Block_ID。每個Block對應服務端的一個檔案。主進程執行Commit,完成資料上傳。
import random from multiprocessing import Pool from odps.tunnel import TableTunnel def write_records(tunnel, table, session_id, block_id): # 對使用指定的ID建立Session。 local_session = tunnel.create_upload_session(table.name, upload_id=session_id) # 建立Writer時指定Block_ID。 with local_session.open_record_writer(block_id) as writer: for i in range(5): # 產生資料並寫入對應Block。 record = table.new_record([random.randint(1, 100), random.random()]) writer.write(record) if __name__ == '__main__': N_WORKERS = 3 table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True) tunnel = TableTunnel(o) upload_session = tunnel.create_upload_session(table.name) # 每個進程使用同一個Session_ID。 session_id = upload_session.id pool = Pool(processes=N_WORKERS) futures = [] block_ids = [] for i in range(N_WORKERS): futures.append(pool.apply_async(write_records, (tunnel, table, session_id, i))) block_ids.append(i) [f.get() for f in futures] # 最後執行Commit,並指定所有Block。 upload_session.commit(block_ids)
向表中插入一行記錄
Record表示表的一行記錄,對錶對象調用new_record()
方法即可建立一個新的Record。
t = o.get_table('test_table')
r = t.new_record(['val0', 'val1']) # 值的個數必須等於表Schema的欄位數。
r2 = t.new_record() # 可以不傳入值。
r2[0] = 'val0' # 通過位移設定值。
r2['field1'] = 'val1' # 通過欄位名設定值。
r2.field1 = 'val1' # 通過屬性設定值。
print(record[0]) # 取第0個位置的值。
print(record['c_double_a']) # 通過欄位取值。
print(record.c_double_a) # 通過屬性取值。
print(record[0: 3]) # 切片操作。
print(record[0, 2, 3]) # 取多個位置的值。
print(record['c_int_a', 'c_double_a']) # 通過多個欄位取值。
擷取表資料
擷取表資料的方法有多種,常用方法如下:
使用入口對象的
read_table()
方法。# 處理一條記錄。 for record in o.read_table('my_new_table', partition='pt=test'): print(record)
如果您僅需要查看每個表最開始的小於1萬條資料,可以對錶對象調用
head()
方法。t = o.get_table('my_new_table') # 處理每個Record對象。 for record in t.head(3): print(record)
調用
open_reader()
方法讀取資料。使用
with
運算式的寫法如下。t = o.get_table('my_new_table') with t.open_reader(partition='pt=test') as reader: count = reader.count for record in reader[5:10]: # 可以執行多次,直到將Count數量的Record讀完,此處可以改造成並行操作。 print(record) # 處理一條記錄,例如列印記錄本身
不使用
with
運算式的寫法如下。reader = t.open_reader(partition='pt=test') count = reader.count for record in reader[5:10]: # 可以執行多次,直到將Count數量的Record讀完,此處可以改造成並行操作。 print(record) # 處理一條記錄,例如列印記錄本身
刪除表
使用delete_table()
方法刪除已經存在的表。
o.delete_table('my_table_name', if_exists=True) # 只有表存在時,才刪除表。
t.drop() # Table對象存在時,直接調用Drop方法刪除。
轉換表為DataFrame
PyODPS提供了DataFrame架構,支援以更方便的方式查詢和操作MaxCompute資料。使用to_df()
方法,即可轉化為DataFrame對象。
table = o.get_table('my_table_name')
df = table.to_df()
表分區
判斷是否為分區表。
table = o.get_table('my_new_table') if table.schema.partitions: print('Table %s is partitioned.' % table.name)
遍曆表全部分區。
table = o.get_table('my_new_table') for partition in table.partitions: # 遍曆所有分區 print(partition.name) # 具體的遍曆步驟,這裡是列印分區名 for partition in table.iterate_partitions(spec='pt=test'): # 遍曆 pt=test 分區下的二級分區 print(partition.name) # 具體的遍曆步驟,這裡是列印分區名 for partition in table.iterate_partitions(spec='dt>20230119'): # 遍曆 dt>20230119 分區下的二級分區 print(partition.name) # 具體的遍曆步驟,這裡是列印分區名
重要PyODPS自0.11.3版本開始,支援為
iterate_partitions
指定邏輯運算式,如上述樣本中的dt>20230119
。判斷分區是否存在。
table = o.get_table('my_new_table') table.exist_partition('pt=test,sub=2015')
擷取分區。
table = o.get_table('my_new_table') partition = table.get_partition('pt=test') print(partition.creation_time) partition.size
建立分區。
t = o.get_table('my_new_table') t.create_partition('pt=test', if_not_exists=True) # 指定if_not_exists參數,分區不存在時才建立分區。
刪除分區。
t = o.get_table('my_new_table') t.delete_partition('pt=test', if_exists=True) # 自定if_exists參數,分區存在時才刪除分區。 partition.drop() # 分區對象存在時,直接對分區對象調用Drop方法刪除。
資料上傳下載通道
Tunnel是MaxCompute的資料通道,使用者可以通過Tunnel向MaxCompute中上傳或者下載資料。
上傳資料樣本
from odps.tunnel import TableTunnel table = o.get_table('my_table') tunnel = TableTunnel(odps) upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test') with upload_session.open_record_writer(0) as writer: record = table.new_record() record[0] = 'test1' record[1] = 'id1' writer.write(record) record = table.new_record(['test2', 'id2']) writer.write(record) # 需要在 with 代碼塊外 commit,否則資料未寫入即 commit,會導致報錯 upload_session.commit([0])
下載資料樣本
from odps.tunnel import TableTunnel tunnel = TableTunnel(odps) download_session = tunnel.create_download_session('my_table', partition_spec='pt=test') # 處理每條記錄。 with download_session.open_record_reader(0, download_session.count) as reader: for record in reader: print(record) # 具體的遍曆步驟,這裡是列印記錄對象
PyODPS不支援上傳外部表格,例如OSS和OTS的表。
不推薦直接使用Tunnel介面,推薦您直接使用對象的寫和讀介面。
如果您安裝了CPython,在安裝PyODPS時會編譯C代碼,加速Tunnel的上傳和下載。