Log data collected by open source Logstash can be written to MaxCompute. You can use the logstash-output-maxcompute output plug-in of Logstash together with MaxCompute Streaming Tunnel to upload the collected log data to MaxCompute.
Prerequisites
Make sure that the following prerequisites are met:
Logstash is installed and a Logstash cluster is created to collect log data.
For more information, see Getting Started with Logstash.
Background information
Logstash is an open source data processing pipeline that is installed on a server. Logstash can be used to collect data from multiple data sources at the same time, convert data types, and transfer the converted data to a specified data store.
The output plug-in logstash-output-maxcompute is developed based on Logstash 7.8.0 and can be used as an output port. This output plug-in has the following benefits:
Uses MaxCompute Streaming Tunnel, which avoids the concurrency issues and small files caused by batch data import through a regular tunnel.
Supports dynamic partitioning: partition values can be generated from the log fields parsed by Logstash, and the corresponding partitions can be created automatically.
You can use the output plug-in logstash-output-maxcompute in the following scenarios:
The logs to be collected are in a format that a Logstash input plug-in supports or that is easy to parse, such as NGINX logs.
You want partitions to be created automatically based on the collected logs and the logs to be imported into the corresponding partitions automatically.
The output plug-in logstash-output-maxcompute supports the following data types: STRING, BIGINT, DOUBLE, DATETIME, and BOOLEAN.
The format of a log field that is written to a DATETIME column is determined automatically by the Ruby Time.parse function. For a log field that is written to a BOOLEAN column, the value is True if .to_string().lowercase() == "true" evaluates to true for the field, and False otherwise.
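The following minimal Ruby sketch only illustrates this logic, under the assumption that the plug-in compares the lowercased string value; the plug-in's actual internal implementation may differ:
require 'time'
# DATETIME fields: the source format is inferred by Time.parse.
Time.parse("2020-07-09 01:02:03")   # => 2020-07-09 01:02:03 in the local time zone
# BOOLEAN fields: the value is true only when the lowercased string equals "true".
"True".downcase == "true"           # => true
"yes".downcase == "true"            # => false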
The following example describes how to install and use the logstash-output-maxcompute plug-in to collect NGINX logs.
Step 1: Download and install the plug-in
You can click the link to download Logstash with the output plug-in logstash-output-maxcompute preinstalled. If you have installed the plug-in, skip this step and proceed to the next step. If you need to install the plug-in, perform the following operations:
Click the link to download the output plug-in logstash-output-maxcompute and store the plug-in in the root directory %logstash% of Logstash.
In the Gemfile file under the root directory %logstash% of Logstash, replace source "https://rubygems.org" with source 'https://gems.ruby-china.com'.
In the Windows operating system, switch to the root directory %logstash% of Logstash in the command-line interface (CLI) and run the following command to install the output plug-in logstash-output-maxcompute:
bin\logstash-plugin install logstash-output-maxcompute-1.1.0.gem
If Installation successful is returned, the output plug-in is installed.
Optional: Run the following command to verify the installation result:
bin\logstash-plugin list maxcompute
Note: In the Linux operating system, run the bin/logstash-plugin list maxcompute command to check whether the output plug-in is installed.
If logstash-output-maxcompute is returned, the output plug-in is installed.
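On Linux, assuming the same logstash-output-maxcompute-1.1.0.gem file is stored in the Logstash root directory, the corresponding install command is:
bin/logstash-plugin install logstash-output-maxcompute-1.1.0.gem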
Step 2: Create a table to which logs are imported
Execute the following statement on the MaxCompute client or by using another tool that can run MaxCompute SQL statements. The statement creates a table, named logstash_test_groknginx in this example, in the specified MaxCompute project. The collected logs are imported into the partitions of this table by date.
create table logstash_test_groknginx(
clientip string,
remote_user string,
time datetime,
verb string,
uri string,
version string,
response string,
body_bytes bigint,
referrer string,
agent string
) partitioned by (pt string);
Step 3: Create and configure the pipeline.conf file of Logstash
Create the pipeline.conf file in the root directory %logstash% of Logstash, and enter the following content:
input { stdin {} }
filter {
grok {
match => {
"message" => "%{IP:clientip} - (%{USER:remote_user}|-) \[%{HTTPDATE:httptimestamp}\] \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response} %{NUMBER:body_bytes} %{QS:referrer} %{QS:agent}"
}
}
date {
match => [ "httptimestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
target => "timestamp"
}
}
output {
maxctunnel {
aliyun_access_id => "<your_accesskey_id>"
aliyun_access_key => "<your_accesskey_secret>"
aliyun_mc_endpoint => "<your_project_endpoint>"
project => "<your_project_name>"
table => "<table_name>"
partition => "pt=$<timestamp.strftime('%F')>"
value_fields => ["clientip", "remote_user", "timestamp", "verb", "request", "httpversion", "response", "bytes", "referrer", "agent"]
}
}
The following table describes the parameters.
Parameter | Description |
your_accesskey_id | The AccessKey ID that is used to access the specified MaxCompute project. |
your_accesskey_secret | The AccessKey secret that corresponds to the AccessKey ID. |
your_project_endpoint | The endpoint of the region where the specified MaxCompute project is located. For more information about endpoints, see Endpoints. |
your_project_name | The name of the MaxCompute project. |
table_name | The name of the table that is created in Step 2. |
partition | Specifies the partition to which log data is imported. If the table has multiple levels of partitions, you must specify values down to the last-level partition. The partition value can be a fixed value or generated dynamically from the log fields parsed by Logstash. In this example, pt=$<timestamp.strftime('%F')> uses the date of the timestamp field as the partition value. |
partition_time_format | Optional. This parameter specifies the source format string of a datetime field of the STRING type that is referenced in the partition configuration. In this example, the date filter already converts the httptimestamp field into the timestamp field of the DATETIME type, so you do not need to specify this parameter. Specify it only if the partition configuration references a STRING-type datetime field whose format must be parsed. |
value_fields | Specifies the log fields that correspond to the columns of the destination table, in the same order as the table columns. In this example, the column order is clientip, remote_user, time, verb, uri, version, response, body_bytes, referrer, and agent. |
aliyun_mc_tunnel_endpoint | Optional. You can use this parameter to forcibly specify the Tunnel endpoint. If you specify this parameter, automatic endpoint routing becomes ineffective. |
retry_time | The number of retries when the system fails to write data to MaxCompute. Default value: 3. |
retry_interval | The minimum interval between two retries. Unit: seconds. Default value: 1. |
batch_size | The maximum number of logs that can be processed at a time. Default value: 100. |
batch_timeout | The timeout period for writing data to MaxCompute, in seconds. Default value: 5. |
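As an illustrative sketch, these optional parameters can be added inside the maxctunnel block shown above; the values below are assumptions chosen only for demonstration:
retry_time => 5        # retry failed writes up to 5 times
retry_interval => 2    # wait at least 2 seconds between retries
batch_size => 200      # write up to 200 log entries per batch
batch_timeout => 10    # flush a partial batch after 10 seconds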
In this pipeline.conf file, logs are read from standard input (input { stdin {} }). In actual scenarios, you can use the Logstash file input plug-in to read NGINX logs from your local disk, as shown in the sketch below. For more information, see the Logstash documentation.
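A minimal sketch of such an input section, assuming the NGINX access log is stored at the hypothetical path /var/log/nginx/access.log, might look as follows:
input {
  file {
    path => "/var/log/nginx/access.log"   # hypothetical log path; adjust to your environment
    start_position => "beginning"         # read the file from the beginning on first run
  }
}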
Step 4: Run Logstash and conduct a test
In the Windows operating system, switch to the root directory %logstash% of Logstash in the CLI and run the following command to start Logstash:
bin\logstash -f pipeline.conf
If Successfully started Logstash API endpoint is returned, Logstash is started.
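On Linux, assuming the standard Logstash directory layout, the equivalent start command is:
bin/logstash -f pipeline.conf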
In the CLI of the Windows operating system, paste the following log samples and press Enter.
1.1.1.1 - - [09/Jul/2020:01:02:03 +0800] "GET /masked/request/uri/1 HTTP/1.1" 200 143363 "-" "Masked UserAgent" - 0.095 0.071
2.2.2.2 - - [09/Jul/2020:04:05:06 +0800] "GET /masked/request/uri/2 HTTP/1.1" 200 143388 "-" "Masked UserAgent 2" - 0.095 0.072
If write .. records on partition .. completed is returned, data is written to MaxCompute.
Execute the following statements on the MaxCompute client or use another tool that can run MaxCompute SQL statements to query the results:
set odps.sql.allow.fullscan=true;
select * from logstash_test_groknginx;
The following result is returned:
+----------+-------------+---------------------+------+-----------------------+---------+----------+------------+----------+----------------------+------------+
| clientip | remote_user | time                | verb | uri                   | version | response | body_bytes | referrer | agent                | pt         |
+----------+-------------+---------------------+------+-----------------------+---------+----------+------------+----------+----------------------+------------+
| 1.1.1.1  | -           | 2020-07-09 01:02:03 | GET  | /masked/request/uri/1 | 1.1     | 200      | 0          | "-"      | "Masked UserAgent"   | 2020-02-10 |
| 2.2.2.2  | -           | 2020-07-09 04:05:06 | GET  | /masked/request/uri/2 | 1.1     | 200      | 0          | "-"      | "Masked UserAgent 2" | 2020-02-10 |
+----------+-------------+---------------------+------+-----------------------+---------+----------+------------+----------+----------------------+------------+
2 records (at most 10000 supported) fetched by instance tunnel.
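To confirm that the partitions were created automatically, you can additionally run the following statement; the partition values returned depend on your data:
show partitions logstash_test_groknginx;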