本文為您介紹JSON格式的使用方法和類型映射。
背景資訊
JSON格式能基於JSON結構讀寫JSON資料。當前,JSON結構是從表結構自動推導而得的。支援JSON格式的連接器有:訊息佇列Kafka、Upsert Kafka、Elasticsearch、Object Storage Service,ApsaraDB for MongoDB和StarRocks等。
使用樣本
利用Kafka以及JSON格式構建表的樣本如下。
CREATE TABLE Orders (
orderId INT,
product STRING,
orderInfo MAP<STRING, STRING>,
orderTime TIMESTAMP(3),
WATERMARK FOR orderTime AS orderTime - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'test-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
配置選項
參數 | 是否必選 | 預設值 | 類型 | 說明 |
format | 是 | (none) | String | 聲明使用的格式。使用JSON格式時,參數取值為json。 |
json.fail-on-missing-field | 否 | false | Boolean | 參數取值如下:
|
json.ignore-parse-errors | 否 | false | Boolean | 參數取值如下:
|
json.timestamp-format.standard | 否 | SQL | String | 指定輸入和輸出時間戳記格式。參數取值如下:
|
json.map-null-key.mode | 否 | FAIL | String | 指定處理Map中key值為空白的方法。參數取值如下:
|
json.map-null-key.literal | 否 | null | String | 當json.map-null-key.mode的參數是LITERAL時,指定字串常量替換Map中的空key值。 |
json.encode.decimal-as-plain-number | 否 | false | Boolean | 參數取值如下:
|
json.write-null-properties | 否 | true | Boolean | 是否將空列寫入JSON字串,參數取值如下:
說明 僅Realtime Compute引擎VVR 8.0.6及以上版本支援配置該參數。 |
類型映射
當前,JSON結構將會從表結構之中自動推導得到。在Flink中,JSON格式使用jackson databind API去解析和產生JSON。Flink與JSON的資料類型的映射關係如下。
Flink SQL類型 | JSON類型 |
CHAR / VARCHAR / STRING | string |
BOOLEAN | boolean |
BINARY / VARBINARY | string with encoding: base64 |
DECIMAL | number |
TINYINT | number |
SMALLINT | number |
INT | number |
BIGINT | number |
FLOAT | number |
DOUBLE | number |
DATE | string with format: date |
TIME | string with format: time |
TIMESTAMP | string with format: date-time |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | string with format: date-time (with UTC time zone) |
INTERVAL | number |
ARRAY | array |
MAP / MULTISET | object |
ROW | object |
其他使用說明
對於寫入Object Storage Service,目前暫不支援寫入JSON格式的檔案,具體原因請參見FLINK-30635。