Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Concurrency 3.0 #1458

Merged
merged 46 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
891a463
add derbyshared to tomee libs
jungm Aug 11, 2024
3ecbf15
support for @ManagedExecutorDefinition
jungm Aug 11, 2024
d60c92c
license header
jungm Aug 15, 2024
b35a408
fix ManagedExecutor propOrder
jungm Aug 15, 2024
6f5d2c1
directly set ApplicationThreadContextProvider instead of relying on o…
jungm Aug 15, 2024
a05b7d8
fix classifiers in arquillian.xml
jungm Aug 15, 2024
5d5cb41
remove unused method
jungm Aug 15, 2024
a60b4e4
deploy @ManagedScheduledExecutorDefinition and @ManagedThreadFactoryD…
jungm Aug 16, 2024
893f678
fix ContextService resource property in converters
jungm Aug 16, 2024
fc444ff
validate instance in ContextServiceImpl before creating a contextual …
jungm Aug 16, 2024
a193bfe
awaitTermination is a forbidden method for MES/SMES
jungm Aug 16, 2024
9c478ed
implement TxThreadContextProvider
jungm Aug 16, 2024
1dab8b7
static inner classes
jungm Aug 16, 2024
95a6678
wrap functional interfaces in ContextServiceImpl
jungm Aug 17, 2024
004cdde
don't use outdated sxc bindings for webapp descriptor
jungm Aug 17, 2024
cffe1ce
update xml models
jungm Aug 17, 2024
87e3d66
bind max-async to threadpool size
jungm Aug 17, 2024
0a492cd
Wire up priority and ContextService in ManagedThreadFactory
jungm Aug 17, 2024
56fdb89
wrap all functions in CUCompletableFuture
jungm Aug 17, 2024
f571a75
Fix default ContextService
jungm Aug 17, 2024
37be3ac
implement @Asynchronous interceptor
jungm Aug 17, 2024
eae754b
call super.onTermination
jungm Aug 18, 2024
c179895
determine isShutdown by thread state
jungm Aug 18, 2024
494942c
improved @Asynchronous interceptor
jungm Aug 18, 2024
aecd4a7
implement ContextService withContextCapture
jungm Aug 18, 2024
b0f3b2d
fix NPE in MSES
jungm Aug 18, 2024
753d0c5
new concurrency tck
jungm Aug 19, 2024
16fb76a
exclude sigtest from concurrency-standalone
jungm Aug 20, 2024
7f07789
Make proxies created by ContextService serializable again
jungm Aug 20, 2024
d88623e
pass ContextService to wrappers in MSES
jungm Aug 20, 2024
5566eda
ScheduledTask is done when Trigger#getNextRun returns null
jungm Aug 20, 2024
3e2182f
init LastExecution for skippedCase, throw SkippedExecution in Schedul…
jungm Aug 21, 2024
24a338a
fix LastExecution scheduledStart
jungm Aug 22, 2024
cc145d7
clean up duplicate param in constructor
jungm Aug 23, 2024
9ffdac9
ManagedConnection enlist delegate in transaction if it already exists
jungm Aug 23, 2024
148e06e
rework contextual proxy to support USE_TRANSACTION_OF_EXECUTION_THREAD
jungm Aug 23, 2024
e462e20
fix first tomee tests
jungm Aug 23, 2024
fb6340d
propagate ThreadContext in ApplicationThreadContextProvider
jungm Aug 26, 2024
129109c
fix CUTriggerScheduledFuture isDone and cancel
jungm Aug 26, 2024
804602d
clean up
jungm Aug 26, 2024
23e30c4
license headers
jungm Aug 26, 2024
f52b5e6
sigtest setup
jungm Aug 26, 2024
5aa317e
Merge branch 'main' into concurrency-work
jungm Sep 15, 2024
bb0defb
private -> protected in jaxb model
jungm Sep 15, 2024
a9343eb
regenerate accessors, switch web.xml parsing to SXC again
jungm Sep 15, 2024
4f2e495
attempt to fix race condition in MSES task scheduling
jungm Sep 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.apache.openejb.jee.JndiConsumer;
import org.apache.openejb.jee.KeyedCollection;
import org.apache.openejb.jee.LifecycleCallback;
import org.apache.openejb.jee.ManagedExecutor;
import org.apache.openejb.jee.ManagedScheduledExecutor;
import org.apache.openejb.jee.ManagedThreadFactory;
import org.apache.openejb.jee.MessageDestinationRef;
import org.apache.openejb.jee.PersistenceContextRef;
import org.apache.openejb.jee.PersistenceUnitRef;
Expand Down Expand Up @@ -67,6 +70,9 @@ public class CdiBeanInfo implements JndiConsumer {
private ClassLoader classLoader;
private List<Injection> injections;
protected KeyedCollection<String, ContextService> contextService;
protected KeyedCollection<String, ManagedExecutor> managedExecutor;
protected KeyedCollection<String, ManagedScheduledExecutor> managedScheduledExecutor;
protected KeyedCollection<String, ManagedThreadFactory> managedThreadFactory;

public String getBeanName() {
return beanName;
Expand Down Expand Up @@ -323,11 +329,38 @@ public String getJndiConsumerName() {
public Class<?> getBeanClass() {
return this.beanClass;
}

@Override
public Map<String, ContextService> getContextServiceMap() {
if (contextService == null) {
contextService = new KeyedCollection<String, ContextService>();
}
return this.contextService.toMap();
}

@Override
public Map<String, ManagedExecutor> getManagedExecutorMap() {
if (managedExecutor == null) {
managedExecutor = new KeyedCollection<>();
}
return this.managedExecutor.toMap();
}

@Override
public Map<String, ManagedScheduledExecutor> getManagedScheduledExecutorMap() {
if (managedScheduledExecutor == null) {
managedScheduledExecutor = new KeyedCollection<>();
}

return this.managedScheduledExecutor.toMap();
}

@Override
public Map<String, ManagedThreadFactory> getManagedThreadFactoryMap() {
if (managedThreadFactory == null) {
managedThreadFactory = new KeyedCollection<>();
}

return this.managedThreadFactory.toMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.openejb.assembler.classic.AppInfo;
import org.apache.openejb.assembler.classic.BeansInfo;
import org.apache.openejb.assembler.classic.EjbJarInfo;
import org.apache.openejb.cdi.concurrency.AsynchronousInterceptor;
import org.apache.openejb.cdi.transactional.MandatoryInterceptor;
import org.apache.openejb.cdi.transactional.NeverInterceptor;
import org.apache.openejb.cdi.transactional.NotSupportedInterceptor;
Expand Down Expand Up @@ -64,9 +65,13 @@ public class CdiScanner implements BdaScannerService {
private static final Logger logger = Logger.getInstance(LogCategory.OPENEJB_CDI, OpenEJBLifecycle.class);
public static final String OPENEJB_CDI_FILTER_CLASSLOADER = "openejb.cdi.filter.classloader";

private static final Class<?>[] TRANSACTIONAL_INTERCEPTORS = new Class<?>[]{
private static final Class<?>[] INTERNAL_INTERCEPTORS = new Class<?>[]{
// @Transactional
MandatoryInterceptor.class, NeverInterceptor.class, NotSupportedInterceptor.class,
RequiredInterceptor.class, RequiredNewInterceptor.class, SupportsInterceptor.class
RequiredInterceptor.class, RequiredNewInterceptor.class, SupportsInterceptor.class,

// @Asynchronous
AsynchronousInterceptor.class
};

private final Set<Class<?>> startupClasses = new HashSet<>();
Expand Down Expand Up @@ -140,8 +145,8 @@ public void init(final Object object) {

if (appInfo.webAppAlone || !ejbJar.webapp) {
// "manual" extension to avoid to add it through SPI mecanism
classes.addAll(asList(TRANSACTIONAL_INTERCEPTORS));
for (final Class<?> interceptor : TRANSACTIONAL_INTERCEPTORS) {
classes.addAll(asList(INTERNAL_INTERCEPTORS));
for (final Class<?> interceptor : INTERNAL_INTERCEPTORS) {
interceptorsManager.addEnabledInterceptorClass(interceptor);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void execute(final Runnable command) {
executor = new ManagedExecutorServiceImpl(
new ExecutorBuilder()
.size(3)
.threadFactory(new ManagedThreadFactoryImpl(appContext.getId() + "-cdi-fireasync-"))
.threadFactory(new ManagedThreadFactoryImpl(appContext.getId() + "-cdi-fireasync-", null, ContextServiceImplFactory.newPropagateEverythingContextService()))
.prefix("CDIAsyncPool")
.build(appContext.getOptions()), ContextServiceImplFactory.newPropagateEverythingContextService());
delegate.compareAndSet(null, executor);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openejb.cdi.concurrency;

import jakarta.annotation.Priority;
import jakarta.enterprise.concurrent.Asynchronous;
import jakarta.enterprise.concurrent.ManagedExecutorService;
import jakarta.interceptor.AroundInvoke;
import jakarta.interceptor.Interceptor;
import jakarta.interceptor.InvocationContext;
import org.apache.openejb.core.ivm.naming.NamingException;
import org.apache.openejb.resource.thread.ManagedExecutorServiceImplFactory;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;

@Interceptor
@Asynchronous
@Priority(Interceptor.Priority.PLATFORM_BEFORE + 5)
public class AsynchronousInterceptor {
public static final String MP_ASYNC_ANNOTATION_NAME = "org.eclipse.microprofile.faulttolerance.Asynchronous";

// ensure validation logic required by the spec only runs once per invoked Method
private final Map<Method, Exception> validationCache = new ConcurrentHashMap<>();

@AroundInvoke
public Object aroundInvoke(final InvocationContext ctx) throws Exception {
Exception exception = validationCache.computeIfAbsent(ctx.getMethod(), this::validate);
if (exception != null) {
throw exception;
}

Asynchronous asynchronous = ctx.getMethod().getAnnotation(Asynchronous.class);
ManagedExecutorService mes;
try {
mes = ManagedExecutorServiceImplFactory.lookup(asynchronous.executor());
} catch (NamingException | IllegalArgumentException e) {
throw new RejectedExecutionException("Cannot lookup ManagedExecutorService", e);
}

CompletableFuture<Object> future = mes.newIncompleteFuture();
mes.execute(() -> {
try {
Asynchronous.Result.setFuture(future);
CompletionStage<?> result = (CompletionStage<?>) ctx.proceed();
if (result == null || result == future) {
future.complete(result);

Asynchronous.Result.setFuture(null);
return;
}

result.whenComplete((resultInternal, err) -> {
if (resultInternal != null) {
future.complete(resultInternal);
} else if (err != null) {
future.completeExceptionally(err);
}

Asynchronous.Result.setFuture(null);
});
} catch (Exception e) {
future.completeExceptionally(e);
Asynchronous.Result.setFuture(null);
}
});

return ctx.getMethod().getReturnType() == Void.TYPE ? null : future;
}

private Exception validate(final Method method) {
if (hasMpAsyncAnnotation(method.getAnnotations()) || hasMpAsyncAnnotation(method.getDeclaringClass().getAnnotations())) {
return new UnsupportedOperationException("Combining " + Asynchronous.class.getName()
+ " and " + MP_ASYNC_ANNOTATION_NAME + " on the same method/class is not supported");
}

Asynchronous asynchronous = method.getAnnotation(Asynchronous.class);
if (asynchronous == null) {
return new UnsupportedOperationException("Asynchronous annotation must be placed on a method");
}

Class<?> returnType = method.getReturnType();
if (returnType != Void.TYPE && returnType != CompletableFuture.class && returnType != CompletionStage.class) {
return new UnsupportedOperationException("Asynchronous annotation must be placed on a method that returns either void, CompletableFuture or CompletionStage");
}

return null;
}

private boolean hasMpAsyncAnnotation(Annotation[] declaredAnnotations) {
return Arrays.stream(declaredAnnotations)
.map(it -> it.annotationType().getName())
.anyMatch(it -> it.equals(MP_ASYNC_ANNOTATION_NAME));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.openejb.config;

import jakarta.enterprise.concurrent.ContextServiceDefinition;
import jakarta.enterprise.concurrent.ManagedExecutorDefinition;
import jakarta.enterprise.concurrent.ManagedScheduledExecutorDefinition;
import jakarta.enterprise.concurrent.ManagedThreadFactoryDefinition;
import jakarta.interceptor.AroundConstruct;
import org.apache.openejb.BeanContext;
import org.apache.openejb.OpenEJBException;
Expand Down Expand Up @@ -78,6 +81,9 @@
import org.apache.openejb.jee.Lifecycle;
import org.apache.openejb.jee.LifecycleCallback;
import org.apache.openejb.jee.Listener;
import org.apache.openejb.jee.ManagedExecutor;
import org.apache.openejb.jee.ManagedScheduledExecutor;
import org.apache.openejb.jee.ManagedThreadFactory;
import org.apache.openejb.jee.MessageAdapter;
import org.apache.openejb.jee.MessageDrivenBean;
import org.apache.openejb.jee.MessageListener;
Expand Down Expand Up @@ -4081,6 +4087,53 @@ public void buildAnnotatedRefs(final JndiConsumer consumer, final IAnnotationFin
buildContextServiceDefinition(consumer, definition);
}

//
// @ManagedExecutorDefinition
//

for (final Annotated<Class<?>> annotated : annotationFinder.findMetaAnnotatedClasses(ManagedExecutorDefinition.List.class)) {
final ManagedExecutorDefinition.List defs = annotated.getAnnotation(ManagedExecutorDefinition.List.class);
for (final ManagedExecutorDefinition definition : defs.value()) {
buildManagedExecutorDefinition(consumer, definition);
}
}

for (final Annotated<Class<?>> annotated : annotationFinder.findMetaAnnotatedClasses(ManagedExecutorDefinition.class)) {
final ManagedExecutorDefinition definition = annotated.getAnnotation(ManagedExecutorDefinition.class);
buildManagedExecutorDefinition(consumer, definition);
}

//
// @ManagedScheduledExecutorDefinition
//

for (final Annotated<Class<?>> annotated : annotationFinder.findMetaAnnotatedClasses(ManagedScheduledExecutorDefinition.List.class)) {
final ManagedScheduledExecutorDefinition.List defs = annotated.getAnnotation(ManagedScheduledExecutorDefinition.List.class);
for (final ManagedScheduledExecutorDefinition definition : defs.value()) {
buildManagedScheduledExecutorDefinition(consumer, definition);
}
}

for (final Annotated<Class<?>> annotated : annotationFinder.findMetaAnnotatedClasses(ManagedScheduledExecutorDefinition.class)) {
final ManagedScheduledExecutorDefinition definition = annotated.getAnnotation(ManagedScheduledExecutorDefinition.class);
buildManagedScheduledExecutorDefinition(consumer, definition);
}

//
// @ManagedThreadFactoryDefinition
//
for (final Annotated<Class<?>> annotated : annotationFinder.findMetaAnnotatedClasses(ManagedThreadFactoryDefinition.List.class)) {
final ManagedThreadFactoryDefinition.List defs = annotated.getAnnotation(ManagedThreadFactoryDefinition.List.class);
for (final ManagedThreadFactoryDefinition definition : defs.value()) {
buildManagedThreadFactoryDefinition(consumer, definition);
}
}

for (final Annotated<Class<?>> annotated : annotationFinder.findMetaAnnotatedClasses(ManagedThreadFactoryDefinition.class)) {
final ManagedThreadFactoryDefinition definition = annotated.getAnnotation(ManagedThreadFactoryDefinition.class);
buildManagedThreadFactoryDefinition(consumer, definition);
}

//
// @JMSConnectionFactoryDefinition
//
Expand Down Expand Up @@ -4137,6 +4190,47 @@ private void buildContextServiceDefinition(final JndiConsumer consumer, final Co
consumer.getContextServiceMap().put(definition.name(), contextService);
}

private void buildManagedExecutorDefinition(final JndiConsumer consumer, final ManagedExecutorDefinition definition) {
ManagedExecutor existing = consumer.getManagedExecutorMap().get(definition.name());
final ManagedExecutor managedExecutor = (existing != null) ? existing : new ManagedExecutor();

managedExecutor.setName(new JndiName());
managedExecutor.getName().setvalue(definition.name());
managedExecutor.setContextService(new JndiName());
managedExecutor.getContextService().setvalue(definition.context());
managedExecutor.setHungTaskThreshold(definition.hungTaskThreshold());
managedExecutor.setMaxAsync(definition.maxAsync() == -1 ? null : definition.maxAsync());

consumer.getManagedExecutorMap().put(definition.name(), managedExecutor);
}

private void buildManagedScheduledExecutorDefinition(final JndiConsumer consumer, final ManagedScheduledExecutorDefinition definition) {
ManagedScheduledExecutor existing = consumer.getManagedScheduledExecutorMap().get(definition.name());
final ManagedScheduledExecutor managedScheduledExecutor = (existing != null) ? existing : new ManagedScheduledExecutor();

managedScheduledExecutor.setName(new JndiName());
managedScheduledExecutor.getName().setvalue(definition.name());
managedScheduledExecutor.setContextService(new JndiName());
managedScheduledExecutor.getContextService().setvalue(definition.context());
managedScheduledExecutor.setHungTaskThreshold(definition.hungTaskThreshold());
managedScheduledExecutor.setMaxAsync(definition.maxAsync() == -1 ? null : definition.maxAsync());

consumer.getManagedScheduledExecutorMap().put(definition.name(), managedScheduledExecutor);
}

private void buildManagedThreadFactoryDefinition(final JndiConsumer consumer, ManagedThreadFactoryDefinition definition) {
ManagedThreadFactory existing = consumer.getManagedThreadFactoryMap().get(definition.name());
final ManagedThreadFactory managedThreadFactory = (existing != null) ? existing : new ManagedThreadFactory();

managedThreadFactory.setName(new JndiName());
managedThreadFactory.getName().setvalue(definition.name());
managedThreadFactory.setContextService(new JndiName());
managedThreadFactory.getContextService().setvalue(definition.context());
managedThreadFactory.setPriority(definition.priority());

consumer.getManagedThreadFactoryMap().put(definition.name(), managedThreadFactory);
}

private void buildContext(final JndiConsumer consumer, final Member member) {
final ContextRef ref = new ContextRef();
ref.setName(member.getDeclaringClass().getName() + "/" + member.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ public ConfigurationFactory(final boolean offline, final DynamicDeployer preAuto

chain.add(new ConvertDataSourceDefinitions());
chain.add(new ConvertContextServiceDefinitions());
chain.add(new ConvertManagedExecutorServiceDefinitions());
chain.add(new ConvertManagedScheduledExecutorServiceDefinitions());
chain.add(new ConvertManagedThreadFactoryDefinitions());
chain.add(new ConvertJMSConnectionFactoryDefinitions());
chain.add(new ConvertJMSDestinationDefinitions());
chain.add(new CleanEnvEntries());
Expand Down
Loading