当您通过函数计算消费日志数据时,在不同场景您可以选择使用日志服务提供的函数模板或自定义函数。本文介绍如何构建一个自定义函数。
函数Event
在通过函数计算消费日志数据的过程中,步骤二需要配置函数的入口参数(函数Event),格式为一个JSON Object序列化后字符串。
参数说明
参数
说明
jobName
日志服务ETL Job名称。函数计算服务上的日志服务触发器对应一个日志服务的ETL Job。
taskId
对于一个ETL Job,taskId是某一次确定性的函数调用标识。
cursorTime
本次函数调用包括的数据中,最后一条日志到达日志服务的服务器端的unix_timestamp。
source
该字段由日志服务生成,日志服务根据ETL Job定义的任务间隔定时触发函数执行,source字段是函数Event中的重要组成部分,定义了本次函数调用的消费范围。
endpoint:Project所属地域的服务入口,详情请参见服务入口。
projectName:Project名称。
logstoreName:Logstore名称。
shardId:Logstore下的某一个确定Shard。
beginCursor:需要从Shard的什么位置开始消费数据。
endCursor:需要消费Shard数据到什么位置。
说明Shard对应的[beginCursor,endCursor)是一个左闭右开区间。
parameter
JSON Object类型,在您创建触发器函数配置时设置。自定义日志服务ETL函数运行时解析该字段,可以获取到函数所需要的运行参数。详情请参见SLS触发器。
示例
{ "source": { "endpoint": "http://cn-shanghai-intranet.log.aliyuncs.com", "projectName": "fc-****************", "logstoreName": "demo", "shardId": 0, "beginCursor": "MTUwNTM5MDI3NTY1ODcwNzU2Ng==", "endCursor": "MTUwNTM5MDI3NTY1ODcwNzU2OA==" }, "parameter": { ... }, "jobName": "fedad35f51a2a97b466da57fd71f315f539d2234", "taskId": "9bc06c96-e364-4f41-85eb-b6e579214ae4", "cursorTime": 1511429883 }
在函数调试时候,您可以调用GetCursor - 通过时间查询Cursor接口获取cursor并按上述示例构建一个函数Event用于测试。
函数开发
您可以通过Java、Python、Node.js等多种语言实现函数开发,日志服务提供了相应Runtime的SDK,以便您在函数中进行集成,详情请参见SDK参考。
以下内容以Java 8 Runtime为例,介绍如何开发日志服务ETL函数。关于Java 8函数编程细节,详情请参见函数计算服务Java编程指南。
Java函数模板
目前,日志服务提供了基于Java8 Runtime的自定义数据模板,您可以在这基础上完成自定义需求的实现。
模板已实现以下功能:
函数Event中source、taskId、jobName字段的解析。
根据source中定义的数据源,通过Log Service Java SDK获取数据,并对每一批数据调用processData接口进行处理。
在模板中,您还需要实现以下功能:
函数Event中parameter字段的解析,通过
UserDefinedFunctionParameter.java
实现。函数内针对数据的自定义业务逻辑,通过
UserDefinedFunction.java
的processData接口实现。为您的函数取一个可以描述功能的名字,替换
UserDefinedFunction
。
processData接口实现
您可以在processData内完成对一批数据的消费、加工、投递。例如在LogstoreReplication中,实现了将数据从一个Logstore中读出后写到另一个Logstore。
说明processData处理数据成功则返回true,处理数据遇到异常无法重试成功则返回false,但此时函数还会继续运行下去,且日志服务判定这是一次成功的ETL任务,会忽略其中处理异常的数据。
如果遇到致命错误或业务逻辑上认为有异常需要提前终止函数执行,会通过throw Exception方式跳出函数运行,日志服务可以据此判断函数运行异常,并会按照ETL Job设置的规则重新调用函数执行。
如果Shard流量较大,请为函数配置足够的内存规格,以避免函数OOM导致异常终止。
如果在函数内执行耗时操作或者Shard流量较大,请您设置较短的函数触发间隔和较长的函数运行超时时间。
请您为函数服务配置足够的权限,例如在函数内写OSS就需要配置OSS写权限。
ETL日志
ETL调度日志
调度日志记录ETL任务开始时间、结束时间、任务是否成功以及成功返回的信息。如果ETL任务出错会生成ETL出错日志,并向系统管理员发送报警邮件或短信。请您在创建触发器时设置触发器日志Logstore,并为该Logstore开启并配置索引,详情请参见创建索引。
对于函数执行结果的统计,可以通过函数返回,例如在Java8 Runtime函数的
outputStream
中体现。日志服务提供的函数模板会写出一个JSON Object序列化后的字符串,该字符串将记录在ETL任务调度日志中,方便您进行统计、查询。ETL过程日志
这一部分日志是在ETL执行过程中每执行一步记录的关键点和错误信息,包括某一步骤的开始和结束时间、初始化动作完成情况、模块出错信息等。ETL过程日志的意义是随时可以感知ETL运行情况,如果发生错误,可以及时通过过程日志查找原因。
您可以通过
context.getLogger()
记录过程日志并存放在日志服务指定Project的Logstore中,建议您为该Logstore开启索引查询功能。