全部產品
Search
文件中心

Simple Log Service:通過HTTPS投遞日誌到SIEM

更新時間:Oct 01, 2024

本文通過以Splunk HEC為例來展示如何通過HEC將阿里雲相關日誌投遞到SIEM。

例如當前SIEM(如Splunk)位於組織內部環境(on-premise),而不是雲端。為了安全考慮,沒有開放任何連接埠讓外界環境來訪問此SIEM。

說明

本章節中的配置代碼僅為樣本,最新的程式碼範例請參見GithubGithub(多源日誌庫時)

投遞流程

即時消費

在全量資料投遞情境中,建議採用Log Service消費組來實現即時資料消費,並利用Splunk的API(HEC)將日誌安全地投遞至Splunk。

基於規則消費

在資料投遞過程中涉及資料清洗(如行過濾、列裁剪和資料規整等)時,建議使用基於規則的程式來進行消費與投遞,並利用Splunk的API(HTTP事件收集,HEC)將日誌安全地傳輸至Splunk。

主程式樣本

如下代碼展示主程式控制邏輯。

def main():
    option, settings = get_option()

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,) )
    worker.start(join=True)

if __name__ == '__main__':
    main()

程式配置樣本

  • 配置內容

    • 程式記錄檔:以便後續測試或者診斷問題。

    • 基本配置項:包括Log Service串連配置和消費組配置。

    • 消費組的進階選項:效能調參,不推薦修改。

    • SIEM(Splunk)相關參數與選項。

  • 程式碼範例

    請仔細閱讀代碼中相關注釋並根據業務需求調整選項。

    即時消費

    #encoding: utf8
    import os
    import logging
    from logging.handlers import RotatingFileHandler
    
    user = logging.getLogger()
    handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
    handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
    user.setLevel(logging.INFO)
    user.addHandler(handler)
    user.addHandler(logging.StreamHandler())
    
    logger = logging.getLogger(__name__)
    
    def get_option():
        ##########################
        # 基本選項
        ##########################
    
        # 從環境變數中載入Log Service參數與選項。
        endpoint = os.environ.get('SLS_ENDPOINT', '')
        accessKeyId = os.environ.get('SLS_AK_ID', '')
        accessKey = os.environ.get('SLS_AK_KEY', '')
        project = os.environ.get('SLS_PROJECT', '')
        logstore = os.environ.get('SLS_LOGSTORE', '')
        consumer_group = os.environ.get('SLS_CG', '')
    
        # 消費的起點。這個參數在首次運行程式的時候有效,後續再次運行時將從上一次消費的儲存點繼續消費。
        # 可以使用“begin”、“end”,或者特定的ISO時間格式。
        cursor_start_time = "2018-12-26 0:0:0"
    
        ##########################
        # 一些進階選項
        ##########################
    
        # 一般不建議修改消費者名稱,尤其是需要進行並發消費時。
        consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)
    
        # 心跳時間長度,當伺服器在2倍時間內沒有收到特定Shard的心跳報告時,伺服器會認為對應消費者離線並重新調配任務。
        # 所以當網路環境不佳時,不建議將時間長度設定的比較小。
        heartbeat_interval = 20
    
        # 消費資料的最大間隔,如果資料產生的速度很快,不需要調整這個參數。
        data_fetch_interval = 1
    
        # 構建一個消費組和消費者
        option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                              cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                              cursor_start_time=cursor_start_time,
                              heartbeat_interval=heartbeat_interval,
                              data_fetch_interval=data_fetch_interval)
    
        # Splunk選項
        settings = {
                    "host": "10.1.2.3",
                    "port": 80,
                    "token": "a023nsdu123123123",
                    'https': False,             # 可選, bool
                    'timeout': 120,             # 可選, int
                    'ssl_verify': True,         # 可選, bool
                    "sourcetype": "",           # 可選, sourcetype
                    "index": "",                # 可選, index
                    "source": "",               # 可選, source
                }
    
        return option, settings

    基於規則消費

    # encoding: utf8
    import os
    import logging
    from logging.handlers import RotatingFileHandler
    
    user = logging.getLogger()
    handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid),
                                  maxBytes=100 * 1024 * 1024, backupCount=5)
    handler.setFormatter(logging.Formatter(
        fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'))
    user.setLevel(logging.INFO)
    user.addHandler(handler)
    user.addHandler(logging.StreamHandler())
    
    logger = logging.getLogger(__name__)
    
    
    def get_option():
        ##########################
        # 基本選項
        ##########################
    
        # 從環境變數中載入Log Service參數與選項。
        endpoint = os.environ.get('SLS_ENDPOINT', '')
        accessKeyId = os.environ.get('SLS_AK_ID', '')
        accessKey = os.environ.get('SLS_AK_KEY', '')
        project = os.environ.get('SLS_PROJECT', '')
        logstore = os.environ.get('SLS_LOGSTORE', '')
        consumer_group = os.environ.get('SLS_CG', '')
    
        # 消費的起點。這個參數在首次運行程式的時候有效,後續再次運行時將從上一次消費的儲存點繼續消費。
        # 可以使用“begin”、“end”,或者特定的ISO時間格式。
        cursor_start_time = "2018-12-26 0:0:0"
    
        ##########################
        # 一些進階選項
        ##########################
    
        # 一般不建議修改消費者名稱,尤其是需要進行並發消費時。
        consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)
    
        # 心跳時間長度,當伺服器在2倍時間內沒有收到特定Shard的心跳報告時,伺服器會認為對應消費者離線並重新調配任務。
        # 所以當網路環境不佳時,不建議將時間長度設定的比較小。
        heartbeat_interval = 20
    
        # 消費資料的最大間隔,如果資料產生的速度很快,不需要調整這個參數。
        data_fetch_interval = 1
    
        # SPL 語句
        query = "* | where instance_id in ('instance-1', 'instance-2')"
    
    
        # 基於規則消費配置
        option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                              cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                              cursor_start_time=cursor_start_time,
                              heartbeat_interval=heartbeat_interval,
                              data_fetch_interval=data_fetch_interval,
                              query=query)
    
        # Splunk選項
        settings = {
            "host": "10.1.2.3",
            "port": 80,
            "token": "a023nsdu123123123",
            'https': False,  # 可選, bool
            'timeout': 120,  # 可選, int
            'ssl_verify': True,  # 可選, bool
            "sourcetype": "",  # 可選, sourcetype
            "index": "",  # 可選, index
            "source": "",  # 可選, source
        }
    
        return option, settings
    

消費與投遞樣本

如下代碼展示如何從Log Service擷取資料並投遞到Splunk,請仔細閱讀代碼中相關注釋並根據需求調整格式。

from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import time
import json
import socket
import requests

class SyncData(ConsumerProcessorBase):
    """
    這個消費者從Log Service消費資料並發送給Splunk。
    """
    def __init__(self, splunk_setting):
      """初始化並驗證Splunk連通性"""
        super(SyncData, self).__init__()

        assert splunk_setting, ValueError("You need to configure settings of remote target")
        assert isinstance(splunk_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")

        self.option = splunk_setting
        self.timeout = self.option.get("timeout", 120)

        # 測試Splunk連通性
        s = socket.socket()
        s.settimeout(self.timeout)
        s.connect((self.option["host"], self.option['port']))

        self.r = requests.session()
        self.r.max_redirects = 1
        self.r.verify = self.option.get("ssl_verify", True)
        self.r.headers['Authorization'] = "Splunk {}".format(self.option['token'])
        self.url = "{0}://{1}:{2}/services/collector/event".format("http" if not self.option.get('https') else "https", self.option['host'], self.option['port'])

        self.default_fields = {}
        if self.option.get("sourcetype"):
            self.default_fields['sourcetype'] = self.option.get("sourcetype")
        if self.option.get("source"):
            self.default_fields['source'] = self.option.get("source")
        if self.option.get("index"):
            self.default_fields['index'] = self.option.get("index")

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))
        for log in logs:
            # 發送資料到Splunk
            event = {}
            event.update(self.default_fields)
            event['time'] = log[u'__time__']
            del log['__time__']

            json_topic = {"actiontrail_audit_event": ["event"] }
            topic = log.get("__topic__", "")
            if topic in json_topic:
                try:
                    for field in json_topic[topic]:
                        log[field] = json.loads(log[field])
                except Exception as ex:
                    pass
            event['event'] = json.dumps(log)

            data = json.dumps(event, sort_keys=True)

                try:
                    req = self.r.post(self.url, data=data, timeout=self.timeout)
                    req.raise_for_status()
                except Exception as err:
                    logger.debug("Failed to connect to remote Splunk server ({0}). Exception: {1}", self.url, err)

                    # 根據需要,添加一些重試或者報告。

        logger.info("Complete send data to remote")

        self.save_checkpoint(check_point_tracker)

啟動程式樣本

例如程式命名為sync_data.py,啟動程式樣本如下所示。

export SLS_ENDPOINT=<Endpoint of your region>
export SLS_AK_ID=<YOUR AK ID>
export SLS_AK_KEY=<YOUR AK KEY>
export SLS_PROJECT=<SLS Project Name>
export SLS_LOGSTORE=<SLS Logstore Name>
export SLS_CG=<消費組名,可以簡單命名為"syc_data">

python3 sync_data.py

多源日誌庫樣本

針對多源日誌庫,需要共用一個executor以避免進程過多。更多資訊,請參見多源日誌庫時發送日誌到Splunk。核心的變化是主函數,樣本如下所示。

exeuctor, options, settings = get_option()

logger.info("*** start to consume data...")
workers = []

for option in options:
    worker = ConsumerWorker(SyncData, option, args=(settings,) )
    workers.append(worker)
    worker.start()

try:
    for i, worker in enumerate(workers):
        while worker.is_alive():
        worker.join(timeout=60)
    logger.info("worker project: {0} logstore: {1} exit unexpected, try to shutdown it".format(
        options[i].project, options[i].logstore))
    worker.shutdown()
except KeyboardInterrupt:
    logger.info("*** try to exit **** ")
    for worker in workers:
        worker.shutdown()

    # wait for all workers to shutdown before shutting down executor
    for worker in workers:
        while worker.is_alive():
            worker.join(timeout=60)

exeuctor.shutdown()

限制與約束

每一個日誌庫(logstore)最多可以配置30個消費組,如果遇到ConsumerGroupQuotaExceed則表示超出限制,建議在控制台刪除一些不再使用的消費組。

消費狀態與監控

在控制台查看消費組狀態,詳情請參見步驟三:查看消費組狀態

並發消費

基於消費組的程式,可以直接啟動多次程式以實現並發效果。

nohup python3 sync_data.py &
nohup python3 sync_data.py &
nohup python3 sync_data.py &
...
說明

所有消費者的名稱均不相同(消費者名以進程ID為尾碼),且屬於同一個消費組。因為一個分區(Shard)只能被一個消費者消費,例如一個日誌庫有10個分區,那麼最多有10個消費組同時消費。

輸送量

基於測試,在沒有頻寬、接收端速率限制(如Splunk端)的情況下,用python3運行上述範例,單個消費者大約佔用20%的單核CPU資源,此時消費可以達到10 MB/s原始日誌的速率。因此10個消費者理論上可以達到100 MB/s原始日誌,即每個CPU核每天可以消費0.9 TB原始日誌。

高可用

消費組將檢測點(check-point)儲存在伺服器端,當一個消費者停止,另外一個消費者將自動接管並從斷點繼續消費。可以在不同機器上啟動消費者,這樣在一台機器停止或者損壞的情況下,其他機器上的消費者可以自動接管並從斷點進行消費。為了備用,也可以通過不同機器啟動大於Shard數量的消費者。

HTTPS

如果服務入口(Endpoint)配置為https://首碼,例如https://cn-beijing.log.aliyuncs.com,則程式自動使用HTTPS加密與Log Service串連。

伺服器憑證*.aliyuncs.com是由GlobalSign簽發,預設大多數Linux和Windows機器自動信任此認證。如果有機器不信任此認證,請參見Certificate installation下載並安裝此認證。