全部產品
Search
文件中心

Realtime Compute for Apache Flink:Print

更新時間:Sep 27, 2024

本文為您介紹如何使用Print連接器。

背景資訊

Print是用於調試的連接器,允許接收並列印一定數量的輸入記錄。如果您想觀察Flink作業的中間結果,或者觀察最終輸出結果,可以添加Print結果表,單擊運行,在TaskManager的日誌中觀察列印出的結果資訊。

Print可用於輔助確認輸出到其他結果表中的訊息是否符合預期。

Print連接器支援的資訊如下。

類別

詳情

支援類型

結果表,資料攝入目標端

運行模式

批模式和流模式

資料格式

暫不適用

特有監控指標

暫無

API種類

SQL,資料攝入YAML作業

是否支援更新或刪除結果表資料

前提條件

  • 因為Print結果表資料輸出為Info日誌,所以如果您需要查看Print結果表的結果資料,則需要將記錄層級調至Info,否則無法查到結果資料。

  • Taskmanager.out日誌展示資料限制為2000條。如果您有排查髒資料或特定資料等需求,建議在Where條件中指定業務情境相關條件後,進行Print操作,以避免因為資料條數限制導致無法排查。

SQL

文法結構

CREATE TABLE print_table (
  a INT,
  b varchar
) WITH (
  'connector'='print',
  'logger'='true'
);

您也可以基於現有的表模式使用LIKE子句來建立,程式碼範例如下。

CREATE TABLE print_table WITH ('connector' = 'print')
LIKE table_source (EXCLUDING ALL)

WITH參數

參數

說明

資料類型

是否必填

預設值

備忘

connector

表類型。

String

固定值為print。

logger

控制台是否顯示資料結果。

Boolean

false

取值如下:

  • false(預設值):不顯示。

  • true:顯示。

print-identifier

資料結果標識。

String

在日誌中通過資料結果標識檢索資訊。

sink.parallelism

結果表並行度。

Int

上遊並行度

無。

資料攝入

資料攝入YAML作業可以使用values連接器列印資料到out檔案或日誌中。

文法結構

source:
  type: xxx

sink:
  type: values
  name: Values Sink
  print.enabled: true

WITH參數

參數

說明

資料類型

是否必填

預設值

備忘

type

目標端類型。

STRING

固定值為values。

name

目標端名稱。

STRING

無。

print.enabled

是否作為print使用。

BOOLEAN

固定值為true。

materialized.in.memory

是否將Binlog事件持久化到記憶體。

BOOLEAN

false

無。

sink.print.standard-error

是否替換為輸出到標準錯誤流(System.error)

BOOLEAN

false

預設輸出到標準輸出(System.out)。

sink.print.logger

是否在日誌列印。

BOOLEAN

false

無。

sink.print.limit

最多列印的條數。

LONG

2000

無。

error.on.schema.change

Schema變更發生時是否報錯。

BOOLEAN

false

無。

使用樣本

  • 結果表

    CREATE TEMPORARY TABLE table_source(
      name VARCHAR,
      score BIGINT
    ) WITH (
      ...
    );
    
    CREATE TEMPORARY TABLE print_sink(
      name VARCHAR,
      score BIGINT
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO print_sink SELECT * from table_source;
  • 資料攝入目標端

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 7601-7604
    
    sink:
      type: values
      name: Values Sink
      print.enabled: true