2017-10-16 12 views
2

Je souhaite exécuter une seule instance d'un planificateur Akka dans un cluster. Actuellement, mon planificateur fonctionne correctement dans ma section locale, mais pas comme prévu. Le planificateur choisit les commandes de la base de données et les envoie au sujet Kafka.
J'utilise akka 2.5.6 (Java). J'ai traversé le official Doc mais je n'ai pas offert beaucoup d'aide. Toute aide sera grandement appréciée.Comment exécuter Akka Scheduler une fois dans un cluster?

public class OrderReprocessActor extends UntypedActor { 


    LoggingAdapter log = Logging.getLogger(getContext().system(), this); 
    OrderProcessorJdbcConnection orderProcessorJdbcConnection; 
    private final String SELECT_QUERY_TO_GET_FAILED_ORDER="SELECT * FROM ORDER_HISTORY WHERE ORDER_STATUS = ?"; 
    CommonPropsUtil commonPropsUtil; 

    final Cluster cluster = Cluster.get(getContext().system());   

    public static Props getProps() { 
     return Props.create(OrderReprocessActor.class); 
    } 

    @Inject 
    public OrderReprocessActor(OrderProcessorJdbcConnection orderProcessorJdbcConnection , CommonPropsUtil commonPropsUtil){ 
     this.orderProcessorJdbcConnection = orderProcessorJdbcConnection; 
     this.commonPropsUtil = commonPropsUtil; 
    } 

    @Override 
    public void onReceive(Object message) throws Throwable { 

     String failedStatus = (String) message; 
     List<OrderHistory> failedOrderList = getOrders(failedStatus); 
     pushOrderToKafka(failedOrderList); 
     String intervalSeconds = commonPropsUtil.getCommonPropsValueForKey(CommonConstants.ORDER_REPROCESSOR_SCHEDULER_INTERVAL); 
     if(StringUtils.isNotEmpty(intervalSeconds)) 
     { 
      int interval = Integer.parseInt(intervalSeconds); 
      getContext().system().scheduler().scheduleOnce(Duration.create(interval, TimeUnit.SECONDS), 
        () -> { 
         getSelf().tell(failedStatus, ActorRef.noSender()); 
        }, getContext().system().dispatcher()); 
     } 
    } 

    /** 
    * This method takes the failedOrderList and pushes to Kafka Topic 
    * 
    */ 
    private void pushOrderToKafka(List<OrderHistory> failedOrders) { 

     log.info("Entering pushOrderToKafka()"); 
     String kafkaOrderTopic = commonPropsUtil.getCommonPropsValueForKey(CommonConstants.KAFKA_SUBMIT_ORDER_TOPIC); 
     Properties props = getKafkaProperties(); 
     Producer<String, Order> producer = new KafkaProducer<>(props); 
     for (OrderHistory orderHistory : failedOrders) { 
      ObjectMapper objectMapper = new ObjectMapper(); 
      try { 
       Order order = objectMapper.readValue(orderHistory.getOrderData().toString(),Order.class); 
       log.info("******************Order ID..."+orderHistory.getOrderId()); 
       producer.send(new ProducerRecord<String, Order>(kafkaOrderTopic, orderHistory.getOrderId(), order)).get(); 
      } catch (IOException e) { 
       log.error("IOException caught , message="+e.getMessage()); 
      } catch (InterruptedException e) { 
       log.error("InterruptedException caught , message="+e.getMessage()); 
      } catch (ExecutionException e) { 
       log.error("ExecutionException caught , message="+e.getMessage()); 
      } 
     } 
     producer.close(); 
     log.info("Exiting pushOrderToKafka()"); 
    } 

    /** 
    * This method return kafka connection properties 
    * @return 
    */ 
    private Properties getKafkaProperties() { 
     String kafkaBootStrapServers = commonPropsUtil.getCommonPropsValueForKey(CommonConstants.KAFKA_BOOTSTRAP_SERVERS); 
     Properties props = new Properties(); 
     props.put(CommonConstants.BOOTSTRAP_SERVERS, kafkaBootStrapServers); 
     props.put(CommonConstants.KEY_SERIALIZER, VZWCommonConstants.STRING_SERIALIZER); 
     props.put(CommonConstants.VALUE_SERIALIZER, VZWCommonConstants.ORDER_SERIALIZER); 
     return props; 
    } 

    /** 
    *This method get all the failed Order from DB 
    * @return List<OrderReprocessActor.OrderHistory> 
    * @throws SQLException 
    */ 
    private List<OrderReprocessActor.OrderHistory> getOrders(String failStatus) throws SQLException { 
     log.info("Entering getAllFailedOrdersFromDB()"); 
     Connection connection = orderProcessorJdbcConnection.getConnection(); 

     try { 
      PreparedStatement pstmt = connection.prepareStatement(SELECT_QUERY_TO_GET_FAILED_ORDER); 
      pstmt.setString(1,failStatus); 
      ResultSet rersultSet = pstmt.executeQuery(); 
      return getOrdersFromResultSet(rersultSet); 
     }catch (SQLException e){ 
      log.error("SQLException caught while fetching failed Order from DB"); 
      log.error(e.getMessage()); 
     }finally { 
       orderProcessorJdbcConnection.releaseConnection(connection); 
     } 
     log.info("Exiting getAllFailedOrdersFromDB()"); 
     return null; 
    } 

    /** 
    * Retrives order from sql result set 
    * @param rersultSet 
    * @return 
    * @throws SQLException 
    */ 
    private List<OrderHistory> getOrdersFromResultSet(ResultSet rersultSet) throws SQLException { 
     List<OrderReprocessActor.OrderHistory> failedOrderList = new ArrayList<>(); 
     while(rersultSet.next()){ 
      String orderId = rersultSet.getString("order_id"); 
      String orderData = rersultSet.getString("order_data"); 
      OrderHistory orderHistory = new OrderHistory(); 
      orderHistory.setOrderId(orderId); 
      orderHistory.setOrderData(orderData); 
      failedOrderList.add(orderHistory); 
     } 
     return failedOrderList; 
    } 

    public static class OrderHistory{ 

     private String orderId; 
     private String orderData; 
     public String getOrderId() { 
      return orderId; 
     } 
     public void setOrderId(String orderId) { 
      this.orderId = orderId; 
     } 
     public String getOrderData() { 
      return orderData; 
     } 
     public void setOrderData(String orderData) { 
      this.orderData = orderData; 
     } 
    } 


} 

Répondre

0

Faites votre OrderReprocessActor un cluster singleton. À partir des documents:

Le modèle de cluster singleton est implémenté par akka.cluster.singleton.ClusterSingletonManager. Il gère une instance d'acteur singleton parmi tous les nœuds de cluster ou un groupe de nœuds associés à un rôle spécifique. ClusterSingletonManager est un acteur supposé être démarré sur tous les nœuds, ou tous les nœuds avec le rôle spécifié, dans le cluster. L'acteur singleton réel est démarré par ClusterSingletonManager sur le nœud le plus ancien en créant un acteur enfant à partir des accessoires fournis. ClusterSingletonManager s'assure qu'au maximum une instance singleton est en cours d'exécution à tout moment.

+0

salut, j'ai traversé le lien comme mentionné dans la question. Mais je suis incapable de comprendre comment intégrer ce que j'ai. En théorie, j'ai compris ce qui se passe, mais je ne suis pas clair sur le code. –

-1

Un singleton cluster est créé comme ceci:

final ClusterSingletonManagerSettings settings = 
    ClusterSingletonManagerSettings.create(system); 

system.actorOf(
    ClusterSingletonManager.props(
    Props.create(Consumer.class,() -> new Consumer(queue, testActor)), 
    TestSingletonMessages.end(), 
    settings), 
    "consumer"); 

Akka veillera à ce que l'acteur est créé uniquement sur le noeud principal du cluster.

Pour utiliser l'acteur singleton, vous demandez un proxy via son chemin:

ClusterSingletonProxySettings proxySettings = 
    ClusterSingletonProxySettings.create(system); 

ActorRef proxy = 
    system.actorOf(ClusterSingletonProxy.props("/user/consumer", proxySettings), 
    "consumerProxy"); 

Ces exemples sont adaptés à partir the docs.

+0

pouvez-vous s'il vous plaît dire quel paquet contient Consumer.class et aussi quel est le type de file d'attente? Je n'ai aucune file d'attente dans mon code –

+0

Il suffit de copier coller du code n'est pas ce que j'attends d'un utilisateur si réputé. –

+0

Vous savez déjà comment programmer des messages, et avec les détails supplémentaires sur la façon de créer et d'envoyer des messages à un singleton de cluster, il devrait être clair comment les enchaîner tous ensemble. Vous n'avez pas précisé dans votre question comment cela ne fonctionne pas. Peut-être que cela vous aiderait à avoir de meilleures réponses. – Synesso