全部产品
Search
文档中心

云原生大数据计算服务 MaxCompute:创建DataFrame

更新时间:Apr 02, 2024

本文为您介绍如何创建DataFrame引用数据源。

前提条件

操作下述代码示例前,您需要先准备好示例表pyodps_iris,详细操作请参见Dataframe数据处理

背景信息

在使用DataFrame时,您需要了解CollectionDataFrame)、SequenceScalar三类对象的操作。三类对象分别表示表结构(或者二维结构)、列(一维结构)和标量。

您使用Pandas创建的数据对象包含实际数据,但通过MaxCompute表创建的对象并不包含实际数据,仅包含对数据的操作。MaxCompute完成数据存储和计算。

创建DataFrame

您唯一需要直接创建的Collection对象是DataFrame。DataFrame用于引用MaxCompute表、MaxCompute分区、Pandas DataFrame或Sqlalchemy Table(数据库表)数据源。这几种数据源的操作相同,您可以不更改数据处理代码,仅修改输入和输出指向,便可以将本地运行的小数据量测试代码迁移到MaxCompute,迁移正确性由PyODPS保证。

从MaxCompute表创建DataFrame

从MaxCompute表创建DataFrame,您需要将Table对象传入DataFrame方法,或者使用表的to_df方法。代码示例如下:

from odps.df import DataFrame

# 传入Table对象。
iris = DataFrame(o.get_table('pyodps_iris'))
# 使用表的to_df方法。
iris2 = o.get_table('pyodps_iris').to_df() 

从MaxCompute分区创建DataFrame

从MaxCompute分区创建DataFrame,您需要将分区对象传入DataFrame方法,或者使用分区的to_df方法。代码示例如下:

from odps.df import DataFrame

# (可选)创建partitioned_table分区表。若已有分区表,可根据实际情况替换partitioned_table。
o.create_table('partitioned_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)
# 传入Table对象。
pt_df = DataFrame(o.get_table('partitioned_table').get_partition('pt=20171111'))
# 使用分区的to_df方法。
pt_df2 = o.get_table('partitioned_table').get_partition('pt=20171111').to_df();

从Pandas DataFrame创建DataFrame

从Pandas DataFrame创建DataFrame,您需要将Pandas DataFrame对象传入DataFrame方法。

  • 代码示例

    from odps.df import DataFrame
    
    # 从Pandas DataFrame创建DataFrame。
    import pandas as pd
    import numpy as np
    df = DataFrame(pd.DataFrame(np.arange(9).reshape(3, 3), columns=list('abc')))
    
  • 注意事项

    用Pandas DataFrame初始化时:

    • PyODPS DataFrame会尝试对NUMPY OBJECT或STRING类型进行推断。如果一整列都为空,则会报错。为避免报错,您可以设置unknown_as_string值为True,将这些列指定为STRING类型。

    • 您可以通过as_type参数,强制转换类型。如果类型为基本类型,会在创建PyODPS DataFrame时强制转换类型。如果Pandas DataFrame中包含LIST或DICT列,系统不会推断该列的类型,必须手动使用as_type指定类型。as_type参数类型必须是DICT。

    示例:

    • 示例1:指定null_col2列类型为float

      df2 = DataFrame(df, unknown_as_string=True, as_type={'null_col2': 'float'})
      print(df2.dtypes)

      返回结果:

      odps.Schema {
        sepallength           float64
        sepalwidth            float64
        petallength           float64
        petalwidth            float64
        name                  string
        null_col1             string   # 无法识别,通过unknown_as_string设置成STRING类型。
        null_col2             float64  # 强制转换成FLOAT类型。
      }
    • 示例2:指定list_col列类型为list<int64>

      df4 = DataFrame(df3, as_type={'list_col': 'list<int64>'})
      print(df4.dtypes)

      返回结果:

      odps.Schema {
        id        int64
        list_col  list<int64>  # 无法识别且无法自动转换,通过as_type设置。
      }
    说明

    PyODPS不支持上传OSS或OTS外部表。

从Sqlalchemy Table创建DataFrame

从Sqlalchemy Table创建DataFrame,您需要将Sqlalchemy Table对象传入DataFrame方法,Sqlalchemy创建连接及相关参数说明请参见创建连接。代码示例如下:

from odps.df import DataFrame
import sqlalchemy

# 从Sqlalchemy Table创建DataFrame。
# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
# 不建议直接使用 Access Key ID / Access Key Secret 字符串
conn_string = 'odps://%s:%s@<project>/?endpoint=<endpoint>' % (
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
)
engine = sqlalchemy.create_engine(conn_string)
metadata = sqlalchemy.MetaData(bind=engine) # 需要绑定到Engine。
table = sqlalchemy.Table('pyodps_iris', metadata, extend_existing=True, autoload=True)
iris = DataFrame(table)