すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:コンシューマグループを使用したログの消費

最終更新日:Dec 05, 2024

Simple Log Serviceを使用すると、サードパーティのソフトウェア、さまざまなプログラミング言語のアプリケーション、クラウドサービス、およびストリームコンピューティングフレームワークで、Simple Log Service SDKを呼び出すことでリアルタイムでデータを消費できます。 ただし、SDKベースの消費は、負荷分散や消費者間のフェイルオーバーなど、特定の実装詳細の要件を満たすことができません。 この場合、数秒以内にデータを消費するコンシューマグループを作成できます。 このトピックでは、コンシューマーグループを使用してデータを消費する方法について説明します。

概要

ログストアには複数のシャードがあります。 Simple Log Serviceは、次のルールに基づいて、コンシューマーグループ内のコンシューマーにシャードを割り当てます。

  • 各シャードは1つのコンシューマーにのみ割り当てることができます。

  • コンシューマーは複数のシャードからデータを消費できます。

新しいコンシューマーがコンシューマーグループに追加された後、コンシューマーグループ内のコンシューマーに割り当てられたシャードは、負荷分散のために各コンシューマーに再割り当てされます。 シャードは、前述のルールに基づいて再割り当てされます。

image

条件

用語

説明

消費者グループ

コンシューマーグループを使用して、Simple Log Serviceのデータを消費できます。 コンシューマーグループは複数のコンシューマーで構成されています。 コンシューマーグループ内のすべてのコンシューマーは、同じLogstore内のデータを消費します。 消費者はデータを繰り返し消費しません。

重要

1つのLogstoreに最大30のコンシューマグループを作成できます。

消費者

コンシューマーグループ内のコンシューマーはデータを消費します。

重要

コンシューマーグループ内のコンシューマーの名前は一意である必要があります。

ログストア

Logstoreは、データの収集、保存、およびクエリに使用されます。 詳細は、「Logstore」をご参照ください。

シャード

シャードは、Logstoreの読み取りおよび書き込み容量を制御するために使用されます。 Simple Log Serviceでは、データはシャードに保存されます。 詳細については、「シャード」をご参照ください。

チェックポイント

消費チェックポイントは、プログラムがデータの消費を停止する位置である。 プログラムが再起動されると、プログラムは最後の消費チェックポイントからデータを消費します。

説明

コンシューマーグループを使用してデータを消費する場合、プログラムでエラーが発生すると、Simple Log Serviceは自動的に消費チェックポイントを保存します。 消費者は、プログラムの回復後にデータを繰り返し消費することなく、消費チェックポイントからデータ消費を再開できます。

前提条件

ステップ1: コンシューマグループの作成

このセクションでは、Simple Log Service SDK、Simple Log Service API、およびSimple Log Service CLIを使用してコンシューマーグループを作成する方法について説明します。

Simple Log Service SDKの使用

サンプルコード:

CreateConsumerGroup.java

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;

public class CreateConsumerGroup {
    public static void main(String[] args) throws LogException {
         // In this example, the AccessKey ID and AccessKey secret are obtained from environment variables. 
        String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // The name of the project. 
        String projectName = "ali-test-project";
        // The name of the Logstore. 
        String logstoreName = "ali-test-logstore";
        // The Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the China (Hangzhou) region is used. Replace the parameter value with the actual endpoint. 
        String host = "https://cn-hangzhou.log.aliyuncs.com";

        // Create a Simple Log Service client. 
        Client client = new Client(host, accessId, accessKey);

        try {
            // The name of the consumer group. 
            String consumerGroupName = "ali-test-consumergroup2";
            System.out.println("ready to create consumergroup");

            ConsumerGroup consumerGroup = new ConsumerGroup(consumerGroupName, 300, true);

            client.CreateConsumerGroup(projectName, logstoreName, consumerGroup);

            System.out.println(String.format("create consumergroup %s success", consumerGroupName));

        } catch (LogException e) {
            System.out.println("LogException e :" + e.toString());
            System.out.println("error code :" + e.GetErrorCode());
            System.out.println("error message :" + e.GetErrorMessage());
            throw e;
        }
    }
}

コンシューマーグループの管理に使用されるサンプルコードの詳細については、「Simple Log Service SDK For Javaを使用したコンシューマーグループの管理」および「Simple Log Service SDK for Pythonを使用したコンシューマーグループの管理」をご参照ください。

Simple Log Service APIの使用

Simple Log Service APIを使用してコンシューマーグループを作成する方法の詳細については、「CreateConsumerGroup」をご参照ください。

コンシューマーグループが作成されているかどうかを確認する方法の詳細については、「ListConsumerGroup」をご参照ください。

Simple Log Service CLIの使用

Simple Log Service CLIを使用してコンシューマーグループを作成する方法の詳細については、「create_consumer_group」をご参照ください。

コンシューマーグループが作成されているかどうかを確認する方法の詳細については、「list_consumer_group」をご参照ください。

ステップ2: ログデータ

仕組み

Simple Log Service SDK for Javaを初めて呼び出してコンシューマーを起動すると、コンシューマーが属するコンシューマーグループが見つからない場合、SDKはコンシューマーグループを作成します。 コンシューマーグループが作成されると、SDKは消費開始チェックポイントを記録し、消費チェックポイントからデータの消費を開始します。 最初の消費の後、消費開始チェックポイントは無効になります。 コンシューマーが再起動されると、コンシューマーはSimple Log Serviceによって保存された最後の消費チェックポイントからデータの消費を再開します。 サンプル消費チェックポイント:

  • LogHubConfig.ConsumePosition.BEGIN_CURSOR: ログストアの最初のログを指定する開始消費チェックポイント。 消費者は、最も早いデータから消費を開始する。

  • LogHubConfig.ConsumePosition.END_CURSOR: 消費終了チェックポイント。Logstoreの最後のログを指定します。

Simple Log Service SDK for Java、C ++ 、Python、またはGoを使用して、コンシューマーグループを作成し、データを消費できます。 この例では、Simple Log Service SDK for Javaが使用されています。

例1: SDKの使用

  1. Mavenの依存関係を追加します。

    pom.xmlファイルを開き、次のコードを追加します。

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.50</version>
    </dependency>
  2. データ消費の実装ロジックを記述します。 サンプルコード:

    SampleLogHubProcessor.java

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    
    import java.util.List;
    
    public class SampleLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // The time at which the last consumption checkpoint was stored. 
        private long mLastSaveTime = 0;
    
        // The initialize method is called once when a processor object is initialized.
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // The main logic of data consumption. You must include the code to handle all exceptions that may occur during data consumption. 
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
            // Display the obtained data. 
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
                System.out.println("Tags");
                for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                    FastLogTag logTag = fastLogGroup.getLogTags(i);
                    System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
                }
                for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                    FastLog log = fastLogGroup.getLogs(i);
                    System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int j = 0; j < log.getContentsCount(); ++j) {
                        FastLogContent content = log.getContents(j);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // A consumption checkpoint is written to Simple Log Service at an interval of 30 seconds. If the ClientWorker instance unexpectedly stops within 30 seconds, a newly started ClientWorker instance consumes data from the last consumption checkpoint. A small amount of data may be repeatedly consumed. 
            try {
                if (curTime - mLastSaveTime > 30 * 1000) {
                    // The value true indicates that the consumption checkpoints are immediately updated to Simple Log Service. By default, the consumption checkpoints that are cached in memory are automatically updated to Simple Log Service at an interval of 60 seconds. 
                    checkPointTracker.saveCheckPoint(true);
                    mLastSaveTime = curTime;
                } else {
                    // The value false indicates that the consumption checkpoints are locally cached and can be updated to Simple Log Service by using the automatic consumption checkpoint update mechanism. 
                    checkPointTracker.saveCheckPoint(false);
                }
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        // The shutdown function of the ClientWorker instance is called. You can manage the consumption checkpoints. 
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // Immediately store consumption checkpoints to Simple Log Service. 
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }

    サンプルコードの詳細については、「aliyun-log-consumer-java」および「Aliyun LOG Go Consumer」をご参照ください。

  3. コンシューマーエンティティを定義します。 サンプルコード:

    SampleLogHubProcessorFactory.java

    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    
    class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // Generate a consumer. Each time the generatorProcessor method is called, a new SampleLogHubProcessor object is returned as expected. 
            return new SampleLogHubProcessor();
        }
    }
  4. コンシューマーグループを作成し、コンシューマースレッドを起動して、コンシューマーグループのコンシューマーが指定されたLogstoreのデータを消費できるようにします。 サンプルコード:

    Main.java

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class Main {
        // The Simple Log Service endpoint. Enter an endpoint based on your business requirements. 
        private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
        // The name of the Simple Log Service project. Enter a name based on your business requirements. You must enter the name of an existing project. 
        private static String Project = "ali-test-project";
        // The name of the Logstore. Enter a name based on your business requirements. You must enter the name of an existing Logstore. 
        private static String Logstore = "ali-test-logstore";
        // The name of the consumer group. Enter a name based on your business requirements. You do not need to create a consumer group in advance. A consumer group is automatically created when a program runs. 
        private static String ConsumerGroup = "ali-test-consumergroup2";
        // In this example, the AccessKey ID and AccessKey secret are obtained from environment variables.  
        private static String AccessKeyId= System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // consumer_1 specifies the name of a consumer. The name of each consumer in a consumer group must be unique. If different consumers start processes on different machines to consume data in a Logstore, you can use the machine IP addresses to identify each consumer. 
            // maxFetchLogGroupSize specifies the maximum number of log groups that can be obtained from Simple Log Service at a time. Retain the default value. You can use config.setMaxFetchLogGroupSize(100); to change the maximum number. Valid values: (0,1000]. 
            LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000);
            ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // After the Thread instance runs, the ClientWorker instance automatically runs and extends the Runnable interface. 
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // The shutdown function of the ClientWorker instance is called to exit the consumer. The Thread instance is automatically stopped. 
            worker.shutdown();
            // Multiple asynchronous tasks are generated when the ClientWorker instance is running. To ensure that all running tasks securely stop after the shutdown, we recommend that you set Thread.sleep to 30 seconds. 
            Thread.sleep(30 * 1000);
        }
    }
  5. Main.javaファイルを実行します。

    この例では、NGINXログが消費され、消費結果が表示されます。

    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.10.196
    upstream_response_time    :    0.02
    --------
    Log: 158, time: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629815
    --------
    Log: 0, time: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629877
    --------
    ......

例2: SDKとSPLの使用

  1. Mavenの依存関係を追加します。

    pom.xmlファイルを開き、次のコードを追加します。

        <dependency>
          <groupId>com.google.protobuf</groupId>
          <artifactId>protobuf-java</artifactId>
          <version>2.5.0</version>
        </dependency>
        <dependency>
          <groupId>com.aliyun.openservices</groupId>
          <artifactId>aliyun-log</artifactId>
          <version>0.6.114</version>
        </dependency>
  2. データ消費の実装ロジックを記述します。 サンプルコード:

    SPLLogHubProcessor.java

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    
    import java.util.List;
    
    public class SPLLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // The time at which the last consumption checkpoint was stored. 
        private long mLastSaveTime = 0;
    
        // The initialize method is called once when a processor object is initialized.
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // The main logic of data consumption. You must include the code to handle all exceptions that may occur during data consumption. 
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
            // Display the obtained data. 
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
                System.out.println("Tags");
                for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                    FastLogTag logTag = fastLogGroup.getLogTags(i);
                    System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
                }
                for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                    FastLog log = fastLogGroup.getLogs(i);
                    System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int j = 0; j < log.getContentsCount(); ++j) {
                        FastLogContent content = log.getContents(j);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // A consumption checkpoint is written to Simple Log Service at an interval of 30 seconds. If the ClientWorker instance unexpectedly stops within 30 seconds, a newly started ClientWorker instance consumes data from the last consumption checkpoint. A small amount of data may be repeatedly consumed. 
            try {
                if (curTime - mLastSaveTime > 30 * 1000) {
                    // The value true indicates that the consumption checkpoints are immediately updated to Simple Log Service. By default, the consumption checkpoints that are cached in memory are automatically updated to Simple Log Service at an interval of 60 seconds. 
                    checkPointTracker.saveCheckPoint(true);
                    mLastSaveTime = curTime;
                } else {
                    // The value false indicates that the consumption checkpoints are locally cached and can be updated to Simple Log Service by using the automatic consumption checkpoint update mechanism. 
                    checkPointTracker.saveCheckPoint(false);
                }
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        // The shutdown function of the ClientWorker instance is called. You can manage the consumption checkpoints. 
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // Immediately store consumption checkpoints to Simple Log Service. 
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }
  3. コンシューマーエンティティを定義します。 サンプルコード:

    SPLLogHubProcessorFactory.java

    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    
    class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // Generate a consumer. Each time the generatorProcessor method is called, a new SPLLogHubProcessor object is returned as expected. 
            return new SPLLogHubProcessor();
        }
    }
  4. コンシューマーグループを作成し、コンシューマースレッドを起動して、コンシューマーグループのコンシューマーが指定されたLogstoreのデータを消費できるようにします。 サンプルコード:

    Main.java

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class Main {
        // The Simple Log Service endpoint. Enter an endpoint based on your business requirements. 
        private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
        // The name of the Simple Log Service project. Enter a name based on your business requirements. You must enter the name of an existing project. 
        private static String Project = "ali-test-project";
        // The name of the Logstore. Enter a name based on your business requirements. You must enter the name of an existing Logstore. 
        private static String Logstore = "ali-test-logstore";
        // The name of the consumer group. Enter a name based on your business requirements. You do not need to create a consumer group in advance. A consumer group is automatically created when a program runs. 
        private static String ConsumerGroup = "ali-test-consumergroup2";
        // In this example, the AccessKey ID and AccessKey secret are obtained from environment variables.  
        private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // consumer_1 specifies the name of a consumer. The name of each consumer in a consumer group must be unique. If different consumers start processes on different machines to consume data in a Logstore, you can use the machine IP addresses to identify each consumer. 
            // maxFetchLogGroupSize specifies the maximum number of log groups that can be obtained from Simple Log Service at a time. Retain the default value. You can use config.setMaxFetchLogGroupSize(100); to change the maximum number. Valid values: (0,1000]. 
            LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
            // You can use setQuery to specify a Simple Log Service Processing Language (SPL) statement for data consumption.
            config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
            ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // After the Thread instance runs, the ClientWorker instance automatically runs and extends the Runnable interface. 
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // The shutdown function of the ClientWorker instance is called to exit the consumer. The Thread instance is automatically stopped. 
            worker.shutdown();
            // Multiple asynchronous tasks are generated when the ClientWorker instance is running. To ensure that all running tasks securely stop after the shutdown, we recommend that you set Thread.sleep to 30 seconds. 
            Thread.sleep(30 * 1000);
        }
    }
  5. Main.javaファイルを実行します。

    この例では、NGINXログが消費され、消費結果が表示されます。

    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.10.196
    upstream_response_time    :    0.02
    --------
    Log: 158, time: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629815
    --------
    Log: 0, time: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629877
    --------
    ......

手順3: コンシューマグループのステータスを表示する

このセクションでは、コンシューマーグループのステータスを表示するために使用できる方法について説明します。

Simple Log Service SDK for Javaの使用

  1. 各シャードの消費チェックポイントを表示します。 サンプルコード:

    ConsumerGroupTest.java

    import java.util.List;
    import com.aliyun.openservices.log.Client;
    import com.aliyun.openservices.log.common.Consts.CursorMode;
    import com.aliyun.openservices.log.common.ConsumerGroup;
    import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
    import com.aliyun.openservices.log.exception.LogException;
    public class ConsumerGroupTest {
        static String endpoint = "cn-hangzhou.log.aliyuncs.com";
        static String project = "ali-test-project";
        static String logstore = "ali-test-logstore";
        static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        public static void main(String[] args) throws LogException {
            Client client = new Client(endpoint, accesskeyId, accesskey);
            // Obtain all consumer groups that are created for the Logstore. If no consumer groups exist, an empty string is returned. 
            List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
            for(ConsumerGroup c: consumerGroups){
                // Display the attributes of each consumer group, including the name, heartbeat timeout period, and whether data is consumed in order. 
                System.out.println("Name: " + c.getConsumerGroupName());
                System.out.println("Heartbeat timeout period " + c.getTimeout());
                System.out.println("Ordered consumption: " + c.isInOrder());
                for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
                    System.out.println("shard: " + cp.getShard());
                    // The time is a long integer and is accurate to the microsecond. 
                    System.out.println("The time at which the consumption checkpoint was last updated: " + cp.getUpdateTime());
                    System.out.println("Consumer name: " + cp.getConsumer());
                    String consumerPrg = "";
                    if(cp.getCheckPoint().isEmpty())
                        consumerPrg = "Consumption is not started";
                    else{
                        // The UNIX timestamp. Unit: seconds. Format the output value of the timestamp. 
                        try{
                            int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
                            consumerPrg = "" + prg;
                        }
                        catch(LogException e){
                            if(e.GetErrorCode() == "InvalidCursor")
                                consumerPrg = "Invalid. The time at which the consumption checkpoint was last updated is beyond the retention period of the data";
                            else{
                                // internal server error
                                throw e;
                            }
                        }
                    }
                    System.out.println("Consumption checkpoint: " + consumerPrg);
                    String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
                    int endPrg = 0;
                    try{
                        endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
                    }
                    catch(LogException e){
                        // do nothing
                    }
                    // The UNIX timestamp. Unit: seconds. Format the output value of the timestamp. 
                    System.out.println("The time at which the last data record was received: " + endPrg);
                }
            }
        }
    }
  2. 出力を表示します。 例:

    Name: ali-test-consumergroup2
    Heartbeat timeout period: 60
    Ordered consumption: false
    shard: 0
    The time at which the consumption checkpoint was last updated: 0
    Consumer name: consumer_1
    Consumption checkpoint: Consumption is not started
    The time at which the last data record was received: 1729583617
    shard: 1
    The time at which the consumption checkpoint was last updated: 0
    Consumer name: consumer_1
    Consumption checkpoint: Consumption is not started
    The time at which the last data record was received: 1729583738
    
    Process finished with exit code 0

Simple Log Serviceコンソールの使用

  1. Simple Log Serviceコンソールにログインします。

  2. [プロジェクト] セクションで、目的のプロジェクトをクリックします。

    image

  3. [ログストレージ] > [ログストア] タブで、管理するログストアの横にある展开节点アイコンをクリックします。 次に、データ消費の横にある展开节点アイコンをクリックします。

  4. コンシューマーグループリストで、管理するコンシューマーグループをクリックします。

  5. [コンシューマーグループのステータス] ページで、各シャードの消費チェックポイントを表示します。image

次のステップ

  • RAMユーザーにコンシューマーグループに対する操作を許可する

    RAM (Resource Access Management) ユーザーを使用してコンシューマグループを管理するには、RAMユーザーに必要な権限を付与する必要があります。 詳細については、「RAMユーザーを作成し、RAMユーザーにSimple Log Serviceへのアクセスを許可する」をご参照ください。

    次の表に、RAMユーザーに実行権限を付与できる操作を示します。

    Action

    説明

    リソース

    ログ: GetCursorOrData( GetCursor)

    ログが生成された時刻に基づいてカーソルを照会します。

    acs:log: ${regionName}: ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName}

    ログ: CreateConsumerGroup( CreateConsumerGroup)

    Logstoreのコンシューマーグループを作成します。

    acs:log: ${regionName}: ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName} /consumergroup/ ${consumerGroupName}

    ログ: ListConsumerGroup( ListConsumerGroup)

    Logstoreのすべての消費者グループを照会します。

    acs:log: ${regionName}: ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName} /consumergroup/*

    ログ: ConsumerGroupUpdateCheckPoint( ConsumerGroupUpdateCheckPoint)

    コンシューマーグループに割り当てられているシャードの消費チェックポイントを更新します。

    acs:log: ${regionName}: ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName} /consumergroup/ ${consumerGroupName}

    ログ: ConsumerGroupHeartBeat( ConsumerGroupHeartBeat)

    コンシューマーのハートビートメッセージをSimple Log Serviceに送信します。

    acs:log: ${regionName}: ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName} /consumergroup/ ${consumerGroupName}

    ログ: UpdateConsumerGroup( UpdateConsumerGroup)

    コンシューマーグループの属性を変更します。

    acs:log: ${regionName}: ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName} /consumergroup/ ${consumerGroupName}

    ログ: GetConsumerGroupCheckPoint( GetCheckPoint)

    コンシューマーグループに割り当てられている1つまたはすべてのシャードの消費チェックポイントを照会します。

    acs:log: ${regionName}: ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName} /consumergroup/ ${consumerGroupName}

    次のリストは、コンシューマグループに関するリソース情報を示しています。 RAMユーザーがコンシューマーグループに対する操作を実行できるようにするには、次のコードを参照して、RAMユーザーに必要な権限を付与します。

    • プロジェクトが属するAlibaba CloudアカウントのID: 174649 **** 602745

    • プロジェクトが存在するリージョンのID: cn-hangzhou

    • プロジェクト名: project-test

    • Logstoreの名前: logstore-test

    • コンシューマーグループの名前: consumergroup-test

    サンプルコード:

    {
      "Version": "1",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "log:GetCursorOrData"
          ],
          "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
        },
        {
          "Effect": "Allow",
          "Action": [
            "log:CreateConsumerGroup",
            "log:ListConsumerGroup"
          ],
          "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
        },
        {
          "Effect": "Allow",
          "Action": [
            "log:ConsumerGroupUpdateCheckPoint",
            "log:ConsumerGroupHeartBeat",
            "log:UpdateConsumerGroup",
            "log:GetConsumerGroupCheckPoint"
          ],
          "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
        }
      ]
    }
  • トラブルシューティング用にLog4jを設定します。

    コンシューマグループで例外が発生したときにエラーメッセージを表示するように、コンシューマプログラムのLog4jを設定することを推奨します。 これは、エラーのトラブルシューティングに役立ちます。 次のコードは、一般的なlog4j.properties設定ファイルを示しています。

    log4j.rootLogger = info,stdout
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target = System.out
    log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

    Log4jを設定した後、コンシューマプログラムを実行するときにエラーメッセージを受け取ることができます。 エラーメッセージの例を次に示します。

    [WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
    com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
  • コンシューマグループを使用して、ある時点以降に生成されるデータを消費する

    // consumerStartTimeInSeconds specifies a point in time. The data that is generated after the point in time is consumed. 
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          int consumerStartTimeInSeconds);
    
    // position is an enumeration variable. LogHubConfig.ConsumePosition.BEGIN_CURSOR specifies that the consumption starts from the earliest data. LogHubConfig.ConsumePosition.END_CURSOR specifies that the consumption starts from the latest data. 
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          ConsumePosition position);
    説明
    • ビジネス要件に基づいて異なるコンストラクターを使用できます。

    • 消費チェックポイントがSimple Log Serviceに保存されている場合、データの消費は消費チェックポイントから始まります。

    • Simple Log Serviceがデータを消費する場合、データ消費を開始するために消費チェックポイントが優先的に使用されます。 Simple Log Serviceがデータの消費を開始する時点を指定する場合は、consumerStartTimeInSecondsの値が有効期間 (TTL) 内にあることを確認してください。 それ以外の場合、Simple Log Serviceは設定に基づいてデータを消費できません。

  • 消費チェックポイントのリセット

    public static void updateCheckpoint() throws Exception {
            Client client = new Client(host, accessId, accessKey);
            // Specify a UNIX timestamp that is accurate to the second. If you specify a timestamp in milliseconds, divide the timestamp by 1000. Example:
            long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
            ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
            for (Shard shard : response.GetShards()) {
                int shardId = shard.GetShardId();
                String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
                client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
            }
        }

関連ドキュメント