Kafka Introduction
Kafka is an open-source distributed event streaming platform.
Concepts
Servers Kafka runs as a cluster of one or more servers that can span multiple data centers or cloud regions.
Brokers One type of server; brokers form the storage layer and store topic partitions.
Clients Allow one to read, write, and process streams of events.
Event Records the fact that something happened.
Producer Client applications that publish (write) events to Kafka.
Consumer Client applications that subscribe to (read and process) these events.
Topics Events are organized and durably stored in topics.
Partition Topics are partitioned, i.e. spread over a number of buckets located on different Kafka brokers.
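To make partitioning concrete, here is a minimal sketch of the idea. Kafka's real default partitioner uses murmur2 on the serialized key, so treat this as an illustration, not the actual algorithm.
public class PartitionSketch {
    // Hash the event key to pick a bucket; masking the sign bit keeps the
    // index non-negative. Same key, same partition, so per-key order holds.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("user-42", 6)); // always the same bucket
    }
}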

Network
Kafka uses a binary protocol over TCP. The protocol defines all APIs as request-response message pairs. All messages are size-delimited and are made up of the following primitive types:
Boolean
int8...int64
varint
string
array
bytes
records
- Common Request and Response Structure
 
RequestOrResponse => Size (RequestMessage | ResponseMessage)
Size => int32
- Request Message Headers
 
Request Header v0 => request_api_key request_api_version correlation_id 
request_api_key => INT16
request_api_version => INT16
correlation_id => INT32
Request Header v1 => request_api_key request_api_version correlation_id client_id 
request_api_key => INT16
request_api_version => INT16
correlation_id => INT32
client_id => NULLABLE_STRING
Request Header v2 => request_api_key request_api_version correlation_id client_id TAG_BUFFER 
request_api_key => INT16
request_api_version => INT16
correlation_id => INT32
client_id => NULLABLE_STRING
request_api_key identifies which API the request invokes (e.g., Produce, Fetch, Metadata)
- Response Message Headers
 
Response Header v0 => correlation_id 
correlation_id => INT32
Response Header v1 => correlation_id TAG_BUFFER 
correlation_id => INT32
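To make the framing concrete, here is a hedged sketch that hand-assembles the Size plus Request Header v1 layout above. It is illustrative only, not the client's real serializer.
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RequestFraming {
    // Frame a Request Header v1; a real request would append the body and
    // include it in Size, which counts every byte after the Size field itself.
    static ByteBuffer frameHeaderV1(short apiKey, short apiVersion, int correlationId, String clientId) {
        byte[] id = clientId.getBytes(StandardCharsets.UTF_8);
        int size = 2 + 2 + 4 + 2 + id.length;  // INT16 + INT16 + INT32 + NULLABLE_STRING
        ByteBuffer buf = ByteBuffer.allocate(4 + size);
        buf.putInt(size);                      // Size => INT32
        buf.putShort(apiKey);                  // request_api_key => INT16
        buf.putShort(apiVersion);              // request_api_version => INT16
        buf.putInt(correlationId);             // correlation_id => INT32
        buf.putShort((short) id.length);       // NULLABLE_STRING: INT16 length, -1 for null
        buf.put(id);                           // UTF-8 payload
        buf.flip();
        return buf;
    }
}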
Other protocol formats can be found in the official documentation.
Producer
KafkaProducer A Kafka client that publishes records to the Kafka cluster.
The producer is thread-safe, and sharing a single producer instance across threads will generally be faster than having multiple instances.
The send() method is asynchronous. When called, it adds the record to a buffer of pending record sends and immediately returns.
RecordAccumulator acts as a queue that accumulates records into MemoryRecords instances to be sent to the server.
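A typical asynchronous use of this API looks like the following; the broker address and topic name are placeholders.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // send() only appends to the RecordAccumulator and returns a Future;
            // the callback fires once the background Sender thread gets a response.
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"),
                (metadata, exception) -> {
                    if (exception != null) exception.printStackTrace();
                    else System.out.printf("%s-%d@%d%n", metadata.topic(), metadata.partition(), metadata.offset());
                });
            producer.flush(); // block until all buffered records are sent
        }
    }
}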
interface Producer<K, V> extends Closeable {
  Future<RecordMetadata> send(ProducerRecord<K, V> record)
  void flush()
}
class KafkaProducer<K, V> implements Producer<K, V> {
    ProducerConfig producerConfig
    ProducerMetadata metadata
    RecordAccumulator accumulator
    Sender sender
    Thread ioThread
    Serializer<K> keySerializer
    Serializer<V> valueSerializer
    ProducerInterceptors<K, V> interceptors
}
class RecordAccumulator {
}
class Sender implements Runnable {
  KafkaClient client
  RecordAccumulator accumulator
  ProducerMetadata metadata
  TransactionManager transactionManager
}
class KafkaThread extends Thread 
class BufferPool
interface KafkaClient {
  boolean isReady(Node node, long now)
  boolean ready(Node node, long now)
  void send(ClientRequest request, long now)
  List<ClientResponse> poll(long timeout, long now)
  void wakeup()
  void disconnect(String nodeId)
  void close(String nodeId)
}
class NetworkClient implements KafkaClient {
  Selectable selector
  MetadataUpdater metadataUpdater
  ClusterConnectionStates connectionStates
  InFlightRequests inFlightRequests
}
interface Selectable {
  void send(NetworkSend send)
  void wakeup()
  void poll(long timeout)
}
class Selector implements Selectable {
  Selector nioSelector
  Map<String, KafkaChannel> channels
}
class ProducerMetadata {
  Map<String, Long> topics
  Set<String> newTopics
}
KafkaProducer o-- RecordAccumulator
KafkaProducer o-right- KafkaThread: ioThread
KafkaThread o-- Sender
RecordAccumulator o-- BufferPool
Sender o-- KafkaClient
NetworkClient o-- Selectable
KafkaProducer *-- ProducerMetadata
Metadata
Metadata A class encapsulating some of the logic around metadata.
This class is shared by the client thread (for partitioning) and the background sender thread.
Metadata is maintained for only a subset of topics, which can be added to over time.
MetadataSnapshot An internal immutable snapshot of nodes, topics, and partitions in the kafka cluster.
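The thread-safety trick is that readers see an immutable snapshot while updates swap in a new one. Below is a minimal sketch of that pattern with a hypothetical class, not the real implementation.
import java.util.Map;

// Hedged sketch of the Metadata/MetadataSnapshot idea: the client thread reads
// a lock-free immutable snapshot while the sender thread swaps in fresh ones.
final class MetadataHolder {
    record Snapshot(int updateVersion, Map<String, Integer> partitionsPerTopic) {}

    private volatile Snapshot current = new Snapshot(0, Map.of());

    Snapshot snapshot() {              // used for partitioning; never blocks
        return current;
    }

    synchronized void update(Map<String, Integer> partitionsPerTopic) {
        current = new Snapshot(current.updateVersion() + 1, Map.copyOf(partitionsPerTopic));
    }
}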
class Metadata implements Closeable {
  MetadataSnapshot metadataSnapshot
  List<InetSocketAddress> bootstrapAddresses
  int updateVersion
  int requestVersion
  long lastRefreshMs
}
class MetadataSnapshot {
  - unauthorizedTopics: Set<String>
  - clusterInstance: Cluster
  - metadataByPartition: Map<TopicPartition, PartitionMetadata>
  - invalidTopics: Set<String>
  - topicNames: Map<Uuid, String>
  - nodes: Map<Integer, Node>
  - clusterId: String
  - topicIds: Map<String, Uuid>
  - controller: Node
  - internalTopics: Set<String>
}
class Cluster {
  List<Node> nodes
  Node controller
  Map<TopicPartition, PartitionInfo> partitionsByTopicPartition
  Map<String, List<PartitionInfo>> partitionsByTopic
  Map<Integer, List<PartitionInfo>> partitionsByNode
  Map<Integer, Node> nodesById
  ClusterResource clusterResource
  Map<String, Uuid> topicIds
  Map<Uuid, String> topicNames
}
class ProducerMetadata {
  Map<String, Long> topics
  Set<String> newTopics
}
Metadata o-- MetadataSnapshot
MetadataSnapshot o-- Cluster
Metadata <|-right- ProducerMetadata
Producer Configs
batch.size The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This setting gives the upper bound of the batch size to be sent.
linger.ms The producer groups together any records that arrive between request transmissions into a single batched request. This setting gives the upper bound on the delay for batching.
retry.backoff.ms The amount of time to wait before attempting to retry a failed request to a given topic partition.
retry.backoff.max.ms The maximum amount of time in milliseconds to wait when retrying a request to the broker that has repeatedly failed.
reconnect.backoff.ms The base amount of time to wait before attempting to reconnect to a given host.
reconnect.backoff.max.ms The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect.
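For instance, these knobs might be set on the producer from the earlier example like so; the values are illustrative, not recommendations.
import java.util.Properties;

public class BatchingConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("batch.size", "32768");          // up to 32 KB per partition batch
        props.put("linger.ms", "10");              // wait up to 10 ms to fill a batch
        props.put("retry.backoff.ms", "100");      // initial retry delay
        props.put("retry.backoff.max.ms", "1000"); // cap for exponentially backed-off retries
        props.put("reconnect.backoff.ms", "50");
        props.put("reconnect.backoff.max.ms", "1000");
        // pass props to new KafkaProducer<>(props) along with serializers, as above
    }
}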
Server
class KafkaRaftServer {
  SharedServer sharedServer
  Option[BrokerServer] broker
  Option[ControllerServer] controller
  startup()
  shutdown()
}
class BrokerServer {
  SharedServer sharedServer
  KafkaRaftManager[ApiMessageAndVersion] raftManager
  BrokerLifecycleManager lifecycleManager
  AssignmentsManager assignmentsManager
  KafkaApis dataPlaneRequestProcessor
  SocketServer socketServer
  KafkaRequestHandlerPool dataPlaneRequestHandlerPool
  Option[RemoteLogManager] remoteLogManagerOpt
  ReplicaManager _replicaManager
  GroupCoordinator groupCoordinator
  TransactionCoordinator transactionCoordinator
  NodeToControllerChannelManager clientToControllerChannelManager
  AutoTopicCreationManager autoTopicCreationManager
  KafkaScheduler kafkaScheduler
  KRaftMetadataCache metadataCache
}
class SharedServer {
  KafkaConfig brokerConfig
  KafkaConfig controllerConfig
  MetadataLoader loader
  SnapshotEmitter snapshotEmitter
  SnapshotGenerator snapshotGenerator
}
class ControllerServer {
  SharedServer sharedServer
  KafkaConfigSchema configSchema
  BootstrapMetadata bootstrapMetadata
}
class SocketServer {
  dataPlaneAcceptors
  dataPlaneRequestChannel
  memoryPool
  controlPlaneAcceptorOpt
  connectionQuotas
} 
KafkaRaftServer *-- BrokerServer
KafkaRaftServer *-- SharedServer
KafkaRaftServer *-- ControllerServer
BrokerServer *-- SocketServer
SharedServer
SharedServer manages the components which are shared between the BrokerServer and ControllerServer.
The shared components include the Raft manager, snapshot generator, and metadata loader.
A KRaft server running in combined mode as both a broker and a controller will still contain only a single SharedServer instance.
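For example, a node runs in combined mode when process.roles lists both roles. A minimal sketch of the relevant server.properties entries follows; node id, ports, and hostnames are placeholders.
# Combined mode: this node acts as both broker and controller,
# yet the process still holds a single SharedServer instance.
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
controller.listener.names=CONTROLLER
advertised.listeners=PLAINTEXT://localhost:9092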
BrokerServer
A broker is responsible for
- storing message data
- serving producer and consumer requests
- managing partitions and replicas
SocketServer
Handles new connections, requests, and responses to and from the broker.
- data-plane
  - handles requests from clients and other brokers in the cluster
  - thread model (see the NIO sketch below)
    - 1 Acceptor thread per listener; multiple listeners can be configured in KafkaConfig
    - each Acceptor has N processor threads that each have their own selector and read requests from sockets, plus M handler threads that handle requests and produce responses back to the processor threads for writing
- control-plane
  - handles requests from the controller; this is optional and is enabled by specifying control.plane.listener.name
  - thread model
    - 1 Acceptor thread handles new connections
    - the Acceptor has 1 processor thread that has its own selector and reads requests from the socket
    - 1 handler thread that handles requests and produces responses back to the processor thread for writing
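A minimal sketch of this acceptor/processor split using plain Java NIO; the class names are hypothetical, not Kafka's actual SocketServer internals.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

public class AcceptorProcessorSketch {

    static final class Processor implements Runnable {
        private final Selector selector;
        private final ConcurrentLinkedQueue<SocketChannel> newConnections = new ConcurrentLinkedQueue<>();

        Processor() throws IOException { this.selector = Selector.open(); }

        // Called from the acceptor thread; wakeup() mirrors Selectable.wakeup().
        void accept(SocketChannel channel) {
            newConnections.add(channel);
            selector.wakeup();
        }

        @Override public void run() {
            try {
                while (true) {
                    // Register connections queued by the acceptor thread.
                    SocketChannel channel;
                    while ((channel = newConnections.poll()) != null) {
                        channel.configureBlocking(false);
                        channel.register(selector, SelectionKey.OP_READ);
                    }
                    selector.select(300);
                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    while (keys.hasNext()) {
                        SelectionKey key = keys.next();
                        keys.remove();
                        if (key.isReadable()) {
                            // A real processor would parse size-delimited requests here
                            // and enqueue them on a RequestChannel for the handler pool.
                            ByteBuffer buf = ByteBuffer.allocate(4096);
                            if (((SocketChannel) key.channel()).read(buf) < 0) key.cancel();
                        }
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int numProcessors = 3;
        Processor[] processors = new Processor[numProcessors];
        for (int i = 0; i < numProcessors; i++) {
            processors[i] = new Processor();
            new Thread(processors[i], "processor-" + i).start();
        }
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(9092)); // placeholder port
        int next = 0;
        while (true) { // the acceptor loop: blocking accept, round-robin dispatch
            SocketChannel channel = server.accept();
            processors[next].accept(channel);
            next = (next + 1) % numProcessors;
        }
    }
}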
 
abstract class Acceptor extends Runnable {
  socketServer
  nioSelector
  serverChannel
  thread
  processors
  run()
}
class SocketServer {
  dataPlaneAcceptors
  controlPlaneAcceptorOpt
}
class DataPlaneAcceptor extends Acceptor {
}
class ControlPlaneAcceptor extends Acceptor
class Processor {
  thread
  selector
  requestChannel
  newConnections
  responseQueue
  accept(socketChannel)
}
class Selector {
  nioSelector
  completedReceives
  completedSends
  channels
}
class KafkaChannel {
  NetworkReceive receive
  NetworkSend send
  TransportLayer transportLayer
}
class RequestChannel {
  requestQueue
  processors
  callbackQueue
}
class BrokerServer {
  dataPlaneRequestHandlerPool
  groupCoordinator
  socketServer
  KafkaApis dataPlaneRequestProcessor
}
class KafkaRequestHandlerPool {
  requestChannel
}
class KafkaRequestHandler extends Runnable {
  requestChannel
  ApiRequestHandler apis
  void run()
}
class KafkaApis extends ApiRequestHandler {
}
SocketServer o-- DataPlaneAcceptor
SocketServer *-- ControlPlaneAcceptor
Acceptor *-- Processor
Processor *-- Selector
Selector *-- KafkaChannel
SocketServer *-- RequestChannel
BrokerServer *-- SocketServer
BrokerServer *-- KafkaRequestHandlerPool
KafkaRequestHandlerPool o-- KafkaRequestHandler
KafkaRequestHandler *-- RequestChannel
Processor *-- RequestChannel
KafkaRequestHandler *-- ApiRequestHandler
ControllerServer
Controller server is responsible for:
- managing the overall state of the Kafka cluster
- electing partition leaders
- handling administrative operations
Security
SASL
Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in Internet protocols. It decouples authentication mechanisms from application protocols.
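As an illustration, a Java client might enable SASL/PLAIN with properties like these; the credentials are placeholders.
import java.util.Properties;

public class SaslClientConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Transport plus SASL mechanism; SASL_SSL also encrypts the connection.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // JAAS config naming the login module and placeholder credentials.
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"alice\" password=\"alice-secret\";");
        // merge with the producer/consumer settings shown earlier
    }
}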