Unlock the Power of AI

1 million free tokens

88% Price Reduction

Activate Now

Use Simple Log Service SDK to consume logs based on SPL

Updated at: 2025-03-11 04:02

This topic provides examples of how to use the Simple Log Service SDK to consume logs based on Simple Log Service Processing Language (SPL) statements.

Prerequisites

Sample code

Java
Go
Python
  1. Install the Simple Log Service SDK: In the root directory of your Java project, open the pom.xml file and add the following Maven dependencies. For more information, see Install Simple Log Service SDK for Java.

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>aliyun-log</artifactId>
      <version>0.6.120</version>
    </dependency>
  2. Create the PullLogsWithSPLDemo.java file. This example calls the PullLogs operation to read log data and demonstrates how to consume log data based on SPL by using the Java SDK. Specify the SPL statement in the query parameter of the request.

    import com.aliyun.openservices.log.Client;
    import com.aliyun.openservices.log.common.*;
    import com.aliyun.openservices.log.common.Consts;
    import com.aliyun.openservices.log.exception.LogException;
    import com.aliyun.openservices.log.request.PullLogsRequest;
    import com.aliyun.openservices.log.response.ListShardResponse;
    import com.aliyun.openservices.log.response.PullLogsResponse;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Demo that pulls log data from every shard of a logstore with an SPL statement
     * attached to each pull request.
     *
     * <p>The program lists the shards of the logstore, obtains a BEGIN cursor for each
     * shard, and then pulls from the shards in an endless loop, printing how many log
     * groups each pull returned. It runs until a {@link LogException} is thrown or the
     * process is stopped.
     */
    public class PullLogsWithSPLDemo {
        // Specify a Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the Singapore region is used. Replace this parameter value with your actual endpoint.
        private static final String endpoint = "ap-southeast-1.log.aliyuncs.com";
        // Obtain an AccessKey ID and an AccessKey secret from environment variables.
        private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // The name of the project. Replace the value with the actual project name.
        private static final String project = "ali-project-test";
        // The name of the logstore. Replace the value with the actual logstore name.
        private static final String logStore = "test-logstore";
        // Pause applied after a full pass over the shards in which no cursor moved,
        // so that the loop does not spin against the service when there is nothing new to read.
        private static final long IDLE_SLEEP_MILLIS = 3000L;
    
        /**
         * Runs the demo.
         *
         * @param args not used
         * @throws Exception if a Simple Log Service call fails (the error code and message
         *                   are printed before the exception is rethrown) or if the idle
         *                   pause is interrupted
         */
        public static void main(String[] args) throws Exception {
            // Create a Simple Log Service client.
            Client client = new Client(endpoint, accessKeyId, accessKeySecret);
            // Query the shards of the logstore.
            ListShardResponse resp = client.ListShard(project, logStore);
            System.out.printf("%s has %d shards\n", logStore, resp.GetShards().size());
            // Current read position of each shard, keyed by shard ID.
            Map<Integer, String> cursorMap = new HashMap<>();
            for (Shard shard : resp.GetShards()) {
                int shardId = shard.getShardId();
                // Use the BEGIN cursor or obtain a specific cursor to consume log data. (If you want to consume log data from the end, use Consts.CursorMode.END).
                cursorMap.put(shardId, client.GetCursor(project, logStore, shardId, Consts.CursorMode.BEGIN).GetCursor());
            }
            try {
                while (true) {
                    // Becomes true as soon as any shard returns a cursor different from the one sent.
                    boolean anyCursorMoved = false;
                    // Obtain log data from each shard.
                    for (Shard shard : resp.GetShards()) {
                        int shardId = shard.getShardId();
                        String cursor = cursorMap.get(shardId);
                        PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursor);
                        // Attach the SPL statement and the pull mode to the request.
                        request.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
                        request.setPullMode("scan_on_stream");
                        PullLogsResponse response = client.pullLogs(request);
                        // The response carries the pulled data as a list of log groups.
                        List<LogGroupData> logGroups = response.getLogGroups();
                        System.out.printf("Get %d logGroup from logstore:%s:\tShard:%d\n", logGroups.size(), logStore, shardId);
    
                        // Move the cursor after the pulled logs are processed.
                        String nextCursor = response.getNextCursor();
                        if (cursor == null || !cursor.equals(nextCursor)) {
                            anyCursorMoved = true;
                        }
                        cursorMap.put(shardId, nextCursor);
                    }
                    if (!anyCursorMoved) {
                        // Only for debug: no shard advanced in this pass, so wait before polling again.
                        Thread.sleep(IDLE_SLEEP_MILLIS);
                    }
                }
            } catch (LogException e) {
                System.out.println("error code :" + e.GetErrorCode());
                System.out.println("error message :" + e.GetErrorMessage());
                throw e;
            }
        }
    }
  3. Run the main function to view the output.

    Get 41 logGroup from logstore:test-logstore:	Shard:0
    Get 49 logGroup from logstore:test-logstore:	Shard:1
    Get 43 logGroup from logstore:test-logstore:	Shard:0
    Get 39 logGroup from logstore:test-logstore:	Shard:1
    ... ...
  1. Install the Simple Log Service SDK: Create the project directory spl_demo and run the following command in the directory. For more information, see Install Simple Log Service SDK for Go.

    go get -u github.com/aliyun/aliyun-log-go-sdk
  2. Create the main.go file in the spl_demo directory. In this example, the PullLogsV2 operation is called in a loop to read log data from each shard of the specified logstore. Specify the SPL statement in the Query field of the request.

    package main
    
    import (
      "fmt"
      "time"
      "os"
    
      sls "github.com/aliyun/aliyun-log-go-sdk"
    )
    
    // main lists the shards of the logstore, obtains an initial cursor for each
    // shard, and then pulls log groups from every shard in an endless loop with
    // an SPL statement attached to each request. It prints the number of log
    // groups and the next cursor for every pull, and returns on the first error.
    func main() {
      // Credentials are read from environment variables; replace the endpoint
      // with your actual endpoint.
      client := &sls.Client{
        AccessKeyID:     os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
        AccessKeySecret: os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
        Endpoint:        "ap-southeast-1.log.aliyuncs.com",
      }
    
      // Replace the project and logstore names with your actual values.
      project := "ali-project-test"
      logstore := "test-logstore"
      // Position passed to GetCursor when the initial cursor of each shard is requested.
      initCursor := "end"
      // SPL statement sent with every pull request.
      query := "* | where cast(body_bytes_sent as bigint) > 14000"
    
      shards, err := client.ListShards(project, logstore)
      if err != nil {
        fmt.Println("ListShards error", err)
        return
      }
    
      // Current cursor of each shard, keyed by shard ID.
      shardCursorMap := map[int]string{}
      for _, shard := range shards {
        cursor, err := client.GetCursor(project, logstore, shard.ShardID, initCursor)
        if err != nil {
          fmt.Println("GetCursor error", shard.ShardID, err)
          return
        }
        shardCursorMap[shard.ShardID] = cursor
      }
    
      for {
        for _, shard := range shards {
          pullLogRequest := &sls.PullLogRequest{
            Project:          project,
            Logstore:         logstore,
            ShardID:          shard.ShardID,
            LogGroupMaxCount: 10,
            Query:            query,
            Cursor:           shardCursorMap[shard.ShardID],
          }
          lg, nextCursor, err := client.PullLogsV2(pullLogRequest)
          // Check the error before touching lg: dereferencing the result of a
          // failed call would panic instead of reporting the error.
          if err != nil {
            fmt.Println("PullLogsV2 error", shard.ShardID, err)
            return
          }
          fmt.Println("shard: ", shard.ShardID, "loggroups: ", len(lg.LogGroups), "nextCursor: ", nextCursor)
          shardCursorMap[shard.ShardID] = nextCursor
          if len(lg.LogGroups) == 0 {
            // only for debug
            time.Sleep(time.Duration(3) * time.Second)
          }
        }
      }
    }
  3. Run the main function to view the output.

    shard:  0 loggroups:  41 nextCursor:  MTY5Mz*******TIxNjcxMDcwMQ==
    shard:  1 loggroups:  49 nextCursor:  MTY5Mz*******DYwNDIyNDQ2Mw==
    shard:  0 loggroups:  43 nextCursor:  MTY5Mz*******TIxNjcxMDcwMQ==
    shard:  1 loggroups:  39 nextCursor:  MTY5Mz*******DYwNDIyNDQ2Mw==
    ... ...
  1. Install the Simple Log Service SDK: Create the project directory spl_demo and run the following command in the directory. For more information, see Install Simple Log Service SDK for Python.

    pip install -U aliyun-log-python-sdk
  2. Create the main.py file in the spl_demo directory. In this example, the pull_logs operation is called in a loop to read log data from each shard of the specified logstore. Specify the SPL statement in the query parameter.

    # encoding: utf-8
    
    import time
    import os
    from aliyun.log import *
    
    def main():
        """Pull logs from every shard of a logstore in an endless loop, using an SPL query.

        The shard listing and every pull response are printed through their
        ``log_print()`` methods. When a pull returns the same cursor that was
        sent, the loop pauses for three seconds before moving on; otherwise the
        new cursor is stored for the next pull of that shard.
        """
        # Endpoint of Simple Log Service (Singapore region in this example).
        # Replace it with your actual endpoint.
        sls_endpoint = 'ap-southeast-1.log.aliyuncs.com'
        # Credentials come from environment variables (empty string when unset).
        key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
        key_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
        # Replace the project and logstore names with your actual values.
        project = 'ali-project-test'
        logstore = 'test-logstore'
        # SPL statement sent with every pull, starting position and batch size.
        spl_statement = '* | where cast(cdn_in as bigint) > 70'
        start_position = 'end'
        max_log_groups = 10

        sls_client = LogClient(sls_endpoint, key_id, key_secret)

        # Look up the shards of the logstore and print the listing.
        shard_listing = sls_client.list_shards(project, logstore)
        shard_listing.log_print()
        shard_ids = [entry.get('shardID') for entry in shard_listing.shards]

        # One starting cursor per shard.
        cursors = {
            sid: sls_client.get_cursor(project, logstore, sid, start_position).get_cursor()
            for sid in shard_ids
        }

        # Keep pulling from each shard in turn.
        while True:
            for sid in shard_ids:
                held_cursor = cursors.get(sid)
                pulled = sls_client.pull_logs(project, logstore, sid, held_cursor,
                                              max_log_groups, query=spl_statement)
                pulled.log_print()
                if pulled.next_cursor != held_cursor:
                    cursors[sid] = pulled.next_cursor
                    continue
                # only for debug
                time.sleep(3)
    
    
    # Run the demo only when this file is executed as a script.
    if __name__ == '__main__':
        main()
  3. Run the main function to view the output.

    ListShardResponse:
    headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/json', 'Content-Length': '335', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:17 GMT', 'x-log-time': '1740563177', 'x-log-requestid': '67BEE2E9132069E22A1F967D'}
    res: [{'shardID': 0, 'status': 'readwrite', 'inclusiveBeginKey': '00000000000000000000000000000000', 'exclusiveEndKey': '80000000000000000000000000000000', 'createTime': 1737010019}, {'shardID': 1, 'status': 'readwrite', 'inclusiveBeginKey': '80000000000000000000000000000000', 'exclusiveEndKey': 'ffffffffffffffffffffffffffffffff', 'createTime': 1737010019}]
    PullLogResponse
    next_cursor MTczNz********c3ODgyMjQ0MQ==
    log_count 0
    headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/x-protobuf', 'Content-Length': '1', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:17 GMT', 'x-log-cursor-time': '0', 'x-log-end-of-cursor': '1', 'x-log-failedlines': '0', 'x-log-rawdatacount': '0', 'x-log-rawdatalines': '0', 'x-log-rawdatasize': '0', 'x-log-read-last-cursor': '0', 'x-log-resultlines': '0', 'x-log-time': '1740563177', 'x-log-bodyrawsize': '0', 'x-log-compresstype': 'gzip', 'x-log-count': '0', 'x-log-cursor': 'MTczNzAx********ODgyMjQ0MQ==', 'x-log-requestid': '67BEE2E974CA9ABCE7DDC7D6'}
    detail: []
    PullLogResponse
    next_cursor MTczNz********c3OTg5NzE3NA==
    log_count 0
    headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/x-protobuf', 'Content-Length': '1', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:21 GMT', 'x-log-cursor-time': '0', 'x-log-end-of-cursor': '1', 'x-log-failedlines': '0', 'x-log-rawdatacount': '0', 'x-log-rawdatalines': '0', 'x-log-rawdatasize': '0', 'x-log-read-last-cursor': '0', 'x-log-resultlines': '0', 'x-log-time': '1740563181', 'x-log-bodyrawsize': '0', 'x-log-compresstype': 'gzip', 'x-log-count': '0', 'x-log-cursor': 'MTczNzAx********OTg5NzE3NA==', 'x-log-requestid': '67BEE2EDF2B58CF1756526EF'}
    detail: []
    PullLogResponse
    ... ...
  • On this page (1)
  • Prerequisites
  • Sample code
Feedback
phone Contact Us

Chat now with Alibaba Cloud Customer Service to assist you in finding the right products and services to meet your needs.

alicare alicarealicarealicare