After you create a Kafka JSON catalog, you can access JSON-formatted topics of a Kafka cluster in the development console of Realtime Compute for Apache Flink without the need to define a schema. This topic describes how to create, view, use, and drop a Kafka JSON catalog in the development console of Realtime Compute for Apache Flink.
Background information
A Kafka JSON catalog automatically parses JSON-formatted messages to infer the schema of a topic. Therefore, you can use a JSON catalog to obtain specific fields of the messages without the need to declare the schema of a Kafka table in Flink SQL. When you use a Kafka JSON catalog, take note of the following points:
The name of a table of a Kafka JSON catalog matches the name of a topic of the Kafka cluster. This way, you do not need to execute DDL statements to register the Kafka table to access the topic of the Kafka cluster. This improves the efficiency and accuracy of data development.
Tables of Kafka JSON catalogs can be used as source tables in Flink SQL deployments.
You can use Kafka JSON catalogs together with the CREATE TABLE AS statement to synchronize schema changes.
The sections below describe how to create, view, use, and drop a Kafka JSON catalog.
Limits
Kafka JSON catalogs support only JSON-formatted topics.
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.2 or later supports Kafka JSON catalogs.
Note: If your deployments use VVR 4.X, we recommend that you upgrade the VVR version of your deployments to VVR 6.0.2 or later before you use Kafka JSON catalogs.
You cannot modify the existing Kafka JSON catalogs by executing DDL statements.
You can only query data tables by using Kafka JSON catalogs. You are not allowed to create, modify, or delete databases and tables by using Kafka JSON catalogs.
Note: If you use Kafka JSON catalogs together with the CREATE DATABASE AS statement or the CREATE TABLE AS statement, topics can be automatically created.
You cannot use Kafka JSON catalogs to read data from or write data to Kafka clusters for which SSL-based authentication or Simple Authentication and Security Layer (SASL) authentication is enabled.
Tables of Kafka JSON catalogs can be used as source tables in Flink SQL deployments but cannot be used as result tables or lookup tables that are used as dimension tables.
ApsaraMQ for Kafka does not support the Apache Kafka API operation that deletes consumer groups. Therefore, when you create a Kafka JSON catalog for an ApsaraMQ for Kafka instance, you must configure the aliyun.kafka.instanceId, aliyun.kafka.accessKeyId, aliyun.kafka.accessKeySecret, aliyun.kafka.endpoint, and aliyun.kafka.regionId parameters so that group IDs can be automatically deleted. For more information, see Comparison between ApsaraMQ for Kafka and open source Apache Kafka.
Create a Kafka JSON catalog
In the code editor of the Scripts tab on the Scripts page, enter the following statement to create a Kafka JSON catalog:
Statement used to create a Kafka JSON catalog for a self-managed Kafka cluster or an E-MapReduce (EMR) Kafka cluster
CREATE CATALOG <YourCatalogName> WITH(
  'type'='kafka',
  'properties.bootstrap.servers'='<brokers>',
  'format'='json',
  'default-database'='<dbName>',
  'key.fields-prefix'='<keyPrefix>',
  'value.fields-prefix'='<valuePrefix>',
  'timestamp-format.standard'='<timestampFormat>',
  'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>',
  'infer-schema.primitive-as-string'='<primitiveAsString>',
  'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>',
  'infer-schema.compacted-topic-as-upsert-table'='true',
  'max.fetch.records'='100'
);
Statement used to create a Kafka JSON catalog for an ApsaraMQ for Kafka instance
CREATE CATALOG <YourCatalogName> WITH(
  'type'='kafka',
  'properties.bootstrap.servers'='<brokers>',
  'format'='json',
  'default-database'='<dbName>',
  'key.fields-prefix'='<keyPrefix>',
  'value.fields-prefix'='<valuePrefix>',
  'timestamp-format.standard'='<timestampFormat>',
  'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>',
  'infer-schema.primitive-as-string'='<primitiveAsString>',
  'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>',
  'infer-schema.compacted-topic-as-upsert-table'='true',
  'max.fetch.records'='100',
  'aliyun.kafka.accessKeyId'='<aliyunAccessKeyId>',
  'aliyun.kafka.accessKeySecret'='<aliyunAccessKeySecret>',
  'aliyun.kafka.instanceId'='<aliyunKafkaInstanceId>',
  'aliyun.kafka.endpoint'='<aliyunKafkaEndpoint>',
  'aliyun.kafka.regionId'='<aliyunKafkaRegionId>'
);
| Parameter | Data type | Description | Required | Remarks |
| --- | --- | --- | --- | --- |
| YourCatalogName | STRING | The name of the Kafka JSON catalog. | Yes | Enter a custom name. Important: Remove the angle brackets (<>) when you replace the placeholder with the name of your catalog. Otherwise, an error is returned during the syntax check. |
| type | STRING | The type of the catalog. | Yes | Set the value to kafka. |
| properties.bootstrap.servers | STRING | The IP addresses or endpoints of Kafka brokers. | Yes | Format: host1:port1,host2:port2,host3:port3. Separate multiple host:port pairs with commas (,). |
| format | STRING | The format of Kafka messages. | Yes | Only the JSON format is supported. Realtime Compute for Apache Flink parses JSON-formatted Kafka messages to obtain the schema. |
| default-database | STRING | The name of the Kafka cluster. | No | Default value: kafka. A catalog identifies a table as catalog_name.db_name.table_name, and the value of this parameter is used as db_name. Kafka does not provide databases, so you can specify any string as the db_name of the Kafka cluster. |
| key.fields-prefix | STRING | The prefix that is added to the names of the fields that are parsed from the key field in a Kafka message. You can configure this parameter to prevent name conflicts after the key field is parsed. | No | Default value: key_. For example, if the key field is named a, the column that is obtained after parsing is named key_a. Note: The value of the key.fields-prefix parameter cannot be a prefix of the value of the value.fields-prefix parameter. For example, if the value.fields-prefix parameter is set to test1_value_, you cannot set the key.fields-prefix parameter to test1_. |
| value.fields-prefix | STRING | The prefix that is added to the names of the fields that are parsed from the value fields in a Kafka message. You can configure this parameter to prevent name conflicts after the value fields are parsed. | No | Default value: value_. For example, if a value field is named b, the column that is obtained after parsing is named value_b. Note: The value of the value.fields-prefix parameter cannot be a prefix of the value of the key.fields-prefix parameter. For example, if the key.fields-prefix parameter is set to test2_value_, you cannot set the value.fields-prefix parameter to test2_. |
| timestamp-format.standard | STRING | The format of a field of the TIMESTAMP type in a JSON-formatted Kafka message. Realtime Compute for Apache Flink first parses the field in the format that you configured. If parsing fails, Realtime Compute for Apache Flink attempts to parse the field in another format. | No | Valid values: SQL (default value) and ISO-8601. |
| infer-schema.flatten-nested-columns.enable | BOOLEAN | Specifies whether to recursively expand nested columns in a JSON text when a value field in the JSON-formatted Kafka message is parsed. | No | Valid values: true and false. true: Nested columns are recursively expanded. Realtime Compute for Apache Flink uses the path that indexes the value of the expanded column as the name of the column. For example, the column col in {"nested": {"col": true}} is named nested.col after the column is expanded. false (default value): Nested types are parsed as the STRING type. Note: If you set this parameter to true, we recommend that you use this parameter together with the CREATE TABLE AS statement. Other DML statements cannot be used to automatically expand nested columns. |
| infer-schema.primitive-as-string | BOOLEAN | Specifies whether to infer all basic types as the STRING type when value fields in the JSON-formatted Kafka message are parsed. | No | Valid values: true and false. true: All basic types are inferred as the STRING type. false (default value): Data types are inferred based on the data type mappings. |
| infer-schema.parse-key-error.field-name | STRING | The name suffix of the column that holds the raw key data when the key field cannot be parsed. If the key field in the JSON-formatted Kafka message is specified but fails to be parsed, a column whose name is the key.fields-prefix prefix followed by the value of this parameter is added to the schema of the table that matches the topic. This column is of the VARBINARY type and contains the data of the key field. | No | Default value: col. For example, if the value field in the JSON-formatted Kafka message is parsed as value_name and the key field is specified but fails to be parsed, the returned schema of the table that matches the topic contains two fields: key_col and value_name. |
| infer-schema.compacted-topic-as-upsert-table | BOOLEAN | Specifies whether to use a table as an Upsert Kafka table when the log cleanup policy of the Kafka topic is compact and the key field is specified. | No | Default value: true. You must set this parameter to true when you execute the CREATE TABLE AS statement or the CREATE DATABASE AS statement to synchronize data to ApsaraMQ for Kafka. Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.2 or later supports this parameter. |
| max.fetch.records | INT | The maximum number of JSON-formatted messages that the system attempts to consume when the messages are parsed. | No | Default value: 100. |
| aliyun.kafka.accessKeyId | STRING | The AccessKey ID of your Alibaba Cloud account. For more information about how to obtain the AccessKey ID, see Create an AccessKey pair. | No | You must configure this parameter when you execute the CREATE TABLE AS statement or the CREATE DATABASE AS statement to synchronize data to ApsaraMQ for Kafka. Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.2 or later supports this parameter. |
| aliyun.kafka.accessKeySecret | STRING | The AccessKey secret of your Alibaba Cloud account. For more information about how to obtain the AccessKey secret, see Create an AccessKey pair. | No | You must configure this parameter when you execute the CREATE TABLE AS statement or the CREATE DATABASE AS statement to synchronize data to ApsaraMQ for Kafka. Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.2 or later supports this parameter. |
| aliyun.kafka.instanceId | STRING | The ID of the ApsaraMQ for Kafka instance. You can view the ID on the Instance Details page of the ApsaraMQ for Kafka console. | No | You must configure this parameter when you execute the CREATE TABLE AS statement or the CREATE DATABASE AS statement to synchronize data to ApsaraMQ for Kafka. Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.2 or later supports this parameter. |
| aliyun.kafka.endpoint | STRING | The endpoint of ApsaraMQ for Kafka. For more information, see Endpoints. | No | You must configure this parameter when you execute the CREATE TABLE AS statement or the CREATE DATABASE AS statement to synchronize data to ApsaraMQ for Kafka. Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.2 or later supports this parameter. |
| aliyun.kafka.regionId | STRING | The region ID of the ApsaraMQ for Kafka instance to which the topic belongs. For more information, see Endpoints. | No | You must configure this parameter when you execute the CREATE TABLE AS statement or the CREATE DATABASE AS statement to synchronize data to ApsaraMQ for Kafka. Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.2 or later supports this parameter. |
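As a minimal sketch, the following statement creates a catalog for a self-managed Kafka cluster by specifying only the parameters that are marked as required, so that all optional parameters keep their default values. The catalog name kafka_json_demo and the broker addresses are hypothetical placeholders and must be replaced with your own values.

```sql
-- Hypothetical catalog name and broker addresses; replace them with your own.
-- Optional parameters are omitted, so the documented defaults apply
-- (for example, default-database = kafka, key.fields-prefix = key_,
-- value.fields-prefix = value_, max.fetch.records = 100).
CREATE CATALOG kafka_json_demo WITH (
  'type' = 'kafka',
  'properties.bootstrap.servers' = '192.168.0.1:9092,192.168.0.2:9092',
  'format' = 'json'
);
```

For an ApsaraMQ for Kafka instance, the aliyun.kafka.* parameters would be added to the WITH clause in the same way.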
Select the statement that is used to create the catalog and click Run on the left side of the code.
In the Catalogs pane on the left side of the Catalogs tab, view the catalog that you created.
View a Kafka JSON catalog
In the code editor of the Scripts tab on the Scripts page, enter the following statement:
DESCRIBE `${catalog_name}`.`${db_name}`.`${topic_name}`;
| Parameter | Description |
| --- | --- |
| ${catalog_name} | The name of the Kafka JSON catalog. |
| ${db_name} | The name of the Kafka cluster. |
| ${topic_name} | The name of the Kafka topic. |
Select the statement that is used to view the catalog and click Run on the left side of the code.
After the statement is executed, you can view the information about the table that matches the topic in the result.
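For illustration, assuming a hypothetical catalog named kafka_json_demo that keeps the default database name kafka, and a hypothetical topic named orders, the statement might be filled in as follows. The backticks allow names that contain special characters, such as hyphens.

```sql
-- kafka_json_demo and orders are hypothetical names;
-- kafka is the default value of the default-database parameter.
DESCRIBE `kafka_json_demo`.`kafka`.`orders`;
```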
Use a Kafka JSON catalog
If a table of the Kafka JSON catalog is used as a source table, you can read data from the Kafka topic that matches the table.
INSERT INTO ${other_sink_table} SELECT... FROM `${kafka_catalog}`.`${db_name}`.`${topic_name}`/*+OPTIONS('scan.startup.mode'='earliest-offset')*/;
Note: If you need to specify other parameters in the WITH clause when you use a Kafka JSON catalog, we recommend that you use SQL hints to add the parameters. In the preceding SQL statement, SQL hints are used to specify that the consumption starts from the earliest data. For more information about other parameters, see Create an ApsaraMQ for Kafka source table and Create an ApsaraMQ for Kafka result table.
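The following sketch fills in the preceding template under a few stated assumptions: the catalog kafka_json_demo, the topic orders, the sink table my_sink, and the fields id and name are all hypothetical. With the default prefixes, a field id in the message key would be exposed as key_id, and a field name in the message value would be exposed as value_name.

```sql
-- All object and column names are hypothetical.
-- my_sink is assumed to be an existing result table with two matching columns.
INSERT INTO `my_sink`
SELECT
  `key_id`,      -- field "id" from the message key, with the default key_ prefix
  `value_name`   -- field "name" from the message value, with the default value_ prefix
FROM `kafka_json_demo`.`kafka`.`orders`
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
```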
If a table of the Kafka JSON catalog is used as a Message Queue for Apache Kafka source table, you can synchronize data from the Kafka topic that matches the table to the destination table by using the CREATE TABLE AS statement.
Synchronize data from a single topic in real time.
CREATE TABLE IF NOT EXISTS `${target_table_name}` WITH(...) AS TABLE `${kafka_catalog}`.`${db_name}`.`${topic_name}` /*+OPTIONS('scan.startup.mode'='earliest-offset')*/;
Synchronize data from multiple topics in a deployment.
BEGIN STATEMENT SET;

CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0`
AS TABLE `kafka-catalog`.`kafka`.`topic0`
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1`
AS TABLE `kafka-catalog`.`kafka`.`topic1`
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2`
AS TABLE `kafka-catalog`.`kafka`.`topic2`
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

END;
To synchronize data from multiple Kafka topics in one deployment by using the CREATE TABLE AS statement together with Kafka JSON catalogs, make sure that the following conditions are met:
The topic-pattern parameter is not configured for any of the tables that match the topics.
The Kafka parameters, which are the parameters prefixed with properties. such as properties.bootstrap.servers and properties.group.id, have the same values in all the tables.
The values of the scan.startup.mode parameter are the same for all the tables. The scan.startup.mode parameter can be set only to group-offsets, latest-offset, or earliest-offset.
The following figure shows an example. In the following figure, the upper two tables meet the preceding conditions and the lower two tables do not meet the conditions.
For more information about how to synchronize data from a source table in a Kafka JSON catalog to a destination table in a destination catalog, see Ingest log data into data warehouses in real time.
Drop a Kafka JSON catalog
After you drop a Kafka JSON catalog, the deployments that are running are not affected. However, the deployments that use a table of the catalog can no longer find the table when the deployments are published or restarted. Proceed with caution when you drop a Kafka JSON catalog.
In the code editor of the Scripts tab on the Scripts page, enter the following statement:
DROP CATALOG ${catalog_name};
catalog_name specifies the name of the Kafka JSON catalog that you want to drop.
Right-click the statement that is used to drop the catalog and choose Run from the shortcut menu.
View the Catalogs pane on the left side of the Catalog List page to check whether the catalog is dropped.
Schema inference description
To allow you to easily use the table that is obtained after you configure a Kafka JSON catalog, the system automatically adds default configuration parameters, metadata columns, and a primary key to the table. This section describes the information about the table that is obtained after you configure a Kafka JSON catalog.
Table schema
When JSON-formatted Kafka messages are parsed to obtain the topic schema, the system attempts to consume at most the number of messages that is specified by the max.fetch.records parameter. The system parses the schema of each message and merges the schemas into the topic schema. The system parses the messages based on the data type mappings that are used when you use the CREATE TABLE AS statement to synchronize data of Kafka tables.
Important: When a Kafka JSON catalog is used to infer the topic schema, a consumer group is created to consume the data of the topic. The name of this consumer group includes a prefix that indicates that the group was created by the catalog.
If you want to obtain data from an ApsaraMQ for Kafka table, we recommend that you use a Kafka JSON catalog of Realtime Compute for Apache Flink that uses VVR 6.0.7 or later. For Realtime Compute for Apache Flink that uses VVR of a version earlier than 6.0.7, consumer groups are not automatically deleted. As a result, you may receive an alert notification about message accumulation in a consumer group.
A topic schema consists of the following parts:
Physical columns
By default, physical columns are parsed based on the key and value fields of a Kafka message. Prefixes are added to the obtained column names.
If the key field is specified but fails to be parsed, a column whose name is the key.fields-prefix prefix and the value of the infer-schema.parse-key-error.field-name parameter is returned. The column type is VARBINARY.
After the Kafka JSON catalog obtains a group of Kafka messages, it parses the messages in sequence and merges the physical columns that are obtained after parsing into the topic schema based on the following rules:
If a physical column that is obtained after parsing is not in the topic schema, the Kafka JSON catalog automatically adds the column to the topic schema.
If a physical column that is obtained after parsing has the same name as a column in the topic schema, the Kafka JSON catalog merges the two columns as follows:
If the columns are of the same data type but different precision, the Kafka JSON catalog uses the larger precision for the merged column.
If the columns are of different data types, the Kafka JSON catalog uses the smallest parent node in the tree structure that is shown in the following figure as the type of the columns that have the same name. If columns of the DECIMAL and FLOAT types are merged, the columns are merged into the DOUBLE type to retain the precision.
For example, if a Kafka topic contains three data records, the schema shown in the following figure is returned.
Metadata columns
By default, the metadata columns named partition, offset, and timestamp are added. The following table describes the metadata columns.
| Metadata name | Column name | Type | Description |
| --- | --- | --- | --- |
| partition | partition | INT NOT NULL | The ID of the partition that contains the message. |
| offset | offset | BIGINT NOT NULL | The offset of the message in the partition. |
| timestamp | timestamp | TIMESTAMP_LTZ(3) NOT NULL | The timestamp of the message. |
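As a rough sketch, the metadata columns can be selected like ordinary columns. The catalog kafka_json_demo, the topic orders, and the column value_name are hypothetical. Because partition, offset, and timestamp are also SQL keywords, the sketch escapes them with backticks.

```sql
-- Hypothetical catalog, topic, and value column names.
-- Backticks are needed because the metadata column names are SQL keywords.
SELECT
  `partition`,   -- INT NOT NULL
  `offset`,      -- BIGINT NOT NULL
  `timestamp`,   -- TIMESTAMP_LTZ(3) NOT NULL
  `value_name`
FROM `kafka_json_demo`.`kafka`.`orders`
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
```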
Rules for the default primary key that is added
If a table that is obtained after you configure a Kafka JSON catalog is consumed as the source table, the metadata columns partition and offset are used as the primary key. This ensures that data is not duplicated.
Note: If the table schema that is inferred by the Kafka JSON catalog is not as expected, you can use the CREATE TEMPORARY TABLE ... LIKE syntax to declare a temporary table that specifies the desired table schema. For example, suppose that the JSON data contains the ts field in the '2023-01-01 12:00:01' format. The Kafka JSON catalog automatically infers the ts field as the TIMESTAMP data type. If you want the ts field to be used as the STRING data type, you can use the CREATE TEMPORARY TABLE ... LIKE syntax to declare the table. In the following sample code, the value_ts field is used because the value_ prefix is added to value fields in the default configuration.
CREATE TEMPORARY TABLE tempTable (
  value_name STRING,
  value_ts STRING
) LIKE `kafkaJsonCatalog`.`kafka`.`testTopic`;
Table parameters added by default
| Parameter | Description | Remarks |
| --- | --- | --- |
| connector | The type of the connector. | The value is kafka or upsert-kafka. |
| topic | The name of the topic. | The value is the name of the table in the Kafka JSON catalog. |
| properties.bootstrap.servers | The IP addresses or endpoints of Kafka brokers. | The value is the same as the value of the properties.bootstrap.servers parameter of the Kafka JSON catalog. |
| value.format | The format that the Flink Kafka connector uses to serialize or deserialize the value fields in a Kafka message. | The value is json. |
| value.fields-prefix | A custom prefix for all value fields in Kafka messages. This parameter prevents name conflicts with the key fields or metadata fields. | The value is the same as the value of the value.fields-prefix parameter of the Kafka JSON catalog. |
| value.json.infer-schema.flatten-nested-columns.enable | Specifies whether to recursively expand nested columns in a JSON text when the value fields in the JSON-formatted Kafka message are parsed. | The value is the same as the value of the infer-schema.flatten-nested-columns.enable parameter of the Kafka JSON catalog. |
| value.json.infer-schema.primitive-as-string | Specifies whether to infer all basic types as the STRING type when the value fields in the JSON-formatted Kafka message are parsed. | The value is the same as the value of the infer-schema.primitive-as-string parameter of the Kafka JSON catalog. |
| value.fields-include | The policy that is used to process the key field when value fields are parsed. | The value is EXCEPT_KEY, which indicates that the key field is excluded when value fields are parsed. This parameter is required if the key field is specified. |
| key.format | The format that the Flink Kafka connector uses to serialize or deserialize the key field in a Kafka message. | The value is json or raw. This parameter is required if the key field is specified. The value is raw if the key field is specified but fails to be parsed, and json if the key field is specified and is parsed. |
| key.fields-prefix | A custom prefix for all key fields in Kafka messages. This parameter prevents name conflicts with the value fields. | The value is the same as the value of the key.fields-prefix parameter of the Kafka JSON catalog. This parameter is required if the key field is specified. |
| key.fields | The fields that are parsed from the key field in a Kafka message. | The system automatically fills in the key fields of the table. This parameter is required if the key field is specified and the table is not an Upsert Kafka table. |
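To make the defaults above more concrete, the following sketch shows a hand-written table declaration that roughly corresponds to what the catalog might expose for a topic whose key and value are both valid JSON. This is an illustration under stated assumptions, not the statement that the catalog actually generates: the table name, topic name, broker address, and the fields id and name are hypothetical, and the default primary key is left out of the sketch.

```sql
-- Illustrative only; all names and addresses are hypothetical.
CREATE TEMPORARY TABLE orders_equivalent (
  `key_id` BIGINT,                             -- key field "id" with the key_ prefix
  `value_name` STRING,                         -- value field "name" with the value_ prefix
  `partition` INT METADATA VIRTUAL,            -- metadata column
  `offset` BIGINT METADATA VIRTUAL,            -- metadata column
  `timestamp` TIMESTAMP_LTZ(3) METADATA        -- metadata column
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = '192.168.0.1:9092',
  'key.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.format' = 'json',
  'value.fields-prefix' = 'value_',
  'value.fields-include' = 'EXCEPT_KEY',
  'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
  'value.json.infer-schema.primitive-as-string' = 'false'
);
```

The practical benefit of the catalog is that none of this has to be written or kept in sync by hand. Any additional connector parameters, such as scan.startup.mode, are supplied through SQL hints when the table is queried.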