This topic describes how to ship logs in Alibaba Cloud to a security information and event management (SIEM) system by using Splunk HTTP Event Collector (HEC).
If a SIEM system, such as Splunk, is deployed in an on-premises environment, no ports are open. This prevents access to the SIEM system from an external environment and ensures the security of the SIEM system.
In this topic, the sample configuration code is provided only for reference. For more information about the latest sample code, visit GitHub or GitHub (applicable to a Logstore that has multiple data sources).
Shipping process
Real-time consumption
In full data shipping scenarios, we recommend that you use Simple Log Service consumer groups to consume data in real time and ship the obtained logs to a SIEM system by using HEC.
Rule-based consumption
If data cleansing is involved in a data shipping process, we recommend that you consume data based on rules and ship the obtained logs to Splunk by using HEC. Data cleansing includes row filtering, column pruning, and data normalization.
Write a main program
The following code shows the control logic of a main program:
def main():
option, settings = get_option()
logger.info("*** start to consume data...")
worker = ConsumerWorker(SyncData, option, args=(settings,) )
worker.start(join=True)
if __name__ == '__main__':
main()
Configure the program
Configuration content
Log file of the program: used for subsequent tests and diagnosis of potential issues.
Basic settings: consumer group settings and connection settings of Simple Log Service.
Advanced settings of consumer groups: used for performance tuning. We recommend that you do not modify the settings.
Parameters and settings for the SIEM system. In this topic, Splunk is used as an example.
Sample code
Read the comments in the following code and modify the parameters based on your business requirements:
Real-time consumption
#encoding: utf8 import os import logging from logging.handlers import RotatingFileHandler user = logging.getLogger() handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5) handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')) user.setLevel(logging.INFO) user.addHandler(handler) user.addHandler(logging.StreamHandler()) logger = logging.getLogger(__name__) def get_option(): ########################## # Basic settings ########################## # Obtain Simple Log Service parameters and settings from environment variables. endpoint = os.environ.get('SLS_ENDPOINT', '') accessKeyId = os.environ.get('SLS_AK_ID', '') accessKey = os.environ.get('SLS_AK_KEY', '') project = os.environ.get('SLS_PROJECT', '') logstore = os.environ.get('SLS_LOGSTORE', '') consumer_group = os.environ.get('SLS_CG', '') # The starting point of data consumption. This parameter specifies the point at which the consumption starts the first time you run the program. The next time you run the program, the consumption starts from the last consumption checkpoint. # You can set the parameter to begin, end, or a time value in the ISO 8601 standard. cursor_start_time = "2018-12-26 0:0:0" ########################## # Advanced settings ########################## # We recommend that you do not change the consumer name, especially when concurrent consumption is required. consumer_name = "{0}-{1}".format(consumer_group, current_process().pid) # The heartbeat interval. If a server does not receive a heartbeat for a specific shard for two consecutive intervals, the related consumer is considered disconnected. In this case, the server allocates the task to another consumer. # If network conditions are poor, we recommend that you specify a long interval. heartbeat_interval = 20 # The maximum interval between two data consumption processes. If data is generated at a high speed, you do not need to modify this parameter. data_fetch_interval = 1 # Create a consumer group and a consumer. option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time, heartbeat_interval=heartbeat_interval, data_fetch_interval=data_fetch_interval) # The Splunk settings. settings = { "host": "10.1.2.3", "port": 80, "token": "a023nsdu123123123", 'https': False, # Optional. A Boolean variable. 'timeout': 120, # Optional. An integer. 'ssl_verify': True, # Optional. A Boolean variable. "sourcetype": "", # Optional. The sourcetype field is a default field defined by Splunk. "index": "", # Optional. The index field is a default field defined by Splunk. "source": "", # Optional. The source field is a default field defined by Splunk. } return option, settings
Rule-based consumption
# encoding: utf8 import os import logging from logging.handlers import RotatingFileHandler user = logging.getLogger() handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100 * 1024 * 1024, backupCount=5) handler.setFormatter(logging.Formatter( fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')) user.setLevel(logging.INFO) user.addHandler(handler) user.addHandler(logging.StreamHandler()) logger = logging.getLogger(__name__) def get_option(): ########################## # Basic settings ########################## # Obtain Simple Log Service parameters and settings from environment variables. endpoint = os.environ.get('SLS_ENDPOINT', '') accessKeyId = os.environ.get('SLS_AK_ID', '') accessKey = os.environ.get('SLS_AK_KEY', '') project = os.environ.get('SLS_PROJECT', '') logstore = os.environ.get('SLS_LOGSTORE', '') consumer_group = os.environ.get('SLS_CG', '') # The starting point of data consumption. This parameter specifies the point at which the consumption starts the first time you run the program. The next time you run the program, the consumption starts from the last consumption checkpoint. # You can set the parameter to begin, end, or a time value in the ISO 8601 standard. cursor_start_time = "2018-12-26 0:0:0" ########################## # Advanced settings ########################## # We recommend that you do not change the consumer name, especially when concurrent consumption is required. consumer_name = "{0}-{1}".format(consumer_group, current_process().pid) # The heartbeat interval. If a server does not receive a heartbeat for a specific shard for two consecutive intervals, the related consumer is considered disconnected. In this case, the server allocates the task to another consumer. # If network conditions are poor, we recommend that you specify a long interval. heartbeat_interval = 20 # The maximum interval between two data consumption processes. If data is generated at a high speed, you do not need to modify this parameter. data_fetch_interval = 1 # The Simple Log Service Processing Language (SPL) statement. query = "* | where instance_id in ('instance-1', 'instance-2')" # The rule-based consumption settings. option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time, heartbeat_interval=heartbeat_interval, data_fetch_interval=data_fetch_interval, query=query) # The Splunk settings. settings = { "host": "10.1.2.3", "port": 80, "token": "a023nsdu123123123", 'https': False, # Optional. A Boolean variable. 'timeout': 120, # Optional. An integer. 'ssl_verify': True, # Optional. A Boolean variable. "sourcetype": "", # Optional. The sourcetype field is a default field defined by Splunk. "index": "", # Optional. The index field is a default field defined by Splunk. "source": "", # Optional. The source field is a default field defined by Splunk. } return option, settings
Configure consumption and shipping
The following code provides an example on how to obtain data from Simple Log Service and ship the obtained data to Splunk. Read the comments in the following code and modify the parameters based on your business requirements.
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import time
import json
import socket
import requests
class SyncData(ConsumerProcessorBase):
"""
The consumer consumes data from Simple Log Service and ships the data to Splunk.
"""
def __init__(self, splunk_setting):
"""Initiate Splunk and test network connectivity."""
super(SyncData, self).__init__()
assert splunk_setting, ValueError("You need to configure settings of remote target")
assert isinstance(splunk_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")
self.option = splunk_setting
self.timeout = self.option.get("timeout", 120)
# Test the network connectivity to Splunk.
s = socket.socket()
s.settimeout(self.timeout)
s.connect((self.option["host"], self.option['port']))
self.r = requests.session()
self.r.max_redirects = 1
self.r.verify = self.option.get("ssl_verify", True)
self.r.headers['Authorization'] = "Splunk {}".format(self.option['token'])
self.url = "{0}://{1}:{2}/services/collector/event".format("http" if not self.option.get('https') else "https", self.option['host'], self.option['port'])
self.default_fields = {}
if self.option.get("sourcetype"):
self.default_fields['sourcetype'] = self.option.get("sourcetype")
if self.option.get("source"):
self.default_fields['source'] = self.option.get("source")
if self.option.get("index"):
self.default_fields['index'] = self.option.get("index")
def process(self, log_groups, check_point_tracker):
logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))
for log in logs:
# Ship data to Splunk.
event = {}
event.update(self.default_fields)
event['time'] = log[u'__time__']
del log['__time__']
json_topic = {"actiontrail_audit_event": ["event"] }
topic = log.get("__topic__", "")
if topic in json_topic:
try:
for field in json_topic[topic]:
log[field] = json.loads(log[field])
except Exception as ex:
pass
event['event'] = json.dumps(log)
data = json.dumps(event, sort_keys=True)
try:
req = self.r.post(self.url, data=data, timeout=self.timeout)
req.raise_for_status()
except Exception as err:
logger.debug("Failed to connect to remote Splunk server ({0}). Exception: {1}", self.url, err)
# Add code to handle errors. For example, you can add the code to retry requests or report errors.
logger.info("Complete send data to remote")
self.save_checkpoint(check_point_tracker)
Start the program
The following code provides an example on how to start a program named sync_data.py:
export SLS_ENDPOINT=<Endpoint of your region>
export SLS_AK_ID=<YOUR AK ID>
export SLS_AK_KEY=<YOUR AK KEY>
export SLS_PROJECT=<SLS Project Name>
export SLS_LOGSTORE=<SLS Logstore Name>
export SLS_CG=<Consumer group, such as "syc_data">
python3 sync_data.py
Ship data from a Logstore that has multiple data sources
If a Logstore has multiple data sources, you must configure a public executor. This prevents a large number of processes from running. For more information, see Ship logs from a Logstore that has multiple sources to Splunk. The following code provides an example on how to configure an executor:
exeuctor, options, settings = get_option()
logger.info("*** start to consume data...")
workers = []
for option in options:
worker = ConsumerWorker(SyncData, option, args=(settings,) )
workers.append(worker)
worker.start()
try:
for i, worker in enumerate(workers):
while worker.is_alive():
worker.join(timeout=60)
logger.info("worker project: {0} logstore: {1} exit unexpected, try to shutdown it".format(
options[i].project, options[i].logstore))
worker.shutdown()
except KeyboardInterrupt:
logger.info("*** try to exit **** ")
for worker in workers:
worker.shutdown()
# wait for all workers to shutdown before shutting down executor
for worker in workers:
while worker.is_alive():
worker.join(timeout=60)
exeuctor.shutdown()
Limits
You can configure up to 30 consumer groups for each Logstore in Simple Log Service. If the system displays the ConsumerGroupQuotaExceed
error message, we recommend that you log on to the Simple Log Service console and delete consumer groups that you no longer need.
View and monitor data consumption
You can log on to the Simple Log Service console to view the data consumption status of a consumer group. For more information, see Step 2: View the status of a consumer group.
Concurrent consumption
To consume data concurrently, you can start multiple consumer group-based programs for multiple consumers.
nohup python3 sync_data.py &
nohup python3 sync_data.py &
nohup python3 sync_data.py &
...
The name of each consumer is unique within a consumer group. The names of the consumers are suffixed with process IDs. The data of one shard can be consumed by only one consumer. If a Logstore contains 10 shards and each consumer group contains only one consumer, a maximum of 10 consumer groups can consume the data of all shards at the same time.
Throughput
Throughput is tested in the following scenario: Python 3 is used to run the program in the preceding example, the bandwidth and receiving speed, such as the receiving speed on Splunk, are not limited, and a single consumer consumes about 20% of the single-core CPU resources. The test results indicate that the consumption speed of raw logs can reach 10 MB/s. Therefore, if 10 consumers consume data at the same time, the consumption speed of raw logs can reach 100 MB/s per CPU core. Each CPU core can consume up to 0.9 TB of raw logs per day.
High availability
A consumer group stores checkpoints on the server. When the data consumption process of one consumer stops, another consumer automatically takes over the data consumption process and continues the process from the checkpoint of the last consumption. You can start consumers on different machines. If a machine stops or is damaged, a consumer on another machine can take over the data consumption process and continue the process from the checkpoint of the last consumption. To have a sufficient number of consumers, you can start more consumers than shards on different machines.
HTTPS
If a program uses a Simple Log Service endpoint that is prefixed with https://
, the program automatically encrypts its connections with Simple Log Service by using HTTPS. Example: https://cn-beijing.log.aliyuncs.com
.
The certificate for the domain name *.aliyuncs.com is issued by GlobalSign. By default, most Linux and Windows servers are preconfigured to trust this certificate. If a server does not trust this certificate, you must download and install the certificate. For more information, see Certificate installation.