該組件將MTable展開成Table,方便使用者進行資料處理和展示。
支援的計算資源
MaxCompute/Flink/DLC
可視化配置組件
輸入樁
輸入樁(從左至右)
資料類型
建議上遊組件
是否必選
資料
無
是
組件參數
參數類型
參數
描述
欄位設定
選中的列名
計算資料行對應的列名,類型是STRING,格式是MTABLE。
演算法保留列名
選擇演算法保留列。
參數設定
Schema
展開的列名和類型。格式為
colname coltype[, colname2, coltype2[, ...]]
,例如f0 string, f1 bigint, f2 double
。處理無效值的方法
處理無效值的方法,取值如下:
ERROR(預設值):拋異常。
SKIP:跳過。
執行調優
底層Alink作業使用的計算資源
MaxCompute
使用MaxCompute/Flink計算資源,節點個數和單節點佔用的記憶體大小配置方法請參見附錄:如何預估資源的使用量。
Flink
DLC
使用DLC計算資源,請根據介面提示配置資源規格。
通過代碼方式配置組件
您可以將以下代碼複製到PyAlink指令碼組件中,使PyAlink指令碼組件實現與該組件相同的功能。
import numpy as np
import pandas as pd
from pyalink.alink import *
df_data = pd.DataFrame([
["a1", "11L", 2.2],
["a1", "12L", 2.0],
["a2", "11L", 2.0],
["a2", "12L", 2.0],
["a3", "12L", 2.0],
["a3", "13L", 2.0],
["a4", "13L", 2.0],
["a4", "14L", 2.0],
["a5", "14L", 2.0],
["a5", "15L", 2.0],
["a6", "15L", 2.0],
["a6", "16L", 2.0]
])
input = BatchOperator.fromDataframe(df_data, schemaStr='id string, f0 string, f1 double')
zip = GroupByBatchOp()\
.setGroupByPredicate("id")\
.setSelectClause("id, mtable_agg(f0, f1) as m_table_col")
flatten = FlattenMTableBatchOp()\
.setReservedCols(["id"])\
.setSelectedCol("m_table_col")\
.setSchemaStr('f0 string, f1 int')
zip.linkFrom(input).link(flatten).print()