Logstash是一个开源的服务器端数据处理管道,起初用于将日志类数据写入ES中。随着开源社区的不断发展,Logstash可以同时从多个数据源获取数据,并对其进行转换,然后将其发送到您需要的“存储端”。
以日志数据为例,由于AnalyticDB MySQL支持原生JDBC方式访问,您可以通过开源logstash output插件logstash-output-jdbc将日志数据导入AnalyticDB MySQL中进行进一步分析。但经过测试发现,在日志量非常大的情况下,通过JDBC方式将数据写入AnalyticDB MySQL数仓版(3.0)的性能较低,并且非常消耗CPU的资源(JDBC是单条记录写入)。为此,AnalyticDB MySQL优化了一个基于JDBC的Logstash output plugin插件——logstash-ouput-analyticdb,专门用于以聚合方式向AnalyticDB MySQL中写入日志数据。
通过logstash-output-analyticdb将数据写入AnalyticDB MySQL时的性能,相较于logstash-output-jdbc有5倍提升,并且对CPU的消耗也明显降低。
安装
Logstash的安装流程请参见Installing Logstash。
使用方式
在config目录下创建一个logstash-analyticdb.conf
(名字可以自定义)配置文件,logstash-analyticdb.conf
文件的内容如下所示。
input
{
stdin { }
}
output {
analyticdb {
driver_class => "com.mysql.jdbc.Driver"
connection_string => "jdbc:mysql://HOSTNAME:PORT/DATABASE?user=USER&password=PASSWORD"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
commit_size => 4194304
}
}
connection_string
:连接AnalyticDB MySQL的JDBC URL。statement
:INSERT SQL的声明数组。
更多参数配置:
max_flush_exceptions
:当写入数据出现异常时,设置最大重试次数,默认值100。skip_exception
:设置是否跳过异常,默认为FALSE,表示出现异常时将重试直到到达最大重试次数max_flush_exceptions
,如果仍然失败,则同步程序抛异常终止。设置为TRUE时,如果达到重试次数后仍是失败,则跳过异常,将异常写入日志。flush_size
:一次最多攒批数量,和commit_size
参数搭配使用。commit_size
:一次最多攒批数据量大小,和flush_size
参数搭配使用,达到限定值即提交写入任务。
上述配置文件只是一个示例,您需要根据实际业务配置logstash-analyticdb.conf
文件。与AnalyticDB MySQL相关的其他配置请参见README。更多logstash配置和使用规则,请参见logstash文档。
至此,配置任务已全部完成,接下来将启动任务。
启动任务
在logstash的安装目录执行bin/logstash -f config/logstash-analyticdb.conf
启动任务。
注意事项
建议您通过以下命令将logstash升级至最新版本后,再进行数据写入。
bin/logstash-plugin update logstash-output-analyticdb