すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:コンシューマーグループを使用したデータの消費

最終更新日:Sep 04, 2024

Simple Log Serviceを使用すると、サードパーティのソフトウェア、さまざまなプログラミング言語のアプリケーション、クラウドサービス、およびストリームコンピューティングフレームワークで、Simple Log Service SDKを呼び出すことでリアルタイムでデータを消費できます。 SDKの呼び出し中に、数秒以内にデータを消費するコンシューマグループを作成できます。 このトピックでは、コンシューマーグループを使用してデータを消費する方法について説明します。

ワークフロー

image

ログストアには複数のシャードがあります。 Simple Log Serviceは、次のルールに基づいて、コンシューマーグループ内のコンシューマーにシャードを割り当てます。

  • 各シャードは1つのコンシューマーにのみ割り当てることができます。

  • コンシューマーは複数のシャードからデータを消費できます。

新しいコンシューマーがコンシューマーグループに追加された後、コンシューマーグループ内のコンシューマーに割り当てられたシャードは、負荷分散のために各コンシューマーに再割り当てされます。 シャードは、前述のルールに基づいて再割り当てされます。

説明

コンシューマーグループを使用してデータを消費する場合、プログラムでエラーが発生すると、Simple Log Serviceは自動的にチェックポイントを保存します。 消費者は、プログラムの回復後にデータを繰り返し消費することなく、チェックポイントからデータ消費を再開できます。

前提条件

  • RAM (Resource Access Management) ユーザーが作成され、必要な権限がRAMユーザーに付与されます。 詳細については、「RAMユーザーを作成し、RAMユーザーにSimple Log Serviceへのアクセスを許可する」をご参照ください。

  • RAMユーザーを使用してコンシューマーグループを管理する場合は、RAMユーザーに必要な権限が付与されていることを確認してください。 詳細は、「RAMユーザー権限付与」をご参照ください。

  • ALIBABA_CLOUD_ACCESS_KEY_IDおよびALIBABA_CLOUD_ACCESS_KEY_SECRET環境変数が設定されています。 詳細については、「Linux、macOS、およびWindowsでの環境変数の設定」をご参照ください。

    重要
    • Alibaba CloudアカウントのAccessKeyペアには、すべてのAPI操作に対する権限があります。 RAMユーザーのAccessKeyペアを使用して、API操作を呼び出したり、ルーチンのO&Mを実行したりすることを推奨します。

    • プロジェクトコードにAccessKey IDまたはAccessKeyシークレットを含めないことを推奨します。 そうしないと、AccessKeyペアが漏洩し、アカウント内のすべてのリソースのセキュリティが侵害される可能性があります。

  • SDK開発環境をセットアップします。 詳細については、「Simple Log Service SDKの概要」をご参照ください。

用語

期間

説明

消費者グループ

コンシューマーグループを使用して、Simple Log Serviceのデータを消費できます。 コンシューマーグループは複数のコンシューマーで構成されています。 コンシューマーグループ内のすべてのコンシューマーは、同じLogstore内のデータを消費します。 消費者はデータを繰り返し消費しません。

重要

1つのLogstoreに最大30のコンシューマグループを作成できます。

consumer

コンシューマーグループ内のコンシューマーはデータを消費します。

重要

コンシューマーグループ内のコンシューマーの名前は一意である必要があります。

ログストア

Logstoreは、データの収集、保存、およびクエリに使用されます。 詳細は、「Logstore」をご参照ください。

shard

シャードは、Logstoreの読み取りおよび書き込み容量を制御するために使用されます。 Simple Log Serviceでは、データはシャードに保存されます。 詳細については、「シャード」をご参照ください。

checkpoint

チェックポイントは、プログラムがデータの消費を停止する位置である。 プログラムが再起動されると、プログラムは最後のチェックポイントからデータを消費します。

ステップ1: コンシューマグループの作成

API操作を呼び出してコンシューマーグループを作成する

APIを呼び出してコンシューマーグループを作成する方法の詳細については、「CreateConsumerGroup」をご参照ください。

コンシューマーグループが作成されているかどうかを確認する方法の詳細については、「ListConsumerGroup」をご参照ください。

Simple Log Service SDKを使用したコンシューマグループの作成

コンシューマーグループの管理に使用されるサンプルコードの詳細については、「Simple Log Service SDK For Javaを使用したコンシューマーグループの管理」および「Simple Log Service SDK for Pythonを使用したコンシューマーグループの管理」をご参照ください。

Simple Log Service CLIを使用したコンシューマグループの作成

Simple Log Service CLIを使用してコンシューマーグループを作成する方法の詳細については、「create_consumer_group」をご参照ください。

コンシューマーグループが作成されているかどうかを確認する方法の詳細については、「list_consumer_group」をご参照ください。

ステップ2: データを消費する

データの使用

Simple Log Service SDK for Java、C ++ 、Python、またはGoを使用して、コンシューマーグループを作成し、データを消費できます。 この例では、Simple Log Service SDK for Javaが使用されています。

仕組み

Simple Log Service SDK for Javaを初めて呼び出してコンシューマーを起動すると、コンシューマーが属するコンシューマーグループが見つからない場合、SDKはコンシューマーグループを作成します。 コンシューマーグループが作成されると、SDKは開始チェックポイントを記録し、チェックポイントからデータの消費を開始します。 開始チェックポイントは、初回消費後に無効になります。 コンシューマーが再起動されると、コンシューマーはSimple Log Serviceによって保存された最後のチェックポイントからデータの消費を再開します。 サンプルチェックポイント:

  • LogHubConfig.ConsumePosition.BEGIN_CURSOR: 開始チェックポイント。Logstoreの最初のログを指定します。 消費者は、最も早いデータから消費を開始する。

  • LogHubConfig.ConsumePosition.END_CURSOR: ログストア内の最後のログを指定する終了チェックポイント。

  1. Mavenの依存関係を追加します。

    Javaプロジェクトのルートディレクトリにあるpom.xmlファイルを開き、次のコードを追加します。

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.47</version>
    </dependency>
  2. コンシューマの実装ロジックを記述するには、SampleLogHubProcessor.javaという名前のファイルを作成します。

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    
    import java.util.List;
    
    public class SampleLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // The time at which the last checkpoint was stored. 
        private long mLastSaveTime = 0;
    
        // The initialize method is called once when a processor object is initialized.
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // The main logic of data consumption. You must include the code to handle all exceptions that may occur during data consumption. 
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
            // Display the obtained data. 
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
                System.out.println("Tags");
                for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                    FastLogTag logTag = fastLogGroup.getLogTags(i);
                    System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
                }
                for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                    FastLog log = fastLogGroup.getLogs(i);
                    System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int j = 0; j < log.getContentsCount(); ++j) {
                        FastLogContent content = log.getContents(j);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // A checkpoint is written to Simple Log Service at an interval of 30 seconds. If the ClientWorker instance unexpectedly stops within 30 seconds, a newly started ClientWorker instance consumes data from the last checkpoint. A small amount of data may be repeatedly consumed. 
            try {
                if (curTime - mLastSaveTime > 30 * 1000) {
                    // The value true indicates that the checkpoints are immediately updated to Simple Log Service. By default, the checkpoints that are cached in memory are automatically updated to Simple Log Service at an interval of 60 seconds. 
                    checkPointTracker.saveCheckPoint(true);
                    mLastSaveTime = curTime;
                } else {
                    // The value false indicates that the checkpoints are locally cached and can be updated to Simple Log Service by using the automatic checkpoint update mechanism. 
                    checkPointTracker.saveCheckPoint(false);
                }
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        // The shutdown function of the ClientWorker instance is called. You can manage the checkpoints. 
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // Immediately store checkpoints to Simple Log Service. 
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }

    詳細については、「Java」、「C ++ 」、「Python」、および「Go」をご参照ください。

  3. コンシューマエンティティを定義するために、SampleLogHubProcessorFactory.javaという名前のファイルを作成します。

    class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // Generate a consumer. Each time the generatorProcessor method is called, a new SampleLogHubProcessor object is returned as expected. 
            return new SampleLogHubProcessor();
        }
    }
  4. Main.javaファイルを作成します。 コンシューマーグループを作成し、コンシューマースレッドを起動して、コンシューマーグループ内のコンシューマーが指定されたLogstore内のデータを消費できるようにします。

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class Main {
        // The Simple Log Service endpoint. Enter an endpoint based on your business requirements. 
        private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
        // The name of the Simple Log Service project. Enter a name based on your business requirements. You must enter the name of an existing project. 
        private static String Project = "ali-cn-hangzhou-sls-admin";
        // The name of the Logstore. Enter a name based on your business requirements. You must enter the name of an existing Logstore. 
        private static String Logstore = "sls_operation_log";
        // The name of the consumer group. Enter a name based on your business requirements. You do not need to create a consumer group in advance. A consumer group is automatically created when a program runs. 
        private static String ConsumerGroup = "consumerGroupX";
        // In this example, the AccessKey ID and AccessKey secret are obtained from environment variables.  
        private static String AccessKeyId= System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // consumer_1 specifies the name of a consumer. The name of each consumer in a consumer group must be unique. If different consumers start processes on different machines to consume data in a Logstore, you can use the machine IP addresses to identify each consumer. 
            // maxFetchLogGroupSize specifies the maximum number of log groups that can be obtained from Simple Log Service at a time. Retain the default value. You can use config.setMaxFetchLogGroupSize(100); to change the maximum number. Valid values: (0,1000]. 
            LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000);
            ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // After the Thread instance runs, the ClientWorker instance automatically runs and extends the Runnable interface. 
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // The shutdown function of the ClientWorker instance is called to exit the consumer. The Thread instance is automatically stopped. 
            worker.shutdown();
            // Multiple asynchronous tasks are generated when the ClientWorker instance is running. To ensure that all running tasks securely stop after the shutdown, we recommend that you set Thread.sleep to 30 seconds. 
            Thread.sleep(30 * 1000);
        }
    }
  5. Main.javaファイルを実行します。

    この例では、NGINXログが消費され、消費結果が表示されます。

    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.10.196
    upstream_response_time    :    0.02
    --------
    Log: 158, time: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629815
    --------
    Log: 0, time: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629877
    --------
    ......

SPLに基づくデータの消費

Simple Log Service SDK for Java、C ++ 、Python、またはGoを使用して、コンシューマーグループを作成し、データを消費できます。 この例では、Simple Log Service SDK for Javaが使用されています。

仕組み

Simple Log Service SDK for Javaを初めて呼び出してコンシューマーを起動すると、コンシューマーが属するコンシューマーグループが見つからない場合、SDKはコンシューマーグループを作成します。 コンシューマーグループが作成されると、SDKは開始チェックポイントを記録し、チェックポイントからデータの消費を開始します。 開始チェックポイントは、初回消費後に無効になります。 コンシューマーが再起動されると、コンシューマーはSimple Log Serviceによって保存された最後のチェックポイントからデータの消費を再開します。 サンプルチェックポイント:

  • LogHubConfig.ConsumePosition.BEGIN_CURSOR: 開始チェックポイント。Logstoreの最初のログを指定します。 消費者は、最も早いデータから消費を開始する。

  • LogHubConfig.ConsumePosition.END_CURSOR: ログストア内の最後のログを指定する終了チェックポイント。

  1. Maven依存関係を追加します。

    Javaプロジェクトのルートディレクトリにあるpom.xmlファイルを開き、次のコードを追加します。

        <dependency>
          <groupId>com.google.protobuf</groupId>
          <artifactId>protobuf-java</artifactId>
          <version>2.5.0</version>
        </dependency>
        <dependency>
          <groupId>com.aliyun.openservices</groupId>
          <artifactId>aliyun-log</artifactId>
          <version>0.6.99</version>
        </dependency>
  2. SPLLogHubProcessor.javaという名前のファイルを作成します。

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    
    import java.util.List;
    
    public class SPLLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // The time at which the last checkpoint was stored. 
        private long mLastSaveTime = 0;
    
        // The initialize method is called once when a processor object is initialized.
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // The main logic of data consumption. You must include the code to handle all exceptions that may occur during data consumption. 
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
            // Display the obtained data. 
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
                System.out.println("Tags");
                for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                    FastLogTag logTag = fastLogGroup.getLogTags(i);
                    System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
                }
                for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                    FastLog log = fastLogGroup.getLogs(i);
                    System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int j = 0; j < log.getContentsCount(); ++j) {
                        FastLogContent content = log.getContents(j);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // A checkpoint is written to Simple Log Service at an interval of 30 seconds. If the ClientWorker instance unexpectedly stops within 30 seconds, a newly started ClientWorker instance consumes data from the last checkpoint. A small amount of data may be repeatedly consumed. 
            try {
                if (curTime - mLastSaveTime > 30 * 1000) {
                    // The value true indicates that the checkpoints are immediately updated to Simple Log Service. By default, the checkpoints that are cached in memory are automatically updated to Simple Log Service at an interval of 60 seconds. 
                    checkPointTracker.saveCheckPoint(true);
                    mLastSaveTime = curTime;
                } else {
                    // The value false indicates that the checkpoints are locally cached and can be updated to Simple Log Service by using the automatic checkpoint update mechanism. 
                    checkPointTracker.saveCheckPoint(false);
                }
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        // The shutdown function of the ClientWorker instance is called. You can manage the checkpoints. 
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // Immediately store checkpoints to Simple Log Service. 
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }
  3. SPLLogHubProcessorFactory.javaという名前のファイルを作成します。

    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    
    class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // Generate a consumer. Each time the generatorProcessor method is called, a new SPLLogHubProcessor object is returned as expected. 
            return new SPLLogHubProcessor();
        }
    }
  4. Main.javaファイルを作成します。 コンシューマーグループを作成し、コンシューマースレッドを起動して、コンシューマーグループ内のコンシューマーが指定されたLogstore内のデータを消費できるようにします。 コンシューマーグループの管理に使用されるサンプルコードの詳細については、「Simple Log Service SDK For Javaを使用したコンシューマーグループの管理」および「Simple Log Service SDK for Pythonを使用したコンシューマーグループの管理」をご参照ください。

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class SPLConsumer {
        // The Simple Log Service endpoint. Enter an endpoint based on your business requirements. 
        private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
        // The name of the Simple Log Service project. Enter a name based on your business requirements. You must enter the name of an existing project. 
        private static String Project = "your_project";
        // The name of the Logstore. Enter a name based on your business requirements. You must enter the name of an existing Logstore. 
        private static String Logstore = "your_logstore";
        // The name of the consumer group. Enter a name based on your business requirements. You do not need to create a consumer group in advance. A consumer group is automatically created when a program runs. 
        private static String ConsumerGroup = "consumerGroupX";
        // In this example, the AccessKey ID and AccessKey secret are obtained from environment variables.  
        private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // consumer_1 specifies the name of a consumer. The name of each consumer in a consumer group must be unique. If different consumers start processes on different machines to consume data in a Logstore, you can use the machine IP addresses to identify each consumer. 
            // maxFetchLogGroupSize specifies the maximum number of log groups that can be obtained from Simple Log Service at a time. Retain the default value. You can use config.setMaxFetchLogGroupSize(100); to change the maximum number. Valid values: (0,1000]. 
            LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
            // You can use setQuery to specify a Simple Log Service Processing Language (SPL) statement for data consumption.
            config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
            ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // After the Thread instance runs, the ClientWorker instance automatically runs and extends the Runnable interface. 
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // The shutdown function of the ClientWorker instance is called to exit the consumer. The Thread instance is automatically stopped. 
            worker.shutdown();
            // Multiple asynchronous tasks are generated when the ClientWorker instance is running. To ensure that all running tasks securely stop after the shutdown, we recommend that you set Thread.sleep to 30 seconds. 
            Thread.sleep(30 * 1000);
        }
    }
  5. Main.javaファイルを実行します。

    この例では、NGINXログが消費され、消費結果が表示されます。

    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.10.196
    upstream_response_time    :    0.02
    --------
    Log: 158, time: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629815
    --------
    Log: 0, time: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629877
    --------
    ......

手順3: コンシューマグループのステータスを表示する

Simple Log Serviceコンソールの使用

  1. Simple Log Serviceコンソール.

  2. [プロジェクト] セクションで、管理するプロジェクトをクリックします。

    image

  3. [ログストレージ] > [ログストア] タブで、管理するログストアの横にある展开节点アイコンをクリックします。 次に、データ消費の横にある展开节点アイコンをクリックします。

  4. コンシューマーグループリストで、管理するコンシューマーグループをクリックします。

  5. コンシューマーグループステータス ページで、各シャードの消費チェックポイントを表示します。

Simple Log Service SDKの使用

この例では、Simple Log Service SDK for Javaが使用されています。 ConsumerGroupTest.javaファイルを実行して、各シャードの消費チェックポイントを表示します。

import java.util.List;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
    static String endpoint = "";
    static String project = "";
    static String logstore = "";
    static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    public static void main(String[] args) throws LogException {
        Client client = new Client(endpoint, accesskeyId, accesskey);
        // Obtain all consumer groups that are created for the Logstore. If no consumer groups exist, an empty string is returned. 
        List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
        for(ConsumerGroup c: consumerGroups){
            // Display the attributes of each consumer group, including the name, heartbeat timeout period, and whether data is consumed in order. 
            System.out.println("Name: " + c.getConsumerGroupName());
            System.out.println("Heartbeat timeout period " + c.getTimeout());
            System.out.println("Ordered consumption: " + c.isInOrder());
            for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
                System.out.println("shard: " + cp.getShard());
                // The time is a long integer and is accurate to the microsecond. 
                System.out.println("The time at which the progress of data consumption was last updated: " + cp.getUpdateTime());
                System.out.println("Consumer name: " + cp.getConsumer());
                String consumerPrg = "";
                if(cp.getCheckPoint().isEmpty())
                    consumerPrg = "Consumption is not started";
                else{
                    // The UNIX timestamp. Unit: seconds. Format the output value of the timestamp. 
                    try{
                        int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
                        consumerPrg = "" + prg;
                    }
                    catch(LogException e){
                        if(e.GetErrorCode() == "InvalidCursor")
                            consumerPrg = "Invalid. The time at which the progress of data consumption was last updated is beyond the retention period of the data";
                        else{
                            // internal server error
                            throw e;
                        }
                    }
                }
                System.out.println("Consumption progress: " + consumerPrg);
                String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
                int endPrg = 0;
                try{
                    endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
                }
                catch(LogException e){
                    // do nothing
                }
                // The UNIX timestamp. Unit: seconds. Format the output value of the timestamp. 
                System.out.println("The time at which the last data record was received: " + endPrg);
            }
        }
    }
}

次の情報が返されます。

Name: etl-6cac01c571d5a4b933649c04a7ba215b
Heartbeat timeout period: 60
Ordered consumption: false
shard: 0
The time at which the progress of data consumption was last updated: 1639555453575211
Consumer name: etl-356464787983a3d17086a9797e3d5f0e6959b066-256521
Consumption progress: 1639555453
The time at which the last data record was received: 1639555453
shard: 1
The time at which the progress of data consumption was last updated: 1639555392071328
Consumer name: etl-356464787983a3d17086a9797e3d5f0e6959b066-256521
Consumption progress: 1639555391
The time at which the last data record was received: 1639555391
Name: etl-2bd3fdfdd63595d56b1ac24393bf5991
Heartbeat timeout period: 60
Ordered consumption: false
shard: 0
The time at which the progress of data consumption was last updated: 1639555453256773
Consumer name: etl-532719dd2f198b3878ee3c6dfc80aeffb39ee48b-061390
Consumption progress: 1639555453
The time at which the last data record was received: 1639555453
shard: 1
The time at which the progress of data consumption was last updated: 1639555392066234
Consumer name: etl-532719dd2f198b3878ee3c6dfc80aeffb39ee48b-061390
Consumption progress: 1639555391
The time at which the last data record was received: 1639555391
Name: consumerGroupX
Heartbeat timeout period: 60
Ordered consumption: false
shard: 0
The time at which the progress of data consumption was last updated: 1639555434142879
Consumer name: consumer_1
Consumption progress: 1635615029
The time at which the last data record was received: 1639555453
shard: 1
The time at which the progress of data consumption was last updated: 1639555437976929
Consumer name: consumer_1
Consumption progress: 1635616802
The time at which the last data record was received: 1639555391

RAMユーザーにコンシューマーグループに対する操作を許可する

RAMユーザーを使用してコンシューマーグループを管理するには、RAMユーザーに必要な権限を付与する必要があります。 詳細については、「手順2: RAMユーザーに権限を付与する」をご参照ください。

次の表に、RAMユーザーに実行権限を付与できる操作を示します。

Action

説明

リソース

ログ: GetCursorOrData (GetCursorおよびPullData)

ログが生成された時刻に基づいてカーソルを照会します。

acs:log: ${regionName}: ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName}

ログ: CreateConsumerGroup

Logstoreのコンシューマーグループを作成します。

acs:log: ${regionName}: ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName} /consumergroup/ ${consumerGroupName}

ログ: ListConsumerGroup

Logstoreのすべての消費者グループを照会します。

acs:log: ${regionName}: ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName} /consumergroup/*

ログ: ConsumerGroupUpdateCheckPoint

コンシューマーグループに割り当てられているシャードのチェックポイントを更新します。

acs:log: ${regionName}: ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName} /consumergroup/ ${consumerGroupName}

ログ: ConsumerGrouppHeartBeat

コンシューマーのハートビートメッセージをSimple Log Serviceに送信します。

acs:log: ${regionName}: ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName} /consumergroup/ ${consumerGroupName}

ログ: UpdateConsumerGroup

コンシューマーグループの属性を変更します。

acs:log: ${regionName}: ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName} /consumergroup/ ${consumerGroupName}

ログ: GetConsumerGroupCheckPoint

コンシューマーグループに割り当てられている1つまたはすべてのシャードのチェックポイントを照会します。

acs:log: ${regionName}: ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName} /consumergroup/ ${consumerGroupName}

次のリストは、コンシューマグループに関するリソース情報を示しています。 RAMユーザーがコンシューマーグループに対する操作を実行できるようにするには、次のコードを参照して、RAMユーザーに必要な権限を付与します。

  • プロジェクトが属するAlibaba CloudアカウントのID: 174649 **** 602745

  • プロジェクトが存在するリージョンのID: cn-hangzhou

  • プロジェクト名: project-test

  • Logstoreの名前: logstore-test

  • コンシューマーグループの名前: consumergroup-test

{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "log:GetCursorOrData"
      ],
      "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
    },
    {
      "Effect": "Allow",
      "Action": [
        "log:CreateConsumerGroup",
        "log:ListConsumerGroup"
      ],
      "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "log:ConsumerGroupUpdateCheckPoint",
        "log:ConsumerGroupHeartBeat",
        "log:UpdateConsumerGroup",
        "log:GetConsumerGroupCheckPoint"
      ],
      "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
    }
  ]
}

次のステップ

  • トラブルシューティング用にLog4jを設定します。

    コンシューマグループで例外が発生したときにエラーメッセージを表示するように、コンシューマプログラムのLog4jを設定することを推奨します。 これは、エラーのトラブルシューティングに役立ちます。 次のコードは、一般的なlog4j.properties設定ファイルを示しています。

    log4j.rootLogger = info,stdout
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target = System.out
    log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

    Log4jを設定した後、コンシューマプログラムを実行するときにエラーメッセージを受け取ることができます。 エラーメッセージの例を次に示します。

    [WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
    com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
  • コンシューマグループを使用して、ある時点以降に生成されたデータを消費します。

    // consumerStartTimeInSeconds specifies a point in time. The data that is generated after the point in time is consumed. 
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          int consumerStartTimeInSeconds);
    
    // position is an enumeration variable. LogHubConfig.ConsumePosition.BEGIN_CURSOR specifies that the consumption starts from the earliest data. LogHubConfig.ConsumePosition.END_CURSOR specifies that the consumption starts from the latest data. 
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          ConsumePosition position);
    説明
    • ビジネス要件に基づいて異なるコンストラクターを使用できます。

    • チェックポイントがSimple Log Serviceに保存されている場合、データの消費はチェックポイントから開始されます。

    • Simple Log Serviceがデータを消費する場合、データの消費を開始するためにチェックポイントが優先的に使用されます。 Simple Log Serviceがデータの消費を開始する時点を指定する場合は、consumerStartTimeInSecondsの値が有効期間 (TTL) 内にあることを確認してください。 それ以外の場合、Simple Log Serviceは設定に基づいてデータを消費できません。

  • チェックポイントをリセットします。

    public static void updateCheckpoint() throws Exception {
            Client client = new Client(host, accessId, accessKey);
            // Specify a UNIX timestamp that is accurate to the second. If you specify a timestamp in milliseconds, divide the timestamp by 1000. Example:
            long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
            ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
            for (Shard shard : response.GetShards()) {
                int shardId = shard.GetShardId();
                String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
                client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
            }
        }

関連ドキュメント