This topic offers SDK examples for consuming logs using a Consume Processor.
Prerequisites
A Resource Access Management (RAM) user is created and granted the required permissions. For more information, see Create a RAM user and grant permissions.
The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables in Linux, macOS, and Windows.
Important: The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. We recommend that you use the AccessKey pair of a RAM user to call API operations or perform routine O&M.
Do not include your AccessKey ID or AccessKey secret in your project code. If either is leaked, the security of all resources in your account may be compromised.
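The samples in this topic read the AccessKey pair from the two environment variables named above. The following is a minimal Python sketch of failing fast when either variable is missing. The helper name load_access_key is hypothetical and is not part of the Simple Log Service SDK.

```python
import os


def load_access_key(env=None):
    """Return (access_key_id, access_key_secret) read from the environment.

    load_access_key is an illustrative helper, not an SDK function.
    """
    env = os.environ if env is None else env
    names = ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    missing = [name for name in names if not env.get(name)]
    if missing:
        # Report which variables are missing, but never print their values.
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return env[names[0]], env[names[1]]
```

Calling load_access_key() before creating a client surfaces a configuration mistake as a clear error instead of an authentication failure on the first request.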
Code examples
Java
Install the Simple Log Service SDK. In the root directory of your Java project, open the pom.xml file and add the following Maven dependencies. For more information, see Install the Java SDK.
The version of the Simple Log Service SDK for Java must be 0.6.126 or later.
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>2.5.0</version>
    </dependency>
    <!-- Import the Simple Log Service SDK for Java -->
    <dependency>
        <groupId>com.aliyun.openservices</groupId>
        <artifactId>aliyun-log</artifactId>
        <version>0.6.126</version>
    </dependency>
Create a file named PullLogsWithSPLDemo.java. This example calls the PullLog operation to read log data. It demonstrates how to use the Java SDK to consume log data with SPL.
    import com.aliyun.openservices.log.Client;
    import com.aliyun.openservices.log.common.*;
    import com.aliyun.openservices.log.exception.LogException;
    import com.aliyun.openservices.log.request.PullLogsRequest;
    import com.aliyun.openservices.log.response.ListShardResponse;
    import com.aliyun.openservices.log.response.PullLogsResponse;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class PullLogsWithSPLDemo {
        // The service endpoint of Simple Log Service. This example uses the endpoint of the China (Hangzhou) region. Replace the endpoint with the one for your region.
        private static final String endpoint = "cn-hangzhou.log.aliyuncs.com";
        // This example obtains the AccessKey ID and AccessKey secret from environment variables.
        private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // The name of the project. Replace the value with your actual project name.
        private static final String project = "ali-project-test";
        // The name of the Logstore. Replace the value with your actual Logstore name.
        private static final String logStore = "test-logstore";

        public static void main(String[] args) throws Exception {
            // The processorName parameter specifies the identity of the Consume Processor.
            String processorName = "processor-test";
            // Create a Simple Log Service client.
            Client client = new Client(endpoint, accessKeyId, accessKeySecret);
            // Query the shards of the Logstore.
            ListShardResponse resp = client.ListShard(project, logStore);
            System.out.printf("%s has %d shards\n", logStore, resp.GetShards().size());
            Map<Integer, String> cursorMap = new HashMap<>();
            for (Shard shard : resp.GetShards()) {
                int shardId = shard.getShardId();
                // Start consumption from the beginning and obtain a cursor.
                // To start consumption from the end, use Consts.CursorMode.END.
                cursorMap.put(shardId, client.GetCursor(project, logStore, shardId, Consts.CursorMode.BEGIN).GetCursor());
            }
            try {
                while (true) {
                    // Obtain logs from each shard.
                    for (Shard shard : resp.GetShards()) {
                        int shardId = shard.getShardId();
                        PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursorMap.get(shardId));
                        // Pull the logs through the Consume Processor identified by processorName.
                        request.setProcessor(processorName);
                        PullLogsResponse response = client.pullLogs(request);
                        // Logs are stored in log groups. You can split the logs based on your business logic.
                        List<LogGroupData> logGroups = response.getLogGroups();
                        System.out.printf("Get %d logGroup from logstore:%s:\tShard:%d\n", logGroups.size(), logStore, shardId);
                        // After you process the pulled logs, move the cursor.
                        cursorMap.put(shardId, response.getNextCursor());
                    }
                }
            } catch (LogException e) {
                System.out.println("error code :" + e.GetErrorCode());
                System.out.println("error message :" + e.GetErrorMessage());
                throw e;
            }
        }
    }
Run the main function and view the output.
    Get 41 logGroup from logstore:test-logstore:	Shard:0
    Get 49 logGroup from logstore:test-logstore:	Shard:1
    Get 43 logGroup from logstore:test-logstore:	Shard:0
    Get 39 logGroup from logstore:test-logstore:	Shard:1
    ...
    ...
Python
Install the Simple Log Service SDK. Create a project folder named spl_demo and run the following command in the folder. For more information, see Install the Simple Log Service SDK for Python.
The version of the Simple Log Service SDK for Python must be 0.9.28 or later.
    pip install -U aliyun-log-python-sdk
In the spl_demo folder, create a file named main.py. The code lists the shards of the Logstore, obtains a cursor for each shard, and then pulls logs from each shard in a loop through the Consume Processor.
    # encoding: utf-8
    import time
    import os

    from aliyun.log import *


    def main():
        # The service endpoint of Simple Log Service. This example uses the endpoint of the China (Hangzhou) region. Replace the endpoint with the one for your region.
        endpoint = 'cn-hangzhou.log.aliyuncs.com'
        # This example obtains the AccessKey ID and AccessKey secret from environment variables.
        access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
        access_key = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
        # The name of the project. Replace the value with your actual project name.
        project_name = 'ali-project-test'
        # The name of the Logstore. Replace the value with your actual Logstore name.
        logstore_name = 'test-logstore'
        # The processor parameter specifies the identity of the Consume Processor.
        processor = "processor-test"
        # Start consumption from the end of each shard.
        init_cursor = 'end'
        # The maximum number of log groups to pull in each request.
        log_group_count = 10

        # Create a Simple Log Service client.
        client = LogClient(endpoint, access_key_id, access_key)
        cursor_map = {}

        # List the shards of the Logstore.
        res = client.list_shards(project_name, logstore_name)
        res.log_print()
        shards = res.shards

        # Obtain the initial cursor.
        for shard in shards:
            shard_id = shard.get('shardID')
            res = client.get_cursor(project_name, logstore_name, shard_id, init_cursor)
            cursor_map[shard_id] = res.get_cursor()

        # Read data from each shard in a loop.
        while True:
            for shard in shards:
                shard_id = shard.get('shardID')
                res = client.pull_logs(project_name, logstore_name, shard_id, cursor_map.get(shard_id),
                                       log_group_count, processor=processor)
                res.log_print()
                if cursor_map[shard_id] == res.next_cursor:
                    # The cursor did not advance, so no new data is available. The sleep is only for debugging.
                    time.sleep(3)
                else:
                    cursor_map[shard_id] = res.next_cursor


    if __name__ == '__main__':
        main()
Run the main function and view the output.
    ListShardResponse:
    headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/json', 'Content-Length': '335', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:17 GMT', 'x-log-time': '1740563177', 'x-log-requestid': '67BEE2E9132069E22A1F967D'}
    res: [{'shardID': 0, 'status': 'readwrite', 'inclusiveBeginKey': '00000000000000000000000000000000', 'exclusiveEndKey': '80000000000000000000000000000000', 'createTime': 1737010019}, {'shardID': 1, 'status': 'readwrite', 'inclusiveBeginKey': '80000000000000000000000000000000', 'exclusiveEndKey': 'ffffffffffffffffffffffffffffffff', 'createTime': 1737010019}]
    PullLogResponse
    next_cursor MTczNz********c3ODgyMjQ0MQ==
    log_count 0
    headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/x-protobuf', 'Content-Length': '1', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:17 GMT', 'x-log-cursor-time': '0', 'x-log-end-of-cursor': '1', 'x-log-failedlines': '0', 'x-log-rawdatacount': '0', 'x-log-rawdatalines': '0', 'x-log-rawdatasize': '0', 'x-log-read-last-cursor': '0', 'x-log-resultlines': '0', 'x-log-time': '1740563177', 'x-log-bodyrawsize': '0', 'x-log-compresstype': 'gzip', 'x-log-count': '0', 'x-log-cursor': 'MTczNzAx********ODgyMjQ0MQ==', 'x-log-requestid': '67BEE2E974CA9ABCE7DDC7D6'}
    detail: []
    PullLogResponse
    next_cursor MTczNz********c3OTg5NzE3NA==
    log_count 0
    headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/x-protobuf', 'Content-Length': '1', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:21 GMT', 'x-log-cursor-time': '0', 'x-log-end-of-cursor': '1', 'x-log-failedlines': '0', 'x-log-rawdatacount': '0', 'x-log-rawdatalines': '0', 'x-log-rawdatasize': '0', 'x-log-read-last-cursor': '0', 'x-log-resultlines': '0', 'x-log-time': '1740563181', 'x-log-bodyrawsize': '0', 'x-log-compresstype': 'gzip', 'x-log-count': '0', 'x-log-cursor': 'MTczNzAx********OTg5NzE3NA==', 'x-log-requestid': '67BEE2EDF2B58CF1756526EF'}
    detail: []
    PullLogResponse
    ...
    ...
Go
Install the Simple Log Service SDK. Create a project folder named spl_demo and run the following command in the folder. For more information, see Install the Go SDK.
The version of the Simple Log Service SDK for Go must be v0.1.107 or later.
    go get -u github.com/aliyun/aliyun-log-go-sdk
In the spl_demo folder, create a file named main.go. The code lists the shards of the Logstore, obtains a cursor for each shard, and then pulls logs from each shard in a loop through the Consume Processor.
    package main

    import (
        "fmt"
        "os"
        "time"

        sls "github.com/aliyun/aliyun-log-go-sdk"
    )

    func main() {
        // This example obtains the AccessKey ID and AccessKey secret from environment variables.
        // Replace the endpoint with the one for your region.
        client := &sls.Client{
            AccessKeyID:     os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
            AccessKeySecret: os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
            Endpoint:        "cn-chengdu.log.aliyuncs.com",
        }

        project := "ali-project-test"
        logstore := "test-logstore"
        // Start consumption from the end of each shard.
        initCursor := "end"
        // The consumeProcessor parameter specifies the identity of the Consume Processor.
        consumeProcessor := "ali-test-consume-processor"

        // List the shards of the Logstore.
        shards, err := client.ListShards(project, logstore)
        if err != nil {
            fmt.Println("ListShards error", err)
            return
        }

        // Obtain the initial cursor of each shard.
        shardCursorMap := map[int]string{}
        for _, shard := range shards {
            cursor, err := client.GetCursor(project, logstore, shard.ShardID, initCursor)
            if err != nil {
                fmt.Println("GetCursor error", shard.ShardID, err)
                return
            }
            shardCursorMap[shard.ShardID] = cursor
        }

        // Read data from each shard in a loop.
        for {
            for _, shard := range shards {
                pullLogRequest := &sls.PullLogRequest{
                    Project:          project,
                    Logstore:         logstore,
                    ShardID:          shard.ShardID,
                    LogGroupMaxCount: 10,
                    Processor:        consumeProcessor,
                    Cursor:           shardCursorMap[shard.ShardID],
                }
                lg, nextCursor, err := client.PullLogsV2(pullLogRequest)
                // Check the error before using the result, because lg may be nil if the call fails.
                if err != nil {
                    fmt.Println("PullLogsV2 error", shard.ShardID, err)
                    return
                }
                fmt.Println("shard: ", shard.ShardID, "loggroups: ", len(lg.LogGroups), "nextCursor: ", nextCursor)
                shardCursorMap[shard.ShardID] = nextCursor
                if len(lg.LogGroups) == 0 {
                    // No new data is available. The sleep is only for debugging.
                    time.Sleep(time.Duration(3) * time.Second)
                }
            }
        }
    }
Run the main function and view the output.
    shard: 0 loggroups: 41 nextCursor: MTY5Mz*******TIxNjcxMDcwMQ==
    shard: 1 loggroups: 49 nextCursor: MTY5Mz*******DYwNDIyNDQ2Mw==
    shard: 0 loggroups: 43 nextCursor: MTY5Mz*******TIxNjcxMDcwMQ==
    shard: 1 loggroups: 39 nextCursor: MTY5Mz*******DYwNDIyNDQ2Mw==
    ...
    ...
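The Java, Python, and Go samples share one pattern: keep one cursor per shard, pull from that cursor, and then store the returned next cursor. The following Python sketch isolates that loop so that it can run without network access. The function name poll_shards and the pull callable are hypothetical stand-ins for illustration. In a real program, pull would wrap the SDK call shown in the samples above.

```python
import time


def poll_shards(shard_ids, cursors, pull, rounds, idle_sleep=3.0, sleep=time.sleep):
    """Pull from each shard `rounds` times, advancing one cursor per shard.

    pull(shard_id, cursor) is a hypothetical stand-in for the SDK call and
    must return a (log_groups, next_cursor) tuple.
    Returns the total number of log groups received.
    """
    total = 0
    for _ in range(rounds):
        for shard_id in shard_ids:
            log_groups, next_cursor = pull(shard_id, cursors[shard_id])
            total += len(log_groups)
            if next_cursor == cursors[shard_id]:
                # The cursor did not move, so no new data arrived. Wait before retrying.
                sleep(idle_sleep)
            else:
                # Move the cursor only after the pulled log groups are handled.
                cursors[shard_id] = next_cursor
    return total
```

Passing the sleep function as a parameter is a design choice made for this sketch: it lets a test replace the wait with a recorder, while production code keeps the default time.sleep.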