diff --git a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/AsyncStatusResultRetryProcess.java b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/AsyncStatusResultRetryProcess.java index 9abd09280af..0470a211ded 100644 --- a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/AsyncStatusResultRetryProcess.java +++ b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/AsyncStatusResultRetryProcess.java @@ -19,6 +19,7 @@ import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.response.ResponseFailure; import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.statemachine.retry.processor.RetryProcessor; import java.time.Clock; import java.util.concurrent.CompletableFuture; @@ -27,7 +28,10 @@ /** * Provides retry capabilities to an asynchronous process that returns a {@link CompletableFuture} with a {@link StatusResult} content + * + * @deprecated use {@link RetryProcessor}. */ +@Deprecated(since = "0.12.0") public class AsyncStatusResultRetryProcess, C, SELF extends AsyncStatusResultRetryProcess> extends CompletableFutureRetryProcess, SELF> { private final Monitor monitor; diff --git a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/CompletableFutureRetryProcess.java b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/CompletableFutureRetryProcess.java index 77d94696ddb..a5568e4a312 100644 --- a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/CompletableFutureRetryProcess.java +++ b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/CompletableFutureRetryProcess.java @@ -16,6 +16,7 @@ import org.eclipse.edc.spi.entity.StatefulEntity; import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.statemachine.retry.processor.RetryProcessor; import java.time.Clock; import java.util.Optional; @@ -28,7 +29,10 @@ /** * Provides retry capabilities to an asynchronous process that returns a {@link CompletableFuture} object + * + * @deprecated use {@link RetryProcessor}. */ +@Deprecated(since = "0.12.0") public class CompletableFutureRetryProcess, C, SELF extends CompletableFutureRetryProcess> extends RetryProcess> { private final Supplier> process; diff --git a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/EntityRetryProcessConfiguration.java b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/EntityRetryProcessConfiguration.java index 590646d4b53..1753f2b2f2c 100644 --- a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/EntityRetryProcessConfiguration.java +++ b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/EntityRetryProcessConfiguration.java @@ -22,21 +22,6 @@ /** * Configure a {@link RetryProcess} */ -public class EntityRetryProcessConfiguration { +public record EntityRetryProcessConfiguration(int retryLimit, Supplier delayStrategySupplier) { - private final int retryLimit; - private final Supplier delayStrategySupplier; - - public EntityRetryProcessConfiguration(int retryLimit, Supplier delayStrategySupplier) { - this.retryLimit = retryLimit; - this.delayStrategySupplier = delayStrategySupplier; - } - - public int getRetryLimit() { - return retryLimit; - } - - public Supplier getDelayStrategySupplier() { - return delayStrategySupplier; - } } diff --git a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/EntityRetryProcessFactory.java b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/EntityRetryProcessFactory.java index 1389a323a62..32ad101b5f6 100644 --- a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/EntityRetryProcessFactory.java +++ b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/EntityRetryProcessFactory.java @@ -17,6 +17,7 @@ import org.eclipse.edc.spi.entity.StatefulEntity; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.statemachine.retry.processor.RetryProcessor; import java.time.Clock; import java.util.concurrent.CompletableFuture; @@ -37,23 +38,42 @@ public EntityRetryProcessFactory(Monitor monitor, Clock clock, EntityRetryProces this.configuration = configuration; } + /** + * Initialize a {@link RetryProcessor} on the passed entity. + * + * @param entity the entity. + * @return a retry processor. + */ + public , C> RetryProcessor retryProcessor(E entity) { + return new RetryProcessor<>(entity, monitor, clock, configuration); + } + /** * Initialize a synchronous process that needs to be retried if it does not succeed + * + * @deprecated use {{@link #retryProcessor(StatefulEntity)}} */ + @Deprecated(since = "0.12.0") public , C> StatusResultRetryProcess doSyncProcess(T entity, Supplier> process) { return new StatusResultRetryProcess<>(entity, process, monitor, clock, configuration); } /** * Initialize an asynchronous process that needs to be retried if it does not succeed + * + * @deprecated use {{@link #retryProcessor(StatefulEntity)}} */ + @Deprecated(since = "0.12.0") public , C, SELF extends CompletableFutureRetryProcess> SELF doAsyncProcess(T entity, Supplier> process) { return (SELF) new CompletableFutureRetryProcess(entity, process, monitor, clock, configuration); } /** * Initialize an asynchronous process that will return a {@link StatusResult} and it will need to be handled + * + * @deprecated use {{@link #retryProcessor(StatefulEntity)}} */ + @Deprecated(since = "0.12.0") public , C, SELF extends AsyncStatusResultRetryProcess> SELF doAsyncStatusResultProcess(T entity, Supplier>> process) { return (SELF) new AsyncStatusResultRetryProcess(entity, process, monitor, clock, configuration); } diff --git a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/RetryProcess.java b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/RetryProcess.java index 3a7c966c827..b7d9d8cd971 100644 --- a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/RetryProcess.java +++ b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/RetryProcess.java @@ -16,6 +16,7 @@ import org.eclipse.edc.spi.entity.StatefulEntity; import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.statemachine.retry.processor.RetryProcessor; import java.time.Clock; import java.util.function.Consumer; @@ -24,7 +25,10 @@ * Represent a process on a {@link StatefulEntity} that is retried after a certain delay if it fails. * This works only used on a state machine, where states are persisted. * The process is a unit of logic that can be executed on the entity. + * + * @deprecated use {@link RetryProcessor}. */ +@Deprecated(since = "0.12.0") public abstract class RetryProcess, SELF extends RetryProcess> { private final E entity; @@ -64,7 +68,7 @@ public boolean execute(String description) { } return false; } else { - monitor.debug(String.format("Entity %s %s retry #%d of %d.", entity.getId(), entity.getClass().getSimpleName(), entity.getStateCount() - 1, configuration.getRetryLimit())); + monitor.debug(String.format("Entity %s %s retry #%d of %d.", entity.getId(), entity.getClass().getSimpleName(), entity.getStateCount() - 1, configuration.retryLimit())); } } @@ -86,12 +90,12 @@ public SELF onDelay(Consumer onDelay) { * @return {@code true} if the entity should not be sent anymore. */ protected boolean retriesExhausted(E entity) { - return entity.getStateCount() > configuration.getRetryLimit(); + return entity.getStateCount() > configuration.retryLimit(); } private long delayMillis(E entity) { // Get a new instance of WaitStrategy. - var delayStrategy = configuration.getDelayStrategySupplier().get(); + var delayStrategy = configuration.delayStrategySupplier().get(); // Set the WaitStrategy to have observed previous failures. // This is relevant for stateful strategies such as exponential wait. diff --git a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/StatusResultRetryProcess.java b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/StatusResultRetryProcess.java index 1f0bdc19027..2c8863bdd78 100644 --- a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/StatusResultRetryProcess.java +++ b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/StatusResultRetryProcess.java @@ -18,6 +18,7 @@ import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.response.ResponseFailure; import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.statemachine.retry.processor.RetryProcessor; import java.time.Clock; import java.util.function.BiConsumer; @@ -28,7 +29,10 @@ /** * Provides retry capabilities to a synchronous process that returns a {@link StatusResult} object + * + * @deprecated use {@link RetryProcessor}. */ +@Deprecated(since = "0.12.0") public class StatusResultRetryProcess, C> extends RetryProcess> { private final Supplier> process; private final Monitor monitor; diff --git a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/EntityStateException.java b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/EntityStateException.java new file mode 100644 index 00000000000..02a59b08e57 --- /dev/null +++ b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/EntityStateException.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2025 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.statemachine.retry.processor; + +import org.eclipse.edc.spi.entity.StatefulEntity; +import org.jetbrains.annotations.NotNull; + +/** + * Exception that describes generic failure in a process + */ +public class EntityStateException extends RuntimeException { + + private final StatefulEntity entity; + private final String processName; + + public EntityStateException(StatefulEntity entity, String processName, String message) { + super(message); + this.entity = entity; + this.processName = processName; + } + + public StatefulEntity getEntity() { + return entity; + } + + public String getProcessName() { + return processName; + } + + @NotNull String getRetryLimitExceededMessage() { + return "%s: ID %s. Attempt #%d failed to %s. Retry limit exceeded. Cause: %s".formatted( + entity.getClass().getSimpleName(), + entity.getId(), + entity.getStateCount(), + getProcessName(), + getMessage() + ); + } + + @NotNull String getRetryFailedMessage() { + return "%s: ID %s. Attempt #%d failed to %s. Cause: %s".formatted( + entity.getClass().getSimpleName(), + entity.getId(), + entity.getStateCount(), + getProcessName(), + getMessage() + ); + } +} diff --git a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/FutureResultRetryProcess.java b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/FutureResultRetryProcess.java new file mode 100644 index 00000000000..01699c40c13 --- /dev/null +++ b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/FutureResultRetryProcess.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2025 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.statemachine.retry.processor; + +import org.eclipse.edc.spi.entity.StatefulEntity; +import org.eclipse.edc.spi.response.StatusResult; + +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static java.util.concurrent.CompletableFuture.failedFuture; + +/** + * Process implementation that handles a process that returns {@link CompletableFuture} with enclosed {@link StatusResult} + * + * @param entity type. + * @param process input type. + * @param process output type. + */ +public class FutureResultRetryProcess, I, O> implements Process { + + private final String name; + private final BiFunction>> function; + private Function entityReload; + + public FutureResultRetryProcess(String name, BiFunction>> function) { + this.name = name; + this.function = function; + } + + @Override + public CompletableFuture> execute(ProcessContext context) { + try { + return new FutureRetryProcess<>(name, function).entityReload(entityReload) + .execute(context) + .thenCompose(asyncContext -> new ResultRetryProcess(name, (e, c) -> asyncContext.content()) + .execute(new ProcessContext<>(asyncContext.entity(), context.content()))); + } catch (Throwable throwable) { + return failedFuture(new UnrecoverableEntityStateException(reloadEntity(context.entity()), name, throwable.getMessage())); + } + } + + public FutureResultRetryProcess entityReload(Function entityReload) { + this.entityReload = entityReload; + return this; + } + + private E reloadEntity(E entity) { + return entityReload == null ? entity : entityReload.apply(entity.getId()); + } + +} diff --git a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/FutureRetryProcess.java b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/FutureRetryProcess.java new file mode 100644 index 00000000000..fed205087ba --- /dev/null +++ b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/FutureRetryProcess.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2025 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.statemachine.retry.processor; + +import org.eclipse.edc.spi.entity.StatefulEntity; + +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static java.util.concurrent.CompletableFuture.failedFuture; + +/** + * Process implementation that handles a process that returns {@link CompletableFuture} + * + * @param entity type. + * @param process input type. + * @param process output type. + */ +public class FutureRetryProcess, I, O> implements Process { + + private final String name; + private final BiFunction> function; + private Function entityReload; + + public FutureRetryProcess(String name, BiFunction> function) { + this.name = name; + this.function = function; + } + + @Override + public CompletableFuture> execute(ProcessContext context) { + try { + return function.apply(context.entity(), context.content()) + .handle((content, throwable) -> { + var reloadedEntity = reloadEntity(context.entity()); + if (throwable == null) { + return new ProcessContext<>(reloadedEntity, content); + } else { + throw new EntityStateException(reloadedEntity, name, throwable.getMessage()); + } + }); + } catch (Throwable throwable) { + return failedFuture(new UnrecoverableEntityStateException(reloadEntity(context.entity()), name, throwable.getMessage())); + } + } + + public FutureRetryProcess entityReload(Function entityReload) { + this.entityReload = entityReload; + return this; + } + + private E reloadEntity(E entity) { + return entityReload == null ? entity : entityReload.apply(entity.getId()); + } + +} diff --git a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/Process.java b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/Process.java new file mode 100644 index 00000000000..36bc034a70c --- /dev/null +++ b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/Process.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2025 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.statemachine.retry.processor; + +import org.eclipse.edc.spi.entity.StatefulEntity; +import org.eclipse.edc.spi.response.StatusResult; + +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; + +/** + * Represent a single unit of processing on an entity + * + * @param entity type. + * @param process input type. + * @param process output type. + */ +public interface Process, I, O> { + + /** + * Instantiates a process that returns a {@link StatusResult} + * + * @param name the process name, will be used for logging. + * @param function that executes the process. + * @return the process instance. + */ + static , I, O> Process result(String name, BiFunction> function) { + return new ResultRetryProcess<>(name, function); + } + + /** + * Instantiates a process that returns a {@link CompletableFuture} + * + * @param name the process name, will be used for logging. + * @param function that executes the process. + * @return the process instance. + */ + static , I, O> Process future(String name, BiFunction> function) { + return new FutureRetryProcess<>(name, function); + } + + /** + * Instantiates a process that returns a {@link CompletableFuture} that encloses a {@link StatusResult} + * + * @param name the process name, will be used for logging. + * @param function that executes the process. + * @return the process instance. + */ + static , I, O> Process futureResult(String name, BiFunction>> function) { + return new FutureResultRetryProcess<>(name, function); + } + + /** + * Function that wraps the enclosed content type into a {@link CompletableFuture} + * + * @param context the process context. + * @return a future containing the response type. + */ + CompletableFuture> execute(ProcessContext context); + +} diff --git a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/ProcessContext.java b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/ProcessContext.java new file mode 100644 index 00000000000..229af3c59a5 --- /dev/null +++ b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/ProcessContext.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2025 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.statemachine.retry.processor; + +import org.eclipse.edc.spi.entity.StatefulEntity; + +/** + * Data object that contains the entity and the content of a process. + */ +public record ProcessContext, C>(E entity, C content) { +} diff --git a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/ResultRetryProcess.java b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/ResultRetryProcess.java new file mode 100644 index 00000000000..1e60559f2fa --- /dev/null +++ b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/ResultRetryProcess.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2025 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.statemachine.retry.processor; + +import org.eclipse.edc.spi.entity.StatefulEntity; +import org.eclipse.edc.spi.response.StatusResult; + +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; + +/** + * Process implementation that handles a process that returns {@link StatusResult} + * + * @param entity type. + * @param process input type. + * @param process output type. + */ +public class ResultRetryProcess, I, O> implements Process { + + private final String name; + private final BiFunction> function; + + public ResultRetryProcess(String name, BiFunction> function) { + this.name = name; + this.function = function; + } + + @Override + public CompletableFuture> execute(ProcessContext context) { + try { + var result = function.apply(context.entity(), context.content()); + + if (result.fatalError()) { + return CompletableFuture.failedFuture(new UnrecoverableEntityStateException(context.entity(), name, result.getFailureDetail())); + } + + if (result.failed()) { + return CompletableFuture.failedFuture(new EntityStateException(context.entity(), name, result.getFailureDetail())); + } + + return CompletableFuture.completedFuture(new ProcessContext<>(context.entity(), result.getContent())); + } catch (Throwable e) { + return CompletableFuture.failedFuture(e); + } + + } +} diff --git a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/RetryProcessor.java b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/RetryProcessor.java new file mode 100644 index 00000000000..b5b603014ff --- /dev/null +++ b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/RetryProcessor.java @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2025 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.statemachine.retry.processor; + +import org.eclipse.edc.spi.entity.StatefulEntity; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.statemachine.retry.EntityRetryProcessConfiguration; + +import java.time.Clock; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import java.util.function.Function; + +/** + * Component that watches processes to be executed on entities in the state machine. + * One or more processes can either: + *
    + *
  • not be executed because of retry policies not met
  • + *
  • executed successfully
  • + *
  • failed, to be retried on the next state machine iteration
  • + *
  • failed exceeding the retry limit
  • + *
  • failed with an unrecoverable error
  • + *
+ * + * The component has been designed with a process chain component on which multiple processes can be chained. + * + * @param entity type. + * @param content type that is returned by the {@link #processChain} and that will be available in the {@link #onSuccess} handler. + * + */ +public class RetryProcessor, C> { + + private final E entity; + private final Monitor monitor; + private final Clock clock; + private final EntityRetryProcessConfiguration configuration; + private final Function>> processChain; + + private BiConsumer onSuccess; + private BiConsumer onFailure; + private BiConsumer onFinalFailure; + + public RetryProcessor(E entity, Monitor monitor, Clock clock, EntityRetryProcessConfiguration configuration) { + this(entity, monitor, clock, configuration, v -> CompletableFuture.completedFuture(new ProcessContext<>(entity, null))); + } + + private RetryProcessor(E entity, Monitor monitor, Clock clock, EntityRetryProcessConfiguration configuration, Function>> processChain) { + this.entity = entity; + this.monitor = monitor; + this.clock = clock; + this.configuration = configuration; + this.processChain = processChain; + } + + public RetryProcessor doProcess(Process process) { + return new RetryProcessor<>(entity, monitor, clock, configuration, c -> processChain.apply(c).thenCompose(process::execute)); + } + + public RetryProcessor onSuccess(BiConsumer onSuccess) { + this.onSuccess = onSuccess; + return this; + } + + public RetryProcessor onFailure(BiConsumer onFailure) { + this.onFailure = onFailure; + return this; + } + + public RetryProcessor onFinalFailure(BiConsumer onFinalFailure) { + this.onFinalFailure = onFinalFailure; + return this; + } + + /** + * Execute the processes applying eventual retry policy + * Will execute the onSuccess, onFailure, onFinalFailure handlers at need + * + * @return true if the process has been run, false otherwise. + */ + public boolean execute() { + if (isRetry(entity)) { + var delay = delayMillis(entity); + if (delay > 0) { + monitor.debug(String.format("Entity %s %s retry #%d will not be attempted before %d ms.", entity.getId(), entity.getClass().getSimpleName(), entity.getStateCount() - 1, delay)); + return false; + } else { + monitor.debug(String.format("Entity %s %s retry #%d of %d.", entity.getId(), entity.getClass().getSimpleName(), entity.getStateCount() - 1, configuration.retryLimit())); + } + } + + processChain.apply(null) + .whenComplete((content, throwable) -> { + if (throwable == null) { + onSuccess.accept(content.entity(), content.content()); + } else { + var cause = throwable.getCause(); + if (cause instanceof UnrecoverableEntityStateException unrecoverable) { + monitor.severe(unrecoverable.getUnrecoverableMessage()); + onFinalFailure.accept(entity, unrecoverable); + } else if (cause instanceof EntityStateException entityStateException) { + var exceptionEntity = entityStateException.getEntity(); + if (exceptionEntity.getStateCount() > configuration.retryLimit()) { + monitor.severe(entityStateException.getRetryLimitExceededMessage()); + onFinalFailure.accept(entity, entityStateException); + } else { + monitor.debug(entityStateException.getRetryFailedMessage()); + onFailure.accept(entity, entityStateException); + } + } else { + monitor.severe("Runtime exception caught by retry processor: %s".formatted(cause.getMessage()), cause); + onFinalFailure.accept(entity, cause); + } + } + }); + + return true; + } + + private boolean isRetry(E entity) { + return entity.getStateCount() - 1 > 0; + } + + private long delayMillis(E entity) { + // Get a new instance of WaitStrategy. + var delayStrategy = configuration.delayStrategySupplier().get(); + + // Set the WaitStrategy to have observed previous failures. + // This is relevant for stateful strategies such as exponential wait. + delayStrategy.failures(entity.getStateCount() - 1); + + // Get the delay time following the number of failures. + var waitMillis = delayStrategy.retryInMillis(); + + return entity.getStateTimestamp() + waitMillis - clock.millis(); + } + +} diff --git a/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/UnrecoverableEntityStateException.java b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/UnrecoverableEntityStateException.java new file mode 100644 index 00000000000..0f2177a248d --- /dev/null +++ b/core/common/lib/state-machine-lib/src/main/java/org/eclipse/edc/statemachine/retry/processor/UnrecoverableEntityStateException.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2025 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.statemachine.retry.processor; + +import org.eclipse.edc.spi.entity.StatefulEntity; +import org.jetbrains.annotations.NotNull; + +/** + * Exception that describes an unrecoverable failure in process. + */ +public class UnrecoverableEntityStateException extends EntityStateException { + public UnrecoverableEntityStateException(StatefulEntity entity, String processName, String message) { + super(entity, processName, message); + } + + @NotNull String getUnrecoverableMessage() { + return "%s: ID %s. Attempt #%d failed to %s. Fatal error occurred. Cause: %s".formatted( + getEntity().getClass().getSimpleName(), + getEntity().getId(), + getEntity().getStateCount(), + getProcessName(), + getMessage() + ); + } +} diff --git a/core/common/lib/state-machine-lib/src/test/java/org/eclipse/edc/statemachine/retry/processor/FutureResultRetryProcessTest.java b/core/common/lib/state-machine-lib/src/test/java/org/eclipse/edc/statemachine/retry/processor/FutureResultRetryProcessTest.java new file mode 100644 index 00000000000..14565918a31 --- /dev/null +++ b/core/common/lib/state-machine-lib/src/test/java/org/eclipse/edc/statemachine/retry/processor/FutureResultRetryProcessTest.java @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2025 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.statemachine.retry.processor; + +import org.eclipse.edc.spi.EdcException; +import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.statemachine.retry.TestEntity; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; + +import static java.time.temporal.ChronoUnit.SECONDS; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.spi.response.ResponseStatus.ERROR_RETRY; +import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class FutureResultRetryProcessTest { + + private final Duration timeout = Duration.of(1, SECONDS); + + @Test + void shouldReturnSuccess_whenFunctionSucceeds() { + var entity = TestEntity.Builder.newInstance().id(UUID.randomUUID().toString()).build(); + var retryProcess = new FutureResultRetryProcess("process", (e, i) -> completedFuture(StatusResult.success("content"))); + + var future = retryProcess.execute(new ProcessContext<>(entity, "any")); + + assertThat(future).succeedsWithin(timeout).extracting(ProcessContext::content).isEqualTo("content"); + } + + @Test + void shouldReturnFailure_whenFunctionFails() { + var entity = TestEntity.Builder.newInstance().id(UUID.randomUUID().toString()).build(); + var retryProcess = new FutureResultRetryProcess("process", (e, i) -> failedFuture(new EdcException("error"))); + + var future = retryProcess.execute(new ProcessContext<>(entity, "any")); + + assertThat(future).failsWithin(timeout).withThrowableOfType(ExecutionException.class) + .extracting(Throwable::getCause).isInstanceOfSatisfying(EntityStateException.class, exception -> { + assertThat(exception.getEntity()).isSameAs(entity); + assertThat(exception.getProcessName()).isEqualTo("process"); + assertThat(exception.getMessage()).isEqualTo("error"); + }); + } + + @Test + void shouldReturnUnrecoverable_whenFunctionThrowsException() { + var entity = TestEntity.Builder.newInstance().id(UUID.randomUUID().toString()).build(); + var retryProcess = new FutureResultRetryProcess("process", (testEntity, o) -> { + throw new RuntimeException("unexpected exception"); + }); + + var future = retryProcess.execute(new ProcessContext<>(entity, "any")); + + assertThat(future).failsWithin(timeout).withThrowableOfType(ExecutionException.class) + .extracting(Throwable::getCause).isInstanceOfSatisfying(UnrecoverableEntityStateException.class, exception -> { + assertThat(exception.getEntity()).isSameAs(entity); + assertThat(exception.getProcessName()).isEqualTo("process"); + assertThat(exception.getMessage()).isEqualTo("unexpected exception"); + }); + } + + @Test + void shouldReloadEntity_whenConfigured() { + var entityId = UUID.randomUUID().toString(); + var entity = TestEntity.Builder.newInstance().id(entityId).build(); + var reloadedEntity = TestEntity.Builder.newInstance().id(entityId).build(); + Function entityReload = mock(); + when(entityReload.apply(any())).thenReturn(reloadedEntity); + var retryProcess = new FutureResultRetryProcess("process", (e, i) -> completedFuture(StatusResult.success("content"))) + .entityReload(entityReload); + + var future = retryProcess.execute(new ProcessContext<>(entity, "any")); + + assertThat(future).succeedsWithin(timeout).extracting(ProcessContext::entity).isSameAs(reloadedEntity); + verify(entityReload).apply(entityId); + } + + @Test + void shouldFail_whenResultFails() { + var entity = TestEntity.Builder.newInstance().id(UUID.randomUUID().toString()).build(); + var retryProcess = new FutureResultRetryProcess("process", (e, i) -> completedFuture(StatusResult.failure(ERROR_RETRY, "error"))); + + var future = retryProcess.execute(new ProcessContext<>(entity, "any")); + + assertThat(future).failsWithin(timeout).withThrowableOfType(ExecutionException.class) + .extracting(Throwable::getCause).isInstanceOfSatisfying(EntityStateException.class, exception -> { + assertThat(exception.getEntity()).isSameAs(entity); + assertThat(exception.getProcessName()).isEqualTo("process"); + assertThat(exception.getMessage()).isEqualTo("error"); + }); + } + + @Test + void shouldFailUnrecoverable_whenResultFatalError() { + var entity = TestEntity.Builder.newInstance().id(UUID.randomUUID().toString()).build(); + var retryProcess = new FutureResultRetryProcess("process", (e, i) -> completedFuture(StatusResult.failure(FATAL_ERROR, "error"))); + + var future = retryProcess.execute(new ProcessContext<>(entity, "any")); + + assertThat(future).failsWithin(timeout).withThrowableOfType(ExecutionException.class) + .extracting(Throwable::getCause).isInstanceOfSatisfying(EntityStateException.class, exception -> { + assertThat(exception.getEntity()).isSameAs(entity); + assertThat(exception.getProcessName()).isEqualTo("process"); + assertThat(exception.getMessage()).isEqualTo("error"); + }); + } +} diff --git a/core/common/lib/state-machine-lib/src/test/java/org/eclipse/edc/statemachine/retry/processor/FutureRetryProcessTest.java b/core/common/lib/state-machine-lib/src/test/java/org/eclipse/edc/statemachine/retry/processor/FutureRetryProcessTest.java new file mode 100644 index 00000000000..56023dd91d0 --- /dev/null +++ b/core/common/lib/state-machine-lib/src/test/java/org/eclipse/edc/statemachine/retry/processor/FutureRetryProcessTest.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2025 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.statemachine.retry.processor; + +import org.eclipse.edc.spi.EdcException; +import org.eclipse.edc.statemachine.retry.TestEntity; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; + +import static java.time.temporal.ChronoUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class FutureRetryProcessTest { + + private final Duration timeout = Duration.of(1, SECONDS); + + @Test + void shouldReturnSuccess_whenFunctionSucceeds() { + var entity = TestEntity.Builder.newInstance().id(UUID.randomUUID().toString()).build(); + var retryProcess = new FutureRetryProcess("process", (e, i) -> CompletableFuture.completedFuture("content")); + + var future = retryProcess.execute(new ProcessContext<>(entity, "any")); + + assertThat(future).succeedsWithin(timeout).extracting(ProcessContext::content).isEqualTo("content"); + } + + @Test + void shouldReturnFailure_whenFunctionFails() { + var entity = TestEntity.Builder.newInstance().id(UUID.randomUUID().toString()).build(); + var retryProcess = new FutureRetryProcess("process", (e, i) -> CompletableFuture.failedFuture(new EdcException("error"))); + + var future = retryProcess.execute(new ProcessContext<>(entity, "any")); + + assertThat(future).failsWithin(timeout).withThrowableOfType(ExecutionException.class) + .extracting(Throwable::getCause).isInstanceOfSatisfying(EntityStateException.class, exception -> { + assertThat(exception.getEntity()).isSameAs(entity); + assertThat(exception.getProcessName()).isEqualTo("process"); + assertThat(exception.getMessage()).isEqualTo("error"); + }); + } + + @Test + void shouldReturnUnrecoverable_whenFunctionThrowsException() { + var entity = TestEntity.Builder.newInstance().id(UUID.randomUUID().toString()).build(); + var retryProcess = new FutureRetryProcess("process", (testEntity, o) -> { + throw new RuntimeException("unexpected exception"); + }); + + var future = retryProcess.execute(new ProcessContext<>(entity, "any")); + + assertThat(future).failsWithin(timeout).withThrowableOfType(ExecutionException.class) + .extracting(Throwable::getCause).isInstanceOfSatisfying(UnrecoverableEntityStateException.class, exception -> { + assertThat(exception.getEntity()).isSameAs(entity); + assertThat(exception.getProcessName()).isEqualTo("process"); + assertThat(exception.getMessage()).isEqualTo("unexpected exception"); + }); + } + + @Test + void shouldReloadEntity_whenConfigured() { + var entityId = UUID.randomUUID().toString(); + var entity = TestEntity.Builder.newInstance().id(entityId).build(); + var reloadedEntity = TestEntity.Builder.newInstance().id(entityId).build(); + Function entityReload = mock(); + when(entityReload.apply(any())).thenReturn(reloadedEntity); + var retryProcess = new FutureRetryProcess("process", (e, i) -> CompletableFuture.completedFuture("content")) + .entityReload(entityReload); + + var future = retryProcess.execute(new ProcessContext<>(entity, "any")); + + assertThat(future).succeedsWithin(timeout).extracting(ProcessContext::entity).isSameAs(reloadedEntity); + verify(entityReload).apply(entityId); + } +} diff --git a/core/common/lib/state-machine-lib/src/test/java/org/eclipse/edc/statemachine/retry/processor/ResultRetryProcessTest.java b/core/common/lib/state-machine-lib/src/test/java/org/eclipse/edc/statemachine/retry/processor/ResultRetryProcessTest.java new file mode 100644 index 00000000000..11551d87579 --- /dev/null +++ b/core/common/lib/state-machine-lib/src/test/java/org/eclipse/edc/statemachine/retry/processor/ResultRetryProcessTest.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2025 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.statemachine.retry.processor; + +import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.statemachine.retry.TestEntity; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +import static java.time.temporal.ChronoUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.spi.response.ResponseStatus.ERROR_RETRY; +import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; + +class ResultRetryProcessTest { + + private final Duration timeout = Duration.of(1, SECONDS); + + @Test + void shouldComplete_whenResultSucceeds() { + var entity = TestEntity.Builder.newInstance().id(UUID.randomUUID().toString()).stateCount(1).build(); + var retryProcess = new ResultRetryProcess("description", (e, i) -> StatusResult.success("output")); + + var future = retryProcess.execute(new ProcessContext<>(entity, "input")); + + assertThat(future).succeedsWithin(timeout); + } + + @Test + void shouldFail_whenResultFails() { + var entity = TestEntity.Builder.newInstance().id(UUID.randomUUID().toString()).stateCount(1).build(); + var retryProcess = new ResultRetryProcess("description", (e, i) -> StatusResult.failure(ERROR_RETRY, "error")); + + var future = retryProcess.execute(new ProcessContext<>(entity, "input")); + + assertThat(future).failsWithin(timeout).withThrowableOfType(ExecutionException.class) + .extracting(Throwable::getCause).isInstanceOfSatisfying(EntityStateException.class, exception -> { + assertThat(exception.getEntity()).isSameAs(entity); + assertThat(exception.getProcessName()).isEqualTo("description"); + assertThat(exception.getMessage()).isEqualTo("error"); + }); + } + + @Test + void shouldFailUnrecoverable_whenResultFatalError() { + var entity = TestEntity.Builder.newInstance().id(UUID.randomUUID().toString()).stateCount(1).build(); + var retryProcess = new ResultRetryProcess("description", (e, i) -> StatusResult.failure(FATAL_ERROR, "error")); + + var future = retryProcess.execute(new ProcessContext<>(entity, "input")); + + assertThat(future).failsWithin(timeout).withThrowableOfType(ExecutionException.class) + .extracting(Throwable::getCause).isInstanceOfSatisfying(UnrecoverableEntityStateException.class, exception -> { + assertThat(exception.getEntity()).isSameAs(entity); + assertThat(exception.getProcessName()).isEqualTo("description"); + assertThat(exception.getMessage()).isEqualTo("error"); + }); + } + + @Test + void shouldFailUnrecoverable_whenFunctionThrowsException() { + var entity = TestEntity.Builder.newInstance().id(UUID.randomUUID().toString()).stateCount(1).build(); + var retryProcess = new ResultRetryProcess("description", (testEntity, o) -> { + throw new RuntimeException("unexpected exception"); + }); + + var future = retryProcess.execute(new ProcessContext<>(entity, "input")); + + assertThat(future).failsWithin(timeout).withThrowableOfType(ExecutionException.class) + .extracting(Throwable::getCause).isInstanceOfSatisfying(RuntimeException.class, exception -> { + assertThat(exception.getMessage()).isEqualTo("unexpected exception"); + }); + } + +} diff --git a/core/common/lib/state-machine-lib/src/test/java/org/eclipse/edc/statemachine/retry/processor/RetryProcessorTest.java b/core/common/lib/state-machine-lib/src/test/java/org/eclipse/edc/statemachine/retry/processor/RetryProcessorTest.java new file mode 100644 index 00000000000..d5ebb962ba4 --- /dev/null +++ b/core/common/lib/state-machine-lib/src/test/java/org/eclipse/edc/statemachine/retry/processor/RetryProcessorTest.java @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2025 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.statemachine.retry.processor; + +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.retry.WaitStrategy; +import org.eclipse.edc.statemachine.retry.EntityRetryProcessConfiguration; +import org.eclipse.edc.statemachine.retry.TestEntity; +import org.junit.jupiter.api.Test; + +import java.time.Clock; +import java.time.Instant; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; + +import static java.time.ZoneOffset.UTC; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.spi.response.ResponseStatus.ERROR_RETRY; +import static org.eclipse.edc.statemachine.retry.processor.Process.future; +import static org.eclipse.edc.statemachine.retry.processor.Process.result; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.contains; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +class RetryProcessorTest { + + private static final long DELAY = 2L; + private final long millis = 123; + private final Monitor monitor = mock(); + private final Clock clock = Clock.fixed(Instant.ofEpochMilli(millis), UTC); + private final long shouldDelayTime = millis - DELAY + 1; + private final long shouldNotDelayTime = millis - DELAY; + private final WaitStrategy fixedWaitStrategy = () -> DELAY; + private final int retryLimit = 2; + private final EntityRetryProcessConfiguration configuration = new EntityRetryProcessConfiguration(retryLimit, () -> fixedWaitStrategy); + + private final BiConsumer success = mock(); + private final BiConsumer failure = mock(); + private final BiConsumer finalFailure = mock(); + + @Test + void shouldExecuteAllTheStagesInTheRightOrder_whenItIsRetryButDoesNotDelay() { + var entity = TestEntity.Builder.newInstance().id(UUID.randomUUID().toString()).stateTimestamp(shouldDelayTime).stateCount(1).build(); + BiFunction> firstProcess = mock(); + when(firstProcess.apply(any(), any())).thenReturn(CompletableFuture.completedFuture(1)); + BiFunction> secondProcess = mock(); + when(secondProcess.apply(any(), any())).thenAnswer(i -> StatusResult.success(i.getArgument(1) + " second")); + + var processed = new RetryProcessor<>(entity, monitor, clock, configuration) + .doProcess(future("async process", firstProcess)) + .doProcess(result("sync process", secondProcess)) + .onSuccess(success) + .onFailure(failure) + .onFinalFailure(finalFailure) + .execute(); + + assertThat(processed).isTrue(); + verify(success).accept(entity, "1 second"); + verifyNoInteractions(failure, finalFailure); + var inOrder = inOrder(firstProcess, secondProcess); + inOrder.verify(firstProcess).apply(any(), any()); + inOrder.verify(secondProcess).apply(any(), any()); + } + + @Test + void shouldNotProcess_whenItShouldDelay() { + var entity = TestEntity.Builder.newInstance().id(UUID.randomUUID().toString()).stateTimestamp(shouldDelayTime).stateCount(2).build(); + BiFunction> process = mock(); + + var processed = new RetryProcessor<>(entity, monitor, clock, configuration) + .doProcess(result("mock", process)) + .onSuccess(success).onFailure(failure).onFinalFailure(finalFailure) + .execute(); + + assertThat(processed).isFalse(); + verify(monitor).debug(contains("not be attempted before")); + verifyNoInteractions(process, success, failure, finalFailure); + } + + @Test + void shouldNotProcessSecond_whenFirstFails() { + var entity = TestEntity.Builder.newInstance().id(UUID.randomUUID().toString()).stateTimestamp(shouldDelayTime).stateCount(1).build(); + BiFunction> first = mock(); + when(first.apply(any(), any())).thenReturn(StatusResult.failure(ERROR_RETRY)); + BiFunction> second = mock(); + + var processed = new RetryProcessor<>(entity, monitor, clock, configuration) + .doProcess(result("first", first)) + .doProcess(result("second", second)) + .onSuccess(success) + .onFailure(failure) + .onFinalFailure(finalFailure) + .execute(); + + assertThat(processed).isTrue(); + verify(failure).accept(same(entity), isA(Throwable.class)); + verifyNoInteractions(second, success, finalFailure); + } + + @Test + void shouldInvokeFailureHandler_whenFailureHappensAndRetryLimitNotExceeded() { + var entityId = UUID.randomUUID().toString(); + var entity = TestEntity.Builder.newInstance().id(entityId).stateTimestamp(shouldNotDelayTime).build(); + + org.eclipse.edc.statemachine.retry.processor.Process process = context -> failedFuture(new EntityStateException( + TestEntity.Builder.newInstance().id(entityId).stateCount(retryLimit).build(), "process", "generic error")); + + var processed = new RetryProcessor<>(entity, monitor, clock, configuration) + .doProcess(process) + .onSuccess(success) + .onFailure(failure) + .onFinalFailure(finalFailure) + .execute(); + + assertThat(processed).isTrue(); + verify(failure).accept(same(entity), isA(Throwable.class)); + verify(monitor).debug(contains("failed to process. Cause: generic error")); + verifyNoInteractions(success, finalFailure); + } + + @Test + void shouldInvokeFinalFailureHandler_whenRetryExhausted() { + var entityId = UUID.randomUUID().toString(); + var entity = TestEntity.Builder.newInstance().id(entityId).stateTimestamp(shouldNotDelayTime).build(); + + org.eclipse.edc.statemachine.retry.processor.Process process = context -> failedFuture(new EntityStateException( + TestEntity.Builder.newInstance().id(entityId).stateCount(retryLimit + 1).build(), "process", "generic error")); + + var processed = new RetryProcessor<>(entity, monitor, clock, configuration) + .doProcess(process) + .onSuccess(success) + .onFailure(failure) + .onFinalFailure(finalFailure) + .execute(); + + assertThat(processed).isTrue(); + verify(monitor).severe(contains("failed to process. Retry limit exceeded. Cause: generic error")); + verify(finalFailure).accept(same(entity), isA(Throwable.class)); + verifyNoInteractions(success, failure); + } + + @Test + void shouldInvokeFinalFailureHandler_whenUnrecoverableException() { + var entityId = UUID.randomUUID().toString(); + var entity = TestEntity.Builder.newInstance().id(entityId).stateTimestamp(shouldNotDelayTime).build(); + + org.eclipse.edc.statemachine.retry.processor.Process process = context -> failedFuture(new UnrecoverableEntityStateException( + TestEntity.Builder.newInstance().id(entityId).build(), "process", "fatal error")); + + var processed = new RetryProcessor<>(entity, monitor, clock, configuration) + .doProcess(process) + .onSuccess(success) + .onFailure(failure) + .onFinalFailure(finalFailure) + .execute(); + + assertThat(processed).isTrue(); + verify(monitor).severe(contains("failed to process. Fatal error occurred. Cause: fatal error")); + verify(finalFailure).accept(same(entity), isA(Throwable.class)); + verifyNoInteractions(success, failure); + } + + @Test + void shouldInvokeFinalFailureHandler_whenGenericException() { + var entity = TestEntity.Builder.newInstance().id(UUID.randomUUID().toString()).stateTimestamp(shouldDelayTime).stateCount(1).build(); + + var runtimeException = new RuntimeException("generic exception"); + Process process = context -> failedFuture(runtimeException); + + var processed = new RetryProcessor<>(entity, monitor, clock, configuration) + .doProcess(process) + .onSuccess(success) + .onFailure(failure) + .onFinalFailure(finalFailure) + .execute(); + + assertThat(processed).isTrue(); + verify(monitor).severe(contains("generic exception"), same(runtimeException)); + verify(finalFailure).accept(same(entity), isA(Throwable.class)); + verifyNoInteractions(success, failure); + } + +} diff --git a/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java b/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java index bc85e94e78a..c7826485749 100644 --- a/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java +++ b/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java @@ -43,7 +43,6 @@ import org.eclipse.edc.connector.controlplane.transfer.spi.types.protocol.TransferStartMessage; import org.eclipse.edc.connector.controlplane.transfer.spi.types.protocol.TransferSuspensionMessage; import org.eclipse.edc.connector.controlplane.transfer.spi.types.protocol.TransferTerminationMessage; -import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry; import org.eclipse.edc.spi.query.Criterion; @@ -56,8 +55,6 @@ import org.eclipse.edc.statemachine.Processor; import org.eclipse.edc.statemachine.ProcessorImpl; import org.eclipse.edc.statemachine.StateMachineManager; -import org.eclipse.edc.statemachine.retry.AsyncStatusResultRetryProcess; -import org.eclipse.edc.statemachine.retry.StatusResultRetryProcess; import org.jetbrains.annotations.NotNull; import java.util.List; @@ -67,9 +64,9 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.Supplier; import static java.lang.String.format; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess.Type.CONSUMER; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess.Type.PROVIDER; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.COMPLETING; @@ -87,6 +84,9 @@ import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState; import static org.eclipse.edc.spi.persistence.StateEntityStore.isNotPending; import static org.eclipse.edc.spi.types.domain.DataAddress.EDC_DATA_ADDRESS_SECRET; +import static org.eclipse.edc.statemachine.retry.processor.Process.future; +import static org.eclipse.edc.statemachine.retry.processor.Process.futureResult; +import static org.eclipse.edc.statemachine.retry.processor.Process.result; /** * This transfer process manager receives a {@link TransferProcess} and transitions it through its internal state @@ -252,17 +252,18 @@ private boolean processProvisioning(TransferProcess process) { var resources = process.getResourcesToProvision(); - return entityRetryProcessFactory.doAsyncProcess(process, () -> provisionManager.provision(resources, policy)) - .onSuccess((transferProcess, responses) -> handleResult(transferProcess, responses, provisionResponsesHandler)) + return entityRetryProcessFactory.retryProcessor(process) + .doProcess(future("Provisioning", (p, c) -> provisionManager.provision(resources, policy))) + .onSuccess((t, responses) -> handleResult(t, responses, provisionResponsesHandler)) .onFailure((t, throwable) -> transitionToProvisioning(t)) - .onRetryExhausted((t, throwable) -> { + .onFinalFailure((t, throwable) -> { if (t.getType() == PROVIDER) { transitionToTerminating(t, format("Error during provisioning: %s", throwable.getMessage())); } else { transitionToTerminated(t, format("Error during provisioning: %s", throwable.getMessage())); } }) - .execute("Provisioning"); + .execute(); } /** @@ -305,12 +306,14 @@ private boolean processRequesting(TransferProcess process) { .transferType(process.getTransferType()) .contractId(process.getContractId()); - return dispatch(messageBuilder, process, policyArchive.findPolicyForContract(process.getContractId()), TransferProcessAck.class) - .onSuccessResult(this::transitionToRequested) - .onRetryExhausted(this::transitionToTerminated) + return entityRetryProcessFactory.retryProcessor(process) + .doProcess(futureResult("Dispatch TransferRequestMessage to " + process.getCounterPartyAddress(), + (t, c) -> dispatch(messageBuilder, t, TransferProcessAck.class)) + ) + .onSuccess(this::transitionToRequested) .onFailure((t, throwable) -> transitionToRequesting(t)) - .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) - .execute("send transfer request to " + process.getCounterPartyAddress()); + .onFinalFailure(this::transitionToTerminated) + .execute(); } else { transitionToTerminated(process, "No callback address found for protocol: " + process.getProtocol()); @@ -326,8 +329,7 @@ private boolean processRequesting(TransferProcess process) { */ @WithSpan private boolean processStarting(TransferProcess process) { - return startTransferFlow(process, this::transitionToStarting) - .execute("Initiate data flow"); + return startTransferFlow(process, this::transitionToStarting); } /** @@ -338,18 +340,25 @@ private boolean processStarting(TransferProcess process) { */ @WithSpan private boolean processProviderResuming(TransferProcess process) { - return startTransferFlow(process, this::transitionToResuming) - .execute("Resume data flow"); + return startTransferFlow(process, this::transitionToResuming); } - private StatusResultRetryProcess startTransferFlow(TransferProcess process, Consumer onFailure) { + private boolean startTransferFlow(TransferProcess process, Consumer onFailure) { var policy = policyArchive.findPolicyForContract(process.getContractId()); - return entityRetryProcessFactory.doSyncProcess(process, () -> dataFlowManager.start(process, policy)) - .onSuccess((p, dataFlowResponse) -> sendTransferStartMessage(p, dataFlowResponse, policy, onFailure)) - .onFatalError((p, failure) -> transitionToTerminating(p, failure.getFailureDetail())) - .onFailure((t, failure) -> onFailure.accept(t)) - .onRetryExhausted((p, failure) -> transitionToTerminating(p, failure.getFailureDetail())); + return entityRetryProcessFactory.retryProcessor(process) + .doProcess(result("Start DataFlow", (t, c) -> dataFlowManager.start(process, policy))) + .doProcess(futureResult("Dispatch TransferRequestMessage to: " + process.getCounterPartyAddress(), + (t, dataFlowResponse) -> { + var messageBuilder = TransferStartMessage.Builder.newInstance().dataAddress(dataFlowResponse.getDataAddress()); + return dispatch(messageBuilder, t, Object.class) + .>thenApply(result -> result.map(i -> dataFlowResponse)); + }) + ) + .onSuccess((t, dataFlowResponse) -> transitionToStarted(t, dataFlowResponse.getDataPlaneId())) + .onFailure((t, throwable) -> onFailure.accept(t)) + .onFinalFailure((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) + .execute(); } /** @@ -360,16 +369,16 @@ private StatusResultRetryProcess startTransfe */ @WithSpan private boolean processConsumerResuming(TransferProcess process) { - var policy = policyArchive.findPolicyForContract(process.getContractId()); - var messageBuilder = TransferStartMessage.Builder.newInstance(); - return dispatch(messageBuilder, process, policy, Object.class) - .onSuccess((t, content) -> transitionToResumed(t)) + return entityRetryProcessFactory.retryProcessor(process) + .doProcess(futureResult("Dispatch TransferStartMessage for transfer resume to " + process.getCounterPartyAddress(), + (t, dataFlowResponse) -> dispatch(messageBuilder, t, Object.class)) + ) + .onSuccess((t, c) -> transitionToResumed(t)) .onFailure((t, throwable) -> transitionToResuming(t)) - .onFatalError((n, failure) -> transitionToTerminating(n, failure.getFailureDetail())) - .onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) - .execute("send transfer start to " + process.getCounterPartyAddress()); + .onFinalFailure((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) + .execute(); } /** @@ -382,17 +391,19 @@ private boolean processConsumerResuming(TransferProcess process) { private boolean processCompleting(TransferProcess process) { var builder = TransferCompletionMessage.Builder.newInstance(); - return dispatch(builder, process, policyArchive.findPolicyForContract(process.getContractId()), Object.class) - .onSuccess((t, content) -> { + return entityRetryProcessFactory.retryProcessor(process) + .doProcess(futureResult("Dispatch TransferCompletionMessage to " + process.getCounterPartyAddress(), + (t, dataFlowResponse) -> dispatch(builder, t, Object.class)) + ) + .onSuccess((t, c) -> { transitionToCompleted(t); if (t.getType() == PROVIDER) { transitionToDeprovisioning(t); } }) .onFailure((t, throwable) -> transitionToCompleting(t)) - .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) - .onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) - .execute("send transfer completion to " + process.getCounterPartyAddress()); + .onFinalFailure((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) + .execute(); } /** @@ -404,12 +415,18 @@ private boolean processCompleting(TransferProcess process) { */ @WithSpan private boolean processSuspending(TransferProcess process) { - return entityRetryProcessFactory.doSyncProcess(process, () -> suspendDataFlow(process)) - .onSuccess((p, dataFlowResponse) -> sendTransferSuspensionMessage(p)) - .onFailure((t, failure) -> transitionToSuspending(t, failure.getFailureDetail())) - .onFatalError((p, failure) -> transitionToTerminated(p, failure.getFailureDetail())) - .onRetryExhausted((p, failure) -> transitionToTerminated(p, failure.getFailureDetail())) - .execute("Suspend data flow"); + var builder = TransferSuspensionMessage.Builder.newInstance() + .reason(process.getErrorDetail()); + + return entityRetryProcessFactory.retryProcessor(process) + .doProcess(result("Suspend DataFlow", (t, c) -> suspendDataFlow(process))) + .doProcess(futureResult("Dispatch TransferSuspensionMessage to " + process.getCounterPartyAddress(), + (t, dataFlowResponse) -> dispatch(builder, t, Object.class)) + ) + .onSuccess((t, content) -> transitionToSuspended(t)) + .onFailure((t, throwable) -> transitionToSuspending(t, throwable.getMessage())) + .onFinalFailure((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) + .execute(); } /** @@ -426,18 +443,30 @@ private boolean processTerminating(TransferProcess process) { return true; } - return entityRetryProcessFactory.doSyncProcess(process, () -> terminateDataFlow(process)) - .onSuccess((p, dataFlowResponse) -> sendTransferTerminationMessage(p)) - .onFailure((t, failure) -> { + return entityRetryProcessFactory.retryProcessor(process) + .doProcess(result("Terminate DataFlow", (p, i) -> terminateDataFlow(process))) + .doProcess(futureResult("Dispatch TransferTerminationMessage", (t, n) -> { + if (t.terminationWasRequestedByCounterParty()) { + return completedFuture(StatusResult.success(null)); + } else { + return dispatch(TransferTerminationMessage.Builder.newInstance().reason(t.getErrorDetail()), t, Object.class); + } + })) + .onSuccess((t, c) -> { + transitionToTerminated(t); + if (t.getType() == PROVIDER) { + transitionToDeprovisioning(t); + } + }) + .onFailure((t, throwable) -> { if (t.terminationWasRequestedByCounterParty()) { - transitionToTerminatingRequested(t, failure.getFailureDetail()); + transitionToTerminatingRequested(t, throwable.getMessage()); } else { - transitionToTerminating(t, failure.getFailureDetail()); + transitionToTerminating(t, throwable.getMessage()); } }) - .onFatalError((p, failure) -> transitionToTerminated(p, failure.getFailureDetail())) - .onRetryExhausted((p, failure) -> transitionToTerminated(p, failure.getFailureDetail())) - .execute("Terminate data flow"); + .onFinalFailure(this::transitionToTerminated) + .execute(); } /** @@ -455,24 +484,12 @@ private boolean processDeprovisioning(TransferProcess process) { var resourcesToDeprovision = process.getResourcesToDeprovision(); - return entityRetryProcessFactory.doAsyncProcess(process, () -> provisionManager.deprovision(resourcesToDeprovision, policy)) - .onSuccess((transferProcess, responses) -> handleResult(transferProcess, responses, deprovisionResponsesHandler)) + return entityRetryProcessFactory.retryProcessor(process) + .doProcess(future("Deprovisioning", (p, c) -> provisionManager.deprovision(resourcesToDeprovision, policy))) + .onSuccess((t, responses) -> handleResult(t, responses, deprovisionResponsesHandler)) .onFailure((t, throwable) -> transitionToDeprovisioning(t)) - .onRetryExhausted((t, throwable) -> transitionToDeprovisioningError(t, throwable.getMessage())) - .execute("deprovisioning"); - } - - @WithSpan - private void sendTransferStartMessage(TransferProcess process, DataFlowResponse dataFlowResponse, Policy policy, Consumer onFailure) { - var messageBuilder = TransferStartMessage.Builder.newInstance() - .dataAddress(dataFlowResponse.getDataAddress()); - - dispatch(messageBuilder, process, policy, Object.class) - .onSuccess((t, content) -> transitionToStarted(t, dataFlowResponse.getDataPlaneId())) - .onFailure((t, throwable) -> onFailure.accept(t)) - .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) - .onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) - .execute("send transfer start to " + process.getCounterPartyAddress()); + .onFinalFailure((t, throwable) -> transitionToDeprovisioningError(t, throwable.getMessage())) + .execute(); } @NotNull @@ -493,49 +510,14 @@ private StatusResult terminateDataFlow(TransferProcess process) { } } - private boolean sendTransferSuspensionMessage(TransferProcess process) { - var builder = TransferSuspensionMessage.Builder.newInstance() - .reason(process.getErrorDetail()); - - return dispatch(builder, process, policyArchive.findPolicyForContract(process.getContractId()), Object.class) - .onSuccess((t, content) -> transitionToSuspended(t)) - .onFailure((t, throwable) -> transitionToSuspending(t, throwable.getMessage())) - .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) - .onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) - .execute("send transfer suspension to " + process.getCounterPartyAddress()); - } - - private boolean sendTransferTerminationMessage(TransferProcess process) { - var builder = TransferTerminationMessage.Builder.newInstance() - .reason(process.getErrorDetail()); - - var dispatch = process.terminationWasRequestedByCounterParty() - ? entityRetryProcessFactory.doAsyncStatusResultProcess(process, doNothing()) - : dispatch(builder, process, policyArchive.findPolicyForContract(process.getContractId()), Object.class); - - return dispatch - .onSuccess((t, content) -> { - transitionToTerminated(t); - if (t.getType() == PROVIDER) { - transitionToDeprovisioning(t); - } - }) - .onFailure((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) - .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) - .onRetryExhausted(this::transitionToTerminated) - .execute("send transfer termination to " + process.getCounterPartyAddress()); - } - - private @NotNull Supplier>> doNothing() { - return () -> CompletableFuture.completedFuture(StatusResult.success(null)); - } + private > CompletableFuture> dispatch(B messageBuilder, TransferProcess process, Class responseType) { - private > AsyncStatusResultRetryProcess dispatch(B messageBuilder, TransferProcess process, Policy policy, Class responseType) { + var contractPolicy = policyArchive.findPolicyForContract(process.getContractId()); messageBuilder.protocol(process.getProtocol()) .counterPartyAddress(process.getCounterPartyAddress()) .processId(Optional.ofNullable(process.getCorrelationId()).orElse(process.getId())) - .policy(policy); + .policy(contractPolicy); if (process.lastSentProtocolMessage() != null) { messageBuilder.id(process.lastSentProtocolMessage()); @@ -544,18 +526,18 @@ private boolean sendTransferTerminationMessage(TransferProcess process) { if (process.getType() == PROVIDER) { messageBuilder.consumerPid(process.getCorrelationId()) .providerPid(process.getId()) - .counterPartyId(policy.getAssignee()); + .counterPartyId(contractPolicy.getAssignee()); } else { messageBuilder.consumerPid(process.getId()) .providerPid(process.getCorrelationId()) - .counterPartyId(policy.getAssigner()); + .counterPartyId(contractPolicy.getAssigner()); } var message = messageBuilder.build(); process.lastSentProtocolMessage(message.getId()); - return entityRetryProcessFactory.doAsyncStatusResultProcess(process, () -> dispatcherRegistry.dispatch(responseType, message)); + return dispatcherRegistry.dispatch(responseType, message); } private void handleResult(TransferProcess transferProcess, List> responses, ResponsesHandler> handler) { diff --git a/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java b/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java index b134dbdeb6f..2d32c3b3ca1 100644 --- a/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java +++ b/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java @@ -679,11 +679,11 @@ public Stream provideArguments(ExtensionContext extensionCo new DispatchFailure(TERMINATING, TERMINATED, genericError, b -> b.stateCount(RETRIES_EXHAUSTED)), // fatal error, in this case retry should never be done new DispatchFailure(REQUESTING, TERMINATED, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), - new DispatchFailure(STARTING, TERMINATED, fatalError, b -> b.type(PROVIDER).stateCount(RETRIES_NOT_EXHAUSTED)), - new DispatchFailure(COMPLETING, TERMINATED, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), - new DispatchFailure(SUSPENDING, TERMINATED, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), - new DispatchFailure(RESUMING, TERMINATED, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED).type(CONSUMER)), - new DispatchFailure(RESUMING, TERMINATED, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED).type(PROVIDER)), + new DispatchFailure(STARTING, TERMINATING, fatalError, b -> b.type(PROVIDER).stateCount(RETRIES_NOT_EXHAUSTED)), + new DispatchFailure(COMPLETING, TERMINATING, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), + new DispatchFailure(SUSPENDING, TERMINATING, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), + new DispatchFailure(RESUMING, TERMINATING, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED).type(CONSUMER)), + new DispatchFailure(RESUMING, TERMINATING, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED).type(PROVIDER)), new DispatchFailure(TERMINATING, TERMINATED, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)) ); }