全部產品
Search
文件中心

MaxCompute:Python SDK樣本:Table

更新時間:Feb 28, 2024

本文為您介紹Python SDK中表相關的典型情境操作樣本。

列出所有表

通過調用入口對象的list_tables()方法可以列出專案空間下的所有表。

for table in odps.list_tables():
    # 處理每張表。

判斷表是否存在

通過調用入口對象的exist_table()方法判斷表是否存在;通過調用get_table()方法擷取表。

t = odps.get_table('table_name')
t.schema
odps.Schema {
  c_int_a                 bigint
  c_int_b                 bigint
  c_double_a              double
  c_double_b              double
  c_string_a              string
  c_string_b              string
  c_bool_a                boolean
  c_bool_b                boolean
  c_datetime_a            datetime
  c_datetime_b            datetime
}
t.lifecycle
-1
print(t.creation_time)
2014-05-15 14:58:43
t.is_virtual_view
False
t.size
1408
t.schema.columns
[<column c_int_a, type bigint>,
 <column c_int_b, type bigint>,
 <column c_double_a, type double>,
 <column c_double_b, type double>,
 <column c_string_a, type string>,
 <column c_string_b, type string>,
 <column c_bool_a, type boolean>,
 <column c_bool_b, type boolean>,
 <column c_datetime_a, type datetime>,
 <column c_datetime_b, type datetime>]            

建立表的Schema

初始化方法有如下兩種:

  • 通過表的列以及可選的分區進行初始化。

    from odps.models import Schema, Column, Partition
    columns = [
        Column(name='num', type='bigint', comment='the column'),
        Column(name='num2', type='double', comment='the column2'),
    ]
    partitions = [Partition(name='pt', type='string', comment='the partition')]
    schema = Schema(columns=columns, partitions=partitions)

    初始化後,您可擷取欄位資訊、分區資訊等。

    • 擷取所有欄位資訊。

      print(schema.columns)

      返回樣本如下。

      [<column num, type bigint>,
       <column num2, type double>,
       <partition pt, type string>]
    • 擷取分區欄位。

      print(schema.partitions)

      返回樣本如下。

      [<partition pt, type string>]
    • 擷取非分區欄位名稱。

      print(schema.names)

      返回樣本如下。

      ['num', 'num2']
    • 擷取非分區欄位類型。

      print(schema.types)

      返回樣本如下。

      [bigint, double]
  • 使用Schema.from_lists()方法。該方法更容易調用,但無法直接設定列和分區的注釋。

    from odps.models import Schema
    schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
    print(schema.columns)

    傳回值樣本如下。

    [<column num, type bigint>,
     <column num2, type double>,
     <partition pt, type string>]

建立表

您可以使用o.create_table()方法建立表,使用方式有兩種:使用表Schema方式、使用欄位名和欄位類型方式。同時建立表時表欄位的資料類型有一定的限制條件,詳情如下。

使用表Schema建立表

使用表Schema建立表時,您需要先建立表的Schema,然後通過Schema建立表。

#建立表的schema
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])

#通過schema建立表
table = o.create_table('my_new_table', schema)

#只有不存在表時,才建立表。
table = o.create_table('my_new_table', schema, if_not_exists=True)

#設定生命週期。
table = o.create_table('my_new_table', schema, lifecycle=7)

表建立完成後,您可以通過print(o.exist_table('my_new_table'))驗證表是否建立成功,返回True表示表建立成功。

使用欄位名及欄位類型建立表

#建立分區表my_new_table,可傳入(表欄位列表,分區欄位列表)。
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)

#建立非分區表my_new_table02。
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)

表建立完成後,您可以通過print(o.exist_table('my_new_table'))驗證表是否建立成功,返回True表示表建立成功。

使用欄位名及欄位類型建立表:新資料類型

未開啟新資料類型開關時(預設關閉),建立表的資料類型只允許為BIGINT、DOUBLE、DECIMAL、STRING、DATETIME、BOOLEAN、MAP和ARRAY類型。如果您需要建立TINYINT和STRUCT等新資料類型欄位的表,可以開啟options.sql.use_odps2_extension = True開關,樣本如下。

from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')

刪除表

使用delete_table()方法刪除已經存在的表。

o.delete_table('my_table_name', if_exists=True)  # 只有表存在時,才刪除表。
t.drop()  # Table對象存在時,直接調用Drop方法刪除。

表分區

  • 判斷是否為分區表。

    table = o.get_table('my_new_table')
    if table.schema.partitions:
        print('Table %s is partitioned.' % table.name)
  • 遍曆表全部分區。

    table = o.get_table('my_new_table')
    for partition in table.partitions:  # 遍曆所有分區
        print(partition.name)  # 具體的遍曆步驟,這裡是列印分區名
    for partition in table.iterate_partitions(spec='pt=test'):  # 遍曆 pt=test 分區下的二級分區
        print(partition.name)  # 具體的遍曆步驟,這裡是列印分區名
    for partition in table.iterate_partitions(spec='dt>20230119'):  # 遍曆 dt>20230119 分區下的二級分區
        print(partition.name)  # 具體的遍曆步驟,這裡是列印分區名
    重要

    PyODPS自0.11.3版本開始,支援為iterate_partitions指定邏輯運算式,如上述樣本中的dt>20230119

  • 判斷分區是否存在。

    table = o.get_table('my_new_table')
    table.exist_partition('pt=test,sub=2015')
  • 擷取分區。

    table = o.get_table('my_new_table')
    partition = table.get_partition('pt=test')
    print(partition.creation_time)
    partition.size
  • 建立分區。

    t = o.get_table('my_new_table')
    t.create_partition('pt=test', if_not_exists=True)  # 指定if_not_exists參數,分區不存在時才建立分區。
  • 刪除分區。

    t = o.get_table('my_new_table')
    t.delete_partition('pt=test', if_exists=True)  # 自定if_exists參數,分區存在時才刪除分區。
    partition.drop()  # 分區對象存在時,直接對分區對象調用Drop方法刪除。

讀取表資料

有若干種方法能夠擷取表資料。

  • 如果只是查看每個表的開始的小於1萬條資料,則可以使用head方法。

    from odps import ODPS
    t = o.get_table('dual')
    for record in t.head(3):
        # 處理每個Record對象
  • 使用 with 運算式的寫法:

    with t.open_reader(partition='pt=test') as reader:
    count = reader.count
    for record in reader[5:10]:  # 可以執行多次,直到將count數量的record讀完,這裡可以改造成並行操作
        # 處理一條記錄
  • 不使用 with 運算式的寫法:

    reader = t.open_reader(partition='pt=test')
    count = reader.count
    for record in reader[5:10]:  # 可以執行多次,直到將count數量的record讀完,這裡可以改造成並行操作
        # 處理一條記錄
  • 直接讀取成 Pandas DataFrame:

    with t.open_reader(partition='pt=test') as reader:
    pd_df = reader.to_pandas()

寫入表資料

類似於open_reader,table對象同樣能執行open_writer來開啟writer,並寫資料。

  • 使用with寫法:

    with t.open_writer(partition='pt=test') as writer:
    	  records = [[111, 'aaa', True],                 # 這裡可以是list
    	             [222, 'bbb', False],
    	             [333, 'ccc', True],
    	             [444, '中文', False]]
        writer.write(records)  # 這裡records可以是可迭代對象
    
    records = [t.new_record([111, 'aaa', True]),   # 也可以是Record對象
               t.new_record([222, 'bbb', False]),
               t.new_record([333, 'ccc', True]),
               t.new_record([444, '中文', False])]
    writer.write(records)
  • 如果分區不存在,可以使用 create_partition 參數指定建立分區,如:

    with t.open_writer(partition='pt=test', create_partition=True) as writer:
        records = [[111, 'aaa', True],                 # 這裡可以是list
                   [222, 'bbb', False],
                   [333, 'ccc', True],
                   [444, '中文', False]]
        writer.write(records)  # 這裡records可以是可迭代對象
  • 更簡單的寫資料方法是使用ODPS對象的write_table方法,例如:

    records = [[111, 'aaa', True],                 # 這裡可以是list
               [222, 'bbb', False],
               [333, 'ccc', True],
               [444, '中文', False]]
    o.write_table('test_table', records, partition='pt=test', create_partition=True)
    說明
    • 每次調用write_table,MaxCompute 都會在服務端產生一個檔案。這一操作需要較大的時間開銷, 同時過多的檔案會降低後續的查詢效率。因此,建議您在使用write_table方法時,一次性寫入多組資料, 或者傳入一個generator對象。

    • write_table寫表時會追加到原有資料。PyODPS不提供覆蓋資料的選項,如果需要覆蓋資料,需要手動清除原有資料。對於非分區表,需要調用table.truncate(),對於分區表,需要刪除分區後再建立。

使用Arrow格式讀寫資料

Apache Arrow是一種跨語言的通用資料讀寫格式,支援在各種不同平台間進行資料交換。 自2021年起, MaxCompute支援使用Arrow格式讀取表資料,PyODPS則從0.11.2版本開始支援該功能。具體來說,如果在Python環境中安裝pyarrow後,在調用open_writer時增加arrow=True參數,即可讀寫Arrow RecordBatch 。

import pandas as pd
import pyarrow as pa
with t.open_writer(partition='pt=test', create_partition=True, arrow=True) as writer:
    records = [[111, 'aaa', True],
               [222, 'bbb', False],
               [333, 'ccc', True],
               [444, '中文', False]]
    df = pd.DataFrame(records, columns=["int_val", "str_val", "bool_val"])
    # 寫入 RecordBatch
    batch = pa.RecordBatch.from_pandas(df)
    writer.write(batch)
    # 也可以直接寫入 Pandas DataFrame
    writer.write(df)