Je souhaite exécuter une seule instance d'un planificateur Akka dans un cluster. Actuellement, mon planificateur fonctionne correctement dans ma section locale, mais pas comme prévu. Le planificateur choisit les commandes de la base de données et les envoie au sujet Kafka.
J'utilise akka 2.5.6 (Java). J'ai traversé le official Doc mais je n'ai pas offert beaucoup d'aide. Toute aide sera grandement appréciée.Comment exécuter Akka Scheduler une fois dans un cluster?
public class OrderReprocessActor extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
OrderProcessorJdbcConnection orderProcessorJdbcConnection;
private final String SELECT_QUERY_TO_GET_FAILED_ORDER="SELECT * FROM ORDER_HISTORY WHERE ORDER_STATUS = ?";
CommonPropsUtil commonPropsUtil;
final Cluster cluster = Cluster.get(getContext().system());
public static Props getProps() {
return Props.create(OrderReprocessActor.class);
}
@Inject
public OrderReprocessActor(OrderProcessorJdbcConnection orderProcessorJdbcConnection , CommonPropsUtil commonPropsUtil){
this.orderProcessorJdbcConnection = orderProcessorJdbcConnection;
this.commonPropsUtil = commonPropsUtil;
}
@Override
public void onReceive(Object message) throws Throwable {
String failedStatus = (String) message;
List<OrderHistory> failedOrderList = getOrders(failedStatus);
pushOrderToKafka(failedOrderList);
String intervalSeconds = commonPropsUtil.getCommonPropsValueForKey(CommonConstants.ORDER_REPROCESSOR_SCHEDULER_INTERVAL);
if(StringUtils.isNotEmpty(intervalSeconds))
{
int interval = Integer.parseInt(intervalSeconds);
getContext().system().scheduler().scheduleOnce(Duration.create(interval, TimeUnit.SECONDS),
() -> {
getSelf().tell(failedStatus, ActorRef.noSender());
}, getContext().system().dispatcher());
}
}
/**
* This method takes the failedOrderList and pushes to Kafka Topic
*
*/
private void pushOrderToKafka(List<OrderHistory> failedOrders) {
log.info("Entering pushOrderToKafka()");
String kafkaOrderTopic = commonPropsUtil.getCommonPropsValueForKey(CommonConstants.KAFKA_SUBMIT_ORDER_TOPIC);
Properties props = getKafkaProperties();
Producer<String, Order> producer = new KafkaProducer<>(props);
for (OrderHistory orderHistory : failedOrders) {
ObjectMapper objectMapper = new ObjectMapper();
try {
Order order = objectMapper.readValue(orderHistory.getOrderData().toString(),Order.class);
log.info("******************Order ID..."+orderHistory.getOrderId());
producer.send(new ProducerRecord<String, Order>(kafkaOrderTopic, orderHistory.getOrderId(), order)).get();
} catch (IOException e) {
log.error("IOException caught , message="+e.getMessage());
} catch (InterruptedException e) {
log.error("InterruptedException caught , message="+e.getMessage());
} catch (ExecutionException e) {
log.error("ExecutionException caught , message="+e.getMessage());
}
}
producer.close();
log.info("Exiting pushOrderToKafka()");
}
/**
* This method return kafka connection properties
* @return
*/
private Properties getKafkaProperties() {
String kafkaBootStrapServers = commonPropsUtil.getCommonPropsValueForKey(CommonConstants.KAFKA_BOOTSTRAP_SERVERS);
Properties props = new Properties();
props.put(CommonConstants.BOOTSTRAP_SERVERS, kafkaBootStrapServers);
props.put(CommonConstants.KEY_SERIALIZER, VZWCommonConstants.STRING_SERIALIZER);
props.put(CommonConstants.VALUE_SERIALIZER, VZWCommonConstants.ORDER_SERIALIZER);
return props;
}
/**
*This method get all the failed Order from DB
* @return List<OrderReprocessActor.OrderHistory>
* @throws SQLException
*/
private List<OrderReprocessActor.OrderHistory> getOrders(String failStatus) throws SQLException {
log.info("Entering getAllFailedOrdersFromDB()");
Connection connection = orderProcessorJdbcConnection.getConnection();
try {
PreparedStatement pstmt = connection.prepareStatement(SELECT_QUERY_TO_GET_FAILED_ORDER);
pstmt.setString(1,failStatus);
ResultSet rersultSet = pstmt.executeQuery();
return getOrdersFromResultSet(rersultSet);
}catch (SQLException e){
log.error("SQLException caught while fetching failed Order from DB");
log.error(e.getMessage());
}finally {
orderProcessorJdbcConnection.releaseConnection(connection);
}
log.info("Exiting getAllFailedOrdersFromDB()");
return null;
}
/**
* Retrives order from sql result set
* @param rersultSet
* @return
* @throws SQLException
*/
private List<OrderHistory> getOrdersFromResultSet(ResultSet rersultSet) throws SQLException {
List<OrderReprocessActor.OrderHistory> failedOrderList = new ArrayList<>();
while(rersultSet.next()){
String orderId = rersultSet.getString("order_id");
String orderData = rersultSet.getString("order_data");
OrderHistory orderHistory = new OrderHistory();
orderHistory.setOrderId(orderId);
orderHistory.setOrderData(orderData);
failedOrderList.add(orderHistory);
}
return failedOrderList;
}
public static class OrderHistory{
private String orderId;
private String orderData;
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getOrderData() {
return orderData;
}
public void setOrderData(String orderData) {
this.orderData = orderData;
}
}
}
salut, j'ai traversé le lien comme mentionné dans la question. Mais je suis incapable de comprendre comment intégrer ce que j'ai. En théorie, j'ai compris ce qui se passe, mais je ne suis pas clair sur le code. –