本文通過以Splunk HEC為例來展示如何通過HEC將阿里雲相關日誌投遞到SIEM。
例如當前SIEM(如Splunk)位於組織內部環境(on-premise),而不是雲端。為了安全考慮,沒有開放任何連接埠讓外界環境來訪問此SIEM。
本章節中的配置代碼僅為樣本,最新的程式碼範例請參見Github或Github(多源日誌庫時)。
投遞流程
即時消費
在全量資料投遞情境中,建議採用Log Service消費組來實現即時資料消費,並利用Splunk的API(HEC)將日誌安全地投遞至Splunk。
基於規則消費
在資料投遞過程中涉及資料清洗(如行過濾、列裁剪和資料規整等)時,建議使用基於規則的程式來進行消費與投遞,並利用Splunk的API(HTTP事件收集,HEC)將日誌安全地傳輸至Splunk。
主程式樣本
如下代碼展示主程式控制邏輯。
def main():
option, settings = get_option()
logger.info("*** start to consume data...")
worker = ConsumerWorker(SyncData, option, args=(settings,) )
worker.start(join=True)
if __name__ == '__main__':
main()
程式配置樣本
配置內容
程式記錄檔:以便後續測試或者診斷問題。
基本配置項:包括Log Service串連配置和消費組配置。
消費組的進階選項:效能調參,不推薦修改。
SIEM(Splunk)相關參數與選項。
程式碼範例
請仔細閱讀代碼中相關注釋並根據業務需求調整選項。
即時消費
#encoding: utf8 import os import logging from logging.handlers import RotatingFileHandler user = logging.getLogger() handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5) handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')) user.setLevel(logging.INFO) user.addHandler(handler) user.addHandler(logging.StreamHandler()) logger = logging.getLogger(__name__) def get_option(): ########################## # 基本選項 ########################## # 從環境變數中載入Log Service參數與選項。 endpoint = os.environ.get('SLS_ENDPOINT', '') accessKeyId = os.environ.get('SLS_AK_ID', '') accessKey = os.environ.get('SLS_AK_KEY', '') project = os.environ.get('SLS_PROJECT', '') logstore = os.environ.get('SLS_LOGSTORE', '') consumer_group = os.environ.get('SLS_CG', '') # 消費的起點。這個參數在首次運行程式的時候有效,後續再次運行時將從上一次消費的儲存點繼續消費。 # 可以使用“begin”、“end”,或者特定的ISO時間格式。 cursor_start_time = "2018-12-26 0:0:0" ########################## # 一些進階選項 ########################## # 一般不建議修改消費者名稱,尤其是需要進行並發消費時。 consumer_name = "{0}-{1}".format(consumer_group, current_process().pid) # 心跳時間長度,當伺服器在2倍時間內沒有收到特定Shard的心跳報告時,伺服器會認為對應消費者離線並重新調配任務。 # 所以當網路環境不佳時,不建議將時間長度設定的比較小。 heartbeat_interval = 20 # 消費資料的最大間隔,如果資料產生的速度很快,不需要調整這個參數。 data_fetch_interval = 1 # 構建一個消費組和消費者 option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time, heartbeat_interval=heartbeat_interval, data_fetch_interval=data_fetch_interval) # Splunk選項 settings = { "host": "10.1.2.3", "port": 80, "token": "a023nsdu123123123", 'https': False, # 可選, bool 'timeout': 120, # 可選, int 'ssl_verify': True, # 可選, bool "sourcetype": "", # 可選, sourcetype "index": "", # 可選, index "source": "", # 可選, source } return option, settings
基於規則消費
# encoding: utf8 import os import logging from logging.handlers import RotatingFileHandler user = logging.getLogger() handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100 * 1024 * 1024, backupCount=5) handler.setFormatter(logging.Formatter( fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')) user.setLevel(logging.INFO) user.addHandler(handler) user.addHandler(logging.StreamHandler()) logger = logging.getLogger(__name__) def get_option(): ########################## # 基本選項 ########################## # 從環境變數中載入Log Service參數與選項。 endpoint = os.environ.get('SLS_ENDPOINT', '') accessKeyId = os.environ.get('SLS_AK_ID', '') accessKey = os.environ.get('SLS_AK_KEY', '') project = os.environ.get('SLS_PROJECT', '') logstore = os.environ.get('SLS_LOGSTORE', '') consumer_group = os.environ.get('SLS_CG', '') # 消費的起點。這個參數在首次運行程式的時候有效,後續再次運行時將從上一次消費的儲存點繼續消費。 # 可以使用“begin”、“end”,或者特定的ISO時間格式。 cursor_start_time = "2018-12-26 0:0:0" ########################## # 一些進階選項 ########################## # 一般不建議修改消費者名稱,尤其是需要進行並發消費時。 consumer_name = "{0}-{1}".format(consumer_group, current_process().pid) # 心跳時間長度,當伺服器在2倍時間內沒有收到特定Shard的心跳報告時,伺服器會認為對應消費者離線並重新調配任務。 # 所以當網路環境不佳時,不建議將時間長度設定的比較小。 heartbeat_interval = 20 # 消費資料的最大間隔,如果資料產生的速度很快,不需要調整這個參數。 data_fetch_interval = 1 # SPL 語句 query = "* | where instance_id in ('instance-1', 'instance-2')" # 基於規則消費配置 option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time, heartbeat_interval=heartbeat_interval, data_fetch_interval=data_fetch_interval, query=query) # Splunk選項 settings = { "host": "10.1.2.3", "port": 80, "token": "a023nsdu123123123", 'https': False, # 可選, bool 'timeout': 120, # 可選, int 'ssl_verify': True, # 可選, bool "sourcetype": "", # 可選, sourcetype "index": "", # 可選, index "source": "", # 可選, source } return option, settings
消費與投遞樣本
如下代碼展示如何從Log Service擷取資料並投遞到Splunk,請仔細閱讀代碼中相關注釋並根據需求調整格式。
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import time
import json
import socket
import requests
class SyncData(ConsumerProcessorBase):
"""
這個消費者從Log Service消費資料並發送給Splunk。
"""
def __init__(self, splunk_setting):
"""初始化並驗證Splunk連通性"""
super(SyncData, self).__init__()
assert splunk_setting, ValueError("You need to configure settings of remote target")
assert isinstance(splunk_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")
self.option = splunk_setting
self.timeout = self.option.get("timeout", 120)
# 測試Splunk連通性
s = socket.socket()
s.settimeout(self.timeout)
s.connect((self.option["host"], self.option['port']))
self.r = requests.session()
self.r.max_redirects = 1
self.r.verify = self.option.get("ssl_verify", True)
self.r.headers['Authorization'] = "Splunk {}".format(self.option['token'])
self.url = "{0}://{1}:{2}/services/collector/event".format("http" if not self.option.get('https') else "https", self.option['host'], self.option['port'])
self.default_fields = {}
if self.option.get("sourcetype"):
self.default_fields['sourcetype'] = self.option.get("sourcetype")
if self.option.get("source"):
self.default_fields['source'] = self.option.get("source")
if self.option.get("index"):
self.default_fields['index'] = self.option.get("index")
def process(self, log_groups, check_point_tracker):
logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))
for log in logs:
# 發送資料到Splunk
event = {}
event.update(self.default_fields)
event['time'] = log[u'__time__']
del log['__time__']
json_topic = {"actiontrail_audit_event": ["event"] }
topic = log.get("__topic__", "")
if topic in json_topic:
try:
for field in json_topic[topic]:
log[field] = json.loads(log[field])
except Exception as ex:
pass
event['event'] = json.dumps(log)
data = json.dumps(event, sort_keys=True)
try:
req = self.r.post(self.url, data=data, timeout=self.timeout)
req.raise_for_status()
except Exception as err:
logger.debug("Failed to connect to remote Splunk server ({0}). Exception: {1}", self.url, err)
# 根據需要,添加一些重試或者報告。
logger.info("Complete send data to remote")
self.save_checkpoint(check_point_tracker)
啟動程式樣本
例如程式命名為sync_data.py,啟動程式樣本如下所示。
export SLS_ENDPOINT=<Endpoint of your region>
export SLS_AK_ID=<YOUR AK ID>
export SLS_AK_KEY=<YOUR AK KEY>
export SLS_PROJECT=<SLS Project Name>
export SLS_LOGSTORE=<SLS Logstore Name>
export SLS_CG=<消費組名,可以簡單命名為"syc_data">
python3 sync_data.py
多源日誌庫樣本
針對多源日誌庫,需要共用一個executor以避免進程過多。更多資訊,請參見多源日誌庫時發送日誌到Splunk。核心的變化是主函數,樣本如下所示。
exeuctor, options, settings = get_option()
logger.info("*** start to consume data...")
workers = []
for option in options:
worker = ConsumerWorker(SyncData, option, args=(settings,) )
workers.append(worker)
worker.start()
try:
for i, worker in enumerate(workers):
while worker.is_alive():
worker.join(timeout=60)
logger.info("worker project: {0} logstore: {1} exit unexpected, try to shutdown it".format(
options[i].project, options[i].logstore))
worker.shutdown()
except KeyboardInterrupt:
logger.info("*** try to exit **** ")
for worker in workers:
worker.shutdown()
# wait for all workers to shutdown before shutting down executor
for worker in workers:
while worker.is_alive():
worker.join(timeout=60)
exeuctor.shutdown()
限制與約束
每一個日誌庫(logstore)最多可以配置30個消費組,如果遇到ConsumerGroupQuotaExceed
則表示超出限制,建議在控制台刪除一些不再使用的消費組。
消費狀態與監控
在控制台查看消費組狀態,詳情請參見步驟三:查看消費組狀態。
並發消費
基於消費組的程式,可以直接啟動多次程式以實現並發效果。
nohup python3 sync_data.py &
nohup python3 sync_data.py &
nohup python3 sync_data.py &
...
所有消費者的名稱均不相同(消費者名以進程ID為尾碼),且屬於同一個消費組。因為一個分區(Shard)只能被一個消費者消費,例如一個日誌庫有10個分區,那麼最多有10個消費組同時消費。
輸送量
基於測試,在沒有頻寬、接收端速率限制(如Splunk端)的情況下,用python3運行上述範例,單個消費者大約佔用20%的單核CPU資源,此時消費可以達到10 MB/s原始日誌的速率。因此10個消費者理論上可以達到100 MB/s原始日誌,即每個CPU核每天可以消費0.9 TB原始日誌。
高可用
消費組將檢測點(check-point)儲存在伺服器端,當一個消費者停止,另外一個消費者將自動接管並從斷點繼續消費。可以在不同機器上啟動消費者,這樣在一台機器停止或者損壞的情況下,其他機器上的消費者可以自動接管並從斷點進行消費。為了備用,也可以通過不同機器啟動大於Shard數量的消費者。
HTTPS
如果服務入口(Endpoint)配置為https://
首碼,例如https://cn-beijing.log.aliyuncs.com
,則程式自動使用HTTPS加密與Log Service串連。
伺服器憑證*.aliyuncs.com是由GlobalSign簽發,預設大多數Linux和Windows機器自動信任此認證。如果有機器不信任此認證,請參見Certificate installation下載並安裝此認證。