全部產品
Search
文件中心

Simple Log Service:通過消費組消費日誌

更新時間:Nov 06, 2024

當您使用第三方軟體、多語言應用、雲產品、流式計算架構等通過SDK即時消費Log Service的資料時,SDK消費無法滿足Log Service的實現細節及消費者之間的負載平衡、容錯移轉(Failover)等,您可以通過消費組(ConsumerGroup)消費日誌,消費組(ConsumerGroup)消費的即時性較強,通常為秒級。本文為您介紹通過消費組消費資料的操作步驟。

方案概覽

一個Logstore中包含多個Shard,通過消費組消費資料就是將Shard分配給一個消費組中的消費者,分配方式遵循以下原則。

  • 在一個消費組中,一個Shard只會分配到一個消費者。

  • 在一個消費組中,一個消費者可以被分配多個Shard。

新的消費者加入消費組後,這個消費組下面的Shard從屬關係會調整,以實現消費的負載平衡,但是仍遵循上述分配原則。

基本概念

名詞

說明

消費組

Log Service支援通過消費組消費資料。一個消費組由多個消費者構成,同一個消費組中的消費者共同消費一個Logstore中的資料,各個消費者不會重複消費資料。

重要

每個Logstore中,最多建立30個消費組。

消費者

消費組的構成單元,實際承擔消費任務。

重要

同一個消費組中的消費者名稱必須不同。

Logstore

資料擷取、儲存和查詢單元。更多資訊,請參見日誌庫(Logstore)

Shard

用於控制Logstore的讀寫能力,資料必定儲存在某一個Shard中。更多資訊,請參見分區(Shard)

Checkpoint

消費位點,是程式消費到的最新位置。程式重啟後,可以通過Checkpoint恢複消費進度。

說明

通過消費組消費,程式發生故障時,會預設儲存Checkpoint。在程式故障恢複時,能夠從斷點處繼續消費,從而保證資料不會被重複消費。

前提條件

步驟一:建立消費組

下面分別介紹通過SDK、API和CLI方式建立消費組。

通過SDK建立消費組

建立消費組代碼如下所示:

CreateConsumerGroup.java

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;

public class CreateConsumerGroup {
    public static void main(String[] args) throws LogException {
         // 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。
        String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // 輸入Project名稱。
        String projectName = "ali-test-project";
        // 輸入Logstore名稱。
        String logstoreName = "ali-test-logstore";
        // 設定Log Service的服務存取點。此處以杭州為例,其它地區請根據實際情況填寫。
        String host = "https://cn-hangzhou.log.aliyuncs.com";

        // 建立Log ServiceClient。
        Client client = new Client(host, accessId, accessKey);

        try {
            // 設定消費組名稱。
            String consumerGroupName = "ali-test-consumergroup2";
            System.out.println("ready to create consumergroup");

            ConsumerGroup consumerGroup = new ConsumerGroup(consumerGroupName, 300, true);

            client.CreateConsumerGroup(projectName, logstoreName, consumerGroup);

            System.out.println(String.format("create consumergroup %s success", consumerGroupName));

        } catch (LogException e) {
            System.out.println("LogException e :" + e.toString());
            System.out.println("error code :" + e.GetErrorCode());
            System.out.println("error message :" + e.GetErrorMessage());
            throw e;
        }
    }
}

管理消費組的程式碼範例,請參見使用Java SDK管理消費組使用Python SDK管理消費組

通過API建立消費組

API建立消費組,請參見CreateConsumerGroup - 建立消費組

查詢消費組是否建立成功,請參見ListConsumerGroup - 查詢消費組

通過CLI建立消費組

CLI建立消費組,請參見create_consumer_group

查詢消費組是否建立成功,請參見list_consumer_group

步驟二:消費日誌

消費原理

消費組SDK的消費者在初次開機時,當消費組不存在時會建立消費組。起始消費位點是指建立消費組時的資料起始消費位點,該消費位點僅在第一次建立時有效。後續重啟消費者時,消費者會從上次服務端儲存的消費位點處繼續消費。以本樣本為例:

  • LogHubConfig.ConsumePosition.BEGIN_CURSOR:消費組從頭開始消費日誌,起始消費位點為Logstore中的第一條日誌。

  • LogHubConfig.ConsumePosition.END_CURSOR:此消費位點記錄Logstore日誌的最後一條日誌之後。

消費樣本

您可以通過Java、C++、Python及Go SDK實現消費組消費資料,此處以Java SDK為例。

樣本一:SDK消費

  1. 添加Maven依賴。

    pom.xml檔案中,添加以下代碼:

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.47</version>
    </dependency>
  2. 建立消費邏輯,代碼如下所示:

    SampleLogHubProcessor.java

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    
    import java.util.List;
    
    public class SampleLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // 記錄上次持久化Checkpoint的時間。
        private long mLastSaveTime = 0;
    
        // initialize 方法會在 processor 對象初始化時被調用一次
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // 消費資料的主邏輯,消費時的所有異常都需要處理,不能直接拋出。
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
            // 列印已擷取的資料。
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
                System.out.println("Tags");
                for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                    FastLogTag logTag = fastLogGroup.getLogTags(i);
                    System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
                }
                for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                    FastLog log = fastLogGroup.getLogs(i);
                    System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int j = 0; j < log.getContentsCount(); ++j) {
                        FastLogContent content = log.getContents(j);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // 每隔30秒,寫一次Checkpoint到服務端。如果30秒內發生Worker異常終止,新啟動的Worker會從上一個Checkpoint擷取消費資料,可能存在少量的重複資料。
            try {
                if (curTime - mLastSaveTime > 30 * 1000) {
                    // 參數為true表示立即手動將Checkpoint更新到服務端。此外,預設每60秒會自動將記憶體中緩衝的Checkpoint更新到服務端。
                    checkPointTracker.saveCheckPoint(true);
                    mLastSaveTime = curTime;
                } else {
                    // 參數為false表示將Checkpoint緩衝在本地,可被自動更新Checkpoint機制更新到服務端。
                    checkPointTracker.saveCheckPoint(false);
                }
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        // 當Worker退出時,會調用該函數,您可以在此處執行清理工作。
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // 將Checkpoint立即儲存到服務端。
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }

    更多範例程式碼,請參見aliyun-log-consumer-javaAliyun LOG Go Consumer

  3. 建立消費者實體,代碼如下所示:

    SampleLogHubProcessorFactory.java

    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    
    class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // 產生一個消費執行個體。注意:每次調用 generatorProcessor 方法,都應該返回一個新的 SampleLogHubProcessor 對象。
            return new SampleLogHubProcessor();
        }
    }
  4. 建立一個消費者並啟動一個消費者線程,該消費者會從指定的Logstore中消費資料。代碼如下所示:

    Main.java

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class Main {
        // Log Service的服務存取點,請您根據實際情況填寫。
        private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
        // Log Service專案名稱,請您根據實際情況填寫。請從已建立專案中擷取專案名稱。
        private static String Project = "ali-test-project";
        // 日誌庫名稱,請您根據實際情況填寫。請從已建立日誌庫中擷取日誌庫名稱。
        private static String Logstore = "ali-test-logstore";
        // 消費組名稱,請您根據實際情況填寫。您無需提前建立,該程式運行時會自動建立該消費組。
        private static String ConsumerGroup = "ali-test-consumergroup2";
        // 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。。
        private static String AccessKeyId= System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // consumer_1是消費者名稱,同一個消費組下面的消費者名稱必須不同。不同消費者在多台機器上啟動多個進程,均衡消費一個Logstore時,消費者名稱可以使用機器IP地址來區分。
            // maxFetchLogGroupSize用於設定每次從服務端擷取的LogGroup最大數目,使用預設值即可。您可以使用config.setMaxFetchLogGroupSize(100);調整,取值範圍為(0,1000]。
            LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000);
            ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // Thread運行之後,ClientWorker會自動運行,ClientWorker擴充了Runnable介面。
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // 調用Worker的Shutdown函數,退出消費執行個體,關聯的線程也會自動停止。
            worker.shutdown();
            // ClientWorker運行過程中會產生多個非同步任務。Shutdown完成後,請等待還在執行的任務安全退出。建議設定sleep為30秒。
            Thread.sleep(30 * 1000);
        }
    }
  5. 運行Main.java

    以類比消費Nginx日誌為例,列印日誌如下:

    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.10.196
    upstream_response_time    :    0.02
    --------
    Log: 158, time: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629815
    --------
    Log: 0, time: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629877
    --------
    ......

樣本二:SDK基於SPL消費

  1. 添加Maven依賴。

    pom.xml檔案中,添加以下代碼:

        <dependency>
          <groupId>com.google.protobuf</groupId>
          <artifactId>protobuf-java</artifactId>
          <version>2.5.0</version>
        </dependency>
        <dependency>
          <groupId>com.aliyun.openservices</groupId>
          <artifactId>aliyun-log</artifactId>
          <version>0.6.99</version>
        </dependency>
  2. 建立消費邏輯,代碼如下所示:

    SPLLogHubProcessor.java

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    
    import java.util.List;
    
    public class SPLLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // 記錄上次持久化Checkpoint的時間。
        private long mLastSaveTime = 0;
    
        // initialize 方法會在 processor 對象初始化時被調用一次
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // 消費資料的主邏輯,消費時的所有異常都需要處理,不能直接拋出。
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
            // 列印已擷取的資料。
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
                System.out.println("Tags");
                for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                    FastLogTag logTag = fastLogGroup.getLogTags(i);
                    System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
                }
                for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                    FastLog log = fastLogGroup.getLogs(i);
                    System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int j = 0; j < log.getContentsCount(); ++j) {
                        FastLogContent content = log.getContents(j);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // 每隔30秒,寫一次Checkpoint到服務端。如果30秒內發生Worker異常終止,新啟動的Worker會從上一個Checkpoint擷取消費資料,可能存在少量的重複資料。
            try {
                if (curTime - mLastSaveTime > 30 * 1000) {
                    // 參數為true表示立即手動將Checkpoint更新到服務端。此外,預設每60秒會自動將記憶體中緩衝的Checkpoint更新到服務端。
                    checkPointTracker.saveCheckPoint(true);
                    mLastSaveTime = curTime;
                } else {
                    // 參數為false表示將Checkpoint緩衝在本地,可被自動更新Checkpoint機制更新到服務端。
                    checkPointTracker.saveCheckPoint(false);
                }
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        // 當Worker退出時,會調用該函數,您可以在此處執行清理工作。
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // 將Checkpoint立即儲存到服務端。
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }
  3. 建立消費者實體,代碼如下所示:

    SPLLogHubProcessorFactory.java

    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    
    class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // 產生一個消費執行個體。注意:每次調用 generatorProcessor 方法,都應該返回一個新的 SPLLogHubProcessor 對象。
            return new SPLLogHubProcessor();
        }
    }
  4. 建立一個消費者並啟動一個消費者線程,該消費者會從指定的Logstore中消費資料。代碼如下所示:

    Main.java

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class Main {
        // Log Service的服務存取點,請您根據實際情況填寫。
        private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
        // Log Service專案名稱,請您根據實際情況填寫。請從已建立專案中擷取專案名稱。
        private static String Project = "ali-test-project";
        // 日誌庫名稱,請您根據實際情況填寫。請從已建立日誌庫中擷取日誌庫名稱。
        private static String Logstore = "ali-test-logstore";
        // 消費組名稱,請您根據實際情況填寫。您無需提前建立,該程式運行時會自動建立該消費組。
        private static String ConsumerGroup = "ali-test-consumergroup2";
        // 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。。
        private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // consumer_1是消費者名稱,同一個消費組下面的消費者名稱必須不同。不同消費者在多台機器上啟動多個進程,均衡消費一個Logstore時,消費者名稱可以使用機器IP地址來區分。
            // maxFetchLogGroupSize用於設定每次從服務端擷取的LogGroup最大數目,使用預設值即可。您可以使用config.setMaxFetchLogGroupSize(100);調整,取值範圍為(0,1000]。
            LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
            // setQuery可以設定消費過程中的SLS SPL語句
            config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
            ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // Thread運行之後,ClientWorker會自動運行,ClientWorker擴充了Runnable介面。
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // 調用Worker的Shutdown函數,退出消費執行個體,關聯的線程也會自動停止。
            worker.shutdown();
            // ClientWorker運行過程中會產生多個非同步任務。Shutdown完成後,請等待還在執行的任務安全退出。建議設定sleep為30秒。
            Thread.sleep(30 * 1000);
        }
    }
  5. 運行Main.java

    以類比消費Nginx日誌為例,列印日誌如下:

    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.10.196
    upstream_response_time    :    0.02
    --------
    Log: 158, time: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629815
    --------
    Log: 0, time: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629877
    --------
    ......

步驟三:查看消費組狀態

下面介紹兩種查看消費組狀態方式:

通過Java SDK查看消費組狀態

  1. 查看每個Shard消費資料的進度。代碼如下所示:

    ConsumerGroupTest.java

    import java.util.List;
    import com.aliyun.openservices.log.Client;
    import com.aliyun.openservices.log.common.Consts.CursorMode;
    import com.aliyun.openservices.log.common.ConsumerGroup;
    import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
    import com.aliyun.openservices.log.exception.LogException;
    public class ConsumerGroupTest {
        static String endpoint = "cn-hangzhou.log.aliyuncs.com";
        static String project = "ali-test-project";
        static String logstore = "ali-test-logstore";
        static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        public static void main(String[] args) throws LogException {
            Client client = new Client(endpoint, accesskeyId, accesskey);
            // 擷取Logstore下的所有消費組。如果消費組不存在,則長度為0。
            List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
            for(ConsumerGroup c: consumerGroups){
                // 列印消費組的屬性,包括名稱、心跳逾時時間、是否按序消費。
                System.out.println("名稱: " + c.getConsumerGroupName());
                System.out.println("心跳逾時時間: " + c.getTimeout());
                System.out.println("按序消費: " + c.isInOrder());
                for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
                    System.out.println("shard: " + cp.getShard());
                    // 該時間精確到微秒,類型為長整型。
                    System.out.println("最後一次更新消費進度的時間: " + cp.getUpdateTime());
                    System.out.println("消費者名稱: " + cp.getConsumer());
                    String consumerPrg = "";
                    if(cp.getCheckPoint().isEmpty())
                        consumerPrg = "尚未開始消費";
                    else{
                        // Unix時間戳記,單位是秒,輸出時請注意格式化。
                        try{
                            int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
                            consumerPrg = "" + prg;
                        }
                        catch(LogException e){
                            if(e.GetErrorCode() == "InvalidCursor")
                                consumerPrg = "非法,前一次消費時刻已經超出了Logstore中資料的生命週期";
                            else{
                                // internal server error
                                throw e;
                            }
                        }
                    }
                    System.out.println("消費進度: " + consumerPrg);
                    String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
                    int endPrg = 0;
                    try{
                        endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
                    }
                    catch(LogException e){
                        // do nothing
                    }
                    //Unix時間戳記,單位:秒。輸出時,請注意格式化。
                    System.out.println("最後一條資料到達時刻: " + endPrg);
                }
            }
        }
    }
  2. 返回結果如下所示:

    名稱: ali-test-consumergroup2
    心跳逾時時間: 60
    按序消費: false
    shard: 0
    最後一次更新消費進度的時間: 0
    消費者名稱: consumer_1
    消費進度: 尚未開始消費
    最後一條資料到達時刻: 1729583617
    shard: 1
    最後一次更新消費進度的時間: 0
    消費者名稱: consumer_1
    消費進度: 尚未開始消費
    最後一條資料到達時刻: 1729583738
    
    Process finished with exit code 0

在控制台查看消費組狀態

  1. 登入Log Service控制台

  2. 在Project列表地區,單擊目標Project。

    image

  3. 日誌儲存 > 日誌庫頁簽中,單擊目標Logstore左側的展開節點表徵圖,然後單擊資料消費左側的展開節點表徵圖。

  4. 在消費組列表中,單擊目標消費組。

  5. Consumer Group狀態頁面,查看每個Shard消費資料的進度。image

相關操作

  • RAM使用者授權

    使用RAM使用者操作時,需授予RAM使用者操作消費組的相關許可權。具體操作,請參見建立RAM使用者及授權

    授權的動作(Action)如下表所示。

    動作(Action)

    說明

    授權策略中的資源描述方式(Resource)

    log:GetCursorOrData(GetCursor - 通過時間查詢Cursor

    根據時間擷取遊標(cursor)。

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}

    log:CreateConsumerGroup(CreateConsumerGroup - 建立消費組

    在指定的Logstore上建立一個消費組。

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

    log:ListConsumerGroup(ListConsumerGroup - 查詢消費組

    查詢指定Logstore的所有消費組。

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*

    log:ConsumerGroupUpdateCheckPoint(ConsumerGroupUpdateCheckPoint - 更新消費進度

    更新指定消費組的某個Shard的Checkpoint。

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

    log:ConsumerGroupHeartBeat(ConsumerGroupHeartBeat - 消費者發送心跳到服務端

    為指定消費者發送心跳到服務端。

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

    log:UpdateConsumerGroup(UpdateConsumerGroup - 更新消費者組

    修改指定消費組屬性。

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

    log:GetConsumerGroupCheckPoint(GetCheckPoint - 擷取指定消費組的消費點

    擷取指定消費組消費的某個或者所有Shard的Checkpoint。

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

    例如,消費組的相關資源資訊如下所示,您要通過RAM使用者操作該消費組,則需為RAM使用者授予以下許可權。

    • Project所屬的阿里雲帳號:174649****602745。

    • Project所在地區ID:cn-hangzhou。

    • Project名稱:project-test。

    • Logstore名稱:logstore-test。

    • 消費組名稱:consumergroup-test。

    授權碼如下所示:

    {
      "Version": "1",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "log:GetCursorOrData"
          ],
          "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
        },
        {
          "Effect": "Allow",
          "Action": [
            "log:CreateConsumerGroup",
            "log:ListConsumerGroup"
          ],
          "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
        },
        {
          "Effect": "Allow",
          "Action": [
            "log:ConsumerGroupUpdateCheckPoint",
            "log:ConsumerGroupHeartBeat",
            "log:UpdateConsumerGroup",
            "log:GetConsumerGroupCheckPoint"
          ],
          "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
        }
      ]
    }
  • 異常診斷

    建議您為消費者程式配置Log4j,將消費組內部遇到的異常資訊列印出來,便於定位。log4j.properties典型配置:

    log4j.rootLogger = info,stdout
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target = System.out
    log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

    配置Log4j後,執行消費者程式可以看到類似如下異常資訊:

    [WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
    com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
  • 通過消費組消費從某個時間開始的資料

    // consumerStartTimeInSeconds表示消費這個時間點之後的資料。
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          int consumerStartTimeInSeconds);
    
    // position是個枚舉變數,LogHubConfig.ConsumePosition.BEGIN_CURSOR表示從最老的資料開始消費,LogHubConfig.ConsumePosition.END_CURSOR表示從最新的資料開始消費。
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          ConsumePosition position);
    說明
    • 按照消費需求,請您使用不同的構造方法。

    • 當服務端已儲存Checkpoint,則開始消費位置以服務端儲存的Checkpoint為準。

    • Log Service消費資料時,預設優先使用Checkpoint作為消費點。當您指定從固定時間點開始消費資料時,必須保證consumerStartTimeInSeconds時間點落到TTL周期內,否則會造成消費不生效。

  • 重設Checkpoint

    public static void updateCheckpoint() throws Exception {
            Client client = new Client(host, accessId, accessKey);
            // 這裡 timestamp 需要是以秒為單位的 unix timestamp,如果您的時間戳記以毫秒為單位,需要如下所示除以1000
            long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
            ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
            for (Shard shard : response.GetShards()) {
                int shardId = shard.GetShardId();
                String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
                client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
            }
        }

相關文檔