Components
ConsumerGroup
members rebalance the partitions they consume when consumers join, leave, or fail
:!: consumers stop consuming during a rebalance, and need to refresh their caches after it
only one consumer in a group will consume a specific partition, exclusively
the max parallelism of topic consumption is the number of partitions; redundant consumers stay idle
:star: one consumer per partition, one consumer per thread ( :question: too many threads concern? )
HeartBeating
(before 0.10.1) Consumer sends heartbeat to group coordinator broker when
a) consumer polls (e.g. retrieves records)
b) consumer commits records it has consumed
(after 0.10.1) a separate heartbeat thread decouples the heartbeat frequency from the consumption frequency
it is configurable (max.poll.interval.ms) how long the app can go without calling poll() before the consumer leaves the group and triggers a rebalance, to avoid livelock
consumers send heartbeats to group coordinator broker to maintain membership of consumer group and ownership of partitions
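A minimal sketch of the related consumer settings with the Java client (imports omitted; the broker address, group id and timeout values are illustrative assumptions, not recommendations):

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // assumption: local broker
props.put("group.id", "my-group");                  // hypothetical group id
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// heartbeat thread: liveness of the consumer process
props.put("session.timeout.ms", "10000");    // coordinator evicts the consumer if no heartbeat arrives within this window
props.put("heartbeat.interval.ms", "3000");  // how often the background thread sends heartbeats
// poll loop: liveness of the processing logic (post-0.10.1)
props.put("max.poll.interval.ms", "300000"); // max gap between poll() calls before the consumer leaves the group
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```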
poll()
- find GroupCoordinator broker, join consumer group and receive partition assignment
- rebalancing :question: see Rebalance Listener and Internals mindmap
- heartbeating :question: see HeartBeating
:!: ALWAYS close() consumer before exiting
close() will 1) close network connections and sockets, 2) trigger a consumer group rebalance immediately instead of waiting for the session timeout
Exiting Poll Loop
use another thread (e.g. a ShutdownHook) to call consumer.wakeup() - the only consumer method that is safe to call from another thread
wakeup() causes poll() to throw WakeupException; you must still call close() after that (see the sketch below)
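A sketch of the poll loop plus shutdown hook, reusing the `consumer` built above; the topic name and process() are placeholders, and poll(Duration) assumes a 2.0+ client:

```java
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();      // the only consumer method safe to call from another thread
    try {
        mainThread.join();  // wait for the poll loop to finish close()
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

try {
    consumer.subscribe(Collections.singletonList("my-topic"));   // hypothetical topic
    while (true) {
        // poll() also drives coordinator discovery, group membership, rebalancing and heartbeats
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            process(record);   // placeholder for application logic
        }
    }
} catch (WakeupException e) {
    // expected on shutdown: wakeup() makes the next poll() throw
} finally {
    consumer.close();   // closes connections/sockets and triggers an immediate rebalance
}
```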
group.id
consumers with the same group.id subscribed to the same topic each read a disjoint subset of the topic's messages
enable.auto.commit
when true, you have no control over the number of duplicated records (anything processed after the last auto-commit is re-delivered after a crash or rebalance)
commit
the consumer produces a message containing the committed offset for each partition to the __consumer_offsets topic
after a rebalance, a consumer may be assigned a new set of partitions; it then continues from the last committed offset of each partition
:!: it's possible Kafka processes messages twice: those between the last committed offset and the last message the client actually processed
:!: it's possible Kafka misses messages: those between the last message the client processed and the last committed offset
auto commit
enable.auto.commit=true, and use auto.commit.interval.ms to set commit interval
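Sketch, continuing the consumer Properties above (the interval value is arbitrary):

```java
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");  // commit the offsets of the last poll() every 5 s
// anything processed after the most recent auto-commit may be re-delivered after a crash/rebalance
```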
manual commit
Commit Latest Offset
commitSync()
:!::!::!: make sure you call commitSync() only after you are done processing all the records returned by poll()
automatically retries the commit unless there is an unrecoverable exception such as CommitFailedException
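A minimal commitSync() sketch (process() is a placeholder):

```java
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);         // finish processing everything returned by poll() first
    }
    try {
        consumer.commitSync();   // commits the latest offsets returned by the last poll()
    } catch (CommitFailedException e) {
        // unrecoverable, e.g. the group has already rebalanced; log and decide how to proceed
        e.printStackTrace();
    }
}
```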
commitAsync()
Retrying Async Commits
Compare-And-Set
use a monotonically increasing sequence number.
Increase the seq no each time you commit and capture it in the commit callback.
Before retrying, if the seq no captured in the callback is lower than the current seq no, a newer commit has already happened - do NOT retry
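A hedged sketch of that check inside your own commit helper; the field name, the AtomicLong and the fallback to commitSync() are illustrative choices, not a fixed recipe:

```java
private final AtomicLong sequence = new AtomicLong();   // monotonically increasing commit attempt number

void commitWithRetry(KafkaConsumer<String, String> consumer,
                     Map<TopicPartition, OffsetAndMetadata> offsets) {
    long attempt = sequence.incrementAndGet();          // seq no captured for this attempt
    consumer.commitAsync(offsets, (committed, exception) -> {
        if (exception == null) {
            return;                                     // commit succeeded, nothing to do
        }
        if (attempt < sequence.get()) {
            return;                                     // a newer commit was issued: do NOT retry
        }
        consumer.commitSync(offsets);                   // no newer commit in flight: safe to retry
    });
}
```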
Commit Specified Offset
commitSync() / commitAsync() overloads that take a Map<TopicPartition, OffsetAndMetadata>
OffsetAndMetadata contains the offset of the NEXT message to read, not the offset of the last message processed
committing an offset implicitly commits all offsets before it, so do NOT commit if some records in the middle failed
option 1: commit the last record that succeeded, store the rest in a buffer and keep trying to process them;
use consumer.pause() to ensure no new data is returned by subsequent poll() calls
option 2: dead-letter queue - write failed records to a separate retry topic, and use a separate consumer group to handle retries there
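A sketch of committing explicit offsets mid-batch (process() and the 1000-record interval are placeholders):

```java
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);
        // note the +1: commit the offset of the NEXT message to read, not the one just processed
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                           new OffsetAndMetadata(record.offset() + 1));
        if (++count % 1000 == 0) {
            consumer.commitAsync(currentOffsets, null);   // commit in the middle of the batch
        }
    }
}
```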
Producer
- create a ProducerRecord(topic, [partition], [key], value)
- serialize with the provided key/value serializers
- the partitioner decides which partition to write to
- return RecordMetadata(topic, partition, offset) to the producer
- retry on retriable failures (handled by the producer itself), or return an error
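From the application side, a sketch of that round trip (imports omitted; assumes a configured KafkaProducer<String, String> named producer, see the configs sketch below; topic, key and value are hypothetical):

```java
// topic, key and value; the partition is normally left to the partitioner
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key-1", "value-1");
try {
    RecordMetadata metadata = producer.send(record).get();   // block until the broker responds
    System.out.printf("written to %s-%d at offset %d%n",
            metadata.topic(), metadata.partition(), metadata.offset());
} catch (ExecutionException | InterruptedException e) {
    // non-retriable error, or the producer has already exhausted its own retries
    e.printStackTrace();
}
```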
configs
acks
0: fire-and-forget, no delivery guarantee
1: wait only for the leader replica to acknowledge the write
all: wait until all in-sync replicas have acknowledged the write
-1: equivalent to acks=all
compression.type
:star: snappy
low CPU overhead, good perf
gzip
more CPU and time, better compression ratio
max.request.size
make this match message.max.bytes in the broker config
to avoid the broker rejecting messages for being too large
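A sketch of these settings together (broker address, serializers and values are illustrative):

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // assumption: local broker
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");                  // wait for all in-sync replicas
props.put("compression.type", "snappy");   // low CPU overhead, good throughput
props.put("max.request.size", "1048576");  // keep consistent with the broker's message.max.bytes
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
```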
sync send
producer.send() returns a Future<RecordMetadata>; use future.get() to block until a response is received from the broker
async send
producer.send(record, callback) returns immediately and executes the callback asynchronously once the broker responds
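A sketch of the async variant, reusing the hypothetical record from the flow above:

```java
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();   // retriable errors were already retried by the producer
    } else {
        System.out.printf("acked: %s-%d @ offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});
```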
scaling
start with one thread per producer,
add multiple threads sharing the same producer for better throughput,
when that no longer helps, add more producers
Storage
Partitions are composed of segments (chunks of the log stored as files).
Whole segments are deleted when retention (configured per topic) expires; deleting whole files avoids random reads/writes and performs better
log compacted
when used, Kafka retains only the most recent message produced with each key
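A hedged sketch of creating a compacted topic with the Java AdminClient (topic name, partition count and replication factor are arbitrary; error handling for the checked exceptions of get() is omitted):

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // assumption: local broker
try (AdminClient admin = AdminClient.create(props)) {
    NewTopic topic = new NewTopic("user-profiles", 3, (short) 1)   // hypothetical topic
            .configs(Collections.singletonMap("cleanup.policy", "compact"));  // keep only the last value per key
    admin.createTopics(Collections.singleton(topic)).all().get();  // wait for the broker to create it
}
```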
VerifiableProducer
produces a sequence of messages containing the numbers from 1 up to a value you choose, to check how producing behaves with your config
VerifiableConsumer
consumes events (e.g. from a VerifiableProducer) and prints them in order, with info about commits and rebalances
Optimization
change send/receive buffers memory size for each socket
default (net.core.wmem_default = 131072/128KB, net.core.rmem_default = 131072/128KB) and
maximum (net.core.wmem_max = 2097152/2MB, net.core.rmem_max = 2097152/2MB)
change send/receive buffer for TCP sockets via
net.ipv4.tcp_wmem = 4096 65536 2048000 and
net.ipv4.tcp_rmem = 4096 65536 2048000
enable TCP window scaling with net.ipv4.tcp_window_scaling = 1, so clients transfer data more efficiently and the broker can buffer it
increase net.ipv4.tcp_max_syn_backlog above the default 1024, to allow a greater number of simultaneous connections to be accepted
increase net.core.netdev_max_backlog above the default 1000, to assist with bursts of network traffic
set the noatime mount option for the mount point,
because atime (last access time) is updated every time a file is read, which is of no use to Kafka and generates a large number of writes
increase vm.dirty_ratio to a value in [60, 80] (default is 20)
GC Options
:!: the default Kafka start script uses ParNew and CMS; change the KAFKA_JVM_PERFORMANCE_OPTS env var to override this (e.g. to use G1 instead)