SLS Catalog通过自动解析Logstore中的日志来推导Schema。配置SLS Catalog后,您可以在Flink作业中直接访问SLS Logstore,无需在Flink SQL中声明SLS表的Schema便可以获取具体字段信息。本文为您介绍如何创建、查看、使用及删除SLS Catalog。
背景信息
本文将从以下方面为您介绍如何管理SLS Catalog:
使用限制
目前所有字段均被解析为String类型,如果您有其它数据类型需求,需要使用Flink SQL自行转换。
仅Flink计算引擎VVR 6.0.7及以上版本支持配置SLS Catalog。
不支持通过DDL语句修改已有的SLS Catalog。
仅支持查询数据表,不支持创建、修改和删除数据库和表。
SLS Catalog提供的表可以直接作为Flink SQL作业中的源表和结果表使用,不支持作为Lookup维表。
创建SLS Catalog
支持UI与SQL命令两种方式配置SLS Catalog,推荐使用UI方式配置SLS Catalog。
UI方式
进入元数据管理页面。
登录实时计算控制台,单击目标工作空间操作列下的控制台。
单击元数据管理。
单击创建Catalog,选择SLS,单击下一步。
填写参数配置信息。
重要Catalog创建完成后,以下配置信息都不支持修改。如果需要修改,则您需要删除掉已创建的Catalog,重新进行创建。
参数
类型
说明
是否必填
备注
catalog name
String
SLS Catalog名称。
是
请填写为自定义的英文名。
endpoint
String
EndPoint地址。
是
详情请参见服务入口。
project
String
SLS项目名称。
是
无。
accessId
String
阿里云账号的AccessKey ID。
是
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见密钥管理。
accessKey
String
阿里云账号的AccessKey Secret。
是
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见密钥管理。
单击确定。
在左侧元数据区域,查看创建的Catalog。
SQL命令
在查询脚本文本编辑区域,输入配置SLS Catalog的命令。
CREATE CATALOG <catalogName> WITH( 'type'='sls', 'endpoint'='<brokers>', 'project'='project', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}' )
参数
类型
说明
是否必填
备注
catalogName
String
SLS Catalog名称。
是
请填写为自定义的英文名。
type
String
Catalog类型。
是
固定值为sls。
endpoint
String
EndPoint地址。
是
详情请参见服务入口。
project
String
SLS项目名称。
是
无。
accessId
String
阿里云账号的AccessKey ID。
是
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见密钥管理。
accessKey
String
阿里云账号的AccessKey Secret。
是
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见密钥管理。
选中创建Catalog的代码后,单击左侧代码行数上的运行。您也可以光标放在创建Catalog的代码上,右键单击运行。
在左侧元数据区域,查看创建的Catalog。
查看SLS Catalog
SLS Catalog配置成功后,您可以通过以下步骤查看SLS元数据。
进入元数据管理页面。
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
单击元数据管理。
在Catalog列表页面,查看Catalog名称和类型。
说明如果您需要查看Catalog下的logstore信息,请单击查看。
查看SLS LogStore
在查询脚本文本编辑区域,输入以下命令。
DESCRIBE `${catalog_name}`.`${project_name}`.`${logstore_name}`;
参数
说明
${catalog_name}
SLS Catalog名称。
${project_name}
SLS项目名称。
${logstore_name}
SLS LogStore名称。
选中查看Catalog的代码后,单击左侧代码行数上的运行。
运行成功后,可以在运行结果中查看表的具体信息。
使用SLS Catalog
作为源表,从SLS Logstore中读取数据。
INSERT INTO ${other_sink_table} SELECT... FROM `${catalog_name}`.`${project_name}`.`${logstore_name}`/*+OPTIONS('startTime'='2023-06-01 00:00:00')*/;
作为结果表,将数据写入SLS Logstore。
INSERT INTO `${catalog_name}`.`${project_name}`.`${logstore_name}` SELECT ... FROM ${other_source_table}
在读取源Logstore数据之前,需要先开启索引。只有开启了索引,才能够读取源Logstore中的数据。同样地,在将数据写入目标Logstore之前,也需要开启索引。这是因为Catalog需要先读取结果表中的部分数据,以确定写入数据的Schema是否与目标SLS Logstore一致。如需了解如何开启索引的详细步骤,请参见创建索引。
删除SLS Catalog
支持UI与SQL命令两种方式删除SLS Catalog,推荐使用UI方式删除SLS Catalog。
删除SLS Catalog不会影响已运行的作业,但会导致使用该Catalog下表的作业,在上线或重启时报无法找到该表的错误,请您谨慎操作。
UI方式
进入元数据管理页面。
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
单击元数据管理。
在Catalog列表页面,单击目标Catalog名称对应操作列的删除。
在弹出的提示页面中,单击删除。
左侧元数据区域下,查看目标Catalog是否已删除。
SQL命令
在查询脚本文本编辑区域,输入以下命令。
DROP CATALOG ${catalog_name};
其中${catalog_name}为您要删除的目标SLS Catalog名称。
选中删除Catalog的命令,鼠标右键选择运行。
在左侧元数据区域,查看目标Catalog是否已删除。
从SLS Catalog获取的表信息详解
为了方便使用SLS Catalog获取的表,SLS Catalog会在推导的表上添加默认的配置参数和元数据。SLS Catalog获取的表的详细信息如下:
SLS表的Schema推导
SLS Catalog在解析日志获取Topic的Schema时,Catalog会尝试消费一条消息,解析该条数据的Schema。SLS Catalog会从SLS日志中解析出字段名和类型,由于SLS日志数据均以String类型存储,所以字段类型均为String。
默认添加的表参数
参数
说明
备注
connector
Connector类型。
固定值为sls。
endpoint
EndPoint地址。
详情请参见服务入口。
project
SLS项目名称。
无。
logstore
SLS LogStore或metricstore名称。
无。
accessId
阿里云账号的AccessKey ID。
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见密钥管理。
accessKey
阿里云账号的AccessKey Secret。
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见密钥管理。