背景信息
调用PullLogs接口可以获取指定游标(Cursor)位置的日志数据。日志服务支持Java、Python、Go等语言的应用作为消费者或消费组消费日志服务的数据。
日志服务SPL支持在实时消费、扫描查询和Logtail采集三个日志服务场景中使用,更多信息,请参见SPL概述。
使用Java SDK消费
开始使用前,请确保已安装日志服务Java SDK。具体操作,请参见安装Java SDK。
SDK消费
本示例中,调用PullLogs接口读取日志数据,完成普通消费的演示。
参数说明
参数名称 | 类型 | 是否必选 | 说明 |
project | string | 是 | 日志服务Project名称,更多信息,请参见管理Project。 |
logStore | string | 是 | 日志服务Logstore名称,Logstore是日志服务中日志数据的采集、存储和查询单元。更多信息,请参见管理Logstore。 |
shardId | int | 是 | 日志库的分区ID。 |
添加Maven依赖
在Java项目的根目录下,打开pom.xml
文件,添加以下代码:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log</artifactId>
<version>0.6.99</version>
</dependency>
创建PullLogsDemo.java
文件
示例代码如下:
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.log.common.Shard;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.PullLogsRequest;
import com.aliyun.openservices.log.response.ListShardResponse;
import com.aliyun.openservices.log.response.PullLogsResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class PullLogsDemo {
// 日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写
private static final String endpoint = "cn-hangzhou.log.aliyuncs.com";
// 本示例从环境变量中获取 AccessKey ID 和 AccessKey Secret。
private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// Project 名称
private static final String project = "your_project";
// LogStore 名称
private static final String logStore = "your_logstore";
public static void main(String[] args) throws Exception {
// 创建日志服务 Client
Client client = new Client(endpoint, accessKeyId, accessKeySecret);
// 查询 LogStore 的 Shard
ListShardResponse resp = client.ListShard(project, logStore);
System.out.printf("%s has %d shards\n", logStore, resp.GetShards().size());
Map<Integer, String> cursorMap = new HashMap<Integer, String>();
for (Shard shard : resp.GetShards()) {
int shardId = shard.getShardId();
// 从头开始消费,获取游标。(如果是从尾部开始消费,使用 Consts.CursorMode.END)
cursorMap.put(shardId, client.GetCursor(project, logStore, shardId, Consts.CursorMode.BEGIN).GetCursor());
}
try {
while (true) {
// 从每个Shard中获取日志
for (Shard shard : resp.GetShards()) {
int shardId = shard.getShardId();
PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursorMap.get(shardId));
PullLogsResponse response = client.pullLogs(request);
// 日志都在日志组(LogGroup)中,按照逻辑拆分即可。
List<LogGroupData> logGroups = response.getLogGroups();
System.out.printf("Get %d logGroup from logStore:%s:\tShard:%d\n", logGroups.size(), logStore, shardId);
// 完成处理拉取的日志后,移动游标。
cursorMap.put(shardId, response.getNextCursor());
}
}
} catch (LogException e) {
System.out.println("error code :" + e.GetErrorCode());
System.out.println("error message :" + e.GetErrorMessage());
throw e;
}
}
}
SDK基于SPL消费
本示例中,调用PullLogs接口读取日志数据,完成使用Java SDK基于SPL消费日志数据的演示。
参数说明
参数名称 | 类型 | 是否必选 | 说明 |
project | string | 是 | 日志服务Project名称,更多信息,请参见管理Project。 |
logStore | string | 是 | 日志服务Logstore名称,Logstore是日志服务中日志数据的采集、存储和查询单元。更多信息,请参见管理Logstore。 |
shardId | int | 是 | 日志库的分区ID。 |
添加Maven依赖
在Java项目的根目录下,打开pom.xml
文件,添加以下代码:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log</artifactId>
<version>0.6.99</version>
</dependency>
创建PullLogsWithSPLDemo.java
文件
示例代码如下:
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.*;
import com.aliyun.openservices.log.common.Consts;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.PullLogsRequest;
import com.aliyun.openservices.log.response.ListShardResponse;
import com.aliyun.openservices.log.response.PullLogsResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class PullLogsWithSPLDemo {
// 日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写
private static final String endpoint = "cn-hangzhou.log.aliyuncs.com";
// 本示例从环境变量中获取 AccessKey ID 和 AccessKey Secret。
private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// Project 名称
private static final String project = "your_project";
// LogStore 名称
private static final String logStore = "your_logstore";
public static void main(String[] args) throws Exception {
// 创建日志服务 Client
Client client = new Client(endpoint, accessKeyId, accessKeySecret);
// 查询 LogStore 的 Shard
ListShardResponse resp = client.ListShard(project, logStore);
System.out.printf("%s has %d shards\n", logStore, resp.GetShards().size());
Map<Integer, String> cursorMap = new HashMap<Integer, String>();
for (Shard shard : resp.GetShards()) {
int shardId = shard.getShardId();
// 从头开始消费,获取游标。(如果是从尾部开始消费,使用 Consts.CursorMode.END)
cursorMap.put(shardId, client.GetCursor(project, logStore, shardId, Consts.CursorMode.BEGIN).GetCursor());
}
try {
while (true) {
// 从每个Shard中获取日志
for (Shard shard : resp.GetShards()) {
int shardId = shard.getShardId();
PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursorMap.get(shardId));
request.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
request.setPullMode("scan_on_stream");
PullLogsResponse response = client.pullLogs(request);
// 日志都在日志组(LogGroup)中,按照逻辑拆分即可。
List<LogGroupData> logGroups = response.getLogGroups();
System.out.printf("Get %d logGroup from logStore:%s:\tShard:%d\n", logGroups.size(), logStore, shardId);
// 完成处理拉取的日志后,移动游标。
cursorMap.put(shardId, response.getNextCursor());
}
}
} catch (LogException e) {
System.out.println("error code :" + e.GetErrorCode());
System.out.println("error message :" + e.GetErrorMessage());
throw e;
}
}
}