Syslogは、IBM QRadarやHP ArcSightなど、ほとんどのセキュリティ情報およびイベント管理 (SIEM) システムに適用できる、広く使用されているロギング標準です。 このトピックでは、Syslog経由でSimple Log ServiceからSIEMシステムにログを送信する方法について説明します。
背景情報
Syslogは、RFC 5424およびRFC 3164で定義されています。 RFC 3164は2001年に公開され、RFC 5424は2009年に公開されたアップグレード版でした。 このバージョンはRFC 3164と互換性があり、RFC 3164よりも多くの問題を解決するため、RFC 5424の使用を推奨します。 詳細については、「RFC 5424」および「RFC 3164」をご参照ください。
Syslog over TCP/TLS: Syslogは、ログメッセージの標準形式を定義します。 TCPとUDPの両方がSyslogをサポートし、データ伝送の安定性を確保します。 RFC 5425は、トランスポート層セキュリティ (TLS) の使用を定義して、Syslogメッセージの安全なトランスポートを有効にします。 SIEMシステムがTCPまたはTLSをサポートしている場合は、TCPまたはTLS経由でSyslogメッセージを送信することを推奨します。 詳細については、「RFC 5425」をご参照ください。
Syslogファシリティ: 以前のバージョンのUNIXで定義されたプログラムコンポーネント。 デフォルトの機能として
user
を選択できます。 詳細については、「プログラムコンポーネント」をご参照ください。Syslog重大度: Syslogメッセージに対して定義された重大度。 ビジネス要件に基づいて、特定のコンテンツを含むログをより高い重大度レベルに設定できます。 デフォルト値は
info
です。 詳細は、「ログレベル」をご参照ください。
このトピックのコード例は、参照のみに使用されます。 最新のコード例の詳細については、『GitHub』をご参照ください。
配送プロセス
Simple Log Serviceでコンシューマーグループに基づいたプログラムを作成することを推奨します。 このようにして、プログラムを使用してSyslogメッセージをTCPまたはTLS経由でSIEMシステムに送信できます。
メインプログラムを書く
次のコードは、メインプログラムの制御ロジックを示しています。
def main():
option, settings = get_monitor_option()
logger.info("*** start to consume data...")
worker = ConsumerWorker(SyncData, option, args=(settings,) )
worker.start(join=True)
if __name__ == '__main__':
main()
プログラムの設定
次の情報を設定します。
プログラムのログファイル: 潜在的な問題のその後のテストと診断に使用されます。
基本オプション: 消費者グループ設定とSimple Log Serviceの接続設定が含まれます。
コンシューマーグループの高度なオプション: パフォーマンスチューニングに使用されます。 これらのオプションの設定は変更しないことをお勧めします。
SIEMシステムのSyslogサーバのパラメータとオプション
説明
コード例
次の例のコードコメントを読み、ビジネス要件に基づいてパラメーターを変更します。
#encoding: utf8 import os import logging from logging.handlers import RotatingFileHandler user = logging.getLogger() handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5) handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')) user.setLevel(logging.INFO) user.addHandler(handler) user.addHandler(logging.StreamHandler()) logger = logging.getLogger(__name__) def get_option(): ########################## # Basic options ########################## # Obtain parameters and options for Log Service from environment variables. endpoint = os.environ.get('SLS_ENDPOINT', '') accessKeyId = os.environ.get('SLS_AK_ID', '') accessKey = os.environ.get('SLS_AK_KEY', '') project = os.environ.get('SLS_PROJECT', '') logstore = os.environ.get('SLS_LOGSTORE', '') consumer_group = os.environ.get('SLS_CG', '') # The starting point of data consumption. The first time that you run the program, the starting point is specified by this parameter. The next time you run the program, the consumption starts from the last consumption checkpoint. # You can set the parameter to begin, end, or a time in the ISO 8601 standard. cursor_start_time = "2018-12-26 0:0:0" ########################## # Advanced options ########################## # We recommend that you do not modify the consumer name, especially when concurrent consumption is required. consumer_name = "{0}-{1}".format(consumer_group, current_process().pid) # The heartbeat interval. If the server does not receive a heartbeat for a specific shard for two consecutive intervals, the consumer is considered disconnected. In this case, the server allocates the task to another consumer. # If the network performance is poor, we recommend that you specify a larger interval. heartbeat_interval = 20 # The maximum interval between two data consumption processes. If data is generated at a fast speed, you do not need to adjust the parameter. data_fetch_interval = 1 # Create a consumer group that contains the consumer. option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time, heartbeat_interval=heartbeat_interval, data_fetch_interval=data_fetch_interval) # syslog options settings = { "host": "1.2.3.4", # Required. "port": 514, # Required. The port number. "protocol": "tcp", # Required. Valid values: tcp, udp, and tls. The tls value is only applicable to Python 3. "sep": "||", # Required. The separator that is used to separate key-value pairs. In this example, the separator is two consecutive vertical bars (||). "cert_path": None, # Optional. The path where the TLS certificate is stored. "timeout": 120, # Optional. The timeout period. The default value is 120. Unit: seconds. "facility": syslogclient.FAC_USER, # Optional. You can refer to the values of the syslogclient.FAC_* parameter in other examples. "severity": syslogclient.SEV_INFO, # Optional. You can refer to the values of the syslogclient.SEV_* parameter in other examples. "hostname": None, # Optional. The machine name. The default value is the name of your computer. "tag": None # Optional. The tag. The default value is a hyphen (-). } return option, settings
データの消費と出荷
次の例は、SIEMシステムのSimple Log Serviceからデータを収集し、そのデータをSyslogサーバーに送信する方法を示しています。 次の例のコードコメントを読み、ビジネス要件に基づいてパラメーターを変更します。
syslogclientから
from syslogclient import SyslogClientRFC5424 as SyslogClient
class SyncData(ConsumerProcessorBase):
"""
The consumer consumes data from Log Service and ships it to the Syslog server.
"""
def __init__(self, splunk_setting):
"""Initiate the Syslog server and test network connectivity."""
super(SyncData, self).__init__() # remember to call base's init
assert target_setting, ValueError("You need to configure settings of remote target")
assert isinstance(target_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")
self.option = target_setting
self.protocol = self.option['protocol']
self.timeout = int(self.option.get('timeout', 120))
self.sep = self.option.get('sep', "||")
self.host = self.option["host"]
self.port = int(self.option.get('port', 514))
self.cert_path=self.option.get('cert_path', None)
# try connection
with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
pass
def process(self, log_groups, check_point_tracker):
logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))
try:
with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
for log in logs:
# Put your sync code here to send to remote.
# the format of log is just a dict with example as below (Note, all strings are unicode):
# Python2: {"__time__": "12312312", "__topic__": "topic", u"field1": u"value1", u"field2": u"value2"}
# Python3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
# suppose we only care about audit log
timestamp = datetime.fromtimestamp(int(log[u'__time__']))
del log['__time__']
io = six.StringIO()
first = True
# Modify the formatted content based on your business requirements. The data is transmitted by using key-value pairs that are separated with two consecutive vertical bars (||).
for k, v in six.iteritems(log):
io.write("{0}{1}={2}".format(self.sep, k, v))
data = io.getvalue()
# Modify the facility and severity settings based on your business requirements.
client.log(data, facility=self.option.get("facility", None), severity=self.option.get("severity", None), timestamp=timestamp, program=self.option.get("tag", None), hostname=self.option.get("hostname", None))
except Exception as err:
logger.debug("Failed to connect to remote syslog server ({0}). Exception: {1}".format(self.option, err))
# Add code to handle errors. For example, you can add the code to retry requests or report errors.
raise err
logger.info("Complete send data to remote")
self.save_checkpoint(check_point_tracker)
プログラムを開始する
次のコードは、sync_data.pyという名前のプログラムを起動する方法を示しています。
export SLS_ENDPOINT=<Endpoint of your region>
export SLS_AK_ID=<YOUR AK ID>
export SLS_AK_KEY=<YOUR AK KEY>
export SLS_PROJECT=<SLS Project Name>
export SLS_LOGSTORE=<SLS Logstore Name>
export SLS_CG=<Consumer group name, such as syc_data>
python3 sync_data.py
制限事項
Simple Log Serviceのログストアごとに最大30のコンシューマーグループを設定できます。 システムにConsumerGroupQuotaExceed
エラーメッセージが表示された場合は、Simple log Serviceコンソールにログインし、不要になったコンシューマーグループを削除することを推奨します。
データ消費の表示と監視
Simple log Serviceコンソールにログインして、コンシューマーグループのデータ消費ステータスを表示できます。 詳細については、「手順2: コンシューマーグループのステータスの表示」をご参照ください。
同時消費
データを同時に消費するには、複数のコンシューマー向けに複数のコンシューマーグループベースのプログラムを起動します。
nohup python3 sync_data.py &
nohup python3 sync_data.py &
nohup python3 sync_data.py &
...
各コンシューマの名前は、コンシューマグループ内で一意です。 コンシューマの名前には、プロセスIDが付いています。 1つのシャードのデータは、1つのコンシューマーのみが使用できます。 Logstoreに10個のシャードが含まれ、各コンシューマグループに1つのコンシューマしか含まれていない場合、最大10個のコンシューマグループがすべてのシャードのデータを同時に消費できます。
スループット
スループットは次のシナリオでテストされます。前述の例では、Python 3を使用してプログラムを実行し、Splunkでの受信速度などの帯域幅と受信速度は制限されず、単一のコンシューマーがシングルコアCPUリソースの約20% を消費します。 テスト結果は、生ログの消費速度が10メガバイト/秒に達する可能性があることを示しています。 したがって、10人の消費者が同時にデータを消費する場合、生ログの消費速度はCPUコアあたり100メガバイト/秒に達する可能性があります。 各CPUコアは、1日あたり最大0.9テラバイトの生ログを消費できます。
High availability
コンシューマーグループは、チェックポイントをサーバーに格納します。 ある消費者のデータ消費プロセスが停止すると、別の消費者が自動的にデータ消費プロセスを引き継ぎ、最後の消費のチェックポイントからプロセスを続行します。 さまざまなマシンでコンシューマーを開始できます。 マシンが停止または損傷した場合、別のマシンのコンシューマがデータ消費プロセスを引き継ぎ、最後の消費のチェックポイントからプロセスを続行できます。 十分な数の消費者を持つために、異なるマシン上のシャードよりも多くの消費者を開始できます。