全部產品
Search
文件中心

MaxCompute:表

更新時間:Jan 25, 2025

PyODPS支援對MaxCompute表的基本操作,包括建立表、建立表的Schema、同步表更新、擷取表資料、刪除表、表分區操作以及如何將錶轉換為DataFrame對象。

背景資訊

PyODPS提供對MaxCompute表的基本操作方法。

操作

說明

基本操作

列出專案空間下的所有表、判斷表是否存在、擷取表等基本操作。

建立表的Schema

使用PyODPS建立表的Schema。

建立表

使用PyODPS建立表。

同步表更新

使用PyODPS同步表更新。

表記錄類型(Record)

使用PyODPS讀取/寫入的記錄資料結構。

寫入表資料

使用PyODPS向表中寫入資料。

擷取表資料

使用PyODPS擷取表中資料。

刪除表

使用PyODPS刪除表。

轉換表為DataFrame

使用PyODPS轉換表為DataFrame。

表分區

使用PyODPS判斷是否為分區表、遍曆表全部分區、判斷分區是否存在、建立分區等。

資料上傳下載通道

使用PyODPS操作Tunnel向MaxCompute中上傳或者下載資料。

說明

更多PyODPS方法說明,請參見Python SDK方法說明

前提條件:準備運行環境

PyODPS支援在DataWorks的PyODPS節點或本地PC環境中運行,運行前您需先選擇運行工具並準備好運行環境。
  • 使用DataWorks:建立好PyODPS 2節點或PyODPS 3節點,詳情請參見通過DataWorks使用PyODPS
  • 使用本地PC環境:安裝好PyODPS並初始化ODPS入口對象。

基本操作

當前專案內的表操作

  • 列出專案空間下的所有表:

    o.list_tables()方法可以列出專案空間下的所有表。

    for table in o.list_tables():
        print(table)

    可以通過prefix參數只列舉給定首碼的表:

    for table in o.list_tables(prefix="table_prefix"):
        print(table.name)

    通過該方法擷取的 Table 對象不會自動載入表名以外的屬性,此時擷取這些屬性(例如table_schema或者creation_time)可能導致額外的請求並造成額外的時間開銷。如果需要在列舉表的同時讀取這些屬性,在 PyODPS 0.11.5 及後續版本中,可以為list_tables添加extended=True參數:

    for table in o.list_tables(extended=True):
        print(table.name, table.creation_time)

    如果需要按類型列舉表,可以指定type參數。不同類型的表列舉方法如下:

    managed_tables = list(o.list_tables(type="managed_table"))  # 列舉內建表
    external_tables = list(o.list_tables(type="external_table"))  # 列舉外表
    virtual_views = list(o.list_tables(type="virtual_view"))  # 列舉視圖
    materialized_views = list(o.list_tables(type="materialized_view"))  # 列舉物化視圖
  • 判斷表是否存在:

    o.exist_table()方法可以判斷表是否存在。

    print(o.exist_table('pyodps_iris'))
    # 返回True表示表pyodps_iris存在。
  • 擷取表:

    入口對象的o.get_table()方法可以擷取表。

    • 擷取表的schema資訊。

      t = o.get_table('pyodps_iris')
      print(t.schema)  # 擷取表pyodps_iris的schema

      傳回值樣本如下。

      odps.Schema {
        sepallength           double      # 片長度(cm)
        sepalwidth            double      # 片寬度(cm)
        petallength           double      # 瓣長度(cm)
        petalwidth            double      # 瓣寬度(cm)
        name                  string      # 種類
      }
    • 擷取表列資訊。

      t = o.get_table('pyodps_iris')
      print(t.schema.columns)  # 擷取表pyodps_iris的schema中的列資訊

      傳回值樣本如下。

      [<column sepallength, type double>,
       <column sepalwidth, type double>,
       <column petallength, type double>,
       <column petalwidth, type double>,
       <column name, type string>]
    • 擷取表的某個列資訊。

      t = o.get_table('pyodps_iris')
      print(t.schema['sepallength'])  # 擷取表pyodps_iris的sepallength列資訊

      傳回值樣本如下。

      <column sepallength, type double>
    • 擷取表的某個列的備忘資訊。

      t = o.get_table('pyodps_iris')
      print(t.schema['sepallength'].comment)  # 擷取表pyodps_iris的sepallength列的備忘資訊

      返回樣本如下。

      片長度(cm)
    • 擷取表的生命週期。

      t = o.get_table('pyodps_iris')
      print(t.lifecycle)  # 擷取表pyodps_iris的生命週期

      傳回值樣本如下。

      -1
    • 擷取表的建立時間。

      t = o.get_table('pyodps_iris')
      print(t.creation_time)  # 擷取表pyodps_iris的建立時間
    • 擷取表是否是虛擬視圖。

      t = o.get_table('pyodps_iris')
      print(t.is_virtual_view)  # 擷取表pyodps_iris是否是虛擬視圖,返回False,表示不是。

    與上述樣本類似,您也可以通過t.sizet.comment來擷取表的大小、表備忘等資訊。

    跨專案的表操作

    您可以通過project參數,跨專案擷取表。

    t = o.get_table('table_name', project='other_project')

    其中other_project為所跨的專案,table_name為跨專案擷取的表名稱。

建立表的Schema

初始化方法有如下兩種:

  • 通過表的列以及可選的分區進行初始化。

    from odps.models import Schema, Column, Partition
    columns = [
        Column(name='num', type='bigint', comment='the column'),
        Column(name='num2', type='double', comment='the column2'),
    ]
    partitions = [Partition(name='pt', type='string', comment='the partition')]
    schema = Schema(columns=columns, partitions=partitions)

    初始化後,您可擷取欄位資訊、分區資訊等。

    • 擷取所有欄位資訊。

      print(schema.columns)

      返回樣本如下。

      [<column num, type bigint>,
       <column num2, type double>,
       <partition pt, type string>]
    • 擷取分區欄位。

      print(schema.partitions)

      返回樣本如下。

      [<partition pt, type string>]
    • 擷取非分區欄位名稱。

      print(schema.names)

      返回樣本如下。

      ['num', 'num2']
    • 擷取非分區欄位類型。

      print(schema.types)

      返回樣本如下。

      [bigint, double]
  • 使用Schema.from_lists()方法。該方法更容易調用,但無法直接設定列和分區的注釋。

    from odps.models import Schema
    schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
    print(schema.columns)

    傳回值樣本如下。

    [<column num, type bigint>,
     <column num2, type double>,
     <partition pt, type string>]

建立表

您可以使用o.create_table()方法建立表,使用方式有兩種:使用表Schema方式、使用欄位名和欄位類型方式。同時建立表時表欄位的資料類型有一定的限制條件,詳情如下。

使用表Schema建立表

使用表Schema建立表時,您需要先建立表的Schema,然後通過Schema建立表。

#建立表的schema
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])

#通過schema建立表
table = o.create_table('my_new_table', schema)

#只有不存在表時,才建立表。
table = o.create_table('my_new_table', schema, if_not_exists=True)

#設定生命週期。
table = o.create_table('my_new_table', schema, lifecycle=7)

表建立完成後,您可以通過print(o.exist_table('my_new_table'))驗證表是否建立成功,返回True表示表建立成功。

使用欄位名及欄位類型建立表

#建立分區表my_new_table,可傳入(表欄位列表,分區欄位列表)。
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)

#建立非分區表my_new_table02。
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)

表建立完成後,您可以通過print(o.exist_table('my_new_table'))驗證表是否建立成功,返回True表示表建立成功。

使用欄位名及欄位類型建立表:新資料類型

未開啟新資料類型開關時(預設關閉),建立表的資料類型只允許為BIGINT、DOUBLE、DECIMAL、STRING、DATETIME、BOOLEAN、MAP和ARRAY類型。如果您需要建立TINYINT和STRUCT等新資料類型欄位的表,可以開啟options.sql.use_odps2_extension = True開關,樣本如下。

from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')

同步表更新

當一個表被其他程式更新,例如改變了Schema,可以調用reload()方法同步表的更新。

#表schema變更
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])

#通過reload()同步表更新
table = o.create_table('my_new_table', schema)
table.reload()

表記錄類型(Record)

Record類型表示表的一行記錄,為Table.open_reader或者Table.open_writer介面在讀寫時所使用的資料結構,也用於Tunnel介面TableDownloadSession.open_record_reader或者TableUploadSession.open_record_writer 。在Table對象上調用new_record方法即可建立一個新的Record執行個體。

下述樣本中,假定表結構為:

odps.Schema {
  c_int_a                 bigint
  c_string_a              string
  c_bool_a                boolean
  c_datetime_a            datetime
  c_array_a               array<string>
  c_map_a                 map<bigint,string>
  c_struct_a              struct<a:bigint,b:string>
}

該表對應的Record相關操作執行個體為:

import datetime
t = o.get_table('mytable')  # o 為 MaxCompute 入口對象
r = t.new_record([1024, 'val1', False, datetime.datetime.now(), None, None])  # 值的個數必須等於表schema的欄位數
r2 = t.new_record()  # 初始化時也可以不傳入值
r2[0] = 1024  # 可以通過位移設定值
r2['c_string_a'] = 'val1'  # 也可以通過欄位名設定值
r2.c_string_a = 'val1'  # 通過屬性設定值
r2.c_array_a = ['val1', 'val2']  # 設定 array 類型的值
r2.c_map_a = {1: 'val1'}  # 設定 map 類型的值
r2.c_struct_a = (1, 'val1')  # 使用 tuple 設定 struct 類型的值,當 PyODPS >= 0.11.5
r2.c_struct_a = {"a": 1, "b": 'val1'}  # 也可以使用 dict 設定 struct 類型的值

print(r[0])  # 取第0個位置的值
print(r['c_string_a'])  # 通過欄位取值
print(r.c_string_a)  # 通過屬性取值
print(r[0: 3])  # 切片操作
print(r[0, 2, 3])  # 取多個位置的值
print(r['c_int_a', 'c_double_a'])  # 通過多個欄位取值

MaxCompute資料類型與Python類型在Record中的對應關係如下:

MaxCompute類型

Python類型

說明

TINYINT、SMALLINT、INT、BIGINT

int

FLOAT、DOUBLE

float

STRING

str

詳情請參見注釋1

BINARY

bytes

DATETIME

datetime.datetime

詳情請參見注釋2

DATE

datetime.date

BOOLEAN

bool

DECIMAL

decimal.Decimal

詳情請參見注釋3

MAP

dict

ARRAY

list

STRUCT

tuple/namedtuple

詳情請參見注釋4

TIMESTAMP

pandas.Timestamp

詳情請參見注釋2,需要安裝Pandas。

TIMESTAMP_NTZ

pandas.Timestamp

結果不受時區設定影響,需要安裝Pandas。

INTERVAL_DAY_TIME

pandas.Timedelta

需要安裝Pandas。

對部分類型的說明如下。

  • PyODPS預設字串類型對應UNICODE字串,在Python 3中表示為str,在 Python 2 中為unicode。對於某些在字串中儲存二進位值的情境,需要設定options.tunnel.string_as_binary = True;,以避免可能出現的編碼問題。

  • PyODPS預設使用本地時間作為時區。若使用UTC時區,需要設定 options.local_timezone = False;選項。若使用其他時區,則需要將該參數設定為指定時區,例如Asia/Shanghai。MaxCompute不儲存時區值,因此在寫入資料時,會將該時間轉換為UNIX時間戳記進行儲存。

  • 對於Python 2,當安裝cdecimal包時,會使用cdecimal.Decimal類。

  • 對於0.11.5以下版本的PyODPS,MaxCompute中的STRUCT類型對應Python中的dict類型。當PyODPS為0.11.5及以上版本時,則預設對應namedtuple類型。如果希望使用舊版行為,則需要設定選項options.struct_as_dict = True;。DataWorks環境下,為保持歷史相容性,該值預設為False。為Record設定STRUCT類型的欄位值時,0.11.5及以上版本的PyODPS可同時接受dicttuple類型,而舊版則只接受dict類型。

  • 關於如何設定選項,請參考配置選項

寫入表資料

  • 使用入口對象的write_table()方法寫入資料。

    重要

    對於分區表,如果分區不存在,可以使用create_partition參數指定建立分區。

    records = [[111, 1.0],                 # 此處可以是list。
              [222, 2.0],
              [333, 3.0],
              [444, 4.0]]
    o.write_table('my_new_table', records, partition='pt=test', create_partition=True)  #建立pt=test分區並寫入資料
    說明
    • 每次調用write_table()方法,MaxCompute都會在服務端產生一個檔案。該操作耗時較長,同時檔案過多會降低後續的查詢效率。因此,建議您在使用此方法時,一次性寫入多組資料,或者傳入一個產生器對象。

    • 調用write_table()方法向表中寫入資料時會追加到原有資料中。PyODPS不提供覆蓋資料的選項,如果需要覆蓋資料,請手動清除原有資料。對於非分區表,需要調用table.truncate()方法;對於分區表,需要刪除分區後再建立新的分區。

  • 對錶對象調用open_writer()方法寫入資料。

    t = o.get_table('my_new_table')
    with t.open_writer(partition='pt=test02', create_partition=True) as writer:  #建立pt=test02分區並寫入資料
        records = [[1, 1.0],                 # 此處可以是List。
                  [2, 2.0],
                  [3, 3.0],
                  [4, 4.0]]
        writer.write(records)  # 這裡Records可以是可迭代對象。

    如果是多級分區表,寫入樣本如下。

    t = o.get_table('test_table')
    with t.open_writer(partition='pt1=test1,pt2=test2') as writer:  # 多級分區寫法。
        records = [t.new_record([111, 'aaa', True]),   # 也可以是Record對象。
                   t.new_record([222, 'bbb', False]),
                   t.new_record([333, 'ccc', True]),
                   t.new_record([444, '中文', False])]
        writer.write(records)
  • 使用多進程並行寫資料。

    每個進程寫資料時共用同一個Session_ID,但是有不同的Block_ID。每個Block對應服務端的一個檔案。主進程執行Commit,完成資料上傳。

    import random
    from multiprocessing import Pool
    from odps.tunnel import TableTunnel
    def write_records(tunnel, table, session_id, block_id):
        # 對使用指定的ID建立Session。
        local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
        # 建立Writer時指定Block_ID。
        with local_session.open_record_writer(block_id) as writer:
            for i in range(5):
                # 產生資料並寫入對應Block。
                record = table.new_record([random.randint(1, 100), random.random()])
                writer.write(record)
    
    if __name__ == '__main__':
        N_WORKERS = 3
    
        table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
        tunnel = TableTunnel(o)
        upload_session = tunnel.create_upload_session(table.name)
    
        # 每個進程使用同一個Session_ID。
        session_id = upload_session.id
    
        pool = Pool(processes=N_WORKERS)
        futures = []
        block_ids = []
        for i in range(N_WORKERS):
            futures.append(pool.apply_async(write_records, (tunnel, table, session_id, i)))
            block_ids.append(i)
        [f.get() for f in futures]
    
        # 最後執行Commit,並指定所有Block。
        upload_session.commit(block_ids)

擷取表資料

擷取表資料的方法有多種,常用方法如下:

  • 使用入口對象的read_table()方法。

    # 處理一條記錄。
    for record in o.read_table('my_new_table', partition='pt=test'):
        print(record)
  • 如果您僅需要查看每個表最開始的小於1萬條資料,可以對錶對象調用head()方法。

    t = o.get_table('my_new_table')
    # 處理每個Record對象。
    for record in t.head(3):
        print(record)
  • 調用open_reader()方法讀取資料。

    • 使用with運算式的寫法如下。

      t = o.get_table('my_new_table')
      with t.open_reader(partition='pt=test') as reader:
      count = reader.count
      for record in reader[5:10]:  # 可以執行多次,直到將Count數量的Record讀完,此處可以改造成並行操作。
          print(record)  # 處理一條記錄,例如列印記錄本身
    • 不使用with運算式的寫法如下。

      reader = t.open_reader(partition='pt=test')
      count = reader.count
      for record in reader[5:10]:  # 可以執行多次,直到將Count數量的Record讀完,此處可以改造成並行操作。
          print(record)  # 處理一條記錄,例如列印記錄本身

刪除表

使用delete_table()方法刪除已經存在的表。

o.delete_table('my_table_name', if_exists=True)  # 只有表存在時,才刪除表。
t.drop()  # Table對象存在時,直接調用Drop方法刪除。

轉換表為DataFrame

PyODPS提供了DataFrame架構,支援以更方便的方式查詢和操作MaxCompute資料。使用to_df()方法,即可轉化為DataFrame對象。

table = o.get_table('my_table_name')
df = table.to_df()

表分區

  • 判斷是否為分區表。

    table = o.get_table('my_new_table')
    if table.schema.partitions:
        print('Table %s is partitioned.' % table.name)
  • 遍曆表全部分區。

    table = o.get_table('my_new_table')
    for partition in table.partitions:  # 遍曆所有分區
        print(partition.name)  # 具體的遍曆步驟,這裡是列印分區名
    for partition in table.iterate_partitions(spec='pt=test'):  # 遍曆 pt=test 分區下的二級分區
        print(partition.name)  # 具體的遍曆步驟,這裡是列印分區名
    for partition in table.iterate_partitions(spec='dt>20230119'):  # 遍曆 dt>20230119 分區下的二級分區
        print(partition.name)  # 具體的遍曆步驟,這裡是列印分區名
    重要

    PyODPS自0.11.3版本開始,支援為iterate_partitions指定邏輯運算式,如上述樣本中的dt>20230119

  • 判斷分區是否存在。

    table = o.get_table('my_new_table')
    table.exist_partition('pt=test,sub=2015')
  • 擷取分區。

    table = o.get_table('my_new_table')
    partition = table.get_partition('pt=test')
    print(partition.creation_time)
    partition.size
  • 建立分區。

    t = o.get_table('my_new_table')
    t.create_partition('pt=test', if_not_exists=True)  # 指定if_not_exists參數,分區不存在時才建立分區。
  • 刪除分區。

    t = o.get_table('my_new_table')
    t.delete_partition('pt=test', if_exists=True)  # 自定if_exists參數,分區存在時才刪除分區。
    partition.drop()  # 分區對象存在時,直接對分區對象調用Drop方法刪除。

資料上傳下載通道

Tunnel是MaxCompute的資料通道,使用者可以通過Tunnel向MaxCompute中上傳或者下載資料。

  • 上傳資料樣本

    from odps.tunnel import TableTunnel
    
    table = o.get_table('my_table')
    
    tunnel = TableTunnel(odps)
    upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
    
    with upload_session.open_record_writer(0) as writer:
        record = table.new_record()
        record[0] = 'test1'
        record[1] = 'id1'
        writer.write(record)
    
        record = table.new_record(['test2', 'id2'])
        writer.write(record)
    
    # 需要在 with 代碼塊外 commit,否則資料未寫入即 commit,會導致報錯
    upload_session.commit([0])
  • 下載資料樣本

    from odps.tunnel import TableTunnel
    
    tunnel = TableTunnel(odps)
    download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')
    # 處理每條記錄。
    with download_session.open_record_reader(0, download_session.count) as reader:
        for record in reader:
            print(record)  # 具體的遍曆步驟,這裡是列印記錄對象
說明
  • PyODPS不支援上傳外部表格,例如OSS和OTS的表。

  • 不推薦直接使用Tunnel介面,推薦您直接使用對象的寫和讀介面。

  • 如果您安裝了CPython,在安裝PyODPS時會編譯C代碼,加速Tunnel的上傳和下載。