本文由簡體中文內容自動轉碼而成。阿里雲不保證此自動轉碼的準確性、完整性及時效性。本文内容請以簡體中文版本為準。

MaxCompute

更新時間:2025-02-28 19:23

本文為您介紹MaxCompute連接器的文法結構、WITH參數和使用樣本等。

背景資訊

MaxCompute(原名ODPS)是一種快速、完全託管的EB級資料倉儲解決方案,致力於批量結構化資料的儲存和計算,提供海量資料倉儲的解決方案及分析建模服務。MaxCompute的詳情請參見什麼是MaxCompute

MaxCompute連接器支援的資訊如下。

類別

詳情

類別

詳情

支援類型

源表、維表和結果表

運行模式

流模式和批模式

資料格式

暫不支援

特有監控指標

  • 源表

    numRecordsIn:源表當前讀取到的資料總條數。

    numRecordsInPerSecond:源表當前每秒讀取的資料條數。

    numBytesIn:源表當前讀取到的資料總位元組數(解壓縮後)。

    numBytesInPerSecond:源表當前每秒讀取的資料位元組數(解壓縮後)。

  • 結果表

    numRecordsOut:結果表當前寫出的資料總條數。

    numRecordsOutPerSecond:結果表當前每秒寫出的資料條數。

    numBytesOut:結果表當前寫出的資料總位元組數(壓縮前)。

    numBytesOutPerSecond:結果表當前每秒寫出的資料位元組數(壓縮前)。

  • 維表

    dim.odps.cacheSize:維表緩衝的資料條數。

說明

指標含義詳情,請參見監控指標說明

API種類

Datastream和SQL

是否支援更新或刪除結果表資料

Batch Tunnel和Stream Tunnel模式僅支援插入資料,Upsert Tunnel模式支援插入、更新和刪除資料。

前提條件

已建立MaxCompute表,詳情請參見建立表

使用限制

  • MaxCompute連接器僅支援At Least Once語義。

    說明

    At Least Once語義會保證資料不缺失,但在少部分情況下,可能會將重複資料寫入MaxCompute。不同的MaxCompute Tunnel出現重複資料的情況不同,MaxCompute Tunnel詳情請參見如何選擇資料通道?

  • 預設情況下源表為全量模式,僅會讀取partition參數中指定的分區,在讀完所有資料後結束運行,狀態轉換為finished,不會監控是否有新分區產生。

    如果您需要持續監控新分區,請通過WITH參數中指定startPartition使用增量源表模式。

    說明
    • 維表每次更新時都會檢查最新分區,不受這一限制。

    • 在源表開始運行後,向分區裡添加的新資料不會被讀取,請在分區資料完整的情況下運行作業。

文法結構

CREATE TABLE odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'schemaName' = '<yourSchemaName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****'
);

WITH參數

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    表類型。

    String

    固定值為odps。

    endpoint

    MaxCompute服務地址。

    String

    請參見Endpoint

    tunnelEndpoint

    MaxCompute Tunnel服務的串連地址。

    String

    請參見Endpoint

    說明

    如果未填寫,MaxCompute會根據內部的負載平衡服務分配Tunnel的串連。

    project

    MaxCompute專案名稱。

    String

    無。

    schemaName

    MaxCompute Schema名稱。

    String

    僅當MaxCompute專案開啟Schema功能時,需填寫該值為MaxCompute表所屬Schema名,詳情請參見 Schema操作

    說明

    僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。

    tableName

    MaxCompute表名。

    String

    無。

    accessId

    MaxCompute AccessKey ID。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

    重要

    為了避免您的AK資訊泄露,建議您使用變數的方式填寫AccessKey取值,詳情請參見專案變數

    accessKey

    MaxCompute AccessKey Secret。

    String

    partition

    MaxCompute分區名。

    String

    對於非分區表和增量源表無需填寫。

    compressAlgorithm

    MaxCompute Tunnel使用的壓縮演算法。

    String

    • VVR 4.0.13及以上版本:ZLIB

    • VVR 6.0.1及以上版本:SNAPPY

    參數取值如下:

    • RAW(無壓縮)

    • ZLIB

    • SNAPPY

      SNAPPY相比ZLIB能帶來明顯的吞吐提升。在測試情境下,吞吐提升約50%。

    說明

    僅Realtime Compute引擎VVR 4.0.13及以上版本支援該參數。

    quotaName

    MaxCompute獨享Data Transmission Service的quota名稱。

    String

    設定該值來使用獨享的MaxComputeData Transmission Service。

    重要
    • 僅Realtime Compute引擎VVR 8.0.3及以上版本支援該參數。

    • 設定該值時,必須刪除tunnelEndpoint參數,否則仍將使用tunnelEndpoint中指定的資料通道。

  • 源表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    maxPartitionCount

    可以讀取的最大分區數量。

    Integer

    100

    如果讀取的分區數量超過了該參數,則會出現報錯The number of matched partitions exceeds the default limit

    重要

    由於一次性讀取大量分區會給MaxCompute服務帶來一定壓力,同時也會讓作業啟動速度變慢,因此您需要確認是否需要讀取這麼多分區(而不是誤填partition參數)。如果確實需要,需要手動調大maxPartitionCount參數。

    useArrow

    是否使用Arrow格式讀取資料。

    Boolean

    false

    使用Arrow格式能夠調用MaxCompute的Storage API,詳情請參見什麼是MaxCompute中使用者介面與開放性一節。

    重要
    • 僅在批作業中生效。

    • 僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。

    splitSize

    在使用Arrow格式讀取資料時,一次拉取的資料大小。

    MemorySize

    256 MB

    僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。

    重要

    僅在批作業中生效。

    compressCodec

    在使用Arrow格式讀取資料時,採用的壓縮演算法。

    String

    ""

    參數取值如下:

    • "" (無壓縮)

    • ZSTD

    • LZ4_FRAME

    指定壓縮演算法相比無壓縮能帶來一定的吞吐提升。

    重要
    • 僅在批作業中生效。

    • 僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。

    dynamicLoadBalance

    是否允許動態分配分區。

    Boolean

    false

    參數取值如下:

    • true:允許

    • false:不允許

    允許動態分配分區能夠發揮Flink不同節點的處理效能,減少源表整體讀取時間,但也會導致不同節點讀取總資料量不一致,出現資料扭曲情況。

    重要
    • 僅在批作業中生效。

    • 僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。

  • 增量源表專屬

    增量源表通過間歇輪詢MaxCompute伺服器擷取所有的分區資訊來發現新增的分區,讀取新分區時要求分區內資料已寫入完畢,詳情參見增量MaxCompute源表監聽到新分區時,如果該分區還有資料沒有寫完,如何處理?。通過startPartition可以指定起始點位,但注意唯讀取字典序大於等於起始點位的分區,例如分區year=2023,month=10字典序小於分區year=2023,month=9,對於這種類型的分區聲明可以通過加0補齊的方式來保證字典序正確,例如year=2023,month=09

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    startPartition

    增量讀取的起始MaxCompute分區點位(包含)。

    String

    • 使用該參數後啟用增量源表模式,將忽略partition參數。

    • 多級分區必須按分區層級從大到小聲明每個分區列的值。

    說明

    startPartition參數詳情,請參見如何填寫增量MaxCompute的startPartition參數?

    subscribeIntervalInSec

    輪詢MaxCompute擷取分區列表的時間間隔。

    Integer

    30

    單位為秒。

    modifiedTableOperation

    讀取分區過程中遇到分區資料被修改時的處理。

    Enum (NONE, SKIP)

    NONE

    由於下載session被儲存在檢查點中,每次從檢查點恢複時嘗試從該session恢複讀取進度,而該session由於分區資料被修改不可用,Flink任務會陷入不斷重啟。此時您可以設定該參數,參數取值如下:

    • NONE:需要您修改startPartition參數使其大於不可用分區,並從無狀態啟動作業。

    • SKIP:若不希望無狀態啟動,可將模式修改為SKIP,Flink嘗試從檢查點恢複session時將跳過停用分區。

    重要
    • 僅Realtime Compute引擎VVR 8.0.3及以上版本支援該參數。

    • NONE和SKIP模式下,被修改分區中已讀取的資料不會被撤回,未讀取的資料將不會被讀取。

  • 結果表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    useStreamTunnel

    是否使用MaxCompute Stream Tunnel上傳資料。

    Boolean

    false

    參數取值如下:

    • true:使用MaxCompute Stream Tunnel上傳資料。

    • false:使用MaxCompute Batch Tunnel上傳資料。

    說明

    flushIntervalMs

    MaxCompute Tunnel Writer緩衝區flush間隔。

    Long

    30000(30秒)

    MaxCompute Sink寫入記錄時,先將資料存放區到MaxCompute的緩衝區中,等緩衝區溢位或者每隔一段時間(flushIntervalMs),再把緩衝區裡的資料寫到目標MaxCompute表。

    對於Stream Tunnel,flush的資料立即可見;對於Batch Tunnel,資料flush後仍需要等待checkpoint完成後才可見,建議設定該參數為0來關閉定時flush。

    單位為毫秒。

    說明

    本參數可以與batchSize一同使用,滿足任一條件即會Flush資料。

    batchSize

    MaxCompute Tunnel Writer緩衝區flush的大小。

    Long

    67108864(64 MB)

    MaxCompute Sink寫入記錄時,先將資料存放區到MaxCompute的緩衝區中,等緩衝區達到一定大小(batchSize),再把緩衝區裡的資料寫到目標MaxCompute表。

    單位為位元組。

    說明
    • 僅Realtime Compute引擎VVR 4.0.14及以上版本支援該參數。

    • 本參數可以與flushIntervalMs一同使用,滿足任一條件即會Flush資料。

    numFlushThreads

    MaxCompute Tunnel Writer緩衝區flush的線程數。

    Integer

    1

    每個MaxCompute Sink並發將建立numFlushThreads個線程用於flush資料。當該值大於1時,將允許不同分區的資料並發Flush,提升Flush的效率。

    說明

    僅Realtime Compute引擎VVR 4.0.14及以上版本支援該參數。

    dynamicPartitionLimit

    寫入動態分區的最大數量。

    Integer

    100

    當結果表在兩次Checkpoint之間寫入的動態分區數量超過了dynamicPartitionLimit,則會出現報錯Too many dynamic partitions

    重要

    由於一次性寫入大量分區會給MaxCompute服務帶來一定壓力,同時也會導致結果表flush和作業Checkpoint變慢。因此當報錯出現時,您需要確認是否需要寫入這麼多分區。如果確實需要,需要手動調大dynamicPartitionLimit參數。

    retryTimes

    向MaxCompute伺服器請求最大重試次數。

    Integer

    3

    建立session、提交session、flush資料時可能存在短暫的MaxCompute服務停用情況,會根據該配置進行重試。

    sleepMillis

    稍候再試時間。

    Integer

    1000

    單位為毫秒。

    enableUpsert

    是否使用MaxCompute Upsert Tunnel上傳資料。

    Boolean

    false

    參數取值如下:

    • true:使用Upsert Tunnel,處理Flink中的INSERT、UPDATE_AFTER和DELETE資料。

    • false:根據useStreamTunnel參數使用Batch Tunnel或Stream Tunnel,處理Flink中的INSERT、UPDATE_AFTER資料。

    重要
    • 若Upsert模式下MaxCompute sink提交時出現報錯、失敗、耗時間長度等情況,建議限制sink節點的並發數在10以內。

    • 僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。

    upsertAsyncCommit

    Upsert模式下在提交session時是否使用非同步模式。

    Boolean

    false

    參數取值如下:

    • true:使用非同步模式,提交耗時更短,但提交完成時寫入的資料非立即可讀。

    • false:預設為同步模式,提交時將等待服務側處理完session。

    說明

    僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。

    upsertCommitTimeoutMs

    Upsert模式下提交session逾時時間。

    Integer

    120000

    (120秒)

    僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。

    sink.operation

    寫入Delta Table時的寫入模式。

    String

    insert

    參數取值如下:

    • insert: 寫入資料模式為追加

    • upsert:寫入資料模式為更新

    說明

    僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    sink.parallelism

    寫入Delta Table時的並行度

    Integer

    None

    • 寫入的並行度,如果不設定,則預設使用上遊資料並行度。

    • 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    重要

    確保Delta Table表屬性 write.bucket.num 是該配置值的整數倍,這樣可以獲得最佳的寫入效能,並且能夠最有效地節省 Sink 節點記憶體。

    sink.file-cached.enable

    寫入Delta table動態分區時,是否使用檔案快取模式。

    Boolean

    false

    參數取值如下:

    • true:使用檔案快取模式

    • false:不使用檔案快取模式

    使用檔案快取模式能夠減少寫入服務端的小檔案數量,但是寫出資料的延遲更高。在結果表並行度較高時建議使用檔案快取模式。

    說明

    僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    sink.file-cached.writer.num

    檔案快取模式下,單個Task上傳資料的並發數。

    Integer

    16

    • 僅在設定了 sink.file-cached.enable 為 true 的情況下生效。

    • 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    • 建議不要大幅提升該參數值,因為當同時寫入的分區數量過多時,容易導致記憶體溢出(OOM)。

    sink.bucket.check-interval

    檔案快取模式下,檢查檔案大小的周期,單位:毫秒(ms)。

    Integer

    60000

    • 僅在設定了 sink.file-cached.enable 為 true 的情況下生效。

    • 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    sink.file-cached.rolling.max-size

    檔案快取模式下,單個快取檔案的最大值。

    MemorySize

    16 M

    • 僅在設定了 sink.file-cached.enable 為 true 的情況下生效。

    • 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    • 若檔案大小超過該值,會將該檔案資料上傳到服務端。

    sink.file-cached.memory

    檔案快取模式下,寫入檔案使用的最大堆外記憶體大小。

    MemorySize

    64 M

    • 僅在設定了 sink.file-cached.enable 為 true 的情況下生效。

    • 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    sink.file-cached.memory.segment-size

    檔案快取模式下,寫入檔案的使用的buffer大小。

    MemorySize

    128 KB

    • 僅在設定了 sink.file-cached.enable 為 true 的情況下生效。

    • 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    sink.file-cached.flush.always

    檔案快取模式下,寫入檔案是否使用緩衝。

    Boolean

    true

    • 僅在設定了 sink.file-cached.enable 為 true 的情況下生效。

    • 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    sink.file-cached.write.max-retries

    檔案快取模式下,上傳資料的重試次數。

    Integer

    3

    • 僅在設定了 sink.file-cached.enable 為 true 的情況下生效。

    • 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    upsert.writer.max-retries

    Upsert Writer寫入Bucket失敗後的重試次數。

    Integer

    3

    僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    upsert.writer.buffer-size

    單個Upsert Writer資料在Flink中的緩衝大小。

    MemorySize

    64 m

    • 當所有Bucket的緩衝區大小總和達到預設閾值時,系統將自動觸發重新整理操作,將資料更新到伺服器端。

    說明

    一個upsert writer裡會同時寫入多個Bucket,建議提高該值,以提升寫入效率。

    若寫入分區較多時,會存在引發記憶體OOM風險,可考慮降低該參數值。

    • 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    upsert.writer.bucket.buffer-size

    單個Bucket資料在Flink中的緩衝大小。

    MemorySize

    1 m

    • 當叢集記憶體資源緊張時,可以減小該參數值。

    • 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    upsert.write.bucket.num

    寫入表的bucket數量。

    Integer

    None

    • 必須與寫入Delta Table表的write.bucket.num屬性值一致。

    • 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    upsert.write.slot-num

    單個Session使用Tunnel slot數量。

    Integer

    1

    僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    upsert.commit.max-retries

    Upsert Session Commit重試次數。

    Integer

    3

    僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    upsert.commit.thread-num

    Upsert Session Commit的並行度。

    Integer

    16

    • 不建議將此參數值調整得過大,因為當同時進行的提交並發數越多時,會導致資源消耗增加,可能導致效能問題或資源過度消耗。

    • 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    upsert.commit.timeout

    Upsert Session Commit等待逾時時間,單位:秒(s)。

    Integer

    600

    僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    upsert.flush.concurrent

    限制單個分區允許同時寫入的最大Bucket數。

    Integer

    2

    • 每當一個bucket的資料重新整理時,將會佔用一個Tunnel Slot資源。

    • 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    insert.commit.thread-num

    Commit Session的並行度。

    Integer

    16

    僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    insert.arrow-writer.enable

    是否使用Arrow格式。

    Boolean

    false

    參數取值如下:

    • true:使用Arrow模式

    • false:不使用Arrow模式

    說明

    僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    insert.arrow-writer.batch-size

    Arrow Batch的最大行數。

    Integer

    512

    僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    insert.arrow-writer.flush-interval

    Writer Flush間隔,單位毫秒(ms)。

    Integer

    100000

    僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    insert.writer.buffer-size

    使用Buffered Writer的緩衝大小。

    MemorySize

    64 M

    僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

    upsert.partial-column.enable

    是否僅更新部分列。

    Boolean

    false

    只在結果表類型為Delta Table時生效,詳情請參見部分列更新

    參數取值如下:

    • true:僅更新部分列

    • false:更新全部列

    根據結果表是否存在更新資料的主鍵,資料寫入分以下幾種情況:

    • 結果表存在相同主鍵的資料,按照主鍵更新這條資料,使用指定列不為null的數值進行更新。

    • 結果表不存在相同主鍵的資料,按照主鍵新增一條資料,插入指定列的數值,對於指定列之外的列插入null

    說明

    僅Realtime Compute引擎VVR 8.0.11及以上版本支援該參數。

  • 維表專屬

    MaxCompute維表在作業啟動時從指定的分區拉取全量資料,partition參數支援使用max_pt()等函數。當緩衝到期重新載入時會重新解析partition參數拉取最新的分區,使用max_two_pt()時維表可拉取兩個分區,其他情況下只支援指定單個分區。

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    cache

    緩衝策略。

    String

    目前MaxCompute維表僅支援ALL策略,必須顯式聲明。 適用於遠端資料表資料量小且MISS KEY(源表資料和維表JOIN時,ON條件無法關聯)特別多的情境。

    ALL策略:緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表查詢都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在,並在Cache到期後重新載入一遍全量Cache。

    說明
    • 因為系統會非同步載入維表資料,所以在使用CACHE ALL時,需要增加維表JOIN節點的記憶體,增加的記憶體大小為遠端資料表資料量的至少4倍,具體值與MaxCompute儲存壓縮演算法有關。

    • 如果MaxCompute維表資料量較大,可以考慮使用SHUFFLE_HASH註解將維表資料均勻分散到各個並發中。詳情請參見如何使用維表SHUFFLE_HASH註解?

    • 在使用超大MaxCompute維表時,如果JVM頻繁GC導致作業異常,且在增加維表JOIN節點的記憶體仍無改善的情況下,建議改為支援LRU Cache策略的KV型維表,例如雲資料庫Hbase版維表。

    cacheSize

    最多緩衝的資料條數。

    Long

    100000

    如果維表資料量超過了cacheSize,則會出現報錯Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit

    重要

    由於維表資料量太大會佔用大量JVM堆記憶體,同時也會讓作業啟動和維表更新變慢,因此您需要確認是否需要緩衝這麼多資料,如果確實需要,需要手動調大該參數。

    cacheTTLMs

    緩衝逾時時間,也就是緩衝更新的間隔時間。

    Long

    Long.MAX_VALUE(相當於永不更新)

    單位為毫秒。

    cacheReloadTimeBlackList

    更新時間黑名單。在該參數規定的時間段內不會更新緩衝。

    String

    用於防止緩衝在關鍵時間段(例如活動流量峰值期間)更新導致作業不穩定。填寫方式詳情請參見如何填寫CacheReloadTimeBlackList參數?

    maxLoadRetries

    緩衝更新時(包含作業啟動時初次拉取資料)最多嘗試次數,超過該次數後作業運行失敗。

    Integer

    10

    無。

類型映射

MaxCompute支援的類型參見2.0資料類型版本

MaxCompute類型

Flink類型

MaxCompute類型

Flink類型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(precision, scale)

DECIMAL(precision, scale)

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

STRING

STRING

BINARY

BYTES

DATE

DATE

DATETIME

TIMESTAMP(3)

TIMESTAMP

TIMESTAMP(9)

ARRAY

ARRAY

MAP

MAP

STRUCT

ROW

JSON

STRING

重要

當MaxCompute物理表中同時存在嵌套的複合類型欄位(ARRAY、MAP或STRUCT)和JSON類型欄位時,需要在建立MaxCompute物理表時指定tblproperties('columnar.nested.type'='true'),才能被Flink正確讀寫。

使用樣本

SQL

  • 源表示例

    • 全量讀取

      CREATE TEMPORARY TABLE odps_source (
        cid VARCHAR,
        rt DOUBLE
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpointName>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=201809*'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        cid VARCHAR,
        invoke_count BIGINT
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT
         cid,
         COUNT(*) AS invoke_count
      FROM odps_source GROUP BY cid;
    • 增量讀取

      CREATE TEMPORARY TABLE odps_source (
        cid VARCHAR,
        rt DOUBLE
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpointName>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 從20180905對應分區開始讀取
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        cid VARCHAR,
        invoke_count BIGINT
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT cid, COUNT(*) AS invoke_count
      FROM odps_source GROUP BY cid;
  • 結果表示例

    • 寫入固定分區

      CREATE TEMPORARY TABLE datagen_source (
        id INT,
        len INT,
        content VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_sink (
        id INT,
        len INT,
        content VARCHAR
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905' -- 寫入固定分區ds=20180905。
      );
      
      INSERT INTO odps_sink
      SELECT
        id, len, content
      FROM datagen_source;
    • 寫入動態分區

      CREATE TEMPORARY TABLE datagen_source (
        id INT,
        len INT,
        content VARCHAR,
        c TIMESTAMP
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_sink (
        id  INT,
        len INT,
        content VARCHAR,
        ds VARCHAR --需要顯式聲明動態分區列。
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds' --不寫分區的值,表示根據ds欄位的值寫入不同分區。
      );
      
      INSERT INTO odps_sink
      SELECT
         id,
         len,
         content,
         DATE_FORMAT(c, 'yyMMdd') as ds
      FROM datagen_source;
  • 維表示例

    • 一對一維表

      CREATE TEMPORARY TABLE datagen_source (
        k INT,
        v VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_dim (
        k INT,
        v VARCHAR,
        PRIMARY KEY (k) NOT ENFORCED  -- 一對一維表需要聲明主鍵。
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905',
        'cache' = 'ALL'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        k VARCHAR,
        v1 VARCHAR,
        v2 VARCHAR
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT k, s.v, d.v
      FROM datagen_source AS s
      INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
    • 一對多維表

      CREATE TEMPORARY TABLE datagen_source (
        k INT,
        v VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_dim (
        k INT,
        v VARCHAR
        -- 一對多維表無需聲明主鍵。
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905',
        'cache' = 'ALL'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        k VARCHAR,
        v1 VARCHAR,
        v2 VARCHAR
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT k, s.v, d.v
      FROM datagen_source AS s
      INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;

DataStream

重要
  • 通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink,DataStream連接器設定方法請參見DataStream連接器使用方法

  • 為了保護智慧財產權,從Realtime Compute引擎VVR6.0.6版本起,此連接器在本地調試單次運行作業的時間為30分鐘,30分鐘後作業會報錯並退出。本地運行和調試包含MaxCompute連接器的作業,請參見本地運行和調試包含連接器的作業

  • 暫不支援讀取Delta Table,即建表時指定了primary keytransactional=true的表,詳情請參見Delta Table概述

在DataStream中使用MaxCompute連接器推薦使用SQL聲明MaxCompute表,通過Table/DataStream相互轉換來串連MaxCompute表和資料流

串連源表
串連結果表
XML
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=201809*'",
    ")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source"); 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=20180905'",
    ")");
DataStream<Row> data = env.fromElements(
    Row.of("id0", 3.),
    Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();

MaxCompute連接器的Maven依賴包含了構建全量源表、增量源表、結果表和維表的所需要的類。Maven中央庫中已經放置了MaxCompute DataStream連接器

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-odps</artifactId>
    <version>${vvr-version}</version>
</dependency>

常見問題

  • 本頁導讀 (1, M)
  • 背景資訊
  • 前提條件
  • 使用限制
  • 文法結構
  • WITH參數
  • 類型映射
  • 使用樣本
  • SQL
  • DataStream
  • 常見問題
文檔反饋
phone 聯絡我們

立即和Alibaba Cloud在線服務人員進行交談,獲取您想了解的產品信息以及最新折扣。

alicare alicarealicarealicare