Use Logstash (streaming data transfer)

Updated at: 2024-04-22 02:52

Log data collected by open source Logstash can be written to MaxCompute. This topic describes how to use the logstash-output-maxcompute output plug-in of Logstash together with MaxCompute Streaming Tunnel to upload the log data collected by Logstash to MaxCompute.

Prerequisites

Make sure that the following prerequisites are met:

  • A MaxCompute project is created, and you have an AccessKey pair that can access the project. The project name, its endpoint, and the AccessKey pair are required in Step 2 and Step 3.

Background information

Logstash is an open source data processing pipeline that is installed on a server. Logstash can be used to collect data from multiple data sources at the same time, convert data types, and transfer the converted data to a specified data store.

The logstash-output-maxcompute output plug-in is developed based on Logstash 7.8.0 and works as a Logstash output. The plug-in provides the following benefits:

  • Uses MaxCompute Streaming Tunnel, which avoids the concurrency and small-file issues that are caused by importing data through the batch data tunnel.

  • Supports dynamic partitioning: partition values can be generated from the log fields parsed by Logstash, and the required partitions can be created automatically.

You can use the output plug-in logstash-output-maxcompute in the following scenarios:

  • The logs to be collected are in a format that is supported by a Logstash input plug-in or that is easy to parse, such as NGINX logs.

  • You want partitions to be created automatically based on the collected logs and the logs to be imported into the corresponding partitions automatically.

The output plug-in logstash-output-maxcompute supports the following data types: STRING, BIGINT, DOUBLE, DATETIME, and BOOLEAN.

Note
  • The format of a log field that maps to the DATETIME data type is determined automatically by the Ruby Time.parse method.

  • A log field that maps to the BOOLEAN data type is written as true only if the field value, converted to a string and then to lowercase, equals "true". Otherwise, it is written as false.

The following example describes how to install and use the logstash-output-maxcompute plug-in to collect NGINX logs.

Step 1: Download and install the plug-in

You can click the link to download a Logstash distribution in which the logstash-output-maxcompute plug-in is already installed. If you have installed the plug-in, skip this step and proceed to the next step. Otherwise, perform the following operations to install the plug-in:

  1. Click the link to download the output plug-in logstash-output-maxcompute and store the plug-in in the root directory %logstash% of Logstash.

  2. Replace source "https://rubygems.org" with source 'https://gems.ruby-china.com' in the Gemfile file under the root directory %logstash% of Logstash.

  3. In the Windows operating system, switch to the root directory %logstash% of Logstash in the command-line interface (CLI) and run the following command to install the output plug-in logstash-output-maxcompute:

    bin\logstash-plugin install logstash-output-maxcompute-1.1.0.gem

    If Installation successful is returned, the output plug-in is installed.

  4. Optional: Run the following command to verify the installation result:

    bin\logstash-plugin list maxcompute
    Note

    In the Linux operating system, run the bin/logstash-plugin list maxcompute command to check whether the output plug-in is installed.

    If logstash-output-maxcompute is returned, the output plug-in is installed.

Step 2: Create a table to which logs are imported

Execute the following statement on the MaxCompute client or by using another tool that can run MaxCompute SQL statements. The statement creates a table named logstash_test_groknginx in your MaxCompute project. The collected logs are imported into the partitions of this table by date.

create table logstash_test_groknginx(
 clientip string,
 remote_user string,
 time datetime,
 verb string,
 uri string,
 version string,
 response string,
 body_bytes bigint,
 referrer string,
 agent string
) partitioned by (pt string);

Step 3: Create and configure the pipeline.conf file of Logstash

Create the pipeline.conf file in the root directory %logstash% of Logstash, and enter the following content:

input { stdin {} }

filter {
        grok {
                match => {
                        "message" => "%{IP:clientip} - (%{USER:remote_user}|-) \[%{HTTPDATE:httptimestamp}\] \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response} %{NUMBER:body_bytes} %{QS:referrer} %{QS:agent}"
                }
        }
        date {
                match => [ "httptimestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
                target => "timestamp"
        }
}

output {
       maxctunnel {
                aliyun_access_id => "<your_accesskey_id>"
                aliyun_access_key => "<your_accesskey_secret>"
                aliyun_mc_endpoint => "<your_project_endpoint>"
                project => "<your_project_name>"
                table => "<table_name>"
                partition => "pt=$<timestamp.strftime('%F')>"
                value_fields => ["clientip", "remote_user", "timestamp", "verb", "request", "httpversion", "response", "body_bytes", "referrer", "agent"]
        }
}

The following descriptions explain the parameters in the pipeline.conf file.

your_accesskey_id

The AccessKey ID that is used to access the specified MaxCompute project.

your_accesskey_secret

The AccessKey secret that corresponds to the AccessKey ID.

your_project_endpoint

The endpoint of the region where the specified MaxCompute project is located. For more information about endpoints, see Endpoints.

your_project_name

The name of the MaxCompute project.

table_name

The name of the table that is created in Step 2.

partition

Specifies the partition to which log data is imported. If the table has multiple partition levels, the specification must include every level down to the last-level partition. Configuration formats:

  • If the value in a partition is a constant, the format of this parameter is {Name of the partition key column}={Constant value}.

  • If the value in a partition is taken from a field in the parsed log, the format of this parameter is {Name of the partition key column}=$<{Name of the log field}>.

  • If the value in a partition is taken from a datetime field in the parsed log and must be reformatted, the format of this parameter is {Name of the partition key column}=$<{Name of the log field}.strftime('{Time format}')>. {Time format} is a reformatting string. For more information, see Formatted string.

    In this example, the reformatted time value is accurate to the date (%F). If date is the first-level partition and hour is the second-level partition, the configuration format is "date=$<timestamp.strftime('%F')>,hour=$<timestamp.strftime('%H')>".

  • Multi-level partition specifications are separated by commas (,). The sequence of the partition key columns specified by this parameter must be the same as that defined in the CREATE TABLE statement. A combined sketch of these formats is provided after this parameter list.

partition_time_format

Optional. This parameter specifies the source format string that is used to parse a STRING-type time field referenced in the partition setting.

In this example, you do not need to specify the format of the timestamp field because the date filter plug-in has converted the format of this field to the DATETIME type.

If you do not use the date filter plug-in and do not specify this parameter, the plug-in still attempts to identify and convert the field format automatically. You need to specify the format only in the few cases where the plug-in fails to identify and convert the field format automatically.

If you do not use the date filter plug-in and instead let this plug-in convert the raw string field, take note of the following items; the sketch after this parameter list shows where these settings fit:

  • Explicitly set the partition_time_format parameter: partition_time_format => "%d/%b/%Y:%H:%M:%S %z".

  • Reference the string log field in the partition setting: partition => "pt=$<httptimestamp.strftime('%F')>".

value_fields

Specifies the log fields that correspond to the columns of the destination table. The fields must be listed in the same order as the table columns.

In this example, the table columns are clientip string, remote_user string, time datetime, verb string, uri string, version string, response string, body_bytes bigint, referrer string, and agent string. They correspond to the log fields "clientip", "remote_user", "timestamp", "verb", "request", "httpversion", "response", "body_bytes", "referrer", and "agent".

aliyun_mc_tunnel_endpoint

Optional. You can use this parameter to forcibly specify the Tunnel endpoint. If you specify this parameter, the automatic routing mechanism becomes ineffective.

retry_time

The maximum number of retries when the system fails to write data to MaxCompute. Default value: 3.

retry_interval

The minimum interval between two retries. Unit: seconds. Default value: 1.

batch_size

The maximum number of logs that can be processed at a time. Default value: 100.

batch_timeout

The timeout period for writing data to MaxCompute, in seconds. Default value: 5.
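
To show how the optional parameters fit together, the following is a minimal sketch of an alternative output section. It assumes the same filter section as the pipeline.conf file above and a hypothetical destination table that is partitioned by two STRING columns named date and hour; the tuning parameters are shown with the default values listed above, so they can also be omitted unchanged.

output {
       maxctunnel {
                aliyun_access_id => "<your_accesskey_id>"
                aliyun_access_key => "<your_accesskey_secret>"
                aliyun_mc_endpoint => "<your_project_endpoint>"
                # Optional: force a specific Tunnel endpoint; automatic routing is then disabled.
                # aliyun_mc_tunnel_endpoint => "<your_tunnel_endpoint>"
                project => "<your_project_name>"
                table => "<table_name>"
                # Two-level partition (hypothetical partition columns date and hour);
                # the order must match the PARTITIONED BY clause of the destination table.
                partition => "date=$<timestamp.strftime('%F')>,hour=$<timestamp.strftime('%H')>"
                # Only needed if the partition references a raw STRING field, such as
                # httptimestamp, instead of the DATETIME field produced by the date filter:
                # partition_time_format => "%d/%b/%Y:%H:%M:%S %z"
                value_fields => ["clientip", "remote_user", "timestamp", "verb", "request", "httpversion", "response", "body_bytes", "referrer", "agent"]
                # Optional tuning parameters, shown with their default values.
                retry_time => 3
                retry_interval => 1
                batch_size => 100
                batch_timeout => 5
        }
}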

Note

In the pipeline.conf file, logs are read from standard input (input { stdin {} }). In actual scenarios, you can use the Logstash file input plug-in to read NGINX logs from your local disk, as shown in the sketch below. For more information, see the Logstash documentation.
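
For reference, the following is a minimal sketch of such a file-based input section. The log path is a hypothetical example; the filter and output sections stay the same as in the pipeline.conf file above. See the Logstash file input documentation for the full set of options.

input {
        file {
                # Hypothetical path to the NGINX access log; adjust to your environment.
                path => "/var/log/nginx/access.log"
                # Read the file from the beginning on the first run instead of only tailing new lines.
                start_position => "beginning"
        }
}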

Step 4: Run Logstash and conduct a test

  1. In the Windows operating system, switch to the root directory %logstash% of Logstash in the CLI and run the following command to start Logstash:

    bin\logstash -f pipeline.conf

    If Successfully started Logstash API endpoint is returned, Logstash is started.

  2. In the CLI of the Windows operating system, paste the following log samples and press Enter.

    1.1.1.1 - - [09/Jul/2020:01:02:03 +0800] "GET /masked/request/uri/1 HTTP/1.1" 200 143363 "-" "Masked UserAgent" - 0.095 0.071
    2.2.2.2 - - [09/Jul/2020:04:05:06 +0800] "GET /masked/request/uri/2 HTTP/1.1" 200 143388 "-" "Masked UserAgent 2" - 0.095 0.072

    If write .. records on partition .. completed is returned, the data is written to MaxCompute.

  3. Execute the following statements on the MaxCompute client or use another tool that can run MaxCompute SQL statements to query the results:

    set odps.sql.allow.fullscan=true;
    select * from logstash_test_groknginx;

    The following result is returned:

    +------------+-------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
    | clientip   | remote_user | time       | verb       | uri        | version    | response   | body_bytes | referrer   | agent      | pt         |
    +------------+-------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
    | 1.1.1.1    | -           | 2020-07-09 01:02:03 | GET        | /masked/request/uri/1 | 1.1        | 200        | 0          | "-"        | "Masked UserAgent" | 2020-02-10       |
    | 2.2.2.2    | -           | 2020-07-09 04:05:06 | GET        | /masked/request/uri/2 | 1.1        | 200        | 0          | "-"        | "Masked UserAgent 2" | 2020-02-10       |
    +------------+-------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
    2 records (at most 10000 supported) fetched by instance tunnel.