Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
We are currently using Flink 1.7.
Flink 1.8 has been released (1.8.1 announcement): https://flink.apache.org/news/2019/07/02/release-1.8.1.html
Checkpoints
The primary purpose of Checkpoints is to provide a recovery mechanism in case of unexpected job failures. A Checkpoint’s lifecycle is managed by Flink, i.e. a Checkpoint is created, owned, and released by Flink - without user interaction. Checkpoints are usually dropped after the job was terminated by the user (except if explicitly configured as retained Checkpoints).
By default, checkpointing is disabled. To enable checkpointing, call enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.
Checkpoints make state in Flink fault tolerant by allowing state and the corresponding stream positions to be recovered, thereby giving the application the same semantics as a failure-free execution.
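To make the idea of "state plus stream position" concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink code: the class CheckpointSketch, the Snapshot type, and the run method are all hypothetical names invented for illustration, and the "source" is just an in-memory list that can be re-read from any offset.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of checkpoint-based recovery. Hypothetical names; this is not Flink's implementation. */
public class CheckpointSketch {

    /** A snapshot pairs the operator state with the source position it corresponds to. */
    static final class Snapshot {
        final int offset; // next record to read from the replayable source
        final long sum;   // operator state at exactly that position
        Snapshot(int offset, long sum) { this.offset = offset; this.sum = sum; }
    }

    /**
     * Sums all records, taking a snapshot every `interval` records.
     * If failAt >= 0, the job "crashes" once just before processing that index
     * and resumes from the latest snapshot (state and offset together).
     */
    static long run(List<Integer> source, int interval, int failAt) {
        Snapshot last = new Snapshot(0, 0L);
        int offset = 0;
        long sum = 0L;
        boolean failed = false;
        while (offset < source.size()) {
            if (!failed && offset == failAt) {
                failed = true;
                offset = last.offset; // rewind the source
                sum = last.sum;       // roll the state back to match
                continue;
            }
            sum += source.get(offset);
            offset++;
            if (offset % interval == 0) {
                last = new Snapshot(offset, sum);
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> records = Arrays.asList(3, 1, 4, 1, 5, 9, 2, 6);
        long clean = run(records, 3, -1);    // failure-free execution
        long recovered = run(records, 3, 5); // crash before index 5, then recover
        System.out.println(clean == recovered); // both runs yield the same total
    }
}
```

Because the state and the offset are restored together, records between the last snapshot and the failure are replayed against the older state, so nothing is counted twice or lost.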
More details here : https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html
Savepoints
Savepoints are created, owned, and deleted by the user. Their use-case is for planned, manual backup and resume. For example, this could be an update of your Flink version, changing your job graph, changing parallelism, forking a second job like for a red/blue deployment, and so on. Of course, Savepoints must survive job termination. Conceptually, Savepoints can be a bit more expensive to produce and restore and focus more on portability and support for the previously mentioned changes to the job.
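Triggering Savepoints
./bin/flink savepoint <jobId> [savepointDirectory]
This triggers a savepoint for the job with the given ID and returns the path of the created savepoint.
For a job running on YARN, also pass the YARN application ID:
./bin/flink savepoint <jobId> [savepointDirectory] -yid <yarnAppId>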
Restore a savepoint
./bin/flink run -s <savepointPath> ...
The run command's -s flag submits a job that restores its state from the given savepoint. The savepoint path is the one returned by the savepoint trigger command.
Latest Restore: There are two types of restore operations.
- Restore from Checkpoint: We restored from a regular periodic checkpoint.
- Restore from Savepoint: We restored from a savepoint.
Reference : https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html#savepoints
BackPressure
BackPressure is a situation where a system is receiving data at a higher rate than it can process. Such behaviour will result in the sender being backpressured and may be caused by two things:
- The receiver is slow. This can happen because the receiver is backpressured itself, is unable to keep processing at the same rate as the sender, or is temporarily blocked by garbage collection, lack of system resources, or I/O.
- The network channel is slow.
Flink offers two mechanisms for identifying where the bottleneck is:
- directly via Flink’s web UI and its backpressure monitor, or
- indirectly through some of the network metrics.
Flink’s web interface provides a tab to monitor the back pressure behaviour of running jobs.
If you see a back pressure warning (e.g. High) for a task, this means that it is producing data faster than the downstream operators can consume. Records in your job flow downstream (e.g. from sources to sinks) and back pressure is propagated in the opposite direction, up the stream.
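The effect of a producer outrunning its consumer can be sketched with a deterministic, single-threaded simulation of a bounded buffer. This is a toy model only: BackpressureSketch, blockedRatio, and classify are hypothetical names, and the OK/LOW/HIGH thresholds (0.10 and 0.5) are my recollection of the ratios described in the Flink back pressure monitoring documentation, so treat them as an assumption.

```java
/** Toy model of backpressure through a bounded buffer. Hypothetical names; not Flink code. */
public class BackpressureSketch {

    /**
     * Simulates `ticks` rounds in which a producer tries to emit `produceRate` records
     * into a buffer of `capacity` slots and a consumer then drains up to `consumeRate`.
     * Returns the fraction of ticks in which the producer could not emit everything
     * it wanted, i.e. was backpressured.
     */
    static double blockedRatio(int produceRate, int consumeRate, int capacity, int ticks) {
        int buffered = 0;
        int blockedTicks = 0;
        for (int t = 0; t < ticks; t++) {
            int free = capacity - buffered;
            int emitted = Math.min(produceRate, free);
            if (emitted < produceRate) {
                blockedTicks++; // the full buffer pushes back on the producer
            }
            buffered += emitted;
            buffered -= Math.min(consumeRate, buffered);
        }
        return (double) blockedTicks / ticks;
    }

    /** Maps a ratio to a status label; thresholds are assumed, see the note above. */
    static String classify(double ratio) {
        if (ratio <= 0.10) return "OK";
        if (ratio <= 0.5) return "LOW";
        return "HIGH";
    }

    public static void main(String[] args) {
        // Balanced rates: the buffer never fills up.
        System.out.println(classify(blockedRatio(5, 5, 10, 100)));
        // Producer twice as fast as the consumer: the buffer fills and stays full.
        System.out.println(classify(blockedRatio(10, 5, 20, 100)));
    }
}
```

Once the buffer is full, the producer can only proceed at the consumer's pace. Chaining several such stages shows why pressure travels upstream: each blocked producer becomes a slow consumer for the stage before it.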
What to do with Backpressure?
Assuming you have identified where the source of backpressure (the bottleneck) is located, the next step is to analyse why it is happening. Below, we list some potential causes of backpressure, from the more basic to the more complex ones. We recommend checking the basic causes first, before diving deeper into the more complex ones and potentially drawing false conclusions.
If backpressure is temporary, you should simply ignore it.
Here are a couple of things to check.
- System Resources: First, check the affected machines' basic resource usage such as CPU, network, or disk I/O. If a resource is fully or heavily utilised, you can do one of the following:
  - Try to optimise your code. Code profilers are helpful in this case.
  - Tune Flink for that specific resource.
  - Scale out by increasing the parallelism and/or increasing the number of machines in the cluster.
- Garbage Collection: Oftentimes, performance issues arise from long GC pauses. You can verify whether you are in such a situation by either printing debug GC logs (via -XX:+PrintGCDetails) or by using a memory/GC profiler. Since dealing with GC issues is highly application-dependent and independent of Flink, we will not go into details here.
- CPU/Thread Bottleneck: Sometimes a CPU bottleneck is not visible at first glance because one or a few threads are saturated while the CPU usage of the overall machine remains relatively low. For instance, a single CPU-bottlenecked thread on a 48-core machine results in only about 2% CPU use (1/48). Consider using code profilers here, as they can identify hot threads by showing each thread's CPU usage.
- Thread Contention: Similarly to the CPU/thread bottleneck issue above, a subtask may be bottlenecked due to high thread contention on shared resources. Again, CPU profilers are your best friend here! Consider looking for synchronization overhead or lock contention in user code (although adding synchronization in user code should be avoided and may even be dangerous). Also consider investigating shared system resources. The default JVM's SSL implementation, for example, can become contended around the shared /dev/urandom resource.
- Load Imbalance: If your bottleneck is caused by data skew, you can try to remove it or mitigate its impact by changing the data partitioning to separate heavy keys or by implementing local/pre-aggregation.
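The local/pre-aggregation idea from the last item can be sketched in plain Java. This is an illustration under stated assumptions, not Flink API usage: PreAggregationSketch, preAggregate, and mergePartials are hypothetical names, and each list stands in for the records seen by one parallel subtask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of local pre-aggregation for skewed keys. Hypothetical names; not Flink code. */
public class PreAggregationSketch {

    /** Combines one subtask's records into per-key partial counts before they are shuffled. */
    static Map<String, Long> preAggregate(List<String> keys) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : keys) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    /** Merges the partial counts from all subtasks into the final per-key totals. */
    static Map<String, Long> mergePartials(List<Map<String, Long>> partials) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> total.merge(key, count, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        // Two upstream subtasks, both dominated by the heavy key "hot".
        List<Map<String, Long>> partials = new ArrayList<>();
        partials.add(preAggregate(Arrays.asList("hot", "hot", "hot", "cold")));
        partials.add(preAggregate(Arrays.asList("hot", "hot", "warm")));
        System.out.println(mergePartials(partials));
    }
}
```

With pre-aggregation, the instance responsible for the heavy key receives one partial count per upstream subtask instead of every individual record, which is what relieves the skewed receiver.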
Generally, in order to reduce a bottleneck and thus backpressure, first analyze where it is happening and then find out why. The best place to start reasoning about the “why” is by checking what resources are fully utilized.
References :
A Deep-Dive into Flink's Network Stack
Broadcast State in Apache Flink
The Broadcast State can be used to combine and jointly process two streams of events in a specific way. The events of the first stream are broadcast to all parallel instances of an operator, which maintains them as state. The events of the other stream are not broadcast but sent to individual instances of the same operator and processed together with the events of the broadcast stream. The broadcast state is a natural fit for applications that need to join a low-throughput and a high-throughput stream or need to dynamically update their processing logic.
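The routing difference between the two streams can be sketched as follows. This is a toy model in plain Java, not Flink's broadcast state API: BroadcastStateSketch, Instance, and the "blocked users" rule are hypothetical and chosen only to show that every instance receives every broadcast event while each regular event reaches exactly one instance.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the broadcast state pattern. Hypothetical names; not Flink code. */
public class BroadcastStateSketch {

    /** One parallel instance of the operator, holding its own copy of the broadcast state. */
    static final class Instance {
        final Set<String> blockedUsers = new HashSet<>(); // broadcast state
        final List<String> output = new ArrayList<>();

        void onBroadcast(String blockedUser) {
            blockedUsers.add(blockedUser);
        }

        void onEvent(String user, String action) {
            // Regular events are evaluated against the locally stored broadcast state.
            if (!blockedUsers.contains(user)) {
                output.add(user + ":" + action);
            }
        }
    }

    final Instance[] instances;

    BroadcastStateSketch(int parallelism) {
        instances = new Instance[parallelism];
        for (int i = 0; i < parallelism; i++) {
            instances[i] = new Instance();
        }
    }

    /** Low-throughput stream: every instance receives every element. */
    void broadcast(String blockedUser) {
        for (Instance instance : instances) {
            instance.onBroadcast(blockedUser);
        }
    }

    /** High-throughput stream: each element goes to exactly one instance, chosen by key. */
    void send(String user, String action) {
        int index = Math.floorMod(user.hashCode(), instances.length);
        instances[index].onEvent(user, action);
    }

    /** Total number of events emitted across all instances. */
    int totalOutput() {
        int total = 0;
        for (Instance instance : instances) {
            total += instance.output.size();
        }
        return total;
    }

    public static void main(String[] args) {
        BroadcastStateSketch job = new BroadcastStateSketch(3);
        job.broadcast("mallory");
        job.send("alice", "login");
        job.send("mallory", "login"); // filtered, whichever instance receives it
        System.out.println(job.totalOutput());
    }
}
```

Because the rule is replicated to every instance, it does not matter which instance a given event is routed to, and new rules can be added while the job runs.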