すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Aliyun Log Java Producerを使用したSimple log Serviceへのログデータの書き込み

最終更新日:Dec 05, 2024

FlinkSparkStormなど、ログの圧縮、Simple log Serviceへの一括アップロード、ネットワークリソースの消費量の削減が必要なビッグデータコンピューティングエンジンの場合、APIまたはSDKでは不十分な場合があります。 Aliyun Log Java Producerは、このようなシナリオでSimple Log Serviceにデータをアップロードするための便利で効率的なソリューションを提供します。

背景情報

Aliyun Log Java Producerは、ビッグデータおよび同時実行性の高い環境でのJavaアプリケーション用に設計された高性能クラスライブラリです。 高性能、コンピューティングとI/Oロジックの分離、リソース管理など、標準のAPIまたはSDKに勝るいくつかの利点があります。 Alibaba Cloud Simple Log Serviceのシーケンシャル書き込み機能を活用して、順序付けられたログアップロードを確保します。

Simple Log Serviceは、Aliyun Log Java Producerを使用したサンプルアプリケーションを提供し、クイックスタートを容易にします。 詳細については、「Aliyun Log Java Producerサンプルアプリケーション」をご参照ください。

次の図は、Aliyun Log Java Producerのワークフローを示しています。

image

制限事項

  • Aliyun Log Java Producerの基盤となるメカニズムは、PutLogs操作を呼び出してログをアップロードします。 毎回アップロードできる生ログのサイズは限られています。 詳細については、「データの読み取りと書き込み」をご参照ください。

  • プロジェクト、ログストア、シャード、マシングループなど、Simple Log Serviceの基本リソースにも制限があります。 詳細については、「基本リソース」をご参照ください。

  • コードを初めて実行するときは、Simple Log Serviceコンソールでログストアのインデックス作成機能を有効にする必要があります。 次に、ログを照会する前に約1分間待ちます。

  • Simple Log Serviceコンソールでログを照会し、返されたログのフィールドの値の長さが上限を超えた場合、フィールドの値は切り捨てられ、余分な部分は分析に使用されません。 詳細については、「インデックスの作成」をご参照ください。

課金

SDKを使用して発生したコストは、コンソールを使用して発生したコストと一致します。 詳細については、「課金概要」をご参照ください。

前提条件

  • Simple Log Serviceが有効化されています。 詳細については、

    Simple Log Serviceの有効化

  • Simple Log Service SDK for Javaがインストールされ、初期化されます。

ステップ1: Aliyun Log Java Producerのインストール

MavenプロジェクトでAliyun Log Java Producerを使用するには、対応する依存関係をpom.xmlファイルに追加します。 その後、Mavenは関連するJARパッケージを自動的にダウンロードします。 たとえば、<dependencies> に以下を追加します。

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log-producer</artifactId>
    <version>0.3.22</version>
</dependency>

プロデューサーの依存関係でバージョンの競合が発生した場合は、<dependencies> に次を追加します。

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.114</version>
  <classifier>jar-with-dependencies</classifier>
</dependency>

ステップ2: ProducerConfigの設定

ProducerConfigは、送信ポリシーを設定するために使用されます。 さまざまなビジネスニーズに合わせてパラメーター値を調整します。

Config producerConfig = new ProducerConfig();
producerConfig.setTotalSizeInBytes(104857600);

パラメータは次の表で説明されています。

パラメーター

説明

totalSizeInBytes

Integer

プロデューサーインスタンスがキャッシュできるログの最大サイズ。 デフォルト値: 100 MB。

maxBlockMs

Integer

sendメソッドが呼び出されても、プロデューサーインスタンスの使用可能なスペースが不十分である最大ブロック時間。 デフォルト値: 60秒。

指定した最大ブロック時間が経過し、プロデューサーインスタンスの使用可能なスペースがまだ不十分な場合、sendメソッドはTimeoutExceptionエラーをスローします。

このパラメーターを0に設定し、プロデューサーインスタンスの使用可能なスペースが不十分な場合、sendメソッドはすぐにTimeoutExceptionエラーをスローします。

プロデューサーインスタンスの使用可能なスペースが十分になるまで送信メソッドをブロックする場合は、このパラメーターを負の値に設定する必要があります。

ioThreadCount

Integer

ログ送信タスクのスレッド数。 デフォルト値は、使用可能なプロセッサの数です。

batchSizeThresholdInBytes

Integer

ログのバッチを送信するためのしきい値。 デフォルト値: 512 KB。 最大値: 5 MB。

batchCountThreshold

Integer

送信前のバッチ内のログの数。 デフォルト値: 4096 最大値: 40960。

lingerMs

Integer

バッチが送信される前の遅延。 デフォルト値: 2秒。 最小値: 100 ms。

再試行

Integer

初期失敗後のバッチの再試行回数。 デフォルト値は 10 です。

このパラメーターが0以下に設定されている場合、バッチは最初の障害の直後に障害キューに入ります。

maxReservedAttempts

Integer

ProducerBatchを送信する各試行は、試行に対応します。 このパラメーターは、ユーザーに報告される試行回数を制御し、デフォルトでは最新の11回の試行のみを保持します。

このパラメータを増やすと、メモリ消費量が大きくなり、より詳細なトレースが可能になります。

baseRetryBackoffMs

Integer

再試行の初期バックオフ時間。 デフォルト値: 100ミリ秒。

プロデューサは指数バックオフアルゴリズムを採用し、N回目の再試行までの待ち時間はbaseRetryBackoffMs * 2 ^(N-1) として計算されます。

maxRetryBackoffMs

Integer

再試行の最大バックオフ時間。 デフォルト値: 50秒。

adjustShardHash

Boolean

sendメソッドが呼び出されたときにshardHashを調整するかどうかを決定します。 デフォルト値:true

バケット

Integer

このパラメーターは、adjustShardHashがtrueの場合に有効です。 このパラメーターは、shardHashを指定した数のバケットに再グループ化します。

shardHash値が異なると、データのマージとバッチ処理ができなくなり、プロデューサーのスループットが制限されます。 shardHashを再グループ化することにより、データをより効果的にバッチ処理して送信できます。

このパラメーターの値は、[1, 256] の範囲内で2の整数乗である必要があります。 デフォルト値: 64。

ステップ3: プロデューサーを作成する

プロデューサーは、AKまたはSTSトークンによる設定をサポートします。 STSトークンの場合は、定期的に新しいProjectConfigを作成し、ProjectConfigsに追加します。

LogProducerはプロデューサーの実装クラスであり、一意のproducerConfigが必要です。producerConfigの準備後、次のようにプロデューサーをインスタンス化できます。

Producer producer = new LogProducer(producerConfig);

プロデューサーを作成すると、複数のスレッドが開始されます。 アプリケーション間でプロデューサーインスタンスを共有することを推奨します。 LogProducerのすべてのメソッドは、同時使用のためのスレッドセーフです。 次の表に、プロデューサインスタンス内のスレッドを示します。ここで、Nは0から始まるインスタンス番号です。

スレッド名の形式

数量

説明

aliyun-log-producer-<N>-mover

1

送信スレッドプールに送信する準備ができているバッチを転送します。

aliyun-log-producer-<N>-io-thread

ioThreadCount

データ送信タスクを実行するIOThreadPool内のスレッド。

aliyun-log-producer-<N>-success-batch-handler

1

正常に送信されたバッチを処理します。

aliyun-log-producer-<N>-failure-batch-handler

1

送信に失敗したバッチを管理します。

手順4: ログプロジェクトの設定

ProjectConfigには、宛先プロジェクトのエンドポイント情報と、呼び出し元のIDを表すアクセス資格情報が含まれます。 各ログプロジェクトは1つのProjectConfigオブジェクトに対応します。

次のようにインスタンスを作成します。

ProjectConfig project1 = new ProjectConfig("your-project-1", "cn-hangzhou.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
ProjectConfig project2 = new ProjectConfig("your-project-2", "cn-shanghai.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
producer.putProject(project1);
producer.putProject(project2);

ステップ5: データを送信

将来の作成またはコールバック

Aliyun log Java Producerでログデータを送信する場合は、送信プロセスを処理するコールバック関数を指定します。 このコールバック関数は、データ送信が成功したとき、または送信失敗中に例外が発生したときに呼び出されます。

説明

アプリケーションでの事後処理が単純でプロデューサーをブロックしない場合は、コールバックを直接使用します。 それ以外の場合は、ListenableFutureを使用して、別のスレッドまたはスレッドプールでビジネスロジックを処理します。

メソッドのパラメータは以下のとおりです。

パラメーター

説明

project

送信するデータの宛先プロジェクト。

logstore

送信するデータの宛先ログストア。

logItem

送信されるデータ。

完了

すべてのログが送信されることを保証するJavaアトミック型 (正常と失敗の両方) 。

データを送信

プロデューサインターフェースは、以下に説明するような特定のパラメータをそれぞれ有する複数の送信方法を提供する。

パラメーター

説明

必須 / 任意

project

宛先プロジェクト。

logStore

宛先ログストア。

logItem

送信されるログ。

topic

ログのトピック。

不可

説明

指定しない場合、このパラメーターには "" が割り当てられます。

source

ログのソース。

不可

説明

指定しない場合、このパラメーターには、プロデューサーが存在するホストのIPアドレスが割り当てられます。

shardHash

送信されるログのハッシュ値。 ビジネス要件に基づいてハッシュ値を指定すると、ハッシュ値に基づいて指定されたログストア内の特定のシャードにログが書き込まれます。

不可

説明

指定しない場合、データは宛先ログストアのランダムシャードに書き込まれます。

callback

ログの配信が成功したとき、または複数の再試行が失敗した後に破棄された後に呼び出されるコールバック関数を定義できます。

不可

一般的な例外

例外

説明

TimeoutException

プロデューサーのキャッシュされたログサイズがメモリ制限を超え、maxBlockMsミリ秒後に十分なメモリを取得できない場合、TimeoutExceptionがスローされます。

maxBlockMsが-1に設定されている場合、それは不定のブロッキング期間を示し、TimeoutExceptionは発生しません。

IllegalState

プロデューサがクローズ状態にある場合 (closeメソッドが呼び出されている場合) 、その後のsendメソッドの呼び出しはIllegalStateExceptionになります。

ステップ6: 送信結果の取得

プロデューサーの送信メソッドは非同期であるため、返されたfutureまたは提供されたコールバックを介して送信結果を取得する必要があります。

未来

sendメソッドはListenableFutureを返します。これは、標準のgetメソッドとは別に、完了後のコールバック登録も可能です。 次のサンプルコードは、ListenableFutureの使用方法を示しています。 FutureCallbackを登録し、アプリケーションが提供するEXECUTOR_SERVICEスレッドプールで実行します。 完全な例については、「SampleProducerWithFuture.java」をご参照ください。

import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.common.LogItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class SampleProducerWithCallback {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);

    private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws InterruptedException {
        final String project = "example-project";
        final String logstore = "example-logstore";
        String endpoint = "example-endpoint";
        // In this example, the AccessKey ID and AccessKey secret are obtained from environment variables.
        String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        ProducerConfig producerConfig = new ProducerConfig();
        final Producer producer = new LogProducer(producerConfig);
        producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));

        int nTask = 100;
        // The number of logs that have finished (either successfully sent or failed).
        final AtomicLong completed = new AtomicLong(0);
        final CountDownLatch latch = new CountDownLatch(nTask);

        for (int i = 0; i < nTask; ++i) {
            threadPool.submit(
                    new Runnable() {
                        @Override
                        public void run() {
       // The maximum size of a LogItem (key) is 128 bytes. The maximum size of a LogItem (value) is 1 MB.               
                            LogItem logItem = new LogItem();
                            logItem.PushBack("key1", "foo");
                            logItem.PushBack("key2", "bar");
                            try {
                                producer.send(
                                        project,
                                        logstore,
                                        "your-topic",
                                        "your-source",
                                        logItem,
                                        new SampleCallback(project, logstore, logItem, completed));
                            } catch (InterruptedException e) {
                                LOGGER.warn("The current thread has been interrupted during send logs.");
                            } catch (Exception e) {
                                LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
                            } finally {
                                latch.countDown();
                            }
                        }
                    });
        }

        // The following logic must be considered only if the process exits.
        latch.await();
        threadPool.shutdown();
        try {
            producer.close();
        } catch (InterruptedException e) {
            LOGGER.warn("The current thread has been interrupted from close.");
        } catch (ProducerException e) {
            LOGGER.info("Failed to close producer, e=", e);
        }

        LOGGER.info("All log complete, completed={}", completed.get());
    }

    private static final class SampleCallback implements Callback {
        private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
        private final String project;
        private final String logStore;
        private final LogItem logItem;
        private final AtomicLong completed;

        SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
            this.project = project;
            this.logStore = logStore;
            this.logItem = logItem;
            this.completed = completed;
        }

        @Override
        public void onCompletion(Result result) {
            try {
                if (result.isSuccessful()) {
                    LOGGER.info("Send log successfully.");
                } else {
                    LOGGER.error(
                            "Failed to send log, project={}, logStore={}, logItem={}, result={}",
                            project,
                            logStore,
                            logItem.ToJsonString(),
                            result);
                }
            } finally {
                completed.getAndIncrement();
            }
        }
    }
}

Callback

コールバックはプロデューサーの内部スレッドによって実行され、データスペースは完了後にのみ解放されます。 プロデューサーのブロックとスループットの低下を防ぐには、コールバックでの長時間の操作を避けます。 さらに、コールバック内での再試行のためにsendメソッドを呼び出さないでください。 代わりに、ListenableFutureコールバックで再試行を処理します。 完全な例については、「SampleProducerWithCallback.java」をご参照ください。

import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.common.LogItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class SampleProducerWithCallback {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);

    private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws InterruptedException {
        final String project = "example-project";
        final String logstore = "example-logstore";
        String endpoint = "example-endpoint";
        // In this example, the AccessKey ID and AccessKey secret are obtained from environment variables.
        String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        ProducerConfig producerConfig = new ProducerConfig();
        final Producer producer = new LogProducer(producerConfig);
        producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));

        int nTask = 100;
        // The number of logs that have finished (either successfully sent or failed).
        final AtomicLong completed = new AtomicLong(0);
        final CountDownLatch latch = new CountDownLatch(nTask);

        for (int i = 0; i < nTask; ++i) {
            threadPool.submit(
                    new Runnable() {
                        @Override
                        public void run() {
       // The maximum size of a LogItem (key) is 128 bytes. The maximum size of a LogItem (value) is 1 MB.               
                            LogItem logItem = new LogItem();
                            logItem.PushBack("key1", "foo");
                            logItem.PushBack("key2", "bar");
                            try {
                                producer.send(
                                        project,
                                        logstore,
                                        "your-topic",
                                        "your-source",
                                        logItem,
                                        new SampleCallback(project, logstore, logItem, completed));
                            } catch (InterruptedException e) {
                                LOGGER.warn("The current thread has been interrupted during send logs.");
                            } catch (Exception e) {
                                LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
                            } finally {
                                latch.countDown();
                            }
                        }
                    });
        }

        // The following logic must be considered only if the process exits.
        latch.await();
        threadPool.shutdown();
        try {
            producer.close();
        } catch (InterruptedException e) {
            LOGGER.warn("The current thread has been interrupted from close.");
        } catch (ProducerException e) {
            LOGGER.info("Failed to close producer, e=", e);
        }

        LOGGER.info("All log complete, completed={}", completed.get());
    }

    private static final class SampleCallback implements Callback {
        private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
        private final String project;
        private final String logStore;
        private final LogItem logItem;
        private final AtomicLong completed;

        SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
            this.project = project;
            this.logStore = logStore;
            this.logItem = logItem;
            this.completed = completed;
        }

        @Override
        public void onCompletion(Result result) {
            try {
                if (result.isSuccessful()) {
                    LOGGER.info("Send log successfully.");
                } else {
                    LOGGER.error(
                            "Failed to send log, project={}, logStore={}, logItem={}, result={}",
                            project,
                            logStore,
                            logItem.ToJsonString(),
                            result);
                }
            } finally {
                completed.getAndIncrement();
            }
        }
    }
}

ステップ7: プロデューサー

プロデューサーが不要になった場合、またはプロセスが終了した場合は、プロデューサーを閉じて、すべてのキャッシュデータが処理されていることを確認します。 安全シャットダウンと限定シャットダウンの2つのシャットダウンモードがサポートされています。

安全なシャットダウン

ほとんどの場合、close() メソッドを使用して、安全なシャットダウンを推奨します。 close() メソッドは、キャッシュされたすべてのデータの処理、スレッドの停止、コールバックの実行、および先物の完了を待ってから戻ります。

このメソッドはすべてのデータが処理されるのを待ちますが、コールバックがブロックされていない場合はすぐに戻ります。

限られたシャットダウン

コールバックがブロックされる可能性がある場合のクイックリターンには、close(long timeoutMs) メソッドで制限付きシャットダウンを使用します。 指定されたtimeoutMs後にプロデューサーが完全にクローズされない場合、IllegalStateExceptionがスローされ、潜在的なデータ損失と未実行のコールバックを示します。

よくある質問

Simple Log Serviceにデータが書き込まれない場合はどうすればよいですか?

Simple Log Serviceにデータが書き込まれない場合は、次のトラブルシューティング手順に従います。

  1. プロジェクト内のaliyun-log-produceraliyun-log、およびprotobuf-java JARパッケージのバージョンが、インストールドキュメントで指定されているバージョンと一致することを確認します。 必要に応じてアップグレードします。

  2. Aliyun Log Java Producerのsendメソッドは非同期であるため、戻りデータはすぐには使用できません。 CallbackまたはFutureオブジェクトを使用して、送信失敗の原因を特定します。

  3. CallbackインターフェイスのonCompletionメソッドが呼び出されない場合は、プログラムを終了する前にproducer.close() メソッドが呼び出されていることを確認してください。 データ送信はバックエンドスレッドによって処理されるため、producer.close() を呼び出すと、データの損失が発生しません。

  4. Aliyun Log Java Producerは、SLF4Jロギングフレームワークを使用してランタイム動作を返します。 プログラムでロギングフレームワークを設定し、DEBUGレベルのロギングを有効にしてERRORログを確認します。

  5. 前の手順を完了した後も問題が解決しない場合は、チケットを起票してください

関連ドキュメント

  • API操作を呼び出した後、Simple Log Serviceによって返された応答にエラー情報が含まれている場合、呼び出しは失敗します。 関連するAPI操作のエラーコードの説明に基づいて、エラーを修正できます。 詳細については、エラーコードをご参照ください。

  • Alibaba Cloud OpenAPI Explorerは、デバッグ機能、SDK、サンプル、および関連ドキュメントを提供します。 OpenAPI Explorerを使用して、リクエストを手動でカプセル化したり署名したりすることなく、Simple Log Service API操作をデバッグできます。 詳細については、

    OpenAPIポータル

  • 自動設定の要件を満たすために、Simple Log Serviceはコマンドラインインターフェイス (CLI) を提供しています。 詳細については、「Simple Log Service CLIの概要」をご参照ください。

  • コードサンプルについては、『GitHubのaliyun-log-java-sdk』をご参照ください。