You can use Apache Paimon to efficiently deploy your data lake storage service on Hadoop Distributed File System (HDFS) or Alibaba Cloud Object Storage Service (OSS), and use the Spark compute engine to perform data lake analytics. This topic describes how to use Spark SQL to read data from and write data to Paimon in EMR.
Prerequisites
A DataLake or custom cluster is created, and the Spark and Paimon services are selected when you create the cluster. For more information, see Create a cluster.
Limits
Only clusters of EMR V3.46.0 or a later minor version, and clusters of EMR V5.12.0 or a later minor version, allow you to use Spark SQL to read data from and write data to Paimon.
Only Spark 3 allows you to use catalogs in Spark SQL to read data from and write data to Paimon.
Procedure
Step 1: Configure a catalog
Spark can read data from and write data to Paimon tables by using catalogs. Paimon catalogs and spark_catalog are supported. Determine whether to use a Paimon catalog or spark_catalog based on your business scenarios.
Paimon catalog: used to manage metadata in the Paimon format. It can only be used to query data from and write data to Paimon tables.
spark_catalog: the default built-in catalog of Spark. It is usually used to manage the metadata of Spark SQL internal tables. It can be used to query data from and write data to Paimon tables or non-Paimon tables.
Use a Paimon catalog
You can save metadata in a file system, such as HDFS, or an object storage system, such as OSS. You can also synchronize metadata to Data Lake Formation (DLF) or Hive Metastore to facilitate access to Paimon by other services.
The root path used for storage is specified by the spark.sql.catalog.paimon.warehouse parameter. If the specified root path does not exist, it is automatically created. If the specified root path exists, you can use the catalog to access existing tables in the path.
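For example, assuming a hypothetical bucket named mybucket and the warehouse root oss://mybucket/warehouse, the data and metadata of a table named paimon_tbl in a database named paimon_db are stored in a path similar to the following:
oss://mybucket/warehouse/paimon_db.db/paimon_tbl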
Log on to the master node of your cluster in SSH mode. For more information, see Log on to a cluster.
Select the type of the catalog that you want to configure based on the metadata storage type, and run the corresponding commands to start Spark SQL.
Configure a file system catalog
A file system catalog stores metadata in a file system or an object storage system.
spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.metastore=filesystem \
    --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
Note:
spark.sql.catalog.paimon: defines a catalog named paimon.
spark.sql.catalog.paimon.metastore: specifies the metadata storage type used by the catalog. If you set this parameter to filesystem, metadata is stored in the same file system or object storage system as the table data.
spark.sql.catalog.paimon.warehouse: specifies the actual location of the data warehouse. Configure this parameter based on your business requirements. Replace <yourBucketName> with the name of an OSS bucket. For more information about how to create an OSS bucket, see Create a bucket.
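To verify that the catalog is available, you can list its namespaces from the Spark SQL session. This is a minimal check; the databases that are returned depend on your environment.
-- List the databases that are visible through the paimon catalog.
SHOW NAMESPACES IN paimon;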
Configure a DLF catalog
A DLF catalog can synchronize metadata to DLF.
Important: When you create an EMR cluster, you must select DLF Unified Metadata for Metadata.
spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.metastore=dlf \
    --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
Note:
spark.sql.catalog.paimon: defines a catalog named paimon.
spark.sql.catalog.paimon.metastore: specifies the metadata storage type used by the catalog. If you set this parameter to dlf, metadata is synchronized to DLF.
spark.sql.catalog.paimon.warehouse: specifies the actual location of the data warehouse. Configure this parameter based on your business requirements. Replace <yourBucketName> with the name of an OSS bucket. For more information about how to create an OSS bucket, see Create a bucket.
Configure a Hive catalog
A Hive catalog can synchronize metadata to Hive Metastore. Hive allows you to query data in tables that are created in a Hive catalog. For information about how to query data of Paimon in Hive, see Integrate Paimon with Hive.
spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.metastore=hive \
    --conf spark.sql.catalog.paimon.uri=thrift://master-1-1:9083 \
    --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
Note:
spark.sql.catalog.paimon: defines a catalog named paimon.
spark.sql.catalog.paimon.metastore: specifies the metadata storage type used by the catalog. If you set this parameter to hive, metadata is synchronized to Hive Metastore.
spark.sql.catalog.paimon.uri: specifies the address and port number of Hive Metastore. If you set this parameter to thrift://master-1-1:9083, the Spark SQL client connects to the Hive Metastore that runs on the master-1-1 node and listens on port 9083 to obtain metadata.
spark.sql.catalog.paimon.warehouse: specifies the actual location of the data warehouse. Configure this parameter based on your business requirements. Replace <yourBucketName> with the name of an OSS bucket. For more information about how to create an OSS bucket, see Create a bucket.
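As a sketch of the synchronization behavior, the following statements create a database and a table through the Hive catalog. The names hive_db and hive_tbl are illustrative. Because metadata is synchronized to Hive Metastore, the table should then also be visible from Hive, for example by running SHOW TABLES IN hive_db; in the Hive CLI.
-- Create an illustrative database and table through the Hive catalog.
CREATE DATABASE IF NOT EXISTS paimon.hive_db;
CREATE TABLE paimon.hive_db.hive_tbl (id INT, name STRING) USING paimon;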
Use spark_catalog
Log on to the master node of your cluster in SSH mode. For more information, see Log on to a cluster.
Run the following commands to configure a catalog and start Spark SQL:
spark-sql --conf spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
Note:
spark.sql.catalog.spark_catalog: defines a catalog named spark_catalog.
The root path used for storage in spark_catalog is specified by the spark.sql.warehouse.dir parameter and does not need to be modified in most cases.
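To confirm the root path that spark_catalog uses, you can query the parameter from the Spark SQL session. This is a minimal check; the value that is returned depends on your cluster configuration.
-- Display the current value of the warehouse root path.
SET spark.sql.warehouse.dir;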
Step 2: Read data from and write data to a Paimon table
Execute the following Spark SQL statements to create a Paimon table in the configured catalog and read data from and write data to the table.
Use a Paimon catalog
To access a Paimon table, you must specify the table name in the paimon.<db_name>.<tbl_name> format, in which <db_name> indicates a database name and <tbl_name> indicates a table name.
-- Create a database.
CREATE DATABASE IF NOT EXISTS paimon.ss_paimon_db;
-- Create a Paimon table.
CREATE TABLE paimon.ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
-- Write data to the Paimon table.
INSERT INTO paimon.ss_paimon_db.paimon_tbl VALUES (1, "apple"), (2, "banana"), (3, "cherry");
-- Query data from the Paimon table.
SELECT * FROM paimon.ss_paimon_db.paimon_tbl ORDER BY id;
-- Drop the database.
DROP DATABASE paimon.ss_paimon_db CASCADE;
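Based on the rows inserted above, the query returns the following result:
1 apple
2 banana
3 cherry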
If the error metastore: Failed to connect to the MetaStore Server is reported when you create a database after you configure a Hive catalog, the Hive Metastore service is not started. If this occurs, run the following command to start Hive Metastore. After Hive Metastore is started, run the commands to configure the Hive catalog again.
hive --service metastore &
If you set Metadata to DLF Unified Metadata when you create a cluster, we recommend that you synchronize metadata to DLF and configure a DLF catalog.
Use spark_catalog
You can specify a table name in the spark_catalog.<db_name>.<tbl_name> format regardless of whether you want to access a Paimon table or a non-Paimon table. spark_catalog is the default built-in catalog of Spark. Therefore, you can omit spark_catalog and directly specify a table name in the <db_name>.<tbl_name> format, in which <db_name> indicates a database name and <tbl_name> indicates a table name.
-- Create databases.
CREATE DATABASE IF NOT EXISTS ss_paimon_db;
CREATE DATABASE IF NOT EXISTS ss_parquet_db;
-- Create a Paimon table and a Parquet table.
CREATE TABLE ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
CREATE TABLE ss_parquet_db.parquet_tbl USING parquet AS SELECT 3 AS id, "cherry" AS name;
-- Write data to the Paimon table.
INSERT INTO ss_paimon_db.paimon_tbl VALUES (1, "apple"), (2, "banana");
INSERT INTO ss_paimon_db.paimon_tbl SELECT * FROM ss_parquet_db.parquet_tbl;
-- Query data from the Paimon table.
SELECT * FROM ss_paimon_db.paimon_tbl ORDER BY id;
-- Drop the databases.
DROP DATABASE ss_paimon_db CASCADE;
DROP DATABASE ss_parquet_db CASCADE;
The following result is returned:
1 apple
2 banana
3 cherry
References
For more information about Paimon, see Apache Paimon.