E-MapReduce: Basic usage

Last Updated: Jul 05, 2024

This topic describes the configuration information of Delta Lake in E-MapReduce (EMR) and provides some examples of common commands.

Configuration information

Default configuration of Delta Lake in EMR:

  • Spark 2.X

    spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
  • Spark 3.X

    spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
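
If you want to verify that the extension is active in a SQL session, one lightweight check is to create a throwaway Delta table and inspect its metadata. This is an optional sanity check, not part of the required configuration; the table name delta_config_check is an arbitrary example.

  -- Optional sanity check; delta_config_check is an arbitrary example name.
  CREATE TABLE delta_config_check (id INT) USING delta;
  -- The Provider field in the output should be delta.
  DESCRIBE EXTENDED delta_config_check;
  DROP TABLE delta_config_check;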

Common sample commands

  • Create a table

    CREATE TABLE delta_table (id INT) USING delta;
  • Insert data into a table

    INSERT INTO delta_table VALUES 0,1,2,3,4;
  • Overwrite the existing data in a table

    INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
  • Query data

    SELECT * FROM delta_table;
  • Update data

    Add 100 to all IDs that are even numbers.

    UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;
  • Delete data

    Delete the records whose IDs are even numbers.

    DELETE FROM delta_table WHERE mod(id, 2) = 0;
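
For reference, if the commands above are run in this order on a newly created, empty delta_table and nothing else writes to it, the table contents evolve as follows. This is a sketch of the expected results, not captured output.

  -- After INSERT:            0, 1, 2, 3, 4
  -- After INSERT OVERWRITE:  5, 6, 7, 8, 9
  -- After UPDATE:            5, 106, 7, 108, 9   (6 and 8 are even, so 100 is added to them)
  -- After DELETE:            5, 7, 9             (106 and 108 are even, so they are removed)
  SELECT * FROM delta_table ORDER BY id;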

Merge data

  1. Create a source table for the merge operation.

    CREATE TABLE newData (id INT) USING delta;
  2. Insert data into the table.

    INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;
  3. Merge the data in the newData table into the delta_table table. If a record in the newData table has the same ID as a record in the delta_table table, the ID of the matching record in the delta_table table is updated to the source ID plus 100. Otherwise, the record from the newData table is inserted into the delta_table table.

    MERGE INTO delta_table AS target
      USING newData AS source
      ON target.id = source.id
      WHEN MATCHED THEN UPDATE SET target.id = source.id + 100
      WHEN NOT MATCHED THEN INSERT *;
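
If delta_table still contains only 5, 7, and 9 from the commands in the previous section, the merge should behave as follows. This is a sketch of the expected outcome, not captured output.

  -- Matched rows (5, 7, 9) are updated to source.id + 100, that is 105, 107, 109.
  -- Unmatched source rows (0, 1, 2, 3, 4, 6, 8) are inserted unchanged.
  SELECT * FROM delta_table ORDER BY id;
  -- Expected: 0, 1, 2, 3, 4, 6, 8, 105, 107, 109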

Read data from a Delta table in streaming mode

  1. Create a destination table into which you want to write data in streaming mode.

    CREATE TABLE stream_debug_table (id INT) USING delta;
  2. Create a stream.

    CREATE SCAN stream_delta_table ON delta_table USING STREAM;
    Note

    In this example, delta_table is an existing Delta table.

  3. Write data to the destination table in streaming mode.

    CREATE STREAM job
    options (
      triggerType='ProcessingTime',
      checkpointLocation = '/tmp/streaming_read_cp'
    )
    INSERT INTO stream_debug_table
    SELECT * FROM stream_delta_table;

Example

  1. Log on to your cluster in SSH mode. For more information, see Log on to a cluster.

  2. Run the following command to start streaming-sql:

    streaming-sql
    Note

    If the Delta Lake service is added to your cluster, you can run the streaming-sql command directly. If the service is not added, run the following command to use Delta Lake:

    streaming-sql --jars /path/to/delta-core_2.11-0.6.1.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
  3. Run the following command to create a destination table:

    CREATE TABLE stream_debug_table (id INT) USING DELTA;
  4. Run the following command to create a stream:

    CREATE SCAN stream_delta_table ON delta_table USING STREAM;
  5. Run the following command to write data in the source table named delta_table to the destination table in streaming mode:

    CREATE STREAM job
    options (
      triggerType='ProcessingTime',
      checkpointLocation = '/tmp/streaming_read_cp'
    )
    INSERT INTO stream_debug_table
    SELECT * FROM stream_delta_table;
  6. Start another streaming-sql client and perform the following operations to insert data into the source table and query data in the destination table:

    1. Run the following command to query existing data in the destination table:

      SELECT * FROM stream_debug_table;
    2. Run the following command to insert data into the source table:

      INSERT INTO delta_table VALUES 801, 802;
    3. Run the following command to check whether the data that is inserted into the source table is written to the destination table in streaming mode:

      SELECT * FROM stream_debug_table;
    4. Run the following command to insert data into the source table:

      INSERT INTO delta_table VALUES 901, 902;
    5. Run the following command to check whether the data that is inserted into the source table is written to the destination table in streaming mode:

      SELECT * FROM stream_debug_table;
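
The results you see depend on what delta_table already contains and on the trigger interval of the stream job, but the pattern should be as follows (a sketch, not captured output): the first query returns the rows that already exist in delta_table, and each later query additionally returns the newly inserted rows after a short delay.

  -- Illustrative only; the existing rows depend on the earlier examples you ran.
  SELECT * FROM stream_debug_table;   -- after inserting 801, 802: existing rows plus 801, 802
  SELECT * FROM stream_debug_table;   -- after inserting 901, 902: existing rows plus 801, 802, 901, 902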