全部產品
Search
文件中心

Realtime Compute for Apache Flink:MaxCompute

更新時間:Aug 16, 2024

本文為您介紹MaxCompute連接器的文法結構、WITH參數和使用樣本等。

背景資訊

MaxCompute(原名ODPS)是一種快速、完全託管的EB級資料倉儲解決方案,致力於批量結構化資料的儲存和計算,提供海量資料倉儲的解決方案及分析建模服務。MaxCompute的詳情請參見什麼是MaxCompute

MaxCompute連接器支援的資訊如下。

類別

詳情

支援類型

源表、維表和結果表

運行模式

流模式和批模式

資料格式

暫不支援

特有監控指標

  • 源表

    numRecordsIn:源表當前讀取到的資料總條數。

    numRecordsInPerSecond:源表當前每秒讀取的資料條數。

    numBytesIn:源表當前讀取到的資料總位元組數(解壓縮後)。

    numBytesInPerSecond:源表當前每秒讀取的資料位元組數(解壓縮後)。

  • 結果表

    numRecordsOut:結果表當前寫出的資料總條數。

    numRecordsOutPerSecond:結果表當前每秒寫出的資料條數。

    numBytesOut:結果表當前寫出的資料總位元組數(壓縮前)。

    numBytesOutPerSecond:結果表當前每秒寫出的資料位元組數(壓縮前)。

  • 維表

    dim.odps.cacheSize:維表緩衝的資料條數。

說明

指標含義詳情,請參見監控指標說明

API種類

Datastream和SQL

是否支援更新或刪除結果表資料

Batch Tunnel和Stream Tunnel模式僅支援插入資料,Upsert Tunnel模式支援插入、更新和刪除資料。

前提條件

已建立MaxCompute表,詳情請參見建立表

使用限制

  • 僅Realtime Compute引擎VVR 2.0.0及以上版本支援MaxCompute連接器。

  • MaxCompute連接器僅支援At Least Once語義。

    說明

    At Least Once語義會保證資料不缺失,但在少部分情況下,可能會將重複資料寫入MaxCompute。不同的MaxCompute Tunnel出現重複資料的情況不同,MaxCompute Tunnel詳情請參見如何選擇資料通道?

  • 預設情況下源表為全量模式,僅會讀取partition參數中指定的分區,在讀完所有資料後結束運行,狀態轉換為finished,不會監控是否有新分區產生。

    如果您需要持續監控新分區,請通過WITH參數中指定startPartition使用增量源表模式。

    說明
    • 維表每次更新時都會檢查最新分區,不受這一限制。

    • 在源表開始運行後,向分區裡添加的新資料不會被讀取,請在分區資料完整的情況下運行作業。

文法結構

CREATE TABLE odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'schemaName' = '<yourSchemaName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****'
);

WITH參數

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    表類型。

    String

    固定值為odps。

    endpoint

    MaxCompute服務地址。

    String

    請參見Endpoint

    tunnelEndpoint

    MaxCompute Tunnel服務的串連地址。

    String

    請參見Endpoint

    說明
    • VPC環境下為必填。

    • 如果未填寫,MaxCompute會根據內部的負載平衡服務分配Tunnel的串連。

    project

    MaxCompute專案名稱。

    String

    無。

    schemaName

    MaxCompute Schema名稱。

    String

    僅當MaxCompute專案開啟Schema功能時,需填寫該值為MaxCompute表所屬Schema名,詳情請參見 Schema操作

    說明

    僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。

    tableName

    MaxCompute表名。

    String

    無。

    accessId

    MaxCompute AccessKey ID。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

    重要

    為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變數和密鑰管理

    accessKey

    MaxCompute AccessKey Secret。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

    重要

    為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變數和密鑰管理

    partition

    MaxCompute分區名。

    String

    對於非分區表和增量源表無需填寫。

    compressAlgorithm

    MaxCompute Tunnel使用的壓縮演算法。

    String

    • VVR 4.0.13及以上版本:ZLIB

    • VVR 6.0.1及以上版本:SNAPPY

    參數取值如下:

    • RAW(無壓縮)

    • ZLIB

    • SNAPPY

      SNAPPY相比ZLIB能帶來明顯的吞吐提升。在測試情境下,吞吐提升約50%。

    說明

    僅Realtime Compute引擎VVR 4.0.13及以上版本支援該參數。

    quotaName

    MaxCompute獨享Data Transmission Service的quota名稱。

    String

    設定該值來使用獨享的MaxComputeData Transmission Service。

    重要
    • 僅Realtime Compute引擎VVR 8.0.3及以上版本支援該參數。

    • 設定該值時,必須刪除tunnelEndpoint參數,否則仍將使用tunnelEndpoint中指定的資料通道。

  • 源表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    maxPartitionCount

    可以讀取的最大分區數量。

    Integer

    100

    如果讀取的分區數量超過了該參數,則會出現報錯The number of matched partitions exceeds the default limit

    重要

    由於一次性讀取大量分區會給MaxCompute服務帶來一定壓力,同時也會讓作業啟動速度變慢,因此您需要確認是否需要讀取這麼多分區(而不是誤填partition參數)。如果確實需要,需要手動調大maxPartitionCount參數。

    useArrow

    是否使用Arrow格式讀取資料。

    Boolean

    false

    使用Arrow格式能夠調用MaxCompute的Storage API,詳情請參見什麼是MaxCompute中使用者介面與開放性一節。

    重要
    • 僅在批作業中生效。

    • 僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。

    splitSize

    在使用Arrow格式讀取資料時,一次拉取的資料大小。

    MemorySize

    256 MB

    僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。

    重要

    僅在批作業中生效。

    compressCodec

    在使用Arrow格式讀取資料時,採用的壓縮演算法。

    String

    ""

    參數取值如下:

    • "" (無壓縮)

    • ZSTD

    • LZ4_FRAME

    指定壓縮演算法相比無壓縮能帶來一定的吞吐提升。

    重要
    • 僅在批作業中生效。

    • 僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。

    dynamicLoadBalance

    是否允許動態分配分區。

    Boolean

    false

    參數取值如下:

    • true:允許

    • false:不允許

    允許動態分配分區能夠發揮Flink不同節點的處理效能,減少源表整體讀取時間,但也會導致不同節點讀取總資料量不一致,出現資料扭曲情況。

    重要
    • 僅在批作業中生效。

    • 僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。

  • 增量源表專屬

    增量源表通過間歇輪詢MaxCompute伺服器擷取所有的分區資訊來發現新增的分區,讀取新分區時要求分區內資料已寫入完畢,詳情參見增量MaxCompute源表監聽到新分區時,如果該分區還有資料沒有寫完,如何處理?。通過startPartition可以指定起始點位,但注意唯讀取字典序大於等於起始點位的分區,例如分區year=2023,month=10字典序小於分區year=2023,month=9,對於這種類型的分區聲明可以通過加0補齊的方式來保證字典序正確,例如year=2023,month=09

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    startPartition

    增量讀取的起始MaxCompute分區點位(包含)。

    String

    • 使用該參數後啟用增量源表模式,將忽略partition參數。

    • 多級分區必須按分區層級從大到小聲明每個分區列的值。

    說明

    startPartition參數詳情,請參見如何填寫增量MaxCompute的startPartition參數?

    subscribeIntervalInSec

    輪詢MaxCompute擷取分區列表的時間間隔。

    Integer

    30

    單位為秒。

    modifiedTableOperation

    讀取分區過程中遇到分區資料被修改時的處理。

    Enum (NONE, SKIP)

    NONE

    由於下載session被儲存在檢查點中,每次從檢查點恢複時嘗試從該session恢複讀取進度,而該session由於分區資料被修改不可用,Flink任務會陷入不斷重啟。此時您可以設定該參數,參數取值如下:

    • NONE:需要您修改startPartition參數使其大於不可用分區,並從無狀態啟動作業。

    • SKIP:若不希望無狀態啟動,可將模式修改為SKIP,Flink嘗試從檢查點恢複session時將跳過停用分區。

    重要

    NONE和SKIP模式下,被修改分區中已讀取的資料不會被撤回,未讀取的資料將不會被讀取。

  • 結果表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    useStreamTunnel

    是否使用MaxCompute Stream Tunnel上傳資料。

    Boolean

    false

    參數取值如下:

    • true:使用MaxCompute Stream Tunnel上傳資料。

    • false:使用MaxCompute Batch Tunnel上傳資料。

    說明

    flushIntervalMs

    MaxCompute Tunnel Writer緩衝區flush間隔。

    Long

    30000(30秒)

    MaxCompute Sink寫入記錄時,先將資料存放區到MaxCompute的緩衝區中,等緩衝區溢位或者每隔一段時間(flushIntervalMs),再把緩衝區裡的資料寫到目標MaxCompute表。

    對於Stream Tunnel,flush的資料立即可見;對於Batch Tunnel,資料flush後仍需要等待checkpoint完成後才可見,建議設定該參數為0來關閉定時flush。

    單位為毫秒。

    說明

    本參數可以與batchSize一同使用,滿足任一條件即會Flush資料。

    batchSize

    MaxCompute Tunnel Writer緩衝區flush的大小。

    Long

    67108864(64 MB)

    MaxCompute Sink寫入記錄時,先將資料存放區到MaxCompute的緩衝區中,等緩衝區達到一定大小(batchSize),再把緩衝區裡的資料寫到目標MaxCompute表。

    單位為位元組。

    說明
    • 僅Realtime Compute引擎VVR 4.0.14及以上版本支援該參數。

    • 本參數可以與flushIntervalMs一同使用,滿足任一條件即會Flush資料。

    numFlushThreads

    MaxCompute Tunnel Writer緩衝區flush的線程數。

    Integer

    1

    每個MaxCompute Sink並發將建立numFlushThreads個線程用於flush資料。當該值大於1時,將允許不同分區的資料並發Flush,提升Flush的效率。

    說明

    僅Realtime Compute引擎VVR 4.0.14及以上版本支援該參數。

    dynamicPartitionLimit

    寫入動態分區的最大數量。

    Integer

    100

    當結果表在兩次Checkpoint之間寫入的動態分區數量超過了dynamicPartitionLimit,則會出現報錯Too many dynamic partitions

    重要

    由於一次性寫入大量分區會給MaxCompute服務帶來一定壓力,同時也會導致結果表flush和作業Checkpoint變慢。因此當報錯出現時,您需要確認是否需要寫入這麼多分區。如果確實需要,需要手動調大dynamicPartitionLimit參數。

    retryTimes

    向MaxCompute伺服器請求最大重試次數。

    Integer

    3

    建立session、提交session、flush資料時可能存在短暫的MaxCompute服務停用情況,會根據該配置進行重試。

    sleepMillis

    稍候再試時間。

    Integer

    1000

    單位為毫秒。

    enableUpsert

    是否使用MaxCompute Upsert Tunnel上傳資料。

    Boolean

    false

    參數取值如下:

    • true:使用Upsert Tunnel,處理Flink中的INSERT、UPDATE_AFTER和DELETE資料。

    • false:根據useStreamTunnel參數使用Batch Tunnel或Stream Tunnel,處理Flink中的INSERT、UPDATE_AFTER資料。

    重要
    • 若Upsert模式下MaxCompute sink提交時出現報錯、失敗、耗時間長度等情況,建議限制sink節點的並發數在10以內。

    • 僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。

    upsertAsyncCommit

    Upsert模式下在提交session時是否使用非同步模式。

    Boolean

    false

    參數取值如下:

    • true:使用非同步模式,提交耗時更短,但提交完成時寫入的資料非立即可讀。

    • false:預設為同步模式,提交時將等待服務側處理完session。

    說明

    僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。

    upsertCommitTimeoutMs

    Upsert模式下提交session逾時時間。

    Integer

    120000

    (120秒)

    僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。

  • 維表專屬

    MaxCompute維表在作業啟動時從指定的分區拉取全量資料,partition參數支援使用max_pt()等函數。當緩衝到期重新載入時會重新解析partition參數拉取最新的分區,使用max_two_pt()時維表可拉取兩個分區,其他情況下只支援指定單個分區。

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    cache

    緩衝策略。

    String

    目前MaxCompute維表僅支援ALL策略,必須顯式聲明。 適用於遠端資料表資料量小且MISS KEY(源表資料和維表JOIN時,ON條件無法關聯)特別多的情境。

    ALL策略:緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表查詢都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在,並在Cache到期後重新載入一遍全量Cache。

    說明
    • 因為系統會非同步載入維表資料,所以在使用CACHE ALL時,需要增加維表JOIN節點的記憶體,增加的記憶體大小為遠端資料表資料量的至少4倍,具體值與MaxCompute儲存壓縮演算法有關。

    • 如果MaxCompute維表資料量較大,可以考慮使用SHUFFLE_HASH註解將維表資料均勻分散到各個並發中。詳情請參見如何使用維表SHUFFLE_HASH註解?

    • 在使用超大MaxCompute維表時,如果JVM頻繁GC導致作業異常,且在增加維表JOIN節點的記憶體仍無改善的情況下,建議改為支援LRU Cache策略的KV型維表,例如雲資料庫Hbase版維表。

    cacheSize

    最多緩衝的資料條數。

    Long

    100000

    如果維表資料量超過了cacheSize,則會出現報錯Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit

    重要

    由於維表資料量太大會佔用大量JVM堆記憶體,同時也會讓作業啟動和維表更新變慢,因此您需要確認是否需要緩衝這麼多資料,如果確實需要,需要手動調大該參數。

    cacheTTLMs

    緩衝逾時時間,也就是緩衝更新的間隔時間。

    Long

    Long.MAX_VALUE(相當於永不更新)

    單位為毫秒。

    cacheReloadTimeBlackList

    更新時間黑名單。在該參數規定的時間段內不會更新緩衝。

    String

    用於防止緩衝在關鍵時間段(例如活動流量峰值期間)更新導致作業不穩定。填寫方式詳情請參見如何填寫CacheReloadTimeBlackList參數?

    maxLoadRetries

    緩衝更新時(包含作業啟動時初次拉取資料)最多嘗試次數,超過該次數後作業運行失敗。

    Integer

    10

    無。

類型映射

MaxCompute支援的類型參見2.0資料類型版本

MaxCompute類型

Flink類型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(precision, scale)

DECIMAL(precision, scale)

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

STRING

STRING

BINARY

BYTES

DATE

DATE

DATETIME

TIMESTAMP(3)

TIMESTAMP

TIMESTAMP(9)

ARRAY

ARRAY

MAP

MAP

STRUCT

ROW

JSON

STRING

重要

當MaxCompute物理表中同時存在嵌套的複合類型欄位(ARRAY、MAP或STRUCT)和JSON類型欄位時,需要在建立MaxCompute物理表時指定tblproperties('columnar.nested.type'='true'),才能被Flink正確讀寫。

使用樣本

SQL

  • 源表示例

    • 全量讀取

      CREATE TEMPORARY TABLE odps_source (
        cid VARCHAR,
        rt DOUBLE
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpointName>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=201809*'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        cid VARCHAR,
        invoke_count BIGINT
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT
         cid,
         COUNT(*) AS invoke_count
      FROM odps_source GROUP BY cid;
    • 增量讀取

      CREATE TEMPORARY TABLE odps_source (
        cid VARCHAR,
        rt DOUBLE
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpointName>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 從20180905對應分區開始讀取
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        cid VARCHAR,
        invoke_count BIGINT
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT cid, COUNT(*) AS invoke_count
      FROM odps_source GROUP BY cid;
  • 結果表示例

    • 寫入固定分區

      CREATE TEMPORARY TABLE datagen_source (
        id INT,
        len INT,
        content VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_sink (
        id INT,
        len INT,
        content VARCHAR
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905' -- 寫入固定分區ds=20180905。
      );
      
      INSERT INTO odps_sink
      SELECT
        id, len, content
      FROM datagen_source;
    • 寫入動態分區

      CREATE TEMPORARY TABLE datagen_source (
        id INT,
        len INT,
        content VARCHAR,
        c TIMESTAMP
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_sink (
        id  INT,
        len INT,
        content VARCHAR,
        ds VARCHAR --需要顯式聲明動態分區列。
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds' --不寫分區的值,表示根據ds欄位的值寫入不同分區。
      );
      
      INSERT INTO odps_sink
      SELECT
         id,
         len,
         content,
         DATE_FORMAT(c, 'yyMMdd') as ds
      FROM datagen_source;
  • 維表示例

    • 一對一維表

      CREATE TEMPORARY TABLE datagen_source (
        k INT,
        v VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_dim (
        k INT,
        v VARCHAR,
        PRIMARY KEY (k) NOT ENFORCED  -- 一對一維表需要聲明主鍵。
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905',
        'cache' = 'ALL'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        k VARCHAR,
        v1 VARCHAR,
        v2 VARCHAR
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT k, s.v, d.v
      FROM datagen_source AS s
      INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
    • 一對多維表

      CREATE TEMPORARY TABLE datagen_source (
        k INT,
        v VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_dim (
        k INT,
        v VARCHAR
        -- 一對多維表無需聲明主鍵。
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905',
        'cache' = 'ALL'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        k VARCHAR,
        v1 VARCHAR,
        v2 VARCHAR
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT k, s.v, d.v
      FROM datagen_source AS s
      INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;

DataStream

重要
  • 通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink,DataStream連接器設定方法請參見DataStream連接器使用方法。Maven中央庫中已經放置了MaxCompute DataStream連接器

  • 為了保護智慧財產權,從Realtime Compute引擎VVR6.0.6版本起,此連接器在本地調試單次運行作業的時間為30分鐘,30分鐘後作業會報錯並退出。本地運行和調試包含MaxCompute連接器的作業參見本地運行和調試包含連接器的作業

  • 若您在Flink開發控制台提交作業後,出現本地運行和調試包含連接器的作業中類似的MaxCompute相關類ClassNotFound問題,請下載Maven中央庫中對應版本中尾碼為uber.jar的檔案,添加為作業的附加依賴。以1.15-vvr-6.0.6版本為例,需下載的檔案為該倉庫目錄下的verveica-connector-odps-1.15-vvr-6.0.6-uber.jar。

MaxCompute連接器的Maven依賴包含了構建全量源表、增量源表、結果表和維表的所需要的類。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-odps</artifactId>
    <version>${connector.version}</version>
</dependency>

在DataStream中使用MaxCompute連接器推薦使用SQL聲明MaxCompute表,通過Table/DataStream相互轉換來串連MaxCompute表和資料流。

  • 串連源表

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    tEnv.executeSql(String.join(
        "\n",
        "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
        "  cid VARCHAR,",
        "  rt DOUBLE",
        ") WITH (",
        "  'connector' = 'odps',",
        "  'endpoint' = '<yourEndpointName>',",
        "  'project' = '<yourProjectName>',",
        "  'tableName' = '<yourTableName>',",
        "  'accessId' = '<yourAccessId>',",
        "  'accessKey' = '<yourAccessPassword>',",
        "  'partition' = 'ds=201809*'",
        ")");
    DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
    source.print();
    env.execute("odps source"); 
  • 串連結果表

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    tEnv.executeSql(String.join(
        "\n",
        "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
        "  cid VARCHAR,",
        "  rt DOUBLE",
        ") WITH (",
        "  'connector' = 'odps',",
        "  'endpoint' = '<yourEndpointName>',",
        "  'project' = '<yourProjectName>',",
        "  'tableName' = '<yourTableName>',",
        "  'accessId' = '<yourAccessId>',",
        "  'accessKey' = '<yourAccessPassword>',",
        "  'partition' = 'ds=20180905'",
        ")");
    DataStream<Row> data = env.fromElements(
        Row.of("id0", 3.),
        Row.of("id1", 4.));
    tEnv.fromDataStream(data).insertInto("odps_sink").execute();

常見問題