From 5adec9b50f871484d1aa90f569ec9b7148208c51 Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Mon, 30 Sep 2024 13:35:21 +0800 Subject: [PATCH 01/28] feat: support async-profiler --- apm-protocol/apm-network/pom.xml | 4 + .../command/AsyncProfilerTaskCommand.java | 113 ++++++++++++++ .../command/CommandDeserializer.java | 3 + .../AsyncProfilerDataSender.java | 135 +++++++++++++++++ .../core/asyncprofiler/AsyncProfilerTask.java | 141 ++++++++++++++++++ .../AsyncProfilerTaskChannelService.java | 114 ++++++++++++++ .../AsyncProfilerTaskExecutionService.java | 129 ++++++++++++++++ .../core/commands/CommandExecutorService.java | 5 + .../AsyncProfilerCommandExecutor.java | 44 ++++++ .../apm/agent/core/conf/Config.java | 19 +++ ...skywalking.apm.agent.core.boot.BootService | 3 + apm-sniffer/config/agent.config | 6 + pom.xml | 8 + 13 files changed, 724 insertions(+) create mode 100644 apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java create mode 100644 apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java create mode 100644 apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java create mode 100644 apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java create mode 100644 apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java create mode 100644 apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/AsyncProfilerCommandExecutor.java diff --git a/apm-protocol/apm-network/pom.xml b/apm-protocol/apm-network/pom.xml index 195286c43a..665d4232a4 100644 --- a/apm-protocol/apm-network/pom.xml +++ b/apm-protocol/apm-network/pom.xml @@ -78,6 +78,10 @@ ${org.apache.tomcat.annotations-api.version} provided + + io.pyroscope + async-profiler-context + diff --git a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java new file mode 100644 index 0000000000..57d87610fe --- /dev/null +++ b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.network.trace.component.command; + +import org.apache.skywalking.apm.network.common.v3.Command; +import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair; + +import java.util.List; +import java.util.Objects; + +public class AsyncProfilerTaskCommand extends BaseCommand implements Serializable, Deserializable { + public static final Deserializable DESERIALIZER = new AsyncProfilerTaskCommand("", "", 0, null, "", 0); + public static final String NAME = "AsyncProfileTaskQuery"; + + private final String taskId; + private final int duration; + private final String execArgs; + private final long createTime; + + public AsyncProfilerTaskCommand(String serialNumber, String taskId, int duration, + List events, String execArgs, long createTime) { + super(NAME, serialNumber); + this.taskId = taskId; + this.duration = duration; + this.createTime = createTime; + String comma = ","; + StringBuilder sb = new StringBuilder(); + if (Objects.nonNull(events) && !events.isEmpty()) { + sb.append("event=") + .append(String.join(comma, events)) + .append(comma); + } + if (execArgs != null && !execArgs.isEmpty()) { + sb.append(execArgs); + } + this.execArgs = sb.toString(); + } + + public AsyncProfilerTaskCommand(String serialNumber, String taskId, int duration, + String execArgs, long createTime) { + super(NAME, serialNumber); + this.taskId = taskId; + this.duration = duration; + this.execArgs = execArgs; + this.createTime = createTime; + } + + @Override + public AsyncProfilerTaskCommand deserialize(Command command) { + final List argsList = command.getArgsList(); + String taskId = null; + int duration = 0; + String execArgs = null; + long createTime = 0; + String serialNumber = null; + for (final KeyStringValuePair pair : argsList) { + if ("SerialNumber".equals(pair.getKey())) { + serialNumber = pair.getValue(); + } else if ("TaskId".equals(pair.getKey())) { + taskId = pair.getValue(); + } else if ("Duration".equals(pair.getKey())) { + duration = Integer.parseInt(pair.getValue()); + } else if ("ExecArgs".equals(pair.getKey())) { + execArgs = pair.getValue(); + } else if ("CreateTime".equals(pair.getKey())) { + createTime = Long.parseLong(pair.getValue()); + } + } + return new AsyncProfilerTaskCommand(serialNumber, taskId, duration, execArgs, createTime); + } + + @Override + public Command.Builder serialize() { + final Command.Builder builder = commandBuilder(); + builder.addArgs(KeyStringValuePair.newBuilder().setKey("TaskId").setValue(taskId)) + .addArgs(KeyStringValuePair.newBuilder().setKey("Duration").setValue(String.valueOf(duration))) + .addArgs(KeyStringValuePair.newBuilder().setKey("ExecArgs").setValue(execArgs)) + .addArgs(KeyStringValuePair.newBuilder().setKey("CreateTime").setValue(String.valueOf(createTime))); + return builder; + } + + public String getTaskId() { + return taskId; + } + + public int getDuration() { + return duration; + } + + public String getExecArgs() { + return execArgs; + } + + public long getCreateTime() { + return createTime; + } +} diff --git a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/CommandDeserializer.java b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/CommandDeserializer.java index ff8680bcb3..4fd737ff98 100644 --- a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/CommandDeserializer.java +++ b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/CommandDeserializer.java @@ -27,7 +27,10 @@ public static BaseCommand deserialize(final Command command) { return ProfileTaskCommand.DESERIALIZER.deserialize(command); } else if (ConfigurationDiscoveryCommand.NAME.equals(commandName)) { return ConfigurationDiscoveryCommand.DESERIALIZER.deserialize(command); + } else if (AsyncProfilerTaskCommand.NAME.equals(commandName)) { + return AsyncProfilerTaskCommand.DESERIALIZER.deserialize(command); } + throw new UnsupportedCommandException(command); } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java new file mode 100644 index 0000000000..2f46f8688e --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.agent.core.asyncprofiler; + +import com.google.protobuf.ByteString; +import io.grpc.Channel; +import io.grpc.stub.StreamObserver; +import org.apache.skywalking.apm.agent.core.boot.BootService; +import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; +import org.apache.skywalking.apm.agent.core.boot.ServiceManager; +import org.apache.skywalking.apm.agent.core.conf.Config; +import org.apache.skywalking.apm.agent.core.logging.api.ILog; +import org.apache.skywalking.apm.agent.core.logging.api.LogManager; +import org.apache.skywalking.apm.agent.core.profile.ProfileSnapshotSender; +import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener; +import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager; +import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus; +import org.apache.skywalking.apm.agent.core.remote.GRPCStreamServiceStatus; +import org.apache.skywalking.apm.network.common.v3.Commands; +import org.apache.skywalking.apm.network.language.asyncprofile.v3.AsyncProfilerData; +import org.apache.skywalking.apm.network.language.asyncprofile.v3.AsyncProfilerMetaData; +import org.apache.skywalking.apm.network.language.asyncprofile.v3.AsyncProfilerTaskGrpc; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT; + +@DefaultImplementor +public class AsyncProfilerDataSender implements BootService, GRPCChannelListener { + private static final ILog LOGGER = LogManager.getLogger(ProfileSnapshotSender.class); + private static final int DATA_CHUNK_SIZE = 1024 * 1024; + + private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT; + + private volatile AsyncProfilerTaskGrpc.AsyncProfilerTaskStub asyncProfilerTaskStub; + + @Override + public void prepare() throws Throwable { + ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this); + } + + @Override + public void boot() throws Throwable { + + } + + @Override + public void onComplete() throws Throwable { + + } + + @Override + public void shutdown() throws Throwable { + + } + + @Override + public void statusChanged(GRPCChannelStatus status) { + if (GRPCChannelStatus.CONNECTED.equals(status)) { + Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel(); + asyncProfilerTaskStub = AsyncProfilerTaskGrpc.newStub(channel); + } else { + asyncProfilerTaskStub = null; + } + this.status = status; + } + + public void send(AsyncProfilerTask task, InputStream fileDataInputStream) throws IOException { + if (status != GRPCChannelStatus.CONNECTED || Objects.isNull(fileDataInputStream)) { + return; + } + final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); + StreamObserver dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter( + GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS + ).collect(new StreamObserver() { + @Override + public void onNext(Commands value) { + } + + @Override + public void onError(Throwable t) { + status.finished(); + if (LOGGER.isErrorEnable()) { + LOGGER.error( + t, "Send async profiler task data to collector fail with a grpc internal exception." + ); + } + ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t); + } + + @Override + public void onCompleted() { + status.finished(); + } + }); + AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder() + .setService(Config.Agent.SERVICE_NAME) + .setServiceInstance(Config.Agent.INSTANCE_NAME) + .setUploadTime(System.currentTimeMillis()) + .setTaskId(task.getTaskId()) + .build(); + AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder().setMetaData(metaData).build(); + dataStreamObserver.onNext(asyncProfilerData); + // send bin data + byte[] data = new byte[DATA_CHUNK_SIZE]; + int byteRead; + while ((byteRead = fileDataInputStream.read(data)) != -1) { + asyncProfilerData = AsyncProfilerData.newBuilder() + .setContent(ByteString.copyFrom(data, 0, byteRead)) + .build(); + dataStreamObserver.onNext(asyncProfilerData); + } + dataStreamObserver.onCompleted(); + status.wait4Finish(); + } +} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java new file mode 100644 index 0000000000..2c82fb83fd --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.agent.core.asyncprofiler; + +import io.pyroscope.one.profiler.AsyncProfiler; +import org.apache.skywalking.apm.agent.core.conf.Config; +import org.apache.skywalking.apm.agent.core.logging.api.ILog; +import org.apache.skywalking.apm.agent.core.logging.api.LogManager; +import org.apache.skywalking.apm.util.StringUtil; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class AsyncProfilerTask { + private static final ILog LOGGER = LogManager.getLogger(AsyncProfilerTask.class); + private static final String COMMA = ","; + /** + * task id + */ + private String taskId; + /** + * execArgument from oap server + */ + private String execArgs; + /** + * run profiling for duration seconds + */ + private int duration; + /** + * run profiling for duration seconds + */ + private long createTime; + /** + * temp File + */ + private Path tempFile; + + private static String execute(AsyncProfiler asyncProfiler, String args) + throws IllegalArgumentException, IOException { + LOGGER.info("async profiler execute args:{}", args); + String result = asyncProfiler.execute(args); + return result.trim(); + } + + /** + * start async profiler + */ + public String start(AsyncProfiler asyncProfiler) throws IOException { + tempFile = getProfilerFilePath(); + StringBuilder startArgs = new StringBuilder(); + startArgs.append("start").append(COMMA); + if (StringUtil.isNotEmpty(execArgs)) { + startArgs.append(execArgs).append(COMMA); + } + startArgs.append("file=").append(tempFile.toString()); + + return execute(asyncProfiler, startArgs.toString()); + } + + /** + * stop async-profiler and get dump file inputStream + */ + public File stop(AsyncProfiler asyncProfiler) throws IOException { + LOGGER.info("async profiler process stop and dump file"); + String stopArgs = "stop" + COMMA + "file=" + tempFile.toAbsolutePath(); + execute(asyncProfiler, stopArgs); + return tempFile.toFile(); + } + + public Path getProfilerFilePath() throws IOException { + if (StringUtil.isNotEmpty(Config.AsyncProfiler.OUTPUT_PATH)) { + Path tempFilePath = Paths.get(Config.AsyncProfiler.OUTPUT_PATH, taskId + getFileExtension()); + return Files.createFile(tempFilePath.toAbsolutePath()); + } else { + return Files.createTempFile(taskId + getFileExtension(), taskId + getFileExtension()); + } + } + + private String getFileExtension() { + return ".jfr"; + } + + public void setExecArgs(String execArgs) { + this.execArgs = execArgs; + } + + public void setDuration(int duration) { + this.duration = duration; + } + + public void setTempFile(Path tempFile) { + this.tempFile = tempFile; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + public void setCreateTime(long createTime) { + this.createTime = createTime; + } + + public String getExecArgs() { + return execArgs; + } + + public int getDuration() { + return duration; + } + + public Path getTempFile() { + return tempFile; + } + + public String getTaskId() { + return taskId; + } + + public long getCreateTime() { + return createTime; + } +} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java new file mode 100644 index 0000000000..8a7e40852d --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.agent.core.asyncprofiler; + +import io.grpc.Channel; +import org.apache.skywalking.apm.agent.core.boot.BootService; +import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; +import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory; +import org.apache.skywalking.apm.agent.core.boot.ServiceManager; +import org.apache.skywalking.apm.agent.core.commands.CommandService; +import org.apache.skywalking.apm.agent.core.conf.Config; +import org.apache.skywalking.apm.agent.core.logging.api.ILog; +import org.apache.skywalking.apm.agent.core.logging.api.LogManager; +import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener; +import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager; +import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus; +import org.apache.skywalking.apm.network.common.v3.Commands; +import org.apache.skywalking.apm.network.language.asyncprofile.v3.AsyncProfileTaskCommandQuery; +import org.apache.skywalking.apm.network.language.asyncprofile.v3.AsyncProfilerTaskGrpc; +import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT; + +@DefaultImplementor +public class AsyncProfilerTaskChannelService implements BootService, Runnable, GRPCChannelListener { + private static final ILog LOGGER = LogManager.getLogger(AsyncProfilerTaskChannelService.class); + + // channel status + private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT; + private volatile AsyncProfilerTaskGrpc.AsyncProfilerTaskBlockingStub asyncProfilerTaskBlockingStub; + + // query task list schedule + private volatile ScheduledFuture getTaskListFuture; + + @Override + public void run() { + if (status == GRPCChannelStatus.CONNECTED) { + // test start command and 10s after put stop command + long lastCommandCreateTime = ServiceManager.INSTANCE + .findService(AsyncProfilerTaskExecutionService.class).getLastCommandCreateTime(); + AsyncProfileTaskCommandQuery query = AsyncProfileTaskCommandQuery.newBuilder() + .setServiceInstance(Config.Agent.INSTANCE_NAME) + .setService(Config.Agent.SERVICE_NAME) + .setLastCommandTime(lastCommandCreateTime) + .build(); + Commands commands = asyncProfilerTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS) + .getAsyncProfileTaskCommands(query); + ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); + } + } + + @Override + public void statusChanged(GRPCChannelStatus status) { + if (GRPCChannelStatus.CONNECTED.equals(status)) { + Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel(); + asyncProfilerTaskBlockingStub = AsyncProfilerTaskGrpc.newBlockingStub(channel); + } else { + asyncProfilerTaskBlockingStub = null; + } + this.status = status; + } + + @Override + public void prepare() throws Throwable { + ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this); + } + + @Override + public void boot() throws Throwable { + + if (Config.AsyncProfiler.ACTIVE) { + getTaskListFuture = Executors.newSingleThreadScheduledExecutor( + new DefaultNamedThreadFactory("AsyncProfilerGetTaskService") + ).scheduleWithFixedDelay( + new RunnableWithExceptionProtection( + this, + t -> LOGGER.error("Query async profiler task list failure.", t) + ), 0, Config.Collector.GET_PROFILE_TASK_INTERVAL, TimeUnit.SECONDS + ); + } + } + + @Override + public void onComplete() throws Throwable { + + } + + @Override + public void shutdown() throws Throwable { + if (getTaskListFuture != null) { + getTaskListFuture.cancel(true); + } + } +} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java new file mode 100644 index 0000000000..4cffc13917 --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.agent.core.asyncprofiler; + +import io.pyroscope.labels.io.pyroscope.PyroscopeAsyncProfiler; +import io.pyroscope.one.profiler.AsyncProfiler; +import org.apache.skywalking.apm.agent.core.boot.BootService; +import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; +import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory; +import org.apache.skywalking.apm.agent.core.boot.ServiceManager; +import org.apache.skywalking.apm.agent.core.logging.api.ILog; +import org.apache.skywalking.apm.agent.core.logging.api.LogManager; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@DefaultImplementor +public class AsyncProfilerTaskExecutionService implements BootService { + + private static final ILog LOGGER = LogManager.getLogger(AsyncProfilerTaskChannelService.class); + + private static final AsyncProfiler ASYNC_PROFILER = PyroscopeAsyncProfiler.getAsyncProfiler(); + + private static final String SUCCESS_RESULT = "Profiling started"; + + // profile executor thread pool, only running one thread + private static final ScheduledExecutorService ASYNC_PROFILER_EXECUTOR = Executors.newSingleThreadScheduledExecutor( + new DefaultNamedThreadFactory("ASYNC-PROFILING-TASK")); + + // last command create time, use to next query task list + private volatile long lastCommandCreateTime = -1; + + // task schedule future + private volatile ScheduledFuture scheduledFuture; + + public void processAsyncProfilerTask(AsyncProfilerTask task) { + if (task.getCreateTime() <= lastCommandCreateTime) { + LOGGER.warn("get repeat task because createTime is less than lastCommandCreateTime"); + return; + } + lastCommandCreateTime = task.getCreateTime(); + LOGGER.info("add async profiler task: {}", task.getTaskId()); + // add task to list + ASYNC_PROFILER_EXECUTOR.execute(() -> { + try { + if (Objects.nonNull(scheduledFuture) && !scheduledFuture.isDone()) { + LOGGER.info("AsyncProfilerTask already running"); + return; + } + String result = task.start(ASYNC_PROFILER); + if (!SUCCESS_RESULT.equals(result)) { + LOGGER.error("AsyncProfilerTask start fail result:" + result); + return; + } + scheduledFuture = ASYNC_PROFILER_EXECUTOR.schedule( + () -> stopAsyncProfile(task), task.getDuration(), TimeUnit.SECONDS + ); + } catch (IOException e) { + LOGGER.error("AsyncProfilerTask executor error:" + e.getMessage(), e); + } + }); + } + + private void stopAsyncProfile(AsyncProfilerTask task) { + try { + // execute stop task + File dumpFile = task.stop(ASYNC_PROFILER); + InputStream fileDataInputStream = Files.newInputStream(dumpFile.toPath()); + // upload file + AsyncProfilerDataSender dataSender = ServiceManager.INSTANCE.findService(AsyncProfilerDataSender.class); + dataSender.send(task, fileDataInputStream); + // close inputStream + fileDataInputStream.close(); + if (!dumpFile.delete()) { + LOGGER.warn("delete async profiler dump file failed"); + } + } catch (Exception e) { + LOGGER.error("stop async profiler task error", e); + return; + } + } + + public long getLastCommandCreateTime() { + return lastCommandCreateTime; + } + + @Override + public void prepare() throws Throwable { + + } + + @Override + public void boot() throws Throwable { + + } + + @Override + public void onComplete() throws Throwable { + + } + + @Override + public void shutdown() throws Throwable { + ASYNC_PROFILER_EXECUTOR.shutdown(); + } +} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/CommandExecutorService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/CommandExecutorService.java index 819b0b9ff1..c619051c9b 100755 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/CommandExecutorService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/CommandExecutorService.java @@ -21,9 +21,11 @@ import java.util.Map; import org.apache.skywalking.apm.agent.core.boot.BootService; import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; +import org.apache.skywalking.apm.agent.core.commands.executor.AsyncProfilerCommandExecutor; import org.apache.skywalking.apm.agent.core.commands.executor.ConfigurationDiscoveryCommandExecutor; import org.apache.skywalking.apm.agent.core.commands.executor.NoopCommandExecutor; import org.apache.skywalking.apm.agent.core.commands.executor.ProfileTaskCommandExecutor; +import org.apache.skywalking.apm.network.trace.component.command.AsyncProfilerTaskCommand; import org.apache.skywalking.apm.network.trace.component.command.BaseCommand; import org.apache.skywalking.apm.network.trace.component.command.ConfigurationDiscoveryCommand; import org.apache.skywalking.apm.network.trace.component.command.ProfileTaskCommand; @@ -48,6 +50,9 @@ public void prepare() throws Throwable { //Get ConfigurationDiscoveryCommand executor. commandExecutorMap.put(ConfigurationDiscoveryCommand.NAME, new ConfigurationDiscoveryCommandExecutor()); + + // AsyncProfiler task executor + commandExecutorMap.put(AsyncProfilerTaskCommand.NAME, new AsyncProfilerCommandExecutor()); } @Override diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/AsyncProfilerCommandExecutor.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/AsyncProfilerCommandExecutor.java new file mode 100644 index 0000000000..530b655f84 --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/AsyncProfilerCommandExecutor.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.agent.core.commands.executor; + +import org.apache.skywalking.apm.agent.core.asyncprofiler.AsyncProfilerTask; +import org.apache.skywalking.apm.agent.core.asyncprofiler.AsyncProfilerTaskExecutionService; +import org.apache.skywalking.apm.agent.core.boot.ServiceManager; +import org.apache.skywalking.apm.agent.core.commands.CommandExecutionException; +import org.apache.skywalking.apm.agent.core.commands.CommandExecutor; +import org.apache.skywalking.apm.agent.core.conf.Config; +import org.apache.skywalking.apm.network.trace.component.command.AsyncProfilerTaskCommand; +import org.apache.skywalking.apm.network.trace.component.command.BaseCommand; + +public class AsyncProfilerCommandExecutor implements CommandExecutor { + @Override + public void execute(BaseCommand command) throws CommandExecutionException { + AsyncProfilerTaskCommand asyncProfilerTaskCommand = (AsyncProfilerTaskCommand) command; + + AsyncProfilerTask asyncProfilerTask = new AsyncProfilerTask(); + asyncProfilerTask.setTaskId(asyncProfilerTaskCommand.getTaskId()); + int duration = Math.min(Config.AsyncProfiler.MAX_DURATION, asyncProfilerTaskCommand.getDuration()); + asyncProfilerTask.setDuration(duration); + asyncProfilerTask.setExecArgs(asyncProfilerTaskCommand.getExecArgs()); + asyncProfilerTask.setCreateTime(asyncProfilerTaskCommand.getCreateTime()); + ServiceManager.INSTANCE.findService(AsyncProfilerTaskExecutionService.class) + .processAsyncProfilerTask(asyncProfilerTask); + } +} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java index be7d54a4e4..dc79dafc6e 100755 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java @@ -252,6 +252,25 @@ public static class Profile { public static int SNAPSHOT_TRANSPORT_BUFFER_SIZE = 500; } + public static class AsyncProfiler { + /** + * If true, skywalking agent will enable profile when user create a new async profiler task. + * Otherwise disable it. + */ + public static boolean ACTIVE = true; + + /** + * Max monitor time(second), if async profiler monitor time out of limit, then stop it. + * default 1h. + */ + public static int MAX_DURATION = 3600; + + /** + * jfr directory generated by async profiler + */ + public static String OUTPUT_PATH = ""; + } + public static class Meter { /** * If true, skywalking agent will enable sending meters. Otherwise disable meter report. diff --git a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService b/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService index cfda93521c..f75d28cd78 100644 --- a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService +++ b/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService @@ -36,3 +36,6 @@ org.apache.skywalking.apm.agent.core.remote.LogReportServiceClient org.apache.skywalking.apm.agent.core.conf.dynamic.ConfigurationDiscoveryService org.apache.skywalking.apm.agent.core.remote.EventReportServiceClient org.apache.skywalking.apm.agent.core.ServiceInstanceGenerator +org.apache.skywalking.apm.agent.core.asyncprofiler.AsyncProfilerTaskExecutionService +org.apache.skywalking.apm.agent.core.asyncprofiler.AsyncProfilerTaskChannelService +org.apache.skywalking.apm.agent.core.asyncprofiler.AsyncProfilerDataSender \ No newline at end of file diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config index 06f5717de0..de09acc78e 100755 --- a/apm-sniffer/config/agent.config +++ b/apm-sniffer/config/agent.config @@ -164,6 +164,12 @@ profile.duration=${SW_AGENT_PROFILE_DURATION:10} profile.dump_max_stack_depth=${SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH:500} # Snapshot transport to backend buffer size profile.snapshot_transport_buffer_size=${SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE:4500} +# If true, skywalking agent will enable profile when user create a new asyncprofile task. Otherwise disable it. +asyncprofiler.active=${SW_AGENT_ASYNC_PROFILER_ACTIVE:true} +# Max monitor time(second), if async profiler monitor time out of limit, then stop it. default 1h. +asyncprofiler.max_duration=${SW_AGENT_ASYNC_PROFILER_MAX_DURATION:3600} +# jfr directory generated by async profiler +asyncprofiler.output_path=${SW_AGENT_ASYNC_PROFILER_OUTPUT_PATH:} # If true, the agent collects and reports metrics to the backend. meter.active=${SW_METER_ACTIVE:true} # Report meters interval. The unit is second diff --git a/pom.xml b/pom.xml index adac36458d..7f3cc7ba04 100755 --- a/pom.xml +++ b/pom.xml @@ -36,6 +36,7 @@ apm-protocol apm-sniffer apm-application-toolkit + java-agent-test pom @@ -87,6 +88,7 @@ 1.14.9 + 0.14.0 1.53.0 4.1.100.Final 2.8.9 @@ -213,6 +215,12 @@ + + io.pyroscope + async-profiler-context + ${async-profiler.version} + + junit junit From 5a88a8f4e2776a6dfc33bf5b55ad2eca96933600 Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Fri, 11 Oct 2024 14:50:59 +0800 Subject: [PATCH 02/28] fix: reduce the default max_duration of async-profiler --- .../org/apache/skywalking/apm/agent/core/conf/Config.java | 2 +- apm-sniffer/config/agent.config | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java index dc79dafc6e..d22ff3e3e0 100755 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java @@ -263,7 +263,7 @@ public static class AsyncProfiler { * Max monitor time(second), if async profiler monitor time out of limit, then stop it. * default 1h. */ - public static int MAX_DURATION = 3600; + public static int MAX_DURATION = 600; /** * jfr directory generated by async profiler diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config index de09acc78e..9058b2ef2c 100755 --- a/apm-sniffer/config/agent.config +++ b/apm-sniffer/config/agent.config @@ -166,8 +166,8 @@ profile.dump_max_stack_depth=${SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH:500} profile.snapshot_transport_buffer_size=${SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE:4500} # If true, skywalking agent will enable profile when user create a new asyncprofile task. Otherwise disable it. asyncprofiler.active=${SW_AGENT_ASYNC_PROFILER_ACTIVE:true} -# Max monitor time(second), if async profiler monitor time out of limit, then stop it. default 1h. -asyncprofiler.max_duration=${SW_AGENT_ASYNC_PROFILER_MAX_DURATION:3600} +# Max monitor time(second), if async profiler monitor time out of limit, then stop it. default 10 minutes. +asyncprofiler.max_duration=${SW_AGENT_ASYNC_PROFILER_MAX_DURATION:600} # jfr directory generated by async profiler asyncprofiler.output_path=${SW_AGENT_ASYNC_PROFILER_OUTPUT_PATH:} # If true, the agent collects and reports metrics to the backend. From e5243b29adedb67d545aeb5ea2237a488db7a6f6 Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Thu, 24 Oct 2024 21:50:32 +0800 Subject: [PATCH 03/28] fix: Cooperate with proto modification --- apm-protocol/apm-network/pom.xml | 4 - .../command/AsyncProfilerTaskCommand.java | 2 +- apm-sniffer/apm-agent-core/pom.xml | 4 + .../AsyncProfilerDataSender.java | 74 ++++++++++++++++--- .../core/asyncprofiler/AsyncProfilerTask.java | 4 +- .../AsyncProfilerTaskChannelService.java | 9 ++- .../AsyncProfilerTaskExecutionService.java | 39 ++++++---- dist-material/LICENSE | 1 + pom.xml | 8 +- 9 files changed, 102 insertions(+), 43 deletions(-) diff --git a/apm-protocol/apm-network/pom.xml b/apm-protocol/apm-network/pom.xml index 665d4232a4..195286c43a 100644 --- a/apm-protocol/apm-network/pom.xml +++ b/apm-protocol/apm-network/pom.xml @@ -78,10 +78,6 @@ ${org.apache.tomcat.annotations-api.version} provided - - io.pyroscope - async-profiler-context - diff --git a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java index 57d87610fe..f8fe47191b 100644 --- a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java +++ b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java @@ -26,7 +26,7 @@ public class AsyncProfilerTaskCommand extends BaseCommand implements Serializable, Deserializable { public static final Deserializable DESERIALIZER = new AsyncProfilerTaskCommand("", "", 0, null, "", 0); - public static final String NAME = "AsyncProfileTaskQuery"; + public static final String NAME = "AsyncProfilerTaskQuery"; private final String taskId; private final int duration; diff --git a/apm-sniffer/apm-agent-core/pom.xml b/apm-sniffer/apm-agent-core/pom.xml index 8ba309156f..fcca095f25 100644 --- a/apm-sniffer/apm-agent-core/pom.xml +++ b/apm-sniffer/apm-agent-core/pom.xml @@ -143,6 +143,10 @@ jmh-generator-annprocess test + + tools.profiler + async-profiler + diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java index 2f46f8688e..e128976588 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java @@ -33,12 +33,14 @@ import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus; import org.apache.skywalking.apm.agent.core.remote.GRPCStreamServiceStatus; import org.apache.skywalking.apm.network.common.v3.Commands; -import org.apache.skywalking.apm.network.language.asyncprofile.v3.AsyncProfilerData; -import org.apache.skywalking.apm.network.language.asyncprofile.v3.AsyncProfilerMetaData; -import org.apache.skywalking.apm.network.language.asyncprofile.v3.AsyncProfilerTaskGrpc; +import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerCollectType; +import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerData; +import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerMetaData; +import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerTaskGrpc; import java.io.IOException; -import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -84,16 +86,20 @@ public void statusChanged(GRPCChannelStatus status) { this.status = status; } - public void send(AsyncProfilerTask task, InputStream fileDataInputStream) throws IOException { - if (status != GRPCChannelStatus.CONNECTED || Objects.isNull(fileDataInputStream)) { + public void sendData(AsyncProfilerTask task, FileChannel channel) throws IOException { + if (status != GRPCChannelStatus.CONNECTED || Objects.isNull(channel) || !channel.isOpen()) { return; } + + int size = Math.toIntExact(channel.size()); + final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); StreamObserver dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter( GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS ).collect(new StreamObserver() { @Override public void onNext(Commands value) { + } @Override @@ -115,20 +121,64 @@ public void onCompleted() { AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder() .setService(Config.Agent.SERVICE_NAME) .setServiceInstance(Config.Agent.INSTANCE_NAME) - .setUploadTime(System.currentTimeMillis()) + .setType(AsyncProfilerCollectType.PROFILING_SUCCESS) + .setContentSize(size) .setTaskId(task.getTaskId()) .build(); AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder().setMetaData(metaData).build(); dataStreamObserver.onNext(asyncProfilerData); - // send bin data - byte[] data = new byte[DATA_CHUNK_SIZE]; - int byteRead; - while ((byteRead = fileDataInputStream.read(data)) != -1) { + // todo wait for server ? + // Is it possible to upload jfr? + ByteBuffer buf = ByteBuffer.allocateDirect(DATA_CHUNK_SIZE); + while (channel.read(buf) > 0) { + buf.flip(); asyncProfilerData = AsyncProfilerData.newBuilder() - .setContent(ByteString.copyFrom(data, 0, byteRead)) + .setContent(ByteString.copyFrom(buf)) .build(); dataStreamObserver.onNext(asyncProfilerData); + buf.clear(); + } + dataStreamObserver.onCompleted(); + + status.wait4Finish(); + } + + public void sendError(AsyncProfilerTask task, String errorMessage) { + if (status != GRPCChannelStatus.CONNECTED) { + return; } + final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); + + StreamObserver dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter( + GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS + ).collect(new StreamObserver() { + @Override + public void onNext(Commands value) { + } + + @Override + public void onError(Throwable t) { + status.finished(); + ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t); + } + + @Override + public void onCompleted() { + status.finished(); + } + }); + AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder() + .setService(Config.Agent.SERVICE_NAME) + .setServiceInstance(Config.Agent.INSTANCE_NAME) + .setTaskId(task.getTaskId()) + .setType(AsyncProfilerCollectType.EXECUTION_TASK_ERROR) + .setContentSize(0) + .build(); + AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder() + .setMetaData(metaData) + .setErrorMessage(errorMessage) + .build(); + dataStreamObserver.onNext(asyncProfilerData); dataStreamObserver.onCompleted(); status.wait4Finish(); } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java index 2c82fb83fd..b7b72199e4 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java @@ -18,7 +18,7 @@ package org.apache.skywalking.apm.agent.core.asyncprofiler; -import io.pyroscope.one.profiler.AsyncProfiler; +import one.profiler.AsyncProfiler; import org.apache.skywalking.apm.agent.core.conf.Config; import org.apache.skywalking.apm.agent.core.logging.api.ILog; import org.apache.skywalking.apm.agent.core.logging.api.LogManager; @@ -91,7 +91,7 @@ public Path getProfilerFilePath() throws IOException { Path tempFilePath = Paths.get(Config.AsyncProfiler.OUTPUT_PATH, taskId + getFileExtension()); return Files.createFile(tempFilePath.toAbsolutePath()); } else { - return Files.createTempFile(taskId + getFileExtension(), taskId + getFileExtension()); + return Files.createTempFile(taskId, getFileExtension()); } } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java index 8a7e40852d..482b6b0e77 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java @@ -31,8 +31,8 @@ import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager; import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus; import org.apache.skywalking.apm.network.common.v3.Commands; -import org.apache.skywalking.apm.network.language.asyncprofile.v3.AsyncProfileTaskCommandQuery; -import org.apache.skywalking.apm.network.language.asyncprofile.v3.AsyncProfilerTaskGrpc; +import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerTaskCommandQuery; +import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerTaskGrpc; import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; import java.util.concurrent.Executors; @@ -58,13 +58,14 @@ public void run() { // test start command and 10s after put stop command long lastCommandCreateTime = ServiceManager.INSTANCE .findService(AsyncProfilerTaskExecutionService.class).getLastCommandCreateTime(); - AsyncProfileTaskCommandQuery query = AsyncProfileTaskCommandQuery.newBuilder() + + AsyncProfilerTaskCommandQuery query = AsyncProfilerTaskCommandQuery.newBuilder() .setServiceInstance(Config.Agent.INSTANCE_NAME) .setService(Config.Agent.SERVICE_NAME) .setLastCommandTime(lastCommandCreateTime) .build(); Commands commands = asyncProfilerTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS) - .getAsyncProfileTaskCommands(query); + .getAsyncProfilerTaskCommands(query); ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); } } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java index 4cffc13917..a06fb58158 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java @@ -18,8 +18,7 @@ package org.apache.skywalking.apm.agent.core.asyncprofiler; -import io.pyroscope.labels.io.pyroscope.PyroscopeAsyncProfiler; -import io.pyroscope.one.profiler.AsyncProfiler; +import one.profiler.AsyncProfiler; import org.apache.skywalking.apm.agent.core.boot.BootService; import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory; @@ -28,9 +27,9 @@ import org.apache.skywalking.apm.agent.core.logging.api.LogManager; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; +import java.nio.channels.FileChannel; import java.util.Objects; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -42,7 +41,7 @@ public class AsyncProfilerTaskExecutionService implements BootService { private static final ILog LOGGER = LogManager.getLogger(AsyncProfilerTaskChannelService.class); - private static final AsyncProfiler ASYNC_PROFILER = PyroscopeAsyncProfiler.getAsyncProfiler(); + private static final AsyncProfiler ASYNC_PROFILER = AsyncProfiler.getInstance(); private static final String SUCCESS_RESULT = "Profiling started"; @@ -70,13 +69,14 @@ public void processAsyncProfilerTask(AsyncProfilerTask task) { LOGGER.info("AsyncProfilerTask already running"); return; } + String result = task.start(ASYNC_PROFILER); if (!SUCCESS_RESULT.equals(result)) { - LOGGER.error("AsyncProfilerTask start fail result:" + result); + stopWhenError(task, result); return; } scheduledFuture = ASYNC_PROFILER_EXECUTOR.schedule( - () -> stopAsyncProfile(task), task.getDuration(), TimeUnit.SECONDS + () -> stopWhenSuccess(task), task.getDuration(), TimeUnit.SECONDS ); } catch (IOException e) { LOGGER.error("AsyncProfilerTask executor error:" + e.getMessage(), e); @@ -84,16 +84,25 @@ public void processAsyncProfilerTask(AsyncProfilerTask task) { }); } - private void stopAsyncProfile(AsyncProfilerTask task) { + private void stopWhenError(AsyncProfilerTask task, String errorMessage) { + LOGGER.error("AsyncProfilerTask start fail result:" + errorMessage); + AsyncProfilerDataSender dataSender = ServiceManager.INSTANCE.findService(AsyncProfilerDataSender.class); + dataSender.sendError(task, errorMessage); + } + + private void stopWhenSuccess(AsyncProfilerTask task) { + try { - // execute stop task File dumpFile = task.stop(ASYNC_PROFILER); - InputStream fileDataInputStream = Files.newInputStream(dumpFile.toPath()); - // upload file - AsyncProfilerDataSender dataSender = ServiceManager.INSTANCE.findService(AsyncProfilerDataSender.class); - dataSender.send(task, fileDataInputStream); - // close inputStream - fileDataInputStream.close(); + // stop task + try (FileInputStream fileInputStream = new FileInputStream(dumpFile)) { + // upload file + FileChannel channel = fileInputStream.getChannel(); + + AsyncProfilerDataSender dataSender = ServiceManager.INSTANCE.findService(AsyncProfilerDataSender.class); + dataSender.sendData(task, channel); + } + if (!dumpFile.delete()) { LOGGER.warn("delete async profiler dump file failed"); } diff --git a/dist-material/LICENSE b/dist-material/LICENSE index a5c6f781cf..f36ccd2177 100755 --- a/dist-material/LICENSE +++ b/dist-material/LICENSE @@ -222,6 +222,7 @@ The text of each license is the standard Apache 2.0 license. Google: jsr305 3.0.2: http://central.maven.org/maven2/com/google/code/findbugs/jsr305/3.0.0/jsr305-3.0.0.pom , Apache 2.0 Google: guava 32.0.1: https://github.com/google/guava , Apache 2.0 netty 4.1.100: https://github.com/netty/netty/blob/4.1/LICENSE.txt, Apache 2.0 + async-profiler 3.0: https://github.com/async-profiler/async-profiler/blob/v3.0/LICENSE, Apache 2.0 ======================================================================== BSD licenses diff --git a/pom.xml b/pom.xml index 7f3cc7ba04..9fca6a946f 100755 --- a/pom.xml +++ b/pom.xml @@ -36,7 +36,6 @@ apm-protocol apm-sniffer apm-application-toolkit - java-agent-test pom @@ -88,7 +87,7 @@ 1.14.9 - 0.14.0 + 3.0 1.53.0 4.1.100.Final 2.8.9 @@ -216,11 +215,10 @@ - io.pyroscope - async-profiler-context + tools.profiler + async-profiler ${async-profiler.version} - junit junit From c236129bb9d96b3c4a156c0950978a393e03cacf Mon Sep 17 00:00:00 2001 From: zhengziyi0117 <91662408+zhengziyi0117@users.noreply.github.com> Date: Fri, 25 Oct 2024 15:12:36 +0800 Subject: [PATCH 04/28] Update apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java Co-authored-by: Jiajing LU --- .../java/org/apache/skywalking/apm/agent/core/conf/Config.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java index d22ff3e3e0..57f39d1c4d 100755 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java @@ -266,7 +266,7 @@ public static class AsyncProfiler { public static int MAX_DURATION = 600; /** - * jfr directory generated by async profiler + * Path for the JFR outputs from the Async profiler */ public static String OUTPUT_PATH = ""; } From 74171030260cd9f1999e73de69a1f7bd9e1a8d04 Mon Sep 17 00:00:00 2001 From: zhengziyi0117 <91662408+zhengziyi0117@users.noreply.github.com> Date: Fri, 25 Oct 2024 15:12:45 +0800 Subject: [PATCH 05/28] Update apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java Co-authored-by: Jiajing LU --- .../java/org/apache/skywalking/apm/agent/core/conf/Config.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java index 57f39d1c4d..cbc3ac3b80 100755 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java @@ -254,7 +254,7 @@ public static class Profile { public static class AsyncProfiler { /** - * If true, skywalking agent will enable profile when user create a new async profiler task. + * If true, Async Profiler will be enabled when user creates a new async profiler task. * Otherwise disable it. */ public static boolean ACTIVE = true; From d327dd2318e7bd6f91ed806f8ad47f559ca0e995 Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Sat, 26 Oct 2024 16:42:36 +0800 Subject: [PATCH 06/28] fix: proto git sha --- apm-protocol/apm-network/src/main/proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-protocol/apm-network/src/main/proto b/apm-protocol/apm-network/src/main/proto index d4da569991..5c2364e2f5 160000 --- a/apm-protocol/apm-network/src/main/proto +++ b/apm-protocol/apm-network/src/main/proto @@ -1 +1 @@ -Subproject commit d4da5699915ee52288f8ff1c954decf6363485bc +Subproject commit 5c2364e2f51f8e1116f63792f16d59336e890ffe From 1a417acef592484a26b4d67dd6bfaef7e821b1f9 Mon Sep 17 00:00:00 2001 From: zhengziyi0117 <91662408+zhengziyi0117@users.noreply.github.com> Date: Sat, 26 Oct 2024 16:54:26 +0800 Subject: [PATCH 07/28] Update apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java Co-authored-by: Jiajing LU --- .../java/org/apache/skywalking/apm/agent/core/conf/Config.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java index cbc3ac3b80..2e9dfdd6d9 100755 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java @@ -255,7 +255,7 @@ public static class Profile { public static class AsyncProfiler { /** * If true, Async Profiler will be enabled when user creates a new async profiler task. - * Otherwise disable it. + * Otherwise it is disabled. */ public static boolean ACTIVE = true; From 3b59da7e9099bd179bc3b7d4591652d419dcbf14 Mon Sep 17 00:00:00 2001 From: zhengziyi0117 <91662408+zhengziyi0117@users.noreply.github.com> Date: Sat, 26 Oct 2024 16:54:36 +0800 Subject: [PATCH 08/28] Update apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java Co-authored-by: Jiajing LU --- .../java/org/apache/skywalking/apm/agent/core/conf/Config.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java index 2e9dfdd6d9..35da3f9bde 100755 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java @@ -260,7 +260,7 @@ public static class AsyncProfiler { public static boolean ACTIVE = true; /** - * Max monitor time(second), if async profiler monitor time out of limit, then stop it. + * Max execution time(second) for the Async Profiler. The task will be stopped even if a longer time is specified. * default 1h. */ public static int MAX_DURATION = 600; From faa873279da1809b9d86fefd66367a848c69bdf8 Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Sat, 26 Oct 2024 17:26:15 +0800 Subject: [PATCH 09/28] fix: comment --- .../core/asyncprofiler/AsyncProfilerTask.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java index b7b72199e4..6e67d7be58 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java @@ -38,19 +38,20 @@ public class AsyncProfilerTask { */ private String taskId; /** - * execArgument from oap server + * User input parameters + * @see async-profiler argument */ private String execArgs; /** - * run profiling for duration seconds + * run profiling for duration (second) */ private int duration; /** - * run profiling for duration seconds + * The time when oap server created this task */ private long createTime; /** - * temp File + * tempFile generated by async-profiler execution */ private Path tempFile; @@ -77,7 +78,7 @@ public String start(AsyncProfiler asyncProfiler) throws IOException { } /** - * stop async-profiler and get dump file inputStream + * stop async profiler and get file */ public File stop(AsyncProfiler asyncProfiler) throws IOException { LOGGER.info("async profiler process stop and dump file"); @@ -86,6 +87,10 @@ public File stop(AsyncProfiler asyncProfiler) throws IOException { return tempFile.toFile(); } + /** + * if outputPath is configured, the JFR file will be generated at outputPath, + * otherwise createTemp will be used to create the file + */ public Path getProfilerFilePath() throws IOException { if (StringUtil.isNotEmpty(Config.AsyncProfiler.OUTPUT_PATH)) { Path tempFilePath = Paths.get(Config.AsyncProfiler.OUTPUT_PATH, taskId + getFileExtension()); From fbc24ef99ea593c101255e514506ec9d0fd0d036 Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Mon, 28 Oct 2024 10:25:41 +0800 Subject: [PATCH 10/28] fix: comment & data sender --- .../agent/core/asyncprofiler/AsyncProfilerDataSender.java | 6 +++--- .../org/apache/skywalking/apm/agent/core/conf/Config.java | 6 ++++-- apm-sniffer/config/agent.config | 6 +++--- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java index e128976588..b7b671c7a1 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java @@ -92,7 +92,7 @@ public void sendData(AsyncProfilerTask task, FileChannel channel) throws IOExcep } int size = Math.toIntExact(channel.size()); - + boolean[] isAbort = new boolean[]{false}; final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); StreamObserver dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter( GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS @@ -104,6 +104,7 @@ public void onNext(Commands value) { @Override public void onError(Throwable t) { + isAbort[0] = true; status.finished(); if (LOGGER.isErrorEnable()) { LOGGER.error( @@ -130,7 +131,7 @@ public void onCompleted() { // todo wait for server ? // Is it possible to upload jfr? ByteBuffer buf = ByteBuffer.allocateDirect(DATA_CHUNK_SIZE); - while (channel.read(buf) > 0) { + while (isAbort[0] && channel.read(buf) > 0) { buf.flip(); asyncProfilerData = AsyncProfilerData.newBuilder() .setContent(ByteString.copyFrom(buf)) @@ -148,7 +149,6 @@ public void sendError(AsyncProfilerTask task, String errorMessage) { return; } final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); - StreamObserver dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter( GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS ).collect(new StreamObserver() { diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java index 35da3f9bde..ec200b9964 100755 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java @@ -261,12 +261,14 @@ public static class AsyncProfiler { /** * Max execution time(second) for the Async Profiler. The task will be stopped even if a longer time is specified. - * default 1h. + * default 10min. */ public static int MAX_DURATION = 600; /** - * Path for the JFR outputs from the Async profiler + * Path for the JFR outputs from the Async Profiler. + * If the parameter is not empty, the file will be created in the specified directory, + * otherwise the Files.createTemp method will be used to create the file. */ public static String OUTPUT_PATH = ""; } diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config index 9058b2ef2c..fe6d875a64 100755 --- a/apm-sniffer/config/agent.config +++ b/apm-sniffer/config/agent.config @@ -164,11 +164,11 @@ profile.duration=${SW_AGENT_PROFILE_DURATION:10} profile.dump_max_stack_depth=${SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH:500} # Snapshot transport to backend buffer size profile.snapshot_transport_buffer_size=${SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE:4500} -# If true, skywalking agent will enable profile when user create a new asyncprofile task. Otherwise disable it. +# If true, Async Profiler will be enabled when user creates a new async profiler task, Otherwise it is disabled. asyncprofiler.active=${SW_AGENT_ASYNC_PROFILER_ACTIVE:true} -# Max monitor time(second), if async profiler monitor time out of limit, then stop it. default 10 minutes. +# Max execution time(second) for the Async Profiler. The task will be stopped even if a longer time is specified. default 1h. asyncprofiler.max_duration=${SW_AGENT_ASYNC_PROFILER_MAX_DURATION:600} -# jfr directory generated by async profiler +# Path for the JFR outputs from the Async Profiler. If the parameter is not empty, the file will be created in the specified directory, otherwise the Files.createTemp method will be used to create the file. asyncprofiler.output_path=${SW_AGENT_ASYNC_PROFILER_OUTPUT_PATH:} # If true, the agent collects and reports metrics to the backend. meter.active=${SW_METER_ACTIVE:true} From dbb7bffd93265a932801f707aa7863e024aba42e Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Mon, 28 Oct 2024 11:02:42 +0800 Subject: [PATCH 11/28] fix: data sender and server communication --- .../core/asyncprofiler/AsyncProfilerDataSender.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java index b7b671c7a1..3826538cb8 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java @@ -86,7 +86,7 @@ public void statusChanged(GRPCChannelStatus status) { this.status = status; } - public void sendData(AsyncProfilerTask task, FileChannel channel) throws IOException { + public void sendData(AsyncProfilerTask task, FileChannel channel) throws IOException, InterruptedException { if (status != GRPCChannelStatus.CONNECTED || Objects.isNull(channel) || !channel.isOpen()) { return; } @@ -128,7 +128,12 @@ public void onCompleted() { .build(); AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder().setMetaData(metaData).build(); dataStreamObserver.onNext(asyncProfilerData); - // todo wait for server ? + /** + * Wait briefly to see if the server can receive the jfr. If the server cannot receive it, onError will be triggered. + * Then we wait for a while (waiting for the server to send onError) and then decide whether to send the jfr file. + */ + Thread.sleep(500); + // Is it possible to upload jfr? ByteBuffer buf = ByteBuffer.allocateDirect(DATA_CHUNK_SIZE); while (isAbort[0] && channel.read(buf) > 0) { From 2f1cdd8f253c5426446f040c97b7ecc71e69954c Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Mon, 28 Oct 2024 23:06:57 +0800 Subject: [PATCH 12/28] fix: data sender --- .../apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java index 3826538cb8..7a2b994024 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java @@ -136,7 +136,7 @@ public void onCompleted() { // Is it possible to upload jfr? ByteBuffer buf = ByteBuffer.allocateDirect(DATA_CHUNK_SIZE); - while (isAbort[0] && channel.read(buf) > 0) { + while (!isAbort[0] && channel.read(buf) > 0) { buf.flip(); asyncProfilerData = AsyncProfilerData.newBuilder() .setContent(ByteString.copyFrom(buf)) From bcc5f302b80a6c72f30de28095429ce434c19cd8 Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Tue, 29 Oct 2024 14:27:22 +0800 Subject: [PATCH 13/28] fix: data sender --- .../AsyncProfilerDataSender.java | 62 ++++++++++--------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java index 7a2b994024..b86516d455 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java @@ -20,6 +20,8 @@ import com.google.protobuf.ByteString; import io.grpc.Channel; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; import org.apache.skywalking.apm.agent.core.boot.BootService; import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; @@ -32,7 +34,7 @@ import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager; import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus; import org.apache.skywalking.apm.agent.core.remote.GRPCStreamServiceStatus; -import org.apache.skywalking.apm.network.common.v3.Commands; +import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerCollectMessage; import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerCollectType; import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerData; import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerMetaData; @@ -92,25 +94,44 @@ public void sendData(AsyncProfilerTask task, FileChannel channel) throws IOExcep } int size = Math.toIntExact(channel.size()); - boolean[] isAbort = new boolean[]{false}; final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); StreamObserver dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter( GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS - ).collect(new StreamObserver() { + ).collect(new ClientResponseObserver() { + ClientCallStreamObserver requestStream; + + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + this.requestStream = requestStream; + } + @Override - public void onNext(Commands value) { + public void onNext(AsyncProfilerCollectMessage value) { + if (!AsyncProfilerCollectType.TERMINATED_BY_OVERSIZE.equals(value.getType())) { + ByteBuffer buf = ByteBuffer.allocateDirect(DATA_CHUNK_SIZE); + try { + while (channel.read(buf) > 0) { + buf.flip(); + AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder() + .setContent(ByteString.copyFrom(buf)) + .build(); + requestStream.onNext(asyncProfilerData); + buf.clear(); + } + } catch (IOException e) { + LOGGER.error("Failed to read JFR file and failed to upload to oap", e); + } + } else { + LOGGER.warn("JFR is too large to be received by the oap server"); + } + requestStream.onCompleted(); } @Override public void onError(Throwable t) { - isAbort[0] = true; status.finished(); - if (LOGGER.isErrorEnable()) { - LOGGER.error( - t, "Send async profiler task data to collector fail with a grpc internal exception." - ); - } + LOGGER.error(t, "Send async profiler task data to collector fail with a grpc internal exception."); ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t); } @@ -128,23 +149,6 @@ public void onCompleted() { .build(); AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder().setMetaData(metaData).build(); dataStreamObserver.onNext(asyncProfilerData); - /** - * Wait briefly to see if the server can receive the jfr. If the server cannot receive it, onError will be triggered. - * Then we wait for a while (waiting for the server to send onError) and then decide whether to send the jfr file. - */ - Thread.sleep(500); - - // Is it possible to upload jfr? - ByteBuffer buf = ByteBuffer.allocateDirect(DATA_CHUNK_SIZE); - while (!isAbort[0] && channel.read(buf) > 0) { - buf.flip(); - asyncProfilerData = AsyncProfilerData.newBuilder() - .setContent(ByteString.copyFrom(buf)) - .build(); - dataStreamObserver.onNext(asyncProfilerData); - buf.clear(); - } - dataStreamObserver.onCompleted(); status.wait4Finish(); } @@ -156,9 +160,9 @@ public void sendError(AsyncProfilerTask task, String errorMessage) { final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); StreamObserver dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter( GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS - ).collect(new StreamObserver() { + ).collect(new StreamObserver() { @Override - public void onNext(Commands value) { + public void onNext(AsyncProfilerCollectMessage value) { } @Override From cef065db07bd536f1c570d3c0d7cb49baf310637 Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Tue, 29 Oct 2024 15:06:18 +0800 Subject: [PATCH 14/28] fix: protocol update --- apm-protocol/apm-network/src/main/proto | 2 +- .../asyncprofiler/AsyncProfilerDataSender.java | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/apm-protocol/apm-network/src/main/proto b/apm-protocol/apm-network/src/main/proto index 5c2364e2f5..bd1f91f7e1 160000 --- a/apm-protocol/apm-network/src/main/proto +++ b/apm-protocol/apm-network/src/main/proto @@ -1 +1 @@ -Subproject commit 5c2364e2f51f8e1116f63792f16d59336e890ffe +Subproject commit bd1f91f7e1cb4de9d9b5ccb71f36ce6b1c7c97f5 diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java index b86516d455..a1ed19c32e 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java @@ -34,11 +34,11 @@ import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager; import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus; import org.apache.skywalking.apm.agent.core.remote.GRPCStreamServiceStatus; -import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerCollectMessage; -import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerCollectType; +import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerCollectionResponse; import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerData; import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerMetaData; import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerTaskGrpc; +import org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilingStatus; import java.io.IOException; import java.nio.ByteBuffer; @@ -97,7 +97,7 @@ public void sendData(AsyncProfilerTask task, FileChannel channel) throws IOExcep final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); StreamObserver dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter( GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS - ).collect(new ClientResponseObserver() { + ).collect(new ClientResponseObserver() { ClientCallStreamObserver requestStream; @Override @@ -106,8 +106,8 @@ public void beforeStart(ClientCallStreamObserver requestStrea } @Override - public void onNext(AsyncProfilerCollectMessage value) { - if (!AsyncProfilerCollectType.TERMINATED_BY_OVERSIZE.equals(value.getType())) { + public void onNext(AsyncProfilerCollectionResponse value) { + if (!AsyncProfilingStatus.TERMINATED_BY_OVERSIZE.equals(value.getType())) { ByteBuffer buf = ByteBuffer.allocateDirect(DATA_CHUNK_SIZE); try { while (channel.read(buf) > 0) { @@ -143,7 +143,7 @@ public void onCompleted() { AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder() .setService(Config.Agent.SERVICE_NAME) .setServiceInstance(Config.Agent.INSTANCE_NAME) - .setType(AsyncProfilerCollectType.PROFILING_SUCCESS) + .setType(AsyncProfilingStatus.PROFILING_SUCCESS) .setContentSize(size) .setTaskId(task.getTaskId()) .build(); @@ -160,9 +160,9 @@ public void sendError(AsyncProfilerTask task, String errorMessage) { final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); StreamObserver dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter( GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS - ).collect(new StreamObserver() { + ).collect(new StreamObserver() { @Override - public void onNext(AsyncProfilerCollectMessage value) { + public void onNext(AsyncProfilerCollectionResponse value) { } @Override @@ -180,7 +180,7 @@ public void onCompleted() { .setService(Config.Agent.SERVICE_NAME) .setServiceInstance(Config.Agent.INSTANCE_NAME) .setTaskId(task.getTaskId()) - .setType(AsyncProfilerCollectType.EXECUTION_TASK_ERROR) + .setType(AsyncProfilingStatus.EXECUTION_TASK_ERROR) .setContentSize(0) .build(); AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder() From c960a6ba2e16f625fd0908fd9c934264958a7469 Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Tue, 29 Oct 2024 15:48:49 +0800 Subject: [PATCH 15/28] fix: dependency location --- pom.xml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index 9fca6a946f..1ca1ec9c2a 100755 --- a/pom.xml +++ b/pom.xml @@ -87,7 +87,6 @@ 1.14.9 - 3.0 1.53.0 4.1.100.Final 2.8.9 @@ -98,6 +97,7 @@ 2.0.48.Final 1.3.2 3.1 + 3.0 6.0.53 @@ -214,11 +214,6 @@ - - tools.profiler - async-profiler - ${async-profiler.version} - junit junit @@ -266,6 +261,11 @@ ${jmh.version} test + + tools.profiler + async-profiler + ${async-profiler.version} + From cacaa4e25d5e408f3a4ff6af3a12fb0cb4fe87e0 Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Tue, 29 Oct 2024 22:35:50 +0800 Subject: [PATCH 16/28] fix: some issue --- .../agent/core/asyncprofiler/AsyncProfilerDataSender.java | 6 +++--- .../asyncprofiler/AsyncProfilerTaskExecutionService.java | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java index a1ed19c32e..5d4a733534 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java @@ -107,7 +107,9 @@ public void beforeStart(ClientCallStreamObserver requestStrea @Override public void onNext(AsyncProfilerCollectionResponse value) { - if (!AsyncProfilingStatus.TERMINATED_BY_OVERSIZE.equals(value.getType())) { + if (AsyncProfilingStatus.TERMINATED_BY_OVERSIZE.equals(value.getType())) { + LOGGER.warn("JFR is too large to be received by the oap server"); + } else { ByteBuffer buf = ByteBuffer.allocateDirect(DATA_CHUNK_SIZE); try { while (channel.read(buf) > 0) { @@ -121,8 +123,6 @@ public void onNext(AsyncProfilerCollectionResponse value) { } catch (IOException e) { LOGGER.error("Failed to read JFR file and failed to upload to oap", e); } - } else { - LOGGER.warn("JFR is too large to be received by the oap server"); } requestStream.onCompleted(); diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java index a06fb58158..18ed922443 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java @@ -133,6 +133,8 @@ public void onComplete() throws Throwable { @Override public void shutdown() throws Throwable { + scheduledFuture.cancel(true); ASYNC_PROFILER_EXECUTOR.shutdown(); + scheduledFuture = null; } } From e66546a360945c328384b812c891cc95e146d834 Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Wed, 30 Oct 2024 10:45:32 +0800 Subject: [PATCH 17/28] fix: some issue --- .../apm/agent/core/asyncprofiler/AsyncProfilerTask.java | 3 +-- .../asyncprofiler/AsyncProfilerTaskExecutionService.java | 8 +++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java index 6e67d7be58..819b25f998 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java @@ -58,8 +58,7 @@ public class AsyncProfilerTask { private static String execute(AsyncProfiler asyncProfiler, String args) throws IllegalArgumentException, IOException { LOGGER.info("async profiler execute args:{}", args); - String result = asyncProfiler.execute(args); - return result.trim(); + return asyncProfiler.execute(args); } /** diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java index 18ed922443..da0d350e21 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java @@ -43,7 +43,7 @@ public class AsyncProfilerTaskExecutionService implements BootService { private static final AsyncProfiler ASYNC_PROFILER = AsyncProfiler.getInstance(); - private static final String SUCCESS_RESULT = "Profiling started"; + private static final String SUCCESS_RESULT = "Profiling started\n"; // profile executor thread pool, only running one thread private static final ScheduledExecutorService ASYNC_PROFILER_EXECUTOR = Executors.newSingleThreadScheduledExecutor( @@ -133,8 +133,10 @@ public void onComplete() throws Throwable { @Override public void shutdown() throws Throwable { - scheduledFuture.cancel(true); ASYNC_PROFILER_EXECUTOR.shutdown(); - scheduledFuture = null; + if (Objects.nonNull(scheduledFuture)) { + scheduledFuture.cancel(true); + scheduledFuture = null; + } } } From 2f28e5f0177603da41951a8aded7fb089df6ffd2 Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Wed, 30 Oct 2024 11:56:03 +0800 Subject: [PATCH 18/28] fix: unit test --- .../skywalking/apm/agent/core/boot/ServiceManagerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java index 48af242a43..e3cce99623 100644 --- a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java +++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java @@ -59,7 +59,7 @@ public static void afterClass() { public void testServiceDependencies() throws Exception { HashMap registryService = getFieldValue(ServiceManager.INSTANCE, "bootedServices"); - assertThat(registryService.size(), is(20)); + assertThat(registryService.size(), is(23)); assertTraceSegmentServiceClient(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class)); assertContextManager(ServiceManager.INSTANCE.findService(ContextManager.class)); From 17990e0eaca452e2de0f55d7792d24f098b204a0 Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Wed, 30 Oct 2024 12:24:53 +0800 Subject: [PATCH 19/28] fix: comment --- .../command/AsyncProfilerTaskCommand.java | 13 +++++ .../core/asyncprofiler/AsyncProfilerTask.java | 2 +- .../AsyncProfilerTaskChannelService.java | 50 ++++++++++++------- .../apm/agent/core/conf/Config.java | 5 +- apm-sniffer/config/agent.config | 4 +- 5 files changed, 52 insertions(+), 22 deletions(-) diff --git a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java index f8fe47191b..b74be5c0f8 100644 --- a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java +++ b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java @@ -28,9 +28,22 @@ public class AsyncProfilerTaskCommand extends BaseCommand implements Serializabl public static final Deserializable DESERIALIZER = new AsyncProfilerTaskCommand("", "", 0, null, "", 0); public static final String NAME = "AsyncProfilerTaskQuery"; + /** + * async-profiler taskId + */ private final String taskId; + /** + * run profiling for duration (second) + */ private final int duration; + /** + * User input parameters + * @see async-profiler argument + */ private final String execArgs; + /** + * task create time + */ private final long createTime; public AsyncProfilerTaskCommand(String serialNumber, String taskId, int duration, diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java index 819b25f998..56c0bd06ca 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java @@ -39,7 +39,7 @@ public class AsyncProfilerTask { private String taskId; /** * User input parameters - * @see async-profiler argument + * @see async-profiler argument */ private String execArgs; /** diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java index 482b6b0e77..953e46f754 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java @@ -19,6 +19,8 @@ package org.apache.skywalking.apm.agent.core.asyncprofiler; import io.grpc.Channel; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import org.apache.skywalking.apm.agent.core.boot.BootService; import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory; @@ -49,24 +51,38 @@ public class AsyncProfilerTaskChannelService implements BootService, Runnable, G private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT; private volatile AsyncProfilerTaskGrpc.AsyncProfilerTaskBlockingStub asyncProfilerTaskBlockingStub; - // query task list schedule - private volatile ScheduledFuture getTaskListFuture; + // query task schedule + private volatile ScheduledFuture getTaskFuture; @Override public void run() { if (status == GRPCChannelStatus.CONNECTED) { - // test start command and 10s after put stop command - long lastCommandCreateTime = ServiceManager.INSTANCE - .findService(AsyncProfilerTaskExecutionService.class).getLastCommandCreateTime(); - - AsyncProfilerTaskCommandQuery query = AsyncProfilerTaskCommandQuery.newBuilder() - .setServiceInstance(Config.Agent.INSTANCE_NAME) - .setService(Config.Agent.SERVICE_NAME) - .setLastCommandTime(lastCommandCreateTime) - .build(); - Commands commands = asyncProfilerTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS) - .getAsyncProfilerTaskCommands(query); - ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); + try { + // test start command and 10s after put stop command + long lastCommandCreateTime = ServiceManager.INSTANCE + .findService(AsyncProfilerTaskExecutionService.class).getLastCommandCreateTime(); + + AsyncProfilerTaskCommandQuery query = AsyncProfilerTaskCommandQuery.newBuilder() + .setServiceInstance(Config.Agent.INSTANCE_NAME) + .setService(Config.Agent.SERVICE_NAME) + .setLastCommandTime(lastCommandCreateTime) + .build(); + Commands commands = asyncProfilerTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS) + .getAsyncProfilerTaskCommands(query); + ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); + } catch (Throwable t) { + if (!(t instanceof StatusRuntimeException)) { + LOGGER.error(t, "Query async-profiler task from backend fail."); + return; + } + final StatusRuntimeException statusRuntimeException = (StatusRuntimeException) t; + if (statusRuntimeException.getStatus().getCode() == Status.Code.UNIMPLEMENTED) { + LOGGER.warn("Backend doesn't support async-profiler, async-profiler will be disabled"); + if (getTaskFuture != null) { + getTaskFuture.cancel(true); + } + } + } } } @@ -90,7 +106,7 @@ public void prepare() throws Throwable { public void boot() throws Throwable { if (Config.AsyncProfiler.ACTIVE) { - getTaskListFuture = Executors.newSingleThreadScheduledExecutor( + getTaskFuture = Executors.newSingleThreadScheduledExecutor( new DefaultNamedThreadFactory("AsyncProfilerGetTaskService") ).scheduleWithFixedDelay( new RunnableWithExceptionProtection( @@ -108,8 +124,8 @@ public void onComplete() throws Throwable { @Override public void shutdown() throws Throwable { - if (getTaskListFuture != null) { - getTaskListFuture.cancel(true); + if (getTaskFuture != null) { + getTaskFuture.cancel(true); } } } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java index ec200b9964..3a296d45bb 100755 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java @@ -254,8 +254,9 @@ public static class Profile { public static class AsyncProfiler { /** - * If true, Async Profiler will be enabled when user creates a new async profiler task. - * Otherwise it is disabled. + * If true, async profiler will be enabled when user creates a new async profiler task. + * If false, it will be disabled. + * The default value is true. */ public static boolean ACTIVE = true; diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config index fe6d875a64..0ceb658627 100755 --- a/apm-sniffer/config/agent.config +++ b/apm-sniffer/config/agent.config @@ -164,9 +164,9 @@ profile.duration=${SW_AGENT_PROFILE_DURATION:10} profile.dump_max_stack_depth=${SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH:500} # Snapshot transport to backend buffer size profile.snapshot_transport_buffer_size=${SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE:4500} -# If true, Async Profiler will be enabled when user creates a new async profiler task, Otherwise it is disabled. +# If true, async profiler will be enabled when user creates a new async profiler task. If false, it will be disabled. The default value is true. asyncprofiler.active=${SW_AGENT_ASYNC_PROFILER_ACTIVE:true} -# Max execution time(second) for the Async Profiler. The task will be stopped even if a longer time is specified. default 1h. +# Max execution time(second) for the Async Profiler. The task will be stopped even if a longer time is specified. default 10min. asyncprofiler.max_duration=${SW_AGENT_ASYNC_PROFILER_MAX_DURATION:600} # Path for the JFR outputs from the Async Profiler. If the parameter is not empty, the file will be created in the specified directory, otherwise the Files.createTemp method will be used to create the file. asyncprofiler.output_path=${SW_AGENT_ASYNC_PROFILER_OUTPUT_PATH:} From d6dcbff55cd7c71dfbb033fd4060ee7f03799833 Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Wed, 30 Oct 2024 13:11:43 +0800 Subject: [PATCH 20/28] fix: unit test --- .../skywalking/apm/agent/core/boot/ServiceManagerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java index e3cce99623..e177920cfe 100644 --- a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java +++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java @@ -109,7 +109,7 @@ private void assertGRPCChannelManager(GRPCChannelManager service) throws Excepti assertNotNull(service); List listeners = getFieldValue(service, "listeners"); - assertEquals(listeners.size(), 10); + assertEquals(listeners.size(), 12); } private void assertSamplingService(SamplingService service) { From e8aef4c80fcb01e762f1f23cf09cde4b15432b9a Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Wed, 30 Oct 2024 14:06:35 +0800 Subject: [PATCH 21/28] fix: comment --- .../component/command/AsyncProfilerTaskCommand.java | 11 +++++++++-- .../agent/core/asyncprofiler/AsyncProfilerTask.java | 4 ++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java index b74be5c0f8..41d6043dee 100644 --- a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java +++ b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/AsyncProfilerTaskCommand.java @@ -37,8 +37,15 @@ public class AsyncProfilerTaskCommand extends BaseCommand implements Serializabl */ private final int duration; /** - * User input parameters - * @see async-profiler argument + * async profiler extended parameters. Here is a table of optional parameters. + * + *

lock[=DURATION] - profile contended locks overflowing the DURATION ns bucket (default: 10us)

+ *

alloc[=BYTES] - profile allocations with BYTES interval

+ *

interval=N - sampling interval in ns (default: 10'000'000, i.e. 10 ms)

+ *

jstackdepth=N - maximum Java stack depth (default: 2048)

+ *

chunksize=N - approximate size of JFR chunk in bytes (default: 100 MB)

+ *

chunktime=N - duration of JFR chunk in seconds (default: 1 hour)

+ * details @see async-profiler argument */ private final String execArgs; /** diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java index 56c0bd06ca..0eb629b2ae 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java @@ -38,8 +38,8 @@ public class AsyncProfilerTask { */ private String taskId; /** - * User input parameters - * @see async-profiler argument + * async profiler optional extended parameters + * @see org.apache.skywalking.apm.network.trace.component.command.AsyncProfilerTaskCommand */ private String execArgs; /** From b3fe849134dca564f50bb5ab339b53c3a9ec8111 Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Wed, 30 Oct 2024 15:06:06 +0800 Subject: [PATCH 22/28] fix: lazy init && config --- .../AsyncProfilerDataSender.java | 6 ++-- .../AsyncProfilerTaskExecutionService.java | 34 ++++++++++++++----- .../apm/agent/core/conf/Config.java | 5 +++ 3 files changed, 34 insertions(+), 11 deletions(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java index 5d4a733534..c64709cd98 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java @@ -29,7 +29,6 @@ import org.apache.skywalking.apm.agent.core.conf.Config; import org.apache.skywalking.apm.agent.core.logging.api.ILog; import org.apache.skywalking.apm.agent.core.logging.api.LogManager; -import org.apache.skywalking.apm.agent.core.profile.ProfileSnapshotSender; import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener; import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager; import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus; @@ -46,12 +45,12 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; +import static org.apache.skywalking.apm.agent.core.conf.Config.AsyncProfiler.DATA_CHUNK_SIZE; import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT; @DefaultImplementor public class AsyncProfilerDataSender implements BootService, GRPCChannelListener { - private static final ILog LOGGER = LogManager.getLogger(ProfileSnapshotSender.class); - private static final int DATA_CHUNK_SIZE = 1024 * 1024; + private static final ILog LOGGER = LogManager.getLogger(AsyncProfilerDataSender.class); private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT; @@ -168,6 +167,7 @@ public void onNext(AsyncProfilerCollectionResponse value) { @Override public void onError(Throwable t) { status.finished(); + LOGGER.error(t, "Send async profiler task execute error fail with a grpc internal exception."); ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t); } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java index da0d350e21..4f84422ca4 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java @@ -41,13 +41,12 @@ public class AsyncProfilerTaskExecutionService implements BootService { private static final ILog LOGGER = LogManager.getLogger(AsyncProfilerTaskChannelService.class); - private static final AsyncProfiler ASYNC_PROFILER = AsyncProfiler.getInstance(); + private AsyncProfiler asyncProfilerInstance; private static final String SUCCESS_RESULT = "Profiling started\n"; // profile executor thread pool, only running one thread - private static final ScheduledExecutorService ASYNC_PROFILER_EXECUTOR = Executors.newSingleThreadScheduledExecutor( - new DefaultNamedThreadFactory("ASYNC-PROFILING-TASK")); + private ScheduledExecutorService asyncProfilerExecutor; // last command create time, use to next query task list private volatile long lastCommandCreateTime = -1; @@ -63,19 +62,19 @@ public void processAsyncProfilerTask(AsyncProfilerTask task) { lastCommandCreateTime = task.getCreateTime(); LOGGER.info("add async profiler task: {}", task.getTaskId()); // add task to list - ASYNC_PROFILER_EXECUTOR.execute(() -> { + getAsyncProfilerExecutor().execute(() -> { try { if (Objects.nonNull(scheduledFuture) && !scheduledFuture.isDone()) { LOGGER.info("AsyncProfilerTask already running"); return; } - String result = task.start(ASYNC_PROFILER); + String result = task.start(getAsyncProfiler()); if (!SUCCESS_RESULT.equals(result)) { stopWhenError(task, result); return; } - scheduledFuture = ASYNC_PROFILER_EXECUTOR.schedule( + scheduledFuture = getAsyncProfilerExecutor().schedule( () -> stopWhenSuccess(task), task.getDuration(), TimeUnit.SECONDS ); } catch (IOException e) { @@ -93,7 +92,7 @@ private void stopWhenError(AsyncProfilerTask task, String errorMessage) { private void stopWhenSuccess(AsyncProfilerTask task) { try { - File dumpFile = task.stop(ASYNC_PROFILER); + File dumpFile = task.stop(getAsyncProfiler()); // stop task try (FileInputStream fileInputStream = new FileInputStream(dumpFile)) { // upload file @@ -133,10 +132,29 @@ public void onComplete() throws Throwable { @Override public void shutdown() throws Throwable { - ASYNC_PROFILER_EXECUTOR.shutdown(); + getAsyncProfilerExecutor().shutdown(); if (Objects.nonNull(scheduledFuture)) { scheduledFuture.cancel(true); scheduledFuture = null; } } + + private AsyncProfiler getAsyncProfiler() { + if (asyncProfilerInstance == null) { + asyncProfilerInstance = AsyncProfiler.getInstance(); + } + return asyncProfilerInstance; + } + + private ScheduledExecutorService getAsyncProfilerExecutor() { + if (asyncProfilerExecutor == null) { + synchronized (this) { + if (asyncProfilerExecutor == null) { + asyncProfilerExecutor = Executors.newSingleThreadScheduledExecutor( + new DefaultNamedThreadFactory("ASYNC-PROFILING-TASK")); + } + } + } + return asyncProfilerExecutor; + } } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java index 3a296d45bb..fe2b7448e1 100755 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java @@ -272,6 +272,11 @@ public static class AsyncProfiler { * otherwise the Files.createTemp method will be used to create the file. */ public static String OUTPUT_PATH = ""; + + /** + * The size of the chunk when uploading jfr + */ + public static final int DATA_CHUNK_SIZE = 1024 * 1024; } public static class Meter { From 4a6d7f5cc7d4a5745e9974baaae4946ac19e7196 Mon Sep 17 00:00:00 2001 From: zhengziyi0117 <91662408+zhengziyi0117@users.noreply.github.com> Date: Wed, 30 Oct 2024 15:10:44 +0800 Subject: [PATCH 23/28] Update apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 吴晟 Wu Sheng --- .../apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java index c64709cd98..5e09fab5b0 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java @@ -181,7 +181,7 @@ public void onCompleted() { .setServiceInstance(Config.Agent.INSTANCE_NAME) .setTaskId(task.getTaskId()) .setType(AsyncProfilingStatus.EXECUTION_TASK_ERROR) - .setContentSize(0) + .setContentSize(-1) .build(); AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder() .setMetaData(metaData) From 599fb98560516f9f79bfeb091c89b874a473c2a2 Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Wed, 30 Oct 2024 15:13:10 +0800 Subject: [PATCH 24/28] fix: volatile --- .../core/asyncprofiler/AsyncProfilerTaskExecutionService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java index 4f84422ca4..a0b43afd79 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java @@ -46,7 +46,7 @@ public class AsyncProfilerTaskExecutionService implements BootService { private static final String SUCCESS_RESULT = "Profiling started\n"; // profile executor thread pool, only running one thread - private ScheduledExecutorService asyncProfilerExecutor; + private volatile ScheduledExecutorService asyncProfilerExecutor; // last command create time, use to next query task list private volatile long lastCommandCreateTime = -1; From 0435607193dc89fe09cc7ef0b6091bb6c96a3412 Mon Sep 17 00:00:00 2001 From: zhengziyi0117 <91662408+zhengziyi0117@users.noreply.github.com> Date: Wed, 30 Oct 2024 15:14:24 +0800 Subject: [PATCH 25/28] Update apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 吴晟 Wu Sheng --- .../core/asyncprofiler/AsyncProfilerTaskExecutionService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java index a0b43afd79..5672e43172 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java @@ -84,7 +84,7 @@ public void processAsyncProfilerTask(AsyncProfilerTask task) { } private void stopWhenError(AsyncProfilerTask task, String errorMessage) { - LOGGER.error("AsyncProfilerTask start fail result:" + errorMessage); + LOGGER.error("AsyncProfilerTask fails to start: {}", errorMessage); AsyncProfilerDataSender dataSender = ServiceManager.INSTANCE.findService(AsyncProfilerDataSender.class); dataSender.sendError(task, errorMessage); } From efa9ce93ac5a97d4fcd87806169468a63bf93ae7 Mon Sep 17 00:00:00 2001 From: zhengziyi0117 <91662408+zhengziyi0117@users.noreply.github.com> Date: Wed, 30 Oct 2024 15:14:32 +0800 Subject: [PATCH 26/28] Update apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 吴晟 Wu Sheng --- .../core/asyncprofiler/AsyncProfilerTaskExecutionService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java index 5672e43172..739b973078 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java @@ -103,7 +103,7 @@ private void stopWhenSuccess(AsyncProfilerTask task) { } if (!dumpFile.delete()) { - LOGGER.warn("delete async profiler dump file failed"); + LOGGER.warn("Fail to delete the dump file of async profiler."); } } catch (Exception e) { LOGGER.error("stop async profiler task error", e); From bbd987fe66825b4f641fe2c848609373a4dc83c7 Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Wed, 30 Oct 2024 15:34:40 +0800 Subject: [PATCH 27/28] fix: log grammar --- .../core/asyncprofiler/AsyncProfilerTaskChannelService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java index 953e46f754..536a5f236c 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java @@ -72,7 +72,7 @@ public void run() { ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); } catch (Throwable t) { if (!(t instanceof StatusRuntimeException)) { - LOGGER.error(t, "Query async-profiler task from backend fail."); + LOGGER.error(t, "fail to query async-profiler task from backend"); return; } final StatusRuntimeException statusRuntimeException = (StatusRuntimeException) t; From d09acd4872890c61f6278bc1344ec6a78713cbd1 Mon Sep 17 00:00:00 2001 From: zhengziyi Date: Wed, 30 Oct 2024 16:23:59 +0800 Subject: [PATCH 28/28] fix: log && code format --- .../apm/agent/core/asyncprofiler/AsyncProfilerTask.java | 1 + .../core/asyncprofiler/AsyncProfilerTaskChannelService.java | 2 +- .../core/asyncprofiler/AsyncProfilerTaskExecutionService.java | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java index 0eb629b2ae..95948762b2 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java @@ -39,6 +39,7 @@ public class AsyncProfilerTask { private String taskId; /** * async profiler optional extended parameters + * * @see org.apache.skywalking.apm.network.trace.component.command.AsyncProfilerTaskCommand */ private String execArgs; diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java index 536a5f236c..7a2b26a46a 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskChannelService.java @@ -76,7 +76,7 @@ public void run() { return; } final StatusRuntimeException statusRuntimeException = (StatusRuntimeException) t; - if (statusRuntimeException.getStatus().getCode() == Status.Code.UNIMPLEMENTED) { + if (Status.Code.UNIMPLEMENTED.equals(statusRuntimeException.getStatus().getCode())) { LOGGER.warn("Backend doesn't support async-profiler, async-profiler will be disabled"); if (getTaskFuture != null) { getTaskFuture.cancel(true); diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java index 739b973078..550923c1b0 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java @@ -84,7 +84,7 @@ public void processAsyncProfilerTask(AsyncProfilerTask task) { } private void stopWhenError(AsyncProfilerTask task, String errorMessage) { - LOGGER.error("AsyncProfilerTask fails to start: {}", errorMessage); + LOGGER.error("AsyncProfilerTask fails to start: " + errorMessage); AsyncProfilerDataSender dataSender = ServiceManager.INSTANCE.findService(AsyncProfilerDataSender.class); dataSender.sendError(task, errorMessage); }