全部產品
Search
文件中心

Realtime Compute for Apache Flink:配置作業日誌輸出

更新時間:Nov 13, 2024

除直接在Flink控制台作業探查頁面查看作業日誌外,您還可以將作業日誌配置輸出至外部儲存(OSS、SLS或Kafka)後進行查看,並支援配置輸出的記錄層級。本文為您介紹如何配置作業日誌輸出,配置後您可以在對應儲存中查看作業日誌。

注意事項

  • 配置日誌到其他儲存後,如果未關閉日誌歸檔功能,購買工作空間時配置的OSS仍會持續儲存日誌。關閉後,Flink控制台頁面也將無法查看作業日誌。

    image

  • 配置日誌輸出至OSS、SLS或Kafka後,需要重啟作業。

  • 您可以在日誌配置中使用${secret_values.xxxx},引用已經在密鑰託管中設定的變數,詳情請參見變數管理

配置單個作業日誌輸出

  1. 進入配置單個作業日誌輸出入口。

    1. 登入Realtime Compute管理主控台

    2. 單擊目標工作空間操作列下的控制台

    3. 在左側導覽列上,單擊營運中心 > 作業營運,單擊目標作業名稱。

    4. 部署詳情頁簽,單擊日誌配置地區右側的編輯

    5. 日誌模板選擇為自訂模板

  2. 配置日誌輸出資訊。

    按照儲存物件,將對應配置資訊複製粘貼到輸入框中,並修改指定參數取值為您目標儲存資訊。如果您還需要配置不同層級的日誌分別輸出至不同儲存,可以參見配置不同層級日誌分別輸出為Appender配置不同的記錄層級過濾規則。

    配置到OSS

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender> 
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="20 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="4"/> 
        </Appender>  
        <Appender name="OSS" type="OSS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          
          <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ -->
          <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
          <Property name="baseUri">oss://YOUR-BUCKET-NAME/</Property>
          <Property name="endpoint">https://YOUR-ENDPOINT</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property>
          <Property name="secretAccessKey">${secret_values.accessKeySecret}</Property>
          <Property name="flushIntervalSeconds">10</Property>  
          <Property name="flushIntervalEventCount">100</Property>  
          <Property name="rollingBytes">10485760</Property>  
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="OSS"/> 
        </Root>
      </Loggers> 
    </Configuration>

    參數

    說明

    YOUR-BUCKET-NAME

    替換成您OSS Bucket名稱。

    YOUR-ENDPOINT

    替換成您OSS的Endpoint,詳情請參見OSS地區和訪問網域名稱

    Endpoint為ECS的VPC網路訪問(內網) 所在行的Endpoint(地區節點)資訊。

    YOUR-OSS-ACCESSKEYID

    替換成您配置OSS服務帳號的AccessKey ID和AccessKey Secret,擷取方法請參見擷取AccessKey

    為了避免明文AccessKey帶來的安全風險,本樣本通過密鑰管理的方式填寫AccessKey取值,詳情請參見變數管理

    說明

    僅當您配置輸出到與Flink帳號不同帳號下的OSS時,該參數必填。相同帳號時無需填寫,請刪除該參數。

    YOUR-OSS-ACCESSKEYSECRET

    flushIntervalSeconds

    日誌同步到儲存中的時間間隔,即間隔多久將日誌資料寫入一次,單位是秒。

    flushIntervalEventCount

    日誌同步到儲存中的條數間隔,即擷取多少條日誌資料後寫入一次。

    說明

    與flushIntervalSeconds同時配置時,哪個先達到設定值就寫入一次。

    rollingBytes

    OSS中單個日誌的大小。達到最大值後,後續寫入的資料會寫入一個新的記錄檔。

    配置到SLS

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender>  
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="5 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="1"/> 
        </Appender>  
        <Appender name="SLS" type="SLS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
    
          <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ -->
          <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
          <Property name="project">YOUR-SLS-PROJECT</Property>  
          <Property name="logStore">YOUR-SLS-LOGSTORE</Property> 
          <Property name="endpoint">YOUR-SLS-ENDPOINT</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property> 
          <Property name="accessKeySecret">${secret_values.accessKeySecret}</Property> 
          <Property name="topic">{{ namespace }}:{{ deploymentId }}:{{ jobId }}</Property>
          <Property name="deploymentName">{{ deploymentName }}</Property>
          <Property name="flushIntervalSeconds">10</Property>
          <Property name="flushIntervalEventCount">100</Property>
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="SLS"/> 
        </Root>
      </Loggers> 
    </Configuration>
    說明

    代碼中的namespace、deploymentId、jobId、deploymentName為Twig變數,您無需修改,修改後會導致作業啟動時報錯。

    參數

    說明

    YOUR-SLS-PROJECT

    替換成您SLS的Project名稱。

    YOUR-SLS-LOGSTORE

    替換成您SLS的Logstore名稱。

    YOUR-SLS-ENDPOINT

    替換成您SLS所在地區的私網Endpoint,詳情請參見服務入口

    YOUR-SLS-ACCESSKEYID

    替換成您配置SLS服務帳號的AccessKey ID和AccessKey Secret,擷取方法請參見擷取AccessKey

    為了避免明文AccessKey帶來的安全風險,本樣本通過密鑰管理的方式填寫AccessKey取值,詳情請參見變數管理

    說明

    如果您配置的SLS和Flink服務不在同一帳號時,需要給SLS服務帳號配置Flink帳號可以寫入SLS的許可權,具體操作詳情請參見建立自訂權限原則,具體的策略內容資訊如下:

    • 不限制SLS範圍

      {
      
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:Get*",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "*"
              }
          ]
      }
    • 指定SLS資源範圍,樣本如下。

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:PostLogStoreLogs",
                      "log:GetLogStore"
                  ],
                  "Resource": "acs:log:cn-beijing:152940222687****:project/test-vvp-sls/logstore/test-ltest"
              }
          ]
      }

    YOUR-SLS-ACCESSKEYSECRET

    flushIntervalSeconds

    日誌同步到儲存中的時間間隔,即間隔多久將日誌資料寫入一次,單位是秒。

    flushIntervalEventCount

    日誌同步到儲存中的條數間隔,即擷取多少條日誌資料後寫入一次。

    說明

    與flushIntervalSeconds同時配置時,哪個先達到設定值就寫入一次。

    配置到Kafka

    說明

    暫不支援開啟Kerberos認證的Kafka叢集。

    • 前提條件

      Realtime Compute提供的KafkaAppender日誌外掛程式是通過Flink的外掛程式類載入器載入,使用前需要顯示指定KafkaAppender日誌外掛程式所在的包路徑,使得Flink應用可以成功載入到該外掛程式。具體的操作方式如下:

      • 配置工作範本(對該專案空間下所有作業生效)

        在Flink開發控制台組態管理頁面的其他配置中,添加如下代碼。

        plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
      • 配置單個作業(僅對當前作業生效)

        作業營運頁面,單擊目標作業名稱,在部署詳情頁簽的運行參數配置地區的其他配置中,添加如下代碼。

        plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
    • 日誌配置

      <?xml version="1.0" encoding="UTF-8"?>
      <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
      strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
        <Appenders> 
          <Appender name="StdOut" type="Console"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/> 
          </Appender>  
          <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/>  
            <Policies> 
              <SizeBasedTriggeringPolicy size="20 MB"/> 
            </Policies>  
            <DefaultRolloverStrategy max="4"/> 
          </Appender>  
          <Appender type="KafkaVVP" name="KafkaVVPAppender" topic="YOUR-TOPIC-NAME">
              <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n"/>
              <Property name="bootstrap.servers">YOUR-KAFKA-BOOTSTRAP-SERVERS</Property>
               <Property name="acks">YOUR-ACKS-VALUE</Property>
               <Property name="buffer.memory">YOUR-BUFFER-MEMORY-SIZE</Property>
                <Property name="retries">YOUR-RETRIES-NUMBER</Property>
               <Property name="compression.type">YOUR-COMPRESSION-TYPE</Property>
          </Appender>
          <Appender type="Async" name="AsyncAppender">
              <AppenderRef ref="KafkaVVPAppender"/>
          </Appender>
        </Appenders>
        <Loggers> 
          <Logger level="INFO" name="org.apache.hadoop"/>  
          <Logger level="INFO" name="org.apache.kafka"/>  
          <Logger level="INFO" name="org.apache.zookeeper"/>  
          <Logger level="INFO" name="akka"/>  
          <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
          <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
          {%- for name, level in userConfiguredLoggers -%} 
            <Logger level="{{ level }}" name="{{ name }}"/> 
          {%- endfor -%}
          <Root level="{{ rootLoggerLogLevel }}"> 
            <AppenderRef ref="StdOut"/>
            <AppenderRef ref="RollingFile"/>  
            <AppenderRef ref="AsyncAppender"/> 
          </Root>
        </Loggers>
      </Configuration>

      參數

      說明

      YOUR-TOPIC-NAME

      寫入的Kafka Topic名稱。

      YOUR-KAFKA-BOOTSTRAP-SERVERS

      寫入的Kafka Broker地址。

      YOUR-ACKS-VALUE

      指定了必須有多少個分區副本收到訊息,Producer才會認為訊息寫入是成功的。具體取值請參見acks

      YOUR-BUFFER-MEMORY-SIZE

      Producer緩衝區大小。單位Byte。

      YOUR-RETRIES-NUMBER

      發送失敗的重試次數。

      YOUR-COMPRESSION-TYPE

      Producer產生資料時可使用的壓縮類型,包括none、gzip、snappy、lz4或zstd。

      說明

      您還可以設定所有Apache Kafka client支援的配置參數,詳情請參見Apache Kafka

  3. 單擊儲存

  4. 單擊頁面頂部的啟動

設定項目空間下所有作業日誌輸出

您可以通過配置模板的方式,設定項目空間下所有作業日誌預設輸出到OSS、SLS或Kafka。

重要

配置後,後續該專案空間下建立的所有作業的日誌都會被儲存到OSS、SLS或Kafka。

  1. 進入到作業日誌模板配置入口。

    1. 登入Realtime Compute控制台

    2. 單擊目標工作空間操作列下的控制台,在Realtime Compute開發控制台頁面頂部選擇目標專案空間

    3. 在左側導覽列,單擊營運中心 > 組態管理

    4. 作業預設配置頁簽,選擇作業類型。

    5. 日誌配置地區,日誌模板選擇為自訂模板

    6. 參考配置單個作業日誌輸出,設定項目空間下所有作業的日誌輸出資訊。

  2. 單擊儲存更改

配置不同層級日誌分別輸出

您可以通過使用log4j2的ThresholdFilter來為不同的Appender配置不同的記錄層級過濾規則。配置的優勢有:

  • 靈活性:可以根據需要為不同的(外部)儲存設定不同的記錄層級。

  • 效率:減少不必要的Tlog和傳輸,提高系統效能。

  • 清晰性:通過分開配置,使得日誌的流向更清晰,層級管理更方便。

配置方法如下:

  1. 日誌配置地區,日誌模板選擇為自訂模板

  2. 配置日誌輸出資訊。

    本文以實現Realtime Compute開發控制台(Console)輸出INFO層級以上日誌,而Log Service(SLS)僅輸出ERROR層級以上日誌為例,配置樣本如下。

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" status="WARN">
      <Appenders>
        <!-- Console Appender configured to output only INFO level logs -->
        <Appender name="StdOut" type="Console">
          <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
          <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/>
        </Appender>
        
        <!-- RollingFile Appender (no filter shown, logs all levels due to Root level being INFO) -->
        <Appender name="RollingFile" type="RollingFile">
          <!-- Configuration remains unchanged -->
          <!-- ... -->
        </Appender>
        
        <!-- SLS Appender configured to output only ERROR level and above logs -->
        <Appender name="SLS" type="SLS">
          <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
          <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/>
          <!-- SLS specific properties -->
          <Property name="namespace">YOUR_NAMESPACE</Property>
          <Property name="project">YOUR_SLS_PROJECT</Property>
          <Property name="logStore">YOUR_SLS_LOGSTORE</Property>
          <Property name="endpoint">YOUR_SLS_ENDPOINT</Property>
          <!-- Access credentials and other properties -->
          <!-- ... -->
        </Appender>
    
        <!-- Other Appenders definitions remain unchanged -->
        <!-- ... -->
      </Appenders>
      
      <Loggers>
        <!-- Directly configure loggers for StdOut and SLS with specific levels -->
        <Logger name="StdOutLogger" level="INFO" additivity="false">
          <AppenderRef ref="StdOut"/>
        </Logger>
        
        <Logger name="SLSLogger" level="ERROR" additivity="false">
          <AppenderRef ref="SLS"/>
        </Logger>
    
        <!-- Other Loggers definitions with their specific configurations -->
        <!-- ... -->
    
        <!-- Root Logger without specific AppenderRef for SLS and StdOut, to avoid duplicate logging -->
        <Root level="INFO">
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>
          <!-- Exclude SLS from Root to prevent duplicate logging in case of other loggers -->
        </Root>
      </Loggers>
    </Configuration>

    在這個配置中:

    • Console Appender:使用了一個ThresholdFilter來確保INFO層級及以上的日誌會被輸出到Realtime Compute開發控制台。

    • SLS Appender:使用了一個ThresholdFilter來確保只有ERROR層級及以上的日誌會被發送到SLS。SLS Appender中的具體屬性請參見配置到SLSYOUR_NAMESPACEYOUR_SLS_PROJECT等需要替換為實際的SLS專案資訊。

      說明

      如果SLS Appender類型不是SLS,而是一個自訂的Appender,請確保使用正確的類型,並且Appender類已經具備了串連到SLS所需的邏輯。

    • StdOutLogger和SLSLogger:它們分別只向StdOut Appender和SLS Appender發送日誌,並且各自具有不同的記錄層級限制。

    • Root Logger:配置了StdOut Appender和RollingFile Appender,但不包括SLS Appender。這是為了避免在已經有特定Logger配置的情況下,重複將日誌發送到SLS。

    更多操作以及log4j配置參數,詳情請參見Apache Log4j

相關文檔