日志服务支持通过SDK写入时序数据,本文列举了Java、Golang和Python语言的SDK demo。
Java SDK示例
安装步骤可参见安装Aliyun Log Java Producer。
import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.common.LogItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Demo that writes time-series (metric) samples to Log Service through the Aliyun Log Java
 * Producer.
 *
 * <p>{@code main} starts a fixed pool of sender threads, each of which submits {@code times}
 * samples built by {@link #buildLogItem}, waits until every submission has been accounted for,
 * logs a short summary and finally releases the producer and the thread pool.
 */
public class Main {
  private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
  private static final Random random = new Random();

  /**
   * Runs the demo.
   *
   * @param args unused
   * @throws InterruptedException if the main thread is interrupted while waiting for the sends to
   *     complete or while closing the producer
   */
  public static void main(String[] args) throws InterruptedException {
    final String project = "";
    final String logstore = "";
    // Log Service endpoint. Hangzhou is used as an example; use the endpoint of your own region.
    final String endpoint = "https://cn-hangzhou.log.aliyuncs.com";
    // This example reads the AccessKey ID and AccessKey Secret from environment variables.
    final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    int sendThreadCount = 8;
    final int times = 10;
    final int totalCount = sendThreadCount * times;
    LOGGER.info(
        "project={}, logstore={}, endpoint={}, sendThreadCount={}, times={}",
        project, logstore, endpoint, sendThreadCount, times);

    ExecutorService executorService = Executors.newFixedThreadPool(sendThreadCount);
    ProducerConfig producerConfig = new ProducerConfig();
    producerConfig.setBatchSizeThresholdInBytes(3 * 1024 * 1024);
    producerConfig.setBatchCountThreshold(40960);
    final Producer producer = new LogProducer(producerConfig);
    producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));

    // Counts every sample that is finished: either its callback ran or it was never submitted.
    final AtomicInteger completedCount = new AtomicInteger(0);
    LOGGER.info("Test started.");
    long t1 = System.currentTimeMillis();
    final Map<String, String> labels = new HashMap<String, String>();
    labels.put("test_k", "test_v");

    try {
      for (int t = 0; t < sendThreadCount; ++t) {
        executorService.submit(
            new Runnable() {
              @Override
              public void run() {
                // Number of samples handed to the producer so far by this task.
                int submitted = 0;
                try {
                  for (; submitted < times; ++submitted) {
                    int r = random.nextInt(times);
                    producer.send(
                        project,
                        logstore,
                        generateTopic(r),
                        generateSource(r),
                        buildLogItem("test_metric", labels, submitted),
                        new Callback() {
                          @Override
                          public void onCompletion(Result result) {
                            completedCount.incrementAndGet();
                            if (!result.isSuccessful()) {
                              LOGGER.error(
                                  "Failed to send log, project={}, logstore={}, result={}",
                                  project,
                                  logstore,
                                  result);
                            }
                          }
                        });
                  }
                } catch (Exception e) {
                  LOGGER.error("Failed to send log, e=", e);
                  if (e instanceof InterruptedException) {
                    // Keep the interrupt visible to the pool thread.
                    Thread.currentThread().interrupt();
                  }
                  // The loop is abandoned here, so the sample that threw and all later ones
                  // will never reach a callback (assumes send() does not invoke the callback
                  // when it throws). Credit them now, otherwise the wait loop in main() would
                  // never terminate.
                  completedCount.addAndGet(times - submitted);
                }
              }
            });
      }

      while (completedCount.get() < totalCount) {
        Thread.sleep(100);
      }
      long t2 = System.currentTimeMillis();
      LOGGER.info("Test end.");
      LOGGER.info("======Summary======");
      LOGGER.info("Total count {}.", totalCount);
      long timeCost = t2 - t1;
      LOGGER.info("Time cost {} millis", timeCost);
    } finally {
      // Release resources even when the wait above is interrupted.
      try {
        producer.close();
      } catch (ProducerException e) {
        LOGGER.error("Failed to close producer, e=", e);
      } finally {
        executorService.shutdown();
      }
    }
  }

  /** Maps {@code r} onto one of the topic names {@code topic-0} .. {@code topic-4}. */
  private static String generateTopic(int r) {
    return "topic-" + r % 5;
  }

  /** Maps {@code r} onto one of the source names {@code source-0} .. {@code source-9}. */
  private static String generateSource(int r) {
    return "source-" + r % 10;
  }

  /**
   * Builds a {@link LogItem} that carries a single metric sample.
   *
   * <p>The item's time is the current wall-clock time in seconds. The {@code __time_nano__} field
   * is that same number of seconds followed by six zeros. Labels are sorted by key and serialized
   * as {@code key#$#value} pairs separated by {@code |}.
   *
   * @param metricName the metric name, eg: http_requests_count
   * @param labels labels map, eg: {'idc': 'idc1', 'ip': '192.0.2.0', 'hostname': 'appserver1'}
   * @param value double value, eg: 1.234
   * @return the populated LogItem
   */
  public static LogItem buildLogItem(String metricName, Map<String, String> labels, double value) {
    String labelsKey = "__labels__";
    String timeKey = "__time_nano__";
    String valueKey = "__value__";
    String nameKey = "__name__";

    LogItem logItem = new LogItem();
    int timeInSec = (int) (System.currentTimeMillis() / 1000);
    logItem.SetTime(timeInSec);
    logItem.PushBack(timeKey, timeInSec + "000000");
    logItem.PushBack(nameKey, metricName);
    logItem.PushBack(valueKey, String.valueOf(value));

    // Sort the labels in lexicographic key order; skip this step if they are already sorted.
    TreeMap<String, String> sortedLabels = new TreeMap<String, String>(labels);
    StringBuilder labelsBuilder = new StringBuilder();
    for (Map.Entry<String, String> entry : sortedLabels.entrySet()) {
      if (labelsBuilder.length() > 0) {
        labelsBuilder.append("|");
      }
      labelsBuilder.append(entry.getKey()).append("#$#").append(entry.getValue());
    }
    logItem.PushBack(labelsKey, labelsBuilder.toString());
    return logItem;
  }
}
Python SDK示例
更多信息,请参见安装日志服务Python SDK。
# encoding: utf-8
import time
import os
from aliyun.log import *
def build_log_item(metric_name, labels, value):
    """
    Assemble a LogItem that carries one metric sample.

    The item time and the ``__time_nano__`` field are both the current time in
    whole seconds. Labels are sorted and serialised as ``key#$#value`` pairs
    separated by ``|``.

    :param metric_name: the metric name, eg: http_requests_count
    :param labels: dict labels, eg: {'idc': 'idc1', 'ip': '192.0.2.0', 'hostname': 'appserver1'}
    :param value: double value, eg: 1.234
    :return: LogItem
    """
    ordered_pairs = sorted(labels.items())
    item = LogItem()
    timestamp = int(time.time())
    item.set_time(timestamp)

    # One "key#$#value" fragment per label, joined with "|" (no trailing separator).
    fragments = [name + '#$#' + val for name, val in ordered_pairs]

    item.set_contents([
        ('__time_nano__', str(timestamp)),
        ('__name__', metric_name),
        ('__value__', str(value)),
        ('__labels__', '|'.join(fragments)),
    ])
    return item
def main():
    """Build three sample metric items and write them to the logstore in one request."""
    # This example reads the AccessKey ID and AccessKey Secret from environment variables.
    access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
    access_key_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
    # Log Service endpoint. Hangzhou is used as an example; use the endpoint of your own region.
    endpoint = 'cn-hangzhou.log.aliyuncs.com'
    project = ''
    logstore = ''

    client = LogClient(endpoint, access_key_id, access_key_secret)

    # (metric name, labels, value) for each sample to send.
    samples = (
        ('test1', {'k1': 'v1'}, 1),
        ('test2', {'k2': 'v2'}, 2),
        ('test3', {'k3': 'v3'}, 3),
    )
    items = [build_log_item(name, sample_labels, value) for name, sample_labels, value in samples]

    request = PutLogsRequest(project=project, logstore=logstore, logitems=items)
    response = client.put_logs(request)
    response.log_print()


if __name__ == '__main__':
    main()
Go SDK示例
更多信息,请参见安装Go SDK。
package main
import (
"fmt"
sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/aliyun/aliyun-log-go-sdk/producer"
"github.com/golang/protobuf/proto"
"os"
"os/signal"
"sort"
"strconv"
"sync"
"time"
)
// buildLogItem assembles an sls.Log that carries one metric sample.
//
// The log time and the "__time_nano__" field are both the current Unix time in
// seconds. The value is formatted with six decimal places. Labels are sorted by
// key and serialised as "key#$#value" pairs separated by "|".
func buildLogItem(metricName string, labels map[string]string, value float64) *sls.Log {
	timestamp := uint32(time.Now().Unix())

	// Collect and sort the label keys so the serialised form is deterministic.
	sortedKeys := make([]string, 0, len(labels))
	for key := range labels {
		sortedKeys = append(sortedKeys, key)
	}
	sort.Strings(sortedKeys)

	var joinedLabels string
	for idx, key := range sortedKeys {
		if idx > 0 {
			joinedLabels += "|"
		}
		joinedLabels += key + "#$#" + labels[key]
	}

	// field wraps a key/value pair into the content type expected by sls.Log.
	field := func(k, v string) *sls.LogContent {
		return &sls.LogContent{Key: proto.String(k), Value: proto.String(v)}
	}

	return &sls.Log{
		Time: proto.Uint32(timestamp),
		Contents: []*sls.LogContent{
			field("__time_nano__", strconv.FormatInt(int64(timestamp), 10)),
			field("__name__", metricName),
			field("__value__", strconv.FormatFloat(value, 'f', 6, 64)),
			field("__labels__", joinedLabels),
		},
	}
}
// main starts a producer, sends 10 x 1000 metric samples from concurrent
// goroutines, then waits for an OS signal before closing the producer.
func main() {
	project := ""
	logstore := ""
	producerConfig := producer.GetDefaultProducerConfig()
	// Log Service endpoint. Hangzhou is used as an example; use the endpoint of your own region.
	producerConfig.Endpoint = "https://cn-hangzhou.log.aliyuncs.com"
	// This example reads the AccessKey ID and AccessKey Secret from environment variables.
	producerConfig.AccessKeyID = os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
	producerConfig.AccessKeySecret = os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
	producerInstance, err := producer.NewProducer(producerConfig)
	if err != nil {
		panic(err)
	}

	// signal.Notify never blocks when delivering to the channel, so the channel
	// needs a buffer: with an unbuffered channel a signal that arrives while the
	// senders below are still running would be dropped.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch)
	producerInstance.Start()

	var m sync.WaitGroup
	for worker := 0; worker < 10; worker++ {
		m.Add(1)
		go func() {
			defer m.Done()
			for n := 0; n < 1000; n++ {
				// Build the metric sample directly as a native sls.Log and hand it to the producer.
				log := buildLogItem("test_metric", map[string]string{"test_k": "test_v"}, float64(n))
				err := producerInstance.SendLog(project, logstore, "topic", "127.0.0.1", log)
				if err != nil {
					fmt.Println(err)
				}
			}
		}()
	}
	m.Wait()
	fmt.Println("Send completion")

	// Block until a signal is received, then close the producer (timeout argument: 60000).
	if _, ok := <-ch; ok {
		fmt.Println("Get the shutdown signal and start to shut down")
		producerInstance.Close(60000)
	}
}