2017-10-09 8 views
0

J'utilise kafka/confluent (3.2.0) pour réessayer le changement sur une instance Mongodb que nous avons. Le processus source est géré par Debezium source connector qui utilise Source Connect Api et est déployé sur nos systèmes en utilisant Mesos (DC/OS) pour étendre l'image docker Confluent Connect. Kafka lui-même est déployé sur le même DC/OS en utilisant la version du framework.Kafka/Confluent: ProducerConfig change par défaut max.request.size en utilisant des images docker

Depuis que nous avons un certain message plus grand que la taille par défaut j'ai changé ces paramètres d'installation de kafka:

• replica.fetch.max.bytes
• message.max.bytes

deux 4MB .

Puis je commence à l'image Connecteur Docker en utilisant ce

docker run -d --rm --net=bridge --name=kafka-connect-mongodb -e CONNECT_BOOTSTRAP_SERVERS=${KAFKA_BOOTSTRAP_SERVERS} -e CONNECT_REST_PORT=${CONNECT_REST_PORT} -e CONNECT_GROUP_ID="mongo-to-kafka-source-connector" -e CONNECT_CONFIG_STORAGE_TOPIC="${CONFIG.TOPIC}" -e CONNECT_OFFSET_STORAGE_TOPIC="${OFFSETS.TOPIC}" -e CONNECT_STATUS_STORAGE_TOPIC="${STATUS.TOPIC}" -e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" -e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="${SCHEMA_REGISTRY_LISTENERS}" -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="${SCHEMA_REGISTRY_LISTENERS}" -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" -e CONNECT_REST_ADVERTISED_HOST_NAME="${CONNECTOR_HOME}" -e CONNECT_LOG4J_ROOT_LOGLEVEL=INFO -e CONNECT_MAX_REQUEST_SIZE=4194304 -e KAFKA_MAX_REQUEST_SIZE=4194304 mongodb-source-connector:1.1 

J'ai changé la valeur par défaut max.request.size producteur passant à la fois KAFKA_MAX_REQUEST_SIZE et CONNECT_MAX_REQUEST_SIZE et le journal a été correctement changé 4MB.

Le problème se pose lorsque je démarre l'extraction à partir de Mongodb. Pour ce faire je cours ce poste

curl -X POST \ 
http://hostname:8083/connectors \ 
    -d '{ 
    "name": "source_connector", 
    "config": { 
    "tasks.max":"1", 
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", 
    "mongodb.hosts": "mongodbhost:27017", 
    "mongodb.name": "replica", 
    "collection.whitelist": "db[.]table", 
    "max.request.size": "4194304" 
    } 
}' 

mais le journal dit

[2017-10-09 12:22:56,036] INFO ProducerConfig values: 
    acks = all 
    batch.size = 16384 
    block.on.buffer.full = false 
    bootstrap.servers = [PLAINTEXT://172.17.0.3:9093] 
    buffer.memory = 33554432 
    client.id = 
    compression.type = none 
    connections.max.idle.ms = 540000 
    interceptor.classes = null 
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer 
    linger.ms = 0 
    max.block.ms = 9223372036854775807 
    max.in.flight.requests.per.connection = 1 
    max.request.size = 1048576 
    metadata.fetch.timeout.ms = 60000 
    metadata.max.age.ms = 300000 
    metric.reporters = [] 
    metrics.num.samples = 2 
    metrics.sample.window.ms = 30000 
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner 
    receive.buffer.bytes = 32768 
    reconnect.backoff.ms = 50 
    request.timeout.ms = 2147483647 
    retries = 2147483647 
    retry.backoff.ms = 100 
    sasl.jaas.config = null 
    sasl.kerberos.kinit.cmd = /usr/bin/kinit 
    sasl.kerberos.min.time.before.relogin = 60000 
    sasl.kerberos.service.name = null 
    sasl.kerberos.ticket.renew.jitter = 0.05 
    sasl.kerberos.ticket.renew.window.factor = 0.8 
    sasl.mechanism = GSSAPI 
    security.protocol = PLAINTEXT 
    send.buffer.bytes = 131072 
    ssl.cipher.suites = null 
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
    ssl.endpoint.identification.algorithm = null 
    ssl.key.password = null 
    ssl.keymanager.algorithm = SunX509 
    ssl.keystore.location = null 
    ssl.keystore.password = null 
    ssl.keystore.type = JKS 
    ssl.protocol = TLS 
    ssl.provider = null 
    ssl.secure.random.implementation = null 
    ssl.trustmanager.algorithm = PKIX 
    ssl.truststore.location = null 
    ssl.truststore.password = null 
    ssl.truststore.type = JKS 
    timeout.ms = 30000 
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer 

Alors, quand je commence le processus source, la valeur par défaut pour max.request.size est utilisé.

Here le journal complet.

Je ne me défais pas de ce qui me manque.

Répondre

0

le chat IRC m'a aidé: je devais spécifier KAFKA_PRODUCER_MAX_REQUEST_SIZE et CONNECT_PRODUCER_MAX_REQUEST_SIZE en commençant l'image docker.