In big data scenarios that use compute engines such as Flink, Spark, and Storm, you often need to compress logs, upload them to Simple Log Service in batches, and reduce network resource consumption. Calling the API or SDK directly may not meet these requirements. Aliyun Log Java Producer provides a convenient and efficient way to upload data to Simple Log Service in these scenarios.
Background information
Aliyun Log Java Producer is a high-performance class library designed for Java applications in big data and high concurrency environments. It offers several benefits over the standard API or SDK, including high performance, separation of computing and I/O logic, and resource management. It leverages the sequential writing feature of Alibaba Cloud Simple Log Service to ensure ordered log uploads.
Simple Log Service offers sample applications using Aliyun Log Java Producer to facilitate a quick start. For more information, see Aliyun Log Java Producer Sample Application.
The following figure shows the workflow of Aliyun Log Java Producer:
Limits
Aliyun Log Java Producer calls the PutLogs operation to upload logs, so the limit on the size of raw logs that can be uploaded in each request also applies to the producer. For more information, see Data read and write.
The basic resources of Simple Log Service, such as projects, logstores, shards, and machine groups, also have limitations. For more information, see Basic resources.
You can query logs only after indexing is enabled for your logstore. The first time you run the code, enable indexing for your logstore in the Simple Log Service console, and then wait about one minute before you query logs.
If you query logs in the Simple Log Service console and the length of a field value in the returned logs exceeds the upper limit, the value is truncated and the excess part is not used for analysis. For more information, see Create indexes.
Billing
The costs incurred using the SDK are consistent with those incurred using the console. For more information, see Billing overview.
Prerequisites
Simple Log Service is activated. For more information, see Activate Simple Log Service.
The Simple Log Service SDK for Java is installed and initialized.
Step 1: Install Aliyun Log Java Producer
To use Aliyun Log Java Producer in a Maven project, add the corresponding dependency in the pom.xml file. Then, Maven will automatically download the relevant JAR packages. For example, add the following in <dependencies>:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log-producer</artifactId>
<version>0.3.10</version>
</dependency>
If a version conflict occurs with the producer dependency, add the following in <dependencies>:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log</artifactId>
<version>0.6.35</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
Step 2: Configure ProducerConfig
ProducerConfig is used to set the sending policy. Adjust the parameter values to suit different business needs.
ProducerConfig producerConfig = new ProducerConfig();
producerConfig.setTotalSizeInBytes(104857600);
The parameters are explained in the following table:
Parameter | Type | Description |
totalSizeInBytes | Integer | The maximum size of logs that can be cached by a producer instance. Default value: 100 MB. |
maxBlockMs | Integer | The maximum time that the send method blocks when the producer instance does not have enough available space. Default value: 60 seconds. If this time elapses and there is still not enough space, the send method throws a TimeoutException. If you set this parameter to 0, the send method throws a TimeoutException immediately when there is not enough space. To make the send method block until enough space becomes available, set this parameter to a negative value. |
ioThreadCount | Integer | The number of threads for log sending tasks. The default value is the number of available processors. |
batchSizeThresholdInBytes | Integer | A batch is sent when the size of the logs in it reaches this threshold. Default value: 512 KB. Maximum value: 5 MB. |
batchCountThreshold | Integer | A batch is sent when the number of logs in it reaches this threshold. Default value: 4096. Maximum value: 40960. |
lingerMs | Integer | The time that a batch lingers after it is created before it is sent, if neither the size threshold nor the count threshold is reached first. Default value: 2 seconds. Minimum value: 100 ms. |
retries | Integer | The number of times a batch is retried after its first send attempt fails. Default value: 10. If this parameter is set to 0 or a negative value, the batch enters the failure queue immediately after the first failure. |
maxReservedAttempts | Integer | Each attempt to send a ProducerBatch is recorded as one attempt. This parameter controls how many attempts are reported back to the user. Default value: 11, which means that only the latest 11 attempts are retained. A larger value allows more detailed tracing at the cost of higher memory consumption. |
baseRetryBackoffMs | Integer | The initial backoff time for retries. Default value: 100 milliseconds. The producer uses an exponential backoff algorithm: the planned wait time before the Nth retry is baseRetryBackoffMs * 2^(N-1). |
maxRetryBackoffMs | Integer | The maximum backoff time for retries. Default value: 50 seconds. |
adjustShardHash | Boolean | Determines whether to adjust the shardHash when the send method is called. Default value: true. |
buckets | Integer | This parameter is effective when adjustShardHash is true. This parameter regroups shardHash into the specified number of buckets. Differing shardHash values prevent data from being merged and batched, thus limiting the producer's throughput. By regrouping the shardHash, data can be more effectively batched for transmission. The value of this parameter must be an integer power of 2 within the range [1, 256]. Default value: 64. |
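To show how the batching and retry parameters in the table interact, here is a minimal plain-Java model. It assumes the default values from the table, treats a batch as ready when any one of the size, count, or linger thresholds is reached, and computes the retry wait as baseRetryBackoffMs * 2^(N-1) capped at maxRetryBackoffMs. ProducerPolicySketch is a hypothetical name. This is an illustration, not the producer's actual implementation, which may differ in details such as timing precision.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the batching and retry parameters; not the library's code. */
final class ProducerPolicySketch {
    // Default values taken from the ProducerConfig parameter table.
    static final int BATCH_SIZE_THRESHOLD_IN_BYTES = 512 * 1024;
    static final int BATCH_COUNT_THRESHOLD = 4096;
    static final long LINGER_MS = 2_000L;
    static final long BASE_RETRY_BACKOFF_MS = 100L;
    static final long MAX_RETRY_BACKOFF_MS = 50_000L;

    private ProducerPolicySketch() {}

    /** A batch becomes ready to send as soon as any one of the three thresholds is reached. */
    static boolean isBatchReady(int batchBytes, int batchCount, long batchAgeMs) {
        return batchBytes >= BATCH_SIZE_THRESHOLD_IN_BYTES
                || batchCount >= BATCH_COUNT_THRESHOLD
                || batchAgeMs >= LINGER_MS;
    }

    /** Planned wait before the nth retry (n >= 1): base * 2^(n-1), capped at the maximum backoff. */
    static long backoffMs(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be >= 1");
        }
        long backoff = BASE_RETRY_BACKOFF_MS;
        // Stop doubling once the cap is reached, which also rules out overflow.
        for (int i = 1; i < n && backoff < MAX_RETRY_BACKOFF_MS; i++) {
            backoff *= 2;
        }
        return Math.min(backoff, MAX_RETRY_BACKOFF_MS);
    }

    /** Wait times for every retry allowed by the given retries setting. */
    static List<Long> schedule(int retries) {
        List<Long> waits = new ArrayList<>();
        for (int n = 1; n <= retries; n++) {
            waits.add(backoffMs(n));
        }
        return waits;
    }

    public static void main(String[] args) {
        System.out.println("Retry waits with retries=10: " + schedule(10));
    }
}
```

With the defaults, the waits double from 100 ms, and the 10th retry is the first one that is capped at 50 seconds.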
Step 3: Create a producer
The producer can be configured with an AccessKey pair or a Security Token Service (STS) token. If you use an STS token, periodically create a new ProjectConfig that contains the refreshed token and add it to the producer before the old token expires.
LogProducer is the implementation class of the Producer interface. Its constructor takes a single parameter, producerConfig. After the producerConfig is prepared, you can create a producer as follows:
Producer producer = new LogProducer(producerConfig);
Creating a producer starts several threads, which consumes considerable resources. We recommend that you share one producer instance across your application. All methods of LogProducer are thread-safe and can be called concurrently. The following table lists the threads in a producer instance, where N is the sequence number of the instance, starting from 0.
Thread name format | Quantity | Description |
aliyun-log-producer-<N>-mover | 1 | Transfers batches ready to be sent to the sending thread pool. |
aliyun-log-producer-<N>-io-thread | ioThreadCount | Threads in the IOThreadPool that execute data sending tasks. |
aliyun-log-producer-<N>-success-batch-handler | 1 | Handles batches that have been successfully sent. |
aliyun-log-producer-<N>-failure-batch-handler | 1 | Manages batches that failed to send. |
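Each producer starts the threads listed above, so the recommendation to share one instance is commonly implemented with a lazily initialized holder. The sketch below is generic plain Java. SharedInstance is a hypothetical helper, not part of the producer library. In a real application, its factory would be something like () -> new LogProducer(producerConfig).

```java
import java.util.Objects;
import java.util.function.Supplier;

/**
 * Hypothetical helper: creates one instance on first use and returns the
 * same instance to every caller, on any thread.
 */
final class SharedInstance<T> {
    private final Supplier<T> factory;
    // volatile makes the published instance visible to all threads.
    private volatile T instance;

    SharedInstance(Supplier<T> factory) {
        this.factory = Objects.requireNonNull(factory);
    }

    T get() {
        T result = instance;
        if (result == null) {
            synchronized (this) {
                result = instance;
                if (result == null) {
                    // Only one thread ever runs the expensive factory.
                    result = Objects.requireNonNull(factory.get());
                    instance = result;
                }
            }
        }
        return result;
    }
}
```

Double-checked locking on a volatile field keeps the common path lock-free after initialization. A static field initialized at startup is an equally valid choice if the configuration is known early.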
Step 4: Configure a log project
ProjectConfig includes the endpoint information of the destination project and the access credentials representing the caller's identity. Each log project corresponds to one ProjectConfig object.
Create an instance as follows:
ProjectConfig project1 = new ProjectConfig("your-project-1", "cn-hangzhou.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
ProjectConfig project2 = new ProjectConfig("your-project-2", "cn-shanghai.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
producer.putProjectConfig(project1);
producer.putProjectConfig(project2);
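Step 3 notes that, with STS tokens, you must periodically create a new ProjectConfig. The following is a minimal scheduling sketch in plain Java. StsCredentials, StsRefresher, and the fetcher and register functions are hypothetical names. In a real application, the fetcher would obtain a fresh token and the register function would build a ProjectConfig from it and call producer.putProjectConfig. Check whether the ProjectConfig constructors in your producer version accept a security token. This sketch assumes that they do.

```java
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical holder for one set of STS credentials. */
final class StsCredentials {
    final String accessKeyId;
    final String accessKeySecret;
    final String securityToken;

    StsCredentials(String accessKeyId, String accessKeySecret, String securityToken) {
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
        this.securityToken = securityToken;
    }
}

/**
 * Hypothetical refresher: fetches fresh credentials on a schedule and hands them
 * to a register callback (in practice: build a ProjectConfig and call putProjectConfig).
 */
final class StsRefresher implements AutoCloseable {
    private final Supplier<StsCredentials> fetcher;
    private final Consumer<StsCredentials> register;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "sts-refresher");
                t.setDaemon(true); // do not keep the JVM alive just for refreshing
                return t;
            });

    StsRefresher(Supplier<StsCredentials> fetcher, Consumer<StsCredentials> register) {
        this.fetcher = Objects.requireNonNull(fetcher);
        this.register = Objects.requireNonNull(register);
    }

    /** Fetches one set of credentials and registers it. */
    void refreshOnce() {
        register.accept(fetcher.get());
    }

    /** Refreshes now, then every periodMinutes; pick a period shorter than the token lifetime. */
    void start(long periodMinutes) {
        scheduler.scheduleAtFixedRate(this::refreshSafely, 0, periodMinutes, TimeUnit.MINUTES);
    }

    private void refreshSafely() {
        try {
            refreshOnce();
        } catch (RuntimeException e) {
            // An uncaught exception would suppress all later runs of a scheduled task.
            System.err.println("STS refresh failed: " + e);
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```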
Step 5: Send data
Create Future or Callback
When you send logs with Aliyun Log Java Producer, you can specify a Callback to handle the sending result. The Callback is invoked both when the data is sent successfully and when the send fails.
If the application's result handling is simple and does not block the producer, use the Callback directly. Otherwise, use ListenableFuture and run the business logic in a separate thread or thread pool.
The parameters of the SampleCallback constructor in the sample code are described below:
Parameter | Description |
project | The destination project for the data to be sent. |
logstore | The destination logstore for the data to be sent. |
logItem | The data to be sent. |
completed | An AtomicLong that counts the logs that have finished, whether they were sent successfully or failed. The sample uses it to confirm that every log has been processed. |
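The advice above (keep the callback light, or move result handling to your own thread pool) and the role of the completed counter can be sketched with the JDK's CompletableFuture standing in for the producer's ListenableFuture. Here, whenCompleteAsync(handler, executor) plays the role that registering a callback on your own executor plays with ListenableFuture. The simulated sends and the class name CompletionTracking are hypothetical. No producer API is called.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in model: simulated async sends, results handled on an app pool. */
final class CompletionTracking {
    private CompletionTracking() {}

    /** Runs nTasks simulated sends (every third fails) and returns {completed, failed}. */
    static long[] run(int nTasks) {
        ExecutorService ioPool = Executors.newFixedThreadPool(2);  // stands in for producer I/O threads
        ExecutorService appPool = Executors.newFixedThreadPool(4); // application-provided pool
        AtomicLong completed = new AtomicLong();
        AtomicLong failed = new AtomicLong();
        CountDownLatch allDone = new CountDownLatch(nTasks);
        try {
            for (int i = 0; i < nTasks; i++) {
                final int id = i;
                CompletableFuture<Void> send = CompletableFuture.runAsync(() -> {
                    if (id % 3 == 0) {
                        throw new IllegalStateException("simulated failure of send " + id);
                    }
                }, ioPool);
                // Result handling runs on appPool, so slow business logic never ties up ioPool.
                send.whenCompleteAsync((ignored, error) -> {
                    try {
                        if (error != null) {
                            failed.incrementAndGet();
                        }
                    } finally {
                        completed.incrementAndGet(); // counts successes and failures alike
                        allDone.countDown();
                    }
                }, appPool);
            }
            allDone.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            ioPool.shutdown();
            appPool.shutdown();
        }
        return new long[] {completed.get(), failed.get()};
    }

    public static void main(String[] args) {
        long[] counts = run(10);
        System.out.println("completed=" + counts[0] + ", failed=" + counts[1]);
    }
}
```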
Send data
The Producer interface provides multiple overloaded send methods. Their parameters are described below.
Parameter | Description | Required |
project | The destination project. | Yes |
logStore | The destination logstore. | Yes |
logItem | The logs to be sent. | Yes |
topic | The topic of the logs. | No. Note: If not specified, this parameter is assigned an empty string (""). |
source | The source of the logs. | No. Note: If not specified, this parameter is assigned the IP address of the host where the producer resides. |
shardHash | The hash value for the logs to be sent. You can specify a hash value based on your business requirements, and the logs are then written to a specific shard in the specified logstore based on the hash value. | No. Note: If not specified, the data is written to a random shard in the destination logstore. |
callback | You can define a Callback function that is invoked when the log delivery is successful, or after it is discarded following multiple failed retries. | No |
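To route all logs that belong to one business key (for example, a user ID) to the same shard, one option is to derive a stable shardHash from the key. The sketch below computes an MD5 hex digest with the JDK. ShardHashes is a hypothetical helper. It is an assumption that a 32-character MD5 hex string is a suitable shardHash for your producer version, so verify the expected format, and keep in mind that with adjustShardHash set to true the producer regroups hash values into buckets.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper: maps a business key to a stable hash string. */
final class ShardHashes {
    private ShardHashes() {}

    /** Returns the lowercase hex MD5 digest (32 characters) of the UTF-8 bytes of key. */
    static String md5Hex(String key) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(key.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is available in the standard JDK providers.
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
```

Usage, assuming the send overload that takes a shardHash argument as listed in the table above: producer.send(project, logstore, topic, source, ShardHashes.md5Hex(userId), logItem, callback).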
Common exceptions
Exception | Description |
TimeoutException | Thrown when the size of logs cached by the producer exceeds the memory limit and the producer still cannot acquire enough memory after maxBlockMs milliseconds. If maxBlockMs is set to -1, the send method blocks indefinitely and no TimeoutException is thrown. |
IllegalStateException | Thrown if you call the send method after the producer has been closed, that is, after the close method has been called. |
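The two exceptions above follow from the maxBlockMs rules in the ProducerConfig table. The following plain-Java model shows those rules with a java.util.concurrent.Semaphore as a byte budget: a negative maxBlockMs blocks until space is freed, 0 fails immediately, a positive value waits up to that long, and any send after close is rejected. BoundedCacheModel is hypothetical and is not how the library is implemented internally.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model of a bounded log cache with maxBlockMs semantics. */
final class BoundedCacheModel {
    private final Semaphore freeBytes;
    private final long maxBlockMs;
    private volatile boolean closed;

    BoundedCacheModel(int totalSizeInBytes, long maxBlockMs) {
        this.freeBytes = new Semaphore(totalSizeInBytes);
        this.maxBlockMs = maxBlockMs;
    }

    /** Reserves cache space for one log, or fails according to maxBlockMs. */
    void send(int logSizeInBytes) throws InterruptedException, TimeoutException {
        if (closed) {
            throw new IllegalStateException("producer has been closed");
        }
        if (maxBlockMs < 0) {
            freeBytes.acquire(logSizeInBytes); // block until enough space is released
        } else if (!freeBytes.tryAcquire(logSizeInBytes, maxBlockMs, TimeUnit.MILLISECONDS)) {
            // With maxBlockMs == 0, tryAcquire does not wait at all.
            throw new TimeoutException("no cache space after " + maxBlockMs + " ms");
        }
    }

    /** Called when a batch is finished (sent or discarded) to return its space. */
    void release(int logSizeInBytes) {
        freeBytes.release(logSizeInBytes);
    }

    void close() {
        closed = true;
    }
}
```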
Step 6: Obtain the sending result
Because the producer's sending methods are asynchronous, the sending result must be obtained through the returned future or the provided callback.
Future
The send method returns a ListenableFuture. In addition to the standard get method, a ListenableFuture lets you register a callback that runs after the future completes. The following sample code registers a FutureCallback for each future and runs it in the application-provided EXECUTOR_SERVICE thread pool. For a complete example, see SampleProducerWithFuture.java.
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.aliyun.log.producer.errors.ResultFailedException;
import com.aliyun.openservices.log.common.LogItem;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class SampleProducerWithFuture {
    private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithFuture.class);
    // Application-provided thread pool in which the FutureCallback instances run.
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10);
    public static void main(String[] args) throws InterruptedException {
        final String project = "example-project";
        final String logstore = "example-logstore";
        String endpoint = "example-endpoint";
        // In this example, the AccessKey ID and AccessKey secret are obtained from environment variables.
        String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        ProducerConfig producerConfig = new ProducerConfig();
        final Producer producer = new LogProducer(producerConfig);
        producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));
        int nTask = 100;
        // The number of logs that have finished (either successfully sent or failed).
        final AtomicLong completed = new AtomicLong(0);
        for (int i = 0; i < nTask; ++i) {
            // The maximum size of a LogItem (key) is 128 bytes. The maximum size of a LogItem (value) is 1 MB.
            LogItem logItem = new LogItem();
            logItem.PushBack("key1", "foo");
            logItem.PushBack("key2", "bar");
            try {
                // send returns immediately; the result is delivered through the future.
                ListenableFuture<Result> f =
                        producer.send(project, logstore, "your-topic", "your-source", logItem);
                // Run the callback in EXECUTOR_SERVICE instead of a producer thread.
                Futures.addCallback(
                        f, new SampleFutureCallback(project, logstore, logItem, completed), EXECUTOR_SERVICE);
            } catch (InterruptedException e) {
                LOGGER.warn("The current thread has been interrupted during send logs.");
            } catch (Exception e) {
                LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
            }
        }
        // Run the following logic only when the process exits.
        try {
            producer.close();
        } catch (InterruptedException e) {
            LOGGER.warn("The current thread has been interrupted from close.");
        } catch (ProducerException e) {
            LOGGER.error("Failed to close producer, e=", e);
        }
        // close() completes all futures; wait for the callbacks queued in EXECUTOR_SERVICE to finish.
        EXECUTOR_SERVICE.shutdown();
        EXECUTOR_SERVICE.awaitTermination(60, TimeUnit.SECONDS);
        LOGGER.info("All log complete, completed={}", completed.get());
    }
    private static final class SampleFutureCallback implements FutureCallback<Result> {
        private static final Logger LOGGER = LoggerFactory.getLogger(SampleFutureCallback.class);
        private final String project;
        private final String logStore;
        private final LogItem logItem;
        private final AtomicLong completed;
        SampleFutureCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
            this.project = project;
            this.logStore = logStore;
            this.logItem = logItem;
            this.completed = completed;
        }
        @Override
        public void onSuccess(Result result) {
            LOGGER.info("Send log successfully.");
            completed.getAndIncrement();
        }
        @Override
        public void onFailure(Throwable t) {
            if (t instanceof ResultFailedException) {
                // The send failed after all retries; the Result describes each attempt.
                Result result = ((ResultFailedException) t).getResult();
                LOGGER.error(
                        "Failed to send log, project={}, logStore={}, logItem={}, result={}",
                        project,
                        logStore,
                        logItem.ToJsonString(),
                        result);
            } else {
                LOGGER.error("Failed to send log, e=", t);
            }
            completed.getAndIncrement();
        }
    }
}
Callback
The callback is executed by an internal producer thread, and the space occupied by the data is released only after the callback finishes. To avoid blocking the producer and reducing its throughput, do not perform time-consuming operations in the callback. In addition, do not call the send method inside the callback to retry. Handle retries in the ListenableFuture callback instead. For a complete example, see SampleProducerWithCallback.java.
import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.common.LogItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public class SampleProducerWithCallback {
    private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);
    private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);
    public static void main(String[] args) throws InterruptedException {
        final String project = "example-project";
        final String logstore = "example-logstore";
        String endpoint = "example-endpoint";
        // In this example, the AccessKey ID and AccessKey secret are obtained from environment variables.
        String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        ProducerConfig producerConfig = new ProducerConfig();
        final Producer producer = new LogProducer(producerConfig);
        producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));
        int nTask = 100;
        // The number of logs that have finished (either successfully sent or failed).
        final AtomicLong completed = new AtomicLong(0);
        // Counts submitted send tasks so that main can wait until all of them have called send.
        final CountDownLatch latch = new CountDownLatch(nTask);
        for (int i = 0; i < nTask; ++i) {
            threadPool.submit(
                    new Runnable() {
                        @Override
                        public void run() {
                            // The maximum size of a LogItem (key) is 128 bytes. The maximum size of a LogItem (value) is 1 MB.
                            LogItem logItem = new LogItem();
                            logItem.PushBack("key1", "foo");
                            logItem.PushBack("key2", "bar");
                            try {
                                producer.send(
                                        project,
                                        logstore,
                                        "your-topic",
                                        "your-source",
                                        logItem,
                                        new SampleCallback(project, logstore, logItem, completed));
                            } catch (InterruptedException e) {
                                LOGGER.warn("The current thread has been interrupted during send logs.");
                            } catch (Exception e) {
                                LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
                            } finally {
                                latch.countDown();
                            }
                        }
                    });
        }
        // Run the following logic only when the process exits.
        latch.await();
        threadPool.shutdown();
        try {
            // close() waits until all cached logs are processed and all callbacks have run.
            producer.close();
        } catch (InterruptedException e) {
            LOGGER.warn("The current thread has been interrupted from close.");
        } catch (ProducerException e) {
            LOGGER.error("Failed to close producer, e=", e);
        }
        LOGGER.info("All log complete, completed={}", completed.get());
    }
    private static final class SampleCallback implements Callback {
        private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
        private final String project;
        private final String logStore;
        private final LogItem logItem;
        private final AtomicLong completed;
        SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
            this.project = project;
            this.logStore = logStore;
            this.logItem = logItem;
            this.completed = completed;
        }
        // Runs on a producer thread: keep this method short and never call send here.
        @Override
        public void onCompletion(Result result) {
            try {
                if (result.isSuccessful()) {
                    LOGGER.info("Send log successfully.");
                } else {
                    LOGGER.error(
                            "Failed to send log, project={}, logStore={}, logItem={}, result={}",
                            project,
                            logStore,
                            logItem.ToJsonString(),
                            result);
                }
            } finally {
                completed.getAndIncrement();
            }
        }
    }
}
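The guidance above says not to call send from inside the callback. One possible way to honor that, sketched here in plain Java, is for the callback to put the failed log in a queue and return immediately, while a separate application thread performs the resend. RetryOutsideCallback and the resend function are hypothetical. In a real application, resend would call producer.send again, and the queued item would be the LogItem rather than a String. This is one pattern among several, not the library's prescribed approach.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/**
 * Hypothetical pattern: the completion callback only enqueues failed logs (fast,
 * never blocks), and a dedicated application thread resends them.
 */
final class RetryOutsideCallback {
    private final BlockingQueue<String> failedLogs = new LinkedBlockingQueue<>();
    private final Thread worker;

    RetryOutsideCallback(Consumer<String> resend) {
        worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String log = failedLogs.take(); // blocks until a failed log arrives
                    resend.accept(log);             // hypothetical: call producer.send(...) here
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop() was called
            }
        }, "retry-worker");
        worker.setDaemon(true);
        worker.start();
    }

    /** What the producer callback does on failure: enqueue and return immediately. */
    void onFailure(String log) {
        failedLogs.offer(log); // unbounded queue, so offer never blocks
    }

    /** Stops the worker; logs still in the queue are not resent. */
    void stop() throws InterruptedException {
        worker.interrupt();
        worker.join();
    }
}
```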
Step 7: Close the producer
When the producer is no longer needed or the process is exiting, close the producer to ensure all cached data is processed. Two shutdown modes are supported: safe shutdown and limited shutdown.
Safe shutdown
We recommend safe shutdown in most cases. Call the close() method, which returns only after all cached data has been processed, the threads have stopped, the callbacks have run, and the futures have completed.
Although this method waits for all data to be processed, it returns quickly as long as the callbacks do not block, because after the producer is closed, batches are processed immediately and are not retried.
Limited shutdown
If you want close to return quickly even when callbacks may block, use limited shutdown by calling the close(long timeoutMs) method. If the producer is not fully closed after timeoutMs milliseconds, the method throws an IllegalStateException, which means that some data may be lost and some callbacks may not be executed.
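The difference between the two modes can be modeled with a plain ExecutorService standing in for the producer's internal threads. close() waits for everything already queued, and close(timeoutMs) waits at most timeoutMs and then throws an IllegalStateException to signal that work may have been dropped. ShutdownModes is a hypothetical class. The real producer's close methods may behave differently in details such as whether remaining work is interrupted.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of safe and limited shutdown using a plain ExecutorService. */
final class ShutdownModes {
    private final ExecutorService workers = Executors.newFixedThreadPool(2);

    void submit(Runnable task) {
        workers.submit(task);
    }

    /** Safe shutdown: stop accepting work, then wait for everything already queued. */
    void close() throws InterruptedException {
        workers.shutdown();
        while (!workers.awaitTermination(1, TimeUnit.SECONDS)) {
            // Keep waiting: slow callbacks delay the return but nothing is dropped.
        }
    }

    /** Limited shutdown: wait at most timeoutMs, then report that work may be lost. */
    void close(long timeoutMs) throws InterruptedException {
        workers.shutdown();
        if (!workers.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
            workers.shutdownNow(); // interrupt whatever is still running
            throw new IllegalStateException(
                    "not fully closed after " + timeoutMs + " ms; some data may be lost");
        }
    }
}
```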
FAQ
What do I do if no data is written to Simple Log Service?
If no data is written to Simple Log Service, follow these troubleshooting steps:
- Verify that the versions of the aliyun-log-producer, aliyun-log, and protobuf-java JAR packages in your project match those specified in the installation documentation. Upgrade them if necessary.
- The send method of Aliyun Log Java Producer is asynchronous, so the sending result is not available immediately. Use a Callback or Future object to determine the cause of a sending failure.
- If the onCompletion method of the Callback interface is never called, make sure that producer.close() is called before the program exits. Data is sent by background threads, so calling producer.close() ensures that no cached data is lost.
- Aliyun Log Java Producer uses the SLF4J logging framework to record its runtime behavior. Configure a logging implementation for SLF4J in your program, enable DEBUG-level logging, and check for ERROR logs.
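If your program can exit through several code paths, a JVM shutdown hook is one way to make sure the producer is closed before the process ends normally. Shutdown hooks do not run if the process is killed forcibly (for example, with SIGKILL). CloseOnExit and CloseAction are hypothetical names, and the usage line assumes that producer.close() is the close method described in Step 7.

```java
/**
 * Hypothetical helper: registers a JVM shutdown hook that runs a close action,
 * for example producer.close(), when the process exits normally.
 */
final class CloseOnExit {
    /** Hypothetical close action that may throw checked exceptions. */
    @FunctionalInterface
    interface CloseAction {
        void close() throws Exception;
    }

    private CloseOnExit() {}

    /** Registers the hook and returns it so that callers can remove it later. */
    static Thread register(CloseAction action) {
        Thread hook = new Thread(() -> {
            try {
                action.close();
            } catch (Exception e) {
                System.err.println("Close on exit failed: " + e);
            }
        }, "close-on-exit");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

Usage: CloseOnExit.register(producer::close);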
If the issue persists after you have completed the previous steps, submit a ticket for further assistance.
References
- If the response returned by Simple Log Service after you call an API operation contains error information, the call has failed. You can handle errors based on the error codes that are returned. For more information, see Error codes.
- Alibaba Cloud OpenAPI Explorer provides debugging capabilities, SDKs, examples, and related documents. You can use OpenAPI Explorer to debug Simple Log Service API operations without manually encapsulating or signing requests. For more information, visit OpenAPI Portal.
- Simple Log Service provides a command-line interface (CLI) for automated configuration. For more information, see Log Service CLI.
- For more information about sample code, see Alibaba Cloud Log Service SDK for Java on GitHub.