How Delta Lake helps cloud users solve the problem of real-time data storage
1. Introduction to CDC
CDC stands for Change Data Capture. In the early days, we used tools to import business data into data warehouses and data lakes in full. Later, we wanted the imports to reflect how the data changes: to import incrementally and to capture changed records as quickly as possible, so that downstream analysis can be carried out promptly. CDC technology is what captures these changing data for us.
In big data scenarios, the tool commonly used for this is Sqoop, a batch-mode tool for importing data from the business database into the data warehouse. Before importing, we have to pick a column in the source table that reflects when a row changed, and then import the changed rows into the warehouse according to that timestamp; this is one limitation of the approach (a sketch of the extraction query it implies follows the list below). In addition, the tool has the following disadvantages:
It puts query pressure on the source database;
Latency is high and depends on how frequently the import job is run;
Delete events cannot be handled: rows deleted in the source database are not removed from the data warehouse;
It cannot cope with schema changes: once the schema of the source database changes, the table model in the data warehouse has to be rebuilt and the data re-imported.
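To make the first limitation and the delete problem concrete, the query that a timestamp-based batch import effectively runs against the source database on each schedule looks roughly like the sketch below. The table name orders, the column updated_at, and the literal timestamps are assumptions for illustration only.

-- Illustrative timestamp-based incremental extract.
-- Rows deleted in the source never appear in this result set,
-- which is why a pure timestamp-based import cannot propagate deletes.
SELECT *
FROM   orders
WHERE  updated_at >  TIMESTAMP '2020-06-01 00:00:00'   -- last import time
  AND  updated_at <= TIMESTAMP '2020-06-02 00:00:00';  -- current import time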
Besides Sqoop, another approach is to synchronize data through the binlog. The source database produces a binlog entry for every insert, update, and delete; we only need to push the binlog into Kafka, read it back from Kafka, parse the records one by one, and apply the corresponding operations downstream. This requires the downstream store to handle frequent update/delete operations to keep up with frequent upstream updates and deletes, so Kudu or HBase is usually chosen as the target storage. Since Kudu and HBase are not data warehouses, the full data still has to be loaded into Hive periodically, as shown in the figure below. The drawbacks of this approach are the operational burden of running several components and the complexity of the merge logic.
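To give a feel for that merge logic, a periodic merge job in this architecture typically rebuilds a daily snapshot by combining the previous full snapshot with the replayed changes, keeping only the latest version of each key. The sketch below is a hypothetical Hive-style query; the table names ods_orders_full and ods_orders_incr, the columns id, value, ts, op, and the partition column ds are all assumptions.

-- Hypothetical periodic merge: previous full snapshot + incremental changes,
-- keep the newest record per primary key, drop keys whose latest op is DELETE.
INSERT OVERWRITE TABLE ods_orders_full PARTITION (ds = '2020-06-02')
SELECT id, value, ts
FROM (
  SELECT id, value, ts, op,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS rn
  FROM (
    SELECT id, value, ts, 'UPSERT' AS op
    FROM   ods_orders_full WHERE ds = '2020-06-01'   -- yesterday's full snapshot
    UNION ALL
    SELECT id, value, ts, op                         -- today's changes replayed from HBase/Kudu
    FROM   ods_orders_incr WHERE ds = '2020-06-02'
  ) unioned
) ranked
WHERE rn = 1 AND op <> 'DELETE';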
2. CDC solution based on Spark Streaming SQL & Delta
(1) Spark Streaming SQL
Spark Streaming SQL is SQL support built on top of Spark Streaming by the EMR team of Alibaba's Computing Platform Division; the community version of Spark does not provide it. Spark Streaming SQL is not strictly necessary for this CDC solution, but it is much friendlier, especially for users who are used to working in SQL, which is why the EMR team developed it. As shown in the figure below, EMR's Spark Streaming SQL supports SQL syntax in many areas, such as DDL, DML, and SELECT; a few of these are introduced below.
(1) CREATE SCAN & CREATE STREAM
The example below selects data from a table in Kafka; the design goal is to support both batch and streaming reads as uniformly as possible. In ordinary SQL, a SELECT implicitly triggers a read, but here we need an explicit CREATE SCAN to distinguish batch from streaming, because the data source alone cannot tell us whether a batch read or a streaming read is intended: a batch read uses USING BATCH, a streaming read uses USING STREAM.
For a batch scan, once the scan is created you can SELECT from it directly and treat the scan as a table. For streaming, however, reading the scan requires many job-level parameters, because it launches a long-running job; for this the CREATE STREAM syntax shown in the figure below is provided, which is essentially a wrapper around the SELECT syntax.
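A minimal sketch of the two statements, based only on the description above; the table name kafka_events, the scan and job names, and the option keys are assumptions, and the exact option set depends on the EMR Spark Streaming SQL version.

-- Batch read: the scan can be queried directly like a table.
CREATE SCAN events_batch ON kafka_events USING BATCH;
SELECT eventId, eventTime FROM events_batch;

-- Streaming read: the scan is consumed by a long-running job
-- declared with CREATE STREAM, which wraps an INSERT ... SELECT.
CREATE SCAN events_stream ON kafka_events USING STREAM;

CREATE STREAM events_etl_job
OPTIONS (
  checkpointLocation = '/tmp/events_etl_job/checkpoint'   -- assumed option name
)
INSERT INTO events_sink
SELECT eventId, eventTime FROM events_stream;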
(2) MERGE INTO
Another core piece of syntax is MERGE INTO, which plays a central role in the Delta Lake CDC solution. Its syntax is more complex, as shown in the figure below. Note that the mergeCondition in MERGE INTO must establish a one-to-one correspondence between the source table and the target table; otherwise, if one source record matched multiple target records, the system would not know which one to operate on. In practice this means the mergeCondition must be a primary-key join, or something equivalent to a primary-key join.
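A minimal MERGE INTO sketch in Delta-Lake-style SQL, with a primary-key mergeCondition; the table names, the id key, and the op column carrying the change type are assumptions for illustration.

MERGE INTO delta_orders AS t                  -- target Delta table
USING orders_changes AS s                     -- one batch of change records
ON t.id = s.id                                -- primary-key join: each source row matches at most one target row
WHEN MATCHED AND s.op = 'DELETE' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.value = s.value
WHEN NOT MATCHED AND s.op <> 'DELETE' THEN INSERT (id, value) VALUES (s.id, s.value);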
In addition to the syntax introduced above, we have also implemented some UDFs, such as DELAY and TUMBLING, to make Spark Streaming SQL easier to use.
(2) Delta Lake
The data lake has been a hot technology in recent years. In the early days, people used relatively mature data warehouse systems and loaded data into them through ETL. A data warehouse is typically used for analysis scenarios such as BI reporting, so its scope is fairly limited. In the mobile Internet era, data sources became richer and more diverse, data was no longer limited to structured formats, and its uses were no longer limited to analysis, so the data lake emerged. Data is loaded into the lake with little or no processing, and screening, filtering, and transformation happen afterwards; the ETL of the data warehouse era thus became the ELT of the data lake era.
The typical data lake architecture has one or more analysis engines or other computing frameworks on the upper layer and a distributed storage system on the lower layer, as shown on the left of the figure below. However, this raw way of using a data lake lacks management: there is no transaction support, no data quality validation, and all data management has to be guaranteed manually.
Delta Lake adds a management layer on top of the unified storage layer to relieve the pain of managing data lake data by hand. With this layer we first get metadata management: if the data has a schema, the schema can be managed, data quality can be validated while the data is being written, and records that do not conform can be rejected. On top of metadata management, ACID transactions can be implemented. Without a management layer, concurrent operations can interfere with each other, for example one user running a query while another deletes data; with transaction support this situation is avoided. With transactions, every operation produces a snapshot, and the sequence of snapshots makes it possible to go back in time, which is time travel.
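For example, the snapshot history and time travel can be inspected with Delta Lake SQL along the following lines; the table name delta_orders and the version and timestamp values are illustrative, and syntax availability depends on the Delta/EMR version.

DESCRIBE HISTORY delta_orders;                                      -- list the table's snapshots (versions)
SELECT * FROM delta_orders VERSION AS OF 10;                        -- read the table as of snapshot 10
SELECT * FROM delta_orders TIMESTAMP AS OF '2020-06-01 00:00:00';   -- read the table as of a point in time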
The figure below compares the main features of the data warehouse, the data lake, and Delta Lake. Delta Lake effectively combines the advantages of the data warehouse and the data lake, introducing a management layer that removes most of the shortcomings of both.
(3) CDC solution based on Spark Streaming SQL & Delta
Now back to our topic: how do we implement the CDC solution based on Spark Streaming SQL & Delta? As shown in the figure below, the data still flows from the binlog into Kafka first. The difference from the previous approach is that the binlog in Kafka no longer needs to be replayed into HBase or Kudu; it is written directly into Delta Lake. This solution is easy to use, requires no extra operations and maintenance, its merge logic is easy to implement, and it delivers an almost real-time data stream.
The concrete steps of this solution are shown in the figure below. In essence, each mini batch is continuously applied to the target table with MERGE INTO. Since Spark Streaming's mini-batch interval can be set at the second level, the solution achieves near-real-time data synchronization.
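Putting the pieces together, the streaming job could look roughly like the sketch below: a streaming scan over the Kafka topic carrying the parsed binlog, wrapped in a CREATE STREAM whose body is a MERGE INTO like the one shown earlier. All names and options are illustrative, and whether MERGE can be used directly inside CREATE STREAM depends on the EMR Spark Streaming SQL version.

CREATE SCAN binlog_stream ON kafka_binlog USING STREAM;      -- parsed binlog records from Kafka

CREATE STREAM binlog_to_delta
OPTIONS (
  checkpointLocation = '/delta/orders/_checkpoint'           -- assumed option name
)
MERGE INTO delta_orders AS t
USING (SELECT id, value, op FROM binlog_stream) AS s
ON t.id = s.id                                               -- primary-key join, applied once per mini batch
WHEN MATCHED AND s.op = 'DELETE' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.value = s.value
WHEN NOT MATCHED AND s.op <> 'DELETE' THEN INSERT (id, value) VALUES (s.id, s.value);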
We also ran into some problems when putting the solution into practice, the most important being small files. For example, if a batch runs every five seconds, a very large number of batches accumulate over a day, which can produce a huge number of small files and seriously hurt the table's query performance. The remedies for the small-file problem are as follows:
Increase the scheduling interval: if the real-time requirement is not strict, a longer batch interval reduces the rate at which small files are produced;
Compact small files: merge small files to reduce their number. The syntax is as follows:
OPTIMIZE tableIdentifier [WHERE where_clause]
Adaptive execution: adaptive execution can coalesce small reduce tasks, thereby reducing the number of small output files (see the examples after this list).
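For example (the table name, partition predicate, and configuration values below are illustrative, and the adaptive-execution keys follow the Spark 2.x naming):

-- Compact small files, optionally restricted to a partition.
OPTIMIZE delta_orders WHERE ds = '2020-06-01';

-- Enable adaptive execution so small reduce tasks are coalesced,
-- producing fewer, larger output files.
SET spark.sql.adaptive.enabled = true;
SET spark.sql.adaptive.shuffle.targetPostShuffleInputSize = 134217728;   -- about 128 MB per post-shuffle partition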
For triggering the OPTIMIZE that compacts small files, we implemented two approaches. The first is automatic OPTIMIZE: after each mini batch completes, we check whether compaction is needed and, if not, simply move on to the next mini batch. There are several triggering rules, such as the number of small files reaching a threshold or the total file size reaching a threshold, and the compaction itself is optimized, for example by filtering out files that are already large enough. Because automatic OPTIMIZE performs a compaction every certain number of batches, it can have some impact on data ingestion. The second approach is therefore to run OPTIMIZE on a schedule, which leaves real-time ingestion unaffected. Scheduled OPTIMIZE, however, introduces transaction conflicts between OPTIMIZE and the stream. To handle this we improved Delta's internal commit mechanism so that an insert-only stream does not have to fail; if the stream performed update/delete operations before an OPTIMIZE that then commits successfully, a retry step is added after that commit so that the stream is not broken.
The implementation of OPTIMIZE itself is fairly involved. We developed a bin-packing mechanism and an adaptive mechanism, so that after OPTIMIZE every file (except the last one) reaches the target size (for example 128 MB), whether or not the data is repartitioned.
3. Future work
Going forward, our work will focus on the following areas:
(1) Automatic Schema detection
Users of Delta Lake deal not only with business data but also with machine data, and in many scenarios the fields of machine data change over time. Users in such scenarios urgently need an automatic schema detection mechanism. Our goal for the next stage is to automatically detect new fields, changed fields, and so on while parsing the binlog, and reflect them in the Delta table.
(2) Streaming Merge performance (Merge on Read)
As mentioned above, the Spark Streaming SQL & Delta CDC solution essentially starts a streaming job and merges each mini batch into the target table. The merge is implemented as a join, so as the table grows the merge becomes slower and slower, seriously affecting performance. The way to solve this is to adopt merge on read, similar to the approach used with Hive, and this is our next goal.
(3) Easier-to-use experience
As can be seen, the CDC solution described above still requires a certain amount of expertise and manual work from users. As a next step, we hope to provide an easier-to-use experience and further reduce the burden on users.