Implement Agents

Every agent comes with an automatically generated implementation file, which allows to implement business logic that should happen after an event has happened. After publishing an event, all agents bound to the associated topic binding will be executed.

Agent base

  • For each agent there will be an abstract class Agent Base generated in the SDK

  • The Agent Base provides access to the repository, the entity builder, the event builder and the event producer

  • The Agent Base contains one abstract method named onMessage

  • The onMessage method needs to be implemented in the generated implementation file for the agent

  • The default behavior can be changed by overriding the method onMessage( ConsumerRecord consumerRecord, MessageHeaders headers) in the agent directly. There is also the possibility to access the initial ConsumerRecord.

Note:

onMessage default behavior changes is only available for Domain Services (Java) based on Java Spring Boot Stack 2.0

  • The agent implementation file onMessage is automatically triggered when the Kafka event that the agent is modelled against is received.

Payload entity

  • Agent onMessage method will receive Payload Entity as first parameter (if event that the agent receives is modelled with a payload).

  • Agent onMessage method will also receive MessageHeaders as a parameter which is a key-value map that contains Kafka message headers.

Implementation example

Example of Balance service implementation file.

package de.knowis.cards.operations.domain.cc.agent;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.messaging.MessageHeaders;

import de.knowis.cards.operations.sdk.domain.cc.agent.CreditCardCreatedAgentBase;
import de.knowis.cards.operations.domain.facade.DomainEntityBuilder;
import de.knowis.cards.operations.domain.facade.DomainEventBuilder;
import de.knowis.cards.operations.sdk.domain.facade.Repository;
import de.knowis.cards.operations.sdk.domain.service.EventProducerService;
import de.knowis.cards.operations.sdk.domain.cc.entity.CreditCard;
import de.knowis.cards.operations.sdk.domain.cc.entity.Card;

@Service
public class CreditCardCreatedAgent extends CreditCardCreatedAgentBase {

  private static Logger log = LoggerFactory.getLogger(CreditCardCreatedAgent.class);

  // Declare Card Command 
  private final CardCommandBase cardCommand;

  // Adjust your generated agent constructor to inject CardCommandBase so it can be used in agent logic
  public CreditCardCreatedAgent(DomainEntityBuilder entityBuilder, DomainEventBuilder eventBuilder, EventProducerService eventProducer, Repository repo, CardCommandBase cardCommand) {
    super(entityBuilder,eventBuilder, eventProducer,repo);
    this.cardCommand = cardCommand;
  }

  
  @Override
  public void onMessage(CreditCard creditCard, MessageHeaders headers) {

    log.info("Agent CreditCardCreatedAgent received event cc:CreditCardCreated");

    // Get root entity through repository
    Card card = this.repo.getCc().getCards().findById(creditCard.getId());

    // Call activateCard instance command to activate Card 
    // (this will call implemented logic for activate card command)
    cardCommand.activateCard(card);
  }
}

Example to override the default behavior of onMessage method (Java Spring Boot Stack 2.0 only) in Balance service implementation file.

package de.knowis.cards.operations.domain.cc.agent;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.messaging.MessageHeaders;

import de.knowis.cards.operations.sdk.domain.cc.agent.CreditCardCreatedAgentBase;
import de.knowis.cards.operations.domain.facade.DomainEntityBuilder;
import de.knowis.cards.operations.domain.facade.DomainEventBuilder;
import de.knowis.cards.operations.sdk.domain.facade.Repository;
import de.knowis.cards.operations.sdk.domain.service.EventProducerService;
import de.knowis.cards.operations.sdk.domain.cc.entity.CreditCard;
import de.knowis.cards.operations.sdk.domain.cc.entity.Card;

@Service
public class CreditCardCreatedAgent extends CreditCardCreatedAgentBase {

  private static Logger log = LoggerFactory.getLogger(CreditCardCreatedAgent.class);

  // Declare Card Command 
  private final CardCommandBase cardCommand;

  // Adjust your generated agent constructor to inject CardCommandBase so it can be used in agent logic
  public CreditCardCreatedAgent(DomainEntityBuilder entityBuilder, DomainEventBuilder eventBuilder, EventProducerService eventProducer, Repository repo, CardCommandBase cardCommand) {
    super(entityBuilder,eventBuilder, eventProducer,repo);
    this.cardCommand = cardCommand;
  }

  @Override
  public void onMessage(CreditCard creditCard, MessageHeaders headers) {
    log.info("Agent CreditCardCreatedAgent received event cc:CreditCardCreated");
  }

  @Override
  // optional could be implemented in case there is a need for ConsumerRecord
  // Gives the opportunity to access message key, headers and timestamp from ConsumerRecord
  // only available for Domain Services (Java) based on Java Spring Boot Stack 2.0
  public void onMessage(ConsumerRecord<?, ?> consumerRecord, MessageHeaders headers) {

    // get message key from consumerRecord
    Object messageKey = consumerRecord.key();

    // get message header
    String customerHeader = new String(consumerRecord.headers().lastHeader("customerHeaderName").value());

    // get message Timestamp from consumerRecord
    long messageTimeStamp = consumerRecord.timestamp();

    log.info("Agent CreditCardCreatedAgent received event cc:CreditCardCreated");

    // Get root entity through repository
    Card card = this.repo.getCc().getCards().findById(creditCard.getId());

    // Call activateCard instance command to activate Card 
    // (this will call implemented logic for activate card command)
    cardCommand.activateCard(card);
  }
}