當您使用第三方軟體、多語言應用、雲產品、流式計算架構等通過SDK即時消費Log Service的資料時,SDK消費無法滿足Log Service的實現細節及消費者之間的負載平衡、容錯移轉(Failover)等,您可以通過消費組(ConsumerGroup)消費日誌,消費組(ConsumerGroup)消費的即時性較強,通常為秒級。本文為您介紹通過消費組消費資料的操作步驟。
工作流程
一個Logstore中包含多個Shard,通過消費組消費資料就是將Shard分配給一個消費組下面的消費者,分配方式遵循以下原則。
在一個消費組中,一個Shard只會分配到一個消費者。
在一個消費組中,一個消費者可以被分配多個Shard。
新的消費者加入消費組後,這個消費組下面的Shard從屬關係會調整,以實現消費的負載平衡,但是仍遵循上述分配原則。
通過消費組消費,程式發生故障時,會預設儲存Checkpoint。在程式故障恢複時,能夠從斷點處繼續消費,從而保證資料不會被重複消費。
前提條件
已開通Log Service。更多資訊,請參見開通Log Service。
已建立RAM使用者並完成授權。具體操作,請參見建立RAM使用者並完成授權。
已配置環境變數ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具體操作,請參見在Linux、macOS和Windows系統配置環境變數。
重要阿里雲帳號的AccessKey擁有所有API的存取權限,建議您使用RAM使用者的AccessKey進行API訪問或日常營運。
強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
已安裝SDK開發環境。具體操作,請參見SDK參考概述。
基本概念
概念 | 說明 |
消費組 | Log Service支援通過消費組消費資料。一個消費組由多個消費者構成,同一個消費組下面的消費者共同消費一個Logstore中的資料,各個消費者不會重複消費資料。 重要 每個Logstore中,最多建立30個消費組。 |
消費者 | 消費組的構成單元,實際承擔消費任務。 重要 同一個消費組中的消費者名稱必須不同。 |
Logstore | 資料擷取、儲存和查詢單元。更多資訊,請參見日誌庫(Logstore)。 |
Shard | 用於控制Logstore的讀寫能力,資料必定儲存在某一個Shard中。更多資訊,請參見分區(Shard)。 |
Checkpoint | 消費位點,是程式消費到的最新位置。程式重啟後,可以通過Checkpoint恢複消費進度。 |
步驟一:建立消費組
API建立消費組
API建立消費組,請參見CreateConsumerGroup - 建立消費組。
查詢消費組是否建立成功,請參見ListConsumerGroup - 查詢消費組。
SDK建立消費組
管理消費組的程式碼範例,請參見使用Java SDK管理消費組、使用Python SDK管理消費組。
CLI建立消費組
CLI建立消費組,請參見create_consumer_group。
查詢消費組是否建立成功,請參見list_consumer_group。
步驟二:消費資料
消費資料
您可以通過Java、C++、Python及Go SDK實現消費組消費資料。此處,以Java SDK為例。
消費原理
消費組SDK的消費者在初次開機時,當消費組不存在時會建立消費組。起始消費位點是指建立消費組時的資料起始消費位點,該消費位點僅在第一次建立時有效。後續重啟消費者時,消費者會從上次服務端儲存的消費位點處繼續消費。以本樣本為例:
LogHubConfig.ConsumePosition.BEGIN_CURSOR
:消費組從頭開始消費日誌,起始消費位點為Logstore中的第一條日誌。LogHubConfig.ConsumePosition.END_CURSOR
:此消費位點記錄Logstore日誌的最後一條日誌之後。
添加Maven依賴。
在Java專案的根目錄下,開啟pom.xml檔案,添加以下代碼:
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>loghub-client-lib</artifactId> <version>0.6.47</version> </dependency>
建立消費者邏輯代碼
SampleLogHubProcessor.java
。import com.aliyun.openservices.log.common.FastLog; import com.aliyun.openservices.log.common.FastLogContent; import com.aliyun.openservices.log.common.FastLogGroup; import com.aliyun.openservices.log.common.FastLogTag; import com.aliyun.openservices.log.common.LogGroupData; import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker; import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException; import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor; import java.util.List; public class SampleLogHubProcessor implements ILogHubProcessor { private int shardId; // 記錄上次持久化Checkpoint的時間。 private long mLastSaveTime = 0; // initialize 方法會在 processor 對象初始化時被調用一次 public void initialize(int shardId) { this.shardId = shardId; } // 消費資料的主邏輯,消費時的所有異常都需要處理,不能直接拋出。 public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) { // 列印已擷取的資料。 for (LogGroupData logGroup : logGroups) { FastLogGroup fastLogGroup = logGroup.GetFastLogGroup(); System.out.println("Tags"); for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) { FastLogTag logTag = fastLogGroup.getLogTags(i); System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue()); } for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) { FastLog log = fastLogGroup.getLogs(i); System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount()); for (int j = 0; j < log.getContentsCount(); ++j) { FastLogContent content = log.getContents(j); System.out.println(content.getKey() + "\t:\t" + content.getValue()); } } } long curTime = System.currentTimeMillis(); // 每隔30秒,寫一次Checkpoint到服務端。如果30秒內發生Worker異常終止,新啟動的Worker會從上一個Checkpoint擷取消費資料,可能存在少量的重複資料。 try { if (curTime - mLastSaveTime > 30 * 1000) { // 參數為true表示立即手動將Checkpoint更新到服務端。此外,預設每60秒會自動將記憶體中緩衝的Checkpoint更新到服務端。 checkPointTracker.saveCheckPoint(true); mLastSaveTime = curTime; } else { // 參數為false表示將Checkpoint緩衝在本地,可被自動更新Checkpoint機制更新到服務端。 checkPointTracker.saveCheckPoint(false); } } catch (LogHubCheckPointException e) { e.printStackTrace(); } return null; } // 當Worker退出時,會調用該函數,您可以在此處執行清理工作。 public void shutdown(ILogHubCheckPointTracker checkPointTracker) { // 將Checkpoint立即儲存到服務端。 try { checkPointTracker.saveCheckPoint(true); } catch (LogHubCheckPointException e) { e.printStackTrace(); } } }
建立消費者實體
SampleLogHubProcessorFactory.java
。class SampleLogHubProcessorFactory implements ILogHubProcessorFactory { public ILogHubProcessor generatorProcessor() { // 產生一個消費執行個體。注意:每次調用 generatorProcessor 方法,都應該返回一個新的 SampleLogHubProcessor 對象。 return new SampleLogHubProcessor(); } }
建立Main.java檔案。建立一個消費者並啟動一個消費者線程,該消費者會從指定的Logstore中消費資料。
import com.aliyun.openservices.loghub.client.ClientWorker; import com.aliyun.openservices.loghub.client.config.LogHubConfig; import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException; public class Main { // Log Service的服務存取點,請您根據實際情況填寫。 private static String Endpoint = "cn-hangzhou.log.aliyuncs.com"; // Log Service專案名稱,請您根據實際情況填寫。請從已建立專案中擷取專案名稱。 private static String Project = "ali-cn-hangzhou-sls-admin"; // 日誌庫名稱,請您根據實際情況填寫。請從已建立日誌庫中擷取日誌庫名稱。 private static String Logstore = "sls_operation_log"; // 消費組名稱,請您根據實際情況填寫。您無需提前建立,該程式運行時會自動建立該消費組。 private static String ConsumerGroup = "consumerGroupX"; // 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。。 private static String AccessKeyId= System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException { // consumer_1是消費者名稱,同一個消費組下面的消費者名稱必須不同。不同消費者在多台機器上啟動多個進程,均衡消費一個Logstore時,消費者名稱可以使用機器IP地址來區分。 // maxFetchLogGroupSize用於設定每次從服務端擷取的LogGroup最大數目,使用預設值即可。您可以使用config.setMaxFetchLogGroupSize(100);調整,取值範圍為(0,1000]。 LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000); ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config); Thread thread = new Thread(worker); // Thread運行之後,ClientWorker會自動運行,ClientWorker擴充了Runnable介面。 thread.start(); Thread.sleep(60 * 60 * 1000); // 調用Worker的Shutdown函數,退出消費執行個體,關聯的線程也會自動停止。 worker.shutdown(); // ClientWorker運行過程中會產生多個非同步任務。Shutdown完成後,請等待還在執行的任務安全退出。建議設定sleep為30秒。 Thread.sleep(30 * 1000); } }
運行Main.java。
以類比消費Nginx日誌為例,列印日誌如下:
: GET request_uri : /request/path-3/file-7 status : 200 body_bytes_sent : 3820 host : www.example.com request_time : 43 request_length : 1987 http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36 http_referer : www.example.com http_x_forwarded_for : 192.168.10.196 upstream_response_time : 0.02 -------- Log: 158, time: 1635629778, GetContentCount: 14 ...... category : null source : 127.0.0.1 topic : nginx_access_log machineUUID : null Tags __receive_time__ : 1635629815 -------- Log: 0, time: 1635629788, GetContentCount: 14 ...... category : null source : 127.0.0.1 topic : nginx_access_log machineUUID : null Tags __receive_time__ : 1635629877 -------- ......
基於SPL消費資料
您可以通過Java、C++、Python及Go SDK實現消費組消費資料。此處,以Java SDK為例。
消費原理
消費組SDK的消費者在初次開機時,當消費組不存在時會建立消費組。起始消費位點是指建立消費組時的資料起始消費位點,該消費位點僅在第一次建立時有效。後續重啟消費者時,消費者會從上次服務端儲存的消費位點處繼續消費。以本樣本為例:
LogHubConfig.ConsumePosition.BEGIN_CURSOR
:消費組從頭開始消費日誌,起始消費位點為Logstore中的第一條日誌。LogHubConfig.ConsumePosition.END_CURSOR
:此消費位點記錄Logstore日誌的最後一條日誌之後。
添加Maven依賴。
在Java專案的根目錄下,開啟pom.xml檔案,添加以下代碼:
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>aliyun-log</artifactId> <version>0.6.99</version> </dependency>
建立SPLLogHubProcessor.java檔案。
import com.aliyun.openservices.log.common.FastLog; import com.aliyun.openservices.log.common.FastLogContent; import com.aliyun.openservices.log.common.FastLogGroup; import com.aliyun.openservices.log.common.FastLogTag; import com.aliyun.openservices.log.common.LogGroupData; import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker; import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException; import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor; import java.util.List; public class SPLLogHubProcessor implements ILogHubProcessor { private int shardId; // 記錄上次持久化Checkpoint的時間。 private long mLastSaveTime = 0; // initialize 方法會在 processor 對象初始化時被調用一次 public void initialize(int shardId) { this.shardId = shardId; } // 消費資料的主邏輯,消費時的所有異常都需要處理,不能直接拋出。 public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) { // 列印已擷取的資料。 for (LogGroupData logGroup : logGroups) { FastLogGroup fastLogGroup = logGroup.GetFastLogGroup(); System.out.println("Tags"); for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) { FastLogTag logTag = fastLogGroup.getLogTags(i); System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue()); } for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) { FastLog log = fastLogGroup.getLogs(i); System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount()); for (int j = 0; j < log.getContentsCount(); ++j) { FastLogContent content = log.getContents(j); System.out.println(content.getKey() + "\t:\t" + content.getValue()); } } } long curTime = System.currentTimeMillis(); // 每隔30秒,寫一次Checkpoint到服務端。如果30秒內發生Worker異常終止,新啟動的Worker會從上一個Checkpoint擷取消費資料,可能存在少量的重複資料。 try { if (curTime - mLastSaveTime > 30 * 1000) { // 參數為true表示立即手動將Checkpoint更新到服務端。此外,預設每60秒會自動將記憶體中緩衝的Checkpoint更新到服務端。 checkPointTracker.saveCheckPoint(true); mLastSaveTime = curTime; } else { // 參數為false表示將Checkpoint緩衝在本地,可被自動更新Checkpoint機制更新到服務端。 checkPointTracker.saveCheckPoint(false); } } catch (LogHubCheckPointException e) { e.printStackTrace(); } return null; } // 當Worker退出時,會調用該函數,您可以在此處執行清理工作。 public void shutdown(ILogHubCheckPointTracker checkPointTracker) { // 將Checkpoint立即儲存到服務端。 try { checkPointTracker.saveCheckPoint(true); } catch (LogHubCheckPointException e) { e.printStackTrace(); } } }
建立 SPLLogHubProcessorFactory.java 檔案。
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor; import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory; class SPLLogHubProcessorFactory implements ILogHubProcessorFactory { public ILogHubProcessor generatorProcessor() { // 產生一個消費執行個體。注意:每次調用 generatorProcessor 方法,都應該返回一個新的 SPLLogHubProcessor 對象。 return new SPLLogHubProcessor(); } }
建立Main.java檔案。建立一個消費組並啟動一個消費者線程,該消費者會從指定的Logstore中消費資料。管理消費組的程式碼範例,請參見使用Java SDK管理消費組、使用Python SDK管理消費組。
import com.aliyun.openservices.loghub.client.ClientWorker; import com.aliyun.openservices.loghub.client.config.LogHubConfig; import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException; public class SPLConsumer { // Log Service的服務存取點,請您根據實際情況填寫。 private static String Endpoint = "cn-hangzhou.log.aliyuncs.com"; // Log Service專案名稱,請您根據實際情況填寫。請從已建立專案中擷取專案名稱。 private static String Project = "your_project"; // 日誌庫名稱,請您根據實際情況填寫。請從已建立日誌庫中擷取日誌庫名稱。 private static String Logstore = "your_logstore"; // 消費組名稱,請您根據實際情況填寫。您無需提前建立,該程式運行時會自動建立該消費組。 private static String ConsumerGroup = "consumerGroupX"; // 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。。 private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException { // consumer_1是消費者名稱,同一個消費組下面的消費者名稱必須不同。不同消費者在多台機器上啟動多個進程,均衡消費一個Logstore時,消費者名稱可以使用機器IP地址來區分。 // maxFetchLogGroupSize用於設定每次從服務端擷取的LogGroup最大數目,使用預設值即可。您可以使用config.setMaxFetchLogGroupSize(100);調整,取值範圍為(0,1000]。 LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000); // setQuery可以設定消費過程中的SLS SPL語句 config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000"); ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config); Thread thread = new Thread(worker); // Thread運行之後,ClientWorker會自動運行,ClientWorker擴充了Runnable介面。 thread.start(); Thread.sleep(60 * 60 * 1000); // 調用Worker的Shutdown函數,退出消費執行個體,關聯的線程也會自動停止。 worker.shutdown(); // ClientWorker運行過程中會產生多個非同步任務。Shutdown完成後,請等待還在執行的任務安全退出。建議設定sleep為30秒。 Thread.sleep(30 * 1000); } }
運行Main.java。
以類比消費Nginx日誌為例,列印日誌如下:
: GET request_uri : /request/path-3/file-7 status : 200 body_bytes_sent : 3820 host : www.example.com request_time : 43 request_length : 1987 http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36 http_referer : www.example.com http_x_forwarded_for : 192.168.10.196 upstream_response_time : 0.02 -------- Log: 158, time: 1635629778, GetContentCount: 14 ...... category : null source : 127.0.0.1 topic : nginx_access_log machineUUID : null Tags __receive_time__ : 1635629815 -------- Log: 0, time: 1635629788, GetContentCount: 14 ...... category : null source : 127.0.0.1 topic : nginx_access_log machineUUID : null Tags __receive_time__ : 1635629877 -------- ......
步驟三:查看消費組狀態
控制台方式
在Project列表地區,單擊目標Project。
在
頁簽中,單擊目標Logstore左側的表徵圖,然後單擊資料消費左側的表徵圖。在消費組列表中,單擊目標消費組。
在Consumer Group狀態頁面,查看每個Shard消費資料的進度。
SDK方式
此處以Java SDK為例。運行ConsumerGroupTest.java,查看每個Shard消費資料的進度。
import java.util.List;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
static String endpoint = "";
static String project = "";
static String logstore = "";
static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogException {
Client client = new Client(endpoint, accesskeyId, accesskey);
// 擷取Logstore下的所有消費組。如果消費組不存在,則長度為0。
List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
for(ConsumerGroup c: consumerGroups){
// 列印消費組的屬性,包括名稱、心跳逾時時間、是否按序消費。
System.out.println("名稱: " + c.getConsumerGroupName());
System.out.println("心跳逾時時間: " + c.getTimeout());
System.out.println("按序消費: " + c.isInOrder());
for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
System.out.println("shard: " + cp.getShard());
// 該時間精確到微秒,類型為長整型。
System.out.println("最後一次更新消費進度的時間: " + cp.getUpdateTime());
System.out.println("消費者名稱: " + cp.getConsumer());
String consumerPrg = "";
if(cp.getCheckPoint().isEmpty())
consumerPrg = "尚未開始消費";
else{
// Unix時間戳記,單位是秒,輸出時請注意格式化。
try{
int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
consumerPrg = "" + prg;
}
catch(LogException e){
if(e.GetErrorCode() == "InvalidCursor")
consumerPrg = "非法,前一次消費時刻已經超出了Logstore中資料的生命週期";
else{
// internal server error
throw e;
}
}
}
System.out.println("消費進度: " + consumerPrg);
String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
int endPrg = 0;
try{
endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
}
catch(LogException e){
// do nothing
}
//Unix時間戳記,單位:秒。輸出時,請注意格式化。
System.out.println("最後一條資料到達時刻: " + endPrg);
}
}
}
}
返回以下結果:
名稱: etl-6cac01c571d5a4b933649c04a7ba215b
心跳逾時時間: 60
按序消費: false
shard: 0
最後一次更新消費進度的時間: 1639555453575211
消費者名稱: etl-356464787983a3d17086a9797e3d5f0e6959b066-256521
消費進度: 1639555453
最後一條資料到達時刻: 1639555453
shard: 1
最後一次更新消費進度的時間: 1639555392071328
消費者名稱: etl-356464787983a3d17086a9797e3d5f0e6959b066-256521
消費進度: 1639555391
最後一條資料到達時刻: 1639555391
名稱: etl-2bd3fdfdd63595d56b1ac24393bf5991
心跳逾時時間: 60
按序消費: false
shard: 0
最後一次更新消費進度的時間: 1639555453256773
消費者名稱: etl-532719dd2f198b3878ee3c6dfc80aeffb39ee48b-061390
消費進度: 1639555453
最後一條資料到達時刻: 1639555453
shard: 1
最後一次更新消費進度的時間: 1639555392066234
消費者名稱: etl-532719dd2f198b3878ee3c6dfc80aeffb39ee48b-061390
消費進度: 1639555391
最後一條資料到達時刻: 1639555391
名稱: consumerGroupX
心跳逾時時間: 60
按序消費: false
shard: 0
最後一次更新消費進度的時間: 1639555434142879
消費者名稱: consumer_1
消費進度: 1635615029
最後一條資料到達時刻: 1639555453
shard: 1
最後一次更新消費進度的時間: 1639555437976929
消費者名稱: consumer_1
消費進度: 1635616802
最後一條資料到達時刻: 1639555391
RAM使用者授權
使用RAM使用者操作時,需授予RAM使用者操作消費組的相關許可權。具體操作,請參見建立RAM使用者及授權。
授權的Action如下表所示。
動作(Action) | 說明 | 授權策略中的資源描述方式(Resource) |
log:GetCursorOrData(GetCursor - 通過時間查詢Cursor) | 根據時間擷取遊標(cursor)。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName} |
log:CreateConsumerGroup(CreateConsumerGroup - 建立消費組) | 在指定的Logstore上建立一個消費組。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:ListConsumerGroup(ListConsumerGroup - 查詢消費組) | 查詢指定Logstore的所有消費組。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
log:ConsumerGroupUpdateCheckPoint(ConsumerGroupUpdateCheckPoint - 更新消費進度) | 更新指定消費組的某個Shard的Checkpoint。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:ConsumerGroupHeartBeat(ConsumerGroupHeartBeat - 消費者發送心跳到服務端) | 為指定消費者發送心跳到服務端。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:UpdateConsumerGroup(UpdateConsumerGroup - 更新消費者組) | 修改指定消費組屬性。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:GetConsumerGroupCheckPoint(GetCheckPoint - 擷取指定消費組的消費點) | 擷取指定消費組消費的某個或者所有Shard的Checkpoint。 | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
例如,消費組的相關資源資訊如下所示,您要通過RAM使用者操作該消費組,則需為RAM使用者授予以下許可權。
Project所屬的阿里雲帳號:174649****602745。
Project所在地區ID:cn-hangzhou。
Project名稱:project-test。
Logstore名稱:logstore-test。
消費組名稱:consumergroup-test。
{
"Version": "1",
"Statement": [
{
"Effect": "Allow",
"Action": [
"log:GetCursorOrData"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
},
{
"Effect": "Allow",
"Action": [
"log:CreateConsumerGroup",
"log:ListConsumerGroup"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
},
{
"Effect": "Allow",
"Action": [
"log:ConsumerGroupUpdateCheckPoint",
"log:ConsumerGroupHeartBeat",
"log:UpdateConsumerGroup",
"log:GetConsumerGroupCheckPoint"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
}
]
}
相關操作
異常診斷
建議您為消費者程式配置Log4j,將消費組內部遇到的異常資訊列印出來,便於定位。log4j.properties典型配置:
log4j.rootLogger = info,stdout log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target = System.out log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
配置Log4j後,執行消費者程式可以看到類似如下異常資訊:
[WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159) com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
通過消費組消費從某個時間開始的資料
// consumerStartTimeInSeconds表示消費這個時間點之後的資料。 public LogHubConfig(String consumerGroupName, String consumerName, String loghubEndPoint, String project, String logStore, String accessId, String accessKey, int consumerStartTimeInSeconds); // position是個枚舉變數,LogHubConfig.ConsumePosition.BEGIN_CURSOR表示從最老的資料開始消費,LogHubConfig.ConsumePosition.END_CURSOR表示從最新的資料開始消費。 public LogHubConfig(String consumerGroupName, String consumerName, String loghubEndPoint, String project, String logStore, String accessId, String accessKey, ConsumePosition position);
說明按照消費需求,請您使用不同的構造方法。
當服務端已儲存Checkpoint,則開始消費位置以服務端儲存的Checkpoint為準。
Log Service消費資料時,預設優先使用Checkpoint作為消費點。當您指定從固定時間點開始消費資料時,必須保證consumerStartTimeInSeconds時間點落到TTL周期內,否則會造成消費不生效。
重設Checkpoint
public static void updateCheckpoint() throws Exception { Client client = new Client(host, accessId, accessKey); // 這裡 timestamp 需要是以秒為單位的 unix timestamp,如果您的時間戳記以毫秒為單位,需要如下所示除以1000 long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000; ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore)); for (Shard shard : response.GetShards()) { int shardId = shard.GetShardId(); String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor(); client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor); } }
相關文檔
API
操作
API介面
建立消費組
查詢消費組
刪除消費組
更新消費組
發送消費者心跳
查詢消費組的Checkpoint
更新消費組的Checkpoint
SDK
語言
文檔連結
Java
Python
CLI
操作
命令列介面
建立消費組
查詢消費組
更新消費組
刪除消費組
查詢消費組的Checkpoint
更新消費組的Checkpoint