Internals
Physical Storage (p128)
the basic storage unit is a partition replica; a replica cannot be split across multiple brokers, or across multiple disks on the same broker
the size of a partition is limited by the space available on a single mount point (a single disk if JBOD is used, multiple disks if RAID)
log.dirs - this config defines the directories Kafka uses to store data
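A minimal server.properties sketch of this setting (the paths are hypothetical); new partitions are spread across the listed directories:

  log.dirs=/data/kafka-a,/data/kafka-b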
Partition Allocation (p128)
spread replicas evenly among brokers by assigning partition leaders to brokers in round-robin style
for each partition, spread its replicas across different brokers by placing each replica at an increasing broker index starting from where the leader is
assign replicas of each partition to different racks by using a rack-alternating broker list
e.g. if rack1 has brokers 0, 1, 2 and rack2 has brokers 3, 4, 5, then the broker order used for assignment is 0, 3, 1, 4, 2, 5 (see the sketch after this list)
on each broker, choose the directory with the fewest partitions to store a new partition
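A minimal Java sketch (not Kafka's actual implementation) of the rack-alternating, round-robin assignment described above, using the broker/rack layout from the example:

  import java.util.*;

  public class ReplicaAssignmentSketch {
      // brokersByRack: rack name -> broker ids; returns partition -> ordered replica broker ids (first is the leader)
      static Map<Integer, List<Integer>> assign(Map<String, List<Integer>> brokersByRack,
                                                int numPartitions, int replicationFactor) {
          // build a rack-alternating broker list, e.g. rack1={0,1,2}, rack2={3,4,5} -> 0,3,1,4,2,5
          List<Iterator<Integer>> racks = new ArrayList<>();
          for (List<Integer> ids : brokersByRack.values()) racks.add(ids.iterator());
          List<Integer> brokerList = new ArrayList<>();
          boolean added = true;
          while (added) {
              added = false;
              for (Iterator<Integer> it : racks) {
                  if (it.hasNext()) { brokerList.add(it.next()); added = true; }
              }
          }
          Map<Integer, List<Integer>> assignment = new LinkedHashMap<>();
          for (int p = 0; p < numPartitions; p++) {
              List<Integer> replicas = new ArrayList<>();
              // the leader moves round-robin over the rack-alternating list;
              // each follower is placed at an increasing index from the leader's position
              for (int r = 0; r < replicationFactor; r++) {
                  replicas.add(brokerList.get((p + r) % brokerList.size()));
              }
              assignment.put(p, replicas);
          }
          return assignment;
      }

      public static void main(String[] args) {
          Map<String, List<Integer>> racks = new LinkedHashMap<>();
          racks.put("rack1", List.of(0, 1, 2));
          racks.put("rack2", List.of(3, 4, 5));
          System.out.println(assign(racks, 6, 3));   // e.g. partition 0 -> [0, 3, 1]
      }
  }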
File Management (p130)
Segment
a partition consists of a series of segments, each stored in a single file
each segment holds by default 1 GB or a week of data, whichever comes first; when the limit is reached, the segment file is closed and a new one is created
the active segment is NEVER deleted or compacted
:!: Kafka keeps an open file handle to every segment of every partition - even inactive segments - leading to a high number of open file handles, so the OS needs to be tuned accordingly
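Segment size and age can also be set per topic; a minimal sketch using the Java AdminClient (the topic name is hypothetical, and the values shown match the usual defaults):

  import org.apache.kafka.clients.admin.AdminClient;
  import org.apache.kafka.clients.admin.NewTopic;
  import org.apache.kafka.common.config.TopicConfig;
  import java.util.List;
  import java.util.Map;
  import java.util.Properties;

  public class CreateTopicWithSegmentConfig {
      public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          try (AdminClient admin = AdminClient.create(props)) {
              NewTopic topic = new NewTopic("events", 6, (short) 3)            // hypothetical topic
                      .configs(Map.of(
                              TopicConfig.SEGMENT_BYTES_CONFIG, "1073741824",  // close the segment at 1 GB...
                              TopicConfig.SEGMENT_MS_CONFIG, "604800000"));    // ...or after 7 days
              admin.createTopics(List.of(topic)).all().get();
          }
      }
  }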
File Format
Kafka uses the same message format on disk and over the wire, which is what makes zero-copy possible
the format consists of key, value, offset, message size, checksum, a magic byte for the format version, compression codec, and timestamp
:star: compressing on the producer side is recommended; it is better for both the network and the broker disks
DumpLogSegment
a tool for inspecting partition segments, showing their contents and all metadata
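The tool ships with Kafka and is run through the kafka-run-class script; a hedged example invocation (the segment file path is hypothetical):

  bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /data/kafka-a/events-0/00000000000000000000.log --print-data-log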
Index (p132)
one index per partition, mapping offsets to segment files and positions within the file
the index is also broken into segments, and is automatically regenerated when missing or corrupted
:question: implementation ?
On-disk File Structure
:question: How does Kafka store topics in Zookeeper?
:question: How does Kafka store topic and its partitions on disk?
Topic per dir under /var/kafka/data/ ?
Compaction (p132)
retention policy
delete - delete events older than the retention time
compact - store only the most recent value for each key in the topic
:!: messages MUST have non-null keys
each broker starts a compaction manager thread, plus multiple compaction threads
compaction threads choose the partition with the highest ratio of dirty messages
:question: Implementation
Deleting Event
to delete a key completely, produce a message with that key and a null value (a tombstone); see the sketch below
:!: give consumers enough time to see the tombstone message
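A minimal sketch of producing a tombstone, assuming a compacted topic (cleanup.policy=compact) named "user-profiles" with String keys; the names and serializers are illustrative:

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import java.util.Properties;

  public class TombstoneExample {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
              // null value = tombstone: after compaction, earlier values for this key are gone
              producer.send(new ProducerRecord<>("user-profiles", "user-42", null));
              producer.flush();
          }
      }
  }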
Topic design
put all events of the same type in the same topic, and use different topics for different event types
https://www.confluent.io/blog/put-several-event-types-kafka-topic/
https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/
Reliable Delivery (p138)
:star: reliability must be designed into a system from its very beginning
Kafka guarantees the order of messages within a partition
produced messages are considered "committed" when written to all in-sync replicas (but not necessarily flushed to disk; they can still be in the filesystem cache)
consumers can only read committed messages
Broker Configs (p140)
Replication Factor
topic-level: replication.factor
broker-level: default.replication.factor
Rack Awareness
broker.rack
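A hedged server.properties sketch of these broker-level settings (the rack name is hypothetical):

  default.replication.factor=3
  broker.rack=rack1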
Unclean Leader Election (p142)
unclean.leader.election.enable = true (the default) allows out-of-sync replicas to become leaders; this risks data loss and data inconsistency
unclean.leader.election.enable = false disallows out-of-sync replicas from becoming leaders; this risks availability if all in-sync replicas are down
Min In-Sync Replicas
set at both topic and broker level: min.insync.replicas
when not enough in-sync replicas are online, brokers stop accepting produce requests and return NotEnoughReplicasException to the producer client
the remaining in-sync replicas become read-only, which consumers can still consume from
this prevents data loss when an unclean election occurs
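A hedged sketch of the related settings (the values are illustrative); note that min.insync.replicas is only enforced for producers using acks=all:

  # topic- or broker-level
  min.insync.replicas=2
  # broker-level
  unclean.leader.election.enable=false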
Error Handling (p146)
Non-retriable errors
errors that occur before sending the message to the broker, e.g. serialization errors
errors that remain after all retry attempts are exhausted, or when the producer runs out of memory while buffering
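A minimal sketch of handling send errors in a producer callback (topic, keys, and the handling itself are illustrative); RetriableException marks errors the producer may retry on its own:

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.errors.RetriableException;
  import java.util.Properties;

  public class ProducerErrorHandling {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
              producer.send(new ProducerRecord<>("events", "key", "value"), (metadata, exception) -> {
                  if (exception == null) {
                      return;                                   // success
                  } else if (exception instanceof RetriableException) {
                      // a retriable error survived all automatic retries (e.g. NotEnoughReplicasException);
                      // the application can retry later or park the record somewhere
                  } else {
                      // non-retriable error (e.g. message too large); log and decide what to do
                  }
              });
              producer.flush();
          }
      }
  }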
Maintaining State (p150)
:question: approaches
Handling Long Processing Times (p150)
hand records off to a thread pool for parallel processing, pause() the consumer, and keep polling without fetching new records, to avoid rebalancing due to heartbeat timeout (see the sketch below)
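A minimal sketch of this pattern, assuming a single-threaded KafkaConsumer that hands each batch to a background task (topic, group id, and the processing itself are illustrative):

  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import java.time.Duration;
  import java.util.List;
  import java.util.Properties;
  import java.util.concurrent.CompletableFuture;

  public class PauseResumeSketch {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("group.id", "slow-processors");
          props.put("enable.auto.commit", "false");
          props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
              consumer.subscribe(List.of("events"));
              while (true) {
                  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                  if (!records.isEmpty()) {
                      // hand the batch to another thread and pause all assigned partitions
                      consumer.pause(consumer.assignment());
                      CompletableFuture<Void> done = CompletableFuture.runAsync(() -> process(records));
                      // keep polling (returns no records while paused) so the consumer stays in the group
                      while (!done.isDone()) {
                          consumer.poll(Duration.ofMillis(200));
                      }
                      consumer.resume(consumer.paused());
                      consumer.commitSync();
                  }
              }
          }
      }

      static void process(ConsumerRecords<String, String> records) {
          for (ConsumerRecord<String, String> record : records) { /* slow work goes here */ }
      }
  }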
Cluster Membership (p118)
each broker registers its ID in Zookeeper by creating an ephemeral node
the ephemeral node is deleted automatically when the broker goes down, and watchers are notified
if a broker goes down and another joins with the same ID, it gets assigned the same partitions and topics
Controller
responsible for electing partition leaders
usually the first broker started in the cluster becomes the controller, by creating an ephemeral Zookeeper node named /controller. Brokers that start later fail to create this node and thereby learn that a controller already exists.
all brokers watch the /controller node and try to become the controller when it disappears
controller epoch - each time a controller is elected it receives a new, higher number, so messages from an older controller are ignored
the controller watches all broker nodes. When a broker leaves the cluster, the controller knows which partitions had their leader on that broker; for each such partition it assigns the leader role to the next replica in the partition's replica list, and sends a request to all brokers holding a replica of that partition to notify them of the change.
when a broker joins the cluster, the controller checks the broker ID for replicas that should exist on that broker; if there are any, it notifies the related brokers, and the new broker starts replicating messages from the existing leaders
Request Processing (p122)
Components
processor (network) threads - number is configurable; they take requests from client connections and place them in a request queue, and pick responses up from a response queue and send them back to clients
acceptor thread - the broker runs one acceptor thread on each port it listens on; it creates the connection and hands it over to a processor thread
IO threads - pick requests up from the request queue and process them
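A hedged server.properties sketch of the corresponding thread counts (the values shown are the usual defaults):

  num.network.threads=3
  num.io.threads=8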
Request type
Produce request
sent by producers; contains the messages the client wants to write to the broker
the broker checks: 1) does the client have write access? 2) is acks a valid value (0, 1, or all)? 3) if acks=all, are there enough in-sync replicas?
the broker writes the messages to the Linux filesystem cache with NO guarantee about when they are written to disk; Kafka does NOT wait for the data to persist to disk
if acks=all, the request is stored in a buffer called purgatory until the leader observes that the followers have replicated the message, and only then does it respond to the client
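A minimal sketch of a producer configured so that its requests wait in purgatory until all in-sync replicas have the message (the address, topic, and serializers are illustrative):

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import java.util.Properties;

  public class AcksAllProducer {
      public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(ProducerConfig.ACKS_CONFIG, "all");   // broker responds only after all in-sync replicas have the message
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
          try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
              // get() blocks until the broker has released the request from purgatory and responded
              producer.send(new ProducerRecord<>("events", "key", "value")).get();
          }
      }
  }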
Fetch request (p124)
sent by consumers or follower replicas when they read messages from brokers; specifies which topics, which partitions, and from what offsets to return data
:star: ZERO-COPY - Kafka keeps messages from producers in the filesystem cache and sends them directly to the network channel without intermediate buffers. If the producing rate matches the consuming rate, messages are served entirely from the filesystem cache and never need to be read back from disk. This removes the overhead of copying bytes and managing buffers in memory.
data size upper bound - the maximum amount of data the broker can return; because clients need to allocate memory to hold the returned messages, a response that is too large could cause an OutOfMemory issue in the client
data size lower bound - the broker only responds to the client when it has at least that much data ready or a timeout is reached; this reduces CPU and network utilization, improving throughput
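A hedged sketch of the consumer settings that control these bounds (the values are illustrative, not recommendations):

  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import java.util.Properties;

  public class FetchBoundsConfig {
      public static Properties fetchProps() {
          Properties props = new Properties();
          props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1048576");            // lower bound: wait for ~1 MB of data...
          props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");              // ...or respond after at most 500 ms
          props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800");           // upper bound for the whole fetch response
          props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576");  // upper bound per partition
          return props;
      }
  }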
Metadata request
can be sent to any broker; returns, for the list of topics the client is interested in, the partitions of each topic, the replicas of each partition, and which replica is the leader
clients cache the metadata and refresh it periodically, controlled by metadata.max.age.ms
Produce/Fetch requests must be sent to the broker hosting the leader replica; there is NO redirection. Otherwise the client gets a "Not a Leader" error, and it then sends a metadata request to refresh its metadata before trying again
Replication
Leader replica
there is only one leader replica per partition
all writes go to the leader replica before any other replica
the leader needs to know which follower replicas are up-to-date with it
preferred leader
the replica that was the leader when the topic was originally created
because leader partitions are balanced across brokers when the topic's partitions are originally created, when the preferred leader is actually the real leader, load is balanced
auto.leader.rebalance.enable=true - when the preferred leader is in-sync but not the current leader, trigger an election to make it the leader
:!: spread preferred leader partitions across brokers to avoid overloading individual brokers
Follower replica
does NOT serve client requests; only replicates messages from the leader replica
can be elected as the new leader when the leader goes down
periodically sends the leader Fetch requests (containing the offset of the message the follower wants to receive next) to get messages from the leader replica and store them
out-of-sync
a follower is out-of-sync if it hasn't sent a Fetch request to the leader in more than 10 seconds, or if it has requested within 10 seconds but hasn't caught up to the most recent message for more than 10 seconds
an out-of-sync follower cannot become leader
the time window is controlled by replica.lag.time.max.ms
:star: rapid flipping between in-sync and out-of-sync is a sign of misconfiguration in the cluster, mostly wrong Java GC settings on a broker causing frequent long pauses
Delivery Semantics
Exactly-Once
Producer side
Kafka uses the transactional id plus per-partition sequence numbers to detect duplicate publishes and ignore them
producers can therefore safely retry idempotently
set the properties "enable.idempotence" = "true" and "transactional.id" = "your tx id" (see the sketch below)
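A minimal sketch of a transactional producer (topic, keys, and the transactional id are illustrative; the transactional id should stay stable across restarts of the same producer instance):

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import java.util.Properties;

  public class TransactionalProducerSketch {
      public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
          props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "orders-tx-1");   // hypothetical transactional id
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
          try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
              producer.initTransactions();                      // registers the transactional id, fences older instances
              producer.beginTransaction();
              try {
                  producer.send(new ProducerRecord<>("orders", "order-1", "created"));
                  producer.send(new ProducerRecord<>("orders", "order-1", "paid"));
                  producer.commitTransaction();                 // both messages become visible atomically
              } catch (Exception e) {
                  producer.abortTransaction();                  // read_committed consumers never see aborted messages
                  throw e;
              }
          }
      }
  }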
Consumer side
the consumer knows the offset of each message, so it can persist the result and the offset together transactionally, e.g. in the same row of an RDBMS
then, after crash recovery, it resumes consuming from the latest stored offsets
set the property "isolation.level" = "read_committed"
then messages written to the topic in an uncommitted transaction are not visible to consumers
messages in the same transaction can span multiple partitions and be read by different consumers, so the Kafka broker maintains a list of all partitions updated by each transaction (see the sketch below)
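A minimal sketch of the consumer side, assuming the offset is stored together with the processing result in an external store (the topic, group id, and store helpers are illustrative stand-ins):

  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.TopicPartition;
  import java.time.Duration;
  import java.util.List;
  import java.util.Properties;

  public class ReadCommittedConsumerSketch {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-processor");
          props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");   // skip uncommitted/aborted messages
          props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // offsets live in the external store
          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
          try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
              TopicPartition tp = new TopicPartition("orders", 0);              // hypothetical partition
              consumer.assign(List.of(tp));
              consumer.seek(tp, loadOffsetFromStore(tp));                       // resume from the offset saved with the results
              while (true) {
                  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                  for (ConsumerRecord<String, String> record : records) {
                      // one external transaction: write the processing result AND record.offset() + 1 together
                      saveResultAndOffsetAtomically(record);
                  }
              }
          }
      }

      static long loadOffsetFromStore(TopicPartition tp) { return 0L; }         // stand-in for a DB lookup
      static void saveResultAndOffsetAtomically(ConsumerRecord<String, String> r) { /* e.g. one RDBMS row */ }
  }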
Transaction