Implement Agents
Every agent comes with an automatically generated implementation file in which you implement the business logic that should run after an event has occurred. After an event is published, all agents bound to the associated topic binding are executed.
Agent base
For each agent, an abstract class Agent Base is generated in the SDK
The Agent Base provides access to the repository, the entity builder, the event builder and the event producer
The Agent Base contains one abstract method named onMessage
The onMessage method needs to be implemented in the generated implementation file for the agent
The default behavior can be changed by overriding the method onMessage(ConsumerRecord consumerRecord, MessageHeaders headers) directly in the agent. This overload also gives access to the original ConsumerRecord.
Changing the default onMessage behavior is only available for Domain Services (Java) based on Java Spring Boot Stack 2.0
The onMessage method in the agent implementation file is triggered automatically when the Kafka event that the agent is modelled against is received.
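The relationship between the two onMessage variants described above can be pictured with a small, self-contained sketch. It does not use the generated SDK: RawRecord, SketchAgentBase, PayloadOnlyAgent, KeyAwareAgent and AgentBaseSketch are hypothetical stand-ins, and a plain Map takes the place of MessageHeaders. The sketch assumes that the default behavior converts the record value and delegates to the payload method. That is an illustration, not a description of the generated code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a raw Kafka record (not the real ConsumerRecord class).
final class RawRecord {
    final String key;
    final String value;

    RawRecord(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Simplified stand-in for a generated agent base: one abstract payload hook
// plus an overridable raw-record entry point with a default behavior.
abstract class SketchAgentBase<P> {
    private final Function<String, P> deserializer;

    protected SketchAgentBase(Function<String, P> deserializer) {
        this.deserializer = deserializer;
    }

    // Must be implemented by every agent.
    public abstract void onMessage(P payload, Map<String, Object> headers);

    // Assumed default behavior: convert the raw value and delegate to the payload hook.
    public void onMessage(RawRecord record, Map<String, Object> headers) {
        onMessage(deserializer.apply(record.value), headers);
    }
}

// Agent that only implements the payload hook.
class PayloadOnlyAgent extends SketchAgentBase<String> {
    final List<String> seen = new ArrayList<>();

    PayloadOnlyAgent() {
        super(String::trim);
    }

    @Override
    public void onMessage(String payload, Map<String, Object> headers) {
        seen.add(payload);
    }
}

// Agent that additionally overrides the raw-record entry point to read the key.
class KeyAwareAgent extends PayloadOnlyAgent {
    @Override
    public void onMessage(RawRecord record, Map<String, Object> headers) {
        seen.add("key=" + record.key);
        super.onMessage(record, headers); // continue with the default behavior
    }
}

final class AgentBaseSketch {
    private AgentBaseSketch() {}

    // Sends one record to the chosen agent and returns what the agent observed.
    static List<String> run(boolean keyAware) {
        PayloadOnlyAgent agent = keyAware ? new KeyAwareAgent() : new PayloadOnlyAgent();
        Map<String, Object> headers = new HashMap<>();
        agent.onMessage(new RawRecord("card-1", " created "), headers);
        return agent.seen;
    }
}
```

In this sketch, AgentBaseSketch.run(false) observes only the converted payload, while AgentBaseSketch.run(true) records the message key first and then falls through to the default behavior via super.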
Payload entity
The agent's onMessage method receives the payload entity as its first parameter (if the event that the agent receives is modelled with a payload).
The onMessage method also receives MessageHeaders as a parameter, a key-value map that contains the Kafka message headers.
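Because the headers arrive as a key-value map, a header value can be looked up by name inside onMessage. The following is a minimal sketch under stated assumptions: HeaderReader and headerAsString are hypothetical helpers, not part of the SDK, and a plain Map<String, Object> is used so the example runs without Spring (Spring's MessageHeaders implements that same Map interface). Whether a given header arrives as a byte array or as an already converted value depends on the header mapping in use, so the helper handles both.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for reading a single header value as text.
final class HeaderReader {
    private HeaderReader() {}

    // Returns the header as text, or an empty Optional if the header is absent.
    static Optional<String> headerAsString(Map<String, Object> headers, String name) {
        Object raw = headers.get(name);
        if (raw == null) {
            return Optional.empty();
        }
        if (raw instanceof byte[]) {
            // Raw Kafka header values are byte arrays; UTF-8 is assumed here.
            return Optional.of(new String((byte[]) raw, StandardCharsets.UTF_8));
        }
        return Optional.of(raw.toString());
    }
}
```

Inside an agent, a call such as HeaderReader.headerAsString(headers, "customerHeaderName") would then return an empty Optional rather than fail when the header is missing.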
Implementation example
Example of Balance service implementation file.
    package de.knowis.cards.operations.domain.cc.agent;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Service;
    import org.springframework.messaging.MessageHeaders;

    import de.knowis.cards.operations.sdk.domain.cc.agent.CreditCardCreatedAgentBase;
    import de.knowis.cards.operations.domain.facade.DomainEntityBuilder;
    import de.knowis.cards.operations.domain.facade.DomainEventBuilder;
    import de.knowis.cards.operations.sdk.domain.facade.Repository;
    import de.knowis.cards.operations.sdk.domain.service.EventProducerService;
    import de.knowis.cards.operations.sdk.domain.cc.entity.CreditCard;
    import de.knowis.cards.operations.sdk.domain.cc.entity.Card;
    // CardCommandBase must be imported as well; its package is not shown in this example.

    @Service
    public class CreditCardCreatedAgent extends CreditCardCreatedAgentBase {

      private static final Logger log = LoggerFactory.getLogger(CreditCardCreatedAgent.class);

      // Declare Card Command
      private final CardCommandBase cardCommand;

      // Adjust your generated agent constructor to inject CardCommandBase
      // so it can be used in agent logic
      public CreditCardCreatedAgent(DomainEntityBuilder entityBuilder,
                                    DomainEventBuilder eventBuilder,
                                    EventProducerService eventProducer,
                                    Repository repo,
                                    CardCommandBase cardCommand) {
        super(entityBuilder, eventBuilder, eventProducer, repo);
        this.cardCommand = cardCommand;
      }

      @Override
      public void onMessage(CreditCard creditCard, MessageHeaders headers) {
        log.info("Agent CreditCardCreatedAgent received event cc:CreditCardCreated");

        // Get root entity through repository
        Card card = this.repo.getCc().getCards().findById(creditCard.getId());

        // Call activateCard instance command to activate Card
        // (this will call implemented logic for activate card command)
        cardCommand.activateCard(card);
      }
    }
Example of overriding the default behavior of the onMessage method (Java Spring Boot Stack 2.0 only) in the Balance service implementation file.
    package de.knowis.cards.operations.domain.cc.agent;

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Service;
    import org.springframework.messaging.MessageHeaders;

    import de.knowis.cards.operations.sdk.domain.cc.agent.CreditCardCreatedAgentBase;
    import de.knowis.cards.operations.domain.facade.DomainEntityBuilder;
    import de.knowis.cards.operations.domain.facade.DomainEventBuilder;
    import de.knowis.cards.operations.sdk.domain.facade.Repository;
    import de.knowis.cards.operations.sdk.domain.service.EventProducerService;
    import de.knowis.cards.operations.sdk.domain.cc.entity.CreditCard;
    import de.knowis.cards.operations.sdk.domain.cc.entity.Card;
    // CardCommandBase must be imported as well; its package is not shown in this example.

    @Service
    public class CreditCardCreatedAgent extends CreditCardCreatedAgentBase {

      private static final Logger log = LoggerFactory.getLogger(CreditCardCreatedAgent.class);

      // Declare Card Command
      private final CardCommandBase cardCommand;

      // Adjust your generated agent constructor to inject CardCommandBase
      // so it can be used in agent logic
      public CreditCardCreatedAgent(DomainEntityBuilder entityBuilder,
                                    DomainEventBuilder eventBuilder,
                                    EventProducerService eventProducer,
                                    Repository repo,
                                    CardCommandBase cardCommand) {
        super(entityBuilder, eventBuilder, eventProducer, repo);
        this.cardCommand = cardCommand;
      }

      @Override
      public void onMessage(CreditCard creditCard, MessageHeaders headers) {
        log.info("Agent CreditCardCreatedAgent received event cc:CreditCardCreated");
      }

      // Optional: can be implemented in case there is a need for the ConsumerRecord.
      // Gives the opportunity to access message key, headers and timestamp from ConsumerRecord.
      // Only available for Domain Services (Java) based on Java Spring Boot Stack 2.0.
      @Override
      public void onMessage(ConsumerRecord<?, ?> consumerRecord, MessageHeaders headers) {
        // Get message key from consumerRecord
        Object messageKey = consumerRecord.key();

        // Get message header (lastHeader returns null if the header is absent)
        String customerHeader = new String(
            consumerRecord.headers().lastHeader("customerHeaderName").value(),
            StandardCharsets.UTF_8);

        // Get message timestamp from consumerRecord
        long messageTimeStamp = consumerRecord.timestamp();

        log.info("Agent CreditCardCreatedAgent received event cc:CreditCardCreated");

        // Assumption: the record value is already deserialized to the payload entity;
        // adapt this line if your deserializer yields a different type.
        CreditCard creditCard = (CreditCard) consumerRecord.value();

        // Get root entity through repository
        Card card = this.repo.getCc().getCards().findById(creditCard.getId());

        // Call activateCard instance command to activate Card
        // (this will call implemented logic for activate card command)
        cardCommand.activateCard(card);
      }
    }