diff --git a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java
index c79b6ea8331b..7bb53c80a07c 100755
--- a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java
+++ b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java
@@ -133,7 +133,10 @@ public class ComponentsDefine {
public static final OfficialComponent PLAY = new OfficialComponent(68, "Play");
public static final OfficialComponent CASSANDRA_JAVA_DRIVER = new OfficialComponent(69, "cassandra-java-driver");
-
+
public static final OfficialComponent LIGHT_4J = new OfficialComponent(71, "Light4J");
+ public static final OfficialComponent PULSAR_PRODUCER = new OfficialComponent(73, "pulsar-producer");
+
+ public static final OfficialComponent PULSAR_CONSUMER = new OfficialComponent(74, "pulsar-consumer");
}
diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pom.xml
index 886dadb97c74..d38c4bc97d0a 100644
--- a/apm-sniffer/apm-sdk-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/pom.xml
@@ -76,6 +76,7 @@
solrj-7.x-plugin
cassandra-java-driver-3.x-plugin
light4j-plugins
+ pulsar-plugin
pom
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/pom.xml
new file mode 100644
index 000000000000..bbb9e9eac35d
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/pom.xml
@@ -0,0 +1,42 @@
+
+
+
+
+
+ apm-sdk-plugin
+ org.apache.skywalking
+ 6.5.0-SNAPSHOT
+
+ 4.0.0
+
+ apm-pulsar-plugin
+
+
+ 2.4.0
+
+
+
+
+ org.apache.pulsar
+ pulsar-client
+ ${pulsar.version}
+ provided
+
+
+
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java
new file mode 100644
index 000000000000..8965c933a535
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+
+/**
+ * Interceptor of pulsar consumer constructor.
+ *
+ * The interceptor create {@link ConsumerEnhanceRequiredInfo} which is required by instance method interceptor,
+ * So use it to update the skywalking dynamic field of pulsar consumer enhanced instance.
+ * So that the instance methods can get the {@link ConsumerEnhanceRequiredInfo}
+ *
+ * @author penghui
+ */
+public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor {
+
+ @Override
+ public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+ PulsarClientImpl pulsarClient = (PulsarClientImpl) allArguments[0];
+ String topic = (String) allArguments[1];
+ ConsumerConfigurationData consumerConfigurationData = (ConsumerConfigurationData) allArguments[2];
+ ConsumerEnhanceRequiredInfo requireInfo = new ConsumerEnhanceRequiredInfo();
+ /*
+ * Pulsar url can specify with specific URL or a service url provider, use pulsarClient.getLookup().getServiceUrl()
+ * can handle the service url provider which use a dynamic service url
+ */
+ requireInfo.setServiceUrl(pulsarClient.getLookup().getServiceUrl());
+ requireInfo.setTopic(topic);
+ requireInfo.setSubscriptionName(consumerConfigurationData.getSubscriptionName());
+ objInst.setSkyWalkingDynamicField(requireInfo);
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
new file mode 100644
index 000000000000..a1276d8fa479
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+/**
+ * Pulsar consumer enhance required info is required by consumer enhanced object method interceptor
+ *
+ * @author penghui
+ */
+public class ConsumerEnhanceRequiredInfo {
+
+ /**
+ * service url of the consumer
+ */
+ private String serviceUrl;
+
+ /**
+ * topic name of the consumer
+ */
+ private String topic;
+
+ /**
+ * subscription name of the consumer
+ */
+ private String subscriptionName;
+
+ public String getServiceUrl() {
+ return serviceUrl;
+ }
+
+ public void setServiceUrl(String serviceUrl) {
+ this.serviceUrl = serviceUrl;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getSubscriptionName() {
+ return subscriptionName;
+ }
+
+ public void setSubscriptionName(String subscriptionName) {
+ this.subscriptionName = subscriptionName;
+ }
+
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptor.java
new file mode 100644
index 000000000000..8db812014c9a
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+
+/**
+ * Interceptor of pulsar producer constructor.
+ *
+ * The interceptor create {@link ProducerEnhanceRequiredInfo} which is required by instance method interceptor,
+ * So use it to update the skywalking dynamic field of pulsar producer enhanced instance.
+ * So that the instance methods can get the {@link ProducerEnhanceRequiredInfo}
+ *
+ * @author penghui
+ */
+public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor {
+
+ @Override
+ public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+ PulsarClientImpl pulsarClient = (PulsarClientImpl) allArguments[0];
+ String topic = (String) allArguments[1];
+ ProducerEnhanceRequiredInfo producerEnhanceRequiredInfo = new ProducerEnhanceRequiredInfo();
+ producerEnhanceRequiredInfo.setTopic(topic);
+ /*
+ * Pulsar url can specify with specific URL or a service url provider, use pulsarClient.getLookup().getServiceUrl()
+ * can handle the service url provider which use a dynamic service url
+ */
+ producerEnhanceRequiredInfo.setServiceUrl(pulsarClient.getLookup().getServiceUrl());
+ objInst.setSkyWalkingDynamicField(producerEnhanceRequiredInfo);
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerEnhanceRequiredInfo.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerEnhanceRequiredInfo.java
new file mode 100644
index 000000000000..210e66a7eb9a
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerEnhanceRequiredInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+/**
+ * Pulsar producer enhance required info is required by producer enhanced object method interceptor
+ *
+ * @author penghui
+ */
+public class ProducerEnhanceRequiredInfo {
+
+ /**
+ * service url of the pulsar producer
+ */
+ private String serviceUrl;
+
+ /**
+ * topic name of the pulsar producer
+ */
+ private String topic;
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getServiceUrl() {
+ return serviceUrl;
+ }
+
+ public void setServiceUrl(String serviceUrl) {
+ this.serviceUrl = serviceUrl;
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
new file mode 100644
index 000000000000..61ebe0411bd0
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+import java.lang.reflect.Method;
+
+/**
+ * Interceptor for pulsar consumer enhanced instance
+ *
+ * Here is the intercept process steps:
+ *
+ *
+ * 1. Get the @{@link ConsumerEnhanceRequiredInfo} and record the service url, topic name and subscription name
+ * 2. Create the entry span when call messageProcessed
method
+ * 3. Extract all the Trace Context
when call messageProcessed
method
+ * 4. Stop the entry span when messageProcessed
method finished.
+ *
+ *
+ * @author penghui
+ */
+public class PulsarConsumerInterceptor implements InstanceMethodsAroundInterceptor {
+
+ public static final String OPERATE_NAME_PREFIX = "Pulsar/";
+ public static final String CONSUMER_OPERATE_NAME = "/Consumer/";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
+ MethodInterceptResult result) throws Throwable {
+ if (allArguments[0] != null) {
+ ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
+ Message msg = (Message) allArguments[0];
+ ContextCarrier carrier = new ContextCarrier();
+ CarrierItem next = carrier.items();
+ while (next.hasNext()) {
+ next = next.next();
+ next.setHeadValue(msg.getProperty(next.getHeadKey()));
+ }
+ AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX +
+ requiredInfo.getTopic() + CONSUMER_OPERATE_NAME + requiredInfo.getSubscriptionName(), carrier);
+ activeSpan.setComponent(ComponentsDefine.PULSAR_CONSUMER);
+ SpanLayer.asMQ(activeSpan);
+ Tags.MQ_BROKER.set(activeSpan, requiredInfo.getServiceUrl());
+ Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopic());
+ }
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
+ Object ret) throws Throwable {
+ if (allArguments[0] != null) {
+ ContextManager.stopSpan();
+ }
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+ Class>[] argumentsTypes, Throwable t) {
+ if (allArguments[0] != null) {
+ ContextManager.activeSpan().errorOccurred().log(t);
+ }
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptor.java
new file mode 100644
index 000000000000..3205973b6d54
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+import java.lang.reflect.Method;
+
+/**
+ * Interceptor for pulsar producer enhanced instance.
+ *
+ * Here is the intercept process steps:
+ *
+ *
+ * 1. Get the {@link ProducerEnhanceRequiredInfo} and record the service url, topic name
+ * 2. Create the exit span when the producer invoke sendAsync
method
+ * 3. Inject the context to {@link Message#getProperties()}
+ * 4. Create {@link SendCallbackEnhanceRequiredInfo} with ContextManager.capture()
and set the
+ * callback enhanced instance skywalking dynamic field to the created required info.
+ * 5. Stop the exit span when sendAsync
method finished.
+ *
+ *
+ * @author penghui
+ */
+public class PulsarProducerInterceptor implements InstanceMethodsAroundInterceptor {
+
+ public static final String OPERATE_NAME_PREFIX = "Pulsar/";
+ public static final String PRODUCER_OPERATE_NAME_SUFFIX = "/Producer";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
+ MethodInterceptResult result) throws Throwable {
+ if (allArguments[0] != null) {
+ ProducerEnhanceRequiredInfo requiredInfo = (ProducerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
+ ContextCarrier contextCarrier = new ContextCarrier();
+ String topicName = requiredInfo.getTopic();
+ AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName +
+ PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, requiredInfo.getServiceUrl());
+ Tags.MQ_BROKER.set(activeSpan, requiredInfo.getServiceUrl());
+ Tags.MQ_TOPIC.set(activeSpan, topicName);
+ SpanLayer.asMQ(activeSpan);
+ activeSpan.setComponent(ComponentsDefine.PULSAR_PRODUCER);
+ CarrierItem next = contextCarrier.items();
+ MessageImpl msg = (MessageImpl) allArguments[0];
+ while (next.hasNext()) {
+ next = next.next();
+ msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder()
+ .setKey(next.getHeadKey())
+ .setValue(next.getHeadValue()));
+ }
+ if (allArguments.length > 1) {
+ EnhancedInstance callbackInstance = (EnhancedInstance) allArguments[1];
+ if (callbackInstance != null) {
+ ContextSnapshot snapshot = ContextManager.capture();
+ if (null != snapshot) {
+ SendCallbackEnhanceRequiredInfo callbackRequiredInfo = new SendCallbackEnhanceRequiredInfo();
+ callbackRequiredInfo.setTopic(topicName);
+ callbackRequiredInfo.setContextSnapshot(snapshot);
+ callbackInstance.setSkyWalkingDynamicField(callbackRequiredInfo);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
+ Object ret) throws Throwable {
+ if (allArguments[0] != null) {
+ ContextManager.stopSpan();
+ }
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Throwable t) {
+ if (allArguments[0] != null) {
+ ContextManager.activeSpan().errorOccurred().log(t);
+ }
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackEnhanceRequiredInfo.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackEnhanceRequiredInfo.java
new file mode 100644
index 000000000000..06b0c8f9cd06
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackEnhanceRequiredInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+
+/**
+ * Pulsar {@link org.apache.pulsar.client.impl.SendCallback} enhance required info is required by
+ * SendCallback
enhanced object method interceptor
+ *
+ * @author penghui
+ */
+public class SendCallbackEnhanceRequiredInfo {
+
+ /**
+ * topic name of the producer
+ */
+ private String topic;
+
+ /**
+ * context snapshot
+ */
+ ContextSnapshot contextSnapshot;
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public ContextSnapshot getContextSnapshot() {
+ return contextSnapshot;
+ }
+
+ public void setContextSnapshot(ContextSnapshot contextSnapshot) {
+ this.contextSnapshot = contextSnapshot;
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackInterceptor.java
new file mode 100644
index 000000000000..f5573ef28fe7
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackInterceptor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+import java.lang.reflect.Method;
+
+/**
+ * Interceptor for send callback enhanced instance.
+ *
+ * Here is the intercept process steps:
+ *
+ *
+ * 1. Get the @{@link SendCallbackEnhanceRequiredInfo} and record the service url, context snapshot
+ * 2. Create the local span when the callback invoke sendComplete
method
+ * 3. Stop the local span when sendComplete
method finished.
+ *
+ *
+ * @author penghui
+ */
+public class SendCallbackInterceptor implements InstanceMethodsAroundInterceptor {
+
+ private static final String OPERATION_NAME = "Pulsar/Producer/Callback";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
+ MethodInterceptResult result) throws Throwable {
+ SendCallbackEnhanceRequiredInfo requiredInfo = (SendCallbackEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
+ if (null != requiredInfo.getContextSnapshot()) {
+ AbstractSpan activeSpan = ContextManager.createLocalSpan(OPERATION_NAME);
+ activeSpan.setComponent(ComponentsDefine.PULSAR_PRODUCER);
+ Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopic());
+ SpanLayer.asMQ(activeSpan);
+ ContextManager.continued(requiredInfo.getContextSnapshot());
+ }
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
+ Object ret) throws Throwable {
+ SendCallbackEnhanceRequiredInfo requiredInfo = (SendCallbackEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
+ if (null != requiredInfo.getContextSnapshot()) {
+ Exception exceptions = (Exception) allArguments[0];
+ if (exceptions != null) {
+ ContextManager.activeSpan().errorOccurred().log(exceptions);
+ }
+ ContextManager.stopSpan();
+ }
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+ Class>[] argumentsTypes, Throwable t) {
+ SendCallbackEnhanceRequiredInfo requiredInfo = (SendCallbackEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
+ if (null != requiredInfo.getContextSnapshot()) {
+ ContextManager.activeSpan().errorOccurred().log(t);
+ }
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarConsumerInstrumentation.java
new file mode 100644
index 000000000000..e52a774041a5
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarConsumerInstrumentation.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * The pulsar consumer instrumentation use {@link org.apache.pulsar.client.impl.ConsumerImpl} as an enhanced class.
+ * {@link org.apache.pulsar.client.api.Consumer} is a user-oriented interface and the implementations are
+ * {@link org.apache.pulsar.client.impl.ConsumerImpl} and {@link org.apache.pulsar.client.impl.MultiTopicsConsumerImpl}
+ *
+ * The MultiTopicsConsumerImpl is a complex type with multiple ConsumerImpl to support uses receive messages from
+ * multiple topics. As each ConsumerImpl has it's own topic name and it is the initial unit of a single topic
+ * to receiving messages, so use ConsumerImpl as an enhanced class is an effective way.
+ *
+ * Use messageProcessed
as the enhanced method since pulsar
+ * consumer has multiple ways to receiving messages such as sync method, async method and listeners.
+ * Method messageProcessed is a basic unit of ConsumerImpl, no matter which way uses uses, messageProcessed will always
+ * record the message receiving.
+ *
+ * @author penghui
+ */
+public class PulsarConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+ public static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.pulsar.client.impl.PulsarClientImpl";
+ public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.ConsumerConstructorInterceptor";
+ public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.PulsarConsumerInterceptor";
+ public static final String ENHANCE_METHOD = "messageProcessed";
+ public static final String ENHANCE_METHOD_TYPE = "org.apache.pulsar.client.api.Message";
+ public static final String ENHANCE_CLASS = "org.apache.pulsar.client.impl.ConsumerImpl";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[] {
+ new ConstructorInterceptPoint() {
+ @Override public ElementMatcher getConstructorMatcher() {
+ return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
+ }
+
+ @Override public String getConstructorInterceptor() {
+ return CONSTRUCTOR_INTERCEPTOR_CLASS;
+ }
+ }
+ };
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override public ElementMatcher getMethodsMatcher() {
+ return named(ENHANCE_METHOD).and(takesArgumentWithType(0, ENHANCE_METHOD_TYPE));
+ }
+
+ @Override public String getMethodsInterceptor() {
+ return INTERCEPTOR_CLASS;
+ }
+
+ @Override public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarProducerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarProducerInstrumentation.java
new file mode 100644
index 000000000000..4a38f8fe7e83
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarProducerInstrumentation.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * Pulsar producer instrumentation.
+ *
+ * The pulsar producer instrumentation use {@link org.apache.pulsar.client.impl.ProducerImpl} as an enhanced class.
+ * {@link org.apache.pulsar.client.api.Producer} is a user-oriented interface and the implementations of the Producer
+ * are {@link org.apache.pulsar.client.impl.PartitionedProducerImpl} and {@link org.apache.pulsar.client.impl.ProducerImpl}.
+ *
+ * And the PartitionedProducerImpl is a complex type with multiple ProducerImpl to support uses send messages to
+ * multiple partitions. As each ProducerImpl has it's own topic name and it is the initial unit of a single topic
+ * to send messages, so use ProducerImpl as an enhanced class is an effective way.
+ *
+ * About the enhanced methods, currently use {@link org.apache.pulsar.client.impl.ProducerImpl#sendAsync(
+ * org.apache.pulsar.client.api.Message, org.apache.pulsar.client.impl.SendCallback)} as the enhanced method.
+ * Pulsar provides users with two kinds of methods for sending messages sync methods and async methods. The async method
+ * use {@link java.util.concurrent.CompletableFuture as the method result}, if use this method as the enhanced method
+ * is hard to pass the snapshot of span, because can't ensure that the CompletableFuture is completed after the skywalking
+ * dynamic field was updated. But execution time of sync method will be inaccurate.
+ *
+ * @author penghui
+ */
+public class PulsarProducerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+ public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.PulsarProducerInterceptor";
+ public static final String ENHANCE_CLASS = "org.apache.pulsar.client.impl.ProducerImpl";
+ public static final String ENHANCE_METHOD = "sendAsync";
+ public static final String ENHANCE_METHOD_TYPE = "org.apache.pulsar.client.impl.SendCallback";
+
+ public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.ProducerConstructorInterceptor";
+ public static final String CONSTRUCTOR_INTERCEPTOR_FLAG = "org.apache.pulsar.client.impl.PulsarClientImpl";
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[]{
+ new ConstructorInterceptPoint() {
+ @Override
+ public ElementMatcher getConstructorMatcher() {
+ return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPTOR_FLAG);
+ }
+
+ @Override
+ public String getConstructorInterceptor() {
+ return CONSTRUCTOR_INTERCEPTOR_CLASS;
+ }
+ }
+ };
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[]{
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher getMethodsMatcher() {
+ return named(ENHANCE_METHOD).and(takesArgumentWithType(1, ENHANCE_METHOD_TYPE));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return INTERCEPTOR_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/SendCallbackInstrumentation.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/SendCallbackInstrumentation.java
new file mode 100644
index 000000000000..865015c69a70
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/SendCallbackInstrumentation.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.plugin.pulsar.SendCallbackEnhanceRequiredInfo;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
+
+/**
+ * Pulsar producer send callback instrumentation.
+ *
+ * The send callback enhanced object will use {@link org.apache.skywalking.apm.plugin.pulsar.SendCallbackEnhanceRequiredInfo}
+ * which {@link org.apache.skywalking.apm.plugin.pulsar.PulsarProducerInterceptor} set by skywalking dynamic field of
+ * enhanced object.
+ *
+ * When a callback is complete, {@link org.apache.skywalking.apm.plugin.pulsar.SendCallbackInterceptor} will continue
+ * the {@link SendCallbackEnhanceRequiredInfo#getContextSnapshot()}.
+ *
+ * @author penghui
+ */
+public class SendCallbackInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+ public static final String ENHANCE_CLASS = "org.apache.pulsar.client.impl.SendCallback";
+ public static final String ENHANCE_METHOD = "sendComplete";
+ public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.SendCallbackInterceptor";
+
+ @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override public ElementMatcher getMethodsMatcher() {
+ return named(ENHANCE_METHOD);
+ }
+
+ @Override public String getMethodsInterceptor() {
+ return INTERCEPTOR_CLASS;
+ }
+
+ @Override public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+
+ @Override protected ClassMatch enhanceClass() {
+ return byHierarchyMatch(new String[] {ENHANCE_CLASS});
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def
new file mode 100644
index 000000000000..a59280e2ff77
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+pulsar=org.apache.skywalking.apm.plugin.pulsar.define.SendCallbackInstrumentation
+pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarConsumerInstrumentation
+pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarProducerInstrumentation
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java
new file mode 100644
index 000000000000..371f45deb388
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ConsumerConstructorInterceptorTest {
+
+ private static final String SERVICE_URL = "pulsar://localhost:6650";
+ private static final String TOPIC_NAME = "persistent://my-tenant/my-ns/my-topic";
+ private static final String SUBSCRIPTION_NAME = "my-sub";
+
+ @Mock
+ private PulsarClientImpl pulsarClient;
+
+ @Mock
+ private LookupService lookupService;
+
+ @Mock
+ private ConsumerConfigurationData consumerConfigurationData;
+
+ private ConsumerConstructorInterceptor constructorInterceptor;
+
+ private EnhancedInstance enhancedInstance = new EnhancedInstance() {
+
+ private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo;
+
+ @Override public Object getSkyWalkingDynamicField() {
+ return consumerEnhanceRequiredInfo;
+ }
+
+ @Override public void setSkyWalkingDynamicField(Object value) {
+ consumerEnhanceRequiredInfo = (ConsumerEnhanceRequiredInfo)value;
+ }
+ };
+
+ @Before
+ public void setUp() {
+ when(lookupService.getServiceUrl()).thenReturn(SERVICE_URL);
+ when(pulsarClient.getLookup()).thenReturn(lookupService);
+ when(consumerConfigurationData.getSubscriptionName()).thenReturn(SUBSCRIPTION_NAME);
+ constructorInterceptor = new ConsumerConstructorInterceptor();
+ }
+
+ @Test
+ public void testOnConsumer() {
+ constructorInterceptor.onConstruct(enhancedInstance, new Object[] {pulsarClient, TOPIC_NAME, consumerConfigurationData});
+ ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField();
+ assertThat(requiredInfo.getServiceUrl(), is(SERVICE_URL));
+ assertThat(requiredInfo.getTopic(), is(TOPIC_NAME));
+ assertThat(requiredInfo.getSubscriptionName(), is(SUBSCRIPTION_NAME));
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockMessage.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockMessage.java
new file mode 100644
index 000000000000..316279254ba0
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockMessage.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MockMessage extends MessageImpl {
+
+ private PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder();
+
+ private transient Map properties;
+
+ public MockMessage() {
+ this(null, "1:1", new HashMap(), null, null);
+ }
+
+ public MockMessage(String topic, String msgId, Map properties, ByteBuf payload, Schema schema) {
+ super(topic, msgId, properties, payload, schema);
+ }
+
+ @Override
+ public PulsarApi.MessageMetadata.Builder getMessageBuilder() {
+ return msgMetadataBuilder;
+ }
+
+ public synchronized Map getProperties() {
+ if (this.properties == null) {
+ if (this.msgMetadataBuilder.getPropertiesCount() > 0) {
+ Map internalProperties = new HashMap();
+ for (int i = 0; i < this.msgMetadataBuilder.getPropertiesCount(); i++) {
+ PulsarApi.KeyValue kv = this.msgMetadataBuilder.getProperties(i);
+ internalProperties.put(kv.getKey(), kv.getValue());
+ }
+ this.properties = Collections.unmodifiableMap(internalProperties);
+ } else {
+ this.properties = Collections.emptyMap();
+ }
+ }
+ return this.properties;
+ }
+
+ @Override
+ public String getProperty(String name) {
+ return this.getProperties().get(name);
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptorTest.java
new file mode 100644
index 000000000000..eaf4aa8f3a1f
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptorTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ProducerConstructorInterceptorTest {
+
+ private static final String SERVICE_URL = "pulsar://localhost:6650";
+ private static final String TOPIC_NAME = "persistent://my-tenant/my-ns/my-topic";
+
+ @Mock
+ private PulsarClientImpl pulsarClient;
+
+ @Mock
+ private LookupService lookupService;
+
+ private ProducerConstructorInterceptor constructorInterceptor;
+
+ private EnhancedInstance enhancedInstance = new EnhancedInstance() {
+
+ private ProducerEnhanceRequiredInfo requiredInfo;
+
+ @Override public Object getSkyWalkingDynamicField() {
+ return requiredInfo;
+ }
+
+ @Override public void setSkyWalkingDynamicField(Object value) {
+ this.requiredInfo = (ProducerEnhanceRequiredInfo)value;
+ }
+ };
+
+ @Before
+ public void setUp() {
+ when(lookupService.getServiceUrl()).thenReturn(SERVICE_URL);
+ when(pulsarClient.getLookup()).thenReturn(lookupService);
+ constructorInterceptor = new ProducerConstructorInterceptor();
+ }
+
+ @Test
+ public void testOnConsumer() {
+ constructorInterceptor.onConstruct(enhancedInstance, new Object[] {pulsarClient, TOPIC_NAME});
+ ProducerEnhanceRequiredInfo requiredInfo = (ProducerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField();
+ assertThat(requiredInfo.getServiceUrl(), is(SERVICE_URL));
+ assertThat(requiredInfo.getTopic(), is(TOPIC_NAME));
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
new file mode 100644
index 000000000000..7629f5cac356
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.helper.SegmentRefHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.hamcrest.MatcherAssert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import java.util.List;
+
+import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.PULSAR_CONSUMER;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class PulsarConsumerInterceptorTest {
+
+ @SegmentStoragePoint
+ private SegmentStorage segmentStorage;
+
+ @Rule
+ public AgentServiceRule serviceRule = new AgentServiceRule();
+
+ private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo;
+
+ private PulsarConsumerInterceptor consumerInterceptor;
+
+ private MockMessage msg;
+
+ private EnhancedInstance consumerInstance = new EnhancedInstance() {
+ @Override public Object getSkyWalkingDynamicField() {
+ return consumerEnhanceRequiredInfo;
+ }
+
+ @Override public void setSkyWalkingDynamicField(Object value) {
+ consumerEnhanceRequiredInfo = (ConsumerEnhanceRequiredInfo)value;
+ }
+ };
+
+ @Before
+ public void setUp() {
+ Config.Agent.ACTIVE_V1_HEADER = true;
+ consumerInterceptor = new PulsarConsumerInterceptor();
+ consumerEnhanceRequiredInfo = new ConsumerEnhanceRequiredInfo();
+
+ consumerEnhanceRequiredInfo.setTopic("persistent://my-tenant/my-ns/my-topic");
+ consumerEnhanceRequiredInfo.setServiceUrl("pulsar://localhost:6650");
+ consumerEnhanceRequiredInfo.setSubscriptionName("my-sub");
+ msg = new MockMessage();
+ msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder()
+ .setKey("sw3")
+ .setValue("1.234.111|3|1|1|#192.168.1.8:18002|#/portal/|#testEntrySpan|#AQA*#AQA*Et0We0tQNQA*"));
+ }
+
+ @After
+ public void clear() {
+ Config.Agent.ACTIVE_V1_HEADER = false;
+ }
+
+ @Test
+ public void testConsumerWithNullMessage() throws Throwable {
+ consumerInterceptor.beforeMethod(consumerInstance, null, new Object[]{null}, new Class[0], null);
+ consumerInterceptor.afterMethod(consumerInstance, null, new Object[]{null}, new Class[0], null);
+
+ List traceSegments = segmentStorage.getTraceSegments();
+ assertThat(traceSegments.size(), is(0));
+ }
+
+ @Test
+ public void testConsumerWithMessage() throws Throwable {
+ consumerInterceptor.beforeMethod(consumerInstance, null, new Object[]{msg}, new Class[0], null);
+ consumerInterceptor.afterMethod(consumerInstance, null, new Object[]{msg}, new Class[0], null);
+
+ List traceSegments = segmentStorage.getTraceSegments();
+ assertThat(traceSegments.size(), is(1));
+
+ TraceSegment traceSegment = traceSegments.get(0);
+ List refs = traceSegment.getRefs();
+ assertThat(refs.size(), is(1));
+ assertTraceSegmentRef(refs.get(0));
+
+ List spans = SegmentHelper.getSpans(traceSegment);
+ assertThat(spans.size(), is(1));
+ assertConsumerSpan(spans.get(0));
+ }
+
+ private void assertConsumerSpan(AbstractTracingSpan span) {
+ SpanAssert.assertLayer(span, SpanLayer.MQ);
+ SpanAssert.assertComponent(span, PULSAR_CONSUMER);
+ SpanAssert.assertTagSize(span, 2);
+ SpanAssert.assertTag(span, 0, "pulsar://localhost:6650");
+ SpanAssert.assertTag(span, 1, "persistent://my-tenant/my-ns/my-topic");
+ }
+
+ private void assertTraceSegmentRef(TraceSegmentRef ref) {
+ MatcherAssert.assertThat(SegmentRefHelper.getEntryServiceInstanceId(ref), is(1));
+ MatcherAssert.assertThat(SegmentRefHelper.getSpanId(ref), is(3));
+ MatcherAssert.assertThat(SegmentRefHelper.getTraceSegmentId(ref).toString(), is("1.234.111"));
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptorTest.java
new file mode 100644
index 000000000000..ecbf72d0f3fa
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptorTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import java.util.List;
+
+import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.PULSAR_PRODUCER;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class PulsarProducerInterceptorTest {
+
+ @SegmentStoragePoint
+ private SegmentStorage segmentStorage;
+
+ @Rule
+ public AgentServiceRule serviceRule = new AgentServiceRule();
+
+
+ private PulsarProducerInterceptor producerInterceptor;
+
+ private Object[] arguments;
+ private Class[] argumentType;
+
+ private EnhancedInstance pulsarProducerInstance = new EnhancedInstance() {
+
+
+ @Override public Object getSkyWalkingDynamicField() {
+ ProducerEnhanceRequiredInfo requiredInfo = new ProducerEnhanceRequiredInfo();
+ requiredInfo.setTopic("persistent://my-tenant/my-ns/my-topic");
+ requiredInfo.setServiceUrl("pulsar://localhost:6650");
+ return requiredInfo;
+ }
+
+ @Override public void setSkyWalkingDynamicField(Object value) {
+ }
+ };
+
+ private MessageImpl msg = new MockMessage();
+
+ @Before
+ public void setUp() {
+ producerInterceptor = new PulsarProducerInterceptor();
+ arguments = new Object[] {msg, null};
+ argumentType = new Class[] {MessageImpl.class};
+ }
+
+ @Test
+ public void testSendMessage() throws Throwable {
+ producerInterceptor.beforeMethod(pulsarProducerInstance, null, arguments, argumentType, null);
+ producerInterceptor.afterMethod(pulsarProducerInstance, null, arguments, argumentType, null);
+
+ List traceSegmentList = segmentStorage.getTraceSegments();
+ assertThat(traceSegmentList.size(), is(1));
+
+ TraceSegment segment = traceSegmentList.get(0);
+ List spans = SegmentHelper.getSpans(segment);
+ assertThat(spans.size(), is(1));
+
+ assertMessageSpan(spans.get(0));
+ }
+
+ @Test
+ public void testSendWithNullMessage() throws Throwable {
+ producerInterceptor.beforeMethod(pulsarProducerInstance, null, new Object[]{null}, argumentType, null);
+ producerInterceptor.afterMethod(pulsarProducerInstance, null, new Object[]{null}, argumentType, null);
+ List traceSegmentList = segmentStorage.getTraceSegments();
+ assertThat(traceSegmentList.size(), is(0));
+ }
+
+ private void assertMessageSpan(AbstractTracingSpan span) {
+ SpanAssert.assertTag(span, 0, "pulsar://localhost:6650");
+ SpanAssert.assertTag(span, 1, "persistent://my-tenant/my-ns/my-topic");
+ SpanAssert.assertComponent(span, PULSAR_PRODUCER);
+ SpanAssert.assertLayer(span, SpanLayer.MQ);
+ assertThat(span.getOperationName(), is("Pulsar/persistent://my-tenant/my-ns/my-topic/Producer"));
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackInterceptorTest.java
new file mode 100644
index 000000000000..177f959a3bba
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackInterceptorTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.skywalking.apm.agent.core.context.MockContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.helper.SpanHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentRefAssert;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import java.util.List;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class SendCallbackInterceptorTest {
+
+ @SegmentStoragePoint
+ private SegmentStorage segmentStorage;
+
+ @Rule
+ public AgentServiceRule serviceRule = new AgentServiceRule();
+
+ private SendCallbackInterceptor callbackInterceptor;
+
+ private Object[] arguments;
+ private Object[] argumentsWithException;
+ private Class[] argumentTypes;
+
+ private EnhancedInstance callBackInstance = new EnhancedInstance() {
+
+ @Override public Object getSkyWalkingDynamicField() {
+ SendCallbackEnhanceRequiredInfo requiredInfo = new SendCallbackEnhanceRequiredInfo();
+ requiredInfo.setTopic("persistent://my-tenant/my-ns/my-topic");
+ requiredInfo.setContextSnapshot(MockContextSnapshot.INSTANCE.mockContextSnapshot());
+ return requiredInfo;
+ }
+
+ @Override public void setSkyWalkingDynamicField(Object value) {
+
+ }
+ };
+
+ @Before
+ public void setUp() {
+ callbackInterceptor = new SendCallbackInterceptor();
+
+ arguments = new Object[] {
+ null
+ };
+ argumentsWithException = new Object[] {
+ new RuntimeException()
+ };
+
+ argumentTypes = new Class[] {
+ Exception.class
+ };
+ }
+
+ @Test
+ public void testCallbackWithoutException() throws Throwable {
+ callbackInterceptor.beforeMethod(callBackInstance, null, arguments, argumentTypes, null);
+ callbackInterceptor.afterMethod(callBackInstance, null, arguments, argumentTypes, null);
+
+ List traceSegments = segmentStorage.getTraceSegments();
+ assertThat(traceSegments.size(), is(1));
+ TraceSegment traceSegment = traceSegments.get(0);
+
+ List abstractSpans = SegmentHelper.getSpans(traceSegment);
+ assertThat(abstractSpans.size(), is(1));
+
+ assertCallbackSpan(abstractSpans.get(0));
+
+ assertCallbackSegmentRef(traceSegment.getRefs());
+ }
+
+ @Test
+ public void testCallbackWithException() throws Throwable {
+ callbackInterceptor.beforeMethod(callBackInstance, null, argumentsWithException, argumentTypes, null);
+ callbackInterceptor.afterMethod(callBackInstance, null, argumentsWithException, argumentTypes, null);
+
+ List traceSegments = segmentStorage.getTraceSegments();
+ assertThat(traceSegments.size(), is(1));
+ TraceSegment traceSegment = traceSegments.get(0);
+
+ List abstractSpans = SegmentHelper.getSpans(traceSegment);
+ assertThat(abstractSpans.size(), is(1));
+
+ assertCallbackSpanWithException(abstractSpans.get(0));
+
+ assertCallbackSegmentRef(traceSegment.getRefs());
+ }
+
+ private void assertCallbackSpanWithException(AbstractTracingSpan span) {
+ assertCallbackSpan(span);
+
+ SpanAssert.assertException(SpanHelper.getLogs(span).get(0), RuntimeException.class);
+ assertThat(SpanHelper.getErrorOccurred(span), is(true));
+ }
+
+ private void assertCallbackSegmentRef(List refs) {
+ assertThat(refs.size(), is(1));
+
+ TraceSegmentRef segmentRef = refs.get(0);
+ SegmentRefAssert.assertSpanId(segmentRef, 1);
+ assertThat(segmentRef.getEntryEndpointName(), is("/for-test-entryOperationName"));
+ }
+
+ private void assertCallbackSpan(AbstractTracingSpan span) {
+ assertThat(span.getOperationName(), is("Pulsar/Producer/Callback"));
+ }
+}
diff --git a/docs/en/setup/service-agent/java-agent/Supported-list.md b/docs/en/setup/service-agent/java-agent/Supported-list.md
index 51ddb54cf95d..07db4d841ff3 100644
--- a/docs/en/setup/service-agent/java-agent/Supported-list.md
+++ b/docs/en/setup/service-agent/java-agent/Supported-list.md
@@ -44,6 +44,7 @@
* [Kafka](http://kafka.apache.org) 0.11.0.0 -> 1.0
* [ActiveMQ](https://github.com/apache/activemq) 5.x
* [RabbitMQ](https://www.rabbitmq.com/) 5.x
+ * [Pulsar](http://pulsar.apache.org) 2.2.x -> 2.4.x
* NoSQL
* Redis
* [Jedis](https://github.com/xetorthio/jedis) 2.x
diff --git a/oap-server/server-starter/src/main/resources/component-libraries.yml b/oap-server/server-starter/src/main/resources/component-libraries.yml
index 88463767ffad..778ebd1456c8 100755
--- a/oap-server/server-starter/src/main/resources/component-libraries.yml
+++ b/oap-server/server-starter/src/main/resources/component-libraries.yml
@@ -242,7 +242,15 @@ Cassandra:
Light4J:
id: 71
languages: Java
-
+Pulsar:
+ id: 72
+ languages: Java
+pulsar-producer:
+ id: 73
+ languages: Java
+pulsar-consumer:
+ id: 74
+ languages: Java
# .NET/.NET Core components
# [3000, 4000) for C#/.NET only
@@ -343,4 +351,6 @@ Component-Server-Mappings:
Npgsql.EntityFrameworkCore.PostgreSQL: PostgreSQL
transport-client: Elasticsearch
SolrJ: Solr
- cassandra-java-driver: Cassandra
\ No newline at end of file
+ cassandra-java-driver: Cassandra
+ pulsar-producer: Pulsar
+ pulsar-consumer: Pulsar
\ No newline at end of file