Akka Cluster
Cluster Sharding
distribute actors across nodes in the cluster
interact with actors using logical identifier without knowing physical location
typically used when you have many stateful actors that
together consume more resources (e.g. memory) than fit on a single machine
entity actors are automatically distributed across the nodes in the cluster.
each entity actor runs in only one place; a msg can be sent to an entity without knowing its location,
by sending the msg via the local ShardRegion actor
As a rule of thumb, the number of shards should be a factor ten greater than the planned maximum number of cluster nodes.
Fewer shards than nodes will result in some nodes not hosting any shards.
Too many shards will result in less efficient management of the shards, e.g. rebalancing overhead, and increased latency.
The sharding algorithm must be the same on all nodes in a running cluster.
It can be changed after stopping all nodes in the cluster.
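As a sketch of such a sharding algorithm, a common choice is hashing the entity id modulo a fixed shard count. The names `numberOfShards` and `extractShardId` mirror Akka's classic API, where such a function is passed to `ClusterSharding.start`; the count of 100 is an illustrative assumption for a cluster of up to ~10 nodes:

```scala
// Illustrative shard-id function: hash of the entity id modulo a fixed
// number of shards. Must be deterministic and identical on every node.
val numberOfShards = 100 // rule of thumb: ~10x the planned max number of nodes

def extractShardId(entityId: String): String = {
  // normalize hashCode into [0, numberOfShards), even for negative hashes
  val shard = ((entityId.hashCode % numberOfShards) + numberOfShards) % numberOfShards
  shard.toString
}
```

Since the function is pure, every node derives the same shard id for a given entity id, which is exactly the "same algorithm on all nodes" requirement above.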
a ShardRegion only forwards a msg to
a) an entity in a Shard it owns
b) the ShardRegion that owns the destination entity
it NEVER forwards msgs directly to entities owned by another region
shard allocation decisions are made by the central ShardCoordinator (a cluster singleton)
ShardCoordinator.LeastShardAllocationStrategy - allocates new shards to the ShardRegion with the fewest shards
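The default strategy's rebalance behavior can be tuned in configuration; the keys below are from classic sharding's reference.conf, and the values are illustrative:

```hocon
akka.cluster.sharding.least-shard-allocation-strategy {
  # rebalance when the difference between the most and least
  # loaded region reaches this many shards
  rebalance-threshold = 10
  # number of shards that may be rebalanced in parallel
  max-simultaneous-rebalance = 3
}
```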
Rebalancing
- notify all ShardRegion actors that a handoff for a shard has started
- ShardRegions start buffering msgs for that shard; the location of that shard becomes unknown
- during rebalancing, the coordinator will not answer requests for locations being rebalanced
- the ShardRegion stops all entities in that shard by sending them the configured handOffStopMessage (default PoisonPill)
- the ShardRegion acks the handoff as completed when all entities have terminated
- the coordinator starts answering location requests again and allocates a new location for that shard; buffered msgs in the ShardRegion actors are then delivered to the new location
The state of shard locations in the ShardCoordinator is persistent. A new coordinator singleton is created and recovers the state.
During this, shards with known locations are still available; msgs for unknown locations are buffered until the new coordinator is online.
Remembering Entities
the shardIdExtractor must handle ShardRegion.StartEntity(EntityId)
*the ShardId must be possible to extract from the EntityId alone
performance cost; use with fewer than 10000 entities per shard
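A minimal config sketch for remembered entities (the key is from classic sharding's reference.conf; it can also be set via ClusterShardingSettings):

```hocon
akka.cluster.sharding {
  # re-start the entities of a shard automatically after a rebalance or crash
  remember-entities = on
}
```

Remembered entity ids are persisted, so a journal (and ideally a snapshot store) must be configured for sharding.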
Cluster Client
the sender() seen by the destination actor is NOT the client, but the receptionist
the sender() of a response msg sent by the destination actor, as seen by the client, is DeadLetters
the client buffers msgs while establishing the connection; if the buffer is full, the oldest msgs are dropped
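A sketch of the client-side configuration (host names and ports are illustrative; the keys are from akka-cluster-tools' ClusterClient reference.conf):

```hocon
akka.cluster.client {
  # actor paths of the receptionists on the cluster nodes to contact first
  initial-contacts = [
    "akka.tcp://ClusterSystem@host1:2552/system/receptionist",
    "akka.tcp://ClusterSystem@host2:2552/system/receptionist"]
  # msgs buffered while the connection is being (re-)established;
  # when full, the oldest msgs are dropped
  buffer-size = 1000
}
```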
Joining Cluster
join attempts to the seed nodes are retried automatically
with the timeout config below, an unsuccessful join will run CoordinatedShutdown to
shut down the actor system
and exit the JVM (optional)
akka.cluster.shutdown-after-unsuccessful-join-seed-nodes = 20s
akka.coordinated-shutdown.terminate-actor-system = on
to rejoin with the same host and port after a restart, the cluster first needs to remove the old incarnation and only then allows the new one to join
Failure Detector
in a cluster, each node is monitored by some other nodes (default max 5)
if ANY monitor detects the node as unreachable, that information is spread through the cluster via gossip
only when ALL monitors see it as reachable again does the cluster consider it reachable
when a system msg cannot be delivered to a node, that node becomes Quarantined and CANNOT come back from Unreachable.
This can happen when there are too many un-ack-ed sys msgs (Watch, Terminated, remote actor deployment, failures)
The node then needs to be moved to Down or Removed, and the actor system of the quarantined node must be restarted before it can rejoin.
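The failure detector and the number of monitoring nodes are configurable; the keys below are from akka-cluster's reference.conf, shown with their (approximate) default values:

```hocon
akka.cluster {
  # each node is monitored by at most this many other nodes
  monitored-by-nr-of-members = 5
  failure-detector {
    # phi threshold of the accrual failure detector
    threshold = 8.0
    # tolerated pause between heartbeats (e.g. long GC pauses)
    acceptable-heartbeat-pause = 3 s
  }
}
```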
Coding
methods
subscribe
cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])
cluster.subscribe(self,
initialStateMode = InitialStateAsEvents,
classOf[MemberEvent], classOf[UnreachableMember])
use InitialStateAsEvents to receive the current state as a stream of events (as if you had subscribed when they occurred) instead of a CurrentClusterState snapshot; this is NOT the full history!
Cluster(system).registerOnMemberUp { ... } / registerOnMemberRemoved { ... }
the callback is invoked when the current member's status moves to Up / Removed
alternatively, register a task with CoordinatedShutdown
:!: if registered on an already shut-down cluster, the callback is invoked immediately on the caller thread
Configuring
seed-nodes
the FIRST node in the seed-nodes config MUST be started when starting the cluster initially
to avoid forming separate islands when starting from an empty cluster
it is quickest to start all seed nodes at the same time, in any order
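A seed-nodes sketch (system name, hosts, and ports are illustrative; the akka.tcp scheme assumes classic remoting):

```hocon
akka.cluster.seed-nodes = [
  "akka.tcp://ClusterSystem@host1:2552",
  "akka.tcp://ClusterSystem@host2:2552"]
```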
WeaklyUp
joining members are promoted to the WeaklyUp state and become part of the cluster
when some node is Unreachable at that moment, so gossip cannot converge.
once gossip converges, WeaklyUp members are moved to Up
in a network partition, the other side does NOT know about WeaklyUp members, so DO NOT count WeaklyUp members in quorum decisions
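WeaklyUp is controlled by a single flag (available in Akka 2.5+, where it is enabled by default):

```hocon
# set to off to keep joining members in Joining until gossip converges
akka.cluster.allow-weakly-up-members = on
```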
Cluster Dispatcher
to avoid false failure detection when the cluster heartbeat actors do not get a chance to run,
a separate dispatcher can be defined for the cluster actors
the default dispatcher should be sufficient, unless you are
1) running blocking actors/tasks
2) running CPU-intensive tasks
in that case, create a dedicated dispatcher for those tasks instead
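A sketch of a dedicated cluster dispatcher, following the pattern shown in the Akka docs (pool sizes are illustrative):

```hocon
akka.cluster.use-dispatcher = cluster-dispatcher

cluster-dispatcher {
  type = "Dispatcher"
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 2
    parallelism-max = 4
  }
}
```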
Cluster Metrics
member nodes can collect system health metrics and publish them to other nodes in the cluster, and to registered subscribers on the system event bus
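Metrics collection lives in the separate akka-cluster-metrics module; loading it as an extension is enough to start collecting and gossiping metrics:

```hocon
# requires the akka-cluster-metrics module on the classpath
akka.extensions = ["akka.cluster.metrics.ClusterMetricsExtension"]
```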