使用SDK基于SPL消费日志

更新时间:2025-03-11 03:55

本文向您介绍使用SDK基于SPL消费日志的示例。

前提条件

  • 已创建RAM用户并完成授权。具体操作,请参见创建RAM用户及授权

  • 已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见在Linux、macOS和Windows系统配置环境变量

    重要
    • 阿里云账号的AccessKey拥有所有API的访问权限,建议您使用RAM用户的AccessKey进行API访问或日常运维。

    • 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。

代码示例

Java
Go
Python
  1. 安装日志服务SDK:在Java项目的根目录下,打开pom.xml文件,添加以下Maven依赖。更多信息,请参见安装Java SDK

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>aliyun-log</artifactId>
      <version>0.6.120</version>
    </dependency>
  2. 创建PullLogsWithSPLDemo.java文件,在本示例中调用PullLog接口读取日志数据,完成使用Java SDK基于SPL消费日志数据的演示。query字段请根据实际情况填写SPL语句

    import com.aliyun.openservices.log.Client;
    import com.aliyun.openservices.log.common.*;
    import com.aliyun.openservices.log.common.Consts;
    import com.aliyun.openservices.log.exception.LogException;
    import com.aliyun.openservices.log.request.PullLogsRequest;
    import com.aliyun.openservices.log.response.ListShardResponse;
    import com.aliyun.openservices.log.response.PullLogsResponse;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    public class PullLogsWithSPLDemo {
        // 日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
        private static final String endpoint = "cn-hangzhou.log.aliyuncs.com";
        // 本示例从环境变量中获取 AccessKey ID 和 AccessKey Secret。
        private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // Project 名称(需替换为实际Project名)。
        private static final String project = "ali-project-test";
        // Logstore 名称(需替换为实际Logstore名)。
        private static final String logStore = "test-logstore";
    
        public static void main(String[] args) throws Exception {
            // 创建日志服务 Client。
            Client client = new Client(endpoint, accessKeyId, accessKeySecret);
            // 查询 Logstore 的 Shard。
            ListShardResponse resp = client.ListShard(project, logStore);
            System.out.printf("%s has %d shards\n", logStore, resp.GetShards().size());
            Map<Integer, String> cursorMap = new HashMap<>();
            for (Shard shard : resp.GetShards()) {
                int shardId = shard.getShardId();
                // 从头开始消费,获取游标。(如果是从尾部开始消费,使用 Consts.CursorMode.END)。
                cursorMap.put(shardId, client.GetCursor(project, logStore, shardId, Consts.CursorMode.BEGIN).GetCursor());
            }
            try {
                while (true) {
                    // 从每个Shard中获取日志。
                    for (Shard shard : resp.GetShards()) {
                        int shardId = shard.getShardId();
                        PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursorMap.get(shardId));
                        request.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
                        request.setPullMode("scan_on_stream");
                        PullLogsResponse response = client.pullLogs(request);
                        // 日志都在日志组(LogGroup)中,按照逻辑拆分即可。
                        List<LogGroupData> logGroups = response.getLogGroups();
                        System.out.printf("Get %d logGroup from logstore:%s:\tShard:%d\n", logGroups.size(), logStore, shardId);
    
                        // 完成处理拉取的日志后,移动游标。
                        cursorMap.put(shardId, response.getNextCursor());
                    }
                }
            } catch (LogException e) {
                System.out.println("error code :" + e.GetErrorCode());
                System.out.println("error message :" + e.GetErrorMessage());
                throw e;
            }
        }
    }
  3. 运行Main函数,查看输出结果。

    Get 41 logGroup from logstore:test-logstore:	Shard:0
    Get 49 logGroup from logstore:test-logstore:	Shard:1
    Get 43 logGroup from logstore:test-logstore:	Shard:0
    Get 39 logGroup from logstore:test-logstore:	Shard:1
    ... ...
  1. 安装日志服务SDK:创建项目目录spl_demo,在目录下执行如下命令,更多信息,请参见安装Go SDK

    go get -u github.com/aliyun/aliyun-log-go-sdk
  2. 在spl_demo目录下创建main.go文件。创建一个消费组并启动一个消费者线程,该消费者会从指定的Logstore中消费数据。query字段请根据实际情况填写SPL语句

    package main
    
    import (
      "fmt"
      "time"
      "os"
    
      sls "github.com/aliyun/aliyun-log-go-sdk"
    )
    
    func main() {
      client := &sls.Client{
        AccessKeyID:     os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
        AccessKeySecret: os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
        Endpoint:        "cn-chengdu.log.aliyuncs.com",
      }
    
      project := "ali-project-test"
      logstore := "test-logstore"
      initCursor := "end"
      query := "* | where cast(body_bytes_sent as bigint) > 14000"
    
      shards, err := client.ListShards(project, logstore)
      if err != nil {
        fmt.Println("ListShards error", err)
        return
      }
    
      shardCursorMap := map[int]string{}
      for _, shard := range shards {
        cursor, err := client.GetCursor(project, logstore, shard.ShardID, initCursor)
        if err != nil {
          fmt.Println("GetCursor error", shard.ShardID, err)
          return
        }
        shardCursorMap[shard.ShardID] = cursor
      }
    
      for {
        for _, shard := range shards {
          pullLogRequest := &sls.PullLogRequest{
            Project:          project,
            Logstore:         logstore,
            ShardID:          shard.ShardID,
            LogGroupMaxCount: 10,
            Query:            query,
            Cursor:           shardCursorMap[shard.ShardID],
          }
          lg, nextCursor, err := client.PullLogsV2(pullLogRequest)
          fmt.Println("shard: ", shard.ShardID, "loggroups: ", len(lg.LogGroups), "nextCursor: ", nextCursor)
          if err != nil {
            fmt.Println("PullLogsV2 error", shard.ShardID, err)
            return
          }
          shardCursorMap[shard.ShardID] = nextCursor
          if len(lg.LogGroups) == 0 {
            // only for debug
            time.Sleep(time.Duration(3) * time.Second)
          }
        }
      }
    }
  3. 运行main函数,查看输出结果

    shard:  0 loggroups:  41 nextCursor:  MTY5Mz*******TIxNjcxMDcwMQ==
    shard:  1 loggroups:  49 nextCursor:  MTY5Mz*******DYwNDIyNDQ2Mw==
    shard:  0 loggroups:  43 nextCursor:  MTY5Mz*******TIxNjcxMDcwMQ==
    shard:  1 loggroups:  39 nextCursor:  MTY5Mz*******DYwNDIyNDQ2Mw==
    ... ...
  1. 安装日志服务SDK:创建项目目录spl_demo,在目录下执行如下命令,更多信息,请参见安装日志服务Python SDK

    pip install -U aliyun-log-python-sdk
  2. 在spl_demo目录下创建main.py文件。创建一个消费组并启动一个消费者线程,该消费者会从指定的Logstore中消费数据。query字段请根据实际情况填写SPL语句

    # encoding: utf-8
    
    import time
    import os
    from aliyun.log import *
    
    def main():
        # 日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
        endpoint = 'cn-hangzhou.log.aliyuncs.com'
        # 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
        access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
        access_key = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
        # Project名称(需替换为实际Project名)。
        project_name = 'ali-project-test'
        # Logstore名称(需替换为实际Logstore名)。
        logstore_name = 'test-logstore'
        query = '* | where cast(cdn_in as bigint) > 70'
        init_cursor = 'end'
        log_group_count = 10
    
        # 创建日志服务Client。
        client = LogClient(endpoint, access_key_id, access_key)
    
        cursor_map = {}
        # 列举logstore的shards
        res = client.list_shards(project_name, logstore_name)
        res.log_print()
        shards = res.shards
    
        # 获取初始cursor
        for shard in shards:
            shard_id = shard.get('shardID')
            res = client.get_cursor(project_name, logstore_name, shard_id, init_cursor)
            cursor_map[shard_id] = res.get_cursor()
    
        # 循环读取每个shard的数据
        while True:
            for shard in shards:
                shard_id = shard.get('shardID')
                res = client.pull_logs(project_name, logstore_name, shard_id, cursor_map.get(shard_id), log_group_count, query=query)
                res.log_print()
                if cursor_map[shard_id] == res.next_cursor: 
                    # only for debug 
                    time.sleep(3)
                else:
                    cursor_map[shard_id] = res.next_cursor
    
    
    if __name__ == '__main__':
        main()
  3. 运行main函数,查看输出结果。

    ListShardResponse:
    headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/json', 'Content-Length': '335', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:17 GMT', 'x-log-time': '1740563177', 'x-log-requestid': '67BEE2E9132069E22A1F967D'}
    res: [{'shardID': 0, 'status': 'readwrite', 'inclusiveBeginKey': '00000000000000000000000000000000', 'exclusiveEndKey': '80000000000000000000000000000000', 'createTime': 1737010019}, {'shardID': 1, 'status': 'readwrite', 'inclusiveBeginKey': '80000000000000000000000000000000', 'exclusiveEndKey': 'ffffffffffffffffffffffffffffffff', 'createTime': 1737010019}]
    PullLogResponse
    next_cursor MTczNz********c3ODgyMjQ0MQ==
    log_count 0
    headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/x-protobuf', 'Content-Length': '1', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:17 GMT', 'x-log-cursor-time': '0', 'x-log-end-of-cursor': '1', 'x-log-failedlines': '0', 'x-log-rawdatacount': '0', 'x-log-rawdatalines': '0', 'x-log-rawdatasize': '0', 'x-log-read-last-cursor': '0', 'x-log-resultlines': '0', 'x-log-time': '1740563177', 'x-log-bodyrawsize': '0', 'x-log-compresstype': 'gzip', 'x-log-count': '0', 'x-log-cursor': 'MTczNzAx********ODgyMjQ0MQ==', 'x-log-requestid': '67BEE2E974CA9ABCE7DDC7D6'}
    detail: []
    PullLogResponse
    next_cursor MTczNz********c3OTg5NzE3NA==
    log_count 0
    headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/x-protobuf', 'Content-Length': '1', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:21 GMT', 'x-log-cursor-time': '0', 'x-log-end-of-cursor': '1', 'x-log-failedlines': '0', 'x-log-rawdatacount': '0', 'x-log-rawdatalines': '0', 'x-log-rawdatasize': '0', 'x-log-read-last-cursor': '0', 'x-log-resultlines': '0', 'x-log-time': '1740563181', 'x-log-bodyrawsize': '0', 'x-log-compresstype': 'gzip', 'x-log-count': '0', 'x-log-cursor': 'MTczNzAx********OTg5NzE3NA==', 'x-log-requestid': '67BEE2EDF2B58CF1756526EF'}
    detail: []
    PullLogResponse
    ... ...
  • 本页导读 (1)
  • 前提条件​
  • 代码示例
文档反馈
phone 联系我们

立即和Alibaba Cloud在线服务人员进行交谈,获取您想了解的产品信息以及最新折扣。

alicare alicarealicarealicare