This topic describes how to use the Print connector.
Background information
The Print connector writes every row to the standard output or standard error stream and is intended for debugging. To inspect the intermediate results or the final output of a Flink deployment, add a Print result table and then view the result data in the TaskManager logs.
The Print connector can be used to check whether the messages sent to other result tables meet the expectations.
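As a minimal sketch of this kind of check, the same query can be written to the real result table and to a Print result table in one statement set. The `datagen` source and the `blackhole` table named `real_sink` are stand-ins chosen for illustration only; they are assumptions, not part of this topic, and should be replaced with your own tables.

```sql
-- Hypothetical source: datagen stands in for the real upstream table.
CREATE TEMPORARY TABLE table_source (
  name VARCHAR,
  score BIGINT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Hypothetical target: blackhole stands in for the real result table.
CREATE TEMPORARY TABLE real_sink (
  name VARCHAR,
  score BIGINT
) WITH (
  'connector' = 'blackhole'
);

-- Print result table used only to inspect what is sent to real_sink.
CREATE TEMPORARY TABLE print_sink (
  name VARCHAR,
  score BIGINT
) WITH (
  'connector' = 'print'
);

-- Write the same rows to both tables in a single job.
BEGIN STATEMENT SET;
INSERT INTO real_sink SELECT name, score FROM table_source;
INSERT INTO print_sink SELECT name, score FROM table_source;
END;
```

Once the printed rows look as expected, the second INSERT statement and the Print table can be removed.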
The following table describes the capabilities supported by the Print connector.
Item | Description |
Table type | Result table and data ingestion sink |
Running mode | Batch mode and streaming mode |
Data format | N/A |
Metric | N/A |
API type | SQL API and data ingestion YAML API |
Data update or deletion in a sink table | Supported |
Prerequisites
If you want to view the output of a Print result table, make sure that the log level is set to INFO.
A maximum of 2,000 log entries can be displayed in Taskmanager.out. To check for dirty data or specific records, we recommend that you filter the data with conditions in the WHERE clause before you print it. This way, the records that you need remain visible even though the number of displayed records is limited.
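The following sketch shows one way to apply this recommendation. The `datagen` source, its option values, and the condition `score < 0` are assumptions chosen for illustration; substitute your own source table and whatever condition identifies dirty data in your case.

```sql
-- Hypothetical source: datagen produces scores between -5 and 100.
CREATE TEMPORARY TABLE table_source (
  name VARCHAR,
  score BIGINT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.score.min' = '-5',
  'fields.score.max' = '100'
);

CREATE TEMPORARY TABLE print_sink (
  name VARCHAR,
  score BIGINT
) WITH (
  'connector' = 'print'
);

-- Print only the suspect rows so that they are not crowded out
-- by valid rows within the display limit.
INSERT INTO print_sink
SELECT name, score
FROM table_source
WHERE score < 0;
```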
SQL
Syntax
CREATE TABLE print_table (
  a INT,
  b VARCHAR
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
You can also use the LIKE clause to create a Print table based on the schema of an existing table. Sample statement:
CREATE TABLE print_table WITH ('connector' = 'print')
LIKE table_source (EXCLUDING ALL);
Parameters in the WITH clause
Parameter | Description | Data type | Required | Default value | Remarks |
connector | The type of the table. | String | Yes | No default value | Set the value to print. |
logger | Specifies whether to display the data result in the console. | Boolean | No | false | Valid values: true and false. |
print-identifier | The identifier of the data result. | String | No | No default value | The log information is retrieved by using the identifier of the data result. |
sink.parallelism | The parallelism of the result table. | Int | No | A value that is the same as the upstream parallelism | N/A. |
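As an illustration of the optional parameters above, the following sketch sets an identifier and a fixed parallelism on a Print result table. The table name, the columns, and the values `score_debug` and `1` are assumptions chosen for this example.

```sql
CREATE TEMPORARY TABLE print_sink (
  name VARCHAR,
  score BIGINT
) WITH (
  'connector' = 'print',
  -- Identifier used to locate the output of this table in the logs.
  'print-identifier' = 'score_debug',
  -- Overrides the default, which follows the upstream parallelism.
  'sink.parallelism' = '1'
);
```

A distinct identifier for each Print table can make it easier to tell their output apart when a deployment contains more than one.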
Data ingestion
YAML drafts for data ingestion can use the values connector to print data to out files or logs.
Syntax
source:
  type: xxx
sink:
  type: values
  name: Values Sink
  print.enabled: true
Parameters
Parameter | Description | Data type | Required | Default value | Remarks |
type | The connector type of a sink | STRING | Yes | No default value | The value is fixed to values. |
name | The connector name of a sink | STRING | No | No default value | N/A. |
print.enabled | Whether the connector is used as the Print connector | BOOLEAN | Yes | No default value | The value is fixed to true. |
materialized.in.memory | Whether to persist binary log events in memory. | BOOLEAN | No | false | N/A. |
sink.print.standard-error | Whether to print the output to the standard error stream instead of the standard output stream. | BOOLEAN | No | false | By default, the output is printed to the standard output stream. |
sink.print.logger | Whether to display the result data on the console. | BOOLEAN | No | false | N/A. |
sink.print.limit | The maximum number of data records to print. | LONG | No | 2000 | N/A. |
error.on.schema.change | Whether an error is reported when a schema change occurs. | BOOLEAN | No | false | N/A. |
Sample code
Sample code for a result table
CREATE TEMPORARY TABLE table_source (
  name VARCHAR,
  score BIGINT
) WITH (
  ...
);

CREATE TEMPORARY TABLE print_sink (
  name VARCHAR,
  score BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO print_sink SELECT * FROM table_source;
Data ingestion sink
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true