diff --git a/pom.xml b/pom.xml index c583864c..fbd3cb87 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ tasks/terraform tasks/xml tasks/zoom + tasks/opentelemetry diff --git a/tasks/opentelemetry/pom.xml b/tasks/opentelemetry/pom.xml new file mode 100644 index 00000000..2c328822 --- /dev/null +++ b/tasks/opentelemetry/pom.xml @@ -0,0 +1,171 @@ + + + + 4.0.0 + + + com.walmartlabs.concord.plugins + concord-plugins-parent + 2.6.1-SNAPSHOT + ../../pom.xml + + + opentelemetry + takari-jar + + + 1.39.0 + 1.14.0 + + + + + com.walmartlabs.concord + concord-sdk + provided + + + com.walmartlabs.concord.runtime + concord-runtime-common + provided + + + com.walmartlabs.concord.runtime.v2 + concord-runtime-sdk-v2 + provided + + + com.walmartlabs.concord.runtime.v2 + concord-runner-v2 + provided + + + com.walmartlabs.concord.runtime.v2 + concord-runtime-vm-v2 + provided + + + com.walmartlabs.concord.runtime.v2 + concord-runtime-model-v2 + provided + + + + org.immutables + value + provided + + + com.google.code.findbugs + jsr305 + provided + + + com.google.errorprone + error_prone_annotations + provided + + + + org.slf4j + slf4j-api + provided + + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + + + + javax.inject + javax.inject + provided + + + org.glassfish + javax.el + provided + + + com.google.inject + guice + provided + + + + io.opentelemetry + opentelemetry-api + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-sdk + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-sdk-common + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-sdk-trace + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-exporter-otlp-http-trace + ${opentelemetry.exporter.version} + + + org.jetbrains.kotlin + kotlin-stdlib-common + + + + + + org.junit.jupiter + junit-jupiter-api + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + + + + + org.eclipse.sisu + sisu-maven-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + + + + + + + eclipse + https://repo.eclipse.org/content/groups/releases/ + + + diff --git a/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/ConcordSpan.java b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/ConcordSpan.java new file mode 100644 index 00000000..7a34802b --- /dev/null +++ b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/ConcordSpan.java @@ -0,0 +1,94 @@ +package com.walmartlabs.concord.plugins.opentelemetry; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2024 Walmart Inc., Concord Authors + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import org.immutables.value.Value; + +import javax.annotation.Nullable; +import java.util.Collections; + +@Value.Immutable +@Value.Style(jdkOnly = true) +public abstract class ConcordSpan implements ReadableSpan { + + public abstract Resource resource(); + + public abstract Attributes attributes(); + + public abstract long startEpochNanos(); + public abstract long endEpochNanos(); + + public abstract StatusData status(); + + @Override + public SpanData toSpanData() { + return ConcordSpanData.builder() + .name(getName()) + .kind(getKind()) + .spanContext(getSpanContext()) + .parentSpanContext(getParentSpanContext()) + .status(status()) + .startEpochNanos(startEpochNanos()) + .endEpochNanos(endEpochNanos()) + .attributes(attributes()) + .events(Collections.emptyList()) + .links(Collections.emptyList()) + .hasEnded(hasEnded()) + .totalRecordedEvents(0) + .totalRecordedLinks(0) + .totalAttributeCount(attributes().size()) + .resource(resource()) + .instrumentationLibraryInfo(getInstrumentationLibraryInfo()) + .build(); + } + + @Override + public InstrumentationLibraryInfo getInstrumentationLibraryInfo() { + return InstrumentationLibraryInfo.empty(); + } + + @Override + public boolean hasEnded() { + return true; + } + + @Override + public long getLatencyNanos() { + return 0; + } + + @Override + public T getAttribute(AttributeKey key) { + return attributes() == null ? null : attributes().get(key); + } + + @Override + @Nullable + public abstract SpanContext getParentSpanContext(); +} diff --git a/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/ConcordSpanBuilder.java b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/ConcordSpanBuilder.java new file mode 100644 index 00000000..1ab7bb9f --- /dev/null +++ b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/ConcordSpanBuilder.java @@ -0,0 +1,136 @@ +package com.walmartlabs.concord.plugins.opentelemetry; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2024 Walmart Inc., Concord Authors + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.*; +import io.opentelemetry.sdk.internal.AttributesMap; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.IdGenerator; +import io.opentelemetry.sdk.trace.data.StatusData; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static io.opentelemetry.api.common.AttributeKey.longKey; +import static io.opentelemetry.api.common.AttributeKey.stringKey; + +public class ConcordSpanBuilder { + + private final String traceId; + private final Resource resource; + private final String spanId; + private final String spanName; + private String parentSpanId; + private final SpanKind spanKind = SpanKind.INTERNAL; + private final Map, Object> attributes = new HashMap<>(); + private StatusData status; + private long startEpochNanos; + private long endEpochNanos; + + public static ConcordSpanBuilder builder(String traceId, Resource resource, String spanName) { + return builder(traceId, resource, IdGenerator.random().generateSpanId(), spanName); + } + + public static ConcordSpanBuilder builder(String traceId, Resource resource, String spanId, String spanName) { + return new ConcordSpanBuilder(traceId, resource, spanId, spanName); + } + + public ConcordSpanBuilder(String traceId, Resource resource, String spanId, String spanName) { + this.traceId = traceId; + this.resource = resource; + this.spanId = spanId; + this.spanName = spanName; + } + + public ConcordSpanBuilder setAttribute(String key, String value) { + return setAttribute(stringKey(key), value); + } + + public ConcordSpanBuilder setAttribute(String key, long value) { + return setAttribute(longKey(key), value); + } + + public ConcordSpanBuilder setAttribute(AttributeKey key, T value) { + if (key == null || key.getKey().isEmpty() || value == null) { + return this; + } + attributes.put(key, value); + return this; + } + + public ConcordSpanBuilder setStartTimestamp(long startTimestamp, TimeUnit unit) { + startEpochNanos = unit.toNanos(startTimestamp); + return this; + } + + public ConcordSpanBuilder setParentSpanId(String parentSpanId) { + this.parentSpanId = parentSpanId; + return this; + } + + public ConcordSpanBuilder setStatus(StatusCode statusCode) { + this.status = StatusData.create(statusCode, ""); + return this; + } + + public ConcordSpanBuilder end(Long millis) { + if (millis != null) { + return end(millis, TimeUnit.MILLISECONDS); + } + return this; + } + + public ConcordSpanBuilder end(long timestamp, TimeUnit unit) { + end(unit.toNanos(timestamp)); + return this; + } + + private void end(long endEpochNanos) { + this.endEpochNanos = endEpochNanos; + } + + public ConcordSpan build() { + AttributesMap attrs = AttributesMap.create(attributes.size(), Integer.MAX_VALUE); + attrs.putAll(attributes); + return ImmutableConcordSpan.builder() + .resource(resource) + .name(spanName) + .kind(spanKind) + .spanContext(buildContext(traceId, spanId)) + .parentSpanContext(buildContext(traceId, parentSpanId)) + .attributes(attrs) + .startEpochNanos(startEpochNanos) + .endEpochNanos(endEpochNanos) + .status(status) + .build(); + } + + private static SpanContext buildContext(String traceId, String spanId) { + return SpanContext.create( + traceId, + spanId, + TraceFlags.getSampled(), + TraceState.getDefault() + ); + } +} diff --git a/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/ConcordSpanData.java b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/ConcordSpanData.java new file mode 100644 index 00000000..10ab9b63 --- /dev/null +++ b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/ConcordSpanData.java @@ -0,0 +1,33 @@ +package com.walmartlabs.concord.plugins.opentelemetry; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2024 Walmart Inc., Concord Authors + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + +import io.opentelemetry.sdk.trace.data.SpanData; +import org.immutables.value.Value; + +@Value.Immutable +@Value.Style(jdkOnly = true) +public abstract class ConcordSpanData implements SpanData { + + static ImmutableConcordSpanData.Builder builder() { + return ImmutableConcordSpanData.builder(); + } +} diff --git a/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/FlowSteps.java b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/FlowSteps.java new file mode 100644 index 00000000..72d02767 --- /dev/null +++ b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/FlowSteps.java @@ -0,0 +1,120 @@ +package com.walmartlabs.concord.plugins.opentelemetry; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2024 Walmart Inc., Concord Authors + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.Serial; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class FlowSteps implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + private final UUID instanceId; + private final String flowName; + private final long startedAt; + private final List steps; + + @JsonIgnore + private final Map stepIndex; + + @JsonCreator + public FlowSteps(@JsonProperty("instanceId") UUID instanceId, + @JsonProperty("flowName") String flowName, + @JsonProperty("startedAt") long startedAt, + @JsonProperty("steps") List steps) { + + this.instanceId = instanceId; + this.flowName = flowName; + this.startedAt = startedAt; + this.steps = new ArrayList<>(steps != null ? steps : List.of()); + this.stepIndex = IntStream.range(0, this.steps.size()) + .boxed() + .collect(Collectors.toMap( + i -> this.steps.get(i).id(), + i -> i + )); + } + + public void onStepStart(StepId stepId, String name, StepId parentId, + String filename, int lineNum, String flowName) { + StepInfo step = StepInfo.builder() + .id(stepId) + .name(name) + .parentId(parentId) + .startedAt(System.currentTimeMillis()) + .filename(filename) + .lineNum(lineNum) + .flowName(flowName) + .build(); + + synchronized (this) { + steps.add(step); + stepIndex.put(stepId, steps.size() - 1); + } + } + + public boolean onStepEnd(StepId stepId, boolean success) { + synchronized (this) { + Integer index = stepIndex.get(stepId); + if (index == null) { + return false; + } + + steps.set(index, StepInfo.builder().from(steps.get(index)) + .endedAt(System.currentTimeMillis()) + .success(success) + .build()); + } + + return true; + } + + @JsonProperty("steps") + public List steps() { + return steps; + } + + @JsonProperty("instanceId") + public UUID instanceId() { + return instanceId; + } + + @JsonProperty("startedAt") + public long startedAt() { + return startedAt; + } + + @JsonProperty("flowName") + public String flowName() { + return flowName; + } +} diff --git a/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/OpentelemetryModule.java b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/OpentelemetryModule.java new file mode 100644 index 00000000..5a3d7cb1 --- /dev/null +++ b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/OpentelemetryModule.java @@ -0,0 +1,37 @@ +package com.walmartlabs.concord.plugins.opentelemetry; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2024 Walmart Inc., Concord Authors + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + +import com.google.inject.Binder; +import com.google.inject.multibindings.Multibinder; +import com.walmartlabs.concord.svm.ExecutionListener; + +import javax.inject.Named; + +@Named +public class OpentelemetryModule implements com.google.inject.Module { + + @Override + public void configure (Binder binder){ + Multibinder taskProviders = Multibinder.newSetBinder(binder, ExecutionListener.class); + taskProviders.addBinding().to(TelemetryCollector.class); + } +} diff --git a/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/StepId.java b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/StepId.java new file mode 100644 index 00000000..6341340e --- /dev/null +++ b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/StepId.java @@ -0,0 +1,57 @@ +package com.walmartlabs.concord.plugins.opentelemetry; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2024 Walmart Inc., Concord Authors + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.immutables.value.Value; + +import javax.annotation.Nullable; +import java.io.Serial; +import java.io.Serializable; +import java.util.UUID; + +@Value.Immutable +@Value.Style(jdkOnly = true) +@JsonInclude(JsonInclude.Include.NON_EMPTY) +@JsonSerialize(as = ImmutableStepId.class) +@JsonDeserialize(as = ImmutableStepId.class) +public interface StepId extends Serializable { + + @Serial + long serialVersionUID = 1L; + + @Value.Parameter + UUID correlationId(); + + @Nullable + @Value.Parameter + Integer loopIndex(); + + static StepId from(UUID correlationId) { + return ImmutableStepId.of(correlationId, null); + } + + static StepId from(UUID correlationId, Integer loopIndex) { + return ImmutableStepId.of(correlationId, loopIndex); + } +} diff --git a/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/StepInfo.java b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/StepInfo.java new file mode 100644 index 00000000..eb1e3c85 --- /dev/null +++ b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/StepInfo.java @@ -0,0 +1,70 @@ +package com.walmartlabs.concord.plugins.opentelemetry; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2024 Walmart Inc., Concord Authors + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.immutables.value.Value; + +import javax.annotation.Nullable; +import java.io.Serial; +import java.io.Serializable; + +@Value.Immutable +@Value.Style(jdkOnly = true) +@JsonInclude(JsonInclude.Include.NON_EMPTY) +@JsonSerialize(as = ImmutableStepInfo.class) +@JsonDeserialize(as = ImmutableStepInfo.class) +public interface StepInfo extends Serializable { + + @Serial + long serialVersionUID = 1L; + + StepId id(); + + String name(); + + @Nullable + StepId parentId(); + + long startedAt(); + + @Nullable + Long endedAt(); + + @Nullable + String filename(); + + int lineNum(); + + @Nullable + String flowName(); + + @Value.Default + default boolean success() { + return false; + } + + static ImmutableStepInfo.Builder builder() { + return ImmutableStepInfo.builder(); + } +} diff --git a/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/TelemetryCollector.java b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/TelemetryCollector.java new file mode 100644 index 00000000..c0670454 --- /dev/null +++ b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/TelemetryCollector.java @@ -0,0 +1,340 @@ +package com.walmartlabs.concord.plugins.opentelemetry; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2024 Walmart Inc., Concord Authors + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.walmartlabs.concord.runtime.common.injector.InstanceId; +import com.walmartlabs.concord.runtime.v2.model.*; +import com.walmartlabs.concord.runtime.v2.runner.DefaultTaskVariablesService; +import com.walmartlabs.concord.runtime.v2.runner.PersistenceService; +import com.walmartlabs.concord.runtime.v2.runner.context.ContextFactory; +import com.walmartlabs.concord.runtime.v2.runner.logging.SegmentedLogger; +import com.walmartlabs.concord.runtime.v2.runner.vm.*; +import com.walmartlabs.concord.runtime.v2.sdk.Context; +import com.walmartlabs.concord.runtime.v2.sdk.ProcessConfiguration; +import com.walmartlabs.concord.svm.Runtime; +import com.walmartlabs.concord.svm.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import java.util.List; +import java.util.UUID; + +public class TelemetryCollector implements ExecutionListener { + + public static final Logger log = LoggerFactory.getLogger(TelemetryCollector.class); + + private static final String TELEMETRY_STATE_FILENAME = "opentelemetry.json"; + + private static final String PARENT_STEP_ID_VARIABLE = "__opentelemetry__parent_step_id"; + private static final String FLOW_CALL_CORRELATION_ID_STEP_ID_VARIABLE = "__opentelemetry__correlation_id"; + private static final String FLOW_CALL_FRAME_VARIABLE = "__opentelemetry__flow_call_frame"; + + private final ObjectMapper objectMapper; + private final PersistenceService persistenceService; + + private final UUID instanceId; + private final ProcessConfiguration processConfiguration; + private final TelemetryParams params; + + private FlowSteps flowSteps; + + @Inject + public TelemetryCollector(ObjectMapper objectMapper, + PersistenceService persistenceService, + InstanceId instanceId, + ProcessConfiguration processConfiguration, + DefaultTaskVariablesService defaultTaskVariablesService) { + + this.objectMapper = objectMapper; + this.persistenceService = persistenceService; + this.instanceId = instanceId.getValue(); + this.processConfiguration = processConfiguration; + this.params = new TelemetryParams(defaultTaskVariablesService.get("opentelemetry")); + } + + @Override + public void beforeProcessStart(Runtime runtime, State state) { + if (!params.enabled()) { + return; + } + + this.flowSteps = new FlowSteps(instanceId, getEntryPoint(runtime, state, params, processConfiguration), System.currentTimeMillis(), List.of()); + } + + @Override + public void beforeProcessResume(Runtime runtime, State state) { + if (!params.enabled()) { + return; + } + + this.flowSteps = persistenceService.loadPersistedFile(TELEMETRY_STATE_FILENAME, + is -> objectMapper.readValue(is, FlowSteps.class)); + } + + @Override + public Result beforeCommand(Runtime runtime, VM vm, State state, ThreadId threadId, Command cmd) { + if (!params.enabled()) { + return Result.CONTINUE; + } + + if (cmd instanceof PopFrameCommand) { + // flow call end (including all steps)? + if (isFlowCallFrame(state, threadId)) { + UUID correlationId = getCorrelationId(state, threadId); + StepId stepId = StepId.from(correlationId, VMUtils.getCombinedLocal(state, threadId, LoopWrapper.CURRENT_INDEX)); + + boolean ok = flowSteps.onStepEnd(stepId, state.getThreadError(threadId) == null); + if (!ok) { + log.warn("beforeCommand ['{}', '{}'] -> step start info not found. This is most likely a bug", stepId, cmd); + } + } + return Result.CONTINUE; + } + + if (!shouldTraceCommand(cmd, params)) { + return Result.CONTINUE; + } + + if (cmd instanceof TaskResumeCommand) { // we already added span for task command + return Result.CONTINUE; + } + + StepCommand s = (StepCommand) cmd; + Step step = s.getStep(); + + log.debug("beforeCommand: {}", s); + + StepId stepId = StepId.from(s.getCorrelationId(), VMUtils.getCombinedLocal(state, threadId, LoopWrapper.CURRENT_INDEX)); + + flowSteps.onStepStart(stepId, + getStepName(runtime, state, threadId, step), + getParentId(state, threadId), + step.getLocation().fileName(), step.getLocation().lineNum(), + getFlowName(state, threadId)); + + return Result.CONTINUE; + } + + @Override + public Result afterCommand(Runtime runtime, VM vm, State state, ThreadId threadId, Command cmd) { + if (params.enabled()) { + afterCommand(state, threadId, cmd, true); + } + + return Result.CONTINUE; + } + + @Override + public Result onCommandError(Runtime runtime, VM vm, State state, ThreadId threadId, Command cmd, Exception e) { + if (params.enabled()) { + afterCommand(state, threadId, cmd, false); + } + + return Result.CONTINUE; + } + + private void afterCommand(State state, ThreadId threadId, Command cmd, boolean success) { + log.debug("afterCommand: {}", cmd); + + if (!shouldTraceCommand(cmd, params)) { + return; + } + + StepCommand s = (StepCommand) cmd; + + StepId stepId = StepId.from(s.getCorrelationId(), VMUtils.getCombinedLocal(state, threadId, LoopWrapper.CURRENT_INDEX)); + + if (cmd instanceof FlowCallCommand) { + setParentId(state, threadId, stepId); + setCorrelationId(state, threadId, s.getCorrelationId()); + markFrameAsFlowCall(state, threadId); + + return; + } + + boolean ok = flowSteps.onStepEnd(stepId, success); + if (!ok) { + log.warn("afterCommand ['{}', '{}'] -> step start info not found. This is most likely a bug", stepId, cmd); + } + } + + @Override + public void afterProcessEnds(Runtime runtime, State state, Frame lastFrame) { + if (!params.enabled()) { + return; + } + + if (isSuspended(state)) { + persistenceService.persistFile(TELEMETRY_STATE_FILENAME, + out -> objectMapper.writeValue(out, this.flowSteps)); + + return; + } + + sendTelemetry(true); + } + + @Override + public void onProcessError(Runtime runtime, State state, Exception e) { + if (!params.enabled()) { + return; + } + + sendTelemetry(false); + } + + private void sendTelemetry(boolean isProcessFinishedOk) { + long start = System.currentTimeMillis(); + log.info("Sending telemetry for process"); + + TelemetryExporter telemetryExporter = new TelemetryExporter(params.endpoint()); + String traceId = telemetryExporter.sendTelemetry(flowSteps, isProcessFinishedOk); + + log.info("Sending telemetry for process -> done in {} ms", (System.currentTimeMillis() - start)); + + String link = params.link(); + if (link != null) { + log.info("Opentelemetry traces link: {}", link.replace("", traceId)); + } + } + + private static boolean isSuspended(State state) { + return state.threadStatus().entrySet().stream() + .anyMatch(e -> e.getValue() == ThreadStatus.SUSPENDED); + } + + private static void setParentId(State state, ThreadId threadId, StepId id) { + state.peekFrame(threadId).setLocal(PARENT_STEP_ID_VARIABLE, id); + } + + private static StepId getParentId(State state, ThreadId threadId) { + return VMUtils.getCombinedLocal(state, threadId, PARENT_STEP_ID_VARIABLE); + } + + private static void setCorrelationId(State state, ThreadId threadId, UUID id) { + state.peekFrame(threadId).setLocal(FLOW_CALL_CORRELATION_ID_STEP_ID_VARIABLE, id); + } + + private static UUID getCorrelationId(State state, ThreadId threadId) { + return (UUID) state.peekFrame(threadId).getLocal(FLOW_CALL_CORRELATION_ID_STEP_ID_VARIABLE); + } + + private static String getFlowName(State state, ThreadId threadId) { + return FlowCallCommand.getFlowName(state, threadId); + } + + private static void markFrameAsFlowCall(State state, ThreadId threadId) { + Frame frame = state.peekFrame(threadId); + frame.setLocal(FLOW_CALL_FRAME_VARIABLE, frame.id()); + } + + private static boolean isFlowCallFrame(State state, ThreadId threadId) { + Frame frame = state.peekFrame(threadId); + Object value = frame.getLocal(FLOW_CALL_FRAME_VARIABLE); + if (value instanceof FrameId) { + return value.equals(frame.id()); + } + return false; + } + + private static String getStepName(Runtime runtime, State state, ThreadId threadId, Step step) { + if (step instanceof AbstractStep) { + ContextFactory contextFactory = runtime.getService(ContextFactory.class); + Context ctx = contextFactory.create(runtime, state, threadId, step); + + String rawSegmentName = SegmentedLogger.getSegmentName((AbstractStep) step); + String segmentName = ctx.eval(rawSegmentName, String.class); + if (segmentName != null) { + return segmentName; + } + } + + return getDefaultDescription(runtime, state, threadId, step); + } + + private static String getEntryPoint(Runtime runtime, State state, TelemetryParams params, ProcessConfiguration processConfiguration) { + if (params.entryPointVariableName() == null) { + return processConfiguration.entryPoint(); + } + + ContextFactory contextFactory = runtime.getService(ContextFactory.class); + Context ctx = contextFactory.create(runtime, state, state.getRootThreadId(), null); + + String result = ctx.eval("${" + params.entryPointVariableName() + "}", String.class); + if (result != null) { + return result; + } + + log.warn("can't load entry point from variable '{}', using process entry point: {}", params.entryPointVariableName(), processConfiguration.entryPoint()); + + return processConfiguration.entryPoint(); + } + + private static String getDefaultDescription(Runtime runtime, State state, ThreadId threadId, Step step) { + if (step instanceof FlowCall) { + String flowName = ((FlowCall) step).getFlowName(); + + ContextFactory contextFactory = runtime.getService(ContextFactory.class); + Context ctx = contextFactory.create(runtime, state, threadId, step); + + flowName = ctx.eval(flowName, String.class); + + return "Flow call: " + flowName; + } else if (step instanceof Expression) { + return "Expression: " + ((Expression) step).getExpr(); + } else if (step instanceof ScriptCall) { + return "Script: " + ((ScriptCall) step).getLanguageOrRef(); + } else if (step instanceof IfStep) { + return "Check: " + ((IfStep) step).getExpression(); + } else if (step instanceof SwitchStep) { + return "Switch: " + ((SwitchStep) step).getExpression(); + } else if (step instanceof SetVariablesStep) { + return "Set variables"; + } else if (step instanceof Checkpoint) { + return "Checkpoint: " + ((Checkpoint) step).getName(); + } else if (step instanceof FormCall) { + return "Form call: " + ((FormCall) step).getName(); + } else if (step instanceof GroupOfSteps) { + return "Group of steps"; + } else if (step instanceof ParallelBlock) { + return "Parallel block"; + } else if (step instanceof ExitStep) { + return "Exit"; + } else if (step instanceof ReturnStep) { + return "Return"; + } else if (step instanceof TaskCall) { + return "Task: " + ((TaskCall) step).getName(); + } + + return step.getClass().getName(); + } + + private boolean shouldTraceCommand(Command cmd, TelemetryParams params) { + for (Class cls : params.stepsToTrace()) { + if (cls.isInstance(cmd)) { + return true; + } + } + return false; + } +} diff --git a/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/TelemetryExporter.java b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/TelemetryExporter.java new file mode 100644 index 00000000..dd6239c0 --- /dev/null +++ b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/TelemetryExporter.java @@ -0,0 +1,95 @@ +package com.walmartlabs.concord.plugins.opentelemetry; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2024 Walmart Inc., Concord Authors + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.IdGenerator; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +public class TelemetryExporter { + + private final String endpoint; + + public TelemetryExporter(String endpoint) { + this.endpoint = endpoint; + } + + public String sendTelemetry(FlowSteps flowSteps, boolean isProcessFinishedOk) { + Resource resource = Resource.create(Attributes.of( + AttributeKey.stringKey("service.name"), "Concord", + AttributeKey.stringKey("flowName"), flowSteps.flowName(), + AttributeKey.stringKey("processId"), flowSteps.instanceId().toString()) + ); + + OtlpHttpSpanExporter exporter = OtlpHttpSpanExporter.builder() + .setEndpoint(endpoint) + .build(); + + try (BatchSpanProcessor processor = BatchSpanProcessor.builder(exporter).build()) { + + String traceId = IdGenerator.random().generateTraceId(); + + String entryPoint = flowSteps.flowName(); + UUID processId = flowSteps.instanceId(); + long processEnds = System.currentTimeMillis(); + + ConcordSpan processSpan = ConcordSpanBuilder.builder(traceId, resource, entryPoint) + .setStartTimestamp(flowSteps.startedAt(), TimeUnit.MILLISECONDS) + .setAttribute("processId", processId.toString()) + .setStatus(isProcessFinishedOk ? StatusCode.OK : StatusCode.ERROR) + .end(processEnds) + .build(); + + Map correlationIdToSpanId = new HashMap<>(); + + for (StepInfo s : flowSteps.steps()) { + correlationIdToSpanId.put(s.id(), IdGenerator.random().generateSpanId()); + } + + for (StepInfo s : flowSteps.steps()) { + ConcordSpan span = ConcordSpanBuilder.builder(traceId, resource, correlationIdToSpanId.get(s.id()), s.name()) + .setParentSpanId(s.parentId() == null ? processSpan.getSpanContext().getSpanId() : correlationIdToSpanId.get(s.parentId())) + .setStartTimestamp(s.startedAt(), TimeUnit.MILLISECONDS) + .setAttribute("filename", s.filename()) + .setAttribute("lineNum", s.lineNum()) + .setAttribute("flowName", s.flowName()) + .setStatus(s.success() ? StatusCode.OK : StatusCode.ERROR) + .end(s.endedAt() == null ? processEnds : s.endedAt()) + .build(); + + processor.onEnd(span); + } + + processor.onEnd(processSpan); + + return traceId; + } + } +} diff --git a/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/TelemetryParams.java b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/TelemetryParams.java new file mode 100644 index 00000000..a9c1082c --- /dev/null +++ b/tasks/opentelemetry/src/main/java/com/walmartlabs/concord/plugins/opentelemetry/TelemetryParams.java @@ -0,0 +1,95 @@ +package com.walmartlabs.concord.plugins.opentelemetry; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2024 Walmart Inc., Concord Authors + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + +import com.walmartlabs.concord.runtime.v2.runner.vm.*; +import com.walmartlabs.concord.sdk.MapUtils; + +import javax.annotation.Nullable; +import java.io.Serial; +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Stream; + +public class TelemetryParams implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + private static final List>> DEFAULT_STEP_TO_TRACE = List.of( + FlowCallCommand.class, + TaskCallCommand.class, + TaskResumeCommand.class); + + private final Map params; + + public TelemetryParams(Map params) { + this.params = params; + } + + public boolean enabled() { + return MapUtils.getBoolean(params, "enabled", false); + } + + public String endpoint() { + return MapUtils.assertString(params, "endpoint"); + } + + public String entryPointVariableName() { + return MapUtils.getString(params, "entryPointVariableName"); + } + + @Nullable + public String link() { + return MapUtils.getString(params, "link"); + } + + public List>> stepsToTrace() { + List steps = MapUtils.getList(params, "additionalSteps", List.of()); + if (steps.isEmpty()) { + return DEFAULT_STEP_TO_TRACE; + } + + return Stream.concat( + DEFAULT_STEP_TO_TRACE.stream(), + steps.stream() + .map(TelemetryParams::stepToStepCommand) + .filter(Objects::nonNull)) + .toList(); + } + + private static Class> stepToStepCommand(String step) { + switch (step) { + case "expression": + return ExpressionCommand.class; + case "checkpoint": + return CheckpointCommand.class; + case "set": + return SetVariablesCommand.class; + case "script": + return ScriptCallCommand.class; + default: + return null; + } + } +} diff --git a/tasks/opentelemetry/src/test/java/com/walmartlabs/concord/plugins/opentelemetry/FlowStepsSerializationTest.java b/tasks/opentelemetry/src/test/java/com/walmartlabs/concord/plugins/opentelemetry/FlowStepsSerializationTest.java new file mode 100644 index 00000000..e6e7c906 --- /dev/null +++ b/tasks/opentelemetry/src/test/java/com/walmartlabs/concord/plugins/opentelemetry/FlowStepsSerializationTest.java @@ -0,0 +1,49 @@ +package com.walmartlabs.concord.plugins.opentelemetry; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2024 Walmart Inc., Concord Authors + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class FlowStepsSerializationTest { + + @Test + public void test() throws Exception { + FlowSteps flowStepsOrig = new FlowSteps(UUID.randomUUID(), "TestFlow", System.currentTimeMillis(), List.of()); + flowStepsOrig.onStepStart(StepId.from(UUID.randomUUID(), 123), "Step 1", null, "file1", 123, "flow1"); + + ObjectMapper objectMapper = new ObjectMapper(); + String json = objectMapper.writeValueAsString(flowStepsOrig); + + FlowSteps flowSteps = objectMapper.readValue(json, FlowSteps.class); + + assertEquals(flowStepsOrig.flowName(), flowSteps.flowName()); + assertEquals(flowStepsOrig.instanceId(), flowSteps.instanceId()); + assertEquals(flowStepsOrig.startedAt(), flowSteps.startedAt()); + assertEquals(1, flowSteps.steps().size()); + assertEquals(flowStepsOrig.steps(), flowSteps.steps()); + } +} diff --git a/tasks/opentelemetry/src/test/java/com/walmartlabs/concord/plugins/opentelemetry/TelemetryExporterTest.java b/tasks/opentelemetry/src/test/java/com/walmartlabs/concord/plugins/opentelemetry/TelemetryExporterTest.java new file mode 100644 index 00000000..2d6e2068 --- /dev/null +++ b/tasks/opentelemetry/src/test/java/com/walmartlabs/concord/plugins/opentelemetry/TelemetryExporterTest.java @@ -0,0 +1,44 @@ +package com.walmartlabs.concord.plugins.opentelemetry; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2024 Walmart Inc., Concord Authors + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.UUID; + +@Disabled() +public class TelemetryExporterTest { + + @Test + public void test() { + TelemetryExporter exporter = new TelemetryExporter("http://localhost:64318/v1/traces"); + + FlowSteps steps = new FlowSteps(UUID.randomUUID(), "fake-test-entrypoint", System.currentTimeMillis(), List.of()); + + steps.onStepStart(StepId.from(UUID.randomUUID()), "test-step", null, "test.concord.yaml", 1, "my-test-flow"); + + String traceId = exporter.sendTelemetry(steps, true); + + System.out.println(traceId); + } +} diff --git a/tasks/opentelemetry/src/test/resources/logback.xml b/tasks/opentelemetry/src/test/resources/logback.xml new file mode 100644 index 00000000..676d39c3 --- /dev/null +++ b/tasks/opentelemetry/src/test/resources/logback.xml @@ -0,0 +1,14 @@ + + + + %d{HH:mm:ss.SSS} [%thread] [%-5level] %logger{36} - %msg%n + + + + + + + + + +