J'utilise kafka/confluent (3.2.0) pour réessayer le changement sur une instance Mongodb que nous avons. Le processus source est géré par Debezium source connector qui utilise Source Connect Api et est déployé sur nos systèmes en utilisant Mesos (DC/OS) pour étendre l'image docker Confluent Connect. Kafka lui-même est déployé sur le même DC/OS en utilisant la version du framework.Kafka/Confluent: ProducerConfig change par défaut max.request.size en utilisant des images docker
Depuis que nous avons un certain message plus grand que la taille par défaut j'ai changé ces paramètres d'installation de kafka:
• replica.fetch.max.bytes
• message.max.bytes
deux 4MB .
Puis je commence à l'image Connecteur Docker en utilisant ce
docker run -d --rm --net=bridge --name=kafka-connect-mongodb -e CONNECT_BOOTSTRAP_SERVERS=${KAFKA_BOOTSTRAP_SERVERS} -e CONNECT_REST_PORT=${CONNECT_REST_PORT} -e CONNECT_GROUP_ID="mongo-to-kafka-source-connector" -e CONNECT_CONFIG_STORAGE_TOPIC="${CONFIG.TOPIC}" -e CONNECT_OFFSET_STORAGE_TOPIC="${OFFSETS.TOPIC}" -e CONNECT_STATUS_STORAGE_TOPIC="${STATUS.TOPIC}" -e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" -e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="${SCHEMA_REGISTRY_LISTENERS}" -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="${SCHEMA_REGISTRY_LISTENERS}" -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" -e CONNECT_REST_ADVERTISED_HOST_NAME="${CONNECTOR_HOME}" -e CONNECT_LOG4J_ROOT_LOGLEVEL=INFO -e CONNECT_MAX_REQUEST_SIZE=4194304 -e KAFKA_MAX_REQUEST_SIZE=4194304 mongodb-source-connector:1.1
J'ai changé la valeur par défaut max.request.size producteur passant à la fois KAFKA_MAX_REQUEST_SIZE et CONNECT_MAX_REQUEST_SIZE et le journal a été correctement changé 4MB.
Le problème se pose lorsque je démarre l'extraction à partir de Mongodb. Pour ce faire je cours ce poste
curl -X POST \
http://hostname:8083/connectors \
-d '{
"name": "source_connector",
"config": {
"tasks.max":"1",
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "mongodbhost:27017",
"mongodb.name": "replica",
"collection.whitelist": "db[.]table",
"max.request.size": "4194304"
}
}'
mais le journal dit
[2017-10-09 12:22:56,036] INFO ProducerConfig values:
acks = all
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [PLAINTEXT://172.17.0.3:9093]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 9223372036854775807
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 2147483647
retries = 2147483647
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
Alors, quand je commence le processus source, la valeur par défaut pour max.request.size est utilisé.
Here le journal complet.
Je ne me défais pas de ce qui me manque.