Netty
ByteBuf
drawbacks of NIO ByteBuffer
-
only 1 pointer (position) shared by reading and writing, needs tedious calls to flip(), rewind() etc.
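A minimal sketch of the single-pointer problem, using only java.nio. The class name FlipDemo is hypothetical; the point is that the same position field serves writing and reading, so flip() is needed between the two.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class FlipDemo {
    // Write a short string into a ByteBuffer, then read it back.
    // Assumes the encoded text fits into 64 bytes.
    static String roundTrip(String text) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put(text.getBytes(StandardCharsets.UTF_8)); // position advances while writing

        // Without flip(), position still points past the written bytes,
        // so a read would start in the unwritten part of the buffer.
        buf.flip(); // limit = old position, position = 0: switch from writing to reading

        byte[] out = new byte[buf.remaining()];
        buf.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("netty"));
    }
}
```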
-
Why ByteBuf?
2 pointers, readerIndex & writerIndex
discardReadBytes() frees the already-read region [0, readerIndex) and shifts the readable bytes to the beginning of the buffer
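The two-index idea can be sketched with a toy class. This is NOT Netty's implementation; TwoIndexBuffer and its fixed-capacity byte[] are assumptions made for illustration, and only the method names mirror ByteBuf.

```java
class TwoIndexBuffer {
    private final byte[] data;
    private int readerIndex; // next byte to read
    private int writerIndex; // next free slot to write

    TwoIndexBuffer(int capacity) {
        data = new byte[capacity];
    }

    int readerIndex() { return readerIndex; }
    int writerIndex() { return writerIndex; }
    int readableBytes() { return writerIndex - readerIndex; }
    int writableBytes() { return data.length - writerIndex; }

    void writeByte(int b) {
        if (writerIndex == data.length) {
            throw new IndexOutOfBoundsException("buffer full");
        }
        data[writerIndex++] = (byte) b;
    }

    byte readByte() {
        if (readerIndex == writerIndex) {
            throw new IndexOutOfBoundsException("nothing to read");
        }
        return data[readerIndex++];
    }

    // Drop the already-read region [0, readerIndex) and move the
    // readable bytes to index 0, which frees space at the end.
    void discardReadBytes() {
        System.arraycopy(data, readerIndex, data, 0, readableBytes());
        writerIndex -= readerIndex;
        readerIndex = 0;
    }
}
```

No flip() is needed: writes only move writerIndex and reads only move readerIndex.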
-
-
-
-
-
-
copy()
returns a fully independent copy of the ByteBuf (separate content and indexes)
slice()
returns a ByteBuf covering readerIndex to writerIndex; shares the same underlying buffer but keeps its own indexes
-
-
skipBytes(int length)
advances readerIndex by length, throws IndexOutOfBoundsException if that would move it beyond writerIndex
Types
HeapByteBuf
fast memory allocation/release, handled by JVM GC
for socket I/O, needs an extra memory copy between heap and channel
-
stores as byte[] on heap, use hasArray() to check if it's HeapByteBuf
DirectByteBuf
uses off-heap memory, slower memory allocation/release
faster Socket Channel read/write, no extra mem copy
-
Pooled
-
-
-
-
PoolArena
-
-
in Netty, PoolArena consists of many Chunks, each Chunk has many Pages
Chunk
manages allocation/release of Pages in memory.
a branch node is marked as used once all Pages under it have been used
Pages are organised as a binary tree
e.g. a 64-byte Chunk consisting of 16 Pages (illustrative sizes)
-
Page
-
SubPage
splits a Page into equal blocks, block size is fixed by the first allocation.
e.g. an 8-byte Page is split into 2 * 4-byte SubPages if
4 bytes are requested by the first allocation
Unpooled
UnpooledHeapByteBuf
create new instance each time, can impact GC
-
-
-
-
tmpNioBuf for converting to NIO ByteBuffer, reset to null after buffer resizing
CompositeByteBuf
a view over multiple underlying ByteBufs, which can be added/removed dynamically
ResourceLeakDetector<ByteBuf>
static, shared among all ByteBuf instances
-
-
-
-
-
ByteBufUtil
encodeString(ByteBufAllocator alloc, CharBuffer src, Charset chset)
decodeString(ByteBuffer src, Charset chset)
-
-
-
:star:Recommended to use Unpooled/ByteBufAllocator helper methods to create new instances, NOT constructors
ByteBuf views created by duplicate/slice/readSlice do NOT increase the reference count; use the retained variants (retainedDuplicate/retainedSlice/readRetainedSlice) to do so
ChunkedFile
read file by chunk when zero-copy is not supported, or you need to transform file data
-
-
-
-
I/O
read()
-
trigger ChannelHandler.channelRead(ChannelHandlerContext, Object) callback
-
-
close(ChannelPromise p)
close connection, handle close result via the promise
-
-
-
-
-
-
-
ChannelPipeline
a chain of responsibility of ChannelHandlers, each processing the message for specific events
Inbound
SocketChannel.read() gets a ByteBuf and triggers a ChannelRead event ->
NioEventLoop selects the ChannelRead event ->
NioEventLoop calls ChannelPipeline.fireChannelRead(Object msg) ->
NioEventLoop sends the ByteBuf into the ChannelPipeline ->
Handlers process the ByteBuf in turn, any one of them can terminate it ->
-
Outbound
ChannelHandlerContext.write() ->
ByteBuf travels through the pipeline in the reverse order to reading ->
Message is written to the send buffer and waits to be sent by flush()
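The traversal order can be sketched with a toy pipeline in plain Java. ToyPipeline is a hypothetical class, not Netty's ChannelPipeline: it ignores the split between inbound and outbound handler types and only shows that reads walk the handlers in insertion order, writes walk them in reverse, and any handler can stop propagation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class ToyPipeline {
    // name -> handler; a handler returns true to pass the message on, false to stop it.
    private final Map<String, Predicate<String>> handlers = new LinkedHashMap<>();

    ToyPipeline addLast(String name, Predicate<String> handler) {
        handlers.put(name, handler);
        return this;
    }

    // Read path: first added handler sees the message first.
    List<String> fireChannelRead(String msg) {
        return run(new ArrayList<>(handlers.keySet()), msg);
    }

    // Write path: same handlers, visited in reverse order.
    List<String> write(String msg) {
        List<String> order = new ArrayList<>(handlers.keySet());
        Collections.reverse(order);
        return run(order, msg);
    }

    // Returns the names of the handlers that actually saw the message.
    private List<String> run(List<String> order, String msg) {
        List<String> visited = new ArrayList<>();
        for (String name : order) {
            visited.add(name);
            if (!handlers.get(name).test(msg)) {
                break; // this handler terminated the event
            }
        }
        return visited;
    }
}
```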
-
ServerBootstrap/Bootstrap will create a pipeline for each Channel.
just add handlers by pipeline.addLast("name", handler)
also support inserting handler at specific position
-
-
-
see comment of ChannelPipeline source code
extend ChannelHandlerAdapter and implement callbacks for the events of interest
-
-
ByteToMessageDecoder
decode(ChannelHandlerContext, ByteBuf, List<Object>)
if the current buffer content is not enough to decode, reset readerIndex and stop decoding
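The reset-and-wait pattern, sketched with java.nio.ByteBuffer standing in for ByteBuf (mark()/reset() play the role of markReaderIndex()/resetReaderIndex()). The frame layout, a 4-byte length followed by a UTF-8 payload, and the class name LengthPrefixedDecoder are assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

class LengthPrefixedDecoder {
    // Decode as many complete frames as possible from 'in' and append them to 'out'.
    // Assumed frame layout: 4-byte big-endian length, then that many payload bytes.
    static void decode(ByteBuffer in, List<String> out) {
        while (in.remaining() >= 4) {
            in.mark();                 // remember where this frame starts
            int length = in.getInt();
            if (in.remaining() < length) {
                in.reset();            // payload incomplete: rewind and wait for more bytes
                return;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            out.add(new String(payload, StandardCharsets.UTF_8));
        }
    }
}
```

After an incomplete frame the read position is back at the start of that frame, so the next call can retry once more bytes have arrived.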
-
-
-
-
-
often stateful, simple & recommended approach is to use member variables.
-
ChannelHandlerContext
-
via the context, ChannelHandlers pass events up/downstream,
modify the pipeline dynamically, or store handler-specific info in attributes (keyed by AttributeKey)
-
-
-
-
-
Java NIO
Channel
-
like a stream of data, but supports both read & write
-
-
-
Buffer
-
-
-
-
compact() - discards only the data you have already read and shifts unread data to the beginning of the buffer
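A small worked example of compact() with java.nio. The class name CompactDemo and the byte values are arbitrary choices for illustration.

```java
import java.nio.ByteBuffer;

class CompactDemo {
    // Returns {first byte after compact, position, limit}.
    static int[] run() {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.put(new byte[] {1, 2, 3, 4, 5}); // position = 5
        buf.flip();                          // position = 0, limit = 5
        buf.get();                           // read 1
        buf.get();                           // read 2, position = 2
        buf.compact();                       // unread 3,4,5 move to index 0
        // The buffer is ready for writing again: position = 3, limit = capacity.
        return new int[] {buf.get(0), buf.position(), buf.limit()};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```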
-
-
-
Selector
-
-
SelectableChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE) - register a channel with the given selector,
2nd arg is the set of events of interest (OP_CONNECT, OP_ACCEPT, OP_READ, OP_WRITE), combined with |
the channel MUST be in Non-Blocking mode (channel.configureBlocking(false))
FileChannel cannot be in non-blocking mode, hence not available for selector
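A sketch of the non-blocking requirement. To avoid opening network sockets it uses java.nio.channels.Pipe, whose source end is a SelectableChannel that supports only OP_READ; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class RegisterDemo {
    // Channels start in blocking mode; registering one as-is is rejected.
    static boolean blockingChannelRejected() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.register(selector, SelectionKey.OP_READ);
                return false;
            } catch (IllegalBlockingModeException expected) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // After configureBlocking(false) the same call succeeds.
    // Returns the interest set stored in the resulting key.
    static int registerNonBlocking() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                SelectionKey key = source.register(selector, SelectionKey.OP_READ);
                return key.interestOps();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```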
-
select a chan
-
-
-
selectedKeys() - once the select methods return > 0, get a Set<SelectionKey> of ready selections
iterate the keys with an Iterator and call iterator.remove() for each key after handling it; the selector never removes keys from the selected set itself.
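One pass of a selection loop, sketched with a Pipe instead of a socket so that it runs without a network. SelectLoopDemo, the 64-byte read buffer and the 1-second select timeout are assumptions for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

class SelectLoopDemo {
    // Writes 'text' into a pipe and reads it back through a selector.
    static String readOnce(String text) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);
                sink.write(ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)));

                StringBuilder received = new StringBuilder();
                if (selector.select(1000) > 0) {
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove(); // otherwise the key stays in the selected set
                        if (key.isReadable()) {
                            ByteBuffer buf = ByteBuffer.allocate(64);
                            ((ReadableByteChannel) key.channel()).read(buf);
                            buf.flip();
                            received.append(StandardCharsets.UTF_8.decode(buf).toString());
                        }
                    }
                }
                return received.toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```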
-
-
close() - close the selector, invalidates all selectionKeys registered. Channels are not closed
-
Scatter/Gather
Scatter
-
-
once a buffer is full, move on reading into next buffer
NOT for dynamic data length, need to know the fixed size beforehand
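A scattering read sketched with FileChannel over a temporary file. The 4-byte header size, the 64-byte body capacity and the class name ScatterDemo are assumptions; the fixed header size is exactly the "must be known beforehand" constraint noted above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ScatterDemo {
    static final int HEADER_SIZE = 4; // must be fixed and known in advance

    // Splits 'data' into {header, body} with a single scattering read loop.
    static String[] split(String data) {
        try {
            Path file = Files.createTempFile("scatter", ".bin");
            try {
                Files.write(file, data.getBytes(StandardCharsets.US_ASCII));
                ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE);
                ByteBuffer body = ByteBuffer.allocate(64);
                try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
                    ByteBuffer[] buffers = {header, body};
                    // The channel fills 'header' completely before touching 'body'.
                    while (ch.read(buffers) > 0) {
                        // keep reading until end of file or both buffers are full
                    }
                }
                header.flip();
                body.flip();
                return new String[] {
                    StandardCharsets.US_ASCII.decode(header).toString(),
                    StandardCharsets.US_ASCII.decode(body).toString()
                };
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```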
-
Reactor model
-
-
-
Multi-Thread Reactor
-
NIO thread pool handle I/O, reading/decoding/encoding/sending data
-
Master-Slave Reactor
-
Acceptor accept TCP conn, verify, auth, create SocketChannel and register to I/O thread pool
-
-
-
NioEventLoopGroup
-
execute Scheduled Tasks
NioEventLoop.schedule(Runnable task, long delay, TimeUnit unit)
avoid locks, sequential I/O operations
BestPractice
use 2 NioEventLoopGroup
boss group
-
accept TCP conn, initialise Channel config
worker group
async send data to conn, via ChannelPipeline in reverse
async read data from conn, fire event to ChannelPipeline
-
use decodeHandler to decode msg, DO NOT switch to user thread
-
if business logic is time-consuming, wrap the POJO in a Task and submit it to a business-logic thread pool,
to release the NIO thread ASAP
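The hand-off can be sketched with plain java.util.concurrent executors standing in for the NIO thread and the business pool. OffloadDemo, the thread names "io" and "biz", and the upper-casing "business logic" are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class OffloadDemo {
    // Returns "<thread that ran the business logic>:<result>".
    static String handle(String decodedMsg) {
        ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService businessPool = Executors.newFixedThreadPool(2, r -> new Thread(r, "biz"));
        try {
            CompletableFuture<String> result = new CompletableFuture<>();
            ioThread.execute(() -> {
                // On the I/O thread: decoding is already done, so only hand the
                // message over and return at once to keep the I/O thread free.
                businessPool.execute(() -> result.complete(
                        Thread.currentThread().getName() + ":" + decodedMsg.toUpperCase()));
            });
            return result.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            ioThread.shutdown();
            businessPool.shutdown();
        }
    }
}
```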
-
NioEventLoop
-
when read a msg, call ChannelPipeline.fireChannelRead(msg), on the same thread
-
-
-
- read data from SocketChannel into ByteBuffer
- process System tasks and scheduled tasks after I/O events
iterate delayedTaskQ and move tasks whose deadline has been reached to taskQ
- execute tasks in taskQ within the time slot defined by ioRatio,
check the system nano time only every 64 tasks because reading the clock is costly,
when time is up, stop running tasks and go back to processing I/O
- in graceful shutdown, iterate all Channels and call Unsafe.close() on each
:red_flag: Unsafe is a nested interface of Channel
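The time-sliced task execution in the list above can be sketched as follows. This is a simplified model, not NioEventLoop's code: TaskBudgetDemo is a hypothetical class, the budget formula assumes ioRatio is between 1 and 99, and the clock is consulted once per 64 tasks via a 0x3F bit mask.

```java
import java.util.Queue;

class TaskBudgetDemo {
    // Time allowed for tasks, derived from the time just spent on I/O.
    // ioRatio = 50 gives tasks the same time as I/O; ioRatio = 80 gives them a quarter.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    // Runs queued tasks until the queue is empty or the budget is used up.
    // Returns the number of tasks executed.
    static int runTasks(Queue<Runnable> taskQueue, long budgetNanos) {
        long deadline = System.nanoTime() + budgetNanos;
        int executed = 0;
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
            executed++;
            // Only look at the clock every 64 tasks.
            if ((executed & 0x3F) == 0 && System.nanoTime() - deadline >= 0) {
                break; // time is up: leave the rest for the next loop iteration
            }
        }
        return executed;
    }
}
```

A consequence of the coarse clock check is that even with a zero budget up to 64 queued tasks run before the loop returns to I/O.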
-
Future / Promise
Future
ChannelFuture
-
await()
:!:DO NOT call ChannelFuture.await() in ChannelHandler,
causes a deadlock when the I/O thread and the waiting thread are the same: the thread waits for a notification that only it can deliver
:!:be careful: ChannelFuture timeout != I/O timeout
conn can succeed after future timeout
always close() properly
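The self-wait problem can be reproduced with a single-threaded executor and CompletableFuture as stand-ins for the event loop and ChannelFuture. SelfWaitDemo and the 200 ms timeout are assumptions; the timeout only exists so the demo terminates, whereas an untimed wait would hang forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SelfWaitDemo {
    // Blocking inside the loop thread: the completion is queued behind the waiter.
    static boolean blockingWaitTimesOut() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> ioResult = new CompletableFuture<>();
            Future<Boolean> handler = eventLoop.submit(() -> {
                eventLoop.execute(() -> ioResult.complete("done")); // same thread, runs later
                try {
                    ioResult.get(200, TimeUnit.MILLISECONDS); // stand-in for await()
                    return false;
                } catch (TimeoutException e) {
                    return true; // the completing task never got a chance to run
                }
            });
            return handler.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdownNow();
        }
    }

    // Registering a callback instead of blocking lets the loop thread move on.
    static String listenerInstead() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> ioResult = new CompletableFuture<>();
            CompletableFuture<String> handled = new CompletableFuture<>();
            eventLoop.execute(() -> {
                eventLoop.execute(() -> ioResult.complete("done"));
                ioResult.thenAccept(v -> handled.complete("handled:" + v));
            });
            return handled.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdownNow();
        }
    }
}
```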
Promise
-
-
-
sync()
waits for the future to complete, rethrows the cause if the future failed