This topic describes how to use the Spark SQL client in E-MapReduce (EMR) to read data from and write data to Paimon.
Prerequisites
A DataLake or custom cluster is created, and the Spark and Paimon services are selected when you create the cluster. For more information, see Create a cluster.
Limits
Only clusters of EMR V3.46.0 or a later minor version, or EMR V5.12.0 or a later minor version, allow you to use the Spark SQL client to read data from and write data to Paimon and to use the Spark CLI to read data from Paimon.
Only Spark SQL in Spark 3 supports using catalogs to read data from and write data to Paimon.
The Spark CLI allows you to read data from Paimon only by using the path of a file system or an object storage system.
Procedure
Step 1: Create a catalog
Paimon stores data and metadata in a file system such as Hadoop Distributed File System (HDFS) or an object storage system such as Object Storage Service (OSS). The root path for storage is specified by the warehouse parameter. If the specified root path does not exist, a root path is automatically created. If the specified root path exists, you can use the created catalog to access existing tables in the path.
You can synchronize metadata to Hive or Data Lake Formation (DLF). This way, other services can access data of Paimon by using Hive or DLF.
Create a file system catalog
A file system catalog stores metadata in a file system or an object storage system.
spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.metastore=filesystem \
--conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
spark.sql.catalog.paimon: defines a catalog named paimon.
spark.sql.catalog.paimon.metastore: specifies the metadata storage type used by the catalog. If you set this parameter to filesystem, metadata is stored in the same file system or object storage system as the data, under the warehouse path.
spark.sql.catalog.paimon.warehouse: specifies the actual location of the data warehouse. Configure this parameter based on your business requirements.
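The same catalog settings can also be applied when a SparkSession is built in application code instead of being passed as --conf options. The following Scala sketch is a minimal example, assuming the Paimon Spark dependencies provided by the EMR Paimon service are already on the classpath; the appName value is only an illustrative placeholder.
// Minimal sketch: configure the same file system catalog programmatically.
// Assumes the Paimon Spark jars are on the classpath; <yourBucketName> is a placeholder.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("paimon-filesystem-catalog")
  .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
  .config("spark.sql.catalog.paimon.metastore", "filesystem")
  .config("spark.sql.catalog.paimon.warehouse", "oss://<yourBucketName>/warehouse")
  .config("spark.sql.extensions", "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
  .getOrCreate()
The DLF and Hive catalogs described below are configured in the same way; only the metastore setting, and for Hive the uri setting, differs.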
Create a DLF catalog
A DLF catalog can synchronize metadata to DLF.
When you create an EMR cluster, you must select DLF Unified Metadata for Metadata.
spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.metastore=dlf \
--conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
spark.sql.catalog.paimon: defines a catalog named paimon.
spark.sql.catalog.paimon.metastore: specifies the metadata storage type used by the catalog. If you set this parameter to dlf, metadata is synchronized to DLF.
spark.sql.catalog.paimon.warehouse: specifies the actual location of the data warehouse. Configure this parameter based on your business requirements.
Create a Hive catalog
A Hive catalog can synchronize metadata to Hive Metastore. Hive allows you to query data in tables that are created in a Hive catalog.
For information about how to query data of Paimon in Hive, see Integrate Paimon with Hive.
spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.metastore=hive \
--conf spark.sql.catalog.paimon.uri=thrift://master-1-1:9083 \
--conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
spark.sql.catalog.paimon: defines a catalog named paimon.
spark.sql.catalog.paimon.metastore: specifies the metadata storage type used by the catalog. If you set this parameter to hive, metadata is synchronized to Hive Metastore.
spark.sql.catalog.paimon.uri: specifies the address and port number of Hive Metastore. If you set this parameter to thrift://master-1-1:9083, the Spark SQL client connects to the Hive Metastore that runs on the master-1-1 node and listens on port 9083 to obtain metadata.
spark.sql.catalog.paimon.warehouse: specifies the actual location of the data warehouse. Configure this parameter based on your business requirements.
Step 2: Use the Spark SQL client to read data from and write data to Paimon
Execute the following Spark SQL statements to create a Paimon table in the created catalog and read data from and write data to the table:
-- Switch to the paimon catalog.
USE paimon;
-- Create a test database in the created paimon catalog.
CREATE DATABASE test_db;
USE test_db;
-- Create a Paimon table.
CREATE TABLE test_tbl (
    uuid int,
    name string,
    price double
) TBLPROPERTIES (
    'primary-key' = 'uuid'
);
-- Write data to the Paimon table.
INSERT INTO test_tbl VALUES (1, 'apple', 3.5), (2, 'banana', 4.0), (3, 'cherry', 20.5);
-- Read data from the Paimon table.
SELECT * FROM test_tbl;
The following result is returned:
1 apple 3.5
2 banana 4.0
3 cherry 20.5
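The same statements can also be submitted programmatically by calling spark.sql(). The following Scala sketch assumes a SparkSession that is configured with the paimon catalog from Step 1, for example a spark-shell session started with the same --conf options.
// Minimal sketch: run the Step 2 query through spark.sql().
// Assumes the session was started with the paimon catalog configuration from Step 1.
spark.sql("USE paimon")
spark.sql("USE test_db")
spark.sql("SELECT * FROM test_tbl").show()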
Step 3: Use the Spark CLI to read data from Paimon
Run the following command to start the Spark CLI:
spark-shell
Run the following Scala code on the Spark CLI to query data from the Paimon table that is stored in the specified directory:
val dataset = spark.read.format("paimon").load("oss://<yourBucketName>/warehouse/test_db.db/test_tbl")
dataset.createOrReplaceTempView("test_tbl")
spark.sql("SELECT * FROM test_tbl").show()
Note
paimon: a fixed value. This value indicates that you use Paimon as the data storage format to read or write data.
oss://<yourBucketName>/warehouse/test_db.db/test_tbl: the path in which the Paimon table is stored. Replace the path based on your business requirements.
The following output is returned:
+----+------+-----+
|uuid|  name|price|
+----+------+-----+
|   1| apple|  3.5|
|   2|banana|  4.0|
|   3|cherry| 20.5|
+----+------+-----+
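The loaded dataset is a regular Spark DataFrame, so you can continue to transform it with the DataFrame API. The following Scala sketch is an illustrative example; the column names come from the test_tbl schema that is created in Step 2.
// Minimal sketch: filter and project the DataFrame loaded from the Paimon table.
import org.apache.spark.sql.functions.col

val filtered = dataset.where(col("price") > 4.0).select("name", "price")
filtered.show()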