全部產品
Search
文件中心

Simple Log Service:使用Aliyun Log Java Producer寫入日誌資料

更新時間:Nov 23, 2024

如果您在使用FlinkSparkStorm等巨量資料計算引擎時,需要將日誌進行壓縮、批量上傳日誌到Log Service、減少網路傳輸資源的佔用,API或者SDK往往無法滿足巨量資料情境對資料寫入能力的要求,您可以使用Aliyun Log Java Producer,便捷高效地將資料上傳到Log Service。

前提條件

您已完成以下操作:

  • 已安裝Log ServiceJava SDK。具體操作,請參見安裝Java SDK

什麼是Aliyun Log Java Producer

Aliyun Log Java Producer是為運行在巨量資料、高並發情境下的Java應用量身打造的高效能類庫。相對於原始的API或SDK,使用該類庫寫日誌資料能為您帶來諸多優勢,包括高效能、計算與I/O邏輯分離、資源可控制等。Aliyun LOG Java Producer使用阿里雲Log Service提供的順序寫入功能來保證日誌的上傳順序。

Log Service提供基於Aliyun Log Java Producer的範例應用程式,便於您快速上手。更多資訊,請參見Aliyun Log Producer Sample Application

工作流程

特點

  • 安全執行緒:Producer介面暴露的所有方法都是安全執行緒的。

  • 非同步發送:調用Producer的發送介面通常能夠立即返迴響應。Producer內部會緩衝併合並待發送資料,然後批量發送以提高輸送量。

  • 自動重試:Producer會根據配置的最大重試次數和重試退避時間進行重試。

  • 行為追溯:通過Callback或Future能擷取當前資料是否發送成功的資訊,也可以獲得該資料每次被嘗試發送的資訊,有利於問題追溯和行為決策。

  • 上下文還原:同一個Producer執行個體產生的日誌在同一上下文中,在服務端可以查看某條日誌前後相關的日誌。

  • 優雅關閉:保證close方法退出時,Producer緩衝的所有資料都能被處理,同時您也能得到相應的通知。

應用情境

producer對比原始的API或SDK的優勢如下:

  • 高效能

    在海量資料、資源有限的前提下,寫入端要達到目標輸送量需要實現複雜的控制邏輯,包括多線程、緩衝策略、批量發送等,另外還要充分考慮失敗重試的情境。Producer實現了上述功能,在為您帶來效能優勢的同時簡化了程式開發步驟。

  • 非同步非阻塞

    在可用記憶體充足的前提下,Producer會對發往日誌庫的資料進行緩衝,因此調用send方法時能夠立即返迴響應且不會阻塞,可達到計算與I/O邏輯分離的目的。隨後,您可以通過返回的Future對象或傳入的Callback獲得資料發送的結果。

  • 資源可控制

    可以通過參數控制Producer用於緩衝待發送資料的記憶體大小,同時還可以配置用於執行資料發送任務的線程數量。這樣可避免Producer無限制地消耗資源,且可以讓您根據實際情況平衡資源消耗和寫入輸送量。

  • 定位問題簡單

    如果日誌資料發送失敗,Producer除了返回狀態代碼,還會返回一個String類型的異常資訊,用於描述失敗的原因和詳細資料。例如,如果發送失敗是因為網路連接逾時,則返回的異常資訊可能是“連線逾時”;如果發送失敗是因為伺服器無響應,則返回的異常資訊可能是“伺服器無響應”。

使用限制

  • aliyun-log-producer底層調用PutLogs介面上傳日誌,每次可以寫入的原始日誌大小存在限制。更多資訊,請參見資料讀寫

  • Log Service的基礎資源,包括建立Project個數、Logstore個數、Shard個數、LogtailConfig個數、機器組個數、單個LogItem大小、LogItem(Key)長度和LogItem(Value)長度等均存在限制。更多資訊,請參見基礎資源

  • 代碼首次運行後,請在Log Service控制台開啟日誌庫索引,等待一分鐘後,進行查詢。

  • 在控制台進行日誌查詢時,當單個欄位值長度超過最大長度時,超出部分被截斷,不參與分析。更多資訊,請參考建立索引

費用說明

使用SDK產生的費用和使用控制台產生的費用一致。更多資訊,請參見計費概述

步驟一:安裝Aliyun Log Java Producer

在Maven工程中使用Log ServiceAliyun Log Java Producer,只需在pom.xml中加入相應依賴。Maven專案管理工具會自動下載相關JAR包。例如,在<dependencies>中加入如下內容:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log-producer</artifactId>
    <version>0.3.22</version>
</dependency>

添加更新完後,如果提示Producer依賴的版本衝突,在<dependencies>中加入如下內容:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.114</version>
  <classifier>jar-with-dependencies</classifier>
</dependency>

步驟二:配置ProducerConfig

ProducerConfig用於配置發送策略,您可以根據不同的業務情境為參數指定不同的值,各參數含義如下表所示:

Config producerConfig = new ProducerConfig();
producerConfig.setTotalSizeInBytes(104857600);

參數

類型

描述

totalSizeInBytes

整型

單個Producer執行個體能緩衝的日誌大小上限,預設為 100MB。

maxBlockMs

整型

如果Producer可用空間不足,調用者在send方法上的最大阻塞時間,預設為60秒。

如果超過這個時間後所需空間仍無法得到滿足,send方法會拋出TimeoutException

如果將該值設為0,當所需空間無法得到滿足時,send 方法會立即拋出 TimeoutException。

如果您希望send方法一直阻塞直到所需空間得到滿足,可將該值設為負數。

ioThreadCount

整型

執行日誌發送任務的線程池大小,預設為可用處理器個數。

batchSizeThresholdInBytes

整型

當一個ProducerBatch中緩衝的日誌大小大於等於 batchSizeThresholdInBytes 時,該batch將被發送,預設為512KB,最大可設定成 5MB。

batchCountThreshold

整型

當一個ProducerBatch中緩衝的日誌條數大於等於 batchCountThreshold時,該batch將被發送,預設4096,最大可設定成40960。

lingerMs

整型

一個ProducerBatch從建立到可發送的逗留時間,預設為2秒,最小可設定成100毫秒。

retries

整型

如果某個ProducerBatch首次發送失敗,能夠對其重試的次數,預設為10次。

如果retries小於等於 0,該ProducerBatch首次發送失敗後將直接進入失敗隊列。

maxReservedAttempts

整型

每個ProducerBatch每次被嘗試發送都對應著一個Attempt,此參數用來控制返回給使用者的attempt個數,預設只保留最近的11次attempt資訊。

該參數越大能讓您追溯更多的資訊,但同時也會消耗更多的記憶體。

baseRetryBackoffMs

整型

首次重試的退避時間,預設為100毫秒。

Producer採樣指數退避演算法,第N次重試的計劃等待時間為 baseRetryBackoffMs * 2^(N-1)。

maxRetryBackoffMs

整型

重試的最大退避時間,預設為50秒。

adjustShardHash

布爾

如果調用send方法時指定了 shardHash,該參數用於控制是否需要對其進行調整,預設為true。

buckets

整型

若且唯若adjustShardHash為true時,該參數才生效。此時,producer會自動將shardHash重新分組,分組數量為buckets。

如果兩條資料的shardHash不同,它們是無法合并到一起發送的,會降低producer輸送量。將shardHash重新分組後,能讓資料有更多地機會被批量發送。

該參數的取值範圍是 [1, 256],且必須是2的整數次冪,預設為64。

步驟三:建立Producer

Producer 支援使用者配置AK或STS token。如果使用STS token,需要定期建立新的ProjectConfig然後將其添加到ProjectConfigs裡。

LogProducer是介面Producer的實作類別,它接收唯一的參數producerConfig。當您準備好producerConfig後,可以按照下列方式建立producer執行個體。

Producer producer = new LogProducer(producerConfig);

建立producer的同時會建立一系列線程,這是一個相對昂貴的操作,因此建議一個應用共用一個producer執行個體。一個producer執行個體包含的線程如下表所示,其中N為該producer執行個體在當前進程中的編號,從 0 開始。另外,LogProducer提供的所有方法都是安全執行緒的,可以在多線程環境下安全執行。

線程名格式

數量

描述

aliyun-log-producer-<N>-mover

1

負責將滿足發送條件的batch投遞到發送線程池裡。

aliyun-log-producer-<N>-io-thread

ioThreadCount

IOThreadPool中真正用於執行資料發送任務的線程。

aliyun-log-producer-<N>-success-batch-handler

1

用於處理髮送成功的batch。

aliyun-log-producer-<N>-failure-batch-handler

1

用於處理髮送失敗的batch。

步驟四:配置記錄項目

ProjectConfig包含目標Project的服務入口資訊以及表徵調用者身份的訪問憑證。每個記錄項目對應一個ProjectConfig對象。

可以按照如下方式建立執行個體。

ProjectConfig project1 = new ProjectConfig("your-project-1", "cn-hangzhou.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
ProjectConfig project2 = new ProjectConfig("your-project-2", "cn-shanghai.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
producer.putProject(project1);
producer.putProject(project2);

步驟五:發送資料

建立Future或Callback

在使用Aliyun Log Java Producer發送日誌資料時,需要指定一個回呼函數來處理髮送過程中的各種情況。當日誌資料發送成功時,回呼函數會被調用,並返回一個發送結果;當日誌資料發送失敗時,回呼函數也會被調用,並傳入一個異常對象。

說明

如果擷取結果後,應用的處理邏輯比較簡單且不會造成producer阻塞,建議直接使用callback。否則,建議使用ListenableFuture,在單獨的線程(池)中執行後續業務

方法的各個參數含義如下:

參數

描述

project

待發送資料的目標 project。

logstore

待發送資料的目標 logStore。

logTem

待發送資料。

completed

Java提供的一個原子類型,用來確保所有日誌發送完成(成功或者失敗)。

發送資料

Producer介面提供多種發送方法,方法的各個參數含義如下。

參數

描述

是否必選

project

目標Project。

logStore

目標LogStore。

logItem

要發送的日誌/日誌列表。

topic

日誌主題

說明

如果留空或沒有指定,該欄位將被賦予""。

source

發送源。

說明

如果留空或沒有指定,該欄位將被賦予producer所在宿主機的 IP。

shardHash

可為發送的日誌設定自訂雜湊,服務端將根據此雜湊選擇對應的日誌庫Shard分區寫入日誌。

說明

如果留空或沒有指定,資料將被隨機寫入目標LogStore的某個shard中。

callback

可設定一個回呼函數。該回呼函數將在日誌被成功發送或者重試多次失敗後被丟棄時調用。

常見異常

異常

說明

TimeoutException

當Producer緩衝的日誌大小超過設定的記憶體上限時,且阻塞maxBlockMs毫秒後仍未擷取到足夠記憶體時,將拋出TimeoutException。

maxBlockMs 為-1時,阻塞沒有時間上限,將永遠不會拋出 TimeoutException。

IllegalStateException

當Producer已經處於關閉狀態(調用過close方法)時,再調用send 方法,會拋出IllegalStateException。

步驟六:擷取發送資料

由於producer提供的所有發送方法都是非同步,需要通過返回的future或者傳入的callback擷取發送結果。

Future

Send 方法會返回一個ListenableFuture,它除了可以像普通future那樣通過調用get方法阻塞獲得發送結果外,還允許你註冊回調方法(回調方法會在完成 future 設定後被調用)。以下程式碼片段展示了ListenableFuture的使用方法,使用者需要為該future註冊一個FutureCallback並將其投遞到應用提供的線程池EXECUTOR_SERVICE中執行,完整範例請參見SampleProducerWithFuture.java

import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.common.LogItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class SampleProducerWithCallback {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);

    private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws InterruptedException {
        final String project = "example-project";
        final String logstore = "example-logstore";
        String endpoint = "example-endpoint";
        // 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。
        String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        ProducerConfig producerConfig = new ProducerConfig();
        final Producer producer = new LogProducer(producerConfig);
        producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));

        int nTask = 100;
        // The number of logs that have finished (either successfully send, or failed).
        final AtomicLong completed = new AtomicLong(0);
        final CountDownLatch latch = new CountDownLatch(nTask);

        for (int i = 0; i < nTask; ++i) {
            threadPool.submit(
                    new Runnable() {
                        @Override
                        public void run() {
       //The maximum size of a LogItem (key) is 128 bytes.  The maximum size of a LogItem (value) is 1 MB.               
                            LogItem logItem = new LogItem();
                            logItem.PushBack("key1", "foo");
                            logItem.PushBack("key2", "bar");
                            try {
                                producer.send(
                                        project,
                                        logstore,
                                        "your-topic",
                                        "your-source",
                                        logItem,
                                        new SampleCallback(project, logstore, logItem, completed));
                            } catch (InterruptedException e) {
                                LOGGER.warn("The current thread has been interrupted during send logs.");
                            } catch (Exception e) {
                                LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
                            } finally {
                                latch.countDown();
                            }
                        }
                    });
        }

        // 只有進程退出的時候,才需要考慮如下的邏輯。
        latch.await();
        threadPool.shutdown();
        try {
            producer.close();
        } catch (InterruptedException e) {
            LOGGER.warn("The current thread has been interrupted from close.");
        } catch (ProducerException e) {
            LOGGER.info("Failed to close producer, e=", e);
        }

        LOGGER.info("All log complete, completed={}", completed.get());
    }

    private static final class SampleCallback implements Callback {
        private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
        private final String project;
        private final String logStore;
        private final LogItem logItem;
        private final AtomicLong completed;

        SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
            this.project = project;
            this.logStore = logStore;
            this.logItem = logItem;
            this.completed = completed;
        }

        @Override
        public void onCompletion(Result result) {
            try {
                if (result.isSuccessful()) {
                    LOGGER.info("Send log successfully.");
                } else {
                    LOGGER.error(
                            "Failed to send log, project={}, logStore={}, logItem={}, result={}",
                            project,
                            logStore,
                            logItem.ToJsonString(),
                            result);
                }
            } finally {
                completed.getAndIncrement();
            }
        }
    }
}

Callback

Callback由producer內部線程負責執行,並且只有在執行完畢後資料“佔用”的空間才會釋放。為了不阻塞producer造成整體輸送量的下降,要避免在callback裡執行耗時的操作。另外,在callback中調用send方法進行重試也是不建議的,您可以在ListenableFuture的callback中進行重試。完整範例請參見SampleProducerWithCallback.java

import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.common.LogItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class SampleProducerWithCallback {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);

    private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws InterruptedException {
        final String project = "example-project";
        final String logstore = "example-logstore";
        String endpoint = "example-endpoint";
        // 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。
        String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        ProducerConfig producerConfig = new ProducerConfig();
        final Producer producer = new LogProducer(producerConfig);
        producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));

        int nTask = 100;
        // The number of logs that have finished (either successfully send, or failed).
        final AtomicLong completed = new AtomicLong(0);
        final CountDownLatch latch = new CountDownLatch(nTask);

        for (int i = 0; i < nTask; ++i) {
            threadPool.submit(
                    new Runnable() {
                        @Override
                        public void run() {
       //The maximum size of a LogItem (key) is 128 bytes.  The maximum size of a LogItem (value) is 1 MB.               
                            LogItem logItem = new LogItem();
                            logItem.PushBack("key1", "foo");
                            logItem.PushBack("key2", "bar");
                            try {
                                producer.send(
                                        project,
                                        logstore,
                                        "your-topic",
                                        "your-source",
                                        logItem,
                                        new SampleCallback(project, logstore, logItem, completed));
                            } catch (InterruptedException e) {
                                LOGGER.warn("The current thread has been interrupted during send logs.");
                            } catch (Exception e) {
                                LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
                            } finally {
                                latch.countDown();
                            }
                        }
                    });
        }

        // 只有進程退出的時候,才需要考慮如下的邏輯。
        latch.await();
        threadPool.shutdown();
        try {
            producer.close();
        } catch (InterruptedException e) {
            LOGGER.warn("The current thread has been interrupted from close.");
        } catch (ProducerException e) {
            LOGGER.info("Failed to close producer, e=", e);
        }

        LOGGER.info("All log complete, completed={}", completed.get());
    }

    private static final class SampleCallback implements Callback {
        private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
        private final String project;
        private final String logStore;
        private final LogItem logItem;
        private final AtomicLong completed;

        SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
            this.project = project;
            this.logStore = logStore;
            this.logItem = logItem;
            this.completed = completed;
        }

        @Override
        public void onCompletion(Result result) {
            try {
                if (result.isSuccessful()) {
                    LOGGER.info("Send log successfully.");
                } else {
                    LOGGER.error(
                            "Failed to send log, project={}, logStore={}, logItem={}, result={}",
                            project,
                            logStore,
                            logItem.ToJsonString(),
                            result);
                }
            } finally {
                completed.getAndIncrement();
            }
        }
    }
}

步驟七:關閉Producer

當您已經沒有資料需要發送或者當前進程準備退出時,需要關閉Producer,目的是讓Producer中緩衝的資料全部被處理。目前,Producer提供安全關閉和有限關閉兩種模式。

安全關閉

在大多數情況下,建議您使用安全關閉。安全關閉對應的方法是close(),它會等到Producer中緩衝的資料全部被處理、線程全部停止、註冊的callback全部執行,返回future全部被設定後才會返回。

雖然要等到資料全部處理完成,但Producer被關閉後,緩衝的batch會被立刻處理且不會被重試。因此,如果callback不被阻塞,close方法往往能在很短的時間內返回。

有限關閉

如果您的callback在執行過程中有可能阻塞,但您又希望close方法能在短時間內返回,可以使用有限關閉。有限關閉對應的方法是close(long timeoutMs),如果超過指定的timeoutMs後Producer仍未完全關閉,它會拋出IllegalStateException異常,這意味著緩衝的資料可能還沒來得及處理就被丟棄,使用者註冊的Callback也可能不會被執行。

常見問題

寫入資料次數是否存在限制?

  • Log Service讀寫資料的次數和大小均存在限制。更多資訊,請參見資料讀寫

  • Log Service的基礎資源,包括建立Project個數、Logstore個數、Shard個數、LogtailConfig個數、機器組個數、單個LogItem大小、LogItem(Key)長度和LogItem(Value)長度等均存在限制。更多資訊,請參見基礎資源

為什麼資料沒有寫入Log Service?

如果您探索資料沒有寫入Log Service,可通過如下步驟診斷問題。

  1. 檢查您專案中引入的aliyun-log-produceraliyun-logprotobuf-java Jar包的版本是否和文檔中安裝部分列出的Jar包版本一致,如果不一致請進行升級。

  2. Producer介面的send方法非同步發送資料,無法及時擷取返回的值。請通過Callback介面或返回的Future對象擷取資料發送失敗的原因。

  3. 如果您發現並沒有回調Callback介面的onCompletion方法,請檢查在您的程式退出之前是否有調用producer.close()方法。因為資料發送是由後台線程非同步完成的,為了防止緩衝在記憶體裡的少量資料丟失,請務必在程式退出之前調用producer.close()方法。

  4. Producer介面會把運行過程中的關鍵行為通過日誌架構slf4j進行輸出,您可以在程式中配置好相應的日誌實現架構並開啟DEBUG層級的日誌。重點檢查是否輸出ERROR層級的日誌。

  5. 如果通過上述步驟仍然沒有解決,請提工單

相關文檔