Accelerating Hive Daily Table Production with Flink
1. Project Background
SmartNews is a machine-learning-driven internet company. Founded in Tokyo, Japan in 2012, it also has offices in the US and China. After more than 8 years of development, SmartNews has grown into the No. 1 news app in Japan and the fastest-growing news app in the United States, covering more than 150 countries and markets around the world. As of early 2019, the iOS and Android versions of SmartNews had been downloaded more than 50 million times worldwide.
Over the past 9 years, SmartNews has built a large number of datasets on technology stacks such as Airflow, Hive, and EMR. As data volumes grow, the processing time of these offline tables keeps lengthening. In addition, as the business side iterates faster, it places higher demands on the freshness of these tables. SmartNews therefore launched the Speedy Batch project internally to speed up the production of its existing offline tables.
This article presents one case from the Speedy Batch project: accelerating the production of the user behavior (actions) table.
User behavior logs reported by the app are turned into a daily table by Hive jobs. This table is the source of many other tables and is therefore very important. The job takes about 3 hours to run, which adds latency to many downstream tables and noticeably degrades the experience of users such as data scientists and product managers, so we needed to speed it up and make the table available earlier.
The company's business runs almost entirely on the public cloud. Raw server logs are uploaded to cloud storage as files, partitioned by day; the current jobs are scheduled by Airflow to run on EMR, generating the Hive daily table whose data is stored in cloud storage.
2. Definition of the problem
1. Input
The news servers upload a raw log file every 30 seconds, into the cloud storage directory for the corresponding date and hour.
2. Output
After the raw logs pass through ETL, they are output under two partition levels: day (dt) and behavior (action). There are about 300 action types; the set is not fixed and grows and shrinks frequently.
3. User
The table is used widely and in many ways: there are queries from Hive, from Presto, from Jupyter, and from Spark, and we cannot even be sure that this list covers every access method.
3. Objectives of the project
Reduce the latency of the actions table from 3 hours to 30 minutes;
Stay transparent to downstream users. Transparency here has two aspects:
In terms of function: users do not need to modify any code; the change is completely invisible to them;
In terms of performance: reading the table produced by the new pipeline must not be slower than before.
4. Technology selection
Before this project, colleagues had already made several rounds of improvements to this job, without much effect.
Attempted solutions included adding resources and more machines, but these ran into the IOPS limit of cloud storage: each prefix supports at most 3,000 concurrent reads and writes. The problem is especially visible in the output stage, when multiple reducers write to the same action subdirectory at the same time and easily hit this limit. The team also tried preprocessing the data by the hour and merging it into a daily table in the early morning of each day, but the merge itself took a long time; the overall latency was still about 2.5 hours, not a big enough improvement.
Given that the server-side logs arrive in cloud storage in near real time, the team proposed a stream processing approach: instead of a batch job that waits a full day and then processes for 3 hours, spread the computation across the whole day and shrink the processing left after the day ends. The team had a strong Flink background, and Flink had recently shipped many improvements around Hive, so a Flink-based solution was chosen.
5. Technical challenges
The challenges are manifold.
1. Outputting the RCFile format
The file format of the current Hive table is RCFile. To stay transparent to users, we could only upgrade the existing Hive table in place, i.e., reuse the current table, so the files Flink writes must also be in RCFile format, since a Hive table can only have one format.
RCFile is a bulk format (as opposed to a row format), and a bulk file must be finalized at every checkpoint. With a checkpoint every 5 minutes, every action would produce a new file every 5 minutes, greatly inflating the number of result files and hurting downstream read performance; for low-frequency actions in particular, file counts would grow by hundreds of times. We looked into Flink's file merging feature, but it merges data from multiple sink subtasks within one checkpoint; it does not solve our problem, which is file merging across checkpoints.
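To make the coupling concrete, here is a minimal sketch (the interval and class name are illustrative, not our production settings) of the checkpoint configuration that drives this behavior; with a bulk format such as RCFile, the checkpoint interval below is also the file-rolling interval:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BulkFormatRollingSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // With a bulk-encoded sink (RCFile, Parquet, ORC, ...), part files can
        // only roll on checkpoints, so this interval is also the file-rolling
        // interval: one new file per action partition every 5 minutes,
        // i.e. roughly 288 files per action per day.
        env.enableCheckpointing(5 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
    }
}
```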
The team considered writing a row format (e.g. CSV) and then implementing a custom Hive SerDe compatible with both RCFile and CSV. We quickly abandoned the idea: such a hybrid SerDe would have to be implemented for every query scenario, once for Presto, once for Spark, and so on.
On the one hand, we could not invest that many resources;
On the other hand, users would still notice the change: after all, they would have to install the custom SerDe.
We had previously proposed generating the table in a new format, but that too was rejected as not transparent enough for users.
2. Partition visibility and completeness
How do downstream jobs learn that the day's partitions are ready? The actions table has two partition levels, dt and action. action is a Hive dynamic partition; its values are numerous and not fixed. Today, downstream Airflow jobs wait for the insert_actions Hive task to finish before starting, which is safe: when insert_actions ends, every action partition is ready. A Flink job, however, has no end signal; it can only commit partitions to Hive one by one, e.g. dt=2021-05-29/action=refresh. Because there are so many actions, committing them all can take several minutes, so we cannot let Airflow jobs watch partitions at the dt level: downstream could be triggered while only some of the actions are present.
3. Streaming reads of cloud storage files
The project's input is a continuous stream of files uploaded to cloud storage, not records from an MQ (message queue). Flink's FileStreamingSource can read files in a streaming manner, but it discovers new files by periodically listing the directory. That does not fit our scenario: our directory is too large, and the cloud storage list operation simply cannot complete.
4. Exactly Once Guarantee
Given the importance of the actions table, users cannot accept any data loss or duplication, so the whole pipeline needs exactly-once processing.
6. Overall plan and responses to the challenges
1. Outputting RCFile while avoiding small files
The solution we finally chose works in two steps: the first Flink job writes output in JSON (row) format, and a second Flink job converts the JSON into RC format. This works around the fact that Flink cannot comfortably produce RC files of a reasonable size.
With JSON as the intermediate result, we can control output file size through the rolling policy: a file can keep accumulating across multiple checkpoints until it is large enough, or has been open long enough, and only then be finalized in cloud storage. Under the hood, Flink uses the Multipart Upload (MPU) feature of cloud storage: at each checkpoint, Flink uploads the data buffered during that checkpoint, not as a file but as a part. When the accumulated parts meet the size or time requirement, one cloud storage API call merges them into a single file. The merge happens entirely on the storage side; the application never has to download the parts, merge them locally, and re-upload. A bulk format, by contrast, requires one-shot global processing, so it cannot be uploaded part by part and merged; it must be uploaded in one piece.
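As a hedged sketch of such a sink (the path, thresholds, and class name are illustrative, and exact builder signatures vary slightly across Flink versions), a row-format FileSink with a DefaultRollingPolicy can keep a part file open across checkpoints until a size or time threshold is reached:

```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class JsonSinkSketch {
    public static FileSink<String> build() {
        return FileSink
                // Row format: each record is one pre-serialized JSON line.
                .forRowFormat(new Path("s3://bucket/actions-json/"), // hypothetical path
                        new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                // Roll when a part file reaches 128 MB...
                                .withMaxPartSize(128L * 1024 * 1024)
                                // ...or has been open for 30 minutes...
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(30))
                                // ...or has received no data for 10 minutes.
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(10))
                                .build())
                .build();
    }
}
```

Because the sink is row-encoded, each checkpoint can still hand its buffered data to MPU as a part, while the logical file stays open until one of these thresholds fires.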
When the second job detects that a new JSON file has been uploaded, it loads it, converts it into an RCFile, and uploads it to the final path. The delay this adds is small, within about 10 seconds per file, which is acceptable.
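The conversion step itself can be sketched with Hive's RCFile.Writer, assuming rows already decoded from the JSON file; the schema width, field encoding, and target path are hypothetical, and this is only the RCFile-writing portion of the job:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;

public class JsonToRcSketch {
    /** Writes rows already decoded from a JSON file (one String[] per record) as one RCFile. */
    public static void writeRcFile(List<String[]> rows, String target, int numColumns)
            throws IOException {
        Configuration conf = new Configuration();
        // Tell the RCFile machinery how many columns each record has.
        RCFileOutputFormat.setColumnNumber(conf, numColumns);

        Path path = new Path(target);
        FileSystem fs = path.getFileSystem(conf);
        RCFile.Writer writer = new RCFile.Writer(fs, conf, path);
        for (String[] row : rows) {
            BytesRefArrayWritable record = new BytesRefArrayWritable(numColumns);
            for (int i = 0; i < numColumns; i++) {
                byte[] field = row[i].getBytes(StandardCharsets.UTF_8);
                record.set(i, new BytesRefWritable(field, 0, field.length));
            }
            writer.append(record);
        }
        writer.close();
    }
}
```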
2. Gracefully detecting input files
On the input side, we do not use Flink's FileStreamingSource. Instead, we rely on the event notifications of cloud storage to detect new files, actively loading each file once its notification arrives.
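Below is a minimal sketch of such a notification-driven source, assuming (as an illustration, not a statement of our actual setup) S3-style object-created events delivered through an SQS queue; the queue URL, the JSON parsing, and the eager message deletion are all simplifications:

```java
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

/** Sketch of a source that turns storage event notifications into file paths. */
public class NotificationSource extends RichSourceFunction<String> {
    private final String queueUrl; // hypothetical queue receiving object-created events
    private volatile boolean running = true;
    private transient SqsClient sqs;

    public NotificationSource(String queueUrl) {
        this.queueUrl = queueUrl;
    }

    @Override
    public void open(Configuration parameters) {
        sqs = SqsClient.create();
    }

    @Override
    public void run(SourceContext<String> ctx) {
        while (running) {
            List<Message> messages = sqs.receiveMessage(ReceiveMessageRequest.builder()
                    .queueUrl(queueUrl)
                    .maxNumberOfMessages(10)
                    .waitTimeSeconds(20) // long polling
                    .build()).messages();
            for (Message m : messages) {
                synchronized (ctx.getCheckpointLock()) {
                    // extractObjectKey is a placeholder: real code parses the
                    // bucket/key out of the notification JSON body.
                    ctx.collect(extractObjectKey(m.body()));
                }
                // Simplification: deleting immediately risks losing a file on
                // failure; notifications are at-least-once, and duplicates are
                // removed downstream anyway (see the Exactly Once section).
                sqs.deleteMessage(DeleteMessageRequest.builder()
                        .queueUrl(queueUrl)
                        .receiptHandle(m.receiptHandle())
                        .build());
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private static String extractObjectKey(String notificationJson) {
        return notificationJson; // placeholder
    }
}
```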
3. Partition visibility and completeness
On the output side, we emit a dt-level success file so that downstream can reliably detect that the daily table is ready. We implemented a custom StreamingFileWriter that emits partitionCreated and partitionInactive signals, and a custom PartitionCommitter that uses those signals to decide whether the daily table is complete.
The mechanism works as follows: when a cloud storage writer starts writing a given action, it sends a partitionCreated signal, and when it finishes, it sends a partitionInactive signal. The PartitionCommitter checks whether all partitions within a given day have become inactive; if so, all of the day's data has been processed, and it outputs the dt-level success file. Airflow learns whether Flink has finished producing the daily table by watching for this file.
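A much-simplified sketch of the committer side (the signal type, operator shape, and emitted marker are illustrative; the real operator must also checkpoint its state and wait for the day boundary before treating an empty set as completion):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/** Sketch of a committer that tracks which action partitions are still active per day. */
public class PartitionCommitterSketch
        extends ProcessFunction<PartitionCommitterSketch.Signal, String> {

    /** Signal emitted by the writers. */
    public static class Signal {
        public String dt;       // e.g. "2021-05-29"
        public String action;   // e.g. "refresh"
        public boolean created; // true = partitionCreated, false = partitionInactive
    }

    // dt -> actions still being written. Non-keyed, so run at parallelism 1;
    // a production operator would keep this in checkpointed state instead.
    private final Map<String, Set<String>> active = new HashMap<>();

    @Override
    public void processElement(Signal s, Context ctx, Collector<String> out) {
        Set<String> actions = active.computeIfAbsent(s.dt, k -> new HashSet<>());
        if (s.created) {
            actions.add(s.action);
        } else {
            actions.remove(s.action);
            if (actions.isEmpty()) {
                // Every known action partition for this dt is inactive: emit the
                // dt-level success marker (the real job writes a success file).
                out.collect("dt=" + s.dt + "/_SUCCESS");
                active.remove(s.dt);
            }
        }
    }
}
```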
4. Exactly Once
The event notifications of cloud storage provide an at-least-once guarantee, so the Flink job deduplicates at the file level. The job itself runs with exactly-once checkpointing. Because the cloud storage output is based on the MPU mechanism, which effectively supports truncate, the output is effectively idempotent, and the pipeline as a whole achieves end-to-end exactly-once semantics.
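File-level deduplication can be sketched with Flink keyed state (the class and state names are hypothetical): key the notification stream by file path and let only the first sighting through:

```java
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

/** Sketch: drop duplicate file notifications using keyed state. */
public class FileDeduplicator extends RichFilterFunction<String> {
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        // One boolean per key; the stream must be keyed by file path upstream,
        // e.g. notifications.keyBy(path -> path).filter(new FileDeduplicator()).
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen-file", Boolean.class));
    }

    @Override
    public boolean filter(String filePath) throws Exception {
        if (Boolean.TRUE.equals(seen.value())) {
            return false; // duplicate delivery of an at-least-once notification
        }
        seen.update(true);
        return true;      // first sighting of this file: let it through
    }
}
```

In practice one would also configure state TTL so entries for long-finished files eventually expire rather than accumulating forever.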
7. Project achievements and prospects
The project has gone live with an end-to-end latency of about 34 minutes, including a 15-minute wait for late files.
The first Flink job completes its checkpoint and output in about 8 minutes, and the JSON-to-RC job takes another 12 minutes to finish all processing. We could compress these times further, but weighing timeliness against cost, we settled on the current configuration.
The JSON-to-RC job takes longer than originally expected because the upstream job's final checkpoint outputs a large number of files, stretching the overall runtime; this can be reduced linearly by raising the job's parallelism.
The number of output files grew by about 50% compared with the batch job. This is the downside of stream processing relative to batch: a stream job must close a file when its time is up, even if the file has not yet reached the desired size. Fortunately, an increase of this magnitude does not noticeably affect downstream performance.
The change is fully transparent to downstream users; no abnormal user feedback was received before or after the launch.
This project let us verify, in a production environment, that the stream processing framework Flink can be slotted seamlessly into a batch system to deliver improvements users never notice. Going forward, we will apply the same technique to accelerate the production of more Hive tables, and to offer Hive tables at finer granularity, such as hourly. We will also explore using a data lake to manage batch-stream unified data, gradually converging the technology stack.