全部产品
Search
文档中心

实时计算Flink版:配置作业日志输出

更新时间:Nov 12, 2024

除直接在Flink控制台作业探查页面查看作业日志外,您还可以将作业日志配置输出至外部存储(OSS、SLS或Kafka)后进行查看,并支持配置输出的日志级别。本文为您介绍如何配置作业日志输出,配置后您可以在对应存储中查看作业日志。

注意事项

  • 配置日志到其他存储后,如果未关闭日志归档功能,购买工作空间时配置的OSS仍会持续保存日志。关闭后,Flink控制台页面也将无法查看作业日志。

    image

  • 配置日志输出至OSS、SLS或Kafka后,需要重启作业。

  • 您可以在日志配置中使用${secret_values.xxxx},引用已经在密钥托管中设置的变量,详情请参见变量管理

配置单个作业日志输出

  1. 进入配置单个作业日志输出入口。

    1. 登录实时计算管理控制台

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏上,单击运维中心 > 作业运维,单击目标作业名称。

    4. 部署详情页签,单击日志配置区域右侧的编辑

    5. 日志模板选择为自定义模板

  2. 配置日志输出信息。

    按照存储对象,将对应配置信息复制粘贴到输入框中,并修改指定参数取值为您目标存储信息。如果您还需要配置不同级别的日志分别输出至不同存储,可以参见配置不同级别日志分别输出为Appender配置不同的日志级别过滤规则。

    配置到OSS

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender> 
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="20 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="4"/> 
        </Appender>  
        <Appender name="OSS" type="OSS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          
          <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ -->
          <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
          <Property name="baseUri">oss://YOUR-BUCKET-NAME/</Property>
          <Property name="endpoint">https://YOUR-ENDPOINT</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property>
          <Property name="secretAccessKey">${secret_values.accessKeySecret}</Property>
          <Property name="flushIntervalSeconds">10</Property>  
          <Property name="flushIntervalEventCount">100</Property>  
          <Property name="rollingBytes">10485760</Property>  
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="OSS"/> 
        </Root>
      </Loggers> 
    </Configuration>

    参数

    说明

    YOUR-BUCKET-NAME

    替换成您OSS Bucket名称。

    YOUR-ENDPOINT

    替换成您OSS的Endpoint,详情请参见OSS地域和访问域名

    Endpoint为ECS的VPC网络访问(内网) 所在行的Endpoint(地域节点)信息。

    YOUR-OSS-ACCESSKEYID

    替换成您配置OSS服务账号的AccessKey ID和AccessKey Secret,获取方法请参见获取AccessKey

    为了避免明文AccessKey带来的安全风险,本示例通过密钥管理的方式填写AccessKey取值,详情请参见变量管理

    说明

    仅当您配置输出到与Flink账号不同账号下的OSS时,该参数必填。相同账号时无需填写,请删除该参数。

    YOUR-OSS-ACCESSKEYSECRET

    flushIntervalSeconds

    日志同步到存储中的时间间隔,即间隔多久将日志数据写入一次,单位是秒。

    flushIntervalEventCount

    日志同步到存储中的条数间隔,即获取多少条日志数据后写入一次。

    说明

    与flushIntervalSeconds同时配置时,哪个先达到设置值就写入一次。

    rollingBytes

    OSS中单个日志的大小。达到最大值后,后续写入的数据会写入一个新的日志文件。

    配置到SLS

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
    strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
      <Appenders> 
        <Appender name="StdOut" type="Console"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> 
        </Appender>  
        <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
          <Policies> 
            <SizeBasedTriggeringPolicy size="5 MB"/> 
          </Policies>  
          <DefaultRolloverStrategy max="1"/> 
        </Appender>  
        <Appender name="SLS" type="SLS">
          <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/>  
    
          <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ -->
          <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
          <Property name="project">YOUR-SLS-PROJECT</Property>  
          <Property name="logStore">YOUR-SLS-LOGSTORE</Property> 
          <Property name="endpoint">YOUR-SLS-ENDPOINT</Property> 
          <Property name="accessKeyId">${secret_values.accessKeyId}</Property> 
          <Property name="accessKeySecret">${secret_values.accessKeySecret}</Property> 
          <Property name="topic">{{ namespace }}:{{ deploymentId }}:{{ jobId }}</Property>
          <Property name="deploymentName">{{ deploymentName }}</Property>
          <Property name="flushIntervalSeconds">10</Property>
          <Property name="flushIntervalEventCount">100</Property>
        </Appender>
       <Appender name="StdOutErrConsoleAppender" type="Console">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
       </Appender>
       <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
       <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i">
         <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/>
         <Policies>
         <SizeBasedTriggeringPolicy size="1 GB"/>
         </Policies>
         <DefaultRolloverStrategy max="2"/>
       </Appender>
      </Appenders>  
      <Loggers> 
        <Logger level="INFO" name="org.apache.hadoop"/>  
        <Logger level="INFO" name="org.apache.kafka"/>  
        <Logger level="INFO" name="org.apache.zookeeper"/>  
        <Logger level="INFO" name="akka"/>  
        <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
        <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
        <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
      <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false">
        <AppenderRef ref="StdOutFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
      <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false">
        <AppenderRef ref="StdErrFileAppender"/>
        <AppenderRef ref="StdOutErrConsoleAppender"/>
      </Logger>
        {%- for name, level in userConfiguredLoggers -%} 
          <Logger level="{{ level }}" name="{{ name }}"/> 
        {%- endfor -%}
        <Root level="{{ rootLoggerLogLevel }}"> 
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>  
          <AppenderRef ref="SLS"/> 
        </Root>
      </Loggers> 
    </Configuration>
    说明

    代码中的namespace、deploymentId、jobId、deploymentName为Twig变量,您无需修改,修改后会导致作业启动时报错。

    参数

    说明

    YOUR-SLS-PROJECT

    替换成您SLS的Project名称。

    YOUR-SLS-LOGSTORE

    替换成您SLS的Logstore名称。

    YOUR-SLS-ENDPOINT

    替换成您SLS所在地域的私网Endpoint,详情请参见服务入口

    YOUR-SLS-ACCESSKEYID

    替换成您配置SLS服务账号的AccessKey ID和AccessKey Secret,获取方法请参见获取AccessKey

    为了避免明文AccessKey带来的安全风险,本示例通过密钥管理的方式填写AccessKey取值,详情请参见变量管理

    说明

    如果您配置的SLS和Flink服务不在同一账号时,需要给SLS服务账号配置Flink账号可以写入SLS的权限,具体操作详情请参见创建自定义权限策略,具体的策略内容信息如下:

    • 不限制SLS范围

      {
      
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:Get*",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "*"
              }
          ]
      }
    • 指定SLS资源范围,示例如下。

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "log:PostLogStoreLogs",
                      "log:GetLogStore"
                  ],
                  "Resource": "acs:log:cn-beijing:152940222687****:project/test-vvp-sls/logstore/test-ltest"
              }
          ]
      }

    YOUR-SLS-ACCESSKEYSECRET

    flushIntervalSeconds

    日志同步到存储中的时间间隔,即间隔多久将日志数据写入一次,单位是秒。

    flushIntervalEventCount

    日志同步到存储中的条数间隔,即获取多少条日志数据后写入一次。

    说明

    与flushIntervalSeconds同时配置时,哪个先达到设置值就写入一次。

    配置到Kafka

    说明

    暂不支持开启Kerberos认证的Kafka集群。

    • 前提条件

      实时计算提供的KafkaAppender日志插件是通过Flink的插件类加载器加载,使用前需要显示指定KafkaAppender日志插件所在的包路径,使得Flink应用可以成功加载到该插件。具体的操作方式如下:

      • 配置作业模板(对该项目空间下所有作业生效)

        在Flink开发控制台配置管理页面的其他配置中,添加如下代码。

        plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
      • 配置单个作业(仅对当前作业生效)

        作业运维页面,单击目标作业名称,在部署详情页签的运行参数配置区域的其他配置中,添加如下代码。

        plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
    • 日志配置

      <?xml version="1.0" encoding="UTF-8"?>
      <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" 
      strict="true" packages="com.ververica.platform.logging.appender" status="WARN">  
        <Appenders> 
          <Appender name="StdOut" type="Console"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/> 
          </Appender>  
          <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> 
            <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/>  
            <Policies> 
              <SizeBasedTriggeringPolicy size="20 MB"/> 
            </Policies>  
            <DefaultRolloverStrategy max="4"/> 
          </Appender>  
          <Appender type="KafkaVVP" name="KafkaVVPAppender" topic="YOUR-TOPIC-NAME">
              <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n"/>
              <Property name="bootstrap.servers">YOUR-KAFKA-BOOTSTRAP-SERVERS</Property>
               <Property name="acks">YOUR-ACKS-VALUE</Property>
               <Property name="buffer.memory">YOUR-BUFFER-MEMORY-SIZE</Property>
                <Property name="retries">YOUR-RETRIES-NUMBER</Property>
               <Property name="compression.type">YOUR-COMPRESSION-TYPE</Property>
          </Appender>
          <Appender type="Async" name="AsyncAppender">
              <AppenderRef ref="KafkaVVPAppender"/>
          </Appender>
        </Appenders>
        <Loggers> 
          <Logger level="INFO" name="org.apache.hadoop"/>  
          <Logger level="INFO" name="org.apache.kafka"/>  
          <Logger level="INFO" name="org.apache.zookeeper"/>  
          <Logger level="INFO" name="akka"/>  
          <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/>  
          <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> 
          {%- for name, level in userConfiguredLoggers -%} 
            <Logger level="{{ level }}" name="{{ name }}"/> 
          {%- endfor -%}
          <Root level="{{ rootLoggerLogLevel }}"> 
            <AppenderRef ref="StdOut"/>
            <AppenderRef ref="RollingFile"/>  
            <AppenderRef ref="AsyncAppender"/> 
          </Root>
        </Loggers>
      </Configuration>

      参数

      说明

      YOUR-TOPIC-NAME

      写入的Kafka Topic名称。

      YOUR-KAFKA-BOOTSTRAP-SERVERS

      写入的Kafka Broker地址。

      YOUR-ACKS-VALUE

      指定了必须有多少个分区副本收到消息,Producer才会认为消息写入是成功的。具体取值请参见acks

      YOUR-BUFFER-MEMORY-SIZE

      Producer缓冲区大小。单位Byte。

      YOUR-RETRIES-NUMBER

      发送失败的重试次数。

      YOUR-COMPRESSION-TYPE

      Producer生成数据时可使用的压缩类型,包括none、gzip、snappy、lz4或zstd。

      说明

      您还可以设置所有Apache Kafka client支持的配置参数,详情请参见Apache Kafka

  3. 单击保存

  4. 单击页面顶部的启动

配置项目空间下所有作业日志输出

您可以通过配置模板的方式,配置项目空间下所有作业日志默认输出到OSS、SLS或Kafka。

重要

配置后,后续该项目空间下创建的所有作业的日志都会被存储到OSS、SLS或Kafka。

  1. 进入到作业日志模板配置入口。

    1. 登录实时计算控制台

    2. 单击目标工作空间操作列下的控制台,在实时计算开发控制台页面顶部选择目标项目空间

    3. 在左侧导航栏,单击运维中心 > 配置管理

    4. 作业默认配置页签,选择作业类型。

    5. 日志配置区域,日志模板选择为自定义模板

    6. 参考配置单个作业日志输出,配置项目空间下所有作业的日志输出信息。

  2. 单击保存更改

配置不同级别日志分别输出

您可以通过使用log4j2的ThresholdFilter来为不同的Appender配置不同的日志级别过滤规则。配置的优势有:

  • 灵活性:可以根据需要为不同的(外部)存储设置不同的日志级别。

  • 效率:减少不必要的日志处理和传输,提高系统性能。

  • 清晰性:通过分开配置,使得日志的流向更清晰,级别管理更方便。

配置方法如下:

  1. 日志配置区域,日志模板选择为自定义模板

  2. 配置日志输出信息。

    本文以实现实时计算开发控制台(Console)输出INFO级别以上日志,而日志服务(SLS)仅输出ERROR级别以上日志为例,配置示例如下。

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" status="WARN">
      <Appenders>
        <!-- Console Appender configured to output only INFO level logs -->
        <Appender name="StdOut" type="Console">
          <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
          <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/>
        </Appender>
        
        <!-- RollingFile Appender (no filter shown, logs all levels due to Root level being INFO) -->
        <Appender name="RollingFile" type="RollingFile">
          <!-- Configuration remains unchanged -->
          <!-- ... -->
        </Appender>
        
        <!-- SLS Appender configured to output only ERROR level and above logs -->
        <Appender name="SLS" type="SLS">
          <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
          <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/>
          <!-- SLS specific properties -->
          <Property name="namespace">YOUR_NAMESPACE</Property>
          <Property name="project">YOUR_SLS_PROJECT</Property>
          <Property name="logStore">YOUR_SLS_LOGSTORE</Property>
          <Property name="endpoint">YOUR_SLS_ENDPOINT</Property>
          <!-- Access credentials and other properties -->
          <!-- ... -->
        </Appender>
    
        <!-- Other Appenders definitions remain unchanged -->
        <!-- ... -->
      </Appenders>
      
      <Loggers>
        <!-- Directly configure loggers for StdOut and SLS with specific levels -->
        <Logger name="StdOutLogger" level="INFO" additivity="false">
          <AppenderRef ref="StdOut"/>
        </Logger>
        
        <Logger name="SLSLogger" level="ERROR" additivity="false">
          <AppenderRef ref="SLS"/>
        </Logger>
    
        <!-- Other Loggers definitions with their specific configurations -->
        <!-- ... -->
    
        <!-- Root Logger without specific AppenderRef for SLS and StdOut, to avoid duplicate logging -->
        <Root level="INFO">
          <AppenderRef ref="StdOut"/>
          <AppenderRef ref="RollingFile"/>
          <!-- Exclude SLS from Root to prevent duplicate logging in case of other loggers -->
        </Root>
      </Loggers>
    </Configuration>

    在这个配置中:

    • Console Appender:使用了一个ThresholdFilter来确保INFO级别及以上的日志会被输出到实时计算开发控制台。

    • SLS Appender:使用了一个ThresholdFilter来确保只有ERROR级别及以上的日志会被发送到SLS。SLS Appender中的具体属性请参见配置到SLSYOUR_NAMESPACEYOUR_SLS_PROJECT等需要替换为实际的SLS项目信息。

      说明

      如果SLS Appender类型不是SLS,而是一个自定义的Appender,请确保使用正确的类型,并且Appender类已经具备了连接到SLS所需的逻辑。

    • StdOutLogger和SLSLogger:它们分别只向StdOut Appender和SLS Appender发送日志,并且各自具有不同的日志级别限制。

    • Root Logger:配置了StdOut Appender和RollingFile Appender,但不包括SLS Appender。这是为了避免在已经有特定Logger配置的情况下,重复将日志发送到SLS。

    更多操作以及log4j配置参数,详情请参见Apache Log4j

相关文档