GOOGLE ADS

samedi 23 avril 2022

Erreur MassTransit "L'événement Completed n'est pas géré pendant l'état final de la machine d'état RequestStateMachine"

J'ai un système dans lequel j'essaie d'implémenter un middleware pour que nos API gèrent l'idempotence. pour cela, toutes les API envoient un message au système où un autre service prend ces messages et les transmet soit au gestionnaire réel avec une jonglerie de données mineure, soit répond par lui-même.

le système frontal (client) doit envoyer le message TRequestWithIdempotency et attendre un message TResponseWithIdempotency en retour. interne TRequestInternal et TResponseInternal doivent être utilisés si la machine d'état est nouvelle et que l'opération API n'a pas encore été traitée/exécutée (résultat de l'opération inconnu).

J'ai essayé d'utiliser StateMachines depuis que j'ai vu un exemple sur https://github.com/MassTransit/MassTransit/blob/develop/tests/MassTransit.QuartzIntegration.Tests/RequestRequest_Specs.cs qui est très similaire à ce que j'essaie. aussi j'ai changé le code de test et le système fonctionne bien mais en dessous le système échoue à la fin du flux d'opération. Je peux voir les journaux sur les opérations d'API internes réussies et je peux également voir la réponse dans les journaux, mais à un moment donné avant de répondre à l'interface (renvoyant la réponse TResponseWithIdempotency), l'erreur mentionnée se produit.

mon consommateur interne et RequestStateMachine sont enregistrés sur le système en tant que fonction ConfigureInMemoryBus dans le fichier de test de github.

faites-moi savoir s'il manque quelque chose. Ma machine d'état n'a pas d' état Terminé comme erreur et l'erreur provient de RequestState que je ne modifie ni ne touche.

J'utilise MassTransit 7.3.1 (pas 8 malheureusement), Localstack

 // TRequest and TResponse are same with TRequestWithIdempotency and TResponseWithIdempotency types in statemachine
var requestClient = bus.CreateRequestClient<TRequest>(RequestTimeout.After(s: 120));
var requestHandle = requestClient.Create(request);
var response = requestHandle.GetResponse<TResponse>(false);

mon état de fonctionnement

 public class IdempotentPurchaseOperationState<TResponse>: SagaStateMachineInstance
{
public Guid Id => CorrelationId;
public Guid CorrelationId { get; set; }
public int CurrentState { get; set; }
public Guid? MyRequestId { get; set; }
public TResponse? ResponseMessage { get; set; }
}

ma définition de machine d'état

 // there are some Get... functions which convert the data and returns proper data instances
public State InternalRequestCompleted { get; private set; }
public State InternalRequestFaulted { get; private set; }
public Request<TState, TRequestInternal, TResponseInternal, IdempotentRequestFault> RequestFromHandler { get; private set; }
public Event<TRequestWithIdempotency> IdempotentRequestReceived { get; private set; }
public IdempotentPurchaseOperationProcessManager()
{
InstanceState(x => x.CurrentState, InternalRequestCompleted, InternalRequestFaulted);
Event(() => IdempotentRequestReceived, x =>
{
x.CorrelateById(x => x.CorrelationId?? GetIdempotencyKey(x.Message));
x.SetSagaFactory(e => new TState
{
CorrelationId = GetIdempotencyKey(e.Message),
});
});
Request(() => RequestFromHandler, x => x.MyRequestId, x => x.Timeout = TimeSpan.FromSeconds(30));
Initially(
When(IdempotentRequestReceived)
.Request(RequestFromHandler, ctx =>
{
return ctx.Init<TRequestInternal>(GetInternalRequestFromProcessManagerRequest(ctx.Data));
})
.RequestStarted()
.TransitionTo(RequestFromHandler.Pending)
);
During(RequestFromHandler.Pending,
When(RequestFromHandler.Completed)
.RequestCompleted(ctx =>
{
return ctx.Init<TResponseWithIdempotency>(GetProcessManagerResponseFromInternalResponse(ctx.Data));
})
.TransitionTo(InternalRequestCompleted),
When(RequestFromHandler.Completed2)
.RequestCompleted(context =>
{
return context.Init<IdempotentRequestFault>
(new IdempotentRequestFaultImpl
{
ExceptionMessages = context.Data.ExceptionMessages
});
})
.TransitionTo(InternalRequestFaulted),
When(RequestFromHandler.Faulted)
.RequestCompleted(context =>
{
return context.Init<Fault<TRequestInternal>>
(new
{
Message = context.Instance.InitialMessage,
context.Data.FaultId,
context.Data.FaultedMessageId,
context.Data.Timestamp,
context.Data.Exceptions,
context.Data.Host,
context.Data.FaultMessageTypes
});
})
.TransitionTo(InternalRequestFaulted)
);
During(InternalRequestCompleted,
When(IdempotentRequestReceived)
.RespondAsync(ctx => {
return ctx.Init<TResponseWithIdempotency>(ctx.Instance.ResponseMessage);
})
);
During(InternalRequestFaulted,
When(IdempotentRequestReceived)
.Finalize());
}

Je reçois en dessous de l'exception. lors du prochain appel d'API avec la même clé d'idempotence, j'obtiens une réponse correcte et attendue.

sev="ERROR" msg="Message handling threw error." corrid="d9d7b1c8-d59f-4f02-bac1-03c35a91fc8b" reqid="01000000-ac14-0242-5b09-08da1e0e6950" op="Consumer:RequestCompleted" messageType="RequestCompleted" duration=339 Exception=Automatonymous.NotAcceptedStateMachineException: Automatonymous.Requests.RequestState(00060000-ac14-0242-af78-08da1d5a3ecf) Saga exception on receipt of Automatonymous.Contracts.RequestCompleted: Not accepted in state Final
---> Automatonymous.UnhandledEventException: The Completed event is not handled during the Final state for the RequestStateMachine state machine
at Automatonymous.AutomatonymousStateMachine`1.DefaultUnhandledEventCallback(UnhandledEventContext`1 context)
at Automatonymous.AutomatonymousStateMachine`1.UnhandledEvent(EventContext`1 context, State state)
at Automatonymous.States.StateMachineState`1.Automatonymous.State<TInstance>.Raise[T](EventContext`2 context)
at Automatonymous.AutomatonymousStateMachine`1.Automatonymous.StateMachine<TInstance>.RaiseEvent[T](EventContext`2 context)
at Automatonymous.Pipeline.StateMachineSagaMessageFilter`2.Send(SagaConsumeContext`2 context, IPipe`1 next)
--- End of inner exception stack trace ---
at Automatonymous.Pipeline.StateMachineSagaMessageFilter`2.Send(SagaConsumeContext`2 context, IPipe`1 next)
at Automatonymous.Pipeline.StateMachineSagaMessageFilter`2.Send(SagaConsumeContext`2 context, IPipe`1 next)


Solution du problème

Vous appelez probablement RequestCompleted()plus d'une fois pour la même demande, ce qui entraîne la livraison de plusieurs événements terminés à RequestStateMachine pour la même demande. Dans ce cas, la machine d'état de la demande a déjà été finalisée pour cette demande.

Vous pourriez bénéficier davantage de l'utilisation d'un avenir durable, que je couvre dans la saison 3 sur YouTube. Il est plus adapté à votre scénario, avec moins de surcharge et était destiné à remplacer RequestStateMachine.

Aucun commentaire:

Enregistrer un commentaire

Comment utiliseriez-vous .reduce() sur des arguments au lieu d'un tableau ou d'un objet spécifique&nbsp;?

Je veux définir une fonction.flatten qui aplatit plusieurs éléments en un seul tableau. Je sais que ce qui suit n'est pas possible, mais...