Sunday 7 February 2016

Kafka Tips


  • Distributed publish-subscribe messaging system
  • The messages are maintained in ‘topics’
  • Messages are simple ‘byte arrays’
  • ‘Producers’ write data to ‘topics’
  • ‘Consumers’ pull data from ‘topics’
  • ‘Topics’ are partitioned (partitions) and distributed across nodes
  • Each ‘partition’ will have a single ‘leader’ and zero or more servers that act as ‘followers’
  • ‘Producers’ can attach a ‘key’ to each message so that messages with the same key go into the same partition; alternatively, they can simply write in ‘round robin’ fashion for load balancing
  • ‘Consumer Group’ – group of consumers
    • They subscribe to a topic
    • Each consumer in the group will consume from unique subset of partitions
    • Each consumer will receive all the messages for a given key (since messages with the same key go to the same partition)
  • Each topic partition is treated as a ‘log’ (an ordered sequence of messages)
  • Messages are tracked by offset
  • Consumers are responsible for tracking the offset
  • Messages will be deleted after a certain retention period
  • The retention period is normally set for all topics together; however, it can be overridden for a specific topic
  • Kafka maintains order within a partition, but not across partitions
  • Kafka does not track acknowledgements, it also does not track messages per consumer
  • It can handle batch consumers – processes that wake up periodically and consume all new messages
  • Kafka uses ‘Zookeeper’ for cluster membership
  • ‘Broker’ is a daemon running in each node of Kafka cluster
  • ‘Broker ID’ is a unique and permanent name for each node in the cluster – it must be non-negative
  • ‘isr’ stands for ‘in-sync replicas’
  • There is a ‘max lag time’ parameter: if a ‘follower’ does not send a fetch request within this period, the leader will remove it from the ‘isr’ list. (A ‘follower’ will also be treated as dead if it falls more than a set number of messages behind the leader)
  • Communication happens over the TCP protocol
  • The log for a topic partition will be stored as a directory of segment files. (log.segment.bytes – controls the size of each segment. The ‘log’ will be rolled forcefully by Kafka in a certain time period even if it does not reach the size. The default period is 1 week)
  • The default replication factor for automatically created topics is 1.
  • Kafka is written in Scala
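The key-to-partition mapping and consumer-group assignment described above can be sketched as a small simulation. This is illustrative only – the function names and the hash are made up for the example, not Kafka’s actual client API (which uses a murmur2 hash in the default partitioner):

```python
# Minimal sketch of key-based partitioning and consumer-group
# partition assignment. Names and hash are illustrative, not
# Kafka's real client API.

def partition_for(key: bytes, num_partitions: int) -> int:
    """Messages with the same key always map to the same partition
    (a simple deterministic hash stands in for Kafka's murmur2)."""
    return sum(key) % num_partitions

def assign_partitions(partitions, consumers):
    """Each consumer in a group gets a unique subset of partitions
    (a simplified round-robin style assignment)."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# Same key -> same partition, so one consumer in the group sees
# all messages for that key.
assert partition_for(b"user-42", 6) == partition_for(b"user-42", 6)

print(assign_partitions(list(range(6)), ["c0", "c1", "c2"]))
```

Because each partition is owned by exactly one consumer in the group, the per-key ordering guarantee carries through to that consumer.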

Some important properties: (kafka.utils.VerifiableProperties)

  • log.retention.check.interval.ms - The frequency in milliseconds that the log cleaner checks whether any log segment is eligible for deletion to meet the retention policies
  • log.retention.hours – The number of hours to keep the log segment before deleting it
  • zookeeper.connect – The ZooKeeper connection details, <hostname:port> (normally port 2181). To allow connecting through other ZooKeeper nodes when a host is down, the details can be given as a comma-separated list
  • num.partitions – The default number of partitions per topic
  • group.id – Identifies a group of consumers. All consumer processes having the same id belong to the same group
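Put together, these properties might appear in configuration files like the fragment below (the values shown are illustrative, not recommendations):

```
# Broker configuration (server.properties) - illustrative values
log.retention.check.interval.ms=300000
log.retention.hours=168
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
num.partitions=1

# Consumer configuration - illustrative value
group.id=my-consumer-group
```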

Kafka Mirroring

  • Feature to maintain replica of an existing Kafka cluster
  • This is done via ‘MirrorMaker’ tool
  • The tool uses Kafka Consumer to consume data from the source Kafka cluster and Kafka Producer to publish it to the target Kafka cluster
  • ‘--whitelist’ specifies the list of topics to be mirrored from the source cluster, and ‘--blacklist’ specifies the list of topics that should not be mirrored
  • To check whether the mirror is keeping up, the ‘consumer offset checker’ tool can be used to measure the mirror consumer’s lag
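The lag measurement the offset checker performs boils down to comparing, per partition, the source log’s end offset with the offset the mirror’s consumer has committed. A small sketch (the function name and offsets are made up; real values come from the cluster):

```python
# Sketch of how mirroring lag is measured: the gap between the source
# partition's log-end offset and the mirror consumer's committed offset.
# Illustrative only, not an actual Kafka tool.

def mirror_lag(log_end_offsets, committed_offsets):
    """Return the per-partition lag of the mirror's consumer."""
    return {p: log_end_offsets[p] - committed_offsets.get(p, 0)
            for p in log_end_offsets}

# Source cluster offsets vs. what MirrorMaker's consumer has committed:
lag = mirror_lag({0: 1000, 1: 500}, {0: 990, 1: 500})
print(lag)  # {0: 10, 1: 0} -> partition 0 is 10 messages behind
```

A mirror that is keeping up shows lag near zero on every partition; steadily growing lag means the mirror cannot consume as fast as the source is producing.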


Basic Kafka Operations

Adding, Modifying and Removing Topics

  • Topics are created either manually or automatically
  • Topics can be added manually using:
    • Example: bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
  • Topics can be modified using:
    • Example: bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --partitions 40
  • Kafka does not currently support reducing the number of partitions for a topic or changing the replication factor
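One reason partition changes are delicate: with key-based partitioning, changing the partition count changes which partition a key maps to, so per-key locality for existing data no longer lines up. A small sketch (the simple hash is illustrative, not Kafka’s murmur2):

```python
# Sketch: growing the partition count changes the key -> partition
# mapping, which is why adding partitions disturbs per-key locality
# on keyed topics. Illustrative hash, not Kafka's.

def partition_for(key: bytes, num_partitions: int) -> int:
    return sum(key) % num_partitions

old = partition_for(b"user-42", 20)  # mapping with 20 partitions
new = partition_for(b"user-42", 40)  # mapping after growing to 40
print(old, new)  # the same key can land on a different partition
```

New messages for the key go to the new partition, while old messages stay where they were written, so consumers relying on key-based ordering must account for the change.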

Graceful shutdown

  • Kafka will automatically detect any broker shutdown or failure and elect new leaders for the partitions on that machine.
  • Advantages of graceful shutdown:
    • Sync all the logs avoiding any need for log recovery
    • Migrate all partitions the server leads to other replicas prior to shutdown
  • controlled.shutdown.enable=true should be set for graceful shutdown

Balancing leadership

  • Whenever a broker stops or crashes, leadership for that broker’s partitions transfers to other replicas. This means that when the broker restarts, it will only be a follower for its partitions rather than the leader, which can lead to an imbalance in leadership. To avoid this, Kafka has the concept of a ‘preferred replica’.
  • Suppose we have a partition replicated on brokers 3, 5 and 6, and 3 is the preferred replica; then we can have the Kafka cluster restore its leadership by running the command:
    • bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
  • Or we can set the property in configuration:
    • auto.leader.rebalance.enable=true

Expanding cluster

  • Assign each new broker a unique broker id
  • Start up Kafka on the new servers
  • New servers will not automatically be assigned any data partitions, so data needs to be migrated to them
  • ‘Partition Reassignment’ tool can be used to move partitions across brokers
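The reassignment tool takes a JSON plan describing the target replica list for each partition. Such a plan could be generated with a small script like the one below (the topic name and broker ids are made-up examples):

```python
import json

# Build a reassignment plan in the JSON shape consumed by the
# partition reassignment tool. The topic name and broker ids here
# are made-up examples.

def reassignment_plan(topic, partition_to_replicas):
    """partition_to_replicas maps a partition number to the list of
    broker ids that should hold its replicas after the move."""
    return {
        "version": 1,
        "partitions": [
            {"topic": topic, "partition": p, "replicas": replicas}
            for p, replicas in sorted(partition_to_replicas.items())
        ],
    }

# Move partitions 0 and 1 of 'my_topic_name' onto new brokers 4 and 5:
plan = reassignment_plan("my_topic_name", {0: [4, 5], 1: [5, 4]})
print(json.dumps(plan))
```

The first broker in each replica list becomes the partition’s preferred replica, so ordering the lists differently per partition spreads leadership across the new brokers.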