借助阿里云在亚洲加速迈向成功
一站式安全合规咨询服务
MLPS 2.0 一站式合规解决方案
依托我们的网络进军中国市场
提升面向互联网应用的性能和安全性
保障您的中国业务安全无忧
通过强大的数据安全框架保护您的数据资产
申请 ICP 备案的流程解读和咨询服务
面向大数据建设、管理及应用的全域解决方案
企业内大数据建设、管理和应用的一站式解决方案
将您的采购和销售置于同一企业级全渠道数字平台上
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人
快速搭建在线教育平台
提供域名注册、分析和保护服务
云原生 Kubernetes 容器化应用运行环境
以 Kubernetes 为使用界面的容器服务产品,提供符合容器规范的算力资源
安全的镜像托管服务,支持全生命周期管理
多集群环境下微服务应用流量统一管理
提供任意基础设施上容器集群的统一管控,助您轻松管控分布式云场景
高弹性、高可靠的企业级无服务器 Kubernetes 容器产品
敏捷安全的 Serverless 容器运行服务
为虚拟机和容器提供高可靠性、高性能、低时延的块存储服务
一款海量、安全、低成本、高可靠的云存储服务
可靠、弹性、高性能、多共享的文件存储服务
全托管、可扩展的并行文件系统服务。
全托管的 NoSQL 结构化数据实时存储服务
可抵扣多种存储产品的容量包,兼具灵活性和长期成本优化
让您的应用跨不同可用区资源自动分配访问量
随时绑定和解绑 VPC ECS
云网络公网、跨域流量统一计费
高性价比,可抵扣按流量计费的流量费用
创建云上隔离的网络,在专有环境中运行资源
在 VPC 环境下构建公网流量的出入口
具备网络状态可视化、故障智能诊断能力的自助式网络运维服务。
安全便捷的云上服务专属连接
基于阿里云专有网络的私有 DNS 解析服务
保障在线业务不受大流量 DDoS 攻击影响
系统运维和安全审计管控平台
业务上云的第一个网络安全基础设施
集零信任内网访问、办公数据保护、终端管理等多功能于一体的办公安全管控平台
提供7X24小时安全运维平台
防御常见 Web 攻击,缓解 HTTP 泛洪攻击
实现全站 HTTPS,呈现可信的 WEB 访问
为云上应用提供符合行业标准和密码算法等级的数据加解密、签名验签和数据认证能力
一款发现、分类和保护敏感数据的安全服务
创建、控制和管理您的加密密钥
快速提高应用高可用能力服务
围绕应用和微服务的 PaaS 平台
兼容主流开源微服务生态的一站式平台
多集群环境下微服务应用流量统一管理
Super MySQL 和 PostgreSQL,高度兼容 Oracle 语法
全托管 MySQL、PostgreSQL、SQL Server、MariaDB
兼容 Redis® 的缓存和KV数据库
兼容Apache Cassandra、Apache HBase、Elasticsearch、OpenTSDB 等多种开源接口
文档型数据库,支持副本集和分片架构
100%兼容 Apache HBase 并深度扩展,稳定、易用、低成本的NoSQL数据库。
低成本、高可用、可弹性伸缩的在线时序数据库服务
专为搜索和分析而设计,成本效益达到开源的两倍,采用最新的企业级AI搜索和AI助手功能。
一款兼容PostgreSQL协议的实时交互式分析产品
一种快速、完全托管的 TB/PB 级数据仓库
基于 Flink 为大数据行业提供解决方案
基于Qwen和其他热门模型的一站式生成式AI平台,可构建了解您业务的智能应用程
一站式机器学习平台,满足数据挖掘分析需求
高性能向量检索服务,提供低代码API和高成本效益
帮助您的应用快速构建高质量的个性化推荐服务能力
提供定制化的高品质机器翻译服务
全面的AI计算平台,满足大模型训练等高性能AI计算的算力和性能需求
具备智能会话能力的会话机器人
基于机器学习的智能图像搜索产品
基于阿里云深度学习技术,为用户提供图像分割、视频分割、文字识别等离线SDK能力,支持Android、iOS不同的适用终端。
语音识别、语音合成服务以及自学习平台
一站式智能搜索业务开发平台
助力金融企业快速搭建超低时延、高质量、稳定的行情数据服务
帮助企业快速测算和分析企业的碳排放和产品碳足迹
企业工作流程自动化,全面提高效率
金融级云原生分布式架构的一站式高可用应用研发、运维平台
eKYC 数字远程在线解决方案
可智能检测、大数据驱动的综合性反洗钱 (AML) 解决方案
阿里云APM类监控产品
实时云监控服务,确保应用及服务器平稳运行
为系统运维人员管理云基础架构提供全方位服务的云上自动化运维平台
面向您的云资源的风险检测服务
提升分布式环境下的诊断效率
日志类数据一站式服务,无需开发就能部署
ECS 预留实例
让弹性计算产品的成本和灵活性达到最佳平衡的付费方式。云原生 AI 套件
加速AI平台构建,提高资源效率和交付速度FinOps
实时分析您的云消耗并实现节约SecOps
实施细粒度安全控制DevOps
快速、安全地最大限度提高您的DevOps优势自带IP上云
自带公网 IP 地址上云全球网络互联
端到端的软件定义网络解决方案,可推动跨国企业的业务发展全球应用加速
提升面向互联网应用的性能和安全性全球互联网接入
将IDC网关迁移到云端云原生 AI 套件
加速AI平台构建,提高资源效率和交付速度FinOps
实时分析您的云消耗并实现节约SecOps
实施细粒度安全控制DevOps
快速、安全地最大限度提高您的DevOps优势金融科技云数据库解决方案
利用专为金融科技而设的云原生数据库解决方案游戏行业云数据库解决方案
提供多种成熟架构,解决所有数据问题Oracle 数据库迁移
将 Oracle 数据库顺利迁移到云原生数据库数据库迁移
加速迁移您的数据到阿里云阿里云上的数据湖
实时存储、管理和分析各种规模和类型的数据数码信贷
利用大数据和 AI 降低信贷和黑灰产风险面向企业数据技术的大数据咨询服务
帮助企业实现数据现代化并规划其数字化未来人工智能对话服务
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人EasyDispatch 现场服务管理
为现场服务调度提供实时AI决策支持在线教育
快速搭建在线教育平台窄带高清 (HD) 转码
带宽成本降低高达 30%广电级大型赛事直播
为全球观众实时直播大型赛事,视频播放流畅不卡顿直播电商
快速轻松地搭建一站式直播购物平台用于供应链规划的Alibaba Dchain
构建和管理敏捷、智能且经济高效的供应链云胸牌
针对赛事运营的创新型凭证数字服务数字门店中的云 POS 解决方案
将所有操作整合到一个云 POS 系统中元宇宙
元宇宙是下一代互联网人工智能 (AI) 加速
利用阿里云 GPU 技术,为 AI 驱动型业务以及 AI 模型训练和推理加速DevOps
快速、安全地最大限度提高您的DevOps优势数据迁移解决方案
加速迁移您的数据到阿里云企业 IT 治理
在阿里云上构建高效可控的云环境基于日志管理的AIOps
登录到带有智能化日志管理解决方案的 AIOps 环境备份与存档
数据备份、数据存档和灾难恢复用阿里云金融服务加快创新
在云端开展业务,提升客户满意度
为全球资本市场提供安全、准确和数字化的客户体验
利用专为金融科技而设的云原生数据库解决方案
利用大数据和 AI 降低信贷和黑灰产风险
建立快速、安全的全球外汇交易平台
新零售时代下,实现传统零售业转型
利用云服务处理流量波动问题,扩展业务运营、降低成本
快速轻松地搭建一站式直播购物平台
面向大数据建设、管理及应用的全域解决方案
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人
以数字化媒体旅程为当今的媒体市场准备就绪您的内容
带宽成本降低高达 30%
快速轻松地搭建一站式直播购物平台
为全球观众实时直播大型赛事,视频播放流畅不卡顿
使用阿里云弹性高性能计算 E-HPC 将本地渲染农场连接到云端
构建发现服务,帮助客户找到最合适的内容
保护您的媒体存档安全
通过统一的数据驱动平台提供一致的全生命周期客户服务
在钉钉上打造一个多功能的电信和数字生活平台
在线存储、共享和管理照片与文件
提供全渠道的无缝客户体验
面向中小型企业,为独立软件供应商提供可靠的IT服务
打造最快途径,助力您的新云业务扬帆起航
先进的SD-WAN平台,可实现WAN连接、实时优化并降低WAN成本
通过自动化和流程标准化实现快速事件响应
针对关键网络安全威胁提供集中可见性并进行智能安全分析
提供大容量、可靠且高度安全的企业文件传输
用智能技术数字化体育赛事
基于人工智能的低成本体育广播服务
专业的广播转码及信号分配管理服务
基于云的音视频内容引入、编辑和分发服务
在虚拟场馆中模拟关键运营任务
针对赛事运营的创新型凭证数字服务
智能和交互式赛事指南
轻松管理云端背包单元的绑定直播流
通过数据加强您的营销工作
元宇宙是下一代互联网
利用生成式 AI 加速创新,创造新的业务佳绩
阿里云高性能开源大模型
借助AI轻松解锁和提炼文档中的知识
通过AI驱动的语音转文本服务获取洞察
探索阿里云人工智能和数据智能的所有功能、新优惠和最新产品
该体验中心提供广泛的用例和产品帮助文档,助您开始使用阿里云 AI 产品和浏览您的业务数据。
利用阿里云 GPU 技术,为 AI 驱动型业务以及 AI 模型训练和推理加速
元宇宙是下一代互联网
构建发现服务,帮助客户找到最合适的内容
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人
加速迁移您的数据到阿里云
在阿里云上建立一个安全且易扩容的环境,助力高效率且高成本效益的上云旅程
迁移到完全托管的云数据库
将 Oracle 数据库顺利迁移到云原生数据库
自带公网 IP 地址上云
利用阿里云强大的安全工具集,保障业务安全、应用程序安全、数据安全、基础设施安全和帐户安全
保护、备份和还原您的云端数据资产
MLPS 2.0 一站式合规解决方案
快速高效地将您的业务扩展到中国,同时遵守适用的当地法规
实现对 CloudOps、DevOps、SecOps、AIOps 和 FinOps 的高效、安全和透明的管理
构建您的原生云环境并高效管理集群
快速、安全地最大限度提高您的DevOps优势
实施细粒度安全控制
提供运维效率和总体系统安全性
实时分析您的云消耗并实现节约
实时存储、管理和分析各种规模和类型的数据
登录到带有智能化日志管理解决方案的 AIOps 环境
帮助企业实现数据现代化并规划其数字化未来
帮助零售商快速规划数字化之旅
将全球知名的 CRM 平台引入中国
在线存储、共享和管理照片与文件
构建、部署和管理高可用、高可靠、高弹性的应用程序
快速、安全地最大限度提高您的DevOps优势
将您的采购和销售置于同一企业级全渠道数字平台上
企业内大数据建设、管理和应用的一站式解决方案
帮助企业简化 IT 架构、实现商业价值、加速数字化转型的步伐
快速高效地将您的业务扩展到中国,同时遵守适用的当地法规
快速搜集、处理、分析联网设备产生的数据
0.0.201
如果您在使用Flink、Spark、Storm等大数据计算引擎时,需要将日志进行压缩、批量上传日志到日志服务、减少网络传输资源的占用,API或者SDK往往无法满足大数据场景对数据写入能力的要求,您可以使用Aliyun Log Java Producer,便捷高效地将数据上传到日志服务。
您已完成以下操作:
已安装日志服务Java SDK。具体操作,请参见安装Java SDK。
Aliyun Log Java Producer是为运行在大数据、高并发场景下的Java应用量身打造的高性能类库。相对于原始的API或SDK,使用该类库写日志数据能为您带来诸多优势,包括高性能、计算与I/O逻辑分离、资源可控制等。Aliyun LOG Java Producer使用阿里云日志服务提供的顺序写入功能来保证日志的上传顺序。
日志服务提供基于Aliyun Log Java Producer的样例应用程序,便于您快速上手。更多信息,请参见Aliyun Log Producer Sample Application。
线程安全:Producer接口暴露的所有方法都是线程安全的。
异步发送:调用Producer的发送接口通常能够立即返回响应。Producer内部会缓存并合并待发送数据,然后批量发送以提高吞吐量。
自动重试:Producer会根据配置的最大重试次数和重试退避时间进行重试。
行为追溯:通过Callback或Future能获取当前数据是否发送成功的信息,也可以获得该数据每次被尝试发送的信息,有利于问题追溯和行为决策。
上下文还原:同一个Producer实例产生的日志在同一上下文中,在服务端可以查看某条日志前后相关的日志。
优雅关闭:保证close方法退出时,Producer缓存的所有数据都能被处理,同时您也能得到相应的通知。
producer对比原始的API或SDK的优势如下:
高性能
在海量数据、资源有限的前提下,写入端要达到目标吞吐量需要实现复杂的控制逻辑,包括多线程、缓存策略、批量发送等,另外还要充分考虑失败重试的场景。Producer实现了上述功能,在为您带来性能优势的同时简化了程序开发步骤。
异步非阻塞
在可用内存充足的前提下,Producer会对发往日志库的数据进行缓存,因此调用send方法时能够立即返回响应且不会阻塞,可达到计算与I/O逻辑分离的目的。随后,您可以通过返回的Future对象或传入的Callback获得数据发送的结果。
资源可控制
可以通过参数控制Producer用于缓存待发送数据的内存大小,同时还可以配置用于执行数据发送任务的线程数量。这样可避免Producer无限制地消耗资源,且可以让您根据实际情况平衡资源消耗和写入吞吐量。
定位问题简单
如果日志数据发送失败,Producer除了返回状态码,还会返回一个String类型的异常信息,用于描述失败的原因和详细信息。例如,如果发送失败是因为网络连接超时,则返回的异常信息可能是“连接超时”;如果发送失败是因为服务器无响应,则返回的异常信息可能是“服务器无响应”。
使用SDK产生的费用和使用控制台产生的费用一致。更多信息,请参见计费概述。
在Maven工程中使用日志服务Aliyun Log Java Producer,只需在pom.xml中加入相应依赖。Maven项目管理工具会自动下载相关JAR包。例如,在<dependencies>中加入如下内容:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log-producer</artifactId>
<version>0.3.22</version>
</dependency>
添加更新完后,如果提示Producer依赖的版本冲突,在<dependencies>中加入如下内容:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log</artifactId>
<version>0.6.114</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
ProducerConfig用于配置发送策略,您可以根据不同的业务场景为参数指定不同的值,各参数含义如下表所示:
Config producerConfig = new ProducerConfig();
producerConfig.setTotalSizeInBytes(104857600);
参数 | 类型 | 描述 |
参数 | 类型 | 描述 |
totalSizeInBytes | 整型 | 单个Producer实例能缓存的日志大小上限,默认为 100MB。 |
maxBlockMs | 整型 | 如果Producer可用空间不足,调用者在send方法上的最大阻塞时间,默认为60秒。 如果超过这个时间后所需空间仍无法得到满足,send方法会抛出TimeoutException。 如果将该值设为0,当所需空间无法得到满足时,send 方法会立即抛出 TimeoutException。 如果您希望send方法一直阻塞直到所需空间得到满足,可将该值设为负数。 |
ioThreadCount | 整型 | 执行日志发送任务的线程池大小,默认为可用处理器个数。 |
batchSizeThresholdInBytes | 整型 | 当一个ProducerBatch中缓存的日志大小大于等于 batchSizeThresholdInBytes 时,该batch将被发送,默认为512KB,最大可设置成 5MB。 |
batchCountThreshold | 整型 | 当一个ProducerBatch中缓存的日志条数大于等于 batchCountThreshold时,该batch将被发送,默认4096,最大可设置成40960。 |
lingerMs | 整型 | 一个ProducerBatch从创建到可发送的逗留时间,默认为2秒,最小可设置成100毫秒。 |
retries | 整型 | 如果某个ProducerBatch首次发送失败,能够对其重试的次数,默认为10次。 如果retries小于等于 0,该ProducerBatch首次发送失败后将直接进入失败队列。 |
maxReservedAttempts | 整型 | 每个ProducerBatch每次被尝试发送都对应着一个Attempt,此参数用来控制返回给用户的attempt个数,默认只保留最近的11次attempt信息。 该参数越大能让您追溯更多的信息,但同时也会消耗更多的内存。 |
baseRetryBackoffMs | 整型 | 首次重试的退避时间,默认为100毫秒。 Producer采样指数退避算法,第N次重试的计划等待时间为 baseRetryBackoffMs * 2^(N-1)。 |
maxRetryBackoffMs | 整型 | 重试的最大退避时间,默认为50秒。 |
adjustShardHash | 布尔 | 如果调用send方法时指定了 shardHash,该参数用于控制是否需要对其进行调整,默认为true。 |
buckets | 整型 | 当且仅当adjustShardHash为true时,该参数才生效。此时,producer会自动将shardHash重新分组,分组数量为buckets。 如果两条数据的shardHash不同,它们是无法合并到一起发送的,会降低producer吞吐量。将shardHash重新分组后,能让数据有更多地机会被批量发送。 该参数的取值范围是 [1, 256],且必须是2的整数次幂,默认为64。 |
Producer 支持用户配置AK或STS token。如果使用STS token,需要定期创建新的ProjectConfig然后将其添加到ProjectConfigs里。
LogProducer是接口Producer的实现类,它接收唯一的参数producerConfig。当您准备好producerConfig后,可以按照下列方式创建producer实例。
Producer producer = new LogProducer(producerConfig);
创建producer的同时会创建一系列线程,这是一个相对昂贵的操作,因此建议一个应用共用一个producer实例。一个producer实例包含的线程如下表所示,其中N为该producer实例在当前进程中的编号,从 0 开始。另外,LogProducer提供的所有方法都是线程安全的,可以在多线程环境下安全执行。
线程名格式 | 数量 | 描述 |
线程名格式 | 数量 | 描述 |
aliyun-log-producer-<N>-mover | 1 | 负责将满足发送条件的batch投递到发送线程池里。 |
aliyun-log-producer-<N>-io-thread | ioThreadCount | IOThreadPool中真正用于执行数据发送任务的线程。 |
aliyun-log-producer-<N>-success-batch-handler | 1 | 用于处理发送成功的batch。 |
aliyun-log-producer-<N>-failure-batch-handler | 1 | 用于处理发送失败的batch。 |
ProjectConfig包含目标Project的服务入口信息以及表征调用者身份的访问凭证。每个日志项目对应一个ProjectConfig对象。
可以按照如下方式创建实例。
ProjectConfig project1 = new ProjectConfig("your-project-1", "cn-hangzhou.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
ProjectConfig project2 = new ProjectConfig("your-project-2", "cn-shanghai.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
producer.putProject(project1);
producer.putProject(project2);
在使用Aliyun Log Java Producer发送日志数据时,需要指定一个回调函数来处理发送过程中的各种情况。当日志数据发送成功时,回调函数会被调用,并返回一个发送结果;当日志数据发送失败时,回调函数也会被调用,并传入一个异常对象。
如果获取结果后,应用的处理逻辑比较简单且不会造成producer阻塞,建议直接使用callback。否则,建议使用ListenableFuture,在单独的线程(池)中执行后续业务
方法的各个参数含义如下:
参数 | 描述 |
参数 | 描述 |
project | 待发送数据的目标 project。 |
logstore | 待发送数据的目标 logStore。 |
logTem | 待发送数据。 |
completed | Java提供的一个原子类型,用来确保所有日志发送完成(成功或者失败)。 |
Producer接口提供多种发送方法,方法的各个参数含义如下。
参数 | 描述 | 是否必选 |
参数 | 描述 | 是否必选 |
project | 目标Project。 | 是 |
logStore | 目标LogStore。 | 是 |
logItem | 要发送的日志/日志列表。 | 是 |
topic | 日志主题 | 否 说明 如果留空或没有指定,该字段将被赋予""。 |
source | 发送源。 | 否 说明 如果留空或没有指定,该字段将被赋予producer所在宿主机的 IP。 |
shardHash | 可为发送的日志设置自定义哈希,服务端将根据此哈希选择对应的日志库Shard分片写入日志。 | 否 说明 如果留空或没有指定,数据将被随机写入目标LogStore的某个shard中。 |
callback | 可设置一个回调函数。该回调函数将在日志被成功发送或者重试多次失败后被丢弃时调用。 | 否 |
异常 | 说明 |
异常 | 说明 |
TimeoutException | 当Producer缓存的日志大小超过设定的内存上限时,且阻塞maxBlockMs毫秒后仍未获取到足够内存时,将抛出TimeoutException。 maxBlockMs 为-1时,阻塞没有时间上限,将永远不会抛出 TimeoutException。 |
IllegalStateException | 当Producer已经处于关闭状态(调用过close方法)时,再调用send 方法,会抛出IllegalStateException。 |
由于producer提供的所有发送方法都是异步的,需要通过返回的future或者传入的callback获取发送结果。
Send 方法会返回一个ListenableFuture,它除了可以像普通future那样通过调用get方法阻塞获得发送结果外,还允许你注册回调方法(回调方法会在完成 future 设置后被调用)。以下代码片段展示了ListenableFuture的使用方法,用户需要为该future注册一个FutureCallback并将其投递到应用提供的线程池EXECUTOR_SERVICE中执行,完整样例请参见SampleProducerWithFuture.java。
import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.common.LogItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public class SampleProducerWithCallback {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);
private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws InterruptedException {
final String project = "example-project";
final String logstore = "example-logstore";
String endpoint = "example-endpoint";
// 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
ProducerConfig producerConfig = new ProducerConfig();
final Producer producer = new LogProducer(producerConfig);
producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));
int nTask = 100;
// The number of logs that have finished (either successfully send, or failed).
final AtomicLong completed = new AtomicLong(0);
final CountDownLatch latch = new CountDownLatch(nTask);
for (int i = 0; i < nTask; ++i) {
threadPool.submit(
new Runnable() {
@Override
public void run() {
//The maximum size of a LogItem (key) is 128 bytes. The maximum size of a LogItem (value) is 1 MB.
LogItem logItem = new LogItem();
logItem.PushBack("key1", "foo");
logItem.PushBack("key2", "bar");
try {
producer.send(
project,
logstore,
"your-topic",
"your-source",
logItem,
new SampleCallback(project, logstore, logItem, completed));
} catch (InterruptedException e) {
LOGGER.warn("The current thread has been interrupted during send logs.");
} catch (Exception e) {
LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
} finally {
latch.countDown();
}
}
});
}
// 只有进程退出的时候,才需要考虑如下的逻辑。
latch.await();
threadPool.shutdown();
try {
producer.close();
} catch (InterruptedException e) {
LOGGER.warn("The current thread has been interrupted from close.");
} catch (ProducerException e) {
LOGGER.info("Failed to close producer, e=", e);
}
LOGGER.info("All log complete, completed={}", completed.get());
}
private static final class SampleCallback implements Callback {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
private final String project;
private final String logStore;
private final LogItem logItem;
private final AtomicLong completed;
SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
this.project = project;
this.logStore = logStore;
this.logItem = logItem;
this.completed = completed;
}
@Override
public void onCompletion(Result result) {
try {
if (result.isSuccessful()) {
LOGGER.info("Send log successfully.");
} else {
LOGGER.error(
"Failed to send log, project={}, logStore={}, logItem={}, result={}",
project,
logStore,
logItem.ToJsonString(),
result);
}
} finally {
completed.getAndIncrement();
}
}
}
}
Callback由producer内部线程负责执行,并且只有在执行完毕后数据“占用”的空间才会释放。为了不阻塞producer造成整体吞吐量的下降,要避免在callback里执行耗时的操作。另外,在callback中调用send方法进行重试也是不建议的,您可以在ListenableFuture的callback中进行重试。完整样例请参见SampleProducerWithCallback.java。
import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.common.LogItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public class SampleProducerWithCallback {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);
private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws InterruptedException {
final String project = "example-project";
final String logstore = "example-logstore";
String endpoint = "example-endpoint";
// 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
ProducerConfig producerConfig = new ProducerConfig();
final Producer producer = new LogProducer(producerConfig);
producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));
int nTask = 100;
// The number of logs that have finished (either successfully send, or failed).
final AtomicLong completed = new AtomicLong(0);
final CountDownLatch latch = new CountDownLatch(nTask);
for (int i = 0; i < nTask; ++i) {
threadPool.submit(
new Runnable() {
@Override
public void run() {
//The maximum size of a LogItem (key) is 128 bytes. The maximum size of a LogItem (value) is 1 MB.
LogItem logItem = new LogItem();
logItem.PushBack("key1", "foo");
logItem.PushBack("key2", "bar");
try {
producer.send(
project,
logstore,
"your-topic",
"your-source",
logItem,
new SampleCallback(project, logstore, logItem, completed));
} catch (InterruptedException e) {
LOGGER.warn("The current thread has been interrupted during send logs.");
} catch (Exception e) {
LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
} finally {
latch.countDown();
}
}
});
}
// 只有进程退出的时候,才需要考虑如下的逻辑。
latch.await();
threadPool.shutdown();
try {
producer.close();
} catch (InterruptedException e) {
LOGGER.warn("The current thread has been interrupted from close.");
} catch (ProducerException e) {
LOGGER.info("Failed to close producer, e=", e);
}
LOGGER.info("All log complete, completed={}", completed.get());
}
private static final class SampleCallback implements Callback {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
private final String project;
private final String logStore;
private final LogItem logItem;
private final AtomicLong completed;
SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
this.project = project;
this.logStore = logStore;
this.logItem = logItem;
this.completed = completed;
}
@Override
public void onCompletion(Result result) {
try {
if (result.isSuccessful()) {
LOGGER.info("Send log successfully.");
} else {
LOGGER.error(
"Failed to send log, project={}, logStore={}, logItem={}, result={}",
project,
logStore,
logItem.ToJsonString(),
result);
}
} finally {
completed.getAndIncrement();
}
}
}
}
当您已经没有数据需要发送或者当前进程准备退出时,需要关闭Producer,目的是让Producer中缓存的数据全部被处理。目前,Producer提供安全关闭和有限关闭两种模式。
在大多数情况下,建议您使用安全关闭。安全关闭对应的方法是close()
,它会等到Producer中缓存的数据全部被处理、线程全部停止、注册的callback全部执行,返回future全部被设置后才会返回。
虽然要等到数据全部处理完成,但Producer被关闭后,缓存的batch会被立刻处理且不会被重试。因此,如果callback不被阻塞,close方法往往能在很短的时间内返回。
如果您的callback在执行过程中有可能阻塞,但您又希望close方法能在短时间内返回,可以使用有限关闭。有限关闭对应的方法是close(long timeoutMs)
,如果超过指定的timeoutMs后Producer仍未完全关闭,它会抛出IllegalStateException异常,这意味着缓存的数据可能还没来得及处理就被丢弃,用户注册的Callback也可能不会被执行。
日志服务读写数据的次数和大小均存在限制。更多信息,请参见数据读写。
日志服务的基础资源,包括创建Project个数、Logstore个数、Shard个数、LogtailConfig个数、机器组个数、单个LogItem大小、LogItem(Key)长度和LogItem(Value)长度等均存在限制。更多信息,请参见基础资源。
如果您发现数据没有写入日志服务,可通过如下步骤诊断问题。
检查您项目中引入的aliyun-log-producer
、aliyun-log
、protobuf-java
Jar包的版本是否和文档中安装部分列出的Jar包版本一致,如果不一致请进行升级。
Producer接口的send方法异步发送数据,无法及时获取返回的值。请通过Callback接口或返回的Future对象获取数据发送失败的原因。
如果您发现并没有回调Callback接口的onCompletion方法,请检查在您的程序退出之前是否有调用producer.close()
方法。因为数据发送是由后台线程异步完成的,为了防止缓存在内存里的少量数据丢失,请务必在程序退出之前调用producer.close()
方法。
Producer接口会把运行过程中的关键行为通过日志框架slf4j进行输出,您可以在程序中配置好相应的日志实现框架并打开DEBUG级别的日志。重点检查是否输出ERROR级别的日志。
如果通过上述步骤仍然没有解决,请提工单。
在调用API接口过程中,若服务端返回结果中包含错误信息,则表示调用API接口失败。您可以参考API错误码对照表查找对应的解决方法。更多信息,请参见API错误处理对照表。
日志服务除自研的SDK外,还支持公共的阿里云SDK,关于阿里云SDK的使用方式,请参见日志服务_SDK中心-阿里云OpenAPI开发者门户。
为满足越来越多的自动化日志服务配置需求,日志服务提供命令行工具CLI(Command Line Interface)。更多信息,请参见日志服务命令行工具CLI。
更多示例代码,请参见Aliyun Log Java SDK on GitHub。