
E-MapReduce: Integrate Paimon with Spark

Last Updated: Feb 17, 2025

You can use Apache Paimon to efficiently build a data lake storage service on Hadoop Distributed File System (HDFS) or Alibaba Cloud Object Storage Service (OSS), and use the Spark compute engine to perform data lake analytics. This topic describes how to use Spark SQL to read data from and write data to Paimon in EMR.

Prerequisites

A DataLake or custom cluster is created, and the Spark and Paimon services are selected when you create the cluster. For more information, see Create a cluster.

Limits

  • Only clusters of EMR V3.46.0, EMR V5.12.0, or a later minor version of either allow you to use Spark SQL to read data from and write data to Paimon.

  • Only Spark SQL in Spark 3 supports the use of catalogs to read data from and write data to Paimon.

Procedure

Step 1: Configure a catalog

Spark can read data from and write data to Paimon tables by using catalogs. Paimon catalogs and spark_catalog are supported. Determine whether to use a Paimon catalog or spark_catalog based on your business scenarios.

  • Paimon catalog: used to manage metadata in the Paimon format. It can only be used to query data from and write data to Paimon tables.

  • spark_catalog: the default built-in catalog of Spark. It is usually used to manage the metadata of Spark SQL internal tables. It can be used to query data from and write data to Paimon tables or non-Paimon tables.

Use a Paimon catalog

You can save metadata in a file system, such as HDFS, or an object storage system, such as OSS. You can also synchronize metadata to DLF and Hive to facilitate access to Paimon by other services.

The root path used for storage is specified by the spark.sql.catalog.paimon.warehouse parameter. If the specified root path does not exist, it is automatically created. If the specified root path exists, you can use the catalog to access existing tables in the path.

  1. Log on to the master node of your cluster in SSH mode. For more information, see Log on to a cluster.

  2. Select the type of catalog that you want to configure based on the metadata storage type, and run the corresponding commands to start Spark SQL.

    Configure a file system catalog

    A file system catalog stores metadata in a file system or an object storage system.

    spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.metastore=filesystem \
    --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
    Note
    • spark.sql.catalog.paimon: defines a catalog named paimon.

    • spark.sql.catalog.paimon.metastore: specifies the metadata storage type used by the catalog. If you set this parameter to filesystem, metadata is stored in the file system or object storage system that is specified by the warehouse path.

    • spark.sql.catalog.paimon.warehouse: specifies the actual location of the data warehouse. Configure this parameter based on your business requirements. Replace <yourBucketName> with the name of an OSS bucket. For more information about how to create an OSS bucket, see Create a bucket.

    Configure a DLF catalog

    A DLF catalog can synchronize metadata to DLF.

    Important

    When you create an EMR cluster, you must select DLF Unified Metadata for Metadata.

    spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.metastore=dlf \
    --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
    Note
    • spark.sql.catalog.paimon: defines a catalog named paimon.

    • spark.sql.catalog.paimon.metastore: specifies the metadata storage type used by the catalog. If you set this parameter to dlf, metadata is synchronized to DLF.

    • spark.sql.catalog.paimon.warehouse: specifies the actual location of the data warehouse. Configure this parameter based on your business requirements. Replace <yourBucketName> with the name of an OSS bucket. For more information about how to create an OSS bucket, see Create a bucket.

    Configure a Hive catalog

    A Hive catalog can synchronize metadata to Hive Metastore. Hive allows you to query data in tables that are created in a Hive catalog. For information about how to query data of Paimon in Hive, see Integrate Paimon with Hive.

    spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.metastore=hive \
    --conf spark.sql.catalog.paimon.uri=thrift://master-1-1:9083 \
    --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
    Note
    • spark.sql.catalog.paimon: defines a catalog named paimon.

    • spark.sql.catalog.paimon.metastore: specifies the metadata storage type used by the catalog. If you set this parameter to hive, metadata is synchronized to Hive Metastore.

    • spark.sql.catalog.paimon.uri: specifies the address and port number of Hive Metastore. If you set this parameter to thrift://master-1-1:9083, the Spark SQL client connects to the Hive Metastore that runs on the master-1-1 node and listens on port 9083 to obtain metadata.

    • spark.sql.catalog.paimon.warehouse: specifies the actual location of the data warehouse. Configure this parameter based on your business requirements. Replace <yourBucketName> with the name of an OSS bucket. For more information about how to create an OSS bucket, see Create a bucket.

Use spark_catalog

  1. Log on to the master node of your cluster in SSH mode. For more information, see Log on to a cluster.

  2. Run the following commands to configure a catalog and start Spark SQL:

    spark-sql --conf spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
    Note
    • spark.sql.catalog.spark_catalog: configures the built-in spark_catalog of Spark to use the Paimon SparkGenericCatalog implementation.

    • The root path used for storage in spark_catalog is specified by the spark.sql.warehouse.dir parameter and does not need to be modified in most cases. If you do need a custom root path, see the sketch after this note.
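
    If you do need to place tables in a custom root path, a minimal sketch is to override spark.sql.warehouse.dir when you start Spark SQL. The OSS path in the following command is only an example; replace <yourBucketName> with the name of your own bucket.

    spark-sql --conf spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog \
    --conf spark.sql.warehouse.dir=oss://<yourBucketName>/spark-warehouse \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions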

Step 2: Read data from and write data to a Paimon table

Execute the following Spark SQL statements to create a Paimon table in the configured catalog and read data from and write data to the table.

Use a Paimon catalog

To access a Paimon table, you must specify the table name in the paimon.<db_name>.<tbl_name> format, in which <db_name> indicates a database name and <tbl_name> indicates a table name.

-- Create a database.
CREATE DATABASE IF NOT EXISTS paimon.ss_paimon_db;

-- Create a Paimon table.
CREATE TABLE paimon.ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;

-- Write data to the Paimon table.
INSERT INTO paimon.ss_paimon_db.paimon_tbl VALUES (1, "apple"), (2, "banana"), (3, "cherry");

-- Query data from the Paimon table.
SELECT * FROM paimon.ss_paimon_db.paimon_tbl ORDER BY id;

-- Drop the database.
DROP DATABASE paimon.ss_paimon_db CASCADE;
Note

If the error metastore: Failed to connect to the MetaStore Server is reported when you create a database after you configure a Hive catalog, the Hive Metastore service is not started. In this case, run the following command to start Hive Metastore, and then configure the Hive catalog again.

hive --service metastore &

If you set Metadata to DLF Unified Metadata when you create a cluster, we recommend that you synchronize metadata to DLF and configure a DLF catalog.

Use spark_catalog

You can specify a table name in the spark_catalog.<db_name>.<tbl_name> format regardless of whether you want to access a Paimon table or non-Paimon table. spark_catalog is the default built-in catalog of Spark. Therefore, you can omit spark_catalog and directly specify a table name in the <db_name>.<tbl_name> format for table access. <db_name> indicates a database name and <tbl_name> indicates a table name.

-- Create databases.
CREATE DATABASE IF NOT EXISTS ss_paimon_db;
CREATE DATABASE IF NOT EXISTS ss_parquet_db;

-- Create a Paimon table and a Parquet table.
CREATE TABLE ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
CREATE TABLE ss_parquet_db.parquet_tbl USING parquet AS SELECT 3, "cherry";

-- Write data to the Paimon table.
INSERT INTO ss_paimon_db.paimon_tbl VALUES (1, "apple"), (2, "banana");
INSERT INTO ss_paimon_db.paimon_tbl SELECT * FROM ss_parquet_db.parquet_tbl;

-- Query data from the Paimon table.
SELECT * FROM ss_paimon_db.paimon_tbl ORDER BY id;

-- Drop the databases.
DROP DATABASE ss_paimon_db CASCADE;
DROP DATABASE ss_parquet_db CASCADE;

The following result is returned:

1       apple   
2       banana
3       cherry 

FAQ

After I add the Paimon component to a cluster, will the parameter configuration spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions be automatically added to the cluster?

Yes, the configuration will be automatically added. After the Paimon component is added to the cluster, you can perform the following operations to view the configuration in the EMR console, or check it from the command line as described after the steps.

  1. Go to the Services tab of the cluster.

  2. View the configurations of the Spark service.

    1. Click Configure for the Spark service.

    2. Search for spark.sql.extensions by configuration item name to view the configuration information.

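You can also check the effective value from the command line on the master node. The following command is a minimal sketch and assumes the default Spark client configuration deployed on the cluster:

spark-sql -e "SET spark.sql.extensions;"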

Can I use Spark Shell to read data from and write data to Paimon?

Yes, you can. To read data from and write data to Paimon by using Spark Shell, perform the following steps. You can also start Spark Shell with a catalog configuration, as shown in the sketch after the steps.

  1. Run the following command to start Spark Shell:

    spark-shell
  2. Run the following Scala code in Spark Shell to write data to and query data from a Paimon table stored in the specified directory:

    // Load the Paimon table that is stored in the specified path as a DataFrame.
    val dataset = spark.read.format("paimon").load("oss://<yourBucketName>/warehouse/test_db.db/test_tbl")
    // Register the DataFrame as a temporary view so that it can be referenced in SQL statements.
    dataset.createOrReplaceTempView("test_tbl")
    // Write data to the Paimon table through the temporary view.
    spark.sql("INSERT INTO test_tbl VALUES (4, 'apple1', 3.5), (5, 'banana1', 4.0), (6, 'cherry1', 20.5)")
    // Query data from the Paimon table and print the result.
    spark.sql("SELECT * FROM test_tbl").show()
    Note
    • paimon: a fixed value. This value indicates that you use Paimon as a data storage format to read or write data.

    • oss://<yourBucketName>/warehouse/test_db.db/test_tbl: the path in which the Paimon table is stored. Replace the path based on your business requirements. Replace <yourBucketName> with the name of an OSS bucket.
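
Alternatively, you can start Spark Shell with the same catalog configurations that are described in Step 1 and access Paimon tables by name. The following commands are only a sketch; the warehouse path, database name, and table name are examples that you must replace based on your business requirements.

spark-shell --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.metastore=filesystem \
--conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

// Query a Paimon table through the configured catalog in Spark Shell.
spark.sql("SELECT * FROM paimon.test_db.test_tbl ORDER BY id").show()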

References

For more information about Paimon, see Apache Paimon.