GOOGLE ADS

samedi 30 avril 2022

Connecteur Kafka et Schema Registry - Erreur lors de la récupération du schéma Avro - Objet introuvable

J'ai un sujet qui finira par contenir de nombreux schémas différents. Pour l'instant il n'en a qu'un. J'ai créé un travail de connexion via REST comme ceci :

{
"name":"com.mycompany.sinks.GcsSinkConnector-auth2",
"config": {
"connector.class": "com.mycompany.sinks.GcsSinkConnector",
"topics": "auth.events",
"flush.size": 3,
"my.setting":"bar",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.deserializer":"org.apache.kafka.common.serialization.StringDerserializer",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry-service:8081",
"value.subject.name.strategy":"io.confluent.kafka.serializers.subject.RecordNameStrategy",
"group.id":"account-archiver"
}
}

J'envoie ensuite un message à ce sujet avec une clé de chaîne et une charge utile sérialisée avro. Si j'inspecte le sujet dans le centre de contrôle, je vois les données correctement désérialisées passer. En regardant la sortie de l'instance de connexion, je vois cela dans les journaux

RROR WorkerSinkTask{id=com.mycompany.sinks.GcsSinkConnector-auth2-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic auth.events to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 7
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:226)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:319)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:307)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:158)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:271)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:184)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:153)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:145)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Vous pouvez voir d'ici qu'il y a deux problèmes liés :


  • Error retrieving Avro schema for id 7

  • Subject not found.; error code: 40401


Ce qui me dérange, c'est que j'ai spécifié que la stratégie est RecordNameStrategy qui, je pense, devrait utiliser l'octet magique pour aller chercher le schéma par opposition au nom du sujet, mais il y a des erreurs sur le sujet introuvable. Je ne sais pas s'il s'agit réellement de rechercher un nom de sujet ou d'obtenir un schéma par l'ID. Quoi qu'il en soit, en ssh-ing à l'instance de connexion et en faisant une boucle pour
http://schema-registry-service:8081/schemas/ids/7obtenir le schéma renvoyé. Il y a une journalisation supplémentaire au-dessus de cette trace de pile qui semble décevante comme si elle utilisait toujours la mauvaise stratégie de nom :

INFO AvroConverterConfig values:
schema.registry.url = [http://schema-registry-service:8081]
basic.auth.user.info = [hidden]
auto.register.schemas = false
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
schema.registry.basic.auth.user.info = [hidden]
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

Quelqu'un at-il des indices sur la façon de résoudre ce problème? J'utilise les images suivantes:


  • confluentinc/cp-kafka-connect:5.2.0

  • confluentinc/cp-kafka:5.1.0


Merci


Solution du problème

Dans la trace, lookUpSubjectVersionsignifie qu'il a essayé de faire une recherche sous /subjects/:name/versionspour chaque ID répertorié là-bas, puis n'a pas pu trouver schemaId=7(Remarque : pas la version = 7), mais pas trop clair d'après les journaux ce :namequ'il essaie d'utiliser ici, mais si ce n'est pas le cas 't trouvé, alors vous obtiendrez votre Subject not founderreur. Si mon PR était accepté, le nom du sujet serait plus clair

Je pense que cela peut être dû à l'utilisation de RecordNameStrategy. En regardant le PR pour cette propriété, j'ai compris qu'il n'était vraiment testé que par rapport au code producteur/consommateur, et pas complètement dans l'API Connect. Par rapport au comportement par défaut deTopicNameStrategy

Lequel, vous pouvez voir qu'il a essayé d'utiliser

value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

Mais en regardant de plus près, je pense que vous l'avez peut-être mal configuré.

Semblable à la façon dont vous avez value.converter.schema.registry.url, vous auriez en fait besoin de définir à la value.converter.value.subject.name.strategyplace.

Aucun commentaire:

Enregistrer un commentaire

Comment utiliseriez-vous .reduce() sur des arguments au lieu d'un tableau ou d'un objet spécifique ?

Je veux définir une fonction.flatten qui aplatit plusieurs éléments en un seul tableau. Je sais que ce qui suit n'est pas possible, mais...