Wednesday, June 26, 2019

HBase Coprocessor

What is HBase Coprocessor?

Simply stated, Coprocessor is a framework that provides an easy way to run your custom code on HBase Region Server.

When working with any data store (like an RDBMS or HBase) you fetch the data (in the case of an RDBMS you may use a query, and in the case of HBase you use either Get or Scan). To fetch only the relevant data you filter it (for an RDBMS you put conditions in the ‘WHERE’ clause; in HBase you use Filters). After fetching the desired data, you can perform your business computation on it.

This scenario is close to ideal for “small data,” like a few thousand rows with a bunch of columns. Now imagine a scenario where there are billions of rows and millions of columns and you want to perform some computation which requires all the data, like calculating average or sum. Even if you are interested in just a few columns, you still have to fetch all the rows. There are a few drawbacks in this approach as described below:

In this approach the data transfer (from data store to client side) will become the bottleneck, and the time required to complete the operation is limited by the rate at which data transfer is taking place.
It’s not always possible to hold this much data in memory and perform computation.
Bandwidth is one of the most precious resources in any data center. Operations like this will severely impact the performance of your cluster.
Your client code also becomes heavier, since you maintain the code for calculating the average or sum on the client side. Not a major drawback compared to severe issues like performance and bandwidth, but still worth considering.
In a scenario like this it’s better to move the computation to the data itself, just like a stored procedure (a better analogy is the MapReduce model). Coprocessor helps you achieve this, but you can do more than that. To give an idea of Coprocessor’s capabilities, different people give different analogies. The three most famous analogies for Coprocessor in the industry are:


  • Triggers and Stored Procedures: This is the most common analogy you will find for Coprocessor (the official documentation uses it). Observer Coprocessor (discussed below) is compared to triggers because, like triggers, it executes your custom code when a certain event occurs (like Get or Put). Similarly, Endpoint Coprocessor (discussed below) is compared to stored procedures because it lets you perform custom computation on the data directly inside the region server.


  • MapReduce: Just as MapReduce moves the computation to the data, Coprocessor executes your custom computation directly on the Region Servers, i.e. where the data resides. That’s why some people compare Coprocessors to small MapReduce jobs.
  • AOP: Some people compare it to Aspect Oriented Programming (AOP). As in AOP, you apply advice by intercepting a request, running some custom (probably cross-cutting) code, and then forwarding the request on its path as if nothing happened (or even returning it). Similarly, with a Coprocessor you can intercept a request, run custom code, and then forward it on its path (or return it).



In HBase, to implement a Coprocessor certain steps must be followed as described below:

Either your class should extend one of the Coprocessor classes (like BaseRegionObserver) or it should implement Coprocessor interfaces (like Coprocessor, CoprocessorService).
Load the Coprocessor: Currently there are two ways to load the Coprocessor. One is static (i.e. loading from configuration) and the other is dynamic (i.e. loading from table descriptor either through Java code or through ‘hbase shell’). Both are discussed below in detail.
Finally, write your client-side code to call the Coprocessor. This is the easiest step, as HBase handles the Coprocessor transparently and you don’t have to do much to call it.
Coprocessors are not designed to be used by end users but by developers. Coprocessors are executed directly on the region server; therefore faulty or malicious code can bring your region server down. Currently there is no mechanism to prevent this, but there are ongoing efforts. For more, see JIRA ticket HBASE-4047.

Coprocessor can be broadly divided into two categories – Observer and Endpoint – and each one is discussed separately:

1. Observer Coprocessor: As stated above, these are just like database triggers, i.e. they execute your custom code on the occurrence of certain events. If you prefer (or if you come from a Java background) you can also visualize it as advice (before and after only). Coprocessors allow you to hook your custom code into two places during the lifecycle of an event. One is just before the occurrence of the event (just like before advice in AOP). For example, it will allow your custom code to run just before the ‘Put’ operation. All methods providing this feature start with the prefix ‘pre.’ For example, if you want your code to be executed before the Put operation, you should override the following method of the RegionObserver class. We will walk through a working example after this boring introduction. ☺

public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, final WALEdit edit, final Durability durability) throws IOException {
}
Similarly, the Observer Coprocessor also provides hooks for your code to get executed just after the occurrence of the event (similar to after advice in AOP terminology). These methods will start with the prefix ‘post.’ For example, if you want your code to be executed after the ‘Put’ operation, you should override following method:

public void postPut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, final WALEdit edit, final Durability durability) throws IOException { }
In short, the following conventions are followed:

Override preXXX() if you want your code to be executed before the occurrence of the event.

Override postXXX() if you want your code to be executed after the occurrence of the event.

A few use cases of the Observer Coprocessor are:

Security: Before performing any operation (like ‘Get’, ‘Put’) you can check for permission in the ‘preXXX’ methods.
Referential Integrity: Unlike a traditional RDBMS, HBase doesn’t have the concept of referential integrity (foreign keys). Suppose, for example, you have a requirement that whenever you insert a record into the ‘users’ table, a corresponding entry should also be created in the ‘user_daily_attendance’ table. One way to solve this is to use two ‘Put’ operations, one for each table, but that throws the responsibility for referential integrity onto the user. A better way is to use a Coprocessor and override the ‘postPut’ method, in which you write the code to insert the record into the ‘user_daily_attendance’ table (a minimal sketch is shown after this list). This way the client code stays lean and clean.
Secondary Index: Coprocessor can be used to maintain secondary indexes. For more information please see SecondaryIndexing.
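A minimal sketch of such a ‘postPut’ hook, referenced in the referential-integrity use case above, might look like the following. The ‘user_daily_attendance’ table name comes from the use case itself, while the column family, qualifier and the absence of error handling are simplifications for illustration:

public class AttendanceObserver extends BaseRegionObserver {

    @Override
    public void postPut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
            final WALEdit edit, final Durability durability) throws IOException {
        // After a row lands in 'users', write a matching row into 'user_daily_attendance'.
        HTableInterface attendance = e.getEnvironment()
                .getTable(TableName.valueOf("user_daily_attendance"));
        try {
            Put entry = new Put(put.getRow());
            entry.add(Bytes.toBytes("details"), Bytes.toBytes("registered"), Bytes.toBytes("true"));
            attendance.put(entry);
        } finally {
            attendance.close();
        }
    }
}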
Observer Coprocessor has the following flavors:

RegionObserver: This Coprocessor provides the facility to hook your code when events on a region are triggered. The most common examples include ‘preGet’ and ‘postGet’ for the ‘Get’ operation and ‘prePut’ and ‘postPut’ for the ‘Put’ operation.
Region Server Observer: Provides hooks for events related to the RegionServer, such as stopping the RegionServer and performing operations before or after merges, commits, or rollbacks.
WAL Observer: Provides hooks for WAL (Write-Ahead Log) related operations. It has only two methods, ‘preWALWrite()’ and ‘postWALWrite()’.
Master Observer: Provides hooks for DDL-like operations, such as create, delete, and modify table.
Example of Observer Coprocessor:

Table 1: ‘users’ table

[Table 1 shows the sample ‘users’ table, with row keys such as ‘admin’ and the column families ‘personalDet’ and ‘salaryDet’.]
Consider a hypothetical example with the ‘users’ table shown above. In this example, a client can query information about an employee. For the purpose of demonstrating Coprocessor, we assume that ‘admin’ is a special person whose details shouldn’t be visible to any client querying the table. To achieve this we will take the help of a Coprocessor.

Following are the steps:

Write a class that extends the BaseRegionObserver class.
Override the ‘preGetOp()’ method (note that the ‘preGet()’ method is now deprecated). The ‘preGetOp()’ method is used here to first check whether the queried row key is ‘admin’. If it is, return the call without allowing the system to perform the Get operation, thus saving on performance.
Export your code in a jar file.
Place the jar in HDFS where HBase can locate it.
Load the Coprocessor.
Write a simple program to test it.
Let’s see each step in detail:

Step 1 and Step 2: Below is a class that extends one of the Coprocessor classes (BaseRegionObserver) and overrides the ‘preGetOp()’ method.

public class RegionObserverExample extends BaseRegionObserver {

    private static final byte[] ADMIN = Bytes.toBytes("admin");
    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("details");
    private static final byte[] COLUMN = Bytes.toBytes("Admin_det");
    private static final byte[] VALUE = Bytes.toBytes("You can't see Admin details");

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
            final List<Cell> results) throws IOException {

        if (Bytes.equals(get.getRow(), ADMIN)) {
            // Return a dummy cell instead of the real data and skip the actual Get.
            Cell c = CellUtil.createCell(get.getRow(), COLUMN_FAMILY, COLUMN,
                    System.currentTimeMillis(), (byte) 4, VALUE);
            results.add(c);
            e.bypass();
        }

        // Delegate to the (deprecated) preGet() hook for backward compatibility.
        List<KeyValue> kvs = new ArrayList<KeyValue>(results.size());
        for (Cell c : results) {
            kvs.add(KeyValueUtil.ensureKeyValue(c));
        }
        preGet(e, get, kvs);
        results.clear();
        results.addAll(kvs);
    }
}

Overriding ‘preGetOp()’ will only work for the ‘Get’ operation; it won’t help you with ‘Scan’. To deal with that you have to override another method, ‘preScannerOpen()’, and add a Filter explicitly for admin as shown below:

@Override
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan,
        final RegionScanner s) throws IOException {

    Filter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(ADMIN));
    scan.setFilter(filter);
    return s;
}
This method works, but there is a side effect: if the client has set any Filter on the scan, that Filter won’t have any effect because ours has replaced it.
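One way to soften this side effect (a sketch, not part of the original example) is to keep the client’s filter, if present, and combine it with the admin filter in a FilterList so that both are applied:

@Override
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan,
        final RegionScanner s) throws IOException {

    Filter adminFilter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(ADMIN));
    if (scan.getFilter() != null) {
        // AND the client's filter with ours so both must pass.
        scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, scan.getFilter(), adminFilter));
    } else {
        scan.setFilter(adminFilter);
    }
    return s;
}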

Another option is to deliberately remove the admin row from the results. This approach is shown below:

@Override
public boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, final InternalScanner s,
        final List<Result> results, final int limit, final boolean hasMore) throws IOException {
    Result result = null;
    Iterator<Result> iterator = results.iterator();
    while (iterator.hasNext()) {
        result = iterator.next();
        if (Bytes.equals(result.getRow(), ADMIN)) {
            iterator.remove();
            break;
        }
    }
    return hasMore;
}
Step 3: Export the above program into a jar file. Let’s assume that we exported it to a file called ‘coprocessor.jar’.

Step 4: Copy the jar to HDFS. I have used the Hadoop copy command:

hadoop fs -copyFromLocal coprocessor.jar coprocessor.jar

Step 5: See Loading of Coprocessor below. For an Observer Coprocessor you can use any of the loading methods described there.

Step 6: Run the following program to test it. The first part tests ‘Get’ and the second tests ‘Scan’.

Configuration conf = HBaseConfiguration.create();
HConnection connection = HConnectionManager.createConnection(conf);
HTableInterface table = connection.getTable("users");

// Test the 'Get' path: the Coprocessor should return the dummy cell for 'admin'.
Get get = new Get(Bytes.toBytes("admin"));
Result result = table.get(get);
for (Cell c : result.rawCells()) {
    System.out.println(Bytes.toString(CellUtil.cloneRow(c))
        + " ==> " + Bytes.toString(CellUtil.cloneFamily(c))
        + " {" + Bytes.toString(CellUtil.cloneQualifier(c))
        + ":" + Bytes.toString(CellUtil.cloneValue(c)) + "}");
}

// Test the 'Scan' path: the admin row should be filtered out.
Scan scan = new Scan();
ResultScanner scanner = table.getScanner(scan);
for (Result res : scanner) {
    for (Cell c : res.rawCells()) {
        System.out.println(Bytes.toString(CellUtil.cloneRow(c))
            + " ==> " + Bytes.toString(CellUtil.cloneFamily(c))
            + " {" + Bytes.toString(CellUtil.cloneQualifier(c))
            + ":" + Bytes.toString(CellUtil.cloneValue(c)) + "}");
    }
}

2. Endpoint Coprocessor: This kind of Coprocessor can be compared to the stored procedures found in an RDBMS. They help in performing computations that are not possible through the Observer Coprocessor or otherwise, for example calculating an average or a sum over an entire table that spans multiple regions. They do so by providing a hook for your custom code and then running it across all regions. With Endpoint Coprocessors you can create your own dynamic RPC protocol and thus provide communication between the client and the region server, enabling you to run your custom code on the region server (on each region of a table). Unlike Observer Coprocessors (where your custom code is executed transparently when events like a ‘Get’ operation occur), with Endpoint Coprocessors you have to explicitly invoke the Coprocessor by using the ‘coprocessorService()’ method of ‘HTableInterface’ (or HTable). A working example is given below.

From version 0.96, implementing an Endpoint Coprocessor is not straightforward; it is now done with the help of Google’s Protocol Buffers. For more details on Protocol Buffers, refer to this excellent Protocol Buffer Guide. For migrating Endpoints from version 0.94 or earlier to 0.96 or later you have to upgrade your Endpoint Coprocessor; for more details, see JIRA ticket HBASE-5448. To write an Endpoint Coprocessor, one should:

Create a ‘.proto’ file defining your service.
Execute the ‘protoc’ command to generate the Java code from the above ‘.proto’ file.
Write a class that:
Extends the generated service class.
Implements the two interfaces Coprocessor and CoprocessorService.
Overrides the service method.
Load the Coprocessor.
Write a client code to call Coprocessor.
Example of Endpoint Coprocessor: We are following the same example as described above. Just to recap:

In our hypothetical example (see Table 1), to demonstrate the Endpoint Coprocessor we take a trivial use case and try to calculate the total (sum) of the gross salaries of all employees. We will go step by step:

Step 1: Create a ‘.proto’ file to define your service, request and response. Let’s call this file “sum.proto”. Below is the content of the ‘sum.proto’ file:

option java_package = "org.myname.hbase.Coprocessor.autogenerated";
option java_outer_classname = "Sum";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message SumRequest {
required string family = 1;
required string column = 2;
}

message SumResponse {
  required int64 sum = 1 [default = 0];
}

service SumService {
  rpc getSum(SumRequest)
    returns (SumResponse);
}
Step 2: Compile the proto file using proto compiler (for detailed instructions see this excellent official documentation).

$ protoc --java_out=src ./sum.proto
(Note: you need to create the ‘src’ folder yourself.)
This will generate a file called “Sum.java”.

Step 3: Write your Endpoint Coprocessor. Firstly, your class should extend the service just defined above (i.e. Sum.SumService). Secondly, it should implement the Coprocessor and CoprocessorService interfaces. Thirdly, override the ‘getService()’, ‘start()’, ‘stop()’ and ‘getSum()’ methods. Below is the full code:

public class SumEndPoint extends Sum.SumService implements Coprocessor, CoprocessorService {

    private RegionCoprocessorEnvironment env;

    @Override
    public Service getService() {
        return this;
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        // do nothing
    }

    @Override
    public void getSum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(request.getFamily()));
        scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));

        SumResponse response = null;
        InternalScanner scanner = null;
        try {
            scanner = env.getRegion().getScanner(scan);
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore = false;
            long sum = 0L;
            do {
                hasMore = scanner.next(results);
                for (Cell cell : results) {
                    sum = sum + Bytes.toLong(CellUtil.cloneValue(cell));
                }
                results.clear();
            } while (hasMore);

            response = SumResponse.newBuilder().setSum(sum).build();
        } catch (IOException ioe) {
            ResponseConverter.setControllerException(controller, ioe);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {}
            }
        }
        done.run(response);
    }
}
Step 4: Load the Coprocessor. See Loading of Coprocessor below. I recommend using the static approach for Endpoint Coprocessors.
Step 5: Now we have to write the client code to test it. To do so, write the following code in your main method:

Configuration conf = HBaseConfiguration.create();
HConnection connection = HConnectionManager.createConnection(conf);
HTableInterface table = connection.getTable("users");

final SumRequest request = SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross").build();
try {
    Map<byte[], Long> results = table.coprocessorService(SumService.class, null, null,
        new Batch.Call<SumService, Long>() {
            @Override
            public Long call(SumService aggregate) throws IOException {
                BlockingRpcCallback<SumResponse> rpcCallback = new BlockingRpcCallback<SumResponse>();
                aggregate.getSum(null, request, rpcCallback);
                SumResponse response = rpcCallback.get();
                return response.hasSum() ? response.getSum() : 0L;
            }
        });
    for (Long sum : results.values()) {
        System.out.println("Sum = " + sum);
    }
} catch (ServiceException e) {
    e.printStackTrace();
} catch (Throwable e) {
    e.printStackTrace();
}
Loading of Coprocessor:

Coprocessor can be loaded broadly in two ways. One is static (loading through configuration files) and the other one is dynamic loading.

Dynamic loading: Dynamic loading means loading Coprocessor without restarting HBase. Dynamic loading can be done in three ways:

A. Using Shell: You can load the Coprocessor using the HBase shell as follows:

1. Disable the table so that you can load the Coprocessor:

hbase(main):001:0> disable 'users'
2. Load the Coprocessor (i.e. coprocessor.jar) that you copied to HDFS by using the following command:

hbase(main):002:0> alter 'users', METHOD => 'table_att', 'Coprocessor' => 'hdfs://localhost/user/gbhardwaj/coprocessor.jar|org.myname.hbase.Coprocessor.RegionObserverExample|1073741823|'
where "hdfs://localhost/user/gbhardwaj/coprocessor.jar" is the full path of ‘coprocessor.jar’ in your HDFS and "org.myname.hbase.Coprocessor.RegionObserverExample" is the fully qualified name of your class (including the package name).

3. Enable the table:

hbase(main):003:0> enable 'users'
4. Verify that the Coprocessor is loaded by typing the following command:

hbase(main):004:0> describe 'users'
You should see output like this:

DESCRIPTION                                                                ENABLED
 'users', {TABLE_ATTRIBUTES => {Coprocessor$1 =>                           true
 'hdfs://localhost/user/gbhardwaj/coprocessor.jar|org.myname.hbase.Coprocessor.RegionObserverExample|1073741823|'},
 {NAME => 'personalDet', ...
B. Using setValue() method of HTableDescriptor: This is done entirely in Java as follows:

String tableName = "users";
String path = "hdfs://localhost/user/gbhardwaj/coprocessor.jar";
Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
hTableDescriptor.setValue("COPROCESSOR$1", path +
      "|" + RegionObserverExample.class.getCanonicalName() +
      "|" + Coprocessor.PRIORITY_USER);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);
C. Using the addCoprocessor() method of HTableDescriptor: This method is available from version 0.96 onwards. Personally, I prefer this approach:

String tableName = "users";
String path = "hdfs://localhost/user/gbhardwaj/coprocessor.jar";
Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
hTableDescriptor.addCoprocessor(RegionObserverExample.class.getCanonicalName(), path, Coprocessor.PRIORITY_USER, null);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);


2. Static Loading: Static loading means that your Coprocessor will take effect only when you restart HBase, and there is a reason for that: you make the changes in ‘hbase-site.xml’ and therefore have to restart HBase for them to take effect.

Create the following entry in the ‘hbase-site.xml’ file located in the ‘conf’ directory:

<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>org.myname.hbase.Coprocessor.endpoint.SumEndPoint</value>
</property>

Make your code available to HBase. I used the following simple steps: first, export your endpoint (SumEndPoint.java), service (Sum.java) and other relevant Protocol Buffer classes (i.e. all classes found under the ‘<Protocol Buffer Directory>/java/src/main/java/com/google/protobuf’ directory) into a jar file. Second, put this jar in the ‘lib’ folder of HBase and finally restart HBase.

Note: The documentation clearly states:

“Add a table Coprocessor to this table. The Coprocessor type must be RegionObserver or Endpoint. It won’t check if the class can be loaded or not. Whether a Coprocessor is loadable or not will be determined when a region is opened.”

This means that you can load both Observer and Endpoint Coprocessors dynamically using the following method of HTableDescriptor:

addCoprocessor(String className, org.apache.hadoop.fs.Path jarFilePath, int priority, Map<String,String> kvs) throws IOException
In my case, the above method worked fine for the Observer Coprocessor but didn’t work for the Endpoint Coprocessor; it caused the table to become unavailable and I finally had to restart HBase. The same Endpoint Coprocessor worked fine when loaded statically. Use the above method for Endpoint Coprocessors with caution.

As I wrap up, here are a few terms I have used in this post:

Column Family: It is generally said that HBase is a column-oriented database, which means it has columns, and columns are grouped into column families. While columns are dynamic (i.e. there is no fixed number of columns in a row, as opposed to an RDBMS where the number of columns is fixed) and they don’t need to be declared at table creation, column families are fixed and must be defined at the time of table creation. They should remain more or less fixed throughout the life of the table. You can add column families after table creation, but it is an expensive operation and is generally avoided.

HFile: Column family data is stored in HFiles. A column family can span multiple HFiles (until a major compaction), but the converse is not true: an HFile cannot contain multiple column families; it always hosts a single column family.

Region: Although HBase tables host billions of rows, it is not possible to store all of that together. Therefore, a table is divided into chunks. A Region is responsible for holding a set of rows. When you keep adding more rows and a region’s size grows beyond a threshold value, it automatically splits into two regions. A region hosts a contiguous set of rows.

Region Server: Regions are hosted by Region Servers. A Region Server can host many Regions, but a Region is always hosted by one and only one Region Server.

Flink vs Storm

Flink supports batch and streaming analytics, in one system. 

Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing.

Flink has out-of-the-box connectors to HDFS, S3 and Kinesis.

Monday, June 24, 2019

Kafka Quick Review

Apache Kafka Refreshment

Apache Kafka is a distributed stream processing platform for big data. It’s a fast, scalable, durable, and fault-tolerant publish-subscribe messaging system. Kafka stores streams of records. Unlike most messaging systems, the log (the message queue) is persistent.
The log stream in Kafka is called a topic. Producers publish (append) their records to a topic, and consumers subscribe to one or more topics. A Kafka topic is just a sharded write-ahead log.
Kafka uses ZooKeeper to manage the cluster. ZooKeeper keeps track of the status of the Kafka cluster nodes, and it also keeps track of Kafka topics, partitions, etc.



Important Concepts

Broker

A Kafka cluster consists of one or more servers called Kafka brokers. The Kafka broker mediates the conversation between different computer systems, like a queue in a messaging system. Nodes in a Kafka cluster communicate by sharing information between each other directly or indirectly using Zookeeper.

Producer

A producer writes data (records) to brokers.

Consumer

A consumer reads data (records) from the brokers.

Topics

Each Broker can have one or more topics. A Topic is a category/feed name to which messages are stored and published. If you wish to send a message you send it to a specific topic and if you want to read a message you read it from a specific topic. Messages published to the cluster will stay in the cluster until a configurable retention period has passed. Kafka retains all messages for a set amount of time or until a configurable size is reached.

Partition

Topics are split into partitions, which can be replicated between nodes. Two consumers in the same consumer group cannot consume messages from the same partition at the same time.

Record

Data sent to and from the broker is called a record, a key-value pair. The record contains the topic name and partition number. The Kafka broker keeps records inside topic partitions.

Consumer group

A consumer group includes the set of consumers that are subscribing to a specific topic. Kafka consumers are typically part of a consumer group. Each consumer in the group is assigned a set of partitions from where to consume messages. Each consumer in the group will receive messages from different subsets of the partitions in the topic. Consumer groups enable multi-machine consumption from Kafka topics.

Offset

Kafka topics are divided into a number of partitions, which contains messages in an unchangeable sequence. Each message in a partition is assigned and identified by its unique offset.

ZooKeeper

Zookeeper is a software developed by Apache that acts as a centralized service and keeps track of the status of the Kafka cluster nodes, and it also keeps track of Kafka topics, partitions, etc.

Performance optimization for Apache Kafka - Producers

The producer in Kafka is responsible for writing the data to the Kafka Brokers and can be seen as the triggers in the Apache Kafka workflow. The producer can be optimized in various ways to meet the needs for your Apache Kafka setup. By refining your producer setup, you can avoid common errors and ensure your configuration meets your expectations.
This guide is divided into three parts, and this first one, will focus on the Producers.

Ack-value

An acknowledgment (ACK) is a signal passed between communicating processes to signify acknowledgment, i.e., receipt of the message sent. The ack-value is a producer configuration parameter in Apache Kafka and can be set to the following values:
  • acks=0 – The producer never waits for an ack from the server. No guarantee can be made that the server has received the message, and the retries configuration will not take effect since the server will never know that the message was lost. This means a low cost for sending your message, but you may pay the price in message durability.
  • acks=1 – The producer gets an ack after the leading replica has received the data. The leader writes the record to its local log but responds without awaiting full acknowledgment from all followers. The message will be lost only if the leader fails immediately after acknowledging the record, but before the followers have replicated it. This means a higher cost for sending your message and high, but not maximum, durability.
  • acks=all – The producer gets an ack after all in-sync replicas have received the data. The leader waits for the full set of in-sync replicas to acknowledge the record. This means it takes longer to send a message, but it gives the strongest message durability.

How should you set the ack value for the producer in Apache Kafka?

For the highest throughput, set the value to 0. For no data loss, set the ack-value to all (or -1). For high, but not maximum, durability and high, but not maximum, throughput, set the ack-value to 1. Ack-value 1 can be seen as an intermediate between the other two.
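For reference, a minimal Java producer sketch with the ack-value set; the broker address and topic name are placeholders:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");                            // "0", "1" or "all" (-1), as discussed above

Producer<String, String> producer = new KafkaProducer<String, String>(props);
producer.send(new ProducerRecord<String, String>("my-topic", "some-key", "some-value"));
producer.close();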

Batch messages in Apache Kafka

Messages can be sent together in groups, called batches. A batch is sent when specified criteria are met: when the number of messages for the batch has reached a certain number, or after a given amount of time. Sending batches of messages is recommended since it increases throughput.
Always keep a good balance between building up batches and the sending rate. A small batch might give you low throughput and lots of overhead; however, a small batch is still better than not using batches at all. A batch that is too large might take a long time to collect, keeping consumers idling. The right size depends on the use case; if you have a real-time application, make sure you don’t have large batches.

Compression of Large messages

The producer can compress records, and the consumer can decompress them. We recommend compressing large messages to reduce the disk footprint as well as the footprint on the wire. Kafka is not meant for sending large files; put large files on shared storage instead of sending them through Kafka. Read more about compression in Apache Kafka here.
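Assuming the standard Java client, batching and compression are controlled through producer properties; adding to the Properties object from the sketch above, with values chosen purely for illustration:

props.put("batch.size", 65536);          // collect up to 64 KB per partition before sending
props.put("linger.ms", 10);              // or send after 10 ms, whichever comes first
props.put("compression.type", "snappy"); // compress batches on the wire and on disk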

Performance optimization for Apache Kafka - Brokers

The Kafka Broker is the central part of Kafka. It receives and stores log messages until the given retention period has exceeded. By refining your broker setup, you can avoid common errors and ensure your configuration meets your expectations.


Topics and Partitions

Kafka topics are divided into a number of partitions, which contains messages in an unchangeable sequence. Each message in a partition is assigned and identified by its unique offset. A topic can have multiple partition logs.

More Partitions Lead to Higher Throughput

The number of partitions caps the number of consumers that can read in parallel: one partition can only be handled by one consumer in a group. Multiple partitions allow multiple consumers to read from a topic in parallel, meaning that you will have a more scalable system. With more partitions, you can handle a larger throughput since all consumers can work in parallel.

Do not set up too many partitions

Partitions are the key to Kafka scalability, but that does not mean that you should have too many of them. We have seen a few customers with way too many partitions, which in turn consumes all resources on the server. Each partition in a topic uses memory and file descriptors on the broker. The load on the CPU also gets higher with more partitions, since Kafka needs to keep track of all of them. More than 50 partitions for a topic is rarely recommended good practice.

Keep a good balance between cores and consumers

Each partition in Kafka is single-threaded, so too many partitions will not reach their full potential if you have a low number of cores on the server. Therefore, try to keep a good balance between cores and consumers. You don’t want consumers idling because there are fewer cores than consumers.

Kafka Broker

A Kafka cluster consists of one or more servers (Kafka brokers), which are running Kafka.

How many brokers should we have?

In Kafka, replication is implemented at the partition level. Kafka automatically fails over to these replicas when a server in the cluster fails, so that messages remain available in the presence of failures. Each partition of a topic can be replicated on one or many nodes, depending on the number of nodes you have in your cluster. This redundant unit of a partition is called a replica. By having replicas, your data can be found in multiple places. The number of replicas for your topic is specified when you create the topic and can be changed later. The minimum number of in-sync replicas specifies how many replicas must be available for the producer to successfully send messages to a partition.
A higher number of minimum number of in-sync replicas gives you higher persistency, but on the other hand, it might reduce availability, since the minimum number of replicas given must be available before a publish. If you have a 3 node cluster and minimum in-sync replicas is set to 3, and one node goes down, the other two are not able to receive any data.
You only care about the minimum number of in-sync replicas when it comes to the availability of your cluster and reliability guarantees. The minimum number of in-sync replicas has nothing to do with the throughput. Setting the minimum number of in-sync replicas to larger than 1 may ensure less or no data loss, but throughput varies depending on the acks configuration.
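For illustration, the Java AdminClient (available in newer client versions) can create a topic with a chosen replication factor and minimum number of in-sync replicas. The topic name, counts and broker address below are placeholders, and exception handling is omitted:

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
AdminClient admin = AdminClient.create(adminProps);

NewTopic topic = new NewTopic("payments", 6, (short) 3);   // 6 partitions, replication factor 3
Map<String, String> configs = new HashMap<String, String>();
configs.put("min.insync.replicas", "2");                   // at least 2 replicas must acknowledge
topic.configs(configs);

admin.createTopics(Collections.singletonList(topic)).all().get();
admin.close();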

Partition load between brokers

The more brokers you have in your cluster, the higher performance you get since the load is spread between all of your nodes. A common error is that load is not distributed equally between brokers. You should always keep an eye on partition distribution and do re-assignments to new brokers if needed, to ensure no broker is overloaded while another is idling.

Do not hardcode partitions

Keys are used to determine the partition within a log to which a message is appended. A common error is using the same key when sending messages, which makes every message end up on the same partition. Make sure that you never hardcode the message key value.

How many partitions should we have?

How many partitions you should have depends on your need. A higher number of partitions is preferable for high throughput in Kafka, although a high number of partitions will put more load on the machines and might affect the latency of messages. Consider your desired result and don't exaggerate.

The configuration of the Apache Kafka Broker

One thing that we have changed a lot for all CloudKarafka instances is the number of file descriptors given to Apache Kafka. All CloudKarafka brokers have a very large number of file descriptors.

The topic is created by default

When sending a message to a non-existing topic, the topic is created by default, since auto.create.topics.enable is set to true by default in Apache Kafka.
This config can be changed so that topics are not created if they do not exist. This configuration can be helpful for minimizing mistakes caused by misspelling or miscommunication between developers. Send us an email if you would like to change the default value of auto.create.topics.enable in your CloudKarafka cluster.

Change default Minimum In-sync Replicas

The minimum in-sync replicas setting is 1 by default in CloudKarafka, meaning that at least 1 in-sync replica must be available for the producer to successfully send messages to a partition. This setting can be changed to a higher number if higher persistency is required. Send us an email if you would like to change the default minimum in-sync replicas in your cluster.

Default Retention Period

A message sent to a Kafka cluster is appended to the end of one of the logs. The message remains in the topic for a configurable period of time, or until a configurable size is reached, or until the specified retention for the topic is exceeded. The message stays in the log even after it has been consumed.

Message Order in Kafka

One partition will guarantee an unchangeable sequence of your logstream. Two or more partitions will break the order since the order is not guaranteed between partitions.
Messages sent within Apache Kafka can still be strictly ordered per key, even though your setup contains more than one partition. You achieve this ordering by setting up a consistent message key that groups related messages, for example a user ID. This guarantees that all messages from a specific user always end up in the same partition.
Please note that if the purpose of using Apache Kafka requires that all messages must be ordered within one topic, then you have to use only one partition.
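A keyed send is enough to get this per-key ordering; a small sketch with a hypothetical topic and user-ID key:

// All records with the same key (here the user ID) hash to the same partition,
// so they are consumed in the order they were produced.
producer.send(new ProducerRecord<String, String>("user-events", "user-42", "logged_in"));
producer.send(new ProducerRecord<String, String>("user-events", "user-42", "viewed_page"));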

Number of Zookeepers

Zookeeper requires a majority of servers to be functioning. If you, for example, have 5 servers in your cluster, you would need 3 servers to be up and running for Zookeeper to be working.
I.e., you can afford to lose one Zookeeper in a 3-node cluster, and you can afford to lose two Zookeepers in a 5-node cluster.

What type of server do I need for Apache Kafka?

What you need when setting up a Kafka cluster is lots of memory. The data sent to the broker is always written to disk, but it also stays in memory for as long as there is space to keep it there. More memory gives you higher throughput, since Kafka consumers first try to read data from memory.
Kafka does not require high CPU, as long as you are not running too many partitions.

Performance optimization for Apache Kafka - Consumers

Kafka Consumers read data from the brokers and can be seen as the executors in the Apache Kafka three-stage rocket. By refining your consumer setup, you can avoid common errors and ensure your configuration meets your expectations. This guide is divided into three parts, and this is part three. The previous blog posts focus on the Producers and the Brokers in the Apache Kafka symbiosis.

Apache Kafka Consumer

Consumers can read log messages from the broker, starting from a specific offset. Consumers are allowed to read from any offset point they choose. This allows consumers to join the cluster at any point in time.
A consumer can join a group, called a consumer group. A consumer group includes the set of consumer processes that are subscribing to a specific topic. Consumers in the group then divide the topic partitions fairly amongst themselves, so that each partition is consumed by exactly one consumer from the group; i.e., each consumer in the group is assigned a set of partitions to consume from. Kafka guarantees that a message is only read by a single consumer in the group.
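A minimal Java consumer sketch that joins a consumer group; the group name, topic and broker address are placeholders:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // placeholder
props.put("group.id", "billing-group");            // all consumers with this group.id share the work
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("user-events"));
while (true) {
    // Each consumer in the group only sees records from the partitions assigned to it.
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.partition() + ":" + record.offset() + " => " + record.value());
    }
}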

Make sure all consumers in a consumer group have a good connection

Partitions are redistributed between consumers every time a consumer connects to or drops out of the consumer group. This means that consumers in the group are not able to consume messages during this time. If one consumer in a group has a bad connection, the whole group is affected and will be unavailable during every reconnect. A redistribution of partitions takes around 2-3 seconds or longer. To make sure your setup is running smoothly, we strongly recommend that you secure the connection for your consumers.

Number of consumers

Ideally, the number of partitions should be equal to the number of consumers.
  • Number of consumers > number of partitions: some consumers will be idling, i.e., you will be wasting client resources.
  • Number of partitions > number of consumers: some consumers will read from multiple partitions.
As mentioned before, the availability will be affected if one consumer has a bad connection. The more consumers you have, the larger risk there is, that one might drop and halt all other consumers.

Kafka and Kafka Performance Tuning

What is Kafka?

Apache Kafka® is a distributed streaming platform.

A streaming platform has three key capabilities:

Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
Store streams of records in a fault-tolerant durable way.
Process streams of records as they occur.
Kafka is generally used for two broad classes of applications:

Building real-time streaming data pipelines that reliably get data between systems or applications
Building real-time streaming applications that transform or react to the streams of data

List the various components in Kafka.

The four major components of Kafka are:
  • Topic – a stream of messages belonging to the same type
  • Producer – that can publish messages to a topic
  • Brokers – a set of servers where the published messages are stored
  • Consumer – that subscribes to various topics and pulls data from the brokers.

Explain the role of the offset.

Messages contained in the partitions are assigned a unique ID number that is called the offset. The role of the offset is to uniquely identify every message within the partition.

What is a Consumer Group?

Consumer Groups are a concept exclusive to Kafka. Every Kafka consumer group consists of one or more consumers that jointly consume a set of subscribed topics.

What is the role of the ZooKeeper in Kafka?

Kafka uses Zookeeper to store offsets of messages consumed for a specific topic and partition by a specific Consumer Group.

Is it possible to use Kafka without ZooKeeper?

No, it is not possible to bypass Zookeeper and connect directly to the Kafka server. If, for some reason, ZooKeeper is down, you cannot service any client request.

Explain the concept of Leader and Follower.

Every partition in Kafka has one server which plays the role of a Leader, and none or more servers that act as Followers. The Leader performs the task of all read and write requests for the partition, while the role of the Followers is to passively replicate the leader. In the event of the Leader failing, one of the Followers will take on the role of the Leader. This ensures load balancing of the server.

How do you define a Partitioning Key?

Within the Producer, the role of a Partitioning Key is to indicate the destination partition of the message. By default, a hashing-based Partitioner is used to determine the partition ID given the key. Alternatively, users can also plug in customized Partitioners.
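For illustration, a hypothetical custom Partitioner written against the Java client’s Partitioner interface; the routing rule is made up:

public class RegionPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Hypothetical rule: keys starting with "EU-" always go to partition 0,
        // everything else is spread by a hash of the key bytes.
        if (key != null && key.toString().startsWith("EU-")) {
            return 0;
        }
        return (Arrays.hashCode(keyBytes) & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void close() { }
}

It would then be registered on the producer with props.put("partitioner.class", RegionPartitioner.class.getName()).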
What is the purpose of the retention period in a Kafka cluster?

The retention period retains all published records within the Kafka cluster, regardless of whether they have been consumed. Records are discarded once the configured retention period passes, which frees up space.

What is MirrorMaker?

The MirrorMaker tool is used to mirror one Kafka cluster to another.

Kafka Performance Tuning

Performance tuning involves two important metrics:
  • Latency measures how long it takes to process one event.
  • Throughput measures how many events arrive within a specific amount of time.
Most systems are optimized for either latency or throughput. Kafka is balanced for both. A well-tuned Kafka system has just enough brokers to handle topic throughput, given the latency required to process information as it is received.
Tuning your producers, brokers, and consumers to send, process, and receive the largest possible batches within a manageable amount of time results in the best balance of latency and throughput for your Kafka cluster.
The following sections introduce the concepts you'll need to be able to balance your Kafka workload and then provide practical tuning configuration to address specific circumstances.
For a quick video introduction to tuning Kafka, see Tuning Your Apache Kafka Cluster.

Tuning Brokers

Topics are divided into partitions. Each partition has a leader. Topics that are properly configured for reliability will consist of a leader partition and 2 or more follower partitions. When the leaders are not balanced properly, one might be overworked, compared to others.
Depending on your system and how critical your data is, you want to be sure that you have sufficient replication sets to preserve your data. For each topic, Cloudera recommends starting with one partition per physical storage disk and one consumer per partition.

Tuning Producers

Kafka uses an asynchronous publish/subscribe model. When your producer calls send(), the result returned is a future. The future provides methods to let you check the status of the information in process. When the batch is ready, the producer sends it to the broker. The Kafka broker waits for an event, receives the result, and then responds that the transaction is complete.
If you do not use a future, you could get just one record, wait for the result, and then send a response. Latency is very low, but so is throughput. If each transaction takes 5 ms, throughput is 200 events per second — slower than the expected 100,000 events per second.
When you use Producer.send(), you fill up buffers on the producer. When a buffer is full, the producer sends the buffer to the Kafka broker and begins to refill the buffer.
Two parameters are particularly important for latency and throughput: batch size and linger time.
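In the Java client this looks roughly as follows; ‘producer’ and ‘record’ are assumed to already exist, and exception handling is omitted:

// Synchronous: wait for each record to be acknowledged (low latency per record, low throughput).
RecordMetadata meta = producer.send(record).get();

// Asynchronous: hand the record to the producer's buffer and move on;
// the callback fires once the batch the record ended up in has been sent and acknowledged.
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        }
    }
});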

Batch Size

batch.size measures batch size in total bytes instead of the number of messages. It controls how many bytes of data to collect before sending messages to the Kafka broker. Set this as high as possible, without exceeding available memory. The default value is 16384.
If you increase the size of your buffer, it might never get full. The Producer sends the information eventually, based on other triggers, such as linger time in milliseconds. Although you can impair memory usage by setting the buffer batch size too high, this does not impact latency.
If your producer is sending all the time, you are probably getting the best throughput possible. If the producer is often idle, you might not be writing enough data to warrant the current allocation of resources.

Linger Time

linger.ms sets the maximum time to buffer data in asynchronous mode. For example, the setting of 100 means that it batches 100ms of messages to send at once. This improves throughput, but the buffering adds message delivery latency.
By default, the producer does not wait. It sends the buffer any time data is available.
Instead of sending immediately, you can set linger.ms to 5 and send more messages in one batch. This would reduce the number of requests sent, but would add up to 5 milliseconds of latency to records sent, even if the load on the system does not warrant the delay.
The farther away the broker is from the producer, the more overhead required to send messages. Increase linger.ms for higher latency and higher throughput in your producer.

Tuning Consumers

Consumers can create throughput issues on the other side of the pipeline. The maximum number of consumers in a consumer group for a topic is equal to the number of partitions. You need enough partitions to handle all the consumers needed to keep up with the producers.
Consumers in the same consumer group split the partitions among them. Adding more consumers to a group can enhance performance (up to the number of partitions). Adding more consumer groups does not affect performance.