本文为您介绍JSON格式的使用方法和类型映射。
背景信息
JSON格式能基于JSON结构读写JSON数据。当前,JSON结构是从表结构自动推导而得的。支持JSON格式的连接器有:消息队列Kafka、Upsert Kafka、Elasticsearch、对象存储OSS,云数据库MongoDB和StarRocks等。
使用示例
利用Kafka以及JSON格式构建表的示例如下。
CREATE TABLE Orders (
orderId INT,
product STRING,
orderInfo MAP<STRING, STRING>,
orderTime TIMESTAMP(3),
WATERMARK FOR orderTime AS orderTime - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'test-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
配置选项
参数 | 是否必选 | 默认值 | 类型 | 说明 |
format | 是 | (none) | String | 声明使用的格式。使用JSON格式时,参数取值为json。 |
json.fail-on-missing-field | 否 | false | Boolean | 参数取值如下:
|
json.ignore-parse-errors | 否 | false | Boolean | 参数取值如下:
|
json.timestamp-format.standard | 否 | SQL | String | 指定输入和输出时间戳格式。参数取值如下:
|
json.map-null-key.mode | 否 | FAIL | String | 指定处理Map中key值为空的方法。参数取值如下:
|
json.map-null-key.literal | 否 | null | String | 当json.map-null-key.mode的参数是LITERAL时,指定字符串常量替换Map中的空key值。 |
json.encode.decimal-as-plain-number | 否 | false | Boolean | 参数取值如下:
|
json.write-null-properties | 否 | true | Boolean | 是否将空列写入JSON字符串,参数取值如下:
说明 仅实时计算引擎VVR 8.0.6及以上版本支持配置该参数。 |
类型映射
当前,JSON结构将会从表结构之中自动推导得到。在Flink中,JSON格式使用jackson databind API去解析和生成JSON。Flink与JSON的数据类型的映射关系如下。
Flink SQL类型 | JSON类型 |
CHAR / VARCHAR / STRING | string |
BOOLEAN | boolean |
BINARY / VARBINARY | string with encoding: base64 |
DECIMAL | number |
TINYINT | number |
SMALLINT | number |
INT | number |
BIGINT | number |
FLOAT | number |
DOUBLE | number |
DATE | string with format: date |
TIME | string with format: time |
TIMESTAMP | string with format: date-time |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | string with format: date-time (with UTC time zone) |
INTERVAL | number |
ARRAY | array |
MAP / MULTISET | object |
ROW | object |
其他使用说明
对于写入对象存储OSS,目前暂不支持写入JSON格式的文件,具体原因请参见FLINK-30635。