本文為您介紹如何使用Object Storage Service連接器。
阿里雲Object Storage Service(Object Storage Service)是一款海量、安全、低成本和高可靠的雲端儲存體服務,可提供99.9999999999%(12個9)的資料持久性,99.995%的資料可用性。多種儲存類型供選擇,全面最佳化儲存成本。
類別 | 詳情 |
支援類型 | 源表和結果表 |
運行模式 | 批模式和流模式 |
資料格式 | Orc、Parquet、Avro、Csv、JSON和Raw 說明 僅Realtime Compute引擎VVR 6.0.7及以上版本支援讀取Parquet格式的資料。 |
特有監控指標 | 暫無 |
API種類 | Datastream和SQL |
是否支援更新或刪除結果表資料 | 不支援更新和刪除結果表資料,只支援插入資料。 |
使用限制
通用
僅Flink計算引擎VVR 4.0.14及以上版本支援讀取或寫入OSS。
Flink計算引擎VVR 8.0.6以下版本僅支援讀取或寫入相同帳號下的OSS。
說明如需讀寫其他帳號下的OSS,請使用Flink計算引擎VVR 8.0.6及以上版本且配置Bucket鑒權資訊,詳情請參見配置Bucket鑒權資訊。
結果表專屬
對於寫入OSS,目前暫不支援寫Avro、CSV、JSON和Raw此類行存的格式,具體原因請參見FLINK-30635。
僅Flink計算引擎VVR6.0.6及以上版本支援寫入OSS-HDFS服務,具體請參見寫OSS-HDFS。
文法結構
CREATE TABLE OssTable (
column_name1 INT,
column_name2 STRING,
...
datetime STRING,
`hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = '...'
);
元資訊列
您可以在源表中讀取資訊列,以擷取讀取OSS資料對應的元資訊。例如,如果在OSS源表中定義了元資訊列file.path
,則該列的值為該行資料所在的檔案路徑。元資訊列的使用樣本如下。
CREATE TABLE MyUserTableWithFilepath (
column_name1 INT,
column_name2 STRING,
`file.path` STRING NOT NULL METADATA
) WITH (
'connector' = 'filesystem',
'path' = 'oss://<bucket>/path',
'format' = 'json'
)
下表列出了OSS源表所支援的元資訊列:
Key | 資料類型 | 說明 |
file.path | STRING NOT NULL | 該行資料所在的檔案路徑。 |
file.name | STRING NOT NULL | 該行資料所在的檔案名稱,即距離檔案根路徑最遠的元素。 |
file.size | BIGINT NOT NULL | 該行資料所在的檔案的位元組數。 |
file.modification-time | TIMESTAMP_LTZ(3) NOT NULL | 該行資料所在的檔案的修改時間。 |
WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
表類型。
String
是
無
固定值為
filesystem
。path
檔案系統路徑。
String
是
無
URI格式,例如
oss://my_bucket/my_path
。說明VVR 8.0.6及以上版本配置該參數後,您還需要配置Bucket鑒權資訊,才能正常讀寫指定檔案系統路徑下的資料,配置方法請參見配置Bucket鑒權資訊。
format
檔案格式。
String
是
無
參數取值如下:
csv
json
avro
parquet
orc
raw
源表專屬
參數
說明
資料類型
是否必填
預設值
備忘
source.monitor-interval
設定新檔案的監控時間間隔,並且必須設定>0的值。
Duration
否
無
如果未設定此配置項,則提供的路徑僅會被掃描一次,因此源將是有界的。
每個檔案都由其路徑唯一標識,一旦發現新檔案,就會處理一次。
已處理的檔案在source的整個生命週期記憶體儲在state中。因此,source的state在checkpoint和savepoint時進行儲存。更短的時間間隔檔案會被更快地發現,但也會更頻繁地遍曆檔案系統或Object Storage Service。
結果表專屬
參數
說明
資料類型
是否必填
預設值
備忘
partition.default-name
在分區欄位值為NULL或Null 字元串時,分區的名稱。
String
否
_DEFAULT_PARTITION__
無。
sink.rolling-policy.file-size
滾動前,檔案大小的最大值。
MemorySize
否
128 MB
寫入目錄下的資料被分割到part檔案中。每個分區對應sink的收到資料的subtask都至少會為該分區產生一個part檔案。根據可配置的滾動策略,當前in-progress part檔案將被關閉,產生新的part檔案。該策略基於大小和指定的檔案可被開啟的最大timeout時間長度,來滾動part檔案。
說明對於列存格式來說,
即使檔案不滿足設定的滾動策略,但是在做checkpoint時,總是會滾動檔案。
所以只要檔案滿足了設定的滾動策略或者做了checkpoint,檔案總是會被滾動。
而對於行存格式來說,只有在滿足rolling policy配置的情況下才會滾動檔案。
sink.rolling-policy.rollover-interval
滾動前,part檔案處於開啟狀態的最大時間長度。
Duration
否
30min
檢查頻率是由sink.rolling-policy.check-interval屬性控制。
sink.rolling-policy.check-interval
基於時間滾動策略的檢查間隔。
Duration
否
1min
該屬性控制了基於sink.rolling-policy.rollover-interval屬性的檢查檔案是否該被滾動了。
auto-compaction
在流式結果表中是否開啟自動合并功能。資料首先會被寫入臨時檔案。當checkpoint完成後,該檢查點產生的臨時檔案會被合并,臨時檔案在合并前不可見。
Boolean
否
false
如果啟用檔案合并功能,會根據目標檔案大小,將多個小檔案合并成大檔案。在生產環境中使用檔案合并功能時,需要注意:
只有checkpoint內部的檔案才會被合并,會至少產生與checkpoint個數相同的檔案個數。
合并前檔案不可見,檔案的可見時間是
checkpoint間隔時間長度+合并時間長度
。合并時間過長,將導致反壓,延長checkpoint所需時間。
compaction.file-size
合并目標檔案大小。
MemorySize
否
128 MB
預設值與滾動檔案大小sink.rolling-policy.file-size相同。
sink.partition-commit.trigger
分區提交觸發器類型。
String
否
process-time
對於寫分區表,Flink提供了兩種類型分區提交觸發器,類型如下兩種:
process-time:分區提交觸發器基於分區建立時間和當前系統時間,既不需要分區時間提取器,也不需要watermark產生器。一旦當前系統時間超過了分區建立系統時間和sink.partition-commit.delay之和,立即提交分區。這種觸發器更具通用性,但不是很精確。例如,資料延遲或故障將導致過早提交分區。
partition-time:基於提取的分區時間,需要watermark產生。這需要Job支援watermark產生,分區是根據時間來切割的,例如,按小時或按天分區。一旦watermark超過了分區建立系統時間和sink.partition-commit.delay之和立即提交分區。
sink.partition-commit.delay
分區被提交的最大延遲時間。表明該延遲時間之前分區不會被提交。
Duration
否
0s
如果是按天分區,可以設定為
1 d
。如果是按小時分區,應設定為
1 h
。
sink.partition-commit.watermark-time-zone
解析Long類型的watermark到TIMESTAMP類型時所採用的時區,解析得到的watermark的TIMESTAMP會跟分區時間進行比較,以判斷該分區是否需要被提交。
String
否
UTC
僅當sink.partition-commit.trigger被設定為partition-time時有效。
如果設定得不正確,例如,在TIMESTAMP_LTZ類型的列上定義了source rowtime,如果沒有設定該屬性,那麼使用者可能會在若干個小時後才看到分區的提交。預設值為UTC,意味著watermark是定義在TIMESTAMP類型的列上或者沒有定義watermark。
如果watermark定義在TIMESTAMP_LTZ類型的列上,watermark時區必須是會話時區。該屬性的可選值要麼是完整的時區名(例如'America/Los_Angeles'),要麼是自訂時區(例如'GMT-08:00')。
partition.time-extractor.kind
從分區欄位中提取時間的時間提取器。
String
否
default
參數取值如下:
default(預設):預設情況下,可以配置timestamp pattern或formatter。
custom:應指定提取器類。
partition.time-extractor.class
實現PartitionTimeExtractor介面的提取器類。
String
否
無
無。
partition.time-extractor.timestamp-pattern
允許使用者使用分區欄位來擷取合法的timestamp pattern的預設construction方式。
String
否
無
預設支援第一個欄位按
yyyy-MM-dd hh:mm:ss
這種模式提取。如果需要從一個分區欄位'dt'提取timestamp,可以配置:$dt。
如果需要從多個分區欄位,比如year、month和day和hour提取timestamp,可以配置成:
$year-$month-$day $hour:00:00
。如果需要從兩個分區欄位dt和hour提取timestamp,可以配置成:
$dt $hour:00:00
。
partition.time-extractor.timestamp-formatter
轉換分區timestamp字串值為timestamp的formatter,分區timestamp字串值通過partition.time-extractor.timestamp-pattern屬性工作表達。
String
否
yyyy-MM-dd HH:mm:ss
例如,分區timestamp提取來自多個分區欄位,比如year、month和day,可以配置partition.time-extractor.timestamp-pattern屬性為
$year$month$day
,並且配置partition.time-extractor.timestamp-formatter屬性為yyyyMMdd。預設的formatter是yyyy-MM-dd HH:mm:ss
。這裡的timestamp-formatter和Java的DateTimeFormatter是通用的。sink.partition-commit.policy.kind
分區提交策略類型。
String
否
無
分區提交策略通知下遊某個分區,該分區已經寫入完畢可以被讀取。參數取值如下:
success-file:在目錄中增加_success檔案。
custom:通過指定的類來建立提交策略。支援同時指定多個提交策略。
sink.partition-commit.policy.class
實現PartitionCommitPolicy介面的分區提交策略類。
String
否
無
該類只有在custom提交策略下才能使用。
sink.partition-commit.success-file.name
使用success-file分區提交策略時的檔案名稱。
String
否
_SUCCESS
無。
sink.parallelism
將檔案寫入外部檔案系統的parallelism。
Integer
否
無
預設情況下,該sink parallelism與上遊chained operator的parallelism一樣。當配置了跟上遊的chained operator不一樣的parallelism時,寫檔案的運算元會使用指定的sink parallelism,如果開啟了檔案合并,檔案合并的運算元也會使用指定的sink parallelism。
說明這個值應該大於0,否則將拋出異常。
配置Bucket鑒權資訊
僅Realtime Compute引擎VVR 8.0.6及以上版本支援配置Bucket鑒權資訊。
指定檔案系統路徑後,您還需要配置Bucket鑒權資訊,才能正常讀寫您指定檔案系統路徑下的資料。配置Bucket鑒權資訊需要在Realtime Compute開發控制台部署詳情頁簽運行參數配置地區的其他配置中添加如下代碼。
fs.oss.bucket.<bucketName>.accessKeyId: xxxx
fs.oss.bucket.<bucketName>.accessKeySecret: xxxx
其中涉及到的參數解釋如下表所示:
配置項 | 說明 |
fs.oss.bucket.<bucketName>.accessKeyId | 參數配置說明如下:
|
fs.oss.bucket.<bucketName>.accessKeySecret |
寫OSS-HDFS
首先需要在Realtime Compute開發控制台部署詳情頁簽運行參數配置地區的其他配置中添加下如下配置:
fs.oss.jindo.buckets: xxx
fs.oss.jindo.accessKeyId: xxx
fs.oss.jindo.accessKeySecret: xxx
其中涉及到的參數解釋如下表所示:
配置項 | 說明 |
fs.oss.jindo.buckets | 寫入OSS-HDFS服務中的Bucket名稱,可配置多個,以分號分隔。當Flink寫一個OSS路徑時,如果其對應的bucket包含在fs.oss.jindo.buckets中,則會寫入OSS-HDFS服務中。 |
fs.oss.jindo.accessKeyId | 阿里雲帳號的Access Key。擷取方法請參見查看RAM使用者的AccessKey資訊。 |
fs.oss.jindo.accessKeySecret | 阿里雲帳號的AccessKey Secret。擷取方法請參見查看RAM使用者的AccessKey資訊。 |
此外,還需要配置OSS-HDFS的EndPoint。目前支援兩種方式來配置OSS-HDFS的EndPoint:
在Realtime Compute開發控制台部署詳情頁簽運行參數配置地區的其他配置中添加如下配置項來配置OSS-HDFS的EndPoint
fs.oss.jindo.endpoint: xxx
在OSS的路徑中配置OSS-HDFS的EndPoint
通過如下的路徑來進行配置
oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>
其中user-defined-oss-hdfs-bucket為對應的bucket的名字,oss-hdfs-endpoint為OSS-HDFS的EndPoint ;此時配置項fs.oss.jindo.buckets需要包含<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>。
例如,假設bucket名字為jindo-test,其oss-hdfs的endpoint為
cn-beijing.oss-dls.aliyuncs.com。則OSS路徑需為oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>,配置項fs.oss.jindo.buckets需包含jindo-test.cn-beijing.oss-dls.aliyuncs.com。
使用樣本
源表
CREATE TEMPORARY TABLE fs_table_source ( `id` INT, `name` VARCHAR ) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `name` VARCHAR ) with ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT * FROM fs_table_source ;
結果表
寫分區表
CREATE TABLE datagen_source ( user_id STRING, order_amount DOUBLE, ts BIGINT, -- 以毫秒為單位的時間 ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3), WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- 在 TIMESTAMP_LTZ 列上定義 watermark ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE fs_table_sink ( user_id STRING, order_amount DOUBLE, dt STRING, `hour` STRING ) PARTITIONED BY (dt, `hour`) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet', 'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- 假設使用者配置的時區為 'Asia/Shanghai' 'sink.partition-commit.policy.kind'='success-file' ); -- 流式 sql,插入檔案系統表 INSERT INTO fs_table_sink SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'), DATE_FORMAT(ts_ltz, 'HH') FROM datagen_source;
寫非分區表
CREATE TABLE datagen_source ( user_id STRING, order_amount DOUBLE ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE fs_table_sink ( user_id STRING, order_amount DOUBLE ) WITH ( 'connector'='filesystem', 'path'='oss://<bucket>/path', 'format'='parquet' ); INSERT INTO fs_table_sink SELECT * FROM datagen_source;
DataStream API
通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink,DataStream連接器設定方法請參見DataStream連接器使用方法。
使用 DataStream API寫OSS和OSS-HDFS的程式碼範例如下:
String outputPath = "oss://<bucket>/path"
final StreamingFileSink<Row> sink =
StreamingFileSink.forRowFormat(
new Path(outputPath),
(Encoder<Row>)
(element, stream) -> {
out.println(element.toString());
})
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();
outputStream.addSink(sink);
如果您需要寫OSS-HDFS,還需要在Realtime Compute開發控制台部署詳情頁簽運行參數配置地區的其他配置中配置與OSS-HDFS相關的參數,具體請參見寫OSS-HDFS。
相關文檔
Flink支援的連接器,請參見支援的連接器。
Table StoreTablestore(OTS)連接器使用方法,請參見Table StoreTablestore(OTS)。
流式資料湖倉Paimon連接器使用方法,請參見流式資料湖倉Paimon。