通過消費組(ConsumerGroup)消費日誌資料有顯著優點,您無需關注Log Service的實現細節和消費者之間的負載平衡、Failover等,只需關注商務邏輯。本文通過程式碼範例介紹如何建立、修改、查詢、刪除消費組等。
前提條件
已開通Log Service。更多資訊,請參見開通Log Service。
已建立RAM使用者並完成授權。具體操作,請參見建立RAM使用者並完成授權。
已配置環境變數ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具體操作,請參見在Linux、macOS和Windows系統配置環境變數。
重要阿里雲帳號的AccessKey擁有所有API的存取權限,建議您使用RAM使用者的AccessKey進行API訪問或日常營運。
強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
已安裝Log ServiceJava SDK。具體操作,請參見安裝Java SDK。
已建立Project、標準型Logstore並完成日誌採集。具體操作,請參見建立專案Project、建立Logstore和資料擷取概述。
查詢日誌前,已配置索引。具體操作,請參見建立索引。
注意事項
本樣本以華東1(杭州)的公網Endpoint為例,其公網Endpoint為https://cn-hangzhou.log.aliyuncs.com
。如果您通過與Project同地區的其他阿里雲產品訪問Log Service,請使用內網Endpointhttps://cn-hangzhou-intranet.log.aliyuncs.com
。關於Log Service支援的地區與Endpoint的對應關係,請參見服務入口。
建立消費組範例程式碼
以下代碼用於建立名為ali-test-consumergroup的消費組。
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;
public class CreateConsumerGroup {
public static void main(String[] args) throws LogException {
// 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。
String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// 輸入Project名稱。
String projectName = "ali-test-project";
// 輸入Logstore名稱。
String logstoreName = "ali-test-logstore";
// 設定Log Service的服務存取點。此處以杭州為例,其它地區請根據實際情況填寫。
String host = "https://cn-hangzhou.log.aliyuncs.com";
// 建立Log ServiceClient。
Client client = new Client(host, accessId, accessKey);
try {
// 設定消費組名稱。
String consumerGroupName = "ali-test-consumergroup2";
System.out.println("ready to create consumergroup");
ConsumerGroup consumerGroup = new ConsumerGroup(consumerGroupName, 300, true);
client.CreateConsumerGroup(projectName, logstoreName, consumerGroup);
System.out.println(String.format("create consumergroup %s success", consumerGroupName));
} catch (LogException e) {
System.out.println("LogException e :" + e.toString());
System.out.println("error code :" + e.GetErrorCode());
System.out.println("error message :" + e.GetErrorMessage());
throw e;
}
}
}
預期結果如下:
ready to create consumergroup
create consumergroup ali-test-consumergroup success
修改消費組範例程式碼
以下代碼用於修改名為ali-test-consumergroup的消費組資訊。
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.exception.LogException;
public class UpdateConsumerGroup {
public static void main(String[] args) throws LogException {
// 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。
String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// 輸入Project名稱。
String projectName = "ali-test-project";
// 輸入Logstore名稱。
String logstoreName = "ali-test-logstore";
// 設定Log Service的服務存取點。此處以杭州為例,其它地區請根據實際情況填寫。
String host = "https://cn-hangzhou.log.aliyuncs.com";
// 建立Log ServiceClient。
Client client = new Client(host, accessId, accessKey);
try {
String consumerGroupName = "ali-test-consumergroup";
System.out.println("ready to update consumergroup");
// 修改消費組逾時時間為350秒。
client.UpdateConsumerGroup(projectName, logstoreName, consumerGroupName, false, 350);
System.out.println(String.format("update consumergroup %s success", consumerGroupName));
} catch (LogException e) {
System.out.println("LogException e :" + e.toString());
System.out.println("error code :" + e.GetErrorCode());
System.out.println("error message :" + e.GetErrorMessage());
throw e;
}
}
}
預期結果如下:
ready to update consumergroup
update consumergroup ali-test-consumergroup success
查詢所有消費組範例程式碼
以下代碼用於查詢指定Logstore的所有消費組。
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.ListConsumerGroupResponse;
public class ListConsumerGroup {
public static void main(String[] args) throws LogException {
// 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。
String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// 輸入Project名稱。
String projectName = "ali-test-project";
// 輸入Logstore名稱。
String logstoreName = "ali-test-logstore";
// 設定Log Service的服務存取點。此處以杭州為例,其它地區請根據實際情況填寫。
String host = "https://cn-hangzhou.log.aliyuncs.com";
// 建立Log ServiceClient。
Client client = new Client(host, accessId, accessKey);
try {
System.out.println("ready to list consumergroup");
// 查詢指定Logstore的所有消費組。
ListConsumerGroupResponse response = client.ListConsumerGroup(projectName,logstoreName);
for(ConsumerGroup consumerGroup : response.GetConsumerGroups()){
System.out.println("ConsumerName is : " + consumerGroup.getConsumerGroupName());
}
System.out.println(String.format("list consumergroup from %s success",projectName));
} catch (LogException e) {
System.out.println("LogException e :" + e.toString());
System.out.println("error code :" + e.GetErrorCode());
System.out.println("error message :" + e.GetErrorMessage());
throw e;
}
}
}
預期結果如下:
ready to list consumergroup
ConsumerName is : ali-test-consumergroup2
ConsumerName is : ali-test-consumergroup
list consumergroup from ali-test-project success
刪除消費組範例程式碼
以下代碼用於刪除目標Project下的消費組。
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.exception.LogException;
public class DeleteConsumerGroup {
public static void main(String[] args) throws LogException {
// 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。
String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// 輸入Project名稱。
String projectName = "ali-test-project";
// 輸入Logstore名稱。
String logstoreName = "ali-test-logstore";
// 設定Log Service的服務存取點。此處以杭州為例,其它地區請根據實際情況填寫。
String host = "https://cn-hangzhou.log.aliyuncs.com";
// 建立Log ServiceClient。
Client client = new Client(host, accessId, accessKey);
try {
String consumerGroupName = "ali-test-consumergroup";
System.out.println("ready to delete consumergroup");
// 刪除一個指定的消費組。
client.DeleteConsumerGroup(projectName,logstoreName,consumerGroupName);
System.out.println(String.format("delete consumergroup %s success",consumerGroupName));
} catch (LogException e) {
System.out.println("LogException e :" + e.toString());
System.out.println("error code :" + e.GetErrorCode());
System.out.println("error message :" + e.GetErrorMessage());
throw e;
}
}
}
預期結果如下:
ready to delete consumergroup
delete consumergroup ali-test-consumergroup success
擷取消費組Checkpoint範例程式碼
以下代碼用於擷取指定消費組的Checkpoint。
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.GetCheckPointResponse;
public class GetCheckPoint {
public static void main(String[] args) throws LogException {
// 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。
String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// 輸入Project名稱。
String projectName = "ali-test-project";
// 輸入Logstore名稱。
String logstoreName = "ali-test-logstore";
// 設定Log Service的服務存取點。此處以杭州為例,其它地區請根據實際情況填寫。
String host = "https://cn-hangzhou.log.aliyuncs.com";
// 建立Log ServiceClient。
Client client = new Client(host, accessId, accessKey);
try {
String consumerGroupName = "consumerGroupX";
System.out.println("ready to get consumergroup checkpoint");
// 擷取指定消費組中Shard的CheckPoint。
GetCheckPointResponse response1 = client.getCheckpoint(projectName,logstoreName,consumerGroupName,0);
GetCheckPointResponse response2 = client.getCheckpoint(projectName,logstoreName,consumerGroupName,1);
System.out.println("The checkpoint of Shard 0 is : " + response1.getCheckpoint());
System.out.println("The checkpoint of Shard 1 is : " + response2.getCheckpoint());
System.out.println(String.format("get consumergroup %s checkpoint success",consumerGroupName));
} catch (LogException e) {
System.out.println("LogException e :" + e.toString());
System.out.println("error code :" + e.GetErrorCode());
System.out.println("error message :" + e.GetErrorMessage());
throw e;
}
}
}
預期結果如下:
ready to get consumergroup checkpoint
The checkpoint of Shard 0 is : ConsumerGroupShardCheckPoint [shard=0, checkPoint=MTY2NzgxMDc0Nzk5MDk5MzAyMg==, updateTime=1668750821709044, consumer=consumer_1]
The checkpoint of Shard 1 is : ConsumerGroupShardCheckPoint [shard=1, checkPoint=MTY2NzgxMDc0Nzk5MTk0NTU0NQ==, updateTime=1668750828790425, consumer=consumer_1]
get consumergroup consumerGroupX checkpoint success
更新指定消費組Checkpoint範例程式碼
以下代碼用於更新指定消費組的Checkpoint。
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.ListConsumerGroupResponse;
public class ConsumerGroupUpdateCheckpoint {
public static void main(String[] args) throws LogException {
// 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。
String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// 輸入Project名稱。
String projectName = "ali-test-project";
// 輸入Logstore名稱。
String logstoreName = "ali-test-logstore";
// 設定Log Service的服務存取點。此處以杭州為例,其它地區請根據實際情況填寫。
String host = "https://cn-hangzhou.log.aliyuncs.com";
// 建立Log ServiceClient。
Client client = new Client(host, accessId, accessKey);
try {
String consumerGroupName = "consumerGroupX";
System.out.println("ready to update checkpoint");
// 查詢指定Logstore的所有消費組。
ListConsumerGroupResponse response = client.ListConsumerGroup(projectName, logstoreName);
for(ConsumerGroup consumerGroup : response.GetConsumerGroups()){
System.out.println("ConsumerName is : " + consumerGroup.getConsumerGroupName());
System.out.println("Consumer order is : " + consumerGroup.isInOrder());
}
// 更新Shard 0的Checkpoint。您可以通過GetCursor介面擷取對應時間的Cursor。
client.UpdateCheckPoint(projectName, logstoreName, consumerGroupName, 0, "MTY2NzgxMDc0Nzk5MTAwNjQ3Mg==");
System.out.println(String.format("update checkpoint of %s success", consumerGroupName));
} catch (LogException e) {
System.out.println("LogException e :" + e.toString());
System.out.println("error code :" + e.GetErrorCode());
System.out.println("error message :" + e.GetErrorMessage());
throw e;
}
}
}
預期結果如下:
ready to update checkpoint
ConsumerName is : consumerGroupX
Consumer order is : false
ConsumerName is : ali-test-consumergroup2
Consumer order is : true
ConsumerName is : ali-test-consumergroup
Consumer order is : false
update consumergroup checkpoint is:consumerGroupX
update checkpoint of consumerGroupX success
相關文檔
在調用API介面過程中,若服務端返回結果中包含錯誤資訊,則表示調用API介面失敗。您可以參考API錯誤碼對照表尋找對應的解決方案。更多資訊,請參見API錯誤處理對照表。
阿里雲OpenAPI開發人員門戶提供調試、SDK、樣本和配套文檔。通過OpenAPI,您無需手動封裝請求和簽名操作,就可以快速對Log ServiceAPI進行調試。更多資訊,請參見OpenAPI開發人員門戶。
為滿足越來越多的自動化Log Service配置需求,Log Service提供命令列工具CLI(Command Line Interface)。更多資訊,請參見Log Service命令列工具CLI。
關於消費組API介面說明,請參見如下:
更多範例程式碼,請參見Aliyun Log Java SDK on GitHub。