The practice of Apache Flink in mobile cloud real-time computing
1. Introduction of real-time computing platform
The evolution of real-time computing engines in the cloud is divided into several stages:
From 2015 to 2016, we used Apache Storm, the first generation real-time computing engine;
In 2017, we began to investigate Apache Spark Streaming, which can be integrated with the self-developed framework, reducing operation and maintenance pressure and maintenance costs;
In 2018, users had more and more demands for cloud computing, and Storm and Spark could no longer satisfy the business well. At the same time, we studied several well-known articles on stream computing and found that Apache Flink has relatively completely provided some of the semantics mentioned in the article;
From 2019 to 20, we started to implement cloud services and launched real-time computing platforms to public and private clouds;
From 2020 to 21, we began to investigate real-time data warehouses and put LakeHouse online on the cloud.
At present, Flink is mainly used for China Mobile's signaling digital processing, real-time user portraits and buried points, real-time data warehouses, real-time operation and maintenance monitoring, real-time recommendations, and cloud data pipeline services.
The functions of China Mobile's real-time computing platform are divided into three parts.
The first part is service management, which supports the hosting of task lifecycle, Flink and SQL jobs, Spark Streaming jobs, and support for multiple versions of the engine;
The second part is SQL support, which provides online Notebook writing, SQL syntax detection, UDF management and metadata management;
The third part is task operation and maintenance, which supports log retrieval of real-time tasks, collection of real-time performance indicators, message delay alarm and task backpressure alarm, etc.
In the daily task scenario, we found that the cost of user program debugging is relatively high, and the period for users to try new versions of the engine is also relatively long.
The process of multi-version submission is as follows: the user's task is first submitted to the rtp service, and the rtp service uploads the user program to HDFS for storage, and then pulls it back from HDFS and submits it to the Yarn cluster when it needs to be submitted. There is one thing in common for such tasks - the core package of Apache Flink is included in the job, which will cause many problems.
Therefore, first of all, we will communicate with the business, so that the job package does not include Flink's core package, but the benefits are relatively small, so we do a test on the platform side, and actively detect the user package when the user is uploading the jar package. Whether to include the core package. If a job is found to contain an illegal core package, the user will be blocked from submitting it.
Such a simple operation has brought great benefits to the company:
First, it greatly reduces the cost of locating some low-value bugs;
Second, job upgrades and rollbacks are more convenient;
Third, the stability and safety of the operation are improved.
In daily business scenarios, we need to verify the complex logic of the process through log retrieval. In addition, the UI log of the native TM cannot be opened, and it is easy to get stuck. And TM UI does not support retrieval. As shown in the figure above, when the business logic is very complex, Flink UI cannot provide the above functions. Therefore, we designed the real-time task log retrieval function.
The design of real-time task log retrieval needs to consider the following issues: How to collect job program logs and distribute TMs on different machines? How to collect logs without intruding into jobs? How to restrict jobs from printing a lot of useless logs?
For the first question, we use the push mode to reduce the pressure of collecting logs;
For the second question, refer to the AOP mechanism in spring, we use AspectJWeaver, the entry point is the input or event of log4j, and then send the log to Sender;
For the third question, we use RateLimiter for current limiting.
The figure above is the overall design of real-time task log retrieval. We have added an AOP layer under the native TaskManager, and the log will be sent to the task through the TaskManager first, and then sent to AOP. The entire AOP is imperceptible to users because of the aspect approach. Then it is sent to RateLimiter, and then to Sender, and RateLimiter performs current limiting operation. Then the log continues to be sent to Kafka, and the log will be sent to Elestic Search when doing retrieval.
With real-time task log retrieval, business programs can support log retrieval without any modification. At the same time, developers can easily verify business logic. Thanks to the throttling measures, there will be no log storage bottleneck. In addition, the pressure on platform management is also reduced.
2. Business optimization
The emergence of the business is to solve the needs of government departments at all levels for user resource data, including tourism departments, emergency departments, transportation industries, etc., such as traffic planning, traffic survey, population flow monitoring and floating population monitoring management in key areas such as tourist attractions etc.
Relying on the high coverage rate of mobile phone users, using communication network regional service technology and GIS technology, through the statistics of user signaling data, it analyzes and predicts factors such as urban population and mobility, and provides services for urban planning, traffic planning, management, Government management behaviors such as resource allocation, immigrant population management, and policy formulation provide decision-making data support.
The average daily business data is about 10PB, 20 trillion/day, and the size of a single piece of data is 0.5KB, including 2345G Internet data, location signaling, provinces and cities, network types, interface types, etc. Data processing is also more complicated, such as data encryption, compression, and version unification. The above figure shows the conditions and business logic when processing signaling numbers.
It simplifies the requirements and responds to the cluster, which is a reporting gateway. It will upload the signaling data from various places, receive the data from the Flume cluster, and then transmit it to the Hadoop cluster. As you can see from the figure above, there is a physical wall between Flume and Hadoop.
As the amount of data increases, we also encounter many problems:
First, the Flume cluster will always alarm and prompt Flume channel full;
Second, if the firewall exceeds the limit, an alarm will also be issued;
Third, when Flume writes to Kafka, the Kafka sender will send a timeout alarm;
Fourth, when downstream processing signaling data, Spark Streaming processing is unstable.
The above problems can be summarized into two categories:
The first category is write performance issues. Kafka frequently times out when writing, and there is a bottleneck in production performance. And Flume cannot reach the upper limit speed of the network card when sending data;
The second category is architectural design issues. The many components involved in the architecture lead to high maintenance costs; in addition, component responsibilities are not clear, such as data cleaning logic in Flume; Spark logic and processing logic are complex, there are multiple shuffle, and processing performance is unstable.
The first thing to solve is the timeout problem of PRO writing to Kafka. To solve this problem, we performed the following optimizations:
Optimized the firewall port;
Optimized some performance parameters of the Kafka server;
Some performance parameters were tuned on the Kafka server side.
But this does not completely solve the problem of timeout when Flume writes to Kafka, so we focus on the client. The first is how to optimize the parameters of the client, especially how to tune batch.size, buffer.memory and request.time.out. The second is how to achieve the maximum network speed of the stand-alone network, that is, how many concurrent clients are set in the case of a stand-alone.
After practice, we found that when the batch.size is 256 megabytes and buffer.memory is 128 megabytes, the performance will be optimal, but at this time the maximum speed of the network card has not been reached.
So we conducted a second round of testing, adding compression.type, expecting to increase the sending bandwidth by compressing the sent data, but the result did not meet our expectations.
This is because there is a problem in the lower version of Kafka. Every value of the parameter in its verification script is the same, so its compression ratio will be relatively large. But in the actual production environment, each number is different, so the compression ratio is very small.
Another question is how to achieve the maximum speed of the network card? The easiest way is to increase the degree of parallelism, but the greater the degree of parallelism, the better. After practice, it is found that when the concurrency is 4, the maximum speed of the network card can be reached. After exceeding 4, the average time consumption will increase significantly, and it will also cause Kafka write timeout.
The second point is the problem of Flume channel full.
When extending the service, the transaction API processing of the service is relatively low-level and needs to be processed manually. In addition, when the transaction data of the service is processed, the data needs to be copied. As shown in the figure above, when data is sent from the source to the channel, a copy of the data will be copied to the memory first, and when it is sent from the channel to the sink, it will be copied from the channel to the memory again. The two copies in this process waste resources. However, Flink relies on state management when doing transactions, so its processing performance is relatively stable. In addition, Flink has rich sources and sinks, and its scalability is relatively strong.
Therefore, we decided to use Flink instead of Flume to solve the problem. After replacing it with Flink, the collection performance has been improved, the performance bottleneck of massive data transmission has been solved, and the stability has been significantly improved. At the same time, the component responsibilities are clarified, and we transfer all the logic existing in the original service to the back-end real-time data decomposition, allowing the collection layer to focus on data aggregation, and the processing layer to focus on data sorting. In addition, we unified the technology stack and adopted the Flink framework end-to-end to achieve higher performance and reduce development and operation and maintenance costs.
The result is a 1/3 improvement in overall performance and reduced maintenance costs.
3. Stability practice
Job stability mainly refers to service failures and solutions. Service failures mainly include job failures, job consumption delays, job OOMs, and job restarts. The corresponding solution is to physically isolate jobs, downgrade services, strengthen resource monitoring, and split services.
The platform maintainers are most concerned about the overall issue.
If a server in the ZooKeeper cluster has a network service interruption, it will also cause a large number of task restarts. Flink JobManager will use ZooKeeper to elect the leader and discover the counter management of CheckpointID.
So we analyzed the transition of the ZooKeeper network state. When the client connects to the ZooKeeper cluster, its state is connected first. After the network is disconnected, it will change to the Suspended state. The Suspended state will be converted to the lost state, and will continue to be converted to the reconnected state. Flink relies on a curator2.0 component when using ZooKeeper. However, there is a defect in this component. When encountering the Suspended state, the leader will be discarded directly, which will cause most jobs to restart, which is not possible for our business. accepted.
4. Exploration of future directions
In the future, we will continue to explore mainly in these two directions:
First, the direction of resource utilization. Includes Elastic Scaling survey and K8s Yunikorn resource queue survey. We found that there is a resource queue problem after Flink goes to the cloud, so it is necessary to manage the user's resources by queue;
Second, the direction of the data lake. The first is the unified stream batch service gateway. Different engines may be used when building real-time data warehouses, such as Flink and Spark. They belong to two different sets of services, so a unified stream batch service gateway is required. Followed by data lineage, data assets and data quality services.
The evolution of real-time computing engines in the cloud is divided into several stages:
From 2015 to 2016, we used Apache Storm, the first generation real-time computing engine;
In 2017, we began to investigate Apache Spark Streaming, which can be integrated with the self-developed framework, reducing operation and maintenance pressure and maintenance costs;
In 2018, users had more and more demands for cloud computing, and Storm and Spark could no longer satisfy the business well. At the same time, we studied several well-known articles on stream computing and found that Apache Flink has relatively completely provided some of the semantics mentioned in the article;
From 2019 to 20, we started to implement cloud services and launched real-time computing platforms to public and private clouds;
From 2020 to 21, we began to investigate real-time data warehouses and put LakeHouse online on the cloud.
At present, Flink is mainly used for China Mobile's signaling digital processing, real-time user portraits and buried points, real-time data warehouses, real-time operation and maintenance monitoring, real-time recommendations, and cloud data pipeline services.
The functions of China Mobile's real-time computing platform are divided into three parts.
The first part is service management, which supports the hosting of task lifecycle, Flink and SQL jobs, Spark Streaming jobs, and support for multiple versions of the engine;
The second part is SQL support, which provides online Notebook writing, SQL syntax detection, UDF management and metadata management;
The third part is task operation and maintenance, which supports log retrieval of real-time tasks, collection of real-time performance indicators, message delay alarm and task backpressure alarm, etc.
In the daily task scenario, we found that the cost of user program debugging is relatively high, and the period for users to try new versions of the engine is also relatively long.
The process of multi-version submission is as follows: the user's task is first submitted to the rtp service, and the rtp service uploads the user program to HDFS for storage, and then pulls it back from HDFS and submits it to the Yarn cluster when it needs to be submitted. There is one thing in common for such tasks - the core package of Apache Flink is included in the job, which will cause many problems.
Therefore, first of all, we will communicate with the business, so that the job package does not include Flink's core package, but the benefits are relatively small, so we do a test on the platform side, and actively detect the user package when the user is uploading the jar package. Whether to include the core package. If a job is found to contain an illegal core package, the user will be blocked from submitting it.
Such a simple operation has brought great benefits to the company:
First, it greatly reduces the cost of locating some low-value bugs;
Second, job upgrades and rollbacks are more convenient;
Third, the stability and safety of the operation are improved.
In daily business scenarios, we need to verify the complex logic of the process through log retrieval. In addition, the UI log of the native TM cannot be opened, and it is easy to get stuck. And TM UI does not support retrieval. As shown in the figure above, when the business logic is very complex, Flink UI cannot provide the above functions. Therefore, we designed the real-time task log retrieval function.
The design of real-time task log retrieval needs to consider the following issues: How to collect job program logs and distribute TMs on different machines? How to collect logs without intruding into jobs? How to restrict jobs from printing a lot of useless logs?
For the first question, we use the push mode to reduce the pressure of collecting logs;
For the second question, refer to the AOP mechanism in spring, we use AspectJWeaver, the entry point is the input or event of log4j, and then send the log to Sender;
For the third question, we use RateLimiter for current limiting.
The figure above is the overall design of real-time task log retrieval. We have added an AOP layer under the native TaskManager, and the log will be sent to the task through the TaskManager first, and then sent to AOP. The entire AOP is imperceptible to users because of the aspect approach. Then it is sent to RateLimiter, and then to Sender, and RateLimiter performs current limiting operation. Then the log continues to be sent to Kafka, and the log will be sent to Elestic Search when doing retrieval.
With real-time task log retrieval, business programs can support log retrieval without any modification. At the same time, developers can easily verify business logic. Thanks to the throttling measures, there will be no log storage bottleneck. In addition, the pressure on platform management is also reduced.
2. Business optimization
The emergence of the business is to solve the needs of government departments at all levels for user resource data, including tourism departments, emergency departments, transportation industries, etc., such as traffic planning, traffic survey, population flow monitoring and floating population monitoring management in key areas such as tourist attractions etc.
Relying on the high coverage rate of mobile phone users, using communication network regional service technology and GIS technology, through the statistics of user signaling data, it analyzes and predicts factors such as urban population and mobility, and provides services for urban planning, traffic planning, management, Government management behaviors such as resource allocation, immigrant population management, and policy formulation provide decision-making data support.
The average daily business data is about 10PB, 20 trillion/day, and the size of a single piece of data is 0.5KB, including 2345G Internet data, location signaling, provinces and cities, network types, interface types, etc. Data processing is also more complicated, such as data encryption, compression, and version unification. The above figure shows the conditions and business logic when processing signaling numbers.
It simplifies the requirements and responds to the cluster, which is a reporting gateway. It will upload the signaling data from various places, receive the data from the Flume cluster, and then transmit it to the Hadoop cluster. As you can see from the figure above, there is a physical wall between Flume and Hadoop.
As the amount of data increases, we also encounter many problems:
First, the Flume cluster will always alarm and prompt Flume channel full;
Second, if the firewall exceeds the limit, an alarm will also be issued;
Third, when Flume writes to Kafka, the Kafka sender will send a timeout alarm;
Fourth, when downstream processing signaling data, Spark Streaming processing is unstable.
The above problems can be summarized into two categories:
The first category is write performance issues. Kafka frequently times out when writing, and there is a bottleneck in production performance. And Flume cannot reach the upper limit speed of the network card when sending data;
The second category is architectural design issues. The many components involved in the architecture lead to high maintenance costs; in addition, component responsibilities are not clear, such as data cleaning logic in Flume; Spark logic and processing logic are complex, there are multiple shuffle, and processing performance is unstable.
The first thing to solve is the timeout problem of PRO writing to Kafka. To solve this problem, we performed the following optimizations:
Optimized the firewall port;
Optimized some performance parameters of the Kafka server;
Some performance parameters were tuned on the Kafka server side.
But this does not completely solve the problem of timeout when Flume writes to Kafka, so we focus on the client. The first is how to optimize the parameters of the client, especially how to tune batch.size, buffer.memory and request.time.out. The second is how to achieve the maximum network speed of the stand-alone network, that is, how many concurrent clients are set in the case of a stand-alone.
After practice, we found that when the batch.size is 256 megabytes and buffer.memory is 128 megabytes, the performance will be optimal, but at this time the maximum speed of the network card has not been reached.
So we conducted a second round of testing, adding compression.type, expecting to increase the sending bandwidth by compressing the sent data, but the result did not meet our expectations.
This is because there is a problem in the lower version of Kafka. Every value of the parameter in its verification script is the same, so its compression ratio will be relatively large. But in the actual production environment, each number is different, so the compression ratio is very small.
Another question is how to achieve the maximum speed of the network card? The easiest way is to increase the degree of parallelism, but the greater the degree of parallelism, the better. After practice, it is found that when the concurrency is 4, the maximum speed of the network card can be reached. After exceeding 4, the average time consumption will increase significantly, and it will also cause Kafka write timeout.
The second point is the problem of Flume channel full.
When extending the service, the transaction API processing of the service is relatively low-level and needs to be processed manually. In addition, when the transaction data of the service is processed, the data needs to be copied. As shown in the figure above, when data is sent from the source to the channel, a copy of the data will be copied to the memory first, and when it is sent from the channel to the sink, it will be copied from the channel to the memory again. The two copies in this process waste resources. However, Flink relies on state management when doing transactions, so its processing performance is relatively stable. In addition, Flink has rich sources and sinks, and its scalability is relatively strong.
Therefore, we decided to use Flink instead of Flume to solve the problem. After replacing it with Flink, the collection performance has been improved, the performance bottleneck of massive data transmission has been solved, and the stability has been significantly improved. At the same time, the component responsibilities are clarified, and we transfer all the logic existing in the original service to the back-end real-time data decomposition, allowing the collection layer to focus on data aggregation, and the processing layer to focus on data sorting. In addition, we unified the technology stack and adopted the Flink framework end-to-end to achieve higher performance and reduce development and operation and maintenance costs.
The result is a 1/3 improvement in overall performance and reduced maintenance costs.
3. Stability practice
Job stability mainly refers to service failures and solutions. Service failures mainly include job failures, job consumption delays, job OOMs, and job restarts. The corresponding solution is to physically isolate jobs, downgrade services, strengthen resource monitoring, and split services.
The platform maintainers are most concerned about the overall issue.
If a server in the ZooKeeper cluster has a network service interruption, it will also cause a large number of task restarts. Flink JobManager will use ZooKeeper to elect the leader and discover the counter management of CheckpointID.
So we analyzed the transition of the ZooKeeper network state. When the client connects to the ZooKeeper cluster, its state is connected first. After the network is disconnected, it will change to the Suspended state. The Suspended state will be converted to the lost state, and will continue to be converted to the reconnected state. Flink relies on a curator2.0 component when using ZooKeeper. However, there is a defect in this component. When encountering the Suspended state, the leader will be discarded directly, which will cause most jobs to restart, which is not possible for our business. accepted.
4. Exploration of future directions
In the future, we will continue to explore mainly in these two directions:
First, the direction of resource utilization. Includes Elastic Scaling survey and K8s Yunikorn resource queue survey. We found that there is a resource queue problem after Flink goes to the cloud, so it is necessary to manage the user's resources by queue;
Second, the direction of the data lake. The first is the unified stream batch service gateway. Different engines may be used when building real-time data warehouses, such as Flink and Spark. They belong to two different sets of services, so a unified stream batch service gateway is required. Followed by data lineage, data assets and data quality services.
Related Articles
-
A detailed explanation of Hadoop core architecture HDFS
Knowledge Base Team
-
What Does IOT Mean
Knowledge Base Team
-
6 Optional Technologies for Data Storage
Knowledge Base Team
-
What Is Blockchain Technology
Knowledge Base Team
Explore More Special Offers
-
Short Message Service(SMS) & Mail Service
50,000 email package starts as low as USD 1.99, 120 short messages start at only USD 1.00