Skip to content

Commit

Permalink
refactor: introduce RetryProcessor (#4787)
Browse files Browse the repository at this point in the history
* refactor: introduce retry processor

* pr remarks
  • Loading branch information
ndr-brt authored Feb 11, 2025
1 parent d2939b0 commit fc34b5a
Show file tree
Hide file tree
Showing 20 changed files with 1,188 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.response.ResponseFailure;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.statemachine.retry.processor.RetryProcessor;

import java.time.Clock;
import java.util.concurrent.CompletableFuture;
Expand All @@ -27,7 +28,10 @@

/**
* Provides retry capabilities to an asynchronous process that returns a {@link CompletableFuture} with a {@link StatusResult} content
*
* @deprecated use {@link RetryProcessor}.
*/
@Deprecated(since = "0.12.0")
public class AsyncStatusResultRetryProcess<E extends StatefulEntity<E>, C, SELF extends AsyncStatusResultRetryProcess<E, C, SELF>>
extends CompletableFutureRetryProcess<E, StatusResult<C>, SELF> {
private final Monitor monitor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.eclipse.edc.spi.entity.StatefulEntity;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.statemachine.retry.processor.RetryProcessor;

import java.time.Clock;
import java.util.Optional;
Expand All @@ -28,7 +29,10 @@

/**
* Provides retry capabilities to an asynchronous process that returns a {@link CompletableFuture} object
*
* @deprecated use {@link RetryProcessor}.
*/
@Deprecated(since = "0.12.0")
public class CompletableFutureRetryProcess<E extends StatefulEntity<E>, C, SELF extends CompletableFutureRetryProcess<E, C, SELF>>
extends RetryProcess<E, CompletableFutureRetryProcess<E, C, SELF>> {
private final Supplier<CompletableFuture<C>> process;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,6 @@
/**
* Configure a {@link RetryProcess}
*/
public class EntityRetryProcessConfiguration {
public record EntityRetryProcessConfiguration(int retryLimit, Supplier<WaitStrategy> delayStrategySupplier) {

private final int retryLimit;
private final Supplier<WaitStrategy> delayStrategySupplier;

public EntityRetryProcessConfiguration(int retryLimit, Supplier<WaitStrategy> delayStrategySupplier) {
this.retryLimit = retryLimit;
this.delayStrategySupplier = delayStrategySupplier;
}

public int getRetryLimit() {
return retryLimit;
}

public Supplier<WaitStrategy> getDelayStrategySupplier() {
return delayStrategySupplier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.eclipse.edc.spi.entity.StatefulEntity;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.statemachine.retry.processor.RetryProcessor;

import java.time.Clock;
import java.util.concurrent.CompletableFuture;
Expand All @@ -37,23 +38,42 @@ public EntityRetryProcessFactory(Monitor monitor, Clock clock, EntityRetryProces
this.configuration = configuration;
}

/**
* Initialize a {@link RetryProcessor} on the passed entity.
*
* @param entity the entity.
* @return a retry processor.
*/
public <E extends StatefulEntity<E>, C> RetryProcessor<E, C> retryProcessor(E entity) {
return new RetryProcessor<>(entity, monitor, clock, configuration);
}

/**
* Initialize a synchronous process that needs to be retried if it does not succeed
*
* @deprecated use {{@link #retryProcessor(StatefulEntity)}}
*/
@Deprecated(since = "0.12.0")
public <T extends StatefulEntity<T>, C> StatusResultRetryProcess<T, C> doSyncProcess(T entity, Supplier<StatusResult<C>> process) {
return new StatusResultRetryProcess<>(entity, process, monitor, clock, configuration);
}

/**
* Initialize an asynchronous process that needs to be retried if it does not succeed
*
* @deprecated use {{@link #retryProcessor(StatefulEntity)}}
*/
@Deprecated(since = "0.12.0")
public <T extends StatefulEntity<T>, C, SELF extends CompletableFutureRetryProcess<T, C, SELF>> SELF doAsyncProcess(T entity, Supplier<CompletableFuture<C>> process) {
return (SELF) new CompletableFutureRetryProcess<T, C, SELF>(entity, process, monitor, clock, configuration);
}

/**
* Initialize an asynchronous process that will return a {@link StatusResult} and it will need to be handled
*
* @deprecated use {{@link #retryProcessor(StatefulEntity)}}
*/
@Deprecated(since = "0.12.0")
public <T extends StatefulEntity<T>, C, SELF extends AsyncStatusResultRetryProcess<T, C, SELF>> SELF doAsyncStatusResultProcess(T entity, Supplier<CompletableFuture<StatusResult<C>>> process) {
return (SELF) new AsyncStatusResultRetryProcess<T, C, SELF>(entity, process, monitor, clock, configuration);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.eclipse.edc.spi.entity.StatefulEntity;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.statemachine.retry.processor.RetryProcessor;

import java.time.Clock;
import java.util.function.Consumer;
Expand All @@ -24,7 +25,10 @@
* Represent a process on a {@link StatefulEntity} that is retried after a certain delay if it fails.
* This works only used on a state machine, where states are persisted.
* The process is a unit of logic that can be executed on the entity.
*
* @deprecated use {@link RetryProcessor}.
*/
@Deprecated(since = "0.12.0")
public abstract class RetryProcess<E extends StatefulEntity<E>, SELF extends RetryProcess<E, SELF>> {

private final E entity;
Expand Down Expand Up @@ -64,7 +68,7 @@ public boolean execute(String description) {
}
return false;
} else {
monitor.debug(String.format("Entity %s %s retry #%d of %d.", entity.getId(), entity.getClass().getSimpleName(), entity.getStateCount() - 1, configuration.getRetryLimit()));
monitor.debug(String.format("Entity %s %s retry #%d of %d.", entity.getId(), entity.getClass().getSimpleName(), entity.getStateCount() - 1, configuration.retryLimit()));
}
}

Expand All @@ -86,12 +90,12 @@ public SELF onDelay(Consumer<E> onDelay) {
* @return {@code true} if the entity should not be sent anymore.
*/
protected boolean retriesExhausted(E entity) {
return entity.getStateCount() > configuration.getRetryLimit();
return entity.getStateCount() > configuration.retryLimit();
}

private long delayMillis(E entity) {
// Get a new instance of WaitStrategy.
var delayStrategy = configuration.getDelayStrategySupplier().get();
var delayStrategy = configuration.delayStrategySupplier().get();

// Set the WaitStrategy to have observed <retryCount> previous failures.
// This is relevant for stateful strategies such as exponential wait.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.response.ResponseFailure;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.statemachine.retry.processor.RetryProcessor;

import java.time.Clock;
import java.util.function.BiConsumer;
Expand All @@ -28,7 +29,10 @@

/**
* Provides retry capabilities to a synchronous process that returns a {@link StatusResult} object
*
* @deprecated use {@link RetryProcessor}.
*/
@Deprecated(since = "0.12.0")
public class StatusResultRetryProcess<E extends StatefulEntity<E>, C> extends RetryProcess<E, StatusResultRetryProcess<E, C>> {
private final Supplier<StatusResult<C>> process;
private final Monitor monitor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2025 Cofinity-X
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Cofinity-X - initial API and implementation
*
*/

package org.eclipse.edc.statemachine.retry.processor;

import org.eclipse.edc.spi.entity.StatefulEntity;
import org.jetbrains.annotations.NotNull;

/**
* Exception that describes generic failure in a process
*/
public class EntityStateException extends RuntimeException {

private final StatefulEntity<?> entity;
private final String processName;

public EntityStateException(StatefulEntity<?> entity, String processName, String message) {
super(message);
this.entity = entity;
this.processName = processName;
}

public StatefulEntity<?> getEntity() {
return entity;
}

public String getProcessName() {
return processName;
}

@NotNull String getRetryLimitExceededMessage() {
return "%s: ID %s. Attempt #%d failed to %s. Retry limit exceeded. Cause: %s".formatted(
entity.getClass().getSimpleName(),
entity.getId(),
entity.getStateCount(),
getProcessName(),
getMessage()
);
}

@NotNull String getRetryFailedMessage() {
return "%s: ID %s. Attempt #%d failed to %s. Cause: %s".formatted(
entity.getClass().getSimpleName(),
entity.getId(),
entity.getStateCount(),
getProcessName(),
getMessage()
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2025 Cofinity-X
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Cofinity-X - initial API and implementation
*
*/

package org.eclipse.edc.statemachine.retry.processor;

import org.eclipse.edc.spi.entity.StatefulEntity;
import org.eclipse.edc.spi.response.StatusResult;

import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;

import static java.util.concurrent.CompletableFuture.failedFuture;

/**
* Process implementation that handles a process that returns {@link CompletableFuture} with enclosed {@link StatusResult}
*
* @param <E> entity type.
* @param <I> process input type.
* @param <O> process output type.
*/
public class FutureResultRetryProcess<E extends StatefulEntity<E>, I, O> implements Process<E, I, O> {

private final String name;
private final BiFunction<E, I, CompletableFuture<StatusResult<O>>> function;
private Function<String, E> entityReload;

public FutureResultRetryProcess(String name, BiFunction<E, I, CompletableFuture<StatusResult<O>>> function) {
this.name = name;
this.function = function;
}

@Override
public CompletableFuture<ProcessContext<E, O>> execute(ProcessContext<E, I> context) {
try {
return new FutureRetryProcess<>(name, function).entityReload(entityReload)
.execute(context)
.thenCompose(asyncContext -> new ResultRetryProcess<E, I, O>(name, (e, c) -> asyncContext.content())
.execute(new ProcessContext<>(asyncContext.entity(), context.content())));
} catch (Throwable throwable) {
return failedFuture(new UnrecoverableEntityStateException(reloadEntity(context.entity()), name, throwable.getMessage()));
}
}

public FutureResultRetryProcess<E, I, O> entityReload(Function<String, E> entityReload) {
this.entityReload = entityReload;
return this;
}

private E reloadEntity(E entity) {
return entityReload == null ? entity : entityReload.apply(entity.getId());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2025 Cofinity-X
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Cofinity-X - initial API and implementation
*
*/

package org.eclipse.edc.statemachine.retry.processor;

import org.eclipse.edc.spi.entity.StatefulEntity;

import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;

import static java.util.concurrent.CompletableFuture.failedFuture;

/**
* Process implementation that handles a process that returns {@link CompletableFuture}
*
* @param <E> entity type.
* @param <I> process input type.
* @param <O> process output type.
*/
public class FutureRetryProcess<E extends StatefulEntity<E>, I, O> implements Process<E, I, O> {

private final String name;
private final BiFunction<E, I, CompletableFuture<O>> function;
private Function<String, E> entityReload;

public FutureRetryProcess(String name, BiFunction<E, I, CompletableFuture<O>> function) {
this.name = name;
this.function = function;
}

@Override
public CompletableFuture<ProcessContext<E, O>> execute(ProcessContext<E, I> context) {
try {
return function.apply(context.entity(), context.content())
.handle((content, throwable) -> {
var reloadedEntity = reloadEntity(context.entity());
if (throwable == null) {
return new ProcessContext<>(reloadedEntity, content);
} else {
throw new EntityStateException(reloadedEntity, name, throwable.getMessage());
}
});
} catch (Throwable throwable) {
return failedFuture(new UnrecoverableEntityStateException(reloadEntity(context.entity()), name, throwable.getMessage()));
}
}

public FutureRetryProcess<E, I, O> entityReload(Function<String, E> entityReload) {
this.entityReload = entityReload;
return this;
}

private E reloadEntity(E entity) {
return entityReload == null ? entity : entityReload.apply(entity.getId());
}

}
Loading

0 comments on commit fc34b5a

Please sign in to comment.