Simple Log Service allows third-party software, applications in various programming languages, cloud services, and stream computing frameworks to consume data in real time by calling Simple Log Service SDK. However, SDK-based consumption cannot meet the requirements for specific implementation details, such as load balancing and failovers between consumers. In this case, you can create consumer groups to consume data within seconds. This topic describes how to use consumer groups to consume data.
Overview
A Logstore has multiple shards. Simple Log Service allocates shards to the consumers in a consumer group based on the following rules:
After a new consumer is added to a consumer group, shards that are allocated to the consumers in the consumer group are reallocated to each consumer for load balancing. The shards are reallocated based on the preceding rules.
Terms
Term | Description |
consumer group | You can use consumer groups to consume data in Simple Log Service. A consumer group consists of multiple consumers. All consumers in a consumer group consume data in the same Logstore. Consumers do not repeatedly consume data. Important You can create up to 30 consumer groups for a Logstore. |
consumer | The consumers in a consumer group consume data. Important The names of consumers in a consumer group must be unique. |
Logstore | A Logstore is used to collect, store, and query data. For more information, see Logstore. |
shard | A shard is used to control the read and write capacity of a Logstore. In Simple Log Service, data is stored in shards. For more information, see Shard. |
checkpoint | A consumption checkpoint is the position at which a program stops consuming data. If a program is restarted, the program consumes data from the last consumption checkpoint. Note If you use consumer groups to consume data, Simple Log Service automatically stores consumption checkpoints when an error occurs in your program. Consumers can resume data consumption from a consumption checkpoint without repeatedly consuming data after a program recovers. |
Step 1: Create a consumer group
This section describes how to use Simple Log Service SDK, Simple Log Service API, and Simple Log Service CLI to create a consumer group.
Use Simple Log Service SDK
Use Simple Log Service API
Use Simple Log Service CLI
Sample code:
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;
public class CreateConsumerGroup {
public static void main(String[] args) throws LogException {
String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
String projectName = "ali-test-project";
String logstoreName = "ali-test-logstore";
String host = "https://cn-hangzhou.log.aliyuncs.com";
Client client = new Client(host, accessId, accessKey);
try {
String consumerGroupName = "ali-test-consumergroup2";
System.out.println("ready to create consumergroup");
ConsumerGroup consumerGroup = new ConsumerGroup(consumerGroupName, 300, true);
client.CreateConsumerGroup(projectName, logstoreName, consumerGroup);
System.out.println(String.format("create consumergroup %s success", consumerGroupName));
} catch (LogException e) {
System.out.println("LogException e :" + e.toString());
System.out.println("error code :" + e.GetErrorCode());
System.out.println("error message :" + e.GetErrorMessage());
throw e;
}
}
}
For more information about the sample code that is used to manage consumer groups, see Use Simple Log Service SDK for Java to manage consumer groups and Use Simple Log Service SDK for Python to manage consumer groups.
For more information about how to use Simple Log Service API to create a consumer group, see CreateConsumerGroup.
For more information about how to check whether a consumer group is created, see ListConsumerGroup.
For more information about how to use Simple Log Service CLI to create a consumer group, see create_consumer_group.
For more information about how to check whether a consumer group is created, see list_consumer_group.
Step 2: Consume log data
How it works
The first time you call Simple Log Service SDK for Java to start a consumer, the SDK creates a consumer group if the SDK does not find the consumer group to which the consumer belongs. After the consumer group is created, the SDK records a start consumption checkpoint and starts to consume data from the consumption checkpoint. The start consumption checkpoint becomes invalid after the first-time consumption. When the consumer is restarted, the consumer resumes data consumption from the last consumption checkpoint that is stored by Simple Log Service. Sample consumption checkpoints:
LogHubConfig.ConsumePosition.BEGIN_CURSOR
: the start consumption checkpoint, which specifies the first log in a Logstore. A consumer starts consumption from the earliest data.
LogHubConfig.ConsumePosition.END_CURSOR
: the end consumption checkpoint, which specifies the last log in a Logstore.
Examples
You can use Simple Log Service SDK for Java, C++, Python, or Go to create consumer groups and consume data. In this example, Simple Log Service SDK for Java is used.
Example 1: Use the SDK
Example 2: Use the SDK and SPL
Add Maven dependencies.
Open the pom.xml
file and add the following code:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.50</version>
</dependency>
Write the implementation logic of data consumption. Sample code:
SampleLogHubProcessor.java
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import java.util.List;
public class SampleLogHubProcessor implements ILogHubProcessor {
private int shardId;
private long mLastSaveTime = 0;
public void initialize(int shardId) {
this.shardId = shardId;
}
public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
for (LogGroupData logGroup : logGroups) {
FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
System.out.println("Tags");
for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
FastLogTag logTag = fastLogGroup.getLogTags(i);
System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
}
for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
FastLog log = fastLogGroup.getLogs(i);
System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
for (int j = 0; j < log.getContentsCount(); ++j) {
FastLogContent content = log.getContents(j);
System.out.println(content.getKey() + "\t:\t" + content.getValue());
}
}
}
long curTime = System.currentTimeMillis();
try {
if (curTime - mLastSaveTime > 30 * 1000) {
checkPointTracker.saveCheckPoint(true);
mLastSaveTime = curTime;
} else {
checkPointTracker.saveCheckPoint(false);
}
} catch (LogHubCheckPointException e) {
e.printStackTrace();
}
return null;
}
public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
try {
checkPointTracker.saveCheckPoint(true);
} catch (LogHubCheckPointException e) {
e.printStackTrace();
}
}
}
For more information about the sample code, see aliyun-log-consumer-java and Aliyun LOG Go Consumer.
Define a consumer entity. Sample code:
SampleLogHubProcessorFactory.java
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
public ILogHubProcessor generatorProcessor() {
return new SampleLogHubProcessor();
}
}
Create a consumer group and start a consumer thread to allow consumers in the consumer group to consume data in the specified Logstore. Sample code:
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
public class Main {
private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
private static String Project = "ali-test-project";
private static String Logstore = "ali-test-logstore";
private static String ConsumerGroup = "ali-test-consumergroup2";
private static String AccessKeyId= System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000);
ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
thread.start();
Thread.sleep(60 * 60 * 1000);
worker.shutdown();
Thread.sleep(30 * 1000);
}
}
Run the Main.java
file.
In this example, NGINX logs are consumed and the consumption result is displayed.
: GET
request_uri : /request/path-3/file-7
status : 200
body_bytes_sent : 3820
host : www.example.com
request_time : 43
request_length : 1987
http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
http_referer : www.example.com
http_x_forwarded_for : 192.168.10.196
upstream_response_time : 0.02
--------
Log: 158, time: 1635629778, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629815
--------
Log: 0, time: 1635629788, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629877
--------
......
Add Maven dependencies.
Open the pom.xml
file and add the following code:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log</artifactId>
<version>0.6.114</version>
</dependency>
Write the implementation logic of data consumption. Sample code:
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import java.util.List;
public class SPLLogHubProcessor implements ILogHubProcessor {
private int shardId;
private long mLastSaveTime = 0;
public void initialize(int shardId) {
this.shardId = shardId;
}
public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
for (LogGroupData logGroup : logGroups) {
FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
System.out.println("Tags");
for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
FastLogTag logTag = fastLogGroup.getLogTags(i);
System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
}
for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
FastLog log = fastLogGroup.getLogs(i);
System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
for (int j = 0; j < log.getContentsCount(); ++j) {
FastLogContent content = log.getContents(j);
System.out.println(content.getKey() + "\t:\t" + content.getValue());
}
}
}
long curTime = System.currentTimeMillis();
try {
if (curTime - mLastSaveTime > 30 * 1000) {
checkPointTracker.saveCheckPoint(true);
mLastSaveTime = curTime;
} else {
checkPointTracker.saveCheckPoint(false);
}
} catch (LogHubCheckPointException e) {
e.printStackTrace();
}
return null;
}
public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
try {
checkPointTracker.saveCheckPoint(true);
} catch (LogHubCheckPointException e) {
e.printStackTrace();
}
}
}
Define a consumer entity. Sample code:
SPLLogHubProcessorFactory.java
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
public ILogHubProcessor generatorProcessor() {
return new SPLLogHubProcessor();
}
}
Create a consumer group and start a consumer thread to allow consumers in the consumer group to consume data in the specified Logstore. Sample code:
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
public class Main {
private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
private static String Project = "ali-test-project";
private static String Logstore = "ali-test-logstore";
private static String ConsumerGroup = "ali-test-consumergroup2";
private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
thread.start();
Thread.sleep(60 * 60 * 1000);
worker.shutdown();
Thread.sleep(30 * 1000);
}
}
Run the Main.java
file.
In this example, NGINX logs are consumed and the consumption result is displayed.
: GET
request_uri : /request/path-3/file-7
status : 200
body_bytes_sent : 3820
host : www.example.com
request_time : 43
request_length : 1987
http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
http_referer : www.example.com
http_x_forwarded_for : 192.168.10.196
upstream_response_time : 0.02
--------
Log: 158, time: 1635629778, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629815
--------
Log: 0, time: 1635629788, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629877
--------
......
Step 3: View the status of a consumer group
This section describes the methods that you can use to view the status of a consumer group.
Use Simple Log Service SDK for Java
Use the Simple Log Service console
View the consumption checkpoint of each shard. Sample code:
import java.util.List;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
static String endpoint = "cn-hangzhou.log.aliyuncs.com";
static String project = "ali-test-project";
static String logstore = "ali-test-logstore";
static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogException {
Client client = new Client(endpoint, accesskeyId, accesskey);
List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
for(ConsumerGroup c: consumerGroups){
System.out.println("Name: " + c.getConsumerGroupName());
System.out.println("Heartbeat timeout period " + c.getTimeout());
System.out.println("Ordered consumption: " + c.isInOrder());
for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
System.out.println("shard: " + cp.getShard());
System.out.println("The time at which the consumption checkpoint was last updated: " + cp.getUpdateTime());
System.out.println("Consumer name: " + cp.getConsumer());
String consumerPrg = "";
if(cp.getCheckPoint().isEmpty())
consumerPrg = "Consumption is not started";
else{
try{
int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
consumerPrg = "" + prg;
}
catch(LogException e){
if(e.GetErrorCode() == "InvalidCursor")
consumerPrg = "Invalid. The time at which the consumption checkpoint was last updated is beyond the retention period of the data";
else{
throw e;
}
}
}
System.out.println("Consumption checkpoint: " + consumerPrg);
String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
int endPrg = 0;
try{
endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
}
catch(LogException e){
}
System.out.println("The time at which the last data record was received: " + endPrg);
}
}
}
}
-
View the output. Example:
Name: ali-test-consumergroup2
Heartbeat timeout period: 60
Ordered consumption: false
shard: 0
The time at which the consumption checkpoint was last updated: 0
Consumer name: consumer_1
Consumption checkpoint: Consumption is not started
The time at which the last data record was received: 1729583617
shard: 1
The time at which the consumption checkpoint was last updated: 0
Consumer name: consumer_1
Consumption checkpoint: Consumption is not started
The time at which the last data record was received: 1729583738
Process finished with exit code 0
Log on to the Simple Log Service console.
In the Projects section, click the project you want.

On the tab, click the
icon next to the Logstore that you want to manage. Then, click the
icon next to Data Consumption.
In the consumer group list, click the consumer group that you want to manage.
On the Consumer Group Status page, view the consumption checkpoint of each shard.
What to do next
Authorize a RAM user to perform operations on consumer groups
Before you can use a Resource Access Management (RAM) user to manage consumer groups, you must grant the required permissions to the RAM user. For more information, see Create a RAM user and authorize the RAM user to access Simple Log Service.
The following table describes the actions that you can authorize a RAM user to perform.
Action | Description | Resource |
Action | Description | Resource |
log:GetCursorOrData(GetCursor) | Queries cursors based on the time when logs are generated. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName} |
log:CreateConsumerGroup(CreateConsumerGroup) | Creates a consumer group for a Logstore. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:ListConsumerGroup(ListConsumerGroup) | Queries all consumer groups of a Logstore. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
log:ConsumerGroupUpdateCheckPoint(ConsumerGroupUpdateCheckPoint) | Updates the consumption checkpoint for a shard that is allocated to a consumer group. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:ConsumerGroupHeartBeat(ConsumerGroupHeartBeat) | Sends a heartbeat message for a consumer to Simple Log Service. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:UpdateConsumerGroup(UpdateConsumerGroup) | Modifies the attributes of a consumer group. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:GetConsumerGroupCheckPoint(GetCheckPoint) | Queries the consumption checkpoints for one or all shards that are allocated to a consumer group. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
The following list provides resource information about a consumer group. To allow a RAM user to perform operations on the consumer group, you can refer to the following code to grant the required permissions to the RAM user:
ID of the Alibaba Cloud account to which the project belongs: 174649****602745
ID of the region where the project resides: cn-hangzhou
Name of the project: project-test
Name of the Logstore: logstore-test
Name of the consumer group: consumergroup-test
{
"Version": "1",
"Statement": [
{
"Effect": "Allow",
"Action": [
"log:GetCursorOrData"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
},
{
"Effect": "Allow",
"Action": [
"log:CreateConsumerGroup",
"log:ListConsumerGroup"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
},
{
"Effect": "Allow",
"Action": [
"log:ConsumerGroupUpdateCheckPoint",
"log:ConsumerGroupHeartBeat",
"log:UpdateConsumerGroup",
"log:GetConsumerGroupCheckPoint"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
}
]
}
Configure Log4j for troubleshooting.
We recommend that you configure Log4j for your consumer program to display error messages when exceptions occur in consumer groups. This helps you troubleshoot the errors. The following code shows a common log4j.properties configuration file:
log4j.rootLogger = info,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
After you configure Log4j, you can receive error messages when you run the consumer program. The following example shows an error message:
[WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
Use a consumer group to consume data that is generated after a point in time
public LogHubConfig(String consumerGroupName,
String consumerName,
String loghubEndPoint,
String project, String logStore,
String accessId, String accessKey,
int consumerStartTimeInSeconds);
public LogHubConfig(String consumerGroupName,
String consumerName,
String loghubEndPoint,
String project, String logStore,
String accessId, String accessKey,
ConsumePosition position);
Note
You can use different constructors based on your business requirements.
If a consumption checkpoint is stored on Simple Log Service, data consumption starts from the consumption checkpoint.
When Simple Log Service consumes data, a consumption checkpoint is preferentially used to start data consumption. If you want to specify a point in time from which Simple Log Service starts data consumption, make sure that the value of consumerStartTimeInSeconds falls within the time-to-live (TTL) period. Otherwise, Simple Log Service cannot consume data based on your configurations.
Reset a consumption checkpoint
public static void updateCheckpoint() throws Exception {
Client client = new Client(host, accessId, accessKey);
long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
for (Shard shard : response.GetShards()) {
int shardId = shard.GetShardId();
String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
}
}
References
API
SDK
Programming language | References |
Programming language | References |
Java | |
Python | |
CLI