このトピックでは、Splunk HTTP event Collector (HEC) を使用して、Alibaba Cloudのログをセキュリティ情報およびイベント管理 (SIEM) システムに送信する方法について説明します。
SplunkなどのSIEMシステムがオンプレミス環境で展開されている場合、ポートは開いていません。 これは、外部環境からSIEMシステムへのアクセスを防止し、SIEMシステムのセキュリティを確保する。
このトピックでは、サンプル構成コードは参照用にのみ提供されています。 最新のサンプルコードの詳細については、GitHubまたはGitHub (複数のデータソースを持つLogstoreに適用可能) をご参照ください。
配送プロセス
リアルタイム消費
フルデータシッピングのシナリオでは、Simple Log Serviceコンシューマグループを使用してリアルタイムでデータを消費し、HECを使用して取得したログをSIEMシステムに送信することを推奨します。
ルールベースの消費
データクレンジングがデータシッピングプロセスに関与する場合は、ルールに基づいてデータを消費し、HECを使用して取得したログをSplunkに送信することをお勧めします。 データクレンジングには、行フィルタリング、列プルーニング、およびデータ正規化が含まれます。
メインプログラムを書く
次のコードは、メインプログラムの制御ロジックを示しています。
def main():
option, settings = get_option()
logger.info("*** start to consume data...")
worker = ConsumerWorker(SyncData, option, args=(settings,) )
worker.start(join=True)
if __name__ == '__main__':
main()
プログラムの設定
構成内容
プログラムのログファイル: 潜在的な問題のその後のテストと診断に使用されます。
基本設定: Simple Log Serviceのコンシューマグループ設定と接続設定。
コンシューマーグループの詳細設定: パフォーマンスの調整に使用されます。 設定を変更しないことをお勧めします。
SIEMシステムのパラメータと設定。 このトピックでは、Splunkを例として使用します。
サンプルコード
次のコードのコメントを読み、ビジネス要件に基づいてパラメーターを変更します。
リアルタイム消費
#encoding: utf8 import os import logging from logging.handlers import RotatingFileHandler user = logging.getLogger() handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5) handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')) user.setLevel(logging.INFO) user.addHandler(handler) user.addHandler(logging.StreamHandler()) logger = logging.getLogger(__name__) def get_option(): ########################## # Basic settings ########################## # Obtain Simple Log Service parameters and settings from environment variables. endpoint = os.environ.get('SLS_ENDPOINT', '') accessKeyId = os.environ.get('SLS_AK_ID', '') accessKey = os.environ.get('SLS_AK_KEY', '') project = os.environ.get('SLS_PROJECT', '') logstore = os.environ.get('SLS_LOGSTORE', '') consumer_group = os.environ.get('SLS_CG', '') # The starting point of data consumption. This parameter specifies the point at which the consumption starts the first time you run the program. The next time you run the program, the consumption starts from the last consumption checkpoint. # You can set the parameter to begin, end, or a time value in the ISO 8601 standard. cursor_start_time = "2018-12-26 0:0:0" ########################## # Advanced settings ########################## # We recommend that you do not change the consumer name, especially when concurrent consumption is required. consumer_name = "{0}-{1}".format(consumer_group, current_process().pid) # The heartbeat interval. If a server does not receive a heartbeat for a specific shard for two consecutive intervals, the related consumer is considered disconnected. In this case, the server allocates the task to another consumer. # If network conditions are poor, we recommend that you specify a long interval. heartbeat_interval = 20 # The maximum interval between two data consumption processes. If data is generated at a high speed, you do not need to modify this parameter. data_fetch_interval = 1 # Create a consumer group and a consumer. option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time, heartbeat_interval=heartbeat_interval, data_fetch_interval=data_fetch_interval) # The Splunk settings. settings = { "host": "10.1.2.3", "port": 80, "token": "a023nsdu123123123", 'https': False, # Optional. A Boolean variable. 'timeout': 120, # Optional. An integer. 'ssl_verify': True, # Optional. A Boolean variable. "sourcetype": "", # Optional. The sourcetype field is a default field defined by Splunk. "index": "", # Optional. The index field is a default field defined by Splunk. "source": "", # Optional. The source field is a default field defined by Splunk. } return option, settings
ルールベースの消費
# encoding: utf8 import os import logging from logging.handlers import RotatingFileHandler user = logging.getLogger() handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100 * 1024 * 1024, backupCount=5) handler.setFormatter(logging.Formatter( fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')) user.setLevel(logging.INFO) user.addHandler(handler) user.addHandler(logging.StreamHandler()) logger = logging.getLogger(__name__) def get_option(): ########################## # Basic settings ########################## # Obtain Simple Log Service parameters and settings from environment variables. endpoint = os.environ.get('SLS_ENDPOINT', '') accessKeyId = os.environ.get('SLS_AK_ID', '') accessKey = os.environ.get('SLS_AK_KEY', '') project = os.environ.get('SLS_PROJECT', '') logstore = os.environ.get('SLS_LOGSTORE', '') consumer_group = os.environ.get('SLS_CG', '') # The starting point of data consumption. This parameter specifies the point at which the consumption starts the first time you run the program. The next time you run the program, the consumption starts from the last consumption checkpoint. # You can set the parameter to begin, end, or a time value in the ISO 8601 standard. cursor_start_time = "2018-12-26 0:0:0" ########################## # Advanced settings ########################## # We recommend that you do not change the consumer name, especially when concurrent consumption is required. consumer_name = "{0}-{1}".format(consumer_group, current_process().pid) # The heartbeat interval. If a server does not receive a heartbeat for a specific shard for two consecutive intervals, the related consumer is considered disconnected. In this case, the server allocates the task to another consumer. # If network conditions are poor, we recommend that you specify a long interval. heartbeat_interval = 20 # The maximum interval between two data consumption processes. If data is generated at a high speed, you do not need to modify this parameter. data_fetch_interval = 1 # The Simple Log Service Processing Language (SPL) statement. query = "* | where instance_id in ('instance-1', 'instance-2')" # The rule-based consumption settings. option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time, heartbeat_interval=heartbeat_interval, data_fetch_interval=data_fetch_interval, query=query) # The Splunk settings. settings = { "host": "10.1.2.3", "port": 80, "token": "a023nsdu123123123", 'https': False, # Optional. A Boolean variable. 'timeout': 120, # Optional. An integer. 'ssl_verify': True, # Optional. A Boolean variable. "sourcetype": "", # Optional. The sourcetype field is a default field defined by Splunk. "index": "", # Optional. The index field is a default field defined by Splunk. "source": "", # Optional. The source field is a default field defined by Splunk. } return option, settings
消費と配送の設定
次のコードは、Simple Log Serviceからデータを取得し、取得したデータをSplunkに送信する方法の例を示しています。 次のコードのコメントを読み、ビジネス要件に基づいてパラメーターを変更します。
aliyun.log.consumer import * からの
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import time
import json
import socket
import requests
class SyncData(ConsumerProcessorBase):
"""
The consumer consumes data from Simple Log Service and ships the data to Splunk.
"""
def __init__(self, splunk_setting):
"""Initiate Splunk and test network connectivity."""
super(SyncData, self).__init__()
assert splunk_setting, ValueError("You need to configure settings of remote target")
assert isinstance(splunk_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")
self.option = splunk_setting
self.timeout = self.option.get("timeout", 120)
# Test the network connectivity to Splunk.
s = socket.socket()
s.settimeout(self.timeout)
s.connect((self.option["host"], self.option['port']))
self.r = requests.session()
self.r.max_redirects = 1
self.r.verify = self.option.get("ssl_verify", True)
self.r.headers['Authorization'] = "Splunk {}".format(self.option['token'])
self.url = "{0}://{1}:{2}/services/collector/event".format("http" if not self.option.get('https') else "https", self.option['host'], self.option['port'])
self.default_fields = {}
if self.option.get("sourcetype"):
self.default_fields['sourcetype'] = self.option.get("sourcetype")
if self.option.get("source"):
self.default_fields['source'] = self.option.get("source")
if self.option.get("index"):
self.default_fields['index'] = self.option.get("index")
def process(self, log_groups, check_point_tracker):
logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))
for log in logs:
# Ship data to Splunk.
event = {}
event.update(self.default_fields)
event['time'] = log[u'__time__']
del log['__time__']
json_topic = {"actiontrail_audit_event": ["event"] }
topic = log.get("__topic__", "")
if topic in json_topic:
try:
for field in json_topic[topic]:
log[field] = json.loads(log[field])
except Exception as ex:
pass
event['event'] = json.dumps(log)
data = json.dumps(event, sort_keys=True)
try:
req = self.r.post(self.url, data=data, timeout=self.timeout)
req.raise_for_status()
except Exception as err:
logger.debug("Failed to connect to remote Splunk server ({0}). Exception: {1}", self.url, err)
# Add code to handle errors. For example, you can add the code to retry requests or report errors.
logger.info("Complete send data to remote")
self.save_checkpoint(check_point_tracker)
プログラムを開始する
次のコードは、sync_data.pyという名前のプログラムを起動する方法の例を示しています。
export SLS_ENDPOINT=<Endpoint of your region>
export SLS_AK_ID=<YOUR AK ID>
export SLS_AK_KEY=<YOUR AK KEY>
export SLS_PROJECT=<SLS Project Name>
export SLS_LOGSTORE=<SLS Logstore Name>
export SLS_CG=<Consumer group, such as "syc_data">
python3 sync_data.py
複数のデータソースを持つLogstoreからデータを出荷する
Logstoreに複数のデータソースがある場合は、パブリックエグゼキュータを設定する必要があります。 これにより、多数のプロセスが実行されなくなります。 詳細については、「複数のソースがあるLogstoreからSplunkへのログの送信」をご参照ください。 次のコードは、エグゼキュータを設定する方法の例を示しています。
exeuctor, options, settings = get_option()
logger.info("*** start to consume data...")
workers = []
for option in options:
worker = ConsumerWorker(SyncData, option, args=(settings,) )
workers.append(worker)
worker.start()
try:
for i, worker in enumerate(workers):
while worker.is_alive():
worker.join(timeout=60)
logger.info("worker project: {0} logstore: {1} exit unexpected, try to shutdown it".format(
options[i].project, options[i].logstore))
worker.shutdown()
except KeyboardInterrupt:
logger.info("*** try to exit **** ")
for worker in workers:
worker.shutdown()
# wait for all workers to shutdown before shutting down executor
for worker in workers:
while worker.is_alive():
worker.join(timeout=60)
exeuctor.shutdown()
制限事項
Simple Log Serviceのログストアごとに最大30のコンシューマーグループを設定できます。 システムにConsumerGroupQuotaExceed
エラーメッセージが表示された場合は、Simple log Serviceコンソールにログインし、不要になったコンシューマーグループを削除することを推奨します。
データ消費の表示と監視
Simple log Serviceコンソールにログインして、コンシューマーグループのデータ消費ステータスを表示できます。 詳細については、「手順2: コンシューマーグループのステータスの表示」をご参照ください。
同時消費
データを同時に消費するには、複数のコンシューマー向けに複数のコンシューマーグループベースのプログラムを起動します。
nohup python3 sync_data.py &
nohup python3 sync_data.py &
nohup python3 sync_data.py &
...
各コンシューマの名前は、コンシューマグループ内で一意です。 コンシューマの名前には、プロセスIDが付いています。 1つのシャードのデータは、1つのコンシューマーのみが使用できます。 Logstoreに10個のシャードが含まれ、各コンシューマグループに1つのコンシューマしか含まれていない場合、最大10個のコンシューマグループがすべてのシャードのデータを同時に消費できます。
スループット
スループットは次のシナリオでテストされます。前述の例では、Python 3を使用してプログラムを実行し、Splunkでの受信速度などの帯域幅と受信速度は制限されず、単一のコンシューマーがシングルコアCPUリソースの約20% を消費します。 テスト結果は、生ログの消費速度が10メガバイト/秒に達する可能性があることを示しています。 したがって、10人の消費者が同時にデータを消費する場合、生ログの消費速度はCPUコアあたり100メガバイト/秒に達する可能性があります。 各CPUコアは、1日あたり最大0.9テラバイトの生ログを消費できます。
High availability
コンシューマーグループは、チェックポイントをサーバーに格納します。 ある消費者のデータ消費プロセスが停止すると、別の消費者が自動的にデータ消費プロセスを引き継ぎ、最後の消費のチェックポイントからプロセスを続行します。 さまざまなマシンでコンシューマーを開始できます。 マシンが停止または損傷した場合、別のマシンのコンシューマがデータ消費プロセスを引き継ぎ、最後の消費のチェックポイントからプロセスを続行できます。 十分な数の消費者を持つために、異なるマシン上のシャードよりも多くの消費者を開始できます。
HTTPS
プログラムがhttps://
というプレフィックスが付いたSimple Log Serviceエンドポイントを使用している場合、プログラムはHTTPSを使用してSimple Log Serviceとの接続を自動的に暗号化します。 例: https://cn-beijing.log.aliyuncs.com
。
ドメイン名 * .aliyuncs.comの証明書はGlobalSignによって発行されます。 デフォルトでは、ほとんどのLinuxおよびWindowsサーバーはこの証明書を信頼するように事前設定されます。 サーバーがこの証明書を信頼しない場合は、証明書をダウンロードしてインストールする必要があります。 詳細については、「証明書のインストール」をご参照ください。