E-MapReduce的Flink Table Store服务支持在Trino中查询数据。本文通过示例为您介绍如何在Trino中查询Flink Table Store中的数据。
使用限制
仅EMR-3.45.0版本、EMR-5.11.0版本的集群,支持在Trino中查询Flink Table Store中的数据。
操作步骤
指定warehouse路径。
Flink Table Store将数据和元数据都保存在文件系统或对象存储中,存储的根路径由warehouse参数指定,您可以根据实际情况修改。
进入Trino服务的配置页面。
在顶部菜单栏处,根据实际情况选择地域和资源组。
在EMR on ECS页面,单击目标集群操作列的集群服务。
在集群服务页面,单击Trino服务区域的配置。
在配置页面,单击tablestore.properties页签。
修改warehouse的参数值。
保存配置。
单击保存。
在弹出的对话框中,输入执行原因,单击保存。
可选:指定MetaStore类型。
Trino使用的MetaStore类型根据您创建集群时选择的服务自动设置。如果您想更改MetaStore类型,可以在Trino服务配置页面的tablestore.properties页签,修改metastore的参数值。
Flink Table Store共有以下三种MetaStore类型:
filesystem:元数据仅保存在文件系统或对象存储中。
hive:元数据同步到指定的Hive MetaStore中。
dlf:元数据同步到DLF中。
重启Trino服务。
在Trino服务配置页面,选择右上角的 。
在弹出的对话框中,输入执行原因,单击确定。
在确认对话框中,单击确定。
查询Flink Table Store数据。
下面以Spark写入Filesystem Catalog,Trino查询为例。
执行以下命令,启动Spark SQL。
spark-sql --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog --conf spark.sql.catalog.tablestore.metastore=filesystem --conf spark.sql.catalog.tablestore.warehouse=oss://oss-bucket/warehouse
执行以下Spark SQL语句,在Catalog中创建一张表,并写入数据。
-- 在之前创建的Catalog中,创建并使用一个测试database。 CREATE DATABASE tablestore.test_db; USE tablestore.test_db; -- 创建Flink Table Store表。 CREATE TABLE test_tbl ( uuid int, name string, price double ) TBLPROPERTIES ( 'primary-key' = 'uuid' ); -- 向Flink Table Store表中写入数据。 INSERT INTO test_tbl VALUES (1, 'apple', 3.5), (2, 'banana', 4.0), (3, 'cherry', 20.5);
执行以下命令,启动Trino。
trino --server master-1-1:9090 --catalog tablestore --schema default --user hadoop
执行以下命令,查询刚刚写入的数据。
USE test_db; SELECT * FROM test_tbl;