本文为您介绍如何使用Print连接器。
背景信息
Print是用于调试的连接器,允许接收并打印一定数量的输入记录。如果您想观察Flink作业的中间结果,或者观察最终输出结果,可以添加Print结果表,单击运行,在TaskManager的日志中观察打印出的结果信息。
Print可用于辅助确认输出到其他结果表中的消息是否符合预期。
Print连接器支持的信息如下。
类别 | 详情 |
支持类型 | 结果表,数据摄入目标端 |
运行模式 | 批模式和流模式 |
数据格式 | 暂不适用 |
特有监控指标 | 暂无 |
API种类 | SQL,数据摄入YAML作业 |
是否支持更新或删除结果表数据 | 是 |
前提条件
因为Print结果表数据输出为Info日志,所以如果您需要查看Print结果表的结果数据,则需要将日志级别调至Info,否则无法查到结果数据。
Taskmanager.out日志展示数据限制为2000条。如果您有排查脏数据或特定数据等需求,建议在Where条件中指定业务场景相关条件后,进行Print操作,以避免因为数据条数限制导致无法排查。
SQL
语法结构
CREATE TABLE print_table (
a INT,
b varchar
) WITH (
'connector'='print',
'logger'='true'
);
您也可以基于现有的表模式使用LIKE
子句来创建,代码示例如下。
CREATE TABLE print_table WITH ('connector' = 'print')
LIKE table_source (EXCLUDING ALL)
WITH参数
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
connector | 表类型。 | String | 是 | 无 | 固定值为print。 |
logger | 控制台是否显示数据结果。 | Boolean | 否 | false | 取值如下:
|
print-identifier | 数据结果标识。 | String | 否 | 无 | 在日志中通过数据结果标识检索信息。 |
sink.parallelism | 结果表并行度。 | Int | 否 | 上游并行度 | 无。 |
数据摄入
数据摄入YAML作业可以使用values连接器打印数据到out文件或日志中。
语法结构
source:
type: xxx
sink:
type: values
name: Values Sink
print.enabled: true
WITH参数
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
type | 目标端类型。 | STRING | 是 | 无 | 固定值为values。 |
name | 目标端名称。 | STRING | 否 | 无 | 无。 |
print.enabled | 是否作为print使用。 | BOOLEAN | 是 | 无 | 固定值为true。 |
materialized.in.memory | 是否将Binlog事件持久化到内存。 | BOOLEAN | 否 | false | 无。 |
sink.print.standard-error | 是否替换为输出到标准错误流(System.error) | BOOLEAN | 否 | false | 默认输出到标准输出(System.out)。 |
sink.print.logger | 是否在日志打印。 | BOOLEAN | 否 | false | 无。 |
sink.print.limit | 最多打印的条数。 | LONG | 否 | 2000 | 无。 |
error.on.schema.change | Schema变更发生时是否报错。 | BOOLEAN | 否 | false | 无。 |
使用示例
结果表
CREATE TEMPORARY TABLE table_source( name VARCHAR, score BIGINT ) WITH ( ... ); CREATE TEMPORARY TABLE print_sink( name VARCHAR, score BIGINT ) WITH ( 'connector' = 'print' ); INSERT INTO print_sink SELECT * from table_source;
数据摄入目标端
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 7601-7604 sink: type: values name: Values Sink print.enabled: true