Syslog是一個常見的日誌通道,幾乎所有的SIEM(例如IBM Qradar, HP Arcsight)都支援通過Syslog渠道接收日誌。本文主要介紹如何通過Syslog將Log Service中的日誌投遞到SIEM。
背景資訊
Syslog主要是基於RFC5424和RFC3164定義相關格式規範,RFC3164協議是2001年發布的,RFC5424協議是2009年發布的升級版本。因為新版相容舊版,且新版本解決了很多問題,因此推薦使用RFC5424協議。更多資訊,請參見RFC5424和RFC3164。
Syslog over TCP/TLS:Syslog只規定日誌格式,理論上TCP和UDP都支援Syslog,可以較好的保證資料轉送穩定性。RFC5425協議也定義了TLS的安全傳輸層,如果您的SIEM支援TCP通道或者TLS通道,則建議優先使用。更多資訊,請參見RFC5425。
Syslog facility:早期Unix定義的程式組件,此處選擇
user
作為預設組件。更多資訊,請參見程式組件。Syslog severity:定義記錄層級,您可以根據需求設定指定內容的日誌為較高的層級。預設一般用
info
。更多資訊,請參見記錄層級。
本文中的配置代碼僅為樣本,最新最全的程式碼範例請參見Github。
投遞流程
推薦使用Log Service消費組構建程式來進行即時消費,然後通過Syslog over TCP/TLS來發送日誌給SIEM。
主程式樣本
如下代碼展示主程式控制邏輯。
def main():
option, settings = get_monitor_option()
logger.info("*** start to consume data...")
worker = ConsumerWorker(SyncData, option, args=(settings,) )
worker.start(join=True)
if __name__ == '__main__':
main()
程式配置樣本
配置內容:
程式記錄檔:以便後續測試或者診斷問題。
基本配置項:包括Log Service串連配置和消費組配置。
消費組的進階選項:效能調參,不推薦修改。
SIEM的Syslog server相關參數與選項。
說明如果SIEM支援基於TCP或者TLS的Syslog通道,則需要配置proto為TLS及配置正確的SSL的認證。
程式碼範例
請仔細閱讀代碼中相關注釋並根據業務需求調整選項。
#encoding: utf8 import os import logging from logging.handlers import RotatingFileHandler user = logging.getLogger() handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5) handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')) user.setLevel(logging.INFO) user.addHandler(handler) user.addHandler(logging.StreamHandler()) logger = logging.getLogger(__name__) def get_option(): ########################## # 基本選項 ########################## #從環境變數中載入Log Service參數與選項。 endpoint = os.environ.get('SLS_ENDPOINT', '') accessKeyId = os.environ.get('SLS_AK_ID', '') accessKey = os.environ.get('SLS_AK_KEY', '') project = os.environ.get('SLS_PROJECT', '') logstore = os.environ.get('SLS_LOGSTORE', '') consumer_group = os.environ.get('SLS_CG', '') # 消費的起點。這個參數在首次運行程式的時候有效,後續再次運行時將從上一次消費的儲存點繼續消費。 # 可以使用“begin”、“end”,或者特定的ISO時間格式。 cursor_start_time = "2018-12-26 0:0:0" ########################## # 進階選項 ########################## # 一般不要修改消費者名稱,尤其是需要並發消費時。 consumer_name = "{0}-{1}".format(consumer_group, current_process().pid) # 心跳時間長度,當伺服器在2倍時間內沒有收到特定Shard的心跳報告時,伺服器會認為對應消費者離線並重新調配任務。 # 所以當網路環境不佳的時候,不建議將時間長度設定的比較小。 heartbeat_interval = 20 # 消費資料的最大間隔,如果資料產生的速度很快,不需要調整這個參數。 data_fetch_interval = 1 # 構建一個消費組和消費者 option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time, heartbeat_interval=heartbeat_interval, data_fetch_interval=data_fetch_interval) # syslog options settings = { "host": "1.2.3.4", # 必選 "port": 514, # 必選,連接埠 "protocol": "tcp", # 必選,TCP、UDP或TLS(僅Python3)。 "sep": "||", # 必選,key=value索引值對的分隔字元,這裡用雙豎線(||)分隔。 "cert_path": None, # 可選,TLS的認證位置。 "timeout": 120, # 可選,逾時時間,預設120秒。 "facility": syslogclient.FAC_USER, #可選,可以參考其他syslogclient.FAC_*的值。 "severity": syslogclient.SEV_INFO, #可選,可以參考其他syslogclient.SEV_*的值。 "hostname": None, # 可選,機器名,預設選擇本機機器名。 "tag": None # 可選,標籤,預設是短劃線(-)。 } return option, settings
消費與投遞樣本
如下代碼展示如何從Log Service擷取資料投遞到SIEM Syslog伺服器。請仔細閱讀代碼中相關注釋並根據需求調整格式。
from syslogclient import SyslogClientRFC5424 as SyslogClient
class SyncData(ConsumerProcessorBase):
"""
消費者從Log Service消費資料並發送給Syslog server。
"""
def __init__(self, splunk_setting):
"""初始化並驗證Syslog server連通性。"""
super(SyncData, self).__init__() # remember to call base's init
assert target_setting, ValueError("You need to configure settings of remote target")
assert isinstance(target_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")
self.option = target_setting
self.protocol = self.option['protocol']
self.timeout = int(self.option.get('timeout', 120))
self.sep = self.option.get('sep', "||")
self.host = self.option["host"]
self.port = int(self.option.get('port', 514))
self.cert_path=self.option.get('cert_path', None)
# try connection
with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
pass
def process(self, log_groups, check_point_tracker):
logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))
try:
with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
for log in logs:
# Put your sync code here to send to remote.
# the format of log is just a dict with example as below (Note, all strings are unicode):
# Python2: {"__time__": "12312312", "__topic__": "topic", u"field1": u"value1", u"field2": u"value2"}
# Python3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
# suppose we only care about audit log
timestamp = datetime.fromtimestamp(int(log[u'__time__']))
del log['__time__']
io = six.StringIO()
first = True
# 可以根據需要修改格式化內容,這裡使用Key=Value傳輸,並使用預設的雙豎線(||)進行分割。
for k, v in six.iteritems(log):
io.write("{0}{1}={2}".format(self.sep, k, v))
data = io.getvalue()
# 可以根據需要修改facility或者severity。
client.log(data, facility=self.option.get("facility", None), severity=self.option.get("severity", None), timestamp=timestamp, program=self.option.get("tag", None), hostname=self.option.get("hostname", None))
except Exception as err:
logger.debug("Failed to connect to remote syslog server ({0}). Exception: {1}".format(self.option, err))
# 需要添加一些錯誤處理的代碼,例如重試或者通知等。
raise err
logger.info("Complete send data to remote")
self.save_checkpoint(check_point_tracker)
啟動程式樣本
例如程式命名為sync_data.py,啟動程式樣本如下所示。
export SLS_ENDPOINT=<Endpoint of your region>
export SLS_AK_ID=<YOUR AK ID>
export SLS_AK_KEY=<YOUR AK KEY>
export SLS_PROJECT=<SLS Project Name>
export SLS_LOGSTORE=<SLS Logstore Name>
export SLS_CG=<消費組名,可以簡單命名為"syc_data">
python3 sync_data.py
限制與約束
每一個日誌庫(logstore)最多可以配置30個消費組,如果遇到ConsumerGroupQuotaExceed
則表示超出限制,建議在控制台刪除一些不再使用的消費組。
消費狀態與監控
在控制台查看消費組狀態,詳情請參見步驟三:查看消費組狀態。
並發消費
基於消費組的程式,可以直接啟動多次程式以實現並發效果。
nohup python3 sync_data.py &
nohup python3 sync_data.py &
nohup python3 sync_data.py &
...
所有消費者的名稱均不相同(消費者名以進程ID為尾碼),且屬於同一個消費組。因為一個分區(Shard)只能被一個消費者消費,例如一個日誌庫有10個分區,那麼最多有10個消費組同時消費。
輸送量
基於測試,在沒有頻寬、接收端速率限制(如Splunk端)的情況下,用python3運行上述範例,單個消費者大約佔用20%的單核CPU資源,此時消費可以達到10 MB/s原始日誌的速率。因此10個消費者理論上可以達到100 MB/s原始日誌,即每個CPU核每天可以消費0.9 TB原始日誌。
高可用
消費組將檢測點(check-point)儲存在伺服器端,當一個消費者停止,另外一個消費者將自動接管並從斷點繼續消費。可以在不同機器上啟動消費者,這樣在一台機器停止或者損壞的情況下,其他機器上的消費者可以自動接管並從斷點進行消費。為了備用,也可以通過不同機器啟動大於Shard數量的消費者。