除直接在Flink控制台作业探查页面查看作业日志外,您还可以将作业日志配置输出至外部存储(OSS、SLS或Kafka)后进行查看,并支持配置输出的日志级别。本文为您介绍如何配置作业日志输出,配置后您可以在对应存储中查看作业日志。
注意事项
配置日志到其他存储后,如果未关闭日志归档功能,购买工作空间时配置的OSS仍会持续保存日志。关闭后,Flink控制台页面也将无法查看作业日志。
配置日志输出至OSS、SLS或Kafka后,需要重启作业。
您可以在日志配置中使用
${secret_values.xxxx}
,引用已经在密钥托管中设置的变量,详情请参见变量管理。
配置单个作业日志输出
进入配置单个作业日志输出入口。
登录实时计算管理控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏上,单击 ,单击目标作业名称。
在部署详情页签,单击日志配置区域右侧的编辑。
日志模板选择为自定义模板。
配置日志输出信息。
按照存储对象,将对应配置信息复制粘贴到输入框中,并修改指定参数取值为您目标存储信息。如果您还需要配置不同级别的日志分别输出至不同存储,可以参见配置不同级别日志分别输出为Appender配置不同的日志级别过滤规则。
配置到OSS
<?xml version="1.0" encoding="UTF-8"?> <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" packages="com.ververica.platform.logging.appender" status="WARN"> <Appenders> <Appender name="StdOut" type="Console"> <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> </Appender> <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> <Policies> <SizeBasedTriggeringPolicy size="20 MB"/> </Policies> <DefaultRolloverStrategy max="4"/> </Appender> <Appender name="OSS" type="OSS"> <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ --> <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line --> <Property name="baseUri">oss://YOUR-BUCKET-NAME/</Property> <Property name="endpoint">https://YOUR-ENDPOINT</Property> <Property name="accessKeyId">${secret_values.accessKeyId}</Property> <Property name="secretAccessKey">${secret_values.accessKeySecret}</Property> <Property name="flushIntervalSeconds">10</Property> <Property name="flushIntervalEventCount">100</Property> <Property name="rollingBytes">10485760</Property> </Appender> <Appender name="StdOutErrConsoleAppender" type="Console"> <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/> </Appender> <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i"> <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/> <Policies> <SizeBasedTriggeringPolicy size="1 GB"/> </Policies> <DefaultRolloverStrategy max="2"/> </Appender> <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i"> <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/> <Policies> <SizeBasedTriggeringPolicy size="1 GB"/> </Policies> <DefaultRolloverStrategy max="2"/> </Appender> </Appenders> <Loggers> <Logger level="INFO" name="org.apache.hadoop"/> <Logger level="INFO" name="org.apache.kafka"/> <Logger level="INFO" name="org.apache.zookeeper"/> <Logger level="INFO" name="akka"/> <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/> <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/> <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false"> <AppenderRef ref="StdOutFileAppender"/> <AppenderRef ref="StdOutErrConsoleAppender"/> </Logger> <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false"> <AppenderRef ref="StdErrFileAppender"/> <AppenderRef ref="StdOutErrConsoleAppender"/> </Logger> {%- for name, level in userConfiguredLoggers -%} <Logger level="{{ level }}" name="{{ name }}"/> {%- endfor -%} <Root level="{{ rootLoggerLogLevel }}"> <AppenderRef ref="StdOut"/> <AppenderRef ref="RollingFile"/> <AppenderRef ref="OSS"/> </Root> </Loggers> </Configuration>
参数
说明
YOUR-BUCKET-NAME
替换成您OSS Bucket名称。
YOUR-ENDPOINT
替换成您OSS的Endpoint,详情请参见OSS地域和访问域名。
Endpoint为ECS的VPC网络访问(内网) 所在行的Endpoint(地域节点)信息。
YOUR-OSS-ACCESSKEYID
替换成您配置OSS服务账号的AccessKey ID和AccessKey Secret,获取方法请参见获取AccessKey。
为了避免明文AccessKey带来的安全风险,本示例通过密钥管理的方式填写AccessKey取值,详情请参见变量管理。
说明仅当您配置输出到与Flink账号不同账号下的OSS时,该参数必填。相同账号时无需填写,请删除该参数。
YOUR-OSS-ACCESSKEYSECRET
flushIntervalSeconds
日志同步到存储中的时间间隔,即间隔多久将日志数据写入一次,单位是秒。
flushIntervalEventCount
日志同步到存储中的条数间隔,即获取多少条日志数据后写入一次。
说明与flushIntervalSeconds同时配置时,哪个先达到设置值就写入一次。
rollingBytes
OSS中单个日志的大小。达到最大值后,后续写入的数据会写入一个新的日志文件。
配置到SLS
<?xml version="1.0" encoding="UTF-8"?> <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" packages="com.ververica.platform.logging.appender" status="WARN"> <Appenders> <Appender name="StdOut" type="Console"> <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> </Appender> <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> <Policies> <SizeBasedTriggeringPolicy size="5 MB"/> </Policies> <DefaultRolloverStrategy max="1"/> </Appender> <Appender name="SLS" type="SLS"> <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout" charset="UTF-8"/> <!-- The final effective log path is: ${baseUri}/logs/${namespace}/${deploymentId}/{jobId}/ --> <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line --> <Property name="project">YOUR-SLS-PROJECT</Property> <Property name="logStore">YOUR-SLS-LOGSTORE</Property> <Property name="endpoint">YOUR-SLS-ENDPOINT</Property> <Property name="accessKeyId">${secret_values.accessKeyId}</Property> <Property name="accessKeySecret">${secret_values.accessKeySecret}</Property> <Property name="topic">{{ namespace }}:{{ deploymentId }}:{{ jobId }}</Property> <Property name="deploymentName">{{ deploymentName }}</Property> <Property name="flushIntervalSeconds">10</Property> <Property name="flushIntervalEventCount">100</Property> </Appender> <Appender name="StdOutErrConsoleAppender" type="Console"> <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/> </Appender> <Appender name="StdOutFileAppender" type="RollingFile" fileName="${sys:stdout.file}" filePattern="${sys:stdout.file}.%i"> <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/> <Policies> <SizeBasedTriggeringPolicy size="1 GB"/> </Policies> <DefaultRolloverStrategy max="2"/> </Appender> <Appender name="StdErrFileAppender" type="RollingFile" fileName="${sys:stderr.file}" filePattern="${sys:stderr.file}.%i"> <Layout pattern="%m" type="PatternLayout" charset="UTF-8"/> <Policies> <SizeBasedTriggeringPolicy size="1 GB"/> </Policies> <DefaultRolloverStrategy max="2"/> </Appender> </Appenders> <Loggers> <Logger level="INFO" name="org.apache.hadoop"/> <Logger level="INFO" name="org.apache.kafka"/> <Logger level="INFO" name="org.apache.zookeeper"/> <Logger level="INFO" name="akka"/> <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/> <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> <Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/> <Logger level="INFO" name="StdOutErrRedirector.StdOut" additivity="false"> <AppenderRef ref="StdOutFileAppender"/> <AppenderRef ref="StdOutErrConsoleAppender"/> </Logger> <Logger level="INFO" name="StdOutErrRedirector.StdErr" additivity="false"> <AppenderRef ref="StdErrFileAppender"/> <AppenderRef ref="StdOutErrConsoleAppender"/> </Logger> {%- for name, level in userConfiguredLoggers -%} <Logger level="{{ level }}" name="{{ name }}"/> {%- endfor -%} <Root level="{{ rootLoggerLogLevel }}"> <AppenderRef ref="StdOut"/> <AppenderRef ref="RollingFile"/> <AppenderRef ref="SLS"/> </Root> </Loggers> </Configuration>
说明代码中的namespace、deploymentId、jobId、deploymentName为Twig变量,您无需修改,修改后会导致作业启动时报错。
参数
说明
YOUR-SLS-PROJECT
替换成您SLS的Project名称。
YOUR-SLS-LOGSTORE
替换成您SLS的Logstore名称。
YOUR-SLS-ENDPOINT
替换成您SLS所在地域的私网Endpoint,详情请参见服务入口。
YOUR-SLS-ACCESSKEYID
替换成您配置SLS服务账号的AccessKey ID和AccessKey Secret,获取方法请参见获取AccessKey。
为了避免明文AccessKey带来的安全风险,本示例通过密钥管理的方式填写AccessKey取值,详情请参见变量管理。
说明如果您配置的SLS和Flink服务不在同一账号时,需要给SLS服务账号配置Flink账号可以写入SLS的权限,具体操作详情请参见创建自定义权限策略,具体的策略内容信息如下:
不限制SLS范围
{ "Version": "1", "Statement": [ { "Effect": "Allow", "Action": [ "log:Get*", "log:PostLogStoreLogs" ], "Resource": "*" } ] }
指定SLS资源范围,示例如下。
{ "Version": "1", "Statement": [ { "Effect": "Allow", "Action": [ "log:PostLogStoreLogs", "log:GetLogStore" ], "Resource": "acs:log:cn-beijing:152940222687****:project/test-vvp-sls/logstore/test-ltest" } ] }
YOUR-SLS-ACCESSKEYSECRET
flushIntervalSeconds
日志同步到存储中的时间间隔,即间隔多久将日志数据写入一次,单位是秒。
flushIntervalEventCount
日志同步到存储中的条数间隔,即获取多少条日志数据后写入一次。
说明与flushIntervalSeconds同时配置时,哪个先达到设置值就写入一次。
配置到Kafka
说明暂不支持开启Kerberos认证的Kafka集群。
前提条件
实时计算提供的KafkaAppender日志插件是通过Flink的插件类加载器加载,使用前需要显示指定KafkaAppender日志插件所在的包路径,使得Flink应用可以成功加载到该插件。具体的操作方式如下:
配置作业模板(对该项目空间下所有作业生效)
在Flink开发控制台配置管理页面的其他配置中,添加如下代码。
plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
配置单个作业(仅对当前作业生效)
在作业运维页面,单击目标作业名称,在部署详情页签的运行参数配置区域的其他配置中,添加如下代码。
plugin.classloader.parent-first-patterns.additional: com.ververica.platform.logging.appender
日志配置
<?xml version="1.0" encoding="UTF-8"?> <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" packages="com.ververica.platform.logging.appender" status="WARN"> <Appenders> <Appender name="StdOut" type="Console"> <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/> </Appender> <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i"> <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/> <Policies> <SizeBasedTriggeringPolicy size="20 MB"/> </Policies> <DefaultRolloverStrategy max="4"/> </Appender> <Appender type="KafkaVVP" name="KafkaVVPAppender" topic="YOUR-TOPIC-NAME"> <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n"/> <Property name="bootstrap.servers">YOUR-KAFKA-BOOTSTRAP-SERVERS</Property> <Property name="acks">YOUR-ACKS-VALUE</Property> <Property name="buffer.memory">YOUR-BUFFER-MEMORY-SIZE</Property> <Property name="retries">YOUR-RETRIES-NUMBER</Property> <Property name="compression.type">YOUR-COMPRESSION-TYPE</Property> </Appender> <Appender type="Async" name="AsyncAppender"> <AppenderRef ref="KafkaVVPAppender"/> </Appender> </Appenders> <Loggers> <Logger level="INFO" name="org.apache.hadoop"/> <Logger level="INFO" name="org.apache.kafka"/> <Logger level="INFO" name="org.apache.zookeeper"/> <Logger level="INFO" name="akka"/> <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/> <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/> {%- for name, level in userConfiguredLoggers -%} <Logger level="{{ level }}" name="{{ name }}"/> {%- endfor -%} <Root level="{{ rootLoggerLogLevel }}"> <AppenderRef ref="StdOut"/> <AppenderRef ref="RollingFile"/> <AppenderRef ref="AsyncAppender"/> </Root> </Loggers> </Configuration>
参数
说明
YOUR-TOPIC-NAME
写入的Kafka Topic名称。
YOUR-KAFKA-BOOTSTRAP-SERVERS
写入的Kafka Broker地址。
YOUR-ACKS-VALUE
指定了必须有多少个分区副本收到消息,Producer才会认为消息写入是成功的。具体取值请参见acks。
YOUR-BUFFER-MEMORY-SIZE
Producer缓冲区大小。单位Byte。
YOUR-RETRIES-NUMBER
发送失败的重试次数。
YOUR-COMPRESSION-TYPE
Producer生成数据时可使用的压缩类型,包括none、gzip、snappy、lz4或zstd。
说明您还可以设置所有Apache Kafka client支持的配置参数,详情请参见Apache Kafka。
单击保存。
单击页面顶部的启动。
配置项目空间下所有作业日志输出
您可以通过配置模板的方式,配置项目空间下所有作业日志默认输出到OSS、SLS或Kafka。
配置后,后续该项目空间下创建的所有作业的日志都会被存储到OSS、SLS或Kafka。
进入到作业日志模板配置入口。
登录实时计算控制台。
单击目标工作空间操作列下的控制台,在实时计算开发控制台页面顶部选择目标项目空间。
在左侧导航栏,单击
。在作业默认配置页签,选择作业类型。
在日志配置区域,日志模板选择为自定义模板。
参考配置单个作业日志输出,配置项目空间下所有作业的日志输出信息。
单击保存更改。
配置不同级别日志分别输出
您可以通过使用log4j2的ThresholdFilter
来为不同的Appender配置不同的日志级别过滤规则。配置的优势有:
灵活性:可以根据需要为不同的(外部)存储设置不同的日志级别。
效率:减少不必要的日志处理和传输,提高系统性能。
清晰性:通过分开配置,使得日志的流向更清晰,级别管理更方便。
配置方法如下:
在日志配置区域,日志模板选择为自定义模板。
配置日志输出信息。
本文以实现实时计算开发控制台(Console)输出INFO级别以上日志,而日志服务(SLS)仅输出ERROR级别以上日志为例,配置示例如下。
<?xml version="1.0" encoding="UTF-8"?> <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true" status="WARN"> <Appenders> <!-- Console Appender configured to output only INFO level logs --> <Appender name="StdOut" type="Console"> <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/> <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/> </Appender> <!-- RollingFile Appender (no filter shown, logs all levels due to Root level being INFO) --> <Appender name="RollingFile" type="RollingFile"> <!-- Configuration remains unchanged --> <!-- ... --> </Appender> <!-- SLS Appender configured to output only ERROR level and above logs --> <Appender name="SLS" type="SLS"> <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/> <Layout type="PatternLayout" pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" charset="UTF-8"/> <!-- SLS specific properties --> <Property name="namespace">YOUR_NAMESPACE</Property> <Property name="project">YOUR_SLS_PROJECT</Property> <Property name="logStore">YOUR_SLS_LOGSTORE</Property> <Property name="endpoint">YOUR_SLS_ENDPOINT</Property> <!-- Access credentials and other properties --> <!-- ... --> </Appender> <!-- Other Appenders definitions remain unchanged --> <!-- ... --> </Appenders> <Loggers> <!-- Directly configure loggers for StdOut and SLS with specific levels --> <Logger name="StdOutLogger" level="INFO" additivity="false"> <AppenderRef ref="StdOut"/> </Logger> <Logger name="SLSLogger" level="ERROR" additivity="false"> <AppenderRef ref="SLS"/> </Logger> <!-- Other Loggers definitions with their specific configurations --> <!-- ... --> <!-- Root Logger without specific AppenderRef for SLS and StdOut, to avoid duplicate logging --> <Root level="INFO"> <AppenderRef ref="StdOut"/> <AppenderRef ref="RollingFile"/> <!-- Exclude SLS from Root to prevent duplicate logging in case of other loggers --> </Root> </Loggers> </Configuration>
在这个配置中:
Console Appender:使用了一个ThresholdFilter来确保INFO级别及以上的日志会被输出到实时计算开发控制台。
SLS Appender:使用了一个ThresholdFilter来确保只有ERROR级别及以上的日志会被发送到SLS。SLS Appender中的具体属性请参见配置到SLS,
YOUR_NAMESPACE
、YOUR_SLS_PROJECT
等需要替换为实际的SLS项目信息。说明如果SLS Appender类型不是SLS,而是一个自定义的Appender,请确保使用正确的类型,并且Appender类已经具备了连接到SLS所需的逻辑。
StdOutLogger和SLSLogger:它们分别只向StdOut Appender和SLS Appender发送日志,并且各自具有不同的日志级别限制。
Root Logger:配置了StdOut Appender和RollingFile Appender,但不包括SLS Appender。这是为了避免在已经有特定Logger配置的情况下,重复将日志发送到SLS。
更多操作以及log4j配置参数,详情请参见Apache Log4j。
相关文档
查看作业日志,详情请参见查看启动和运行日志。
在作业启动或者运行异常时,您可以查看运行异常日志,详情请参见查看运行异常日志。
当INFO级别的日志无法满足您定位问题的需求时,您可以修改打印日志的级别为DEBUG,详情请参见修改运行作业日志级别。
您可以通过操作审计(ActionTrail)产品查看Flink审计事件,详情请参见查看Flink审计事件。