全部產品
Search
文件中心

:使用Aliyun Log Go Producer寫入日誌資料

更新時間:Nov 21, 2024

如果您在使用FlinkSparkStorm等巨量資料計算引擎時,需要將日誌進行壓縮、批量上傳日誌到Log Service、減少網路傳輸資源的佔用,API或者SDK往往無法滿足巨量資料情境對資料寫入能力的要求,您可以使用Aliyun Log Go Producer,便捷高效地將資料上傳到Log Service。

前提條件

注意事項

本樣本以華東1(杭州)的公網Endpoint為例,其公網Endpoint為https://cn-hangzhou.log.aliyuncs.com

如果您通過與Project同地區的其他阿里雲產品訪問Log Service,請使用內網Endpointhttps://cn-hangzhou-intranet.log.aliyuncs.com

關於Log Service支援的地區與Endpoint的對應關係,請參見服務入口

什麼是Aliyun Log Go Producer

Aliyun Log Go Producer是為運行在巨量資料、高並發情境下的Go應用量身打造的高效能類庫。相對於原始的API或SDK,使用該類庫寫日誌資料能為您帶來諸多優勢,包括高效能、計算與I/O邏輯分離、資源可控制等。Aliyun LOG Go Producer使用阿里雲Log Service提供的順序寫入功能來保證日誌的上傳順序

工作流程

特點

  • 安全執行緒:Producer介面暴露的所有方法都是安全執行緒的。

  • 非同步發送:調用Producer的發送介面通常能夠立即返迴響應。Producer內部會緩衝併合並待發送資料,然後批量發送以提高輸送量。

  • 自動重試:Producer會根據配置的最大重試次數和重試退避時間進行重試。

  • 行為追溯:通過Callback或Future能擷取當前資料是否發送成功的資訊,也可以獲得該資料每次被嘗試發送的資訊,有利於問題追溯和行為決策。

  • 上下文還原:同一個Producer執行個體產生的日誌在同一上下文中,在服務端可以查看某條日誌前後相關的日誌。

  • 優雅關閉:保證close方法退出時,Producer緩衝的所有資料都能被處理,同時您也能得到相應的通知。

應用情境

原始的API或SDK需要實現高效能,非同步非阻塞,資源可控制,需要自己寫代碼來實現,成功率較高,並且會提高伺服器的負載。此外,此種方式寫日誌傳回值是狀態代碼,具體失敗原因需要重新置放,開發人員定位成本較高。

producer對比原始的API或SDK的優勢如下:

  • 高效能

    在海量資料、資源有限的前提下,寫入端要達到目標輸送量需要實現複雜的控制邏輯,包括多線程、緩衝策略、批量發送等,另外還要充分考慮失敗重試的情境。Producer實現了上述功能,在為您帶來效能優勢的同時簡化了程式開發步驟。

  • 非同步非阻塞

    在可用記憶體充足的前提下,Producer會對發往日誌庫的資料進行緩衝,因此調用send方法時能夠立即返迴響應且不會阻塞,可達到計算與I/O邏輯分離的目的。隨後,您可以通過返回的Future對象或傳入的Callback獲得資料發送的結果。

  • 資源可控制

    可以通過參數控制Producer用於緩衝待發送資料的記憶體大小,同時還可以配置用於執行資料發送任務的線程數量。這樣可避免Producer無限制地消耗資源,且可以讓您根據實際情況平衡資源消耗和寫入輸送量。

  • 定位問題簡單

    如果日誌資料發送失敗,Producer除了返回狀態代碼,還會返回一個String類型的異常資訊,用於描述失敗的原因和詳細資料。例如,如果發送失敗是因為網路連接逾時,則返回的異常資訊可能是“連線逾時”;如果發送失敗是因為伺服器無響應,則返回的異常資訊可能是“伺服器無響應”。

使用限制

  • aliyun-log-producer底層調用PutLogs介面上傳日誌,每次可以寫入的原始日誌大小存在限制。更多資訊,請參見資料讀寫

  • Log Service的基礎資源,包括建立Project個數、Logstore個數、Shard個數、LogtailConfig個數、機器組個數、單個LogItem大小、LogItem(Key)長度和LogItem(Value)長度等均存在限制。更多資訊,請參見基礎資源

  • 代碼首次運行後,請在Log Service控制台開啟日誌庫索引,等待一分鐘後,進行查詢。

  • 在控制台進行日誌查詢時,當單個欄位值長度超過最大長度時,超出部分被截斷,不參與分析。更多資訊,請參考建立索引

1. 配置ProducerConfig

producerConfig := producer.GetDefaultProducerConfig()
producerConfig.Endpoint = "cn-hangzhou.log.aliyuncs.com"
provider := sls.NewStaticCredentialsProvider(os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "")
producerConfig.CredentialsProvider = provider

ProducerConfig用於配置發送策略,您可以根據不同的業務情境為參數指定不同的值,各參數含義如下表所示:

參數

類型

描述

TotalSizeLnBytes

Int64

單個Producer執行個體能緩衝的日誌大小上限,預設為100MB。

MaxIoWorkerCount

Int64

單個Producer能並發的最多goroutine的數量,預設為50,該參數使用者可以根據自己實際伺服器的效能進行配置。

MaxBlockSec

Int

如果Producer可用空間不足,調用者在send方法上的最大阻塞時間,預設為60秒。

如果超過這個時間後所需空間仍無法得到滿足,send方法會拋出TimeoutException。如果將該值設為0,當所需空間無法得到滿足時,send方法會立即拋出TimeoutException。如果您希望send方法一直阻塞直到所需空間得到滿足,可將該值設為負數。

MaxBatchSize

Int64

當一個ProducerBatch中緩衝的日誌大小大於等於 MaxBatchSize時,該batch 將被發送,預設為512KB,最大可設定成5MB。

MaxBatchCount

Int

當一個ProducerBatch中緩衝的日誌條數大於等於 MaxBatchCount時,該batch將被發送,預設為4096,最大可設定成40960。

LingerMs

Int64

一個ProducerBatch從建立到可發送的逗留時間,預設為2 秒,最小可設定成100 毫秒。

Retries

Int

如果某個ProducerBatch首次發送失敗,能夠對其重試的次數,預設為10 次。

如果Retries小於等於0,該ProducerBatch首次發送失敗後將直接進入失敗隊列。

MaxReservedAttempts

Int

每個ProducerBatch每次被嘗試發送都對應著一個Attempt,此參數用來控制返回給使用者的Attempt個數,預設只保留最近的11次Attempt資訊。

該參數越大能讓您追溯更多的資訊,但同時也會消耗更多的記憶體。

BaseRetryBackoffMs

Int64

首次重試的退避時間,預設為 100 毫秒。 Producer採取指數退避演算法,第N次重試的計劃等待時間為 BaseRetryBackoffMs * 2^(N-1)。

MaxRetryBackoffMs

Int64

重試的最大退避時間,預設為 50秒。

AdjustShardHash

Bool

如果調用send方法時指定了 shardHash,該參數用於控制是否需要對其進行調整,預設為true。

Buckets

Int

若且唯若AdjustShardHash為true時,該參數才生效。此時,Producer會自動將 shardHash重新分組,分組數量為buckets。

如果兩條資料的shardHash 不同,它們是無法合并到一起發送的,這會降低Producer輸送量。將shardHash重新分組後,能讓資料有更多的機會被批量發送。該參數的取值範圍是 [1, 256],且必須是2的整數次冪,預設為64。

CredentialsProvider

Interface

可自訂CredentialsProvider,來提供動態 Access KeyId/AccessKey Secret/STS Token,該介面應當緩衝AK,且必須安全執行緒。

NoRetryStatusCodeList

[ ]int

使用者配置的不需要重試的錯誤碼列表,當發送日誌失敗時返回的錯誤碼在列表中,則不會重試。預設包含400和404兩個值。

UpdateStsToken

Func

通過定義一個函數類型,在該函數中實現擷取STS Token的邏輯,Producer會自動重新整理STS Token並將其放入Client中。

StsTokenShutDown

channel

關閉STS Token自動重新整理的通訊通道,當該通道關閉時,不再自動重新整理STS Token值。當Producer關閉的時候,該參數不為nil值,則會主動調用close去關閉該通道停止STS Token的自動重新整理。

Region

String

Log Service的地區,當簽名版本使用AuthV4時必選,例如cn-hangzhou。

AuthVersion

String

使用的簽名版本,可選枚舉值為 AuthV1,AuthV4。AuthV4簽名樣本可參考程式producer_test.go

UseMetricStoreURL

bool

使用Metricstore地址進行發送日誌,可以提升大基數時間軸下的查詢效能。

2. 啟動Producer進程

調用producerInstance.Start()這個函數會開啟一個goroutine去監聽producer中是否有日誌寫入以及符合發送條件的日誌組,將符合發送條件的日誌組發送到服務端logStore中。

producerInstance, err := producer.NewProducer(producerConfig)
if err != nil {
  panic(err)
}
ch := make(chan os.Signal)
signal.Notify(ch, os.Kill, os.Interrupt)
producerInstance.Start()

3. 調用send方法發送日誌

本樣本調用HashSendLogListWithCallBack方法發送日誌,方法參數說明如下:

參數

是否必選

描述

project

目標Project。

logStore

目標LogStore。

shardHash

可為發送的日誌設定自訂雜湊,服務端將根據此雜湊選擇對應的日誌庫Shard分區寫入日誌。

說明

如果留空或沒有指定,資料將被隨機寫入目標LogStore的某個shard中。

topic

日誌主題

說明

如果留空或沒有指定,該欄位將被賦予""。

source

發送源。

說明

如果留空或沒有指定,該欄位將被賦予Producer所在宿主機的IP。

logs

要發送的日誌/日誌列表。

callback

可設定一個回呼函數。該回呼函數將在日誌被成功發送或者重試多次失敗後被丟棄時調用。

說明

producer.GenerateLog()方法可以方便地產生投遞到LogHub的日誌執行個體,但效率較低,推薦樣本中使用原生的sls.Log介面去建立日誌。

	var m sync.WaitGroup
	callBack := &Callback{}
	logs := []*sls.Log{}
	content := []*sls.LogContent{}
	for colIdx := 0; colIdx < 10; colIdx++ {
		if colIdx/2 == 0 {
			content = append(content, &sls.LogContent{
				Key:   tea.String("request_method"),
				Value: tea.String("GET"),
			}, &sls.LogContent{
				Key:   tea.String("status"),
				Value: tea.String("200"),
			})
		} else if colIdx/3 == 0 {
			content = append(content, &sls.LogContent{
				Key:   tea.String("request_method"),
				Value: tea.String("POST"),
			}, &sls.LogContent{
				Key:   tea.String("status"),
				Value: tea.String("500"),
			})
		} else {
			content = append(content, &sls.LogContent{
				Key:   tea.String("request_method"),
				Value: tea.String("POST"),
			}, &sls.LogContent{
				Key:   tea.String("status"),
				Value: tea.String("200"),
			})
		}
	}
	log := &sls.Log{
		Time:     proto.Uint32(uint32(time.Now().Unix())),
		Contents: content,
	}
	logs = append(logs, log)
	shardHash := ""
	for i := 0; i < 10; i++ {
		m.Add(1)
		go func() {
			defer m.Done()
			for i := 0; i < 1000; i++ {
				// GenerateLog  is producer's function for generating SLS format logs
				// GenerateLog has low performance, and native Log interface is the best choice for high performance.
				err := producerInstance.HashSendLogListWithCallBack("gs-log-test", "gstest", shardHash, "topic", "127.0.X.1", logs, callBack)
				if err != nil {
					fmt.Println(err)
				}
			}
		}()
	}
	fmt.Println("Send completion")

4. 關閉Producer

producer提供了兩種關閉模式,分為有限關閉和安全關閉。

  • 安全關閉會等待producer中緩衝的所有的資料全部發送完成以後再關閉producer。

  • 有限關閉producer會接收使用者傳遞的一個參數值,時間單位為秒,當開始關閉producer的時候開始計時,超過傳遞的設定值還未能完全關閉producer的話會強制退出producer,此時可能會有部分資料未被成功發送而丟失。

producerInstance.Close(60) // 有限關閉,傳遞int值,參數值需為正整數,單位為秒
producerInstance.SafeClose()// 安全關閉

相關文檔

  • 在調用API介面過程中,若服務端返回結果中包含錯誤資訊,則表示調用API介面失敗。您可以參考API錯誤碼對照表尋找對應的解決方案。更多資訊,請參見錯誤碼

  • 更多範例程式碼,請參見Aliyun Log GoProducer on Github