借助阿里云在亚洲加速迈向成功
一站式安全合规咨询服务
MLPS 2.0 一站式合规解决方案
依托我们的网络进军中国市场
提升面向互联网应用的性能和安全性
保障您的中国业务安全无忧
通过强大的数据安全框架保护您的数据资产
申请 ICP 备案的流程解读和咨询服务
面向大数据建设、管理及应用的全域解决方案
企业内大数据建设、管理和应用的一站式解决方案
将您的采购和销售置于同一企业级全渠道数字平台上
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人
快速搭建在线教育平台
提供域名注册、分析和保护服务
云原生 Kubernetes 容器化应用运行环境
以 Kubernetes 为使用界面的容器服务产品,提供符合容器规范的算力资源
安全的镜像托管服务,支持全生命周期管理
多集群环境下微服务应用流量统一管理
提供任意基础设施上容器集群的统一管控,助您轻松管控分布式云场景
高弹性、高可靠的企业级无服务器 Kubernetes 容器产品
敏捷安全的 Serverless 容器运行服务
为虚拟机和容器提供高可靠性、高性能、低时延的块存储服务
一款海量、安全、低成本、高可靠的云存储服务
可靠、弹性、高性能、多共享的文件存储服务
全托管、可扩展的并行文件系统服务。
全托管的 NoSQL 结构化数据实时存储服务
可抵扣多种存储产品的容量包,兼具灵活性和长期成本优化
让您的应用跨不同可用区资源自动分配访问量
随时绑定和解绑 VPC ECS
云网络公网、跨域流量统一计费
高性价比,可抵扣按流量计费的流量费用
创建云上隔离的网络,在专有环境中运行资源
在 VPC 环境下构建公网流量的出入口
具备网络状态可视化、故障智能诊断能力的自助式网络运维服务。
安全便捷的云上服务专属连接
基于阿里云专有网络的私有 DNS 解析服务
保障在线业务不受大流量 DDoS 攻击影响
系统运维和安全审计管控平台
业务上云的第一个网络安全基础设施
集零信任内网访问、办公数据保护、终端管理等多功能于一体的办公安全管控平台
提供7X24小时安全运维平台
防御常见 Web 攻击,缓解 HTTP 泛洪攻击
实现全站 HTTPS,呈现可信的 WEB 访问
为云上应用提供符合行业标准和密码算法等级的数据加解密、签名验签和数据认证能力
一款发现、分类和保护敏感数据的安全服务
创建、控制和管理您的加密密钥
快速提高应用高可用能力服务
围绕应用和微服务的 PaaS 平台
兼容主流开源微服务生态的一站式平台
多集群环境下微服务应用流量统一管理
Super MySQL 和 PostgreSQL,高度兼容 Oracle 语法
全托管 MySQL、PostgreSQL、SQL Server、MariaDB
兼容 Redis® 的缓存和KV数据库
兼容Apache Cassandra、Apache HBase、Elasticsearch、OpenTSDB 等多种开源接口
文档型数据库,支持副本集和分片架构
100%兼容 Apache HBase 并深度扩展,稳定、易用、低成本的NoSQL数据库。
低成本、高可用、可弹性伸缩的在线时序数据库服务
专为搜索和分析而设计,成本效益达到开源的两倍,采用最新的企业级AI搜索和AI助手功能。
一款兼容PostgreSQL协议的实时交互式分析产品
一种快速、完全托管的 TB/PB 级数据仓库
基于 Flink 为大数据行业提供解决方案
基于Qwen和其他热门模型的一站式生成式AI平台,可构建了解您业务的智能应用程
一站式机器学习平台,满足数据挖掘分析需求
高性能向量检索服务,提供低代码API和高成本效益
帮助您的应用快速构建高质量的个性化推荐服务能力
提供定制化的高品质机器翻译服务
全面的AI计算平台,满足大模型训练等高性能AI计算的算力和性能需求
具备智能会话能力的会话机器人
基于机器学习的智能图像搜索产品
基于阿里云深度学习技术,为用户提供图像分割、视频分割、文字识别等离线SDK能力,支持Android、iOS不同的适用终端。
语音识别、语音合成服务以及自学习平台
一站式智能搜索业务开发平台
助力金融企业快速搭建超低时延、高质量、稳定的行情数据服务
帮助企业快速测算和分析企业的碳排放和产品碳足迹
企业工作流程自动化,全面提高效率
金融级云原生分布式架构的一站式高可用应用研发、运维平台
eKYC 数字远程在线解决方案
可智能检测、大数据驱动的综合性反洗钱 (AML) 解决方案
阿里云APM类监控产品
实时云监控服务,确保应用及服务器平稳运行
为系统运维人员管理云基础架构提供全方位服务的云上自动化运维平台
面向您的云资源的风险检测服务
提升分布式环境下的诊断效率
日志类数据一站式服务,无需开发就能部署
ECS 预留实例
让弹性计算产品的成本和灵活性达到最佳平衡的付费方式。云原生 AI 套件
加速AI平台构建,提高资源效率和交付速度FinOps
实时分析您的云消耗并实现节约SecOps
实施细粒度安全控制DevOps
快速、安全地最大限度提高您的DevOps优势自带IP上云
自带公网 IP 地址上云全球网络互联
端到端的软件定义网络解决方案,可推动跨国企业的业务发展全球应用加速
提升面向互联网应用的性能和安全性全球互联网接入
将IDC网关迁移到云端云原生 AI 套件
加速AI平台构建,提高资源效率和交付速度FinOps
实时分析您的云消耗并实现节约SecOps
实施细粒度安全控制DevOps
快速、安全地最大限度提高您的DevOps优势金融科技云数据库解决方案
利用专为金融科技而设的云原生数据库解决方案游戏行业云数据库解决方案
提供多种成熟架构,解决所有数据问题Oracle 数据库迁移
将 Oracle 数据库顺利迁移到云原生数据库数据库迁移
加速迁移您的数据到阿里云阿里云上的数据湖
实时存储、管理和分析各种规模和类型的数据数码信贷
利用大数据和 AI 降低信贷和黑灰产风险面向企业数据技术的大数据咨询服务
帮助企业实现数据现代化并规划其数字化未来人工智能对话服务
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人EasyDispatch 现场服务管理
为现场服务调度提供实时AI决策支持在线教育
快速搭建在线教育平台窄带高清 (HD) 转码
带宽成本降低高达 30%广电级大型赛事直播
为全球观众实时直播大型赛事,视频播放流畅不卡顿直播电商
快速轻松地搭建一站式直播购物平台用于供应链规划的Alibaba Dchain
构建和管理敏捷、智能且经济高效的供应链云胸牌
针对赛事运营的创新型凭证数字服务数字门店中的云 POS 解决方案
将所有操作整合到一个云 POS 系统中元宇宙
元宇宙是下一代互联网人工智能 (AI) 加速
利用阿里云 GPU 技术,为 AI 驱动型业务以及 AI 模型训练和推理加速DevOps
快速、安全地最大限度提高您的DevOps优势数据迁移解决方案
加速迁移您的数据到阿里云企业 IT 治理
在阿里云上构建高效可控的云环境基于日志管理的AIOps
登录到带有智能化日志管理解决方案的 AIOps 环境备份与存档
数据备份、数据存档和灾难恢复用阿里云金融服务加快创新
在云端开展业务,提升客户满意度
为全球资本市场提供安全、准确和数字化的客户体验
利用专为金融科技而设的云原生数据库解决方案
利用大数据和 AI 降低信贷和黑灰产风险
建立快速、安全的全球外汇交易平台
新零售时代下,实现传统零售业转型
利用云服务处理流量波动问题,扩展业务运营、降低成本
快速轻松地搭建一站式直播购物平台
面向大数据建设、管理及应用的全域解决方案
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人
以数字化媒体旅程为当今的媒体市场准备就绪您的内容
带宽成本降低高达 30%
快速轻松地搭建一站式直播购物平台
为全球观众实时直播大型赛事,视频播放流畅不卡顿
使用阿里云弹性高性能计算 E-HPC 将本地渲染农场连接到云端
构建发现服务,帮助客户找到最合适的内容
保护您的媒体存档安全
通过统一的数据驱动平台提供一致的全生命周期客户服务
在钉钉上打造一个多功能的电信和数字生活平台
在线存储、共享和管理照片与文件
提供全渠道的无缝客户体验
面向中小型企业,为独立软件供应商提供可靠的IT服务
打造最快途径,助力您的新云业务扬帆起航
先进的SD-WAN平台,可实现WAN连接、实时优化并降低WAN成本
通过自动化和流程标准化实现快速事件响应
针对关键网络安全威胁提供集中可见性并进行智能安全分析
提供大容量、可靠且高度安全的企业文件传输
用智能技术数字化体育赛事
基于人工智能的低成本体育广播服务
专业的广播转码及信号分配管理服务
基于云的音视频内容引入、编辑和分发服务
在虚拟场馆中模拟关键运营任务
针对赛事运营的创新型凭证数字服务
智能和交互式赛事指南
轻松管理云端背包单元的绑定直播流
通过数据加强您的营销工作
元宇宙是下一代互联网
利用生成式 AI 加速创新,创造新的业务佳绩
阿里云高性能开源大模型
借助AI轻松解锁和提炼文档中的知识
通过AI驱动的语音转文本服务获取洞察
探索阿里云人工智能和数据智能的所有功能、新优惠和最新产品
该体验中心提供广泛的用例和产品帮助文档,助您开始使用阿里云 AI 产品和浏览您的业务数据。
利用阿里云 GPU 技术,为 AI 驱动型业务以及 AI 模型训练和推理加速
元宇宙是下一代互联网
构建发现服务,帮助客户找到最合适的内容
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人
加速迁移您的数据到阿里云
在阿里云上建立一个安全且易扩容的环境,助力高效率且高成本效益的上云旅程
迁移到完全托管的云数据库
将 Oracle 数据库顺利迁移到云原生数据库
自带公网 IP 地址上云
利用阿里云强大的安全工具集,保障业务安全、应用程序安全、数据安全、基础设施安全和帐户安全
保护、备份和还原您的云端数据资产
MLPS 2.0 一站式合规解决方案
快速高效地将您的业务扩展到中国,同时遵守适用的当地法规
实现对 CloudOps、DevOps、SecOps、AIOps 和 FinOps 的高效、安全和透明的管理
构建您的原生云环境并高效管理集群
快速、安全地最大限度提高您的DevOps优势
实施细粒度安全控制
提供运维效率和总体系统安全性
实时分析您的云消耗并实现节约
实时存储、管理和分析各种规模和类型的数据
登录到带有智能化日志管理解决方案的 AIOps 环境
帮助企业实现数据现代化并规划其数字化未来
帮助零售商快速规划数字化之旅
将全球知名的 CRM 平台引入中国
在线存储、共享和管理照片与文件
构建、部署和管理高可用、高可靠、高弹性的应用程序
快速、安全地最大限度提高您的DevOps优势
将您的采购和销售置于同一企业级全渠道数字平台上
企业内大数据建设、管理和应用的一站式解决方案
帮助企业简化 IT 架构、实现商业价值、加速数字化转型的步伐
快速高效地将您的业务扩展到中国,同时遵守适用的当地法规
快速搜集、处理、分析联网设备产生的数据
0.0.201
Elasticsearch数据源为您提供读取和写入Elasticsearch双向通道的功能,本文为您介绍DataWorks的Elasticsearch数据同步的能力支持情况。
Elasticsearch在公共资源组上支持Elasticsearch 5.x版本,在Serverless资源组(推荐)和独享数据集成资源组上支持Elasticsearch 5.x、6.x、7.x和8.x版本。
Serverless资源组的详情请参见新增和使用Serverless资源组。
独享数据集成资源组的详情请参见新增和使用独享数据集成资源组。
Elasticsearch是遵从Apache开源条款的一款开源产品,是当前主流的企业级搜索引擎。Elasticsearch是一个基于Lucene的搜索和数据分析工具,它提供分布式服务。Elasticsearch核心概念同数据库核心概念的对应关系如下所示。
Relational DB(实例)-> Databases(数据库)-> Tables(表)-> Rows(一行数据)-> Columns(一行数据的一列)
Elasticsearch -> Index -> Types -> Documents -> Fields
Elasticsearch中可以有多个索引或数据库,每个索引可以包括多个类型或表,每个类型可以包括多个文档或行,每个文档可以包括多个字段或列。Elasticsearch Writer插件使用Elasticsearch的Rest API接口,批量把从Reader读入的数据写入Elasticsearch中。
DataWorks平台目前仅支持配置阿里云Elasticsearch 5.x、6.x、7.x和8.x版本数据源,不支持配置自建Elasticsearch数据源。
Elasticsearch数据源在进行离线读写时会受到以下限制:
Elasticsearch Reader会获取Server端shard信息用于数据同步,需要确保在任务同步中Server端的shards处于存活状态,否则会存在数据不一致风险。
如果您使用的是6.x及以上版本,支持使用Serverless资源组(推荐)和独享数据集成资源组。
不支持同步scaled_float类型的字段。
不支持同步字段中带有关键字 $ref
的索引。
类型 | 离线读(Elasticsearch Reader) | 离线写(Elasticsearch Writer) | 实时写 |
类型 | 离线读(Elasticsearch Reader) | 离线写(Elasticsearch Writer) | 实时写 |
binary | 支持 | 支持 | 支持 |
boolean | 支持 | 支持 | 支持 |
keyword | 支持 | 支持 | 支持 |
constant_keyword | 不支持 | 不支持 | 不支持 |
wildcard | 不支持 | 不支持 | 不支持 |
long | 支持 | 支持 | 支持 |
integer | 支持 | 支持 | 支持 |
short | 支持 | 支持 | 支持 |
byte | 支持 | 支持 | 支持 |
double | 支持 | 支持 | 支持 |
float | 支持 | 支持 | 支持 |
half_float | 不支持 | 不支持 | 不支持 |
scaled_float | 不支持 | 不支持 | 不支持 |
unsigned_long | 不支持 | 不支持 | 不支持 |
date | 支持 | 支持 | 支持 |
date_nanos | 不支持 | 不支持 | 不支持 |
alias | 不支持 | 不支持 | 不支持 |
object | 支持 | 支持 | 支持 |
flattened | 不支持 | 不支持 | 不支持 |
nested | 支持 | 支持 | 支持 |
join | 不支持 | 不支持 | 不支持 |
integer_range | 支持 | 支持 | 支持 |
float_range | 支持 | 支持 | 支持 |
long_range | 支持 | 支持 | 支持 |
double_range | 支持 | 支持 | 支持 |
date_range | 支持 | 支持 | 支持 |
ip_range | 不支持 | 支持 | 支持 |
ip | 支持 | 支持 | 支持 |
version | 支持 | 支持 | 支持 |
murmur3 | 不支持 | 不支持 | 不支持 |
aggregate_metric_double | 不支持 | 不支持 | 不支持 |
histogram | 不支持 | 不支持 | 不支持 |
text | 支持 | 支持 | 支持 |
annotated-text | 不支持 | 不支持 | 不支持 |
completion | 支持 | 不支持 | 不支持 |
search_as_you_type | 不支持 | 不支持 | 不支持 |
token_count | 支持 | 不支持 | 不支持 |
dense_vector | 不支持 | 不支持 | 不支持 |
rank_feature | 不支持 | 不支持 | 不支持 |
rank_features | 不支持 | 不支持 | 不支持 |
geo_point | 支持 | 支持 | 支持 |
geo_shape | 支持 | 支持 | 支持 |
point | 不支持 | 不支持 | 不支持 |
shape | 不支持 | 不支持 | 不支持 |
percolator | 不支持 | 不支持 | 不支持 |
string | 支持 | 支持 | 支持 |
Elasticsearch Reader的工作原理如下:
通过Elasticsearch的_searchscrollslice(即游标分片)方式实现,slice结合数据集成任务的task多线程分片机制使用。
根据Elasticsearch中的Mapping配置,转换数据类型。
更多详情请参见Elasticsearch官方文档。
Elasticsearch Reader会获取Server端shard信息用于数据同步,需要确保在任务同步中Server端的shards处于存活状态,否则会存在数据不一致风险。
实际运行时,请删除下述代码中的注释。
{
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
},
"setting":{
"errorLimit":{
"record":"0" //错误记录数。
},
"jvmOption":"",
"speed":{
"concurrent":3,//并发数
"throttle":true,//
"mbps":"12",//限流,此处1mbps = 1MB/s。
}
},
"steps":[
{
"category":"reader",
"name":"Reader",
"parameter":{
"column":[ //读取列。
"id",
"name"
],
"endpoint":"", //服务地址。
"index":"", //索引。
"password":"", //密码。
"scroll":"", //scroll标志。
"search":"", //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
"type":"default",
"username":"" //用户名。
},
"stepType":"elasticsearch"
},
{
"stepType": "elasticsearch",
"parameter": {
"column": [ //写入列
{
"name": "id",
"type": "integer"
},
{
"name": "name",
"type": "text"
}
],
"index": "test", //写入索引
"indexType": "", //写入索引类型,es7不填
"actionType": "index", //写入方式
"cleanup": false, //是否重建索引
"datasource": "test", //数据源名称
"primaryKeyInfo": { //主键取值方式
"fieldDelimiterOrigin": ",",
"column": [
"id"
],
"type": "specific",
"fieldDelimiter": ","
},
"dynamic": false, //动态映射
"batchSize": 1024 //批量写文档数
},
"name": "Writer",
"category": "writer"
}
],
"type":"job",
"version":"2.0" //版本号。
}
支持全量拉取
支持将Elasticsearch中一个文档的所有内容拉取为一个字段。配置详情请参见场景一:全量拉取。
支持提取半结构化到结构化数据
分类 | 描述 | 相关文档 |
分类 | 描述 | 相关文档 |
产生背景 | Elasticsearch中的数据特征为字段不固定,且有中文名、数据使用深层嵌套的形式。为更好地方便下游业务对数据的计算和存储需求,特推出从半结构化到结构化的转换解决方案。 | — |
实现原理 | 将Elasticsearch获取到的JSON数据,利用JSON工具的路径获取特性,将嵌套数据扁平化为一维结构的数据。然后将数据映射至结构化数据表中,拆分Elasticsearch复合结构数据至多个结构化数据表。 | — |
解决方案 | JSON有嵌套的情况,通过path路径来解决。
| |
附属信息有一对多的情况,需要进行拆表拆行处理,进行遍历。 属性[*].子属性 | ||
数组归并,一个字符串数组内容,归并为一个属性,并进行去重。 属性[] | ||
多属性合一,将多个属性合并为一个属性。 属性1,属性2 | ||
多属性选择处理。 属性1|属性2 |
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源,详细的配置参数解释可在配置界面查看对应参数的文案提示。
数据同步任务的配置入口和通用配置流程可参见下文的配置指导。
操作流程请参见通过向导模式配置离线同步任务、通过脚本模式配置离线同步任务。
脚本模式配置的全量参数和脚本Demo请参见下文的附录一:脚本Demo与参数说明。
操作流程请参见DataStudio侧实时同步任务配置。
操作流程请参见数据集成侧同步任务配置。
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。
{
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
},
"setting":{
"errorLimit":{
"record":"0" //错误记录数。
},
"jvmOption":"",
"speed":{
"concurrent":3,
"throttle":false
}
},
"steps":[
{
"category":"reader",
"name":"Reader",
"parameter":{
"column":[ //读取列。
"id",
"name"
],
"endpoint":"http://es-cn-xxx.elasticsearch.aliyuncs.com:9200", //服务地址。
"index":"aliyun_es_xx", //索引。
"password":"*******", //密码。
"multiThread":true,
"scroll":"5m", //scroll标志。
"pageSize":5000,
"connTimeOut":600000,
"readTimeOut":600000,
"retryCount":30,
"retrySleepTime":"10000",
"search":{
"range":{
"gmt_modified":{
"gte":0
}
}
}, //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
"type":"doc",
"username":"aliyun_di" //用户名。
},
"stepType":"elasticsearch"
},
{
"category":"writer",
"name":"Writer",
"parameter":{ },
"stepType":"stream"
}
],
"type":"job",
"version":"2.0" //版本号。
}
参数 | 描述 | 是否必选 | 默认值 |
参数 | 描述 | 是否必选 | 默认值 |
datasource | 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。 | 是 | 无 |
index | Elasticsearch中的index名。 | 是 | 无 |
type | Elasticsearch中index的type名。 | 否 | index名 |
search | Elasticsearch的query参数。 | 是 | 无 |
pageSize | 每次读取数据的条数。 | 否 | 100 |
scroll | Elasticsearch的分页参数,设置游标存放时间。
| 是 | 无 |
strictMode | 以严格模式读取Elasticsearch中的数据,当出现Elasticsearch的shard.failed时会停止读取,避免读取到少量数据 | 否 | true |
sort | 返回结果的排序字段。 | 否 | 无 |
retryCount | 失败后重试的次数。 | 否 | 300 |
connTimeOut | 客户端连接超时时间。 | 否 | 600,000 |
readTimeOut | 客户端读取超时时间。 | 否 | 600,000 |
multiThread | http请求,是否有多线程。 | 否 | true |
preemptiveAuth | http是否使用抢先模式请求 | 否 | false |
retrySleepTime | 失败后重试的时间间隔。 | 否 | 1000 |
discovery | 是否开启节点发现。
| 否 | false |
compression | 是否使用GZIP压缩请求正文,使用时需要在es节点上启用http.compression设置。 | 否 | false |
dateFormat | 待同步字段存在date类型,且该字段mapping没有format配置时,需要配置dateFormat参数。配置形式如下: | 否 | 无 |
full | 是否将全文档内容作为一个字段同步至目标端,将Elasticsearch的查询数据作为一个字段,配置详情请参见场景一:全量拉取。 | 否 | 无 |
multi | 该配置是一个高级功能,具有五种用法,两个子属性分别为 | 否 | 无 |
{
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"steps": [
{
"category": "reader",
"name": "Reader",
"parameter": {
},
"stepType": "stream"
},
{
"category": "writer",
"name": "Writer",
"parameter": {
"datasource":"xxx",
"index": "test-1",
"type": "default",
"cleanup": true,
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"discovery": false,
"primaryKeyInfo":{
"type":"pk",
"fieldDelimiter":",",
"column":[]
},
"batchSize": 1000,
"dynamic":false,
"esPartitionColumn":[
{
"name":"col1",
"comment":"xx",
"type":"STRING"
}
],
"column": [
{
"name": "pk",
"type": "id"
},
{
"name": "col_ip",
"type": "ip"
},
{
"name": "col_array",
"type": "long",
"array": true,
},
{
"name": "col_double",
"type": "double"
},
{
"name": "col_long",
"type": "long"
},
{
"name": "col_integer",
"type": "integer"
{
"name": "col_keyword",
"type": "keyword"
},
{
"name": "col_text",
"type": "text",
"analyzer": "ik_max_word",
"other_params":
{
"doc_values": false
},
},
{
"name": "col_geo_point",
"type": "geo_point"
},
{
"name": "col_date",
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
{
"name": "col_nested1",
"type": "nested"
},
{
"name": "col_nested2",
"type": "nested"
},
{
"name": "col_object1",
"type": "object"
},
{
"name": "col_object2",
"type": "object"
},
{
"name": "col_integer_array",
"type": "integer",
"array": true
},
{
"name": "col_geo_shape",
"type": "geo_shape",
"tree": "quadtree",
"precision": "10m"
}
]
},
"stepType": "elasticsearch"
}
],
"type": "job",
"version": "2.0"
}
VPC环境的Elasticsearch运行在默认资源组会存在网络不通的情况。您需要使用Serverless资源组(推荐)和独享数据集成资源组,才能连通VPC进行数据同步。添加资源的详情请参见Serverless资源组。
参数 | 描述 | 是否必选 | 默认值 |
参数 | 描述 | 是否必选 | 默认值 |
datasource | 选择需要同步的Elasticsearch数据源,若还未在DataWorks创建该数据源,请先创建,详情请参见配置Elasticsearch数据源。 | 是 | 无 |
index | Elasticsearch中的index名。 | 是 | 无 |
indexType | Elasticsearch中index的type名。 | 否 | Elasticsearch |
cleanup | 定义当前任务在索引index已存在的情况是否要删除数据。
| 否 | false |
batchSize | 定义同步任务一次性插入ElasticSearch的Document条数。 | 否 | 1,000 |
trySize | 定义往ElasticSearch写入数据失败后的重试次数。 | 否 | 30 |
timeout | 客户端超时时间。 | 否 | 600,000 |
discovery | 任务是否启动节点发现功能。
| 否 | false |
compression | HTTP请求,开启压缩。 | 否 | true |
multiThread | HTTP请求,是否有多线程。 | 否 | true |
ignoreWriteError | 忽略写入错误,不重试,继续写入。 | 否 | false |
ignoreParseError | 忽略解析数据格式错误,继续写入。 | 否 | true |
alias | Elasticsearch的别名类似于数据库的视图机制,为索引my_index创建一个别名my_index_alias,对my_index_alias的操作与my_index的操作一致。 配置alias表示在数据导入完成后,为指定的索引创建别名。 | 否 | 无 |
aliasMode | 数据在导入完成后增加别名的模式,包括append(增加模式)和exclusive(只留这一个):
后续会转换别名为实际的索引名称,别名可以用来进行索引迁移和多个索引的查询统一,并可以用来实现视图的功能。 | 否 | append |
settings | 创建index时的settings,与Elasticsearch官方一致。 | 否 | 无 |
column | column用来配置文档的多个字段Filed信息,具体每个字段项可以配置name(名称)、type(类型)等基础配置,以及Analyzer、Format和Array等扩展配置。 Elasticsearch所支持的字段类型如下所示。
列类型的说明如下:
如果需要在column中配置除了type以外的属性值,您可以使用other_params参数,该参数配置在column中,在update mappings时,用于描述column中除了type以外的Elasticsearch属性信息。
如果您希望源端数据写入为Elasticsearch时按照数组类型写入,您可按照JSON格式或指定分隔符的方式来解析源端数据。配置详情请参见附录二:ElasticSearch写入的格式期望是数组类型。 | 是 | 无 |
dynamic | 定义当在文档中发现不存在的字段时,同步任务是否通过Elasticsearch动态映射机制为字段添加映射。
Elasticsearch 7.x版本的默认type为_doc。使用Elasticsearch的自动mappings时,请配置_doc和esVersion为7。 您需要转换为脚本模式,添加一个版本参数: | 否 | false |
actionType | 表示Elasticsearch在数据写出时的action类型,目前数据集成支持index和update两种actionType,默认值为index:
| 否 | index |
primaryKeyInfo | 定义当前写入ElasticSearch的主键取值方式。
| 是 | specific |
esPartitionColumn | 定义写入ElasticSearch时是否开启分区,用于修改ElasticSearch中的routing的参数。
| 否 | false |
enableWriteNull | 该参数用于是否支持将来源端的空值字段同步至Elasticsearch。取值如下:
| 否 | true |
支持以下两种方式将源端数据按照数组类型写入ElasticSearch。
按JSON格式解析源端数据
例如:源端数据为"[1,2,3,4,5]"
,配置json_array=true对其进行解析,同步将以数组格式写入ElasticSearch。
"parameter" : {
{
"name":"docs_1",
"type":"keyword",
"json_array":true
}
}
按分隔符解析源端数据
例如:源端数据为"1,2,3,4,5"
,配置分隔符splitter=","对其进行解析,同步将以数组格式写入ElasticSearch。
一个任务仅支持配置一种分隔符,splitter全局唯一,不支持多array字段配置为不同的分隔符。例如源端字段列col1="1,2,3,4,5"
,col2="6-7-8-9-10"
, splitter无法针对每列单独配置使用。
"parameter" : {
"column": [
{
"name": "docs_2",
"array": true,
"type": "long"
}
],
"splitter":","//注意:splitter配置与column配置同级。
}
背景说明:将Elasticsearch中文档查询的结果拉取为一个字段。
配置示例:
## 读端:Elasticsearch中的原始数据
"hits": [
{
"_index": "mutiltest_1",
"_type": "_doc",
"_id": "IXgdO4MB4GR_1DmrjTXP",
"_score": 1.0,
"_source": {
"feature1": "value1",
"feature2": "value2",
"feature3": "value3"
}
}]
##数据集成Elasticsearch Reader插件配置
"parameter": {
"column": [
"content"
],
"full":true
}
##写端结果:同步至目标端1行1列
{"_index":"mutiltest_1","_type":"_doc","_id":"IXgdO4MB4GR_1DmrjTXP","_source":{"feature1":"value1","feature2":"value2","feature3":"value3"},"sort":["IXgdO4MB4GR_1DmrjTXP"]}
背景说明:Object对象或nested嵌套字段的属性时,通过path路径来解决。
配置形式:
属性
属性.子属性
属性[0].子属性
脚本配置:
"multi":{
"multi":true
}
向导模式暂不支持配置。
配置示例:
## 读端:Elasticsearch中的原始数据
"hits": [
{
"_index": "mutiltest_1",
"_type": "_doc",
"_id": "7XAOOoMB4GR_1Dmrrust",
"_score": 1.0,
"_source": {
"level1": {
"level2": [
{
"level3": "testlevel3_1"
},
{
"level3": "testlevel3_2"
}
]
}
}
}
]
##数据集成Elasticsearch reader插件配置
"parameter": {
"column": [
"level1",
"level1.level2",
"level1.level2[0]",
"level1.level2.level3"
],
"multi":{
"multi":true
}
}
##写端结果:1行数据4列
column1(level1): {"level2":[{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]}
column2(level1.level2): [{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]
column3(level1.level2[0]): {"level3":"testlevel3_1"}
column4(level1.level2.level3): null
获取的节点上层有数组时结果为null,如上样例获取level1.level2.level3不会报错,但同步结果为null,需要配置为level1.level2[0].level3或level1.level2[1].level3,当前不支持level1.level2[*].level3。
不支持key出现"."的数据,如"level1.level2":{"level3":"testlevel3_1"},此时该条数据获取结果为null。
背景说明:附属信息有一对多的情况,需要将数组列拆成多行。
配置形式:属性[*].子属性
效果示意:源端数据{ "splitKey" :[1,2,3,4,5]},拆完后写到目标端为5行:{"splitKey[0]":1,"splitKey[1]":2,"splitKey[2]":3,"splitKey[3]":4,"splitKey[4]":5}
脚本配置:
"multi":{
"multi":true,
"key": "headers"
}
向导模式下配置拆多行数组列名,会自动生成脚本配置,具有相同效果。
value必须为List,否则会报错。
配置示例:
## 读端:Elasticsearch中的原始数据
[
{
"_index": "lmtestjson",
"_type": "_doc",
"_id": "nhxmIYMBKDL4VkVLyXRN",
"_score": 1.0,
"_source": {
"headers": [
{
"remoteip": "192.0.2.1"
},
{
"remoteip": "192.0.2.2"
}
]
}
},
{
"_index": "lmtestjson",
"_type": "_doc",
"_id": "wRxsIYMBKDL4VkVLcXqf",
"_score": 1.0,
"_source": {
"headers": [
{
"remoteip": "192.0.2.3"
},
{
"remoteip": "192.0.2.4"
}
]
}
}
]
##数据集成Elasticsearch reader插件配置
{
"column":[
"headers[*].remoteip"
]
"multi":{
"multi":true,
"key": "headers"
}
}
##写端结果:4行
192.0.2.1
192.0.2.2
192.0.2.3
192.0.2.4
背景说明:数组去重归并,将一个数组属性去重归并后写入为字符串属性,数组属性可以为子属性如name1.name2,去重采用tostring结果作为标准。
配置形式:属性[]。
column里面带有 [] 关键字就会被认为对该属性做去重归并。
脚本配置:
"multi":{
"multi":true
}
向导模式暂不支持配置。
配置示例:
## 读端:Elasticsearch中的原始数据
"hits": [
{
"_index": "mutiltest_1",
"_type": "_doc",
"_id": "4nbUOoMB4GR_1Dmryj8O",
"_score": 1.0,
"_source": {
"feature1": [
"value1",
"value1",
"value2",
"value2",
"value3"
]
}
}
]
##数据集成Elasticsearch reader插件配置
"parameter": {
"column":[
"feature1[]"
],
"multi":{
"multi":true
}
}
##写端结果:1行1列数据
"value1,value2,value3"
背景说明:多属性选择处理,返回第一个有值的属性;如果都不存在时将写入null。
配置形式:属性1|属性2|...
column里面带有 "|"关键字就会对该项做多属性选择。
脚本配置:
"multi":{
"multi":true
}
向导模式暂不支持该配置。
配置示例:
##读端:Elasticsearch中的原始数据
"hits": [
{
"_index": "mutiltest_1",
"_type": "_doc",
"_id": "v3ShOoMB4GR_1DmrZN22",
"_score": 1.0,
"_source": {
"feature1": "feature1",
"feature2": [
1,
2,
3
],
"feature3": {
"child": "feature3"
}
}
}]
##数据集成Elasticsearch reade插件配置
"parameter": {
"column":[
"feature1|feature2|feature3"
],
"multi":{
"multi":true
}
}
##写端结果:1行1列数据
"feature1"
背景说明:多属性选择处理,返回第一个有值的属性,若都不存在时写入null。
配置形式:属性1|属性2|...
column里面带有 "|"关键字就会对该项做多属性选择
脚本配置:
"multi":{
"multi":true
}
向导模式暂不支持该配置。
配置示例:
##读端:Elasticsearch中的原始数据
"hits": [
{
"_index": "mutiltest_1",
"_type": "_doc",
"_id": "v3ShOoMB4GR_1DmrZN22",
"_score": 1.0,
"_source": {
"feature1": "feature1",
"feature2": [
1,
2,
3
],
"feature3": {
"child": "feature3"
}
}
}]
##数据集成Elasticsearch reader插件配置
"parameter": {
"column":[
"feature1,feature2,feature3"
],
"multi":{
"multi":true
}
}
##写端结果:1行1列数据
"feature1,[1,2,3],{"child":"feature3"}"
数据集成支持其他更多数据源接入,更多信息,请参见支持的数据源及同步方案。