全部产品
Search
文档中心

:使用Aliyun Log Go Producer写入日志数据

更新时间:Nov 20, 2024

如果您在使用FlinkSparkStorm等大数据计算引擎时,需要将日志进行压缩、批量上传日志到日志服务、减少网络传输资源的占用,API或者SDK往往无法满足大数据场景对数据写入能力的要求,您可以使用Aliyun Log Go Producer,便捷高效地将数据上传到日志服务。

前提条件

注意事项

本示例以华东1(杭州)的公网Endpoint为例,其公网Endpoint为https://cn-hangzhou.log.aliyuncs.com

如果您通过与Project同地域的其他阿里云产品访问日志服务,请使用内网Endpointhttps://cn-hangzhou-intranet.log.aliyuncs.com

关于日志服务支持的地域与Endpoint的对应关系,请参见服务入口

什么是Aliyun Log Go Producer

Aliyun Log Go Producer是为运行在大数据、高并发场景下的Go应用量身打造的高性能类库。相对于原始的API或SDK,使用该类库写日志数据能为您带来诸多优势,包括高性能、计算与I/O逻辑分离、资源可控制等。Aliyun LOG Go Producer使用阿里云日志服务提供的顺序写入功能来保证日志的上传顺序

工作流程

image

特点

  • 线程安全:Producer接口暴露的所有方法都是线程安全的。

  • 异步发送:调用Producer的发送接口通常能够立即返回响应。Producer内部会缓存并合并待发送数据,然后批量发送以提高吞吐量。

  • 自动重试:Producer会根据配置的最大重试次数和重试退避时间进行重试。

  • 行为追溯:通过Callback或Future能获取当前数据是否发送成功的信息,也可以获得该数据每次被尝试发送的信息,有利于问题追溯和行为决策。

  • 上下文还原:同一个Producer实例产生的日志在同一上下文中,在服务端可以查看某条日志前后相关的日志。

  • 优雅关闭:保证close方法退出时,Producer缓存的所有数据都能被处理,同时您也能得到相应的通知。

应用场景

原始的API或SDK需要实现高性能,异步非阻塞,资源可控制,需要自己写代码来实现,成功较高,并且会提高服务器的负载。并且此种方式写日志返回值是状态码,具体失败原因需要重新定位,开发人员定位成本较高。

producer对比原始的API或SDK的优势如下:

  • 高性能

    在海量数据、资源有限的前提下,写入端要达到目标吞吐量需要实现复杂的控制逻辑,包括多线程、缓存策略、批量发送等,另外还要充分考虑失败重试的场景。Producer实现了上述功能,在为您带来性能优势的同时简化了程序开发步骤。

  • 异步非阻塞

    在可用内存充足的前提下,Producer会对发往日志库的数据进行缓存,因此调用send方法时能够立即返回响应且不会阻塞,可达到计算与I/O逻辑分离的目的。随后,您可以通过返回的Future对象或传入的Callback获得数据发送的结果。

  • 资源可控制

    可以通过参数控制Producer用于缓存待发送数据的内存大小,同时还可以配置用于执行数据发送任务的线程数量。这样可避免Producer无限制地消耗资源,且可以让您根据实际情况平衡资源消耗和写入吞吐量。

  • 定位问题简单

    如果日志数据发送失败,Producer除了返回状态码,还会返回一个String类型的异常信息,用于描述失败的原因和详细信息。例如,如果发送失败是因为网络连接超时,则返回的异常信息可能是“连接超时”;如果发送失败是因为服务器无响应,则返回的异常信息可能是“服务器无响应”。

使用限制

  • aliyun-log-producer底层调用PutLogs接口上传日志,每次可以写入的原始日志大小存在限制。更多信息,请参见数据读写

  • 日志服务的基础资源,包括创建Project个数、Logstore个数、Shard个数、LogtailConfig个数、机器组个数、单个LogItem大小、LogItem(Key)长度和LogItem(Value)长度等均存在限制。更多信息,请参见基础资源

  • 代码首次运行后,请在日志服务控制台开启日志库索引,等待一分钟后,进行查询。

  • 在控制台进行日志查询时,当单个字段值长度超过最大长度时,超出部分被截断,不参与分析。更多信息,请参考创建索引

1. 配置ProducerConfig

producerConfig := producer.GetDefaultProducerConfig()
producerConfig.Endpoint = "cn-hangzhou.log.aliyuncs.com"
provider := sls.NewStaticCredentialsProvider(os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "")
producerConfig.CredentialsProvider = provider

ProducerConfig用于配置发送策略,您可以根据不同的业务场景为参数指定不同的值,各参数含义如下表所示:

参数

类型

描述

TotalSizeLnBytes

Int64

单个Producer实例能缓存的日志大小上限,默认为100MB。

MaxIoWorkerCount

Int64

单个Producer能并发的最多goroutine的数量,默认为50,该参数用户可以根据自己实际服务器的性能进行配置。

MaxBlockSec

Int

如果Producer可用空间不足,调用者在send方法上的最大阻塞时间,默认为60秒。

如果超过这个时间后所需空间仍无法得到满足,send方法会抛出TimeoutException。如果将该值设为0,当所需空间无法得到满足时,send方法会立即抛出TimeoutException。如果您希望send方法一直阻塞直到所需空间得到满足,可将该值设为负数。

MaxBatchSize

Int64

当一个ProducerBatch中缓存的日志大小大于等于 MaxBatchSize时,该batch 将被发送,默认为512KB,最大可设置成5MB。

MaxBatchCount

Int

当一个ProducerBatch中缓存的日志条数大于等于 MaxBatchCount时,该batch将被发送,默认为4096,最大可设置成40960。

LingerMs

Int64

一个ProducerBatch从创建到可发送的逗留时间,默认为2 秒,最小可设置成100 毫秒。

Retries

Int

如果某个ProducerBatch首次发送失败,能够对其重试的次数,默认为10 次。

如果Retries小于等于0,该ProducerBatch首次发送失败后将直接进入失败队列。

MaxReservedAttempts

Int

每个ProducerBatch每次被尝试发送都对应着一个Attempt,此参数用来控制返回给用户的Attempt个数,默认只保留最近的11次Attempt信息。

该参数越大能让您追溯更多的信息,但同时也会消耗更多的内存。

BaseRetryBackoffMs

Int64

首次重试的退避时间,默认为 100 毫秒。 Producer采取指数退避算法,第N次重试的计划等待时间为 BaseRetryBackoffMs * 2^(N-1)。

MaxRetryBackoffMs

Int64

重试的最大退避时间,默认为 50秒。

AdjustShardHash

Bool

如果调用send方法时指定了 shardHash,该参数用于控制是否需要对其进行调整,默认为true。

Buckets

Int

当且仅当AdjustShardHash为true时,该参数才生效。此时,Producer会自动将 shardHash重新分组,分组数量为buckets。

如果两条数据的shardHash 不同,它们是无法合并到一起发送的,这会降低Producer吞吐量。将shardHash重新分组后,能让数据有更多的机会被批量发送。该参数的取值范围是 [1, 256],且必须是2的整数次幂,默认为64。

CredentialsProvider

Interface

可自定义CredentialsProvider,来提供动态的 Access KeyId/AccessKey Secret/STS Token,该接口应当缓存AK,且必须线程安全。

NoRetryStatusCodeList

[ ]int

用户配置的不需要重试的错误码列表,当发送日志失败时返回的错误码在列表中,则不会重试。默认包含400和404两个值。

UpdateStsToken

Func

通过定义一个函数类型,在该函数中实现获取STS Token的逻辑,Producer会自动刷新STS Token并将其放入Client中。

StsTokenShutDown

channel

关闭STS Token自动刷新的通讯信道,当该信道关闭时,不再自动刷新STS Token值。当Producer关闭的时候,该参数不为nil值,则会主动调用close去关闭该信道停止STS Token的自动刷新。

Region

String

日志服务的区域,当签名版本使用AuthV4时必选,例如cn-hangzhou。

AuthVersion

String

使用的签名版本,可选枚举值为 AuthV1,AuthV4。AuthV4签名示例可参考程序producer_test.go

UseMetricStoreURL

bool

使用Metricstore地址进行发送日志,可以提升大基数时间线下的查询性能。

2. 启动Producer进程

调用producerInstance.Start()这个函数会开启一个goroutine去监听producer中是否有日志写入以及符合发送条件的日志组,将符合发送条件的日志组发送到服务端logStore中。

producerInstance, err := producer.NewProducer(producerConfig)
if err != nil {
  panic(err)
}
ch := make(chan os.Signal)
signal.Notify(ch, os.Kill, os.Interrupt)
producerInstance.Start()

3. 调用send方法发送日志

本示例调用HashSendLogListWithCallBack方法发送日志,方法参数说明如下:

参数

是否必选

描述

project

目标Project。

logStore

目标LogStore。

shardHash

可为发送的日志设置自定义哈希,服务端将根据此哈希选择对应的日志库Shard分片写入日志。

说明

如果留空或没有指定,数据将被随机写入目标LogStore的某个shard中。

topic

日志主题

说明

如果留空或没有指定,该字段将被赋予""。

source

发送源。

说明

如果留空或没有指定,该字段将被赋予Producer所在宿主机的IP。

logs

要发送的日志/日志列表。

callback

可设置一个回调函数。该回调函数将在日志被成功发送或者重试多次失败后被丢弃时调用。

说明

producer.GenerateLog()方法可以方便地生成投递到LogHub的日志实例,但效率较低,推荐示例中使用原生的sls.Log接口去创建日志。

	var m sync.WaitGroup
	callBack := &Callback{}
	logs := []*sls.Log{}
	content := []*sls.LogContent{}
	for colIdx := 0; colIdx < 10; colIdx++ {
		if colIdx/2 == 0 {
			content = append(content, &sls.LogContent{
				Key:   tea.String("request_method"),
				Value: tea.String("GET"),
			}, &sls.LogContent{
				Key:   tea.String("status"),
				Value: tea.String("200"),
			})
		} else if colIdx/3 == 0 {
			content = append(content, &sls.LogContent{
				Key:   tea.String("request_method"),
				Value: tea.String("POST"),
			}, &sls.LogContent{
				Key:   tea.String("status"),
				Value: tea.String("500"),
			})
		} else {
			content = append(content, &sls.LogContent{
				Key:   tea.String("request_method"),
				Value: tea.String("POST"),
			}, &sls.LogContent{
				Key:   tea.String("status"),
				Value: tea.String("200"),
			})
		}
	}
	log := &sls.Log{
		Time:     proto.Uint32(uint32(time.Now().Unix())),
		Contents: content,
	}
	logs = append(logs, log)
	shardHash := ""
	for i := 0; i < 10; i++ {
		m.Add(1)
		go func() {
			defer m.Done()
			for i := 0; i < 1000; i++ {
				// GenerateLog  is producer's function for generating SLS format logs
				// GenerateLog has low performance, and native Log interface is the best choice for high performance.
				err := producerInstance.HashSendLogListWithCallBack("gs-log-test", "gstest", shardHash, "topic", "127.0.X.1", logs, callBack)
				if err != nil {
					fmt.Println(err)
				}
			}
		}()
	}
	fmt.Println("Send completion")

4. 关闭Producer

producer提供了两种关闭模式,分为有限关闭和安全关闭。

  • 安全关闭会等待producer中缓存的所有的数据全部发送完成以后再关闭producer。

  • 有限关闭producer会接收用户传递的一个参数值,时间单位为秒,当开始关闭producer的时候开始计时,超过传递的设定值还未能完全关闭producer的话会强制退出producer,此时可能会有部分数据未被成功发送而丢失。

producerInstance.Close(60) // 有限关闭,传递int值,参数值需为正整数,单位为秒
producerInstance.SafeClose()// 安全关闭

相关文档

  • 在调用API接口过程中,若服务端返回结果中包含错误信息,则表示调用API接口失败。您可以参考API错误码对照表查找对应的解决方法。更多信息,请参见错误码

  • 更多示例代码,请参见Aliyun Log GoProducer on Github