全部产品
Search
文档中心

检索分析服务Elasticsearch版:logstash-input-oss插件使用说明

更新时间:Sep 14, 2024

logstash-input-oss插件基于阿里云轻量消息队列(简称SMQ,原MNS),实现了当关联的对象存储服务OSS(Object Storage Service)文件变化时,触发SMQ通知阿里云Logstash从OSS文件系统中获取最新的数据。您可以在OSS的事件通知区域,配置当文件发生变化时,自动发送消息给SMQ。

说明

logstash-input-oss是一款开源插件,详情请参见logstash-input-oss

注意事项

  • 当logstash-input-oss插件接收到SMQ通知消息后,会自动提取通知消息中的oss object(PutObject,AppendObject等)进行全量同步。

  • 如果OSS存储的是.gz或.gzip结尾的文本文件,阿里云Logstash会以.gzip的文件格式对其进行处理,其他格式的文件以文本文件进行处理。

  • 文件是以文本文件的方式读取的,如果您的文件是不可解析的格式(例如.jar、.bin等格式),有可能读取出来是乱码。

前提条件

您已完成以下操作:

使用logstash-input-oss插件

参见通过配置文件管理管道创建管道任务,在创建管道任务时,需要按照以下说明配置管道参数。配置完成后进行保存与部署,即可触发阿里云Logstash从OSS中获取数据。

以从OSS中获取数据,然后写入到阿里云Elasticsearch为例,配置示例如下。

input {
 oss {
  endpoint => "oss-cn-hangzhou-internal.aliyuncs.com"
  bucket => "zl-ossou****"
  access_key_id => "******"
  access_key_secret => "*********"
  prefix => "file-sample-prefix"
  mns_settings => {
   endpoint => "******.mns.cn-hangzhou-internal.aliyuncs.com"
   queue => "aliyun-es-sample-mns"
  }
  codec => json {
   charset => "UTF-8"
  }
 }
}

output {
  elasticsearch {
    hosts => ["http://es-cn-***.elasticsearch.aliyuncs.com:9200"]
    index => "aliyun-es-sample"
    user => "elastic"
    password => "changeme"
  }
}
重要

Endpoint不能以HTTP为前缀,并且需要internal域名,否则会报错。

参数说明

logstash-input-oss插件支持的参数如下。

参数

类型

是否必选

说明

endpoint

string

OSS对外服务的访问域名,获取方式请参见访问域名和数据中心

bucket

string

OSS的Bucket名称。

access_key_id

string

阿里云账号的AccessKey ID。

access_key_secret

string

阿里云账号的Access Key Secret。

prefix

string

如果指定了该参数,则Bucket中目录或文件名的前缀必须与之匹配(不是正则表达式)。通过配置该参数,您可以读取指定Bucket下的某一个或者几个目录。

additional_oss_settings

hash

附加的OSS客户端配置。可选值

  • secure_connection_enabled:是否启用安全连接。

  • max_connections_to_oss:OSS的最大连接数。

delete

boolean

是否从原始Bucket中删除已处理的文件:

  • true:是

  • false(默认):否

backup_to_bucket

string

用来备份已处理过的文件的OSS Bucket名称。

backup_to_dir

string

用来备份已经处理过的文件的本地目录路径。

backup_add_prefix

string

文件处理后,为key(OSS中包含文件名的完整路径)附加一个前缀。当您将数据备份到另一个(或同一个)Bucket时,这个参数将有效地让您选择一个新的文件夹来放置文件。

include_object_properties

boolean

是否在[@metadata][oss]中包含OSS对象的属性(last_modifiedcontent_typemetadata):

  • true:是

  • false:否

如果不设置此参数,[@metadata][oss][key]将始终存在。

exclude_pattern

string

要从Bucket中排除的key的ruby正则表达式。

mns_settings

hash

轻量消息队列配置,可选值:

  • endpoint:SMQ端口链接。不能以HTTP为前缀,并且需要internal域名,否则会报错。

  • queue:队列名。

  • poll_interval_seconds:当队列中没有消息时,针对该队列的ReceiveMessage请求最长的等待时间,默认为10秒。

  • wait_seconds:本次ReceiveMessage请求最长的Polling等待时间,单位为秒。

ReceiveMessage的详细信息请参见ReceiveMessage

常见问题

  • Q:为什么基于SMQ设计logstash-input-oss插件?

    A:因为OSS文件的变更需要有一种机制通知客户端,而目前OSS文件事件变更可以无缝的写入到SMQ中。

  • Q:为什么不使用OSS的ListObjects API获取变更的文件?

    A:OSS在记录未处理的文件及已经处理的文件时会增加本地存储,当本地存储较大时,ListObjects API性能会降低。目前其他文件存储系统,如S3开源社区,也将ListObjects API改为了消息通知机制。

  • Q:客户端已经触发了阿里云Logstash从OSS中获取数据,但是OSS还在写入文件, logstash-input-oss会怎么处理, 会不会导致部分数据丢失?

    A:OSS还在写入文件时,触发了阿里云Logstash从OSS中获取数据,此时已经写入OSS中的数据会记录到SMQ的队列中,并通过Logstash管道传输到阿里云Elasticsearch中。还未写入到OSS的这部分数据会继续写入到OSS中,等待下次事件触发后再进行写入。