J'ai un sujet qui finira par contenir de nombreux schémas différents. Pour l'instant il n'en a qu'un. J'ai créé un travail de connexion via REST comme ceci :
{
"name":"com.mycompany.sinks.GcsSinkConnector-auth2",
"config": {
"connector.class": "com.mycompany.sinks.GcsSinkConnector",
"topics": "auth.events",
"flush.size": 3,
"my.setting":"bar",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.deserializer":"org.apache.kafka.common.serialization.StringDerserializer",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry-service:8081",
"value.subject.name.strategy":"io.confluent.kafka.serializers.subject.RecordNameStrategy",
"group.id":"account-archiver"
}
}
J'envoie ensuite un message à ce sujet avec une clé de chaîne et une charge utile sérialisée avro. Si j'inspecte le sujet dans le centre de contrôle, je vois les données correctement désérialisées passer. En regardant la sortie de l'instance de connexion, je vois cela dans les journaux
RROR WorkerSinkTask{id=com.mycompany.sinks.GcsSinkConnector-auth2-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic auth.events to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 7
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:226)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:319)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:307)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:158)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:271)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:184)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:153)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:145)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Vous pouvez voir d'ici qu'il y a deux problèmes liés :
Error retrieving Avro schema for id 7
Subject not found.; error code: 40401
Ce qui me dérange, c'est que j'ai spécifié que la stratégie est RecordNameStrategy qui, je pense, devrait utiliser l'octet magique pour aller chercher le schéma par opposition au nom du sujet, mais il y a des erreurs sur le sujet introuvable. Je ne sais pas s'il s'agit réellement de rechercher un nom de sujet ou d'obtenir un schéma par l'ID. Quoi qu'il en soit, en ssh-ing à l'instance de connexion et en faisant une boucle pour
http://schema-registry-service:8081/schemas/ids/7
obtenir le schéma renvoyé. Il y a une journalisation supplémentaire au-dessus de cette trace de pile qui semble décevante comme si elle utilisait toujours la mauvaise stratégie de nom :
INFO AvroConverterConfig values:
schema.registry.url = [http://schema-registry-service:8081]
basic.auth.user.info = [hidden]
auto.register.schemas = false
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
schema.registry.basic.auth.user.info = [hidden]
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
Quelqu'un at-il des indices sur la façon de résoudre ce problème? J'utilise les images suivantes:
- confluentinc/cp-kafka-connect:5.2.0
- confluentinc/cp-kafka:5.1.0
Merci
Solution du problème
Dans la trace, lookUpSubjectVersion
signifie qu'il a essayé de faire une recherche sous /subjects/:name/versions
pour chaque ID répertorié là-bas, puis n'a pas pu trouver schemaId=7
(Remarque : pas la version = 7), mais pas trop clair d'après les journaux ce :name
qu'il essaie d'utiliser ici, mais si ce n'est pas le cas 't trouvé, alors vous obtiendrez votre Subject not found
erreur. Si mon PR était accepté, le nom du sujet serait plus clair
Je pense que cela peut être dû à l'utilisation de RecordNameStrategy
. En regardant le PR pour cette propriété, j'ai compris qu'il n'était vraiment testé que par rapport au code producteur/consommateur, et pas complètement dans l'API Connect. Par rapport au comportement par défaut deTopicNameStrategy
Lequel, vous pouvez voir qu'il a essayé d'utiliser
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
Mais en regardant de plus près, je pense que vous l'avez peut-être mal configuré.
Semblable à la façon dont vous avez value.converter.schema.registry.url
, vous auriez en fait besoin de définir à la value.converter.value.subject.name.strategy
place.