当您使用第三方软件、多语言应用、云产品、流式计算框架等通过SDK实时消费日志服务的数据时,SDK消费无法满足日志服务的实现细节及消费者之间的负载均衡、故障转移(Failover)等,您可以通过消费组(ConsumerGroup)消费日志,消费组(ConsumerGroup)消费的实时性较强,通常为秒级。本文为您介绍通过消费组消费数据的操作步骤。
工作流程
一个Logstore中包含多个Shard,通过消费组消费数据就是将Shard分配给一个消费组下面的消费者,分配方式遵循以下原则。
在一个消费组中,一个Shard只会分配到一个消费者。
在一个消费组中,一个消费者可以被分配多个Shard。
新的消费者加入消费组后,这个消费组下面的Shard从属关系会调整,以实现消费的负载均衡,但是仍遵循上述分配原则。
通过消费组消费,程序发生故障时,会默认保存Checkpoint。在程序故障恢复时,能够从断点处继续消费,从而保证数据不会被重复消费。
前提条件
已开通日志服务。更多信息,请参见开通日志服务。
已创建RAM用户并完成授权。具体操作,请参见创建RAM用户并完成授权。
已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见在Linux、macOS和Windows系统配置环境变量。
重要阿里云账号的AccessKey拥有所有API的访问权限,建议您使用RAM用户的AccessKey进行API访问或日常运维。
强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
已安装SDK开发环境。具体操作,请参见SDK参考概述。
基本概念
概念 | 说明 |
消费组 | 日志服务支持通过消费组消费数据。一个消费组由多个消费者构成,同一个消费组下面的消费者共同消费一个Logstore中的数据,各个消费者不会重复消费数据。 重要 每个Logstore中,最多创建30个消费组。 |
消费者 | 消费组的构成单元,实际承担消费任务。 重要 同一个消费组中的消费者名称必须不同。 |
Logstore | 数据采集、存储和查询单元。更多信息,请参见日志库(Logstore)。 |
Shard | 用于控制Logstore的读写能力,数据必定保存在某一个Shard中。更多信息,请参见分区(Shard)。 |
Checkpoint | 消费位点,是程序消费到的最新位置。程序重启后,可以通过Checkpoint恢复消费进度。 |
步骤一:创建消费组
API创建消费组
API创建消费组,请参见CreateConsumerGroup - 创建消费组。
查询消费组是否创建成功,请参见ListConsumerGroup - 查询消费组。
SDK创建消费组
管理消费组的代码示例,请参见使用Java SDK管理消费组、使用Python SDK管理消费组。
CLI创建消费组
CLI创建消费组,请参见create_consumer_group。
查询消费组是否创建成功,请参见list_consumer_group。
步骤二:消费数据
消费数据
您可以通过Java、C++、Python及Go SDK实现消费组消费数据。此处,以Java SDK为例。
消费原理
消费组SDK的消费者在首次启动时,当消费组不存在时会创建消费组。起始消费位点是指创建消费组时的数据起始消费位点,该消费位点仅在第一次创建时有效。后续重启消费者时,消费者会从上次服务端保存的消费位点处继续消费。以本示例为例:
LogHubConfig.ConsumePosition.BEGIN_CURSOR
:消费组从头开始消费日志,起始消费位点为Logstore中的第一条日志。LogHubConfig.ConsumePosition.END_CURSOR
:此消费位点记录Logstore日志的最后一条日志之后。
添加Maven依赖。
在Java项目的根目录下,打开pom.xml文件,添加以下代码:
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>loghub-client-lib</artifactId> <version>0.6.47</version> </dependency>
创建消费者逻辑代码
SampleLogHubProcessor.java
。import com.aliyun.openservices.log.common.FastLog; import com.aliyun.openservices.log.common.FastLogContent; import com.aliyun.openservices.log.common.FastLogGroup; import com.aliyun.openservices.log.common.FastLogTag; import com.aliyun.openservices.log.common.LogGroupData; import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker; import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException; import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor; import java.util.List; public class SampleLogHubProcessor implements ILogHubProcessor { private int shardId; // 记录上次持久化Checkpoint的时间。 private long mLastSaveTime = 0; // initialize 方法会在 processor 对象初始化时被调用一次 public void initialize(int shardId) { this.shardId = shardId; } // 消费数据的主逻辑,消费时的所有异常都需要处理,不能直接抛出。 public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) { // 打印已获取的数据。 for (LogGroupData logGroup : logGroups) { FastLogGroup fastLogGroup = logGroup.GetFastLogGroup(); System.out.println("Tags"); for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) { FastLogTag logTag = fastLogGroup.getLogTags(i); System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue()); } for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) { FastLog log = fastLogGroup.getLogs(i); System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount()); for (int j = 0; j < log.getContentsCount(); ++j) { FastLogContent content = log.getContents(j); System.out.println(content.getKey() + "\t:\t" + content.getValue()); } } } long curTime = System.currentTimeMillis(); // 每隔30秒,写一次Checkpoint到服务端。如果30秒内发生Worker异常终止,新启动的Worker会从上一个Checkpoint获取消费数据,可能存在少量的重复数据。 try { if (curTime - mLastSaveTime > 30 * 1000) { // 参数为true表示立即手动将Checkpoint更新到服务端。此外,默认每60秒会自动将内存中缓存的Checkpoint更新到服务端。 checkPointTracker.saveCheckPoint(true); mLastSaveTime = curTime; } else { // 参数为false表示将Checkpoint缓存在本地,可被自动更新Checkpoint机制更新到服务端。 checkPointTracker.saveCheckPoint(false); } } catch (LogHubCheckPointException e) { e.printStackTrace(); } return null; } // 当Worker退出时,会调用该函数,您可以在此处执行清理工作。 public void shutdown(ILogHubCheckPointTracker checkPointTracker) { // 将Checkpoint立即保存到服务端。 try { checkPointTracker.saveCheckPoint(true); } catch (LogHubCheckPointException e) { e.printStackTrace(); } } }
创建消费者实体
SampleLogHubProcessorFactory.java
。class SampleLogHubProcessorFactory implements ILogHubProcessorFactory { public ILogHubProcessor generatorProcessor() { // 生成一个消费实例。注意:每次调用 generatorProcessor 方法,都应该返回一个新的 SampleLogHubProcessor 对象。 return new SampleLogHubProcessor(); } }
创建Main.java文件。创建一个消费者并启动一个消费者线程,该消费者会从指定的Logstore中消费数据。
import com.aliyun.openservices.loghub.client.ClientWorker; import com.aliyun.openservices.loghub.client.config.LogHubConfig; import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException; public class Main { // 日志服务的服务接入点,请您根据实际情况填写。 private static String Endpoint = "cn-hangzhou.log.aliyuncs.com"; // 日志服务项目名称,请您根据实际情况填写。请从已创建项目中获取项目名称。 private static String Project = "ali-cn-hangzhou-sls-admin"; // 日志库名称,请您根据实际情况填写。请从已创建日志库中获取日志库名称。 private static String Logstore = "sls_operation_log"; // 消费组名称,请您根据实际情况填写。您无需提前创建,该程序运行时会自动创建该消费组。 private static String ConsumerGroup = "consumerGroupX"; // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。。 private static String AccessKeyId= System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException { // consumer_1是消费者名称,同一个消费组下面的消费者名称必须不同。不同消费者在多台机器上启动多个进程,均衡消费一个Logstore时,消费者名称可以使用机器IP地址来区分。 // maxFetchLogGroupSize用于设置每次从服务端获取的LogGroup最大数目,使用默认值即可。您可以使用config.setMaxFetchLogGroupSize(100);调整,取值范围为(0,1000]。 LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000); ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config); Thread thread = new Thread(worker); // Thread运行之后,ClientWorker会自动运行,ClientWorker扩展了Runnable接口。 thread.start(); Thread.sleep(60 * 60 * 1000); // 调用Worker的Shutdown函数,退出消费实例,关联的线程也会自动停止。 worker.shutdown(); // ClientWorker运行过程中会生成多个异步的任务。Shutdown完成后,请等待还在执行的任务安全退出。建议设置sleep为30秒。 Thread.sleep(30 * 1000); } }
运行Main.java。
以模拟消费Nginx日志为例,打印日志如下:
: GET request_uri : /request/path-3/file-7 status : 200 body_bytes_sent : 3820 host : www.example.com request_time : 43 request_length : 1987 http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36 http_referer : www.example.com http_x_forwarded_for : 192.168.10.196 upstream_response_time : 0.02 -------- Log: 158, time: 1635629778, GetContentCount: 14 ...... category : null source : 127.0.0.1 topic : nginx_access_log machineUUID : null Tags __receive_time__ : 1635629815 -------- Log: 0, time: 1635629788, GetContentCount: 14 ...... category : null source : 127.0.0.1 topic : nginx_access_log machineUUID : null Tags __receive_time__ : 1635629877 -------- ......
基于SPL消费数据
您可以通过Java、C++、Python及Go SDK实现消费组消费数据。此处,以Java SDK为例。
消费原理
消费组SDK的消费者在首次启动时,当消费组不存在时会创建消费组。起始消费位点是指创建消费组时的数据起始消费位点,该消费位点仅在第一次创建时有效。后续重启消费者时,消费者会从上次服务端保存的消费位点处继续消费。以本示例为例:
LogHubConfig.ConsumePosition.BEGIN_CURSOR
:消费组从头开始消费日志,起始消费位点为Logstore中的第一条日志。LogHubConfig.ConsumePosition.END_CURSOR
:此消费位点记录Logstore日志的最后一条日志之后。
添加Maven依赖。
在Java项目的根目录下,打开pom.xml文件,添加以下代码:
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>aliyun-log</artifactId> <version>0.6.99</version> </dependency>
创建SPLLogHubProcessor.java文件。
import com.aliyun.openservices.log.common.FastLog; import com.aliyun.openservices.log.common.FastLogContent; import com.aliyun.openservices.log.common.FastLogGroup; import com.aliyun.openservices.log.common.FastLogTag; import com.aliyun.openservices.log.common.LogGroupData; import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker; import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException; import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor; import java.util.List; public class SPLLogHubProcessor implements ILogHubProcessor { private int shardId; // 记录上次持久化Checkpoint的时间。 private long mLastSaveTime = 0; // initialize 方法会在 processor 对象初始化时被调用一次 public void initialize(int shardId) { this.shardId = shardId; } // 消费数据的主逻辑,消费时的所有异常都需要处理,不能直接抛出。 public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) { // 打印已获取的数据。 for (LogGroupData logGroup : logGroups) { FastLogGroup fastLogGroup = logGroup.GetFastLogGroup(); System.out.println("Tags"); for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) { FastLogTag logTag = fastLogGroup.getLogTags(i); System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue()); } for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) { FastLog log = fastLogGroup.getLogs(i); System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount()); for (int j = 0; j < log.getContentsCount(); ++j) { FastLogContent content = log.getContents(j); System.out.println(content.getKey() + "\t:\t" + content.getValue()); } } } long curTime = System.currentTimeMillis(); // 每隔30秒,写一次Checkpoint到服务端。如果30秒内发生Worker异常终止,新启动的Worker会从上一个Checkpoint获取消费数据,可能存在少量的重复数据。 try { if (curTime - mLastSaveTime > 30 * 1000) { // 参数为true表示立即手动将Checkpoint更新到服务端。此外,默认每60秒会自动将内存中缓存的Checkpoint更新到服务端。 checkPointTracker.saveCheckPoint(true); mLastSaveTime = curTime; } else { // 参数为false表示将Checkpoint缓存在本地,可被自动更新Checkpoint机制更新到服务端。 checkPointTracker.saveCheckPoint(false); } } catch (LogHubCheckPointException e) { e.printStackTrace(); } return null; } // 当Worker退出时,会调用该函数,您可以在此处执行清理工作。 public void shutdown(ILogHubCheckPointTracker checkPointTracker) { // 将Checkpoint立即保存到服务端。 try { checkPointTracker.saveCheckPoint(true); } catch (LogHubCheckPointException e) { e.printStackTrace(); } } }
创建 SPLLogHubProcessorFactory.java 文件。
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor; import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory; class SPLLogHubProcessorFactory implements ILogHubProcessorFactory { public ILogHubProcessor generatorProcessor() { // 生成一个消费实例。注意:每次调用 generatorProcessor 方法,都应该返回一个新的 SPLLogHubProcessor 对象。 return new SPLLogHubProcessor(); } }
创建Main.java文件。创建一个消费组并启动一个消费者线程,该消费者会从指定的Logstore中消费数据。管理消费组的代码示例,请参见使用Java SDK管理消费组、使用Python SDK管理消费组。
import com.aliyun.openservices.loghub.client.ClientWorker; import com.aliyun.openservices.loghub.client.config.LogHubConfig; import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException; public class SPLConsumer { // 日志服务的服务接入点,请您根据实际情况填写。 private static String Endpoint = "cn-hangzhou.log.aliyuncs.com"; // 日志服务项目名称,请您根据实际情况填写。请从已创建项目中获取项目名称。 private static String Project = "your_project"; // 日志库名称,请您根据实际情况填写。请从已创建日志库中获取日志库名称。 private static String Logstore = "your_logstore"; // 消费组名称,请您根据实际情况填写。您无需提前创建,该程序运行时会自动创建该消费组。 private static String ConsumerGroup = "consumerGroupX"; // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。。 private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException { // consumer_1是消费者名称,同一个消费组下面的消费者名称必须不同。不同消费者在多台机器上启动多个进程,均衡消费一个Logstore时,消费者名称可以使用机器IP地址来区分。 // maxFetchLogGroupSize用于设置每次从服务端获取的LogGroup最大数目,使用默认值即可。您可以使用config.setMaxFetchLogGroupSize(100);调整,取值范围为(0,1000]。 LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000); // setQuery可以设置消费过程中的SLS SPL语句 config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000"); ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config); Thread thread = new Thread(worker); // Thread运行之后,ClientWorker会自动运行,ClientWorker扩展了Runnable接口。 thread.start(); Thread.sleep(60 * 60 * 1000); // 调用Worker的Shutdown函数,退出消费实例,关联的线程也会自动停止。 worker.shutdown(); // ClientWorker运行过程中会生成多个异步的任务。Shutdown完成后,请等待还在执行的任务安全退出。建议设置sleep为30秒。 Thread.sleep(30 * 1000); } }
运行Main.java。
以模拟消费Nginx日志为例,打印日志如下:
: GET request_uri : /request/path-3/file-7 status : 200 body_bytes_sent : 3820 host : www.example.com request_time : 43 request_length : 1987 http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36 http_referer : www.example.com http_x_forwarded_for : 192.168.10.196 upstream_response_time : 0.02 -------- Log: 158, time: 1635629778, GetContentCount: 14 ...... category : null source : 127.0.0.1 topic : nginx_access_log machineUUID : null Tags __receive_time__ : 1635629815 -------- Log: 0, time: 1635629788, GetContentCount: 14 ...... category : null source : 127.0.0.1 topic : nginx_access_log machineUUID : null Tags __receive_time__ : 1635629877 -------- ......
步骤三:查看消费组状态
控制台方式
登录日志服务控制台。
在Project列表区域,单击目标Project。
在
页签中,单击目标Logstore左侧的图标,然后单击数据消费左侧的图标。在消费组列表中,单击目标消费组。
在Consumer Group状态页面,查看每个Shard消费数据的进度。
SDK方式
此处以Java SDK为例。运行ConsumerGroupTest.java,查看每个Shard消费数据的进度。
import java.util.List;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
static String endpoint = "";
static String project = "";
static String logstore = "";
static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogException {
Client client = new Client(endpoint, accesskeyId, accesskey);
// 获取Logstore下的所有消费组。如果消费组不存在,则长度为0。
List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
for(ConsumerGroup c: consumerGroups){
// 打印消费组的属性,包括名称、心跳超时时间、是否按序消费。
System.out.println("名称: " + c.getConsumerGroupName());
System.out.println("心跳超时时间: " + c.getTimeout());
System.out.println("按序消费: " + c.isInOrder());
for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
System.out.println("shard: " + cp.getShard());
// 该时间精确到微秒,类型为长整型。
System.out.println("最后一次更新消费进度的时间: " + cp.getUpdateTime());
System.out.println("消费者名称: " + cp.getConsumer());
String consumerPrg = "";
if(cp.getCheckPoint().isEmpty())
consumerPrg = "尚未开始消费";
else{
// Unix时间戳,单位是秒,输出时请注意格式化。
try{
int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
consumerPrg = "" + prg;
}
catch(LogException e){
if(e.GetErrorCode() == "InvalidCursor")
consumerPrg = "非法,前一次消费时刻已经超出了Logstore中数据的生命周期";
else{
// internal server error
throw e;
}
}
}
System.out.println("消费进度: " + consumerPrg);
String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
int endPrg = 0;
try{
endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
}
catch(LogException e){
// do nothing
}
//Unix时间戳,单位:秒。输出时,请注意格式化。
System.out.println("最后一条数据到达时刻: " + endPrg);
}
}
}
}
返回以下结果:
名称: etl-6cac01c571d5a4b933649c04a7ba215b
心跳超时时间: 60
按序消费: false
shard: 0
最后一次更新消费进度的时间: 1639555453575211
消费者名称: etl-356464787983a3d17086a9797e3d5f0e6959b066-256521
消费进度: 1639555453
最后一条数据到达时刻: 1639555453
shard: 1
最后一次更新消费进度的时间: 1639555392071328
消费者名称: etl-356464787983a3d17086a9797e3d5f0e6959b066-256521
消费进度: 1639555391
最后一条数据到达时刻: 1639555391
名称: etl-2bd3fdfdd63595d56b1ac24393bf5991
心跳超时时间: 60
按序消费: false
shard: 0
最后一次更新消费进度的时间: 1639555453256773
消费者名称: etl-532719dd2f198b3878ee3c6dfc80aeffb39ee48b-061390
消费进度: 1639555453
最后一条数据到达时刻: 1639555453
shard: 1
最后一次更新消费进度的时间: 1639555392066234
消费者名称: etl-532719dd2f198b3878ee3c6dfc80aeffb39ee48b-061390
消费进度: 1639555391
最后一条数据到达时刻: 1639555391
名称: consumerGroupX
心跳超时时间: 60
按序消费: false
shard: 0
最后一次更新消费进度的时间: 1639555434142879
消费者名称: consumer_1
消费进度: 1635615029
最后一条数据到达时刻: 1639555453
shard: 1
最后一次更新消费进度的时间: 1639555437976929
消费者名称: consumer_1
消费进度: 1635616802
最后一条数据到达时刻: 1639555391
RAM用户授权
使用RAM用户操作时,需授予RAM用户操作消费组的相关权限。具体操作,请参见创建RAM用户及授权。
授权的Action如下表所示。
动作(Action) | 说明 | 授权策略中的资源描述方式(Resource) |
log:GetCursorOrData(GetCursor - 通过时间查询Cursor) | 根据时间获取游标(cursor)。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName} |
log:CreateConsumerGroup(CreateConsumerGroup - 创建消费组) | 在指定的Logstore上创建一个消费组。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:ListConsumerGroup(ListConsumerGroup - 查询消费组) | 查询指定Logstore的所有消费组。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
log:ConsumerGroupUpdateCheckPoint(ConsumerGroupUpdateCheckPoint - 更新消费进度) | 更新指定消费组的某个Shard的Checkpoint。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:ConsumerGroupHeartBeat(ConsumerGroupHeartBeat - 消费者发送心跳到服务端) | 为指定消费者发送心跳到服务端。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:UpdateConsumerGroup(UpdateConsumerGroup - 更新消费者组) | 修改指定消费组属性。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:GetConsumerGroupCheckPoint(GetCheckPoint - 获取指定消费组的消费点) | 获取指定消费组消费的某个或者所有Shard的Checkpoint。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
例如,消费组的相关资源信息如下所示,您要通过RAM用户操作该消费组,则需为RAM用户授予以下权限。
Project所属的阿里云账号:174649****602745。
Project所在地域ID:cn-hangzhou。
Project名称:project-test。
Logstore名称:logstore-test。
消费组名称:consumergroup-test。
{
"Version": "1",
"Statement": [
{
"Effect": "Allow",
"Action": [
"log:GetCursorOrData"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
},
{
"Effect": "Allow",
"Action": [
"log:CreateConsumerGroup",
"log:ListConsumerGroup"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
},
{
"Effect": "Allow",
"Action": [
"log:ConsumerGroupUpdateCheckPoint",
"log:ConsumerGroupHeartBeat",
"log:UpdateConsumerGroup",
"log:GetConsumerGroupCheckPoint"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
}
]
}
相关操作
异常诊断
建议您为消费者程序配置Log4j,将消费组内部遇到的异常信息打印出来,便于定位。log4j.properties典型配置:
log4j.rootLogger = info,stdout log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target = System.out log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
配置Log4j后,执行消费者程序可以看到类似如下异常信息:
[WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159) com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
通过消费组消费从某个时间开始的数据
// consumerStartTimeInSeconds表示消费这个时间点之后的数据。 public LogHubConfig(String consumerGroupName, String consumerName, String loghubEndPoint, String project, String logStore, String accessId, String accessKey, int consumerStartTimeInSeconds); // position是个枚举变量,LogHubConfig.ConsumePosition.BEGIN_CURSOR表示从最老的数据开始消费,LogHubConfig.ConsumePosition.END_CURSOR表示从最新的数据开始消费。 public LogHubConfig(String consumerGroupName, String consumerName, String loghubEndPoint, String project, String logStore, String accessId, String accessKey, ConsumePosition position);
说明按照消费需求,请您使用不同的构造方法。
当服务端已保存Checkpoint,则开始消费位置以服务端保存的Checkpoint为准。
日志服务消费数据时,默认优先使用Checkpoint作为消费点。当您指定从固定时间点开始消费数据时,必须保证consumerStartTimeInSeconds时间点落到TTL周期内,否则会造成消费不生效。
重置Checkpoint
public static void updateCheckpoint() throws Exception { Client client = new Client(host, accessId, accessKey); // 这里 timestamp 需要是以秒为单位的 unix timestamp,如果您的时间戳以毫秒为单位,需要如下所示除以1000 long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000; ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore)); for (Shard shard : response.GetShards()) { int shardId = shard.GetShardId(); String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor(); client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor); } }
相关文档
API
操作
API接口
创建消费组
查询消费组
删除消费组
更新消费组
发送消费者心跳
查询消费组的Checkpoint
更新消费组的Checkpoint
SDK
语言
文档链接
Java
Python
CLI
操作
命令行接口
创建消费组
查询消费组
更新消费组
删除消费组
查询消费组的Checkpoint
更新消费组的Checkpoint