I am evaluating Kafka Streams. I wrote a simple application and left it running overnight, on 2 instances with 1 stream thread per instance, against a 2-broker Kafka cluster. The Kafka Streams application died with "StreamsException: Could not create internal topics".

The StreamsConfig:

private Map<String, Object> settings() { 
    Map<String, Object> settings = new HashMap<>(); 
    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "fare_tracker"); 
    settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, serverAddress + ":" + serverPort); 
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092"); 
    settings.put(StreamsConfig.STATE_DIR_CONFIG, directoryName); 
    settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); 
    settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, AvroTimeStampExtractor.class); 
    // Internal topics get 2 replicas, so both brokers must be reachable when they are created. 
    settings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2); 
    settings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2); 

    settings.put("schema.registry.url", "http://zookeeper1:8081"); 

    // Overrides for the embedded consumer and producer clients. 
    settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); 
    settings.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "snappy"); 
    settings.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 3); 
    settings.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 500); 

    return settings; 
} 
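
For context, this is roughly how the settings are wired into the streams instance; buildTopology() here is just a stand-in for my actual topology code:

    // Sketch only: buildTopology() is a placeholder for the real KStreamBuilder setup.
    KStreamBuilder builder = buildTopology();
    KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(settings()));
    streams.start();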

It died roughly 12 hours after starting, with the stack trace below:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Could not create internal topics. 
     at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:81) 
     at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:628) 
     at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:382) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:343) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:501) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:89) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:451) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:433) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) 
     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) 
     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) 
     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) 
     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) 

I found some relevant WARN and ERROR logs.

{ 
    "@timestamp": "2017-06-07T05:44:26.996+05:30", 
    "@version": 1, 
    "message": "Got error produce response with correlation id 198191 on topic-partition fare_tracker-small_window-changelog-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION", 
    "logger_name": "org.apache.kafka.clients.producer.internals.Sender", 
    "thread_name": "kafka-producer-network-thread | fare_tracker-9e0a04f4-c1cc-4b61-8ca5-8bf25f18549f-StreamThread-1-producer", 
    "level": "WARN", 
    "level_value": 30000 
} 

The log above looks like a generic network problem, and I already have the producer configured with 3 retries.
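
If these really are transient network errors, one thing I could try (illustrative values, not something I have verified against this failure) is making the embedded producer more tolerant:

    // Illustrative values only: more retries, longer backoff, larger request timeout.
    settings.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 20);
    settings.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 1000);
    settings.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);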

Application 1 died with the following logs (a tuning sketch for the CommitFailedException follows them):

{ 
    "@timestamp": "2017-06-07T06:20:35.122+05:30", 
    "@version": 1, 
    "message": "stream-thread [StreamThread-1] Failed to commit StreamTask 2_61 state: ", 
    "logger_name": "org.apache.kafka.streams.processor.internals.StreamThread", 
    "thread_name": "StreamThread-1", 
    "level": "WARN", 
    "level_value": 30000, 
    "stack_trace": org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124) 
    at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296) 
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79) 
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) 
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) 
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807) 
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794) 
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) 
} 

{ 
    "@timestamp": "2017-06-07T06:20:35.236+05:30", 
    "@version": 1, 
    "message": "Bootstrap broker kafka2:9092 disconnected", 
    "logger_name": "org.apache.kafka.clients.NetworkClient", 
    "thread_name": "StreamThread-1", 
    "level": "WARN", 
    "level_value": 30000 
} 
{ 
    "@timestamp": "2017-06-07T06:20:36.100+05:30", 
    "@version": 1, 
    "message": "Could not create internal topics: Found only 1 brokers, but replication factor is 2. Decrease replication factor for internal topics via StreamsConfig parameter \"replication.factor\" or add more brokers to your cluster. Retry #4", 
    "logger_name": "org.apache.kafka.streams.processor.internals.InternalTopicManager", 
    "thread_name": "StreamThread-1", 
    "level": "WARN", 
    "level_value": 30000 
} 
{ 
    "@timestamp": "2017-06-07T06:20:36.914+05:30", 
    "@version": 1, 
    "message": "stream-thread [StreamThread-1] Unexpected state transition from PARTITIONS_REVOKED to NOT_RUNNING.", 
    "logger_name": "org.apache.kafka.streams.processor.internals.StreamThread", 
    "thread_name": "StreamThread-1", 
    "level": "WARN", 
    "level_value": 30000 
} 
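
The CommitFailedException above names its own remediation; a sketch of how those consumer settings could be overridden through StreamsConfig (values are illustrative):

    // Illustrative values, per the CommitFailedException message: allow more time
    // between poll() calls and shrink the batch returned by each poll().
    settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600000);
    settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);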

Application 2 died with the following logs:

{ 
    "@timestamp": "2017-06-07T06:20:06.254+05:30", 
    "@version": 1, 
    "message": "Could not create internal topics: Found only 1 brokers, but replication factor is 2. Decrease replication factor for internal topics via StreamsConfig parameter \"replication.factor\" or add more brokers to your cluster. Retry #4", 
    "logger_name": "org.apache.kafka.streams.processor.internals.InternalTopicManager", 
    "thread_name": "StreamThread-1", 
    "level": "WARN", 
    "level_value": 30000 
} 
{ 
    "@timestamp": "2017-06-07T06:20:07.041+05:30", 
    "@version": 1, 
    "message": "stream-thread [StreamThread-1] Unexpected state transition from PARTITIONS_REVOKED to NOT_RUNNING.", 
    "logger_name": "org.apache.kafka.streams.processor.internals.StreamThread", 
    "thread_name": "StreamThread-1", 
    "level": "WARN", 
    "level_value": 30000 
} 

I checked my other applications, and although they are running fine, I saw the following log several times at roughly the same time as the failures above.

{ 
     "@timestamp": "2017-06-07T06:10:34.962+05:30", 
     "@version": 1, 
     "message": "Publishing to kafka failed ", 
     "thread_name": "kafka-producer-network-thread | producer-1", 
     "level": "ERROR", 
     "level_value": 40000, 
     "stack_trace": org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 30 record(s) for fareAlertDispatch-36 due to 424387 ms has passed since batch creation plus linger time 
    at org.springframework.kafka.core.KafkaTemplate$1.onCompletion(KafkaTemplate.java:255) 
    at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109) 
    at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:160) 
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245) 
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:212) 
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 30 record(s) for fareAlertDispatch-36 due to 424387 ms has passed since batch creation plus linger time 

org.apache.kafka.common.errors.TimeoutException: Expiring 30 record(s) for fareAlertDispatch-36 due to 424387 ms has passed since batch creation plus linger time 
    } 

Answer


You said you have 2 Kafka brokers, but one of the error messages includes the following information:

Could not create internal topics: Found only 1 brokers, but replication factor is 2. 

It looks like you have network connectivity problems between your application(s) and the Kafka brokers (and possibly between the Kafka brokers themselves as well). If such a network problem persists for a longer period, the applications trying to communicate with the Kafka brokers will sooner or later fail, depending on their settings.
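
If a broker can drop out for extended periods, the error message itself names the workaround; a sketch (keeping in mind that a replication factor of 1 sacrifices fault tolerance for the internal topics):

    // Workaround named in the error message: one replica per internal topic,
    // so topic creation succeeds even when only one broker is reachable.
    settings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);

Alternatively, add more brokers so that 2 replicas can always be placed.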

+1 It turned out to be an AWS network outage in my availability zone. I have everything in a single availability zone. :) – Dexter