如果您在使用Flink、Spark、Storm等巨量資料計算引擎時,需要將日誌進行壓縮、批量上傳日誌到Log Service、減少網路傳輸資源的佔用,API或者SDK往往無法滿足巨量資料情境對資料寫入能力的要求,您可以使用Aliyun Log Java Producer,便捷高效地將資料上傳到Log Service。
前提條件
您已完成以下操作:
已安裝Log ServiceJava SDK。具體操作,請參見安裝Java SDK。
什麼是Aliyun Log Java Producer
Aliyun Log Java Producer是為運行在巨量資料、高並發情境下的Java應用量身打造的高效能類庫。相對於原始的API或SDK,使用該類庫寫日誌資料能為您帶來諸多優勢,包括高效能、計算與I/O邏輯分離、資源可控制等。Aliyun LOG Java Producer使用阿里雲Log Service提供的順序寫入功能來保證日誌的上傳順序。
Log Service提供基於Aliyun Log Java Producer的範例應用程式,便於您快速上手。更多資訊,請參見Aliyun Log Producer Sample Application。
工作流程
特點
安全執行緒:Producer介面暴露的所有方法都是安全執行緒的。
非同步發送:調用Producer的發送介面通常能夠立即返迴響應。Producer內部會緩衝併合並待發送資料,然後批量發送以提高輸送量。
自動重試:Producer會根據配置的最大重試次數和重試退避時間進行重試。
行為追溯:通過Callback或Future能擷取當前資料是否發送成功的資訊,也可以獲得該資料每次被嘗試發送的資訊,有利於問題追溯和行為決策。
上下文還原:同一個Producer執行個體產生的日誌在同一上下文中,在服務端可以查看某條日誌前後相關的日誌。
優雅關閉:保證close方法退出時,Producer緩衝的所有資料都能被處理,同時您也能得到相應的通知。
應用情境
producer對比原始的API或SDK的優勢如下:
高效能
在海量資料、資源有限的前提下,寫入端要達到目標輸送量需要實現複雜的控制邏輯,包括多線程、緩衝策略、批量發送等,另外還要充分考慮失敗重試的情境。Producer實現了上述功能,在為您帶來效能優勢的同時簡化了程式開發步驟。
非同步非阻塞
在可用記憶體充足的前提下,Producer會對發往日誌庫的資料進行緩衝,因此調用send方法時能夠立即返迴響應且不會阻塞,可達到計算與I/O邏輯分離的目的。隨後,您可以通過返回的Future對象或傳入的Callback獲得資料發送的結果。
資源可控制
可以通過參數控制Producer用於緩衝待發送資料的記憶體大小,同時還可以配置用於執行資料發送任務的線程數量。這樣可避免Producer無限制地消耗資源,且可以讓您根據實際情況平衡資源消耗和寫入輸送量。
定位問題簡單
如果日誌資料發送失敗,Producer除了返回狀態代碼,還會返回一個String類型的異常資訊,用於描述失敗的原因和詳細資料。例如,如果發送失敗是因為網路連接逾時,則返回的異常資訊可能是“連線逾時”;如果發送失敗是因為伺服器無響應,則返回的異常資訊可能是“伺服器無響應”。
使用限制
aliyun-log-producer底層調用PutLogs介面上傳日誌,每次可以寫入的原始日誌大小存在限制。更多資訊,請參見資料讀寫。
Log Service的基礎資源,包括建立Project個數、Logstore個數、Shard個數、LogtailConfig個數、機器組個數、單個LogItem大小、LogItem(Key)長度和LogItem(Value)長度等均存在限制。更多資訊,請參見基礎資源。
代碼首次運行後,請在Log Service控制台開啟日誌庫索引,等待一分鐘後,進行查詢。
在控制台進行日誌查詢時,當單個欄位值長度超過最大長度時,超出部分被截斷,不參與分析。更多資訊,請參考建立索引。
費用說明
使用SDK產生的費用和使用控制台產生的費用一致。更多資訊,請參見計費概述。
步驟一:安裝Aliyun Log Java Producer
在Maven工程中使用Log ServiceAliyun Log Java Producer,只需在pom.xml中加入相應依賴。Maven專案管理工具會自動下載相關JAR包。例如,在<dependencies>中加入如下內容:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log-producer</artifactId>
<version>0.3.22</version>
</dependency>
添加更新完後,如果提示Producer依賴的版本衝突,在<dependencies>中加入如下內容:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log</artifactId>
<version>0.6.114</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
步驟二:配置ProducerConfig
ProducerConfig用於配置發送策略,您可以根據不同的業務情境為參數指定不同的值,各參數含義如下表所示:
Config producerConfig = new ProducerConfig();
producerConfig.setTotalSizeInBytes(104857600);
參數 | 類型 | 描述 |
totalSizeInBytes | 整型 | 單個Producer執行個體能緩衝的日誌大小上限,預設為 100MB。 |
maxBlockMs | 整型 | 如果Producer可用空間不足,調用者在send方法上的最大阻塞時間,預設為60秒。 如果超過這個時間後所需空間仍無法得到滿足,send方法會拋出TimeoutException。 如果將該值設為0,當所需空間無法得到滿足時,send 方法會立即拋出 TimeoutException。 如果您希望send方法一直阻塞直到所需空間得到滿足,可將該值設為負數。 |
ioThreadCount | 整型 | 執行日誌發送任務的線程池大小,預設為可用處理器個數。 |
batchSizeThresholdInBytes | 整型 | 當一個ProducerBatch中緩衝的日誌大小大於等於 batchSizeThresholdInBytes 時,該batch將被發送,預設為512KB,最大可設定成 5MB。 |
batchCountThreshold | 整型 | 當一個ProducerBatch中緩衝的日誌條數大於等於 batchCountThreshold時,該batch將被發送,預設4096,最大可設定成40960。 |
lingerMs | 整型 | 一個ProducerBatch從建立到可發送的逗留時間,預設為2秒,最小可設定成100毫秒。 |
retries | 整型 | 如果某個ProducerBatch首次發送失敗,能夠對其重試的次數,預設為10次。 如果retries小於等於 0,該ProducerBatch首次發送失敗後將直接進入失敗隊列。 |
maxReservedAttempts | 整型 | 每個ProducerBatch每次被嘗試發送都對應著一個Attempt,此參數用來控制返回給使用者的attempt個數,預設只保留最近的11次attempt資訊。 該參數越大能讓您追溯更多的資訊,但同時也會消耗更多的記憶體。 |
baseRetryBackoffMs | 整型 | 首次重試的退避時間,預設為100毫秒。 Producer採樣指數退避演算法,第N次重試的計劃等待時間為 baseRetryBackoffMs * 2^(N-1)。 |
maxRetryBackoffMs | 整型 | 重試的最大退避時間,預設為50秒。 |
adjustShardHash | 布爾 | 如果調用send方法時指定了 shardHash,該參數用於控制是否需要對其進行調整,預設為true。 |
buckets | 整型 | 若且唯若adjustShardHash為true時,該參數才生效。此時,producer會自動將shardHash重新分組,分組數量為buckets。 如果兩條資料的shardHash不同,它們是無法合并到一起發送的,會降低producer輸送量。將shardHash重新分組後,能讓資料有更多地機會被批量發送。 該參數的取值範圍是 [1, 256],且必須是2的整數次冪,預設為64。 |
步驟三:建立Producer
Producer 支援使用者配置AK或STS token。如果使用STS token,需要定期建立新的ProjectConfig然後將其添加到ProjectConfigs裡。
LogProducer是介面Producer的實作類別,它接收唯一的參數producerConfig。當您準備好producerConfig後,可以按照下列方式建立producer執行個體。
Producer producer = new LogProducer(producerConfig);
建立producer的同時會建立一系列線程,這是一個相對昂貴的操作,因此建議一個應用共用一個producer執行個體。一個producer執行個體包含的線程如下表所示,其中N為該producer執行個體在當前進程中的編號,從 0 開始。另外,LogProducer提供的所有方法都是安全執行緒的,可以在多線程環境下安全執行。
線程名格式 | 數量 | 描述 |
aliyun-log-producer-<N>-mover | 1 | 負責將滿足發送條件的batch投遞到發送線程池裡。 |
aliyun-log-producer-<N>-io-thread | ioThreadCount | IOThreadPool中真正用於執行資料發送任務的線程。 |
aliyun-log-producer-<N>-success-batch-handler | 1 | 用於處理髮送成功的batch。 |
aliyun-log-producer-<N>-failure-batch-handler | 1 | 用於處理髮送失敗的batch。 |
步驟四:配置記錄項目
ProjectConfig包含目標Project的服務入口資訊以及表徵調用者身份的訪問憑證。每個記錄項目對應一個ProjectConfig對象。
可以按照如下方式建立執行個體。
ProjectConfig project1 = new ProjectConfig("your-project-1", "cn-hangzhou.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
ProjectConfig project2 = new ProjectConfig("your-project-2", "cn-shanghai.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
producer.putProject(project1);
producer.putProject(project2);
步驟五:發送資料
建立Future或Callback
在使用Aliyun Log Java Producer發送日誌資料時,需要指定一個回呼函數來處理髮送過程中的各種情況。當日誌資料發送成功時,回呼函數會被調用,並返回一個發送結果;當日誌資料發送失敗時,回呼函數也會被調用,並傳入一個異常對象。
如果擷取結果後,應用的處理邏輯比較簡單且不會造成producer阻塞,建議直接使用callback。否則,建議使用ListenableFuture,在單獨的線程(池)中執行後續業務
方法的各個參數含義如下:
參數 | 描述 |
project | 待發送資料的目標 project。 |
logstore | 待發送資料的目標 logStore。 |
logTem | 待發送資料。 |
completed | Java提供的一個原子類型,用來確保所有日誌發送完成(成功或者失敗)。 |
發送資料
Producer介面提供多種發送方法,方法的各個參數含義如下。
參數 | 描述 | 是否必選 |
project | 目標Project。 | 是 |
logStore | 目標LogStore。 | 是 |
logItem | 要發送的日誌/日誌列表。 | 是 |
topic | 日誌主題 | 否 說明 如果留空或沒有指定,該欄位將被賦予""。 |
source | 發送源。 | 否 說明 如果留空或沒有指定,該欄位將被賦予producer所在宿主機的 IP。 |
shardHash | 可為發送的日誌設定自訂雜湊,服務端將根據此雜湊選擇對應的日誌庫Shard分區寫入日誌。 | 否 說明 如果留空或沒有指定,資料將被隨機寫入目標LogStore的某個shard中。 |
callback | 可設定一個回呼函數。該回呼函數將在日誌被成功發送或者重試多次失敗後被丟棄時調用。 | 否 |
常見異常
異常 | 說明 |
TimeoutException | 當Producer緩衝的日誌大小超過設定的記憶體上限時,且阻塞maxBlockMs毫秒後仍未擷取到足夠記憶體時,將拋出TimeoutException。 maxBlockMs 為-1時,阻塞沒有時間上限,將永遠不會拋出 TimeoutException。 |
IllegalStateException | 當Producer已經處於關閉狀態(調用過close方法)時,再調用send 方法,會拋出IllegalStateException。 |
步驟六:擷取發送資料
由於producer提供的所有發送方法都是非同步,需要通過返回的future或者傳入的callback擷取發送結果。
Future
Send 方法會返回一個ListenableFuture,它除了可以像普通future那樣通過調用get方法阻塞獲得發送結果外,還允許你註冊回調方法(回調方法會在完成 future 設定後被調用)。以下程式碼片段展示了ListenableFuture的使用方法,使用者需要為該future註冊一個FutureCallback並將其投遞到應用提供的線程池EXECUTOR_SERVICE中執行,完整範例請參見SampleProducerWithFuture.java。
import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.common.LogItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public class SampleProducerWithCallback {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);
private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws InterruptedException {
final String project = "example-project";
final String logstore = "example-logstore";
String endpoint = "example-endpoint";
// 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。
String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
ProducerConfig producerConfig = new ProducerConfig();
final Producer producer = new LogProducer(producerConfig);
producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));
int nTask = 100;
// The number of logs that have finished (either successfully send, or failed).
final AtomicLong completed = new AtomicLong(0);
final CountDownLatch latch = new CountDownLatch(nTask);
for (int i = 0; i < nTask; ++i) {
threadPool.submit(
new Runnable() {
@Override
public void run() {
//The maximum size of a LogItem (key) is 128 bytes. The maximum size of a LogItem (value) is 1 MB.
LogItem logItem = new LogItem();
logItem.PushBack("key1", "foo");
logItem.PushBack("key2", "bar");
try {
producer.send(
project,
logstore,
"your-topic",
"your-source",
logItem,
new SampleCallback(project, logstore, logItem, completed));
} catch (InterruptedException e) {
LOGGER.warn("The current thread has been interrupted during send logs.");
} catch (Exception e) {
LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
} finally {
latch.countDown();
}
}
});
}
// 只有進程退出的時候,才需要考慮如下的邏輯。
latch.await();
threadPool.shutdown();
try {
producer.close();
} catch (InterruptedException e) {
LOGGER.warn("The current thread has been interrupted from close.");
} catch (ProducerException e) {
LOGGER.info("Failed to close producer, e=", e);
}
LOGGER.info("All log complete, completed={}", completed.get());
}
private static final class SampleCallback implements Callback {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
private final String project;
private final String logStore;
private final LogItem logItem;
private final AtomicLong completed;
SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
this.project = project;
this.logStore = logStore;
this.logItem = logItem;
this.completed = completed;
}
@Override
public void onCompletion(Result result) {
try {
if (result.isSuccessful()) {
LOGGER.info("Send log successfully.");
} else {
LOGGER.error(
"Failed to send log, project={}, logStore={}, logItem={}, result={}",
project,
logStore,
logItem.ToJsonString(),
result);
}
} finally {
completed.getAndIncrement();
}
}
}
}
Callback
Callback由producer內部線程負責執行,並且只有在執行完畢後資料“佔用”的空間才會釋放。為了不阻塞producer造成整體輸送量的下降,要避免在callback裡執行耗時的操作。另外,在callback中調用send方法進行重試也是不建議的,您可以在ListenableFuture的callback中進行重試。完整範例請參見SampleProducerWithCallback.java。
import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.common.LogItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public class SampleProducerWithCallback {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);
private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws InterruptedException {
final String project = "example-project";
final String logstore = "example-logstore";
String endpoint = "example-endpoint";
// 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。
String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
ProducerConfig producerConfig = new ProducerConfig();
final Producer producer = new LogProducer(producerConfig);
producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));
int nTask = 100;
// The number of logs that have finished (either successfully send, or failed).
final AtomicLong completed = new AtomicLong(0);
final CountDownLatch latch = new CountDownLatch(nTask);
for (int i = 0; i < nTask; ++i) {
threadPool.submit(
new Runnable() {
@Override
public void run() {
//The maximum size of a LogItem (key) is 128 bytes. The maximum size of a LogItem (value) is 1 MB.
LogItem logItem = new LogItem();
logItem.PushBack("key1", "foo");
logItem.PushBack("key2", "bar");
try {
producer.send(
project,
logstore,
"your-topic",
"your-source",
logItem,
new SampleCallback(project, logstore, logItem, completed));
} catch (InterruptedException e) {
LOGGER.warn("The current thread has been interrupted during send logs.");
} catch (Exception e) {
LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
} finally {
latch.countDown();
}
}
});
}
// 只有進程退出的時候,才需要考慮如下的邏輯。
latch.await();
threadPool.shutdown();
try {
producer.close();
} catch (InterruptedException e) {
LOGGER.warn("The current thread has been interrupted from close.");
} catch (ProducerException e) {
LOGGER.info("Failed to close producer, e=", e);
}
LOGGER.info("All log complete, completed={}", completed.get());
}
private static final class SampleCallback implements Callback {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
private final String project;
private final String logStore;
private final LogItem logItem;
private final AtomicLong completed;
SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
this.project = project;
this.logStore = logStore;
this.logItem = logItem;
this.completed = completed;
}
@Override
public void onCompletion(Result result) {
try {
if (result.isSuccessful()) {
LOGGER.info("Send log successfully.");
} else {
LOGGER.error(
"Failed to send log, project={}, logStore={}, logItem={}, result={}",
project,
logStore,
logItem.ToJsonString(),
result);
}
} finally {
completed.getAndIncrement();
}
}
}
}
步驟七:關閉Producer
當您已經沒有資料需要發送或者當前進程準備退出時,需要關閉Producer,目的是讓Producer中緩衝的資料全部被處理。目前,Producer提供安全關閉和有限關閉兩種模式。
安全關閉
在大多數情況下,建議您使用安全關閉。安全關閉對應的方法是close()
,它會等到Producer中緩衝的資料全部被處理、線程全部停止、註冊的callback全部執行,返回future全部被設定後才會返回。
雖然要等到資料全部處理完成,但Producer被關閉後,緩衝的batch會被立刻處理且不會被重試。因此,如果callback不被阻塞,close方法往往能在很短的時間內返回。
有限關閉
如果您的callback在執行過程中有可能阻塞,但您又希望close方法能在短時間內返回,可以使用有限關閉。有限關閉對應的方法是close(long timeoutMs)
,如果超過指定的timeoutMs後Producer仍未完全關閉,它會拋出IllegalStateException異常,這意味著緩衝的資料可能還沒來得及處理就被丟棄,使用者註冊的Callback也可能不會被執行。
常見問題
寫入資料次數是否存在限制?
Log Service讀寫資料的次數和大小均存在限制。更多資訊,請參見資料讀寫。
Log Service的基礎資源,包括建立Project個數、Logstore個數、Shard個數、LogtailConfig個數、機器組個數、單個LogItem大小、LogItem(Key)長度和LogItem(Value)長度等均存在限制。更多資訊,請參見基礎資源。
為什麼資料沒有寫入Log Service?
如果您探索資料沒有寫入Log Service,可通過如下步驟診斷問題。
檢查您專案中引入的
aliyun-log-producer
、aliyun-log
、protobuf-java
Jar包的版本是否和文檔中安裝部分列出的Jar包版本一致,如果不一致請進行升級。Producer介面的send方法非同步發送資料,無法及時擷取返回的值。請通過Callback介面或返回的Future對象擷取資料發送失敗的原因。
如果您發現並沒有回調Callback介面的onCompletion方法,請檢查在您的程式退出之前是否有調用
producer.close()
方法。因為資料發送是由後台線程非同步完成的,為了防止緩衝在記憶體裡的少量資料丟失,請務必在程式退出之前調用producer.close()
方法。Producer介面會把運行過程中的關鍵行為通過日誌架構slf4j進行輸出,您可以在程式中配置好相應的日誌實現架構並開啟DEBUG層級的日誌。重點檢查是否輸出ERROR層級的日誌。
如果通過上述步驟仍然沒有解決,請提工單。
相關文檔
在調用API介面過程中,若服務端返回結果中包含錯誤資訊,則表示調用API介面失敗。您可以參考API錯誤碼對照表尋找對應的解決方案。更多資訊,請參見API錯誤處理對照表。
Log Service除自研的SDK外,還支援公用的阿里雲SDK,關於阿里雲SDK的使用方式,請參見Log Service_SDK中心-阿里雲OpenAPI開發人員門戶。
為滿足越來越多的自動化Log Service配置需求,Log Service提供命令列工具CLI(Command Line Interface)。更多資訊,請參見Log Service命令列工具CLI。
更多範例程式碼,請參見Aliyun Log Java SDK on GitHub。