Kafka Publisher Interface
sendEvent();
messageRelay();,…
-
Kafka Writer Wrapper Impl:
init(kafkaConfig, keeper interface, outboxEnabled, db) WriterWrapper;
Kafka Writer Wrapper Interface:
sendEventWithPersistence(tx, kafkaPayload);
sendEvent(KafkaPayload) (err);
Poll(db);
Implementation details
-
If outboxEnabled is false, sendEvent calls the keeper's Transform method and then pushes the message to the Kafka writer.
If outboxEnabled is true, a goroutine calling the Poll method is spawned in the init phase.
Poll to be spawned by Application
In the Poll method, the wrapper calls the keeper's ReadFromOutbox, then the keeper's Transform method, and finally its own sendEvent method.
Also, when outboxEnabled is true, the user needs to call the wrapper's sendEventWithPersistence method and pass an active tx object; the wrapper then calls the keeper's SaveToOutbox method for persistence.
By default, the keeper interface's base impl can be provided by the wrapper itself.
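The outbox flow described above can be sketched in Go as follows. This is only an illustration under stated assumptions: `Outbox` is a hypothetical in-memory stand-in for both the tx and db handles, `ReadFromOutbox` returns a batch rather than a single payload, `PollOnce` is a single pass of what Poll would do in a loop, and `recordingWriter` replaces a real Kafka writer. None of these names come from the notes.

```go
package main

import (
	"errors"
	"fmt"
)

// KafkaPayload is a hypothetical stand-in for the payload type in the notes.
type KafkaPayload struct {
	Key   string
	Value string
}

// Outbox is a hypothetical in-memory stand-in for both the tx and db handles.
type Outbox struct{ rows []KafkaPayload }

// KafkaWriter follows "push(message, key)" from the notes.
type KafkaWriter interface {
	Push(message any, key string) error
}

// Keeper follows the Keeper interface from the notes; returning a batch from
// ReadFromOutbox is an assumption of this sketch.
type Keeper interface {
	SaveToOutbox(tx *Outbox, p KafkaPayload) error
	ReadFromOutbox(db *Outbox) ([]KafkaPayload, error)
	Transform(p KafkaPayload) (any, error)
}

// baseKeeper plays the role of the default keeper supplied by the wrapper.
type baseKeeper struct{}

func (baseKeeper) SaveToOutbox(tx *Outbox, p KafkaPayload) error {
	tx.rows = append(tx.rows, p)
	return nil
}

func (baseKeeper) ReadFromOutbox(db *Outbox) ([]KafkaPayload, error) {
	pending := db.rows
	db.rows = nil // simplification: rows are dropped before the push is confirmed
	return pending, nil
}

func (baseKeeper) Transform(p KafkaPayload) (any, error) { return p.Value, nil }

// recordingWriter collects pushed messages instead of talking to Kafka.
type recordingWriter struct{ pushed []string }

func (w *recordingWriter) Push(message any, key string) error {
	w.pushed = append(w.pushed, fmt.Sprintf("%s=%v", key, message))
	return nil
}

type WriterWrapper struct {
	writer        KafkaWriter
	keeper        Keeper
	outboxEnabled bool
	db            *Outbox
}

func NewWriterWrapper(w KafkaWriter, k Keeper, outboxEnabled bool, db *Outbox) *WriterWrapper {
	return &WriterWrapper{writer: w, keeper: k, outboxEnabled: outboxEnabled, db: db}
}

// SendEvent transforms the payload and pushes it to the writer.
func (w *WriterWrapper) SendEvent(p KafkaPayload) error {
	msg, err := w.keeper.Transform(p)
	if err != nil {
		return fmt.Errorf("transform: %w", err)
	}
	return w.writer.Push(msg, p.Key)
}

// SendEventWithPersistence only stores the payload; PollOnce relays it later.
func (w *WriterWrapper) SendEventWithPersistence(tx *Outbox, p KafkaPayload) error {
	if !w.outboxEnabled {
		return errors.New("outbox is disabled")
	}
	return w.keeper.SaveToOutbox(tx, p)
}

// PollOnce is one pass of the Poll loop: read the outbox, then send each row.
func (w *WriterWrapper) PollOnce() error {
	pending, err := w.keeper.ReadFromOutbox(w.db)
	if err != nil {
		return fmt.Errorf("read outbox: %w", err)
	}
	for _, p := range pending {
		if err := w.SendEvent(p); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	writer := &recordingWriter{}
	box := &Outbox{}
	ww := NewWriterWrapper(writer, baseKeeper{}, true, box)
	if err := ww.SendEventWithPersistence(box, KafkaPayload{Key: "order-1", Value: "created"}); err != nil {
		fmt.Println("save failed:", err)
		return
	}
	if err := ww.PollOnce(); err != nil {
		fmt.Println("poll failed:", err)
		return
	}
	fmt.Println(writer.pushed)
}
```

In a real service the save would happen inside the caller's database transaction, and an outbox row would be removed only after the push succeeds.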
-
Kafka Listener Wrapper Impl:
init(kafkaConfig, retryConfig,
keeper interface,
processor interface, db)
Kafka Listener Wrapper Interface:
processMainTopicMsg();
processRetryTopicMsg();
retry(Kafka_Writer_Wrapper,
kafkaPayload, retryCount, db) (err);
implementation details:
init will internally call GetKafkaReader(kafkaConfig), GetKafkaReader(kafkaRetryConfig) (depending on isRetryToBeDone)
-
The process method first calls the Processor's isMsgValid method, then the Processor's ProcessMsg. If processing is successful, the process method commits the msg.
If processing fails and the retry config's isRetryToBeDone is true, the process method calls its retry method. If the retry method succeeds, it commits the original kafka msg.
Depending on the msg's retryCount, the retry method uses the kafka_writer_wrapper_for_retry_topic to push the msg back to the retry topic.
If the max retry threshold is reached, the retry method calls the Processor's SaveFailedMsgToDb to persist the failed msg. The msg lifecycle ends here.
init will internally initialize a kafka_writer_wrapper_for_retry_topic using the keeper and kafkaRetryConfig.
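The process and retry steps above could look roughly like this in Go. It is a sketch, not the library's actual code. It makes several assumptions: a single `KafkaPayload` type is used where the notes distinguish baseProto from kafkaPayload, `MaxRetries` is a hypothetical field of the retry config, the retry topic and commits are modelled as in-memory slices, and an invalid message is simply reported without being committed because the notes do not say what should happen to it.

```go
package main

import "fmt"

// KafkaPayload is a hypothetical message type carrying its own retry counter.
type KafkaPayload struct {
	Key        string
	Value      string
	RetryCount int
}

// Processor follows the Processor interface from the notes (db argument omitted).
type Processor interface {
	IsMsgValid(p KafkaPayload) bool
	ProcessMsg(p KafkaPayload) error
	SaveFailedMsgToDb(p KafkaPayload) error
}

// RetryConfig holds isRetryToBeDone; MaxRetries is an assumed field name.
type RetryConfig struct {
	IsRetryToBeDone bool
	MaxRetries      int
}

// Listener keeps the retry topic and commits in memory for illustration.
type Listener struct {
	processor  Processor
	retryCfg   RetryConfig
	retryTopic []KafkaPayload // stand-in for kafka_writer_wrapper_for_retry_topic
	committed  []string       // keys of committed messages
}

// Process validates, processes, and commits; on failure it defers to retry.
func (l *Listener) Process(msg KafkaPayload) error {
	if !l.processor.IsMsgValid(msg) {
		return fmt.Errorf("invalid message %q", msg.Key)
	}
	if err := l.processor.ProcessMsg(msg); err != nil {
		if !l.retryCfg.IsRetryToBeDone {
			return fmt.Errorf("process %q: %w", msg.Key, err)
		}
		if rerr := l.retry(msg); rerr != nil {
			return fmt.Errorf("retry %q: %w", msg.Key, rerr)
		}
	}
	// Reached on success, or after the retry step itself succeeded.
	l.committed = append(l.committed, msg.Key)
	return nil
}

// retry re-queues the message, or persists it once the threshold is reached.
func (l *Listener) retry(msg KafkaPayload) error {
	if msg.RetryCount >= l.retryCfg.MaxRetries {
		return l.processor.SaveFailedMsgToDb(msg)
	}
	msg.RetryCount++
	l.retryTopic = append(l.retryTopic, msg)
	return nil
}

// alwaysFailing is a test Processor whose ProcessMsg never succeeds.
type alwaysFailing struct{ failed []KafkaPayload }

func (*alwaysFailing) IsMsgValid(p KafkaPayload) bool { return p.Key != "" }

func (*alwaysFailing) ProcessMsg(p KafkaPayload) error {
	return fmt.Errorf("cannot handle %q", p.Value)
}

func (a *alwaysFailing) SaveFailedMsgToDb(p KafkaPayload) error {
	a.failed = append(a.failed, p)
	return nil
}

func main() {
	proc := &alwaysFailing{}
	l := &Listener{processor: proc, retryCfg: RetryConfig{IsRetryToBeDone: true, MaxRetries: 2}}

	// Feed one message, then keep consuming the retry topic until it is empty.
	queue := []KafkaPayload{{Key: "order-1", Value: "created"}}
	for len(queue) > 0 {
		msg := queue[0]
		queue = queue[1:]
		if err := l.Process(msg); err != nil {
			fmt.Println("error:", err)
		}
		queue = append(queue, l.retryTopic...)
		l.retryTopic = nil
	}
	fmt.Println("commits:", len(l.committed), "persisted failures:", len(proc.failed))
}
```

Committing the original message after a successful hand-off to the retry topic keeps the main topic moving while the retry topic owns the failed message.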
-
Kafka config:
brokerUrl,
topicName,
etc
-
Kafka Writer Interface:
push(message, key);
-
:warning: The publisher knows how to map the data to the proto message this service is concerned with when putting the msg on Kafka.
-
Keeper interface:
SaveToOutbox(tx, kafkaPayload);
ReadFromOutbox(db) (kafkaPayload);
Transform(kafkaPayload) (Any);
-
:warning: A Keeper impl can be provided on the client side to override the default; otherwise the library will do the job.
-
Processor interface:
isMsgValid(baseProto);
ProcessMsg(baseProto);
SaveFailedMsgToDb(db, kafka_payload);
transform(baseProto) (kafka_payload);
// this is the reverse of the Keeper's transform
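To illustrate how the Processor's transform mirrors the Keeper's Transform, here is a small round-trip sketch in Go. It rests on assumptions: JSON from the standard library stands in for the proto encoding, and `BaseMessage`, `KeeperTransform`, and `ProcessorTransform` are hypothetical names, not part of the design above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// KafkaPayload is a hypothetical stand-in for the payload type in the notes.
type KafkaPayload struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

// BaseMessage is a hypothetical stand-in for baseProto: the bytes on the wire.
type BaseMessage []byte

// KeeperTransform plays the role of Keeper.Transform (payload -> wire message).
// JSON is used here only as a stand-in for the proto encoding.
func KeeperTransform(p KafkaPayload) (BaseMessage, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return nil, fmt.Errorf("encode payload: %w", err)
	}
	return BaseMessage(b), nil
}

// ProcessorTransform plays the role of Processor.transform (wire message -> payload).
func ProcessorTransform(m BaseMessage) (KafkaPayload, error) {
	var p KafkaPayload
	if err := json.Unmarshal(m, &p); err != nil {
		return KafkaPayload{}, fmt.Errorf("decode message: %w", err)
	}
	return p, nil
}

func main() {
	in := KafkaPayload{Key: "order-1", Value: "created"}
	msg, err := KeeperTransform(in)
	if err != nil {
		fmt.Println(err)
		return
	}
	out, err := ProcessorTransform(msg)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("round trip preserved payload:", out == in)
}
```

Keeping the two transforms as exact inverses means a message re-queued on the retry topic decodes to the same payload the publisher originally stored.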
-