阿里云助力您在中国加快取得成功
一站式安全合规咨询服务
MLPS 2.0 一站式合规解决方案
依托我们的网络进军中国市场
提升面向互联网应用的性能和安全性
保障您的中国业务安全无忧
通过强大的数据安全框架保护您的数据资产
申请 ICP 备案的流程解读和咨询服务
面向大数据建设、管理及应用的全域解决方案
企业内大数据建设、管理和应用的一站式解决方案
将您的采购和销售置于同一企业级全渠道数字平台上
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人
快速搭建在线教育平台
提供域名注册、分析和保护服务
即时部署可伸缩的虚拟云服务器
完全托管、本地部署的阿里云基础架构和服务。
阿里云开放给企业应用服务商和其客户的服务管理PaaS平台
助您构建公共云上的专有资源池
富弹性可伸缩的高性能计算服务
易用、安全、高效的云上桌面服务
高效运维、稳定安全和操作流畅的云端虚拟手机
基于GPU的弹性计算服务
可快速搭建应用且易于管理的轻量级云服务器
根据用户的业务需求和策略,自动调整其弹性计算资源的管理服务
为虚拟机和容器提供高可靠性、高性能、低时延的块存储服务
一款海量、安全、低成本、高可靠的云存储服务
可靠、弹性、高性能、多共享的文件存储服务
全托管、可扩展的并行文件系统服务。
全托管的 NoSQL 结构化数据实时存储服务
可抵扣多种存储产品的容量包,兼具灵活性和长期成本优化
让您的应用跨不同可用区资源自动分配访问量
随时绑定和解绑 VPC ECS
云网络公网、跨域流量统一计费
高性价比,可抵扣按流量计费的流量费用
创建云上隔离的网络,在专有环境中运行资源
在 VPC 环境下构建公网流量的出入口
具备网络状态可视化、故障智能诊断能力的自助式网络运维服务。
安全便捷的云上服务专属连接
基于阿里云专有网络的私有 DNS 解析服务
保障在线业务不受大流量 DDoS 攻击影响
系统运维和安全审计管控平台
业务上云的第一个网络安全基础设施
提供7X24小时安全运维平台
防御常见 Web 攻击,缓解 HTTP 泛洪攻击
实现全站 HTTPS,呈现可信的 WEB 访问
为云上应用提供符合行业标准和密码算法等级的数据加解密、签名验签和数据认证能力
一款发现、分类和保护敏感数据的安全服务
创建、控制和管理您的加密密钥
快速提高应用高可用能力服务
围绕应用和微服务的 PaaS 平台
兼容主流开源微服务生态的一站式平台
多集群环境下微服务应用流量统一管理
企业级全托管实时数据流平台
全托管,开箱即用的Apache Kafka全托管服务
提供物联网移动端和云交互的消息队列
开箱即用的全托管 RabbitMQ 服务
提供基于消息的可靠异步通信机制
应用之间的消息队列和通知
无服务器事件总线服务
Super MySQL 和 PostgreSQL,高度兼容 Oracle 语法
全托管 MySQL、PostgreSQL、SQL Server、MariaDB
兼容 Redis® 的缓存和KV数据库
兼容Apache Cassandra、Apache HBase、Elasticsearch、OpenTSDB 等多种开源接口
文档型数据库,支持副本集和分片架构
100%兼容 Apache HBase 并深度扩展,稳定、易用、低成本的NoSQL数据库。
低成本、高可用、可弹性伸缩的在线时序数据库服务
专为搜索和分析而设计,成本效益达到开源的两倍,采用最新的企业级AI搜索和AI助手功能。
一款兼容PostgreSQL协议的实时交互式分析产品
一种快速、完全托管的 TB/PB 级数据仓库
基于 Flink 为大数据行业提供解决方案
基于Qwen和其他热门模型的一站式生成式AI平台,可构建了解您业务的智能应用程
一站式机器学习平台,满足数据挖掘分析需求
高性能向量检索服务,提供低代码API和高成本效益
帮助您的应用快速构建高质量的个性化推荐服务能力
提供定制化的高品质机器翻译服务
全面的AI计算平台,满足大模型训练等高性能AI计算的算力和性能需求
具备智能会话能力的会话机器人
基于机器学习的智能图像搜索产品
基于阿里云深度学习技术,为用户提供图像分割、视频分割、文字识别等离线SDK能力,支持Android、iOS不同的适用终端。
语音识别、语音合成服务以及自学习平台
一站式智能搜索业务开发平台
助力金融企业快速搭建超低时延、高质量、稳定的行情数据服务
帮助企业快速测算和分析企业的碳排放和产品碳足迹
企业工作流程自动化,全面提高效率
金融级云原生分布式架构的一站式高可用应用研发、运维平台
eKYC 数字远程在线解决方案
可智能检测、大数据驱动的综合性反洗钱 (AML) 解决方案
强大、灵活、安全的全网域名智能解析服务
亚洲 No.1 的域名注册商,注册量超 2000 万
帮助企业高效实现GoChina备案合规
域名注册者、到期日、所属注册商等信息查询
阿里云APM类监控产品
实时云监控服务,确保应用及服务器平稳运行
为系统运维人员管理云基础架构提供全方位服务的云上自动化运维平台
面向您的云资源的风险检测服务
提升分布式环境下的诊断效率
日志类数据一站式服务,无需开发就能部署
ECS 预留实例
让弹性计算产品的成本和灵活性达到最佳平衡的付费方式。云原生 AI 套件
加速AI平台构建,提高资源效率和交付速度FinOps
实时分析您的云消耗并实现节约SecOps
实施细粒度安全控制DevOps
快速、安全地最大限度提高您的DevOps优势自带IP上云
自带公网 IP 地址上云全球网络互联
端到端的软件定义网络解决方案,可推动跨国企业的业务发展全球应用加速
提升面向互联网应用的性能和安全性全球互联网接入
将IDC网关迁移到云端云原生 AI 套件
加速AI平台构建,提高资源效率和交付速度FinOps
实时分析您的云消耗并实现节约SecOps
实施细粒度安全控制DevOps
快速、安全地最大限度提高您的DevOps优势金融科技云数据库解决方案
利用专为金融科技而设的云原生数据库解决方案游戏行业云数据库解决方案
提供多种成熟架构,解决所有数据问题Oracle 数据库迁移
将 Oracle 数据库顺利迁移到云原生数据库数据库迁移
加速迁移您的数据到阿里云阿里云上的数据湖
实时存储、管理和分析各种规模和类型的数据数码信贷
利用大数据和 AI 降低信贷和黑灰产风险面向企业数据技术的大数据咨询服务
帮助企业实现数据现代化并规划其数字化未来人工智能对话服务
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人EasyDispatch 现场服务管理
为现场服务调度提供实时AI决策支持在线教育
快速搭建在线教育平台窄带高清 (HD) 转码
带宽成本降低高达 30%广电级大型赛事直播
为全球观众实时直播大型赛事,视频播放流畅不卡顿直播电商
快速轻松地搭建一站式直播购物平台用于供应链规划的Alibaba Dchain
构建和管理敏捷、智能且经济高效的供应链云胸牌
针对赛事运营的创新型凭证数字服务数字门店中的云 POS 解决方案
将所有操作整合到一个云 POS 系统中元宇宙
元宇宙是下一代互联网人工智能 (AI) 加速
利用阿里云 GPU 技术,为 AI 驱动型业务以及 AI 模型训练和推理加速DevOps
快速、安全地最大限度提高您的DevOps优势数据迁移解决方案
加速迁移您的数据到阿里云企业 IT 治理
在阿里云上构建高效可控的云环境基于日志管理的AIOps
登录到带有智能化日志管理解决方案的 AIOps 环境备份与存档
数据备份、数据存档和灾难恢复用阿里云金融服务加快创新
在云端开展业务,提升客户满意度
为全球资本市场提供安全、准确和数字化的客户体验
利用专为金融科技而设的云原生数据库解决方案
利用大数据和 AI 降低信贷和黑灰产风险
建立快速、安全的全球外汇交易平台
新零售时代下,实现传统零售业转型
利用云服务处理流量波动问题,扩展业务运营、降低成本
快速轻松地搭建一站式直播购物平台
面向大数据建设、管理及应用的全域解决方案
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人
利用自动化、数字化、智能化的物流解决方案,平衡成本和用户体验
构建和管理敏捷、智能且经济高效的供应链
为现场服务调度提供实时AI决策支持
以数字化媒体旅程为当今的媒体市场准备就绪您的内容
带宽成本降低高达 30%
快速轻松地搭建一站式直播购物平台
为全球观众实时直播大型赛事,视频播放流畅不卡顿
使用阿里云弹性高性能计算 E-HPC 将本地渲染农场连接到云端
构建发现服务,帮助客户找到最合适的内容
保护您的媒体存档安全
通过统一的数据驱动平台提供一致的全生命周期客户服务
在钉钉上打造一个多功能的电信和数字生活平台
在线存储、共享和管理照片与文件
提供全渠道的无缝客户体验
面向中小型企业,为独立软件供应商提供可靠的IT服务
打造最快途径,助力您的新云业务扬帆起航
先进的SD-WAN平台,可实现WAN连接、实时优化并降低WAN成本
通过自动化和流程标准化实现快速事件响应
针对关键网络安全威胁提供集中可见性并进行智能安全分析
提供大容量、可靠且高度安全的企业文件传输
用智能技术数字化体育赛事
基于人工智能的低成本体育广播服务
专业的广播转码及信号分配管理服务
基于云的音视频内容引入、编辑和分发服务
在虚拟场馆中模拟关键运营任务
针对赛事运营的创新型凭证数字服务
智能和交互式赛事指南
轻松管理云端背包单元的绑定直播流
通过数据加强您的营销工作
元宇宙是下一代互联网
加速迁移您的数据到阿里云
在阿里云上建立一个安全且易扩容的环境,助力高效率且高成本效益的上云旅程
迁移到完全托管的云数据库
将 Oracle 数据库顺利迁移到云原生数据库
自带公网 IP 地址上云
利用阿里云强大的安全工具集,保障业务安全、应用程序安全、数据安全、基础设施安全和帐户安全
保护、备份和还原您的云端数据资产
MLPS 2.0 一站式合规解决方案
快速高效地将您的业务扩展到中国,同时遵守适用的当地法规
实现对 CloudOps、DevOps、SecOps、AIOps 和 FinOps 的高效、安全和透明的管理
构建您的原生云环境并高效管理集群
快速、安全地最大限度提高您的DevOps优势
实施细粒度安全控制
提供运维效率和总体系统安全性
实时分析您的云消耗并实现节约
利用生成式 AI 加速创新,创造新的业务佳绩
阿里云高性能开源大模型
探索阿里云人工智能和数据智能的所有功能、新优惠和最新产品
该体验中心提供广泛的用例和产品帮助文档,助您开始使用阿里云 AI 产品和浏览您的业务数据。
利用阿里云 GPU 技术,为 AI 驱动型业务以及 AI 模型训练和推理加速
元宇宙是下一代互联网
构建发现服务,帮助客户找到最合适的内容
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人
实时存储、管理和分析各种规模和类型的数据
登录到带有智能化日志管理解决方案的 AIOps 环境
帮助企业实现数据现代化并规划其数字化未来
帮助零售商快速规划数字化之旅
将全球知名的 CRM 平台引入中国
在线存储、共享和管理照片与文件
构建、部署和管理高可用、高可靠、高弹性的应用程序
快速、安全地最大限度提高您的DevOps优势
将您的采购和销售置于同一企业级全渠道数字平台上
企业内大数据建设、管理和应用的一站式解决方案
帮助企业简化 IT 架构、实现商业价值、加速数字化转型的步伐
快速高效地将您的业务扩展到中国,同时遵守适用的当地法规
快速搜集、处理、分析联网设备产生的数据
0.0.201
为满足业务对数据仓库中高度时效性数据的需求,MaxCompute基于Delta Table实现了分钟级近实时数据写入和主键更新功能,显著提升了数据仓库的数据更新效率。
面对具有突发性和热点性的客户行为日志,如评论、评分和点赞,传统的关系型数据库和离线数据分析方法在处理这类数据时可能存在资源消耗大、成本高、数据延迟以及更新复杂的问题,通常只能满足次日分析需求。
针对上述问题,您可以采用近实时数仓数据入仓方案,可以在分钟级别内实现数据增量同步到Delta Table,从而将数据写入到查询的延迟控制在5~10分钟,极大地提高了数据分析的时效性。如果您的生产任务是将数据同步至MaxCompute ODS(Operational Data Store)层的普通表,为避免生产任务改造的风险,您可以使用Delta Table的Upsert功能,它能有效将数据同步至Delta Table,同时防止数据重复存储,并提高存储效率和降低存储成本。
本文以第三方引擎Flink为例,介绍了Flink集成MaxCompute Flink Connector进行近实时写入数据至Delta Table的主要流程。
介绍如下:
序号 | 说明 |
序号 | 说明 |
【1】 | 支持按照数据的Primary Key列进行分组并发写入。 若您的并发写入的分区较多,且每个分区数据分布均匀,同时表的Bucket数量较少(如个位数),那么您也可以根据Partition列进行分组写入,有助于提高写入吞吐量。 |
【2】 | UpsertWriterTask收到数据后,会解析数据所属分区并向UpsertOperatorCoordinator发起请求,然后创建分区实时写入的Upsert Session。 |
【3】 | UpsertOperatorCoordinator向UpsertWriterTask返回已创建的Upsert session。 |
【4】 | UpsertWriterTask根据Upsert Session创建Upsert Writer,并连接MaxCompute的数据传输通道服务Tunnel Server,将数据持续写入。 在数据传输过程中,若启用了文件缓存,数据将会先进入Flink本地磁盘的缓存区,直到数据文件大小达到特定阈值或Checkpoint流程启动后,才将数据传输至Tunnel Server。 |
【5】 | Checkpoint流程启动后,Upsert Writer将数据全量提交至Tunnel Server,再向UpsertOperatorCoordinator发起请求,触发Commit操作,成功后数据可见。 |
【6】 | 若开启自动Major Compact,当分区Commit次数超过特定阈值时,由UpsertOperatorCoordinator向Storage Service发起Major compact操作。 说明 根据表数据量大小,此操作可能会对实时数据导入造成延时,因此需要谨慎使用。 |
将Flink数据写入至MaxCompute Delta Table的操作,详情请参见使用Flink写入数据到Delta Table。
您可以通过调整Upsert实时写入场景的配置参数来提高系统吞吐量和性能,并确保稳定性,以满足不同的业务需求。Upsert写入参数详情,请参见Upsert写入参数。
表Bucket数量可影响同时写入的最大并发数,在一定程度上决定了最大写入吞吐,推荐按照1 M/s * 表Bucket数量来计算总吞吐。
实际能达到的吞吐量与Sink节点并发等参数相关。详情请参见表格式和数据治理。
sink.parallelism:数据写入的Sink节点并发数,强烈建议表Bucket数量是该配置值的整数倍,可达到较好的性能效果。当sink.parallelism参数值与表Bucket数量一致时,理论上可以实现最佳性能。
如果设置了sink parallelism参数以增加写入并发,但发现吞吐量并未提升,可能的问题在于Sink节点的上游数据处理链路效率低下,建议您可优化数据处理链路来提高整体性能。
若表Bucket的数量是sink.parallelism的整数倍,那单个Sink节点写入的Bucket数量 = 表Bucket数量 ÷ sink.parallelism,若Bucket值过大,也会影响性能。建议您优先调整表Bucket数量和sink.parallelism参数值。若upsert.writer.buffer-size ÷ 单节点Bucket数量低于特定阈值(如128 K)时,可能会导致网络传输效率降低。为改善网络性能,建议考虑增大upsert.writer.buffer-size。
upsert.flush.concurrent参数:默认值为2,表示可并发flush的Bucket数。为了优化吞吐量,可以适当增加该值以观察性能提升。
需要注意的是,如果此值设置得过大,可能会导致过多的Bucket同时发送,从而引起网络拥堵,反而会使整体吞吐量下降。因此,在调整这个参数时需要谨慎,找到一个平衡点以确保系统的稳定和高效运行。
在此场景下,您可以参考通用关键参数配置和非分区表参数配置建议。同时,您还可以参考以下内容。
单个Sink节点在写入数据时涉及多个分区的操作,同时在Checkpoint阶段,每个分区需要独立进行Commit操作,这些特性可能会对整体的写入吞吐量产生影响。
单个Sink节点Buffer数据的最大内存=upsert.writer.buffer-size * 分区数,因此如果发生内存溢出(OOM),建议调整upsert.writer.buffer-size参数,减小其值以防止内存超出限制。
增加upsert.commit.thread-num参数值,可减少checkpoint阶段Commit的耗时。此参数默认值为16,意味着有16个线程并发处理分区进行Commit操作。
尽管可以适当增加这个数值以提高性能,但要注意不应超过32,以防止过度并发可能导致的问题。
在此场景下,您可以参考少量分区并发写入参数配置建议。同时,您还可以参考以下内容:
每个分区的数据都会首先缓存在本地文件中,然后在Checkpoint阶段并发写入MaxCompute中。
sink.file-cached.writer.num参数默认值为16,增加该参数值(不建议超过32),可增加单个Sink节点并发写入的分区数量。建议并发写入的Bucket数量建议等于sink.file-cached.writer.num * upsert.flush.concurrent。但需注意此值不应设置得过大,以防止引发网络拥堵问题,从而导致整体吞吐量下降。
FileCached模式写入参数详情,请参见FileCached模式写入参数。
如果参考以上参数建议都无法达到吞吐要求,或者吞吐不稳定,需考虑以下因素:
每个项目空间可免费使用的公共数据传输服务资源组是有限的,达到上限后,会Block数据写入,从而导致整体吞吐下降。如果数据写入吞吐较大,同时对延时要求比较高,建议购买独享数据传输服务资源组,确保资源供给。
Connector的上游数据处理链路效率低下,导致整体吞吐率不高。建议您优化数据处理链路,以提高整体性能。
问题一:
问题现象:提示出现报错信息“Checkpoint xxx expired before completing”。
问题原因:Checkpoint流程超时,通常由于Checkpoint过程中写入的分区数过多。
解决措施:
建议调整Flink Checkpoint时间,增加其时间间隔。
配置sink.file-cached.enable参数,开启文件缓存模式。详情请参见附录:新版Flink Connector全量参数。
问题二:
问题现象:提示出现报错信息“org.apache.flink.util.FlinkException: An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency. ”。
问题原因:通常由于JobManager与TaskManager通信异常导致,任务会自动发起重试。
解决措施:建议提升任务资源来确保任务稳定性。
问题一:
问题现象:TIMESTAMP类型的数据在写入MaxCompute后,时间偏移了8小时。
问题原因:Flink中的TIMESTAMP类型不包含时区信息,且在MaxCompute写入过程中也不会进行时区转换,因此数据会被视为零时区数据。然而,MaxCompute在读取这些数据时,会根据项目的时区设定对数据进行转换。
解决措施:使用TIMESTAMP_LTZ类型替换MaxCompute Sink Table中的TIMESTAMP类型。
问题一:
问题现象:数据写入时出现Tengine相关报错,报错信息内容如下。
<body>
<h1>An error occurred.</h1>
<p>Sorry, the page you are looking for is currently unavailable.<br/>
Please try again later.</p>
<p>If you are the system administrator of this resource then you should check
the <a href="http://nginx.org/r/error_log">error log</a> for details.</p>
<p><em>Faithfully yours, tengine.</em></p>
</body>
</html>
问题原因:远程Tunnel服务暂时不可用。
解决措施:等待Tunnel服务恢复后任务可以自动重试成功。
问题二:
问题现象:提示出现报错信息“java.io.IOException: RequestId=xxxxxx, ErrorCode=SlotExceeded, ErrorMessage=Your slot quota is exceeded.”。
问题原因:写入Quota超出限制,需要降低写入并发,或者增加独享Tunnel并发数。
解决措施:
降低写入并发,以减少对系统资源的占用。
增加独享Tunnel并发数,通过提升处理能力来适应更高的数据写入需求。购买独享资源详情,请参见购买与使用独享数据传输服务资源组。