DataHub plug-in for Fluentd
Overview
The DataHub plug-in for Fluentd is an output plug-in that writes the data collected by Fluentd to DataHub. The plug-in complies with the development conventions of Fluentd output plug-ins and is easy to install and configure.
Install the DataHub plug-in for Fluentd
Install the plug-in by using RubyGems
We recommend that you change the gem source to https://gems.ruby-china.com before you install the plug-in.
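For example, you can switch the gem source with the standard RubyGems commands and then verify the change:
$ gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/
$ gem sources -l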
$ gem install fluent-plugin-datahub
Install the plug-in by using an installation package
The plug-in must be installed on Linux. Before you install the plug-in, install Ruby.
A full installation package that installs both Fluentd and the DataHub plug-in for Fluentd is provided for users who have not installed Fluentd. A separate installation package of the plug-in is provided for users who have already installed Fluentd.
(1) Install both Fluentd and the DataHub plug-in for Fluentd: If you have not installed Fluentd, download the full installation package for installing both Fluentd and the DataHub plug-in for Fluentd. Note that Fluentd 0.12.25 is provided in the full installation package.
$ tar -xzvf fluentd-with-datahub-0.12.25.tar.gz
$ cd fluentd-with-datahub
$ sudo sh install.sh
(2) Separately install the DataHub plug-in for Fluentd: If you have installed Fluentd, download the installation package of the DataHub plug-in for Fluentd and run the gem command to install the plug-in.
$ sudo gem install --local fluent-plugin-datahub-0.12.25.gem
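To confirm that the plug-in is installed, you can list it with the gem command. The version that is displayed depends on the package that you installed:
$ gem list fluent-plugin-datahub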
Use cases
Case 1: Upload a CSV file
This section describes how to use the DataHub plug-in for Fluentd to write the incremental content of a CSV file to DataHub in near real time. The following example shows the content format of the CSV file.
0,qe614c760fuk8judu01tn5x055rpt1,true,100.1,14321111111
1,znv1py74o8ynn87k66o32ao4x875wi,true,100.1,14321111111
2,7nm0mtpgo1q0ubuljjjx9b000ybltl,true,100.1,14321111111
3,10t0n6pvonnan16279w848ukko5f6l,true,100.1,14321111111
4,0ub584kw88s6dczd0mta7itmta10jo,true,100.1,14321111111
5,1ltfpf0jt7fhvf0oy4lo8m3z62c940,true,100.1,14321111111
6,zpqsfxqy9379lmcehd7q8kftntrozb,true,100.1,14321111111
7,ce1ga9aln346xcj761c3iytshyzuxg,true,100.1,14321111111
8,k5j2id9a0ko90cykl40s6ojq6gruyi,true,100.1,14321111111
9,ns2zcx9bdip5y0aqd1tdicf7bkdmsm,true,100.1,14321111111
10,54rs9cm1xau2fk66pzyz62tf9tsse4,true,100.1,14321111111
Each line is a record to be written to DataHub. Fields are separated by commas (,). The CSV file is saved as /temp/test.csv on the on-premises computer. The following table describes the schema of the DataHub topic to which the CSV file is written.
Field name | Data type
---|---
id | BIGINT
name | STRING
gender | BOOLEAN
salary | DOUBLE
my_time | TIMESTAMP
The following Fluentd configuration file is used in this example. The configuration file is saved as ${CONFIG_HOME}/fluentd_test.conf.
<source>
@type tail
path /temp/test.csv
tag test1
format csv
keys id,name,gender,salary,my_time
</source>
<match test1>
@type datahub
access_id your_access_id
access_key your_access_key
endpoint http://ip:port
project_name test_project
topic_name fluentd_performance_test_1
column_names ["id", "name", "gender", "salary", "my_time"]
flush_interval 1s
buffer_chunk_limit 3m
buffer_queue_limit 128
dirty_data_continue true
dirty_data_file The path of the dirty record file.
retry_times 3
put_data_batch_size 1000
</match>
Run the following command to start Fluentd to write the CSV file to DataHub:
${FLUENTD_HOME}/fluentd-with-dataHub/bin/fluentd -c ${CONFIG_HOME}/fluentd_test.conf
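By default, the tail input plug-in collects only the lines that are appended to the file after Fluentd starts. To verify that incremental content reaches DataHub, you can append a test record that matches the topic schema, for example:
$ echo "11,test_user_name_for_verification,true,100.1,14321111111" >> /temp/test.csv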
Case 2: Collect Log4j logs
The following code shows a sample Log4j log:
11:48:43.439 [qtp1847995714-17] INFO AuditInterceptor - [c2un5sh7cu52ek6am1ui1m5h] end /web/v1/project/tefe4mfurtix9kwwyrvfqd0m/node/0m0169kapshvgc3ujskwkk8g/health GET, 4061 ms
The following Fluentd configuration file is used in this example:
<source>
@type tail
path bayes.log
tag test
format /(?<request_time>\d\d:\d\d:\d\d.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class>\w+)\s+-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)/
</source>
<match test>
@type datahub
access_id your_access_id
access_key your_access_key
endpoint http://ip:port
project_name test_project
topic_name dataHub_fluentd_out_1
column_names ["thread_id", "log_level", "class"]
</match>
Start Fluentd with the preceding configuration file to collect Log4j logs and write them to DataHub.
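To check the format regular expression before you start Fluentd, you can apply it to the sample log line in a short Ruby script. The following sketch is only a local test and is not part of the plug-in:
pattern = /(?<request_time>\d\d:\d\d:\d\d.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class>\w+)\s+-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)/
line = '11:48:43.439 [qtp1847995714-17] INFO AuditInterceptor - [c2un5sh7cu52ek6am1ui1m5h] end /web/v1/project/tefe4mfurtix9kwwyrvfqd0m/node/0m0169kapshvgc3ujskwkk8g/health GET, 4061 ms'
match = pattern.match(line)
# Print each named capture, such as thread_id, log_level, and class.
match.named_captures.each { |name, value| puts "#{name} => #{value}" } if match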
Parameters
Input configuration
tag test1: the tag that marks the data source. Fluentd matches the tag against the patterns of the <match> directives to route records to the corresponding output.
format csv: the format of the file from which data is collected.
keys id,name,gender,salary,my_time: the fields to be collected from the CSV file. The field names must be the same as those in the schema of the destination DataHub topic.
Output configuration
shard_id 0: the ID of the shard to which all records are written. If this parameter is not specified, records are distributed across shards in round-robin mode.
shard_keys ["id"]: the field used as the shard key. Hashed shard key values are used as indexes for writing data.
flush_interval 1: the interval between data flushes. Default value: 60s.
buffer_chunk_limit 3m: the maximum size of a chunk. Valid units: k (KB) and m (MB). We recommend that you set this parameter to 3m.
buffer_queue_limit 128: the maximum length of the chunk queue. Both the buffer_chunk_limit and buffer_queue_limit parameters determine the size of the buffer.
put_data_batch_size 1000: the number of records to be written to DataHub at a time. In this example, 1,000 records are written to DataHub each time.
retry_times 3: the number of retries.
retry_interval 3: the interval between retries. Unit: seconds.
dirty_data_continue true: specifies whether to ignore dirty records. A value of true indicates that when the plug-in encounters a dirty record, it retries the write for the specified number of times and, if the retries fail, writes the record to the dirty record file and continues.
dirty_data_file /xxx/yyy: the path of the file to which dirty records are written.
column_names ["id"]: the name of the fields to be written to DataHub.
Performance testing
Test environment: Fluentd runs on a Linux machine with a dual-core CPU and 4 GB of memory. The performance testing data shows the following points:
For a single record of 512 bytes in size, the write speed is kept at about 2,800 records per second.
As the number of records to be written to DataHub at a time increases, the write speed slightly increases.
For a single record of 100 KB in size, the plug-in works only when 100 records are written to DataHub at a time. It does not work when 500 or 1,000 records are written at a time, because a batch of that size exceeds 50 MB, which is too much data to write in a single request.
The average write speed remains at 3 MB/s.
FAQ
Q: How do I write regular expressions for Fluentd? A: You can use an online Fluentd regular expression editor (for example, Fluentular) to build and test format patterns.