全部產品
Search
文件中心

Simple Log Service:通過Syslog投遞日誌到SIEM

更新時間:Jul 21, 2024

Syslog是一個常見的日誌通道,幾乎所有的SIEM(例如IBM Qradar, HP Arcsight)都支援通過Syslog渠道接收日誌。本文主要介紹如何通過Syslog將Log Service中的日誌投遞到SIEM。

背景資訊

  • Syslog主要是基於RFC5424和RFC3164定義相關格式規範,RFC3164協議是2001年發布的,RFC5424協議是2009年發布的升級版本。因為新版相容舊版,且新版本解決了很多問題,因此推薦使用RFC5424協議。更多資訊,請參見RFC5424RFC3164

  • Syslog over TCP/TLS:Syslog只規定日誌格式,理論上TCP和UDP都支援Syslog,可以較好的保證資料轉送穩定性。RFC5425協議也定義了TLS的安全傳輸層,如果您的SIEM支援TCP通道或者TLS通道,則建議優先使用。更多資訊,請參見RFC5425

  • Syslog facility:早期Unix定義的程式組件,此處選擇user作為預設組件。更多資訊,請參見程式組件

  • Syslog severity:定義記錄層級,您可以根據需求設定指定內容的日誌為較高的層級。預設一般用info。更多資訊,請參見記錄層級

說明

本文中的配置代碼僅為樣本,最新最全的程式碼範例請參見Github

投遞流程

推薦使用Log Service消費組構建程式來進行即時消費,然後通過Syslog over TCP/TLS來發送日誌給SIEM。投遞流程

主程式樣本

如下代碼展示主程式控制邏輯。

def main():
    option, settings = get_monitor_option()

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,) )
    worker.start(join=True)

if __name__ == '__main__':
    main()

程式配置樣本

  • 配置內容:

    • 程式記錄檔:以便後續測試或者診斷問題。

    • 基本配置項:包括Log Service串連配置和消費組配置。

    • 消費組的進階選項:效能調參,不推薦修改。

    • SIEM的Syslog server相關參數與選項。

      說明

      如果SIEM支援基於TCP或者TLS的Syslog通道,則需要配置protoTLS及配置正確的SSL的認證。

  • 程式碼範例

    請仔細閱讀代碼中相關注釋並根據業務需求調整選項。

    #encoding: utf8
    import os
    import logging
    from logging.handlers import RotatingFileHandler
    
    user = logging.getLogger()
    handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
    handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
    user.setLevel(logging.INFO)
    user.addHandler(handler)
    user.addHandler(logging.StreamHandler())
    
    logger = logging.getLogger(__name__)
    
    def get_option():
        ##########################
        # 基本選項
        ##########################
    
        #從環境變數中載入Log Service參數與選項。
        endpoint = os.environ.get('SLS_ENDPOINT', '')
        accessKeyId = os.environ.get('SLS_AK_ID', '')
        accessKey = os.environ.get('SLS_AK_KEY', '')
        project = os.environ.get('SLS_PROJECT', '')
        logstore = os.environ.get('SLS_LOGSTORE', '')
        consumer_group = os.environ.get('SLS_CG', '')
    
        # 消費的起點。這個參數在首次運行程式的時候有效,後續再次運行時將從上一次消費的儲存點繼續消費。
        # 可以使用“begin”、“end”,或者特定的ISO時間格式。
        cursor_start_time = "2018-12-26 0:0:0"
    
        ##########################
        # 進階選項
        ##########################
    
        # 一般不要修改消費者名稱,尤其是需要並發消費時。
        consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)
    
        # 心跳時間長度,當伺服器在2倍時間內沒有收到特定Shard的心跳報告時,伺服器會認為對應消費者離線並重新調配任務。
        # 所以當網路環境不佳的時候,不建議將時間長度設定的比較小。
        heartbeat_interval = 20
    
        # 消費資料的最大間隔,如果資料產生的速度很快,不需要調整這個參數。
        data_fetch_interval = 1
    
        # 構建一個消費組和消費者
        option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                              cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                              cursor_start_time=cursor_start_time,
                              heartbeat_interval=heartbeat_interval,
                              data_fetch_interval=data_fetch_interval)
    
        # syslog options
        settings = {
                    "host": "1.2.3.4", # 必選
                    "port": 514,       # 必選,連接埠
                    "protocol": "tcp", # 必選,TCP、UDP或TLS(僅Python3)。
                    "sep": "||",       # 必選,key=value索引值對的分隔字元,這裡用雙豎線(||)分隔。
                    "cert_path": None, # 可選,TLS的認證位置。
                    "timeout": 120,    # 可選,逾時時間,預設120秒。
                    "facility": syslogclient.FAC_USER,  #可選,可以參考其他syslogclient.FAC_*的值。
                    "severity": syslogclient.SEV_INFO,  #可選,可以參考其他syslogclient.SEV_*的值。
                    "hostname": None,  # 可選,機器名,預設選擇本機機器名。
                    "tag": None        # 可選,標籤,預設是短劃線(-)。
                }
    
        return option, settings

消費與投遞樣本

如下代碼展示如何從Log Service擷取資料投遞到SIEM Syslog伺服器。請仔細閱讀代碼中相關注釋並根據需求調整格式。

from syslogclient import SyslogClientRFC5424 as SyslogClient

class SyncData(ConsumerProcessorBase):
    """
    消費者從Log Service消費資料並發送給Syslog server。
    """
    def __init__(self, splunk_setting):
      """初始化並驗證Syslog server連通性。"""
        super(SyncData, self).__init__()   # remember to call base's init

        assert target_setting, ValueError("You need to configure settings of remote target")
        assert isinstance(target_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")

        self.option = target_setting
        self.protocol = self.option['protocol']
        self.timeout = int(self.option.get('timeout', 120))
        self.sep = self.option.get('sep', "||")
        self.host = self.option["host"]
        self.port = int(self.option.get('port', 514))
        self.cert_path=self.option.get('cert_path', None)

        # try connection
        with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
            pass

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))
        try:
            with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
                for log in logs:
                    # Put your sync code here to send to remote.
                    # the format of log is just a dict with example as below (Note, all strings are unicode):
                    #    Python2: {"__time__": "12312312", "__topic__": "topic", u"field1": u"value1", u"field2": u"value2"}
                    #    Python3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
                    # suppose we only care about audit log
                    timestamp = datetime.fromtimestamp(int(log[u'__time__']))
                    del log['__time__']

                    io = six.StringIO()
                    first = True
          # 可以根據需要修改格式化內容,這裡使用Key=Value傳輸,並使用預設的雙豎線(||)進行分割。
                    for k, v in six.iteritems(log):
                        io.write("{0}{1}={2}".format(self.sep, k, v))

                    data = io.getvalue()

          # 可以根據需要修改facility或者severity。
                    client.log(data, facility=self.option.get("facility", None), severity=self.option.get("severity", None), timestamp=timestamp, program=self.option.get("tag", None), hostname=self.option.get("hostname", None))

        except Exception as err:
            logger.debug("Failed to connect to remote syslog server ({0}). Exception: {1}".format(self.option, err))

            # 需要添加一些錯誤處理的代碼,例如重試或者通知等。
            raise err

        logger.info("Complete send data to remote")

        self.save_checkpoint(check_point_tracker)

啟動程式樣本

例如程式命名為sync_data.py,啟動程式樣本如下所示。

export SLS_ENDPOINT=<Endpoint of your region>
export SLS_AK_ID=<YOUR AK ID>
export SLS_AK_KEY=<YOUR AK KEY>
export SLS_PROJECT=<SLS Project Name>
export SLS_LOGSTORE=<SLS Logstore Name>
export SLS_CG=<消費組名,可以簡單命名為"syc_data">

python3 sync_data.py

限制與約束

每一個日誌庫(logstore)最多可以配置30個消費組,如果遇到ConsumerGroupQuotaExceed則表示超出限制,建議在控制台刪除一些不再使用的消費組。

消費狀態與監控

在控制台查看消費組狀態,詳情請參見步驟三:查看消費組狀態

並發消費

基於消費組的程式,可以直接啟動多次程式以實現並發效果。

nohup python3 sync_data.py &
nohup python3 sync_data.py &
nohup python3 sync_data.py &
...
說明

所有消費者的名稱均不相同(消費者名以進程ID為尾碼),且屬於同一個消費組。因為一個分區(Shard)只能被一個消費者消費,例如一個日誌庫有10個分區,那麼最多有10個消費組同時消費。

輸送量

基於測試,在沒有頻寬、接收端速率限制(如Splunk端)的情況下,用python3運行上述範例,單個消費者大約佔用20%的單核CPU資源,此時消費可以達到10 MB/s原始日誌的速率。因此10個消費者理論上可以達到100 MB/s原始日誌,即每個CPU核每天可以消費0.9 TB原始日誌。

高可用

消費組將檢測點(check-point)儲存在伺服器端,當一個消費者停止,另外一個消費者將自動接管並從斷點繼續消費。可以在不同機器上啟動消費者,這樣在一台機器停止或者損壞的情況下,其他機器上的消費者可以自動接管並從斷點進行消費。為了備用,也可以通過不同機器啟動大於Shard數量的消費者。