Tuesday, July 30, 2019

Binary Tree Level Order Traversal

Given a binary tree, return the level order traversal of its nodes' values (i.e., from left to right, level by level).
For example:
Given binary tree [3,9,20,null,null,15,7],
    3
   / \
  9  20
    /  \
   15   7
return its level order traversal as:
[
  [3],
  [9,20],
  [15,7]
]
Ans:
public List<List<Integer>> levelOrder(TreeNode root) {
        Queue<TreeNode> queue = new LinkedList<TreeNode>();
        List<List<Integer>> result = new LinkedList<List<Integer>>();
        
        if(root == null) {
            return result;
        }
        
        queue.add(root);
        while(!queue.isEmpty()){
            int size = queue.size();
            List<Integer> tempList = new LinkedList<Integer>();
            for(int i=0; i<size; i++) {
                TreeNode temp = queue.poll();
                if(temp.left != null)  {
                    queue.add(temp.left);
                }
                if(temp.right != null) {
                    queue.add(temp.right);
                }
                tempList.add(temp.val);
            }
            result.add(tempList);
        }
        return result;
    }
OR
public List<List<Integer>> levelOrder(TreeNode root) {
        
        List<List<Integer>> levelOrder = new ArrayList<List<Integer>>();
        int depth = depth(root);
        for(int i=1; i<=depth; i++) {
            levelOrder.add(addLevel(root, i, new ArrayList<Integer>()));
        }
        return levelOrder;
    }
    
    private int depth(TreeNode T) {
        if(T==null) {
            return 0;
        }
        return Math.max(depth(T.left), depth(T.right))+1;
    }
    
    private List<Integer> addLevel(TreeNode T, int level, List<Integer> levelList) {
        if(T==null){
            return levelList;
        }
        if(level == 1) {
            levelList.add(T.val);
        }
        addLevel(T.left, level-1, levelList);
        addLevel(T.right, level-1, levelList);
        
        return levelList;
    }

Monday, July 29, 2019

Kth Smallest Element in a BST

Given a binary search tree, write a function kthSmallest to find the kth smallest element in it.
Note: 
You may assume k is always valid, 1 ≤ k ≤ BST's total elements.
Example 1:
Input: root = [3,1,4,null,2], k = 1
   3
  / \
 1   4
  \
   2
Output: 1
public int kthSmallest(TreeNode root, int k) {
        ArrayList<Integer> inorderList = inorder(root, new ArrayList<Integer>());
        return inorderList.get(k-1);
    }
    
    private ArrayList<Integer> inorder(TreeNode T, ArrayList<Integer> inorderList) {
        if(T!=null) {
            inorder(T.left, inorderList);
            inorderList.add(T.val);
            inorder(T.right, inorderList);
        }
        return inorderList;
    }

Sum Root to Leaf Numbers

Given a binary tree containing digits from 0-9 only, each root-to-leaf path could represent a number.
An example is the root-to-leaf path 1->2->3 which represents the number 123.
Find the total sum of all root-to-leaf numbers.
Example:
Input: [1,2,3]
    1
   / \
  2   3
Output: 25
Explanation:
The root-to-leaf path 1->2 represents the number 12.
The root-to-leaf path 1->3 represents the number 13.
Therefore, sum = 12 + 13 = 25.
Ans:
public int sumNumbers(TreeNode root) {
        return sumNumbersHelper(root,0);
    }
    
    private int sumNumbersHelper(TreeNode T, int sum) {
        if(T==null){
            return 0;
        }
        sum = sum*10+T.val;
        // leaf
        if(T.left == null && T.right == null) {
            return sum;
        }
        // non leaf
        return sumNumbersHelper(T.left, sum) + sumNumbersHelper(T.right, sum);
    }

Height Balanced Binary Tree

Given a binary tree, determine if it is height-balanced.
For this problem, a height-balanced binary tree is defined as:
a binary tree in which the depth of the two subtrees of every node never differ by more than 1.
Ans:

public boolean isBalanced(TreeNode root) {
     
        if(root == null) {
            return true;
        }
        int lHeight = -1, rHeight = -1;
        if(root.left != null) {
            lHeight = height(root.left);
        }
     
        if(root.right != null) {
            rHeight = height(root.right);
        }
        if(Math.abs(lHeight-rHeight)>1) {
            return false;
        }
        return isBalanced(root.left)&&isBalanced(root.right);
    }
 
    private int height(TreeNode T) {
        if(T == null) {
            return -1;
        }
        if(T.left==null && T.right==null) {
            return 0;
        }
        return Math.max(height(T.left),height(T.right))+1;
    } 

Sunday, July 28, 2019

Apache Flink

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.

Currently we are using Flink 1.7

Flink 1.8.1 released: https://flink.apache.org/news/2019/07/02/release-1.8.1.html

Checkpoints

The primary purpose of Checkpoints is to provide a recovery mechanism in case of unexpected job failures. A Checkpoint's lifecycle is managed by Flink, i.e. a Checkpoint is created, owned, and released by Flink without user interaction. Checkpoints are usually dropped after the job is terminated by the user (except if explicitly configured as retained Checkpoints).

By default, checkpointing is disabled. To enable checkpointing, call enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.

Checkpoints make state in Flink fault tolerant by allowing state and the corresponding stream positions to be recovered, thereby giving the application the same semantics as a failure-free execution.
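
For example, a minimal sketch of enabling checkpointing (the 60 000 ms interval and the retained-checkpoint setting are illustrative choices, not values mandated by Flink):

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// take a checkpoint every 60 seconds
env.enableCheckpointing(60000);

// optionally keep checkpoints when the job is cancelled (retained checkpoints)
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);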

More details here : https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html

Savepoints

Savepoints are created, owned, and deleted by the user. Their use-case is for planned, manual backup and resume. For example, this could be an update of your Flink version, changing your job graph, changing parallelism, forking a second job like for a red/blue deployment, and so on. Of course, Savepoints must survive job termination. Conceptually, Savepoints can be a bit more expensive to produce and restore and focus more on portability and support for the previously mentioned changes to the job.

Triggering Savepoints

./bin/flink savepoint <jobId> [savepointDirectory]
./bin/flink savepoint <jobId> [savepointDirectory] -yid <yarnAppId>

Restoring a Savepoint

./bin/flink run -s <savepointPath> ...

The run command has a savepoint flag to submit a job, which restores its state from a savepoint. The savepoint path is returned by the savepoint trigger command.
Latest Restore: There are two types of restore operations.
  • Restore from Checkpoint: We restored from a regular periodic checkpoint.
  • Restore from Savepoint: We restored from a savepoint.
Reference : https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html#savepoints

BackPressure

BackPressure is a situation where a system is receiving data at a higher rate than it can process. Such behaviour will result in the sender being backpressured and may be caused by two things:

  • The receiver is slow.
    • This can happen because the receiver is backpressured itself, is unable to keep processing at the same rate as the sender, or is temporarily blocked by garbage collection, lack of system resources, or I/O.
  • The network channel is slow.
Flink offers two mechanisms for identifying where the bottleneck is:
  • directly via Flink’s web UI and its backpressure monitor, or
  • indirectly through some of the network metrics.


Flink’s web interface provides a tab to monitor the back pressure behaviour of running jobs.


If you see a back pressure warning (e.g. High) for a task, this means that it is producing data faster than the downstream operators can consume. Records in your job flow downstream (e.g. from sources to sinks) and back pressure is propagated in the opposite direction, up the stream.

What to do with Backpressure?
Assuming that you have identified where the source of backpressure (a bottleneck) is located, the next step is to analyse why this is happening. Below, we list some potential causes of backpressure, from the more basic to the more complex ones. We recommend checking the basic causes first before diving deeper into the more complex ones, to avoid drawing false conclusions.

If backpressure is temporary, you should simply ignore it.

Here are a couple of things to check.

  • System Resources Firstly, you should check the incriminated machines’ basic resource usage like CPU, network, or disk I/O. If some resource is fully or heavily utilised you can do one of the following:
    • Try to optimise your code. Code profilers are helpful in this case.
    • Tune Flink for that specific resource.
    • Scale out by increasing the parallelism and/or increasing the number of machines in the cluster.

  •  Garbage Collection Oftentimes, performance issues arise from long GC pauses. You can verify whether you are in such a situation by either printing debug GC logs (via -XX:+PrintGCDetails) or by using some memory/GC profilers. Since dealing with GC issues is highly application-dependent and independent of Flink, we will not go into details here.             
  • CPU/Thread Bottleneck Sometimes a CPU bottleneck might not be visible at first glance if one or a couple of threads are causing it while the CPU usage of the overall machine remains relatively low. For instance, a single CPU-bottlenecked thread on a 48-core machine would result in only about 2% CPU use. Consider using code profilers here, as they can identify hot threads by showing each thread's CPU usage.
  • Thread Contention Similarly to the CPU/thread bottleneck issue above, a subtask may be bottlenecked due to high thread contention on shared resources. Again, CPU profilers are your best friend here! Consider looking for synchronization overhead / lock contention in user code (although adding synchronization in user code should be avoided and may even be dangerous!). Also consider investigating shared system resources. The default JVM's SSL implementation, for example, can become contended around the shared /dev/urandom resource.
  • Load Imbalance If your bottleneck is caused by data skew, you can try to remove it or mitigate its impact by changing the data partitioning to separate heavy keys or by implementing local/pre-aggregation.
Generally, in order to reduce a bottleneck and thus backpressure, first analyze where it is happening and then find out why. The best place to start reasoning about the “why” is by checking what resources are fully utilized.

References:

A Deep-Dive into Flink's Network Stack


Broadcast State in Apache Flink

The Broadcast State can be used to combine and jointly process two streams of events in a specific way. The events of the first stream are broadcasted to all parallel instances of an operator, which maintains them as state. The events of the other stream are not broadcasted but sent to individual instances of the same operator and processed together with the events of the broadcasted stream. The new broadcast state is a natural fit for applications that need to join a low-throughput and a high-throughput stream or need to dynamically update their processing logic.
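
As a minimal sketch of this pattern (the Rule and Event classes, their fields, and the ruleStream/eventStream variables are hypothetical), the low-throughput stream is broadcast and stored in broadcast state, and the high-throughput stream looks that state up while processing its events:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// descriptor for the broadcast state that holds the latest rules, keyed by rule name
MapStateDescriptor<String, Rule> ruleStateDescriptor =
        new MapStateDescriptor<>("rules", Types.STRING, Types.POJO(Rule.class));

// broadcast the low-throughput rule stream to all parallel instances of the operator
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

// connect the high-throughput event stream with the broadcast stream and process them together
DataStream<String> matches = eventStream
        .connect(ruleBroadcastStream)
        .process(new BroadcastProcessFunction<Event, Rule, String>() {

            @Override
            public void processElement(Event event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                // read-only access to the broadcast state on the non-broadcast side
                Rule rule = ctx.getBroadcastState(ruleStateDescriptor).get(event.ruleName);
                if (rule != null && rule.matches(event)) {
                    out.collect(event.id);
                }
            }

            @Override
            public void processBroadcastElement(Rule rule, Context ctx, Collector<String> out) throws Exception {
                // update the broadcast state whenever a new rule arrives on the broadcast side
                ctx.getBroadcastState(ruleStateDescriptor).put(rule.name, rule);
            }
        });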


Friday, July 19, 2019

SQL Queries

1. Find second highest salary of employee using self join?

select distinct salary from Employee e1 where 2 = (select count(distinct salary) from Employee e2 where e1.salary <= e2.salary);

1.2 Find third highest salary of employee using self join

select distinct salary from Employee e1 where 3 = (select count(distinct salary) from Employee e2 where e1.salary <= e2.salary);

1.3 Find nth highest salary of employee using self join

select distinct salary from Employee e1 where n = (select count(distinct salary) from Employee e2 where e1.salary <= e2.salary);

2. Find duplicate rows in a table?

select * from Employee a where rowid <> ( select max(rowid) from Employee b where a.Employee_num=b.Employee_num);

3. Get 3 Highest salaries records from Employee table ?

select distinct salary from employee a where 3 >= (select count(distinct salary) from employee b where a.salary <= b.salary) order by a.salary desc

4. Get the departments who has no employees assigned to it ?

Table Structure:
EMP
EMPNO   ENAME    DEPTNO

DEPT
DEPTNO   DNAME
Ans:

Select DName from DEPT where DeptNo Not In (Select Distinct DeptNO from EMP);

SELECT D.DNAME
FROM DEPT D
LEFT JOIN EMP E ON D.DEPTNO = E.DEPTNO
WHERE E.DEPTNO IS NULL
SELECT D.DNAME
FROM DEPT D
WHERE
 NOT EXISTS (SELECT * FROM EMP E WHERE D.DEPTNO = E.DEPTNO)

5. Get information of Employee where Employee is not assigned to the department ?

Select * from Employee where Dept_no Not in(Select Department_no from Department);


6. Select employee which have the max salary in a given department

SELECT e.EmpNo
FROM emp e
WHERE e.Salary = (SELECT MAX(Salary) FROM emp WHERE DeptNo = e.DeptNo);


7.  What are the differences between Having and Where clause?

  • The Where clause filters rows before grouping. Having clause filters rows after grouping.
  • You cannot use aggregate functions in Where clause. In Having clause, you can use aggregate functions.
8. What are the differences between UNION and UNION ALL.
  • Union and union all are used to merge rows from two or more tables.
  • Union set operator removes duplicate records. Whereas union all does not.
  • The Union operator may sort the data as a side effect of removing duplicates (this is not guaranteed); union all does not.
  • Union all is faster than union operator.
9. How Do you find all Employees with its managers?

Select e.employee_name, m.employee_name from Employee e, Employee m where e.Manager_id = m.Employee_id;

10. Delete duplicate rows

Delete FROM Student a WHERE a.rowid <>

(Select max(b.rowid) from Student b where a.rollno = b.rollno);

11. What is the difference between the RANK() and DENSE_RANK() functions? Provide an example.

The difference between the results of these two functions occurs when there is a tie. When multiple values in the set are equal, RANK() assigns them the same rank but skips the following ranks, leaving gaps. DENSE_RANK() assigns consecutive ranks with no gaps.

For example, for the set {100, 100, 200, 300, 300, 600}:
RANK() returns {1, 1, 3, 4, 4, 6}; ranks 2 and 5 are skipped, so there are gaps.
DENSE_RANK() returns {1, 1, 2, 3, 3, 4}; no ranks are skipped.

12. Explain Rank Function as Analytical function with Example.

RANK() is used as an analytical function in SQL/PLSQL/SQL Server to assign a rank to each record. It ranks rows within ordered partitions: ties (rows with the same value in the ORDER BY column) get the same rank, and the following rank is skipped. Because of these gaps, RANK() is less useful where the same value repeats again and again; it works best on reasonably unique data where the user can partition and order the rows properly.

Syntax of Rank:

RANK () OVER (PARTITION BY expression ORDER BY expression)

Example:

SELECT Employee_Name, Department_No, Salary, RANK() OVER (PARTITION BY Department_No ORDER BY Salary) "Rank" FROM EMPLOYEE;

In the above query, the same rank is given to employees with the same salary, but the rank then jumps for the next salary. The output looks like this (ordered by salary, as in the query):

Employee_Name  Department_No  Salary  Rank
      Rahul        10          8700    1
      Aditya       10          8700    1   (tie assigned the same rank)
      Shrey        10         12500    3   (rank 2 is skipped)
      Amit         10         15000    4

13. What is Correlated Subquery? Give an example

A correlated subquery is a subquery that references columns from the outer query, so the inner query's result depends on the row currently being processed by the outer query.

Correlated subqueries typically use operators such as EXISTS, NOT EXISTS, IN and NOT IN.

Example for Correlated Subqueries:

Fetch the Employees who are not assigned to any department.

SELECT * FROM Employee E WHERE NOT EXISTS

(SELECT Department_no FROM Department D WHERE E.Employee_id = D.Employee_ID);

14. List departments that have less than 3 people in it ?

SELECT d.Name AS 'Department'
FROM departments d JOIN employees e 
ON e.department_id = d.Department_id
GROUP BY d.department_id
HAVING COUNT(e.employee_id) < 3

15. List ALL departments along WITH the NUMBER OF people there (tricky - people often do an "inner join" leaving OUT empty departments)

SELECT d.name AS 'Department', COUNT(e.employee_id) AS '# of Employees'
FROM departments d LEFT OUTER JOIN employees e 
ON e.department_id = d.department_id
GROUP BY d.Department_id

16. List employees who have the biggest salary IN their departments

SELECT d.name AS department, e.name AS empName, e.salary
FROM employees e
JOIN departments d ON e.department_id = d.id
WHERE e.salary = (SELECT MAX(salary) FROM employees WHERE department_id = e.department_id);


Thursday, July 4, 2019

Difference between Coalesce and Repartition

coalesce() reduces the number of partitions in a DataFrame.

repartition() can either increase or decrease the number of partitions in a DataFrame.

repartition() does a full shuffle of the data and creates equal-sized partitions, while coalesce() combines existing partitions to avoid a full shuffle.

Summary of differences:
  • coalesce(): reduces the number of partitions; tries to minimize data movement by avoiding a network shuffle; creates unequal-sized partitions.
  • repartition(): increases or decreases the number of partitions; triggers a network shuffle, which increases data movement; creates equal-sized partitions.
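
A small illustration with Spark's Java API (the input path and partition counts are made up for the example):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("PartitionDemo").getOrCreate();
Dataset<Row> df = spark.read().parquet("/data/events");

// repartition: full shuffle, data redistributed into 200 roughly equal-sized partitions
Dataset<Row> repartitioned = df.repartition(200);

// coalesce: no full shuffle, existing partitions merged down to 10 (may be unequal in size)
Dataset<Row> coalesced = repartitioned.coalesce(10);

System.out.println(coalesced.rdd().getNumPartitions());   // 10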

Apache Parquet File Format

Apache Parquet is a file format. It is designed as a columnar storage format to support complex data processing.
Apache Parquet is a self-describing data format that embeds the schema, or structure, within the data itself. This results in files that are optimized for query performance and minimize I/O. Specifically, it has the following characteristics (a short Spark sketch follows the list):
  • Apache Parquet is column-oriented and designed to bring efficient columnar storage of data compared to row based files like CSV
  • Apache Parquet is built from the ground up with complex nested data structures in mind
  • Apache Parquet is built to support very efficient compression and encoding schemes (see Google Snappy)
  • Apache Parquet lowers storage costs for data files and maximizes the effectiveness of querying data with serverless technologies like Amazon Athena, Redshift Spectrum, BigQuery, and Azure Data Lakes.
  • Licensed under the Apache software foundation and available to any project.
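
As a rough sketch with Spark (paths and options are illustrative), converting a row-based CSV file to Parquet and reading it back shows the self-describing nature of the format, since the schema is recovered from the Parquet files themselves:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("ParquetDemo").getOrCreate();

// read a row-based CSV file and write it out as column-oriented, Snappy-compressed Parquet
Dataset<Row> csv = spark.read().option("header", "true").csv("/data/events.csv");
csv.write().option("compression", "snappy").parquet("/data/events_parquet");

// reading the Parquet output back recovers the schema embedded in the files
Dataset<Row> parquet = spark.read().parquet("/data/events_parquet");
parquet.printSchema();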

Adaptive Execution in Spark

Adaptive Query Execution (aka Adaptive Optimisation or Adaptive Execution) is an optimisation of query execution in which the Spark Planner allows alternative execution plans at runtime, chosen based on runtime statistics.
Quoting the description of a talk by the authors of Adaptive Query Execution:
At runtime, the adaptive execution mode can change shuffle join to broadcast join if it finds the size of one table is less than the broadcast threshold. It can also handle skewed input data for join and change the partition number of the next stage to better fit the data scale. In general, adaptive execution decreases the effort involved in tuning SQL query parameters and improves the execution performance by choosing a better execution plan and parallelism at runtime.
Adaptive Query Execution is disabled by default. Set spark.sql.adaptive.enabled configuration property to true to enable it.
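
For example, a minimal sketch of enabling it when building the SparkSession (the application name is arbitrary); the same property can also be passed to spark-submit with --conf spark.sql.adaptive.enabled=true:

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("AdaptiveExecutionDemo")
        .config("spark.sql.adaptive.enabled", "true")   // disabled by default
        .getOrCreate();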

References:

  1. An adaptive execution mode for Spark SQL by Carson Wang (Intel), Yucai Yu (Intel) at Strata Data Conference in Singapore, December 7, 2017
  2. https://issues.apache.org/jira/browse/SPARK-23128
  3. https://issues.apache.org/jira/browse/SPARK-9850