This topic describes the configuration of Delta Lake in E-MapReduce (EMR) and provides examples of common commands.
Configuration information
Default configuration of Delta Lake in EMR:
Spark 2.X:
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension

Spark 3.X:
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
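EMR applies these parameters by default. If you need to set them yourself, for example when submitting your own Spark 3.X job, a minimal sketch is to pass them on the command line:
spark-sql \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog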
Common sample commands
Create a table
CREATE TABLE delta_table (id INT) USING delta;
Insert data into a table
INSERT INTO delta_table VALUES 0,1,2,3,4;
Overwrite the existing data in a table
INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
Query data
SELECT * FROM delta_table;
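-- Assuming the statements above ran in order, this returns the rows written by
-- INSERT OVERWRITE (row order is not guaranteed): 5, 6, 7, 8, 9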
Update data
Add 100 to all IDs that are even numbers.
UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;
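-- Assuming the statements above ran in order, the table now contains 5, 106, 7, 108, 9
-- (the even IDs 6 and 8 each gained 100).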
Delete data
Delete the records whose IDs are even numbers.
DELETE FROM delta_table WHERE mod(id, 2) = 0;
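-- Continuing the same sequence, 106 and 108 are even and are deleted; 5, 7, and 9 remain.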
Merge data
Create a source table for the merge operation.
CREATE TABLE newData(id INT) USING delta;
Insert data into the table.
INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;
Merge data in the newData table into the delta_table table. If the ID of a record in the newData table is the same as the ID of a record in the delta_table table, the matched record in the delta_table table is updated to the source ID plus 100. Otherwise, the record is directly inserted.
MERGE INTO delta_table AS target
USING newData AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET target.id = source.id + 100
WHEN NOT MATCHED THEN INSERT *;
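A quick check, assuming all of the preceding statements were executed in order: the matched IDs 5, 7, and 9 become 105, 107, and 109, and the unmatched IDs from newData are inserted unchanged.
SELECT * FROM delta_table;
-- Expected result (row order is not guaranteed): 0, 1, 2, 3, 4, 6, 8, 105, 107, 109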
Read data from a Delta table in streaming mode.
Create a destination table into which you want to write data in streaming mode.
CREATE TABLE stream_debug_table (id INT) USING delta;
Create a stream.
CREATE SCAN stream_delta_table ON delta_table USING STREAM;
Note: In this example, delta_table is an existing Delta table.
Write data to the destination table in streaming mode.
CREATE STREAM job
OPTIONS (
  triggerType = 'ProcessingTime',
  checkpointLocation = '/tmp/streaming_read_cp'
)
INSERT INTO stream_debug_table
SELECT * FROM stream_delta_table;
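In this job, triggerType='ProcessingTime' runs the query continuously on a processing-time trigger, and checkpointLocation is the directory in which Spark Structured Streaming stores its offsets so that the job can resume after a restart.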
Example
Log on to your cluster in SSH mode. For more information, see Log on to a cluster.
Run the following command to start streaming-sql:
streaming-sql
Note: If the Delta Lake service is added to your cluster, you can directly run the streaming-sql command. If the service is not added to your cluster, run the following command to use Delta Lake:
streaming-sql --jars /path/to/delta-core_2.11-0.6.1.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
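The JAR in this example is built for Scala 2.11, which matches Spark 2.X. On a Spark 3.X cluster, you would point --jars at a Scala 2.12 build of delta-core instead and also pass the spark.sql.catalog.spark_catalog parameter listed in the configuration section.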
Run the following command to create a destination table:
CREATE TABLE stream_debug_table (id INT) USING delta;
Run the following command to create a stream:
CREATE SCAN stream_delta_table ON delta_table USING STREAM;
Run the following command to write data in the source table named delta_table to the destination table in streaming mode:
CREATE STREAM job
OPTIONS (
  triggerType = 'ProcessingTime',
  checkpointLocation = '/tmp/streaming_read_cp'
)
INSERT INTO stream_debug_table
SELECT * FROM stream_delta_table;
Start another streaming-sql client and perform the following operations to insert data into the source table and query data in the destination table:
Run the following command to query existing data in the destination table:
SELECT * FROM stream_debug_table;
Run the following command to insert data into the source table:
INSERT INTO delta_table VALUES 801, 802;
Run the following command to check whether the data that is inserted into the source table is written to the destination table in streaming mode:
SELECT * FROM stream_debug_table;
Run the following command to insert data into the source table:
INSERT INTO delta_table VALUES 901, 902;
Run the following command to check whether the data that is inserted into the source table is written to the destination table in streaming mode:
SELECT * FROM stream_debug_table;
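Once the streaming job has processed both inserts, the destination table should eventually contain the new rows:
-- Expected result: 801, 802, 901, 902, plus any rows that delta_table contained when the stream started.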