背景資訊
調用PullLogs介面可以擷取指定遊標(Cursor)位置的日誌資料。Log Service支援Java、Python、Go等語言的應用作為消費者或消費組消費Log Service的資料。
Log ServiceSPL支援在即時消費、掃描查詢和Logtail採集三個Log Service情境中使用,更多資訊,請參見SPL概述。
使用Java SDK消費
開始使用前,請確保已安裝Log ServiceJava SDK。具體操作,請參見安裝Java SDK。
SDK消費
本樣本中,調用PullLogs介面讀取日誌資料,完成普通消費的示範。
參數說明
參數名稱 | 類型 | 是否必選 | 說明 |
project | string | 是 | Log ServiceProject名稱,更多資訊,請參見管理Project。 |
logStore | string | 是 | Log ServiceLogstore名稱,Logstore是Log Service中日誌資料的採集、儲存和查詢單元。更多資訊,請參見管理Logstore。 |
shardId | int | 是 | 日誌庫的分區ID。 |
添加Maven依賴
在Java專案的根目錄下,開啟pom.xml
檔案,添加以下代碼:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log</artifactId>
<version>0.6.99</version>
</dependency>
建立PullLogsDemo.java
檔案
範例程式碼如下:
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.log.common.Shard;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.PullLogsRequest;
import com.aliyun.openservices.log.response.ListShardResponse;
import com.aliyun.openservices.log.response.PullLogsResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class PullLogsDemo {
// Log Service的服務存取點。此處以杭州為例,其它地區請根據實際情況填寫
private static final String endpoint = "cn-hangzhou.log.aliyuncs.com";
// 本樣本從環境變數中擷取 AccessKey ID 和 AccessKey Secret。
private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// Project 名稱
private static final String project = "your_project";
// LogStore 名稱
private static final String logStore = "your_logstore";
public static void main(String[] args) throws Exception {
// 建立Log Service Client
Client client = new Client(endpoint, accessKeyId, accessKeySecret);
// 查詢 LogStore 的 Shard
ListShardResponse resp = client.ListShard(project, logStore);
System.out.printf("%s has %d shards\n", logStore, resp.GetShards().size());
Map<Integer, String> cursorMap = new HashMap<Integer, String>();
for (Shard shard : resp.GetShards()) {
int shardId = shard.getShardId();
// 從頭開始消費,擷取遊標。(如果是從尾部開始消費,使用 Consts.CursorMode.END)
cursorMap.put(shardId, client.GetCursor(project, logStore, shardId, Consts.CursorMode.BEGIN).GetCursor());
}
try {
while (true) {
// 從每個Shard中擷取日誌
for (Shard shard : resp.GetShards()) {
int shardId = shard.getShardId();
PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursorMap.get(shardId));
PullLogsResponse response = client.pullLogs(request);
// 日誌都在日誌組(LogGroup)中,按照邏輯拆分即可。
List<LogGroupData> logGroups = response.getLogGroups();
System.out.printf("Get %d logGroup from logStore:%s:\tShard:%d\n", logGroups.size(), logStore, shardId);
// 完成處理拉取的日誌後,移動遊標。
cursorMap.put(shardId, response.getNextCursor());
}
}
} catch (LogException e) {
System.out.println("error code :" + e.GetErrorCode());
System.out.println("error message :" + e.GetErrorMessage());
throw e;
}
}
}
SDK基於SPL消費
本樣本中,調用PullLogs介面讀取日誌資料,完成使用Java SDK基於SPL消費日誌資料的示範。
參數說明
參數名稱 | 類型 | 是否必選 | 說明 |
project | string | 是 | Log ServiceProject名稱,更多資訊,請參見管理Project。 |
logStore | string | 是 | Log ServiceLogstore名稱,Logstore是Log Service中日誌資料的採集、儲存和查詢單元。更多資訊,請參見管理Logstore。 |
shardId | int | 是 | 日誌庫的分區ID。 |
添加Maven依賴
在Java專案的根目錄下,開啟pom.xml
檔案,添加以下代碼:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log</artifactId>
<version>0.6.99</version>
</dependency>
建立PullLogsWithSPLDemo.java
檔案
範例程式碼如下:
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.*;
import com.aliyun.openservices.log.common.Consts;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.PullLogsRequest;
import com.aliyun.openservices.log.response.ListShardResponse;
import com.aliyun.openservices.log.response.PullLogsResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class PullLogsWithSPLDemo {
// Log Service的服務存取點。此處以杭州為例,其它地區請根據實際情況填寫
private static final String endpoint = "cn-hangzhou.log.aliyuncs.com";
// 本樣本從環境變數中擷取 AccessKey ID 和 AccessKey Secret。
private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// Project 名稱
private static final String project = "your_project";
// LogStore 名稱
private static final String logStore = "your_logstore";
public static void main(String[] args) throws Exception {
// 建立Log Service Client
Client client = new Client(endpoint, accessKeyId, accessKeySecret);
// 查詢 LogStore 的 Shard
ListShardResponse resp = client.ListShard(project, logStore);
System.out.printf("%s has %d shards\n", logStore, resp.GetShards().size());
Map<Integer, String> cursorMap = new HashMap<Integer, String>();
for (Shard shard : resp.GetShards()) {
int shardId = shard.getShardId();
// 從頭開始消費,擷取遊標。(如果是從尾部開始消費,使用 Consts.CursorMode.END)
cursorMap.put(shardId, client.GetCursor(project, logStore, shardId, Consts.CursorMode.BEGIN).GetCursor());
}
try {
while (true) {
// 從每個Shard中擷取日誌
for (Shard shard : resp.GetShards()) {
int shardId = shard.getShardId();
PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursorMap.get(shardId));
request.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
request.setPullMode("scan_on_stream");
PullLogsResponse response = client.pullLogs(request);
// 日誌都在日誌組(LogGroup)中,按照邏輯拆分即可。
List<LogGroupData> logGroups = response.getLogGroups();
System.out.printf("Get %d logGroup from logStore:%s:\tShard:%d\n", logGroups.size(), logStore, shardId);
// 完成處理拉取的日誌後,移動遊標。
cursorMap.put(shardId, response.getNextCursor());
}
}
} catch (LogException e) {
System.out.println("error code :" + e.GetErrorCode());
System.out.println("error message :" + e.GetErrorMessage());
throw e;
}
}
}