diff --git a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java index c79b6ea8331b..7bb53c80a07c 100755 --- a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java +++ b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java @@ -133,7 +133,10 @@ public class ComponentsDefine { public static final OfficialComponent PLAY = new OfficialComponent(68, "Play"); public static final OfficialComponent CASSANDRA_JAVA_DRIVER = new OfficialComponent(69, "cassandra-java-driver"); - + public static final OfficialComponent LIGHT_4J = new OfficialComponent(71, "Light4J"); + public static final OfficialComponent PULSAR_PRODUCER = new OfficialComponent(73, "pulsar-producer"); + + public static final OfficialComponent PULSAR_CONSUMER = new OfficialComponent(74, "pulsar-consumer"); } diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pom.xml index 886dadb97c74..d38c4bc97d0a 100644 --- a/apm-sniffer/apm-sdk-plugin/pom.xml +++ b/apm-sniffer/apm-sdk-plugin/pom.xml @@ -76,6 +76,7 @@ solrj-7.x-plugin cassandra-java-driver-3.x-plugin light4j-plugins + pulsar-plugin pom diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/pom.xml new file mode 100644 index 000000000000..bbb9e9eac35d --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/pom.xml @@ -0,0 +1,42 @@ + + + + + + apm-sdk-plugin + org.apache.skywalking + 6.5.0-SNAPSHOT + + 4.0.0 + + apm-pulsar-plugin + + + 2.4.0 + + + + + org.apache.pulsar + pulsar-client + ${pulsar.version} + provided + + + \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java new file mode 100644 index 000000000000..8965c933a535 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.pulsar; + +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; + +/** + * Interceptor of pulsar consumer constructor. + * + * The interceptor create {@link ConsumerEnhanceRequiredInfo} which is required by instance method interceptor, + * So use it to update the skywalking dynamic field of pulsar consumer enhanced instance. + * So that the instance methods can get the {@link ConsumerEnhanceRequiredInfo} + * + * @author penghui + */ +public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor { + + @Override + public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { + PulsarClientImpl pulsarClient = (PulsarClientImpl) allArguments[0]; + String topic = (String) allArguments[1]; + ConsumerConfigurationData consumerConfigurationData = (ConsumerConfigurationData) allArguments[2]; + ConsumerEnhanceRequiredInfo requireInfo = new ConsumerEnhanceRequiredInfo(); + /* + * Pulsar url can specify with specific URL or a service url provider, use pulsarClient.getLookup().getServiceUrl() + * can handle the service url provider which use a dynamic service url + */ + requireInfo.setServiceUrl(pulsarClient.getLookup().getServiceUrl()); + requireInfo.setTopic(topic); + requireInfo.setSubscriptionName(consumerConfigurationData.getSubscriptionName()); + objInst.setSkyWalkingDynamicField(requireInfo); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java new file mode 100644 index 000000000000..a1276d8fa479 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.pulsar; + +/** + * Pulsar consumer enhance required info is required by consumer enhanced object method interceptor + * + * @author penghui + */ +public class ConsumerEnhanceRequiredInfo { + + /** + * service url of the consumer + */ + private String serviceUrl; + + /** + * topic name of the consumer + */ + private String topic; + + /** + * subscription name of the consumer + */ + private String subscriptionName; + + public String getServiceUrl() { + return serviceUrl; + } + + public void setServiceUrl(String serviceUrl) { + this.serviceUrl = serviceUrl; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getSubscriptionName() { + return subscriptionName; + } + + public void setSubscriptionName(String subscriptionName) { + this.subscriptionName = subscriptionName; + } + +} diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptor.java new file mode 100644 index 000000000000..8db812014c9a --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptor.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.pulsar; + +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; + +/** + * Interceptor of pulsar producer constructor. + * + * The interceptor create {@link ProducerEnhanceRequiredInfo} which is required by instance method interceptor, + * So use it to update the skywalking dynamic field of pulsar producer enhanced instance. + * So that the instance methods can get the {@link ProducerEnhanceRequiredInfo} + * + * @author penghui + */ +public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor { + + @Override + public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { + PulsarClientImpl pulsarClient = (PulsarClientImpl) allArguments[0]; + String topic = (String) allArguments[1]; + ProducerEnhanceRequiredInfo producerEnhanceRequiredInfo = new ProducerEnhanceRequiredInfo(); + producerEnhanceRequiredInfo.setTopic(topic); + /* + * Pulsar url can specify with specific URL or a service url provider, use pulsarClient.getLookup().getServiceUrl() + * can handle the service url provider which use a dynamic service url + */ + producerEnhanceRequiredInfo.setServiceUrl(pulsarClient.getLookup().getServiceUrl()); + objInst.setSkyWalkingDynamicField(producerEnhanceRequiredInfo); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerEnhanceRequiredInfo.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerEnhanceRequiredInfo.java new file mode 100644 index 000000000000..210e66a7eb9a --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerEnhanceRequiredInfo.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.pulsar; + +/** + * Pulsar producer enhance required info is required by producer enhanced object method interceptor + * + * @author penghui + */ +public class ProducerEnhanceRequiredInfo { + + /** + * service url of the pulsar producer + */ + private String serviceUrl; + + /** + * topic name of the pulsar producer + */ + private String topic; + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getServiceUrl() { + return serviceUrl; + } + + public void setServiceUrl(String serviceUrl) { + this.serviceUrl = serviceUrl; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java new file mode 100644 index 000000000000..61ebe0411bd0 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.pulsar; + +import org.apache.pulsar.client.api.Message; +import org.apache.skywalking.apm.agent.core.context.CarrierItem; +import org.apache.skywalking.apm.agent.core.context.ContextCarrier; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; + +import java.lang.reflect.Method; + +/** + * Interceptor for pulsar consumer enhanced instance + * + * Here is the intercept process steps: + * + *
+ *  1. Get the @{@link ConsumerEnhanceRequiredInfo} and record the service url, topic name and subscription name
+ *  2. Create the entry span when call messageProcessed method
+ *  3. Extract all the Trace Context when call messageProcessed method
+ *  4. Stop the entry span when messageProcessed method finished.
+ * 
+ * + * @author penghui + */ +public class PulsarConsumerInterceptor implements InstanceMethodsAroundInterceptor { + + public static final String OPERATE_NAME_PREFIX = "Pulsar/"; + public static final String CONSUMER_OPERATE_NAME = "/Consumer/"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + if (allArguments[0] != null) { + ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField(); + Message msg = (Message) allArguments[0]; + ContextCarrier carrier = new ContextCarrier(); + CarrierItem next = carrier.items(); + while (next.hasNext()) { + next = next.next(); + next.setHeadValue(msg.getProperty(next.getHeadKey())); + } + AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + + requiredInfo.getTopic() + CONSUMER_OPERATE_NAME + requiredInfo.getSubscriptionName(), carrier); + activeSpan.setComponent(ComponentsDefine.PULSAR_CONSUMER); + SpanLayer.asMQ(activeSpan); + Tags.MQ_BROKER.set(activeSpan, requiredInfo.getServiceUrl()); + Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopic()); + } + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + if (allArguments[0] != null) { + ContextManager.stopSpan(); + } + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { + if (allArguments[0] != null) { + ContextManager.activeSpan().errorOccurred().log(t); + } + } +} diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptor.java new file mode 100644 index 000000000000..3205973b6d54 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptor.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.pulsar; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.skywalking.apm.agent.core.context.CarrierItem; +import org.apache.skywalking.apm.agent.core.context.ContextCarrier; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; + +import java.lang.reflect.Method; + +/** + * Interceptor for pulsar producer enhanced instance. + * + * Here is the intercept process steps: + * + *
+ *  1. Get the {@link ProducerEnhanceRequiredInfo} and record the service url, topic name
+ *  2. Create the exit span when the producer invoke sendAsync method
+ *  3. Inject the context to {@link Message#getProperties()}
+ *  4. Create {@link SendCallbackEnhanceRequiredInfo} with ContextManager.capture() and set the
+ *     callback enhanced instance skywalking dynamic field to the created required info.
+ *  5. Stop the exit span when sendAsync method finished.
+ * 
+ * + * @author penghui + */ +public class PulsarProducerInterceptor implements InstanceMethodsAroundInterceptor { + + public static final String OPERATE_NAME_PREFIX = "Pulsar/"; + public static final String PRODUCER_OPERATE_NAME_SUFFIX = "/Producer"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + if (allArguments[0] != null) { + ProducerEnhanceRequiredInfo requiredInfo = (ProducerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField(); + ContextCarrier contextCarrier = new ContextCarrier(); + String topicName = requiredInfo.getTopic(); + AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName + + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, requiredInfo.getServiceUrl()); + Tags.MQ_BROKER.set(activeSpan, requiredInfo.getServiceUrl()); + Tags.MQ_TOPIC.set(activeSpan, topicName); + SpanLayer.asMQ(activeSpan); + activeSpan.setComponent(ComponentsDefine.PULSAR_PRODUCER); + CarrierItem next = contextCarrier.items(); + MessageImpl msg = (MessageImpl) allArguments[0]; + while (next.hasNext()) { + next = next.next(); + msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder() + .setKey(next.getHeadKey()) + .setValue(next.getHeadValue())); + } + if (allArguments.length > 1) { + EnhancedInstance callbackInstance = (EnhancedInstance) allArguments[1]; + if (callbackInstance != null) { + ContextSnapshot snapshot = ContextManager.capture(); + if (null != snapshot) { + SendCallbackEnhanceRequiredInfo callbackRequiredInfo = new SendCallbackEnhanceRequiredInfo(); + callbackRequiredInfo.setTopic(topicName); + callbackRequiredInfo.setContextSnapshot(snapshot); + callbackInstance.setSkyWalkingDynamicField(callbackRequiredInfo); + } + } + } + } + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + if (allArguments[0] != null) { + ContextManager.stopSpan(); + } + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + if (allArguments[0] != null) { + ContextManager.activeSpan().errorOccurred().log(t); + } + } +} diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackEnhanceRequiredInfo.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackEnhanceRequiredInfo.java new file mode 100644 index 000000000000..06b0c8f9cd06 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackEnhanceRequiredInfo.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.pulsar; + +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; + +/** + * Pulsar {@link org.apache.pulsar.client.impl.SendCallback} enhance required info is required by + * SendCallback enhanced object method interceptor + * + * @author penghui + */ +public class SendCallbackEnhanceRequiredInfo { + + /** + * topic name of the producer + */ + private String topic; + + /** + * context snapshot + */ + ContextSnapshot contextSnapshot; + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public ContextSnapshot getContextSnapshot() { + return contextSnapshot; + } + + public void setContextSnapshot(ContextSnapshot contextSnapshot) { + this.contextSnapshot = contextSnapshot; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackInterceptor.java new file mode 100644 index 000000000000..f5573ef28fe7 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackInterceptor.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.pulsar; + +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; + +import java.lang.reflect.Method; + +/** + * Interceptor for send callback enhanced instance. + * + * Here is the intercept process steps: + * + *
+ *  1. Get the @{@link SendCallbackEnhanceRequiredInfo} and record the service url, context snapshot
+ *  2. Create the local span when the callback invoke sendComplete method
+ *  3. Stop the local span when sendComplete method finished.
+ * 
+ * + * @author penghui + */ +public class SendCallbackInterceptor implements InstanceMethodsAroundInterceptor { + + private static final String OPERATION_NAME = "Pulsar/Producer/Callback"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + SendCallbackEnhanceRequiredInfo requiredInfo = (SendCallbackEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField(); + if (null != requiredInfo.getContextSnapshot()) { + AbstractSpan activeSpan = ContextManager.createLocalSpan(OPERATION_NAME); + activeSpan.setComponent(ComponentsDefine.PULSAR_PRODUCER); + Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopic()); + SpanLayer.asMQ(activeSpan); + ContextManager.continued(requiredInfo.getContextSnapshot()); + } + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + SendCallbackEnhanceRequiredInfo requiredInfo = (SendCallbackEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField(); + if (null != requiredInfo.getContextSnapshot()) { + Exception exceptions = (Exception) allArguments[0]; + if (exceptions != null) { + ContextManager.activeSpan().errorOccurred().log(exceptions); + } + ContextManager.stopSpan(); + } + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { + SendCallbackEnhanceRequiredInfo requiredInfo = (SendCallbackEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField(); + if (null != requiredInfo.getContextSnapshot()) { + ContextManager.activeSpan().errorOccurred().log(t); + } + } +} diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarConsumerInstrumentation.java new file mode 100644 index 000000000000..e52a774041a5 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarConsumerInstrumentation.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.pulsar.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/** + * The pulsar consumer instrumentation use {@link org.apache.pulsar.client.impl.ConsumerImpl} as an enhanced class. + * {@link org.apache.pulsar.client.api.Consumer} is a user-oriented interface and the implementations are + * {@link org.apache.pulsar.client.impl.ConsumerImpl} and {@link org.apache.pulsar.client.impl.MultiTopicsConsumerImpl} + * + * The MultiTopicsConsumerImpl is a complex type with multiple ConsumerImpl to support uses receive messages from + * multiple topics. As each ConsumerImpl has it's own topic name and it is the initial unit of a single topic + * to receiving messages, so use ConsumerImpl as an enhanced class is an effective way. + * + * Use messageProcessed as the enhanced method since pulsar + * consumer has multiple ways to receiving messages such as sync method, async method and listeners. + * Method messageProcessed is a basic unit of ConsumerImpl, no matter which way uses uses, messageProcessed will always + * record the message receiving. + * + * @author penghui + */ +public class PulsarConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + public static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.pulsar.client.impl.PulsarClientImpl"; + public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.ConsumerConstructorInterceptor"; + public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.PulsarConsumerInterceptor"; + public static final String ENHANCE_METHOD = "messageProcessed"; + public static final String ENHANCE_METHOD_TYPE = "org.apache.pulsar.client.api.Message"; + public static final String ENHANCE_CLASS = "org.apache.pulsar.client.impl.ConsumerImpl"; + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANCE_CLASS); + } + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[] { + new ConstructorInterceptPoint() { + @Override public ElementMatcher getConstructorMatcher() { + return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE); + } + + @Override public String getConstructorInterceptor() { + return CONSTRUCTOR_INTERCEPTOR_CLASS; + } + } + }; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher getMethodsMatcher() { + return named(ENHANCE_METHOD).and(takesArgumentWithType(0, ENHANCE_METHOD_TYPE)); + } + + @Override public String getMethodsInterceptor() { + return INTERCEPTOR_CLASS; + } + + @Override public boolean isOverrideArgs() { + return false; + } + } + }; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarProducerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarProducerInstrumentation.java new file mode 100644 index 000000000000..4a38f8fe7e83 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarProducerInstrumentation.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.pulsar.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/** + * Pulsar producer instrumentation. + * + * The pulsar producer instrumentation use {@link org.apache.pulsar.client.impl.ProducerImpl} as an enhanced class. + * {@link org.apache.pulsar.client.api.Producer} is a user-oriented interface and the implementations of the Producer + * are {@link org.apache.pulsar.client.impl.PartitionedProducerImpl} and {@link org.apache.pulsar.client.impl.ProducerImpl}. + * + * And the PartitionedProducerImpl is a complex type with multiple ProducerImpl to support uses send messages to + * multiple partitions. As each ProducerImpl has it's own topic name and it is the initial unit of a single topic + * to send messages, so use ProducerImpl as an enhanced class is an effective way. + * + * About the enhanced methods, currently use {@link org.apache.pulsar.client.impl.ProducerImpl#sendAsync( + * org.apache.pulsar.client.api.Message, org.apache.pulsar.client.impl.SendCallback)} as the enhanced method. + * Pulsar provides users with two kinds of methods for sending messages sync methods and async methods. The async method + * use {@link java.util.concurrent.CompletableFuture as the method result}, if use this method as the enhanced method + * is hard to pass the snapshot of span, because can't ensure that the CompletableFuture is completed after the skywalking + * dynamic field was updated. But execution time of sync method will be inaccurate. + * + * @author penghui + */ +public class PulsarProducerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.PulsarProducerInterceptor"; + public static final String ENHANCE_CLASS = "org.apache.pulsar.client.impl.ProducerImpl"; + public static final String ENHANCE_METHOD = "sendAsync"; + public static final String ENHANCE_METHOD_TYPE = "org.apache.pulsar.client.impl.SendCallback"; + + public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.ProducerConstructorInterceptor"; + public static final String CONSTRUCTOR_INTERCEPTOR_FLAG = "org.apache.pulsar.client.impl.PulsarClientImpl"; + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[]{ + new ConstructorInterceptPoint() { + @Override + public ElementMatcher getConstructorMatcher() { + return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPTOR_FLAG); + } + + @Override + public String getConstructorInterceptor() { + return CONSTRUCTOR_INTERCEPTOR_CLASS; + } + } + }; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[]{ + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named(ENHANCE_METHOD).and(takesArgumentWithType(1, ENHANCE_METHOD_TYPE)); + } + + @Override + public String getMethodsInterceptor() { + return INTERCEPTOR_CLASS; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANCE_CLASS); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/SendCallbackInstrumentation.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/SendCallbackInstrumentation.java new file mode 100644 index 000000000000..865015c69a70 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/SendCallbackInstrumentation.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.pulsar.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import org.apache.skywalking.apm.plugin.pulsar.SendCallbackEnhanceRequiredInfo; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch; + +/** + * Pulsar producer send callback instrumentation. + * + * The send callback enhanced object will use {@link org.apache.skywalking.apm.plugin.pulsar.SendCallbackEnhanceRequiredInfo} + * which {@link org.apache.skywalking.apm.plugin.pulsar.PulsarProducerInterceptor} set by skywalking dynamic field of + * enhanced object. + * + * When a callback is complete, {@link org.apache.skywalking.apm.plugin.pulsar.SendCallbackInterceptor} will continue + * the {@link SendCallbackEnhanceRequiredInfo#getContextSnapshot()}. + * + * @author penghui + */ +public class SendCallbackInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + public static final String ENHANCE_CLASS = "org.apache.pulsar.client.impl.SendCallback"; + public static final String ENHANCE_METHOD = "sendComplete"; + public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.SendCallbackInterceptor"; + + @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher getMethodsMatcher() { + return named(ENHANCE_METHOD); + } + + @Override public String getMethodsInterceptor() { + return INTERCEPTOR_CLASS; + } + + @Override public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override protected ClassMatch enhanceClass() { + return byHierarchyMatch(new String[] {ENHANCE_CLASS}); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def new file mode 100644 index 000000000000..a59280e2ff77 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +pulsar=org.apache.skywalking.apm.plugin.pulsar.define.SendCallbackInstrumentation +pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarConsumerInstrumentation +pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarProducerInstrumentation \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java new file mode 100644 index 000000000000..371f45deb388 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.pulsar; + +import org.apache.pulsar.client.impl.LookupService; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ConsumerConstructorInterceptorTest { + + private static final String SERVICE_URL = "pulsar://localhost:6650"; + private static final String TOPIC_NAME = "persistent://my-tenant/my-ns/my-topic"; + private static final String SUBSCRIPTION_NAME = "my-sub"; + + @Mock + private PulsarClientImpl pulsarClient; + + @Mock + private LookupService lookupService; + + @Mock + private ConsumerConfigurationData consumerConfigurationData; + + private ConsumerConstructorInterceptor constructorInterceptor; + + private EnhancedInstance enhancedInstance = new EnhancedInstance() { + + private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo; + + @Override public Object getSkyWalkingDynamicField() { + return consumerEnhanceRequiredInfo; + } + + @Override public void setSkyWalkingDynamicField(Object value) { + consumerEnhanceRequiredInfo = (ConsumerEnhanceRequiredInfo)value; + } + }; + + @Before + public void setUp() { + when(lookupService.getServiceUrl()).thenReturn(SERVICE_URL); + when(pulsarClient.getLookup()).thenReturn(lookupService); + when(consumerConfigurationData.getSubscriptionName()).thenReturn(SUBSCRIPTION_NAME); + constructorInterceptor = new ConsumerConstructorInterceptor(); + } + + @Test + public void testOnConsumer() { + constructorInterceptor.onConstruct(enhancedInstance, new Object[] {pulsarClient, TOPIC_NAME, consumerConfigurationData}); + ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField(); + assertThat(requiredInfo.getServiceUrl(), is(SERVICE_URL)); + assertThat(requiredInfo.getTopic(), is(TOPIC_NAME)); + assertThat(requiredInfo.getSubscriptionName(), is(SUBSCRIPTION_NAME)); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockMessage.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockMessage.java new file mode 100644 index 000000000000..316279254ba0 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockMessage.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.pulsar; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.shade.io.netty.buffer.ByteBuf; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class MockMessage extends MessageImpl { + + private PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder(); + + private transient Map properties; + + public MockMessage() { + this(null, "1:1", new HashMap(), null, null); + } + + public MockMessage(String topic, String msgId, Map properties, ByteBuf payload, Schema schema) { + super(topic, msgId, properties, payload, schema); + } + + @Override + public PulsarApi.MessageMetadata.Builder getMessageBuilder() { + return msgMetadataBuilder; + } + + public synchronized Map getProperties() { + if (this.properties == null) { + if (this.msgMetadataBuilder.getPropertiesCount() > 0) { + Map internalProperties = new HashMap(); + for (int i = 0; i < this.msgMetadataBuilder.getPropertiesCount(); i++) { + PulsarApi.KeyValue kv = this.msgMetadataBuilder.getProperties(i); + internalProperties.put(kv.getKey(), kv.getValue()); + } + this.properties = Collections.unmodifiableMap(internalProperties); + } else { + this.properties = Collections.emptyMap(); + } + } + return this.properties; + } + + @Override + public String getProperty(String name) { + return this.getProperties().get(name); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptorTest.java new file mode 100644 index 000000000000..eaf4aa8f3a1f --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptorTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.pulsar; + +import org.apache.pulsar.client.impl.LookupService; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ProducerConstructorInterceptorTest { + + private static final String SERVICE_URL = "pulsar://localhost:6650"; + private static final String TOPIC_NAME = "persistent://my-tenant/my-ns/my-topic"; + + @Mock + private PulsarClientImpl pulsarClient; + + @Mock + private LookupService lookupService; + + private ProducerConstructorInterceptor constructorInterceptor; + + private EnhancedInstance enhancedInstance = new EnhancedInstance() { + + private ProducerEnhanceRequiredInfo requiredInfo; + + @Override public Object getSkyWalkingDynamicField() { + return requiredInfo; + } + + @Override public void setSkyWalkingDynamicField(Object value) { + this.requiredInfo = (ProducerEnhanceRequiredInfo)value; + } + }; + + @Before + public void setUp() { + when(lookupService.getServiceUrl()).thenReturn(SERVICE_URL); + when(pulsarClient.getLookup()).thenReturn(lookupService); + constructorInterceptor = new ProducerConstructorInterceptor(); + } + + @Test + public void testOnConsumer() { + constructorInterceptor.onConstruct(enhancedInstance, new Object[] {pulsarClient, TOPIC_NAME}); + ProducerEnhanceRequiredInfo requiredInfo = (ProducerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField(); + assertThat(requiredInfo.getServiceUrl(), is(SERVICE_URL)); + assertThat(requiredInfo.getTopic(), is(TOPIC_NAME)); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java new file mode 100644 index 000000000000..7629f5cac356 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.pulsar; + +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.skywalking.apm.agent.core.conf.Config; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; +import org.apache.skywalking.apm.agent.test.helper.SegmentRefHelper; +import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; +import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.apache.skywalking.apm.agent.test.tools.SpanAssert; +import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.hamcrest.MatcherAssert; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; + +import java.util.List; + +import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.PULSAR_CONSUMER; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +@RunWith(PowerMockRunner.class) +@PowerMockRunnerDelegate(TracingSegmentRunner.class) +public class PulsarConsumerInterceptorTest { + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + @Rule + public AgentServiceRule serviceRule = new AgentServiceRule(); + + private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo; + + private PulsarConsumerInterceptor consumerInterceptor; + + private MockMessage msg; + + private EnhancedInstance consumerInstance = new EnhancedInstance() { + @Override public Object getSkyWalkingDynamicField() { + return consumerEnhanceRequiredInfo; + } + + @Override public void setSkyWalkingDynamicField(Object value) { + consumerEnhanceRequiredInfo = (ConsumerEnhanceRequiredInfo)value; + } + }; + + @Before + public void setUp() { + Config.Agent.ACTIVE_V1_HEADER = true; + consumerInterceptor = new PulsarConsumerInterceptor(); + consumerEnhanceRequiredInfo = new ConsumerEnhanceRequiredInfo(); + + consumerEnhanceRequiredInfo.setTopic("persistent://my-tenant/my-ns/my-topic"); + consumerEnhanceRequiredInfo.setServiceUrl("pulsar://localhost:6650"); + consumerEnhanceRequiredInfo.setSubscriptionName("my-sub"); + msg = new MockMessage(); + msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder() + .setKey("sw3") + .setValue("1.234.111|3|1|1|#192.168.1.8:18002|#/portal/|#testEntrySpan|#AQA*#AQA*Et0We0tQNQA*")); + } + + @After + public void clear() { + Config.Agent.ACTIVE_V1_HEADER = false; + } + + @Test + public void testConsumerWithNullMessage() throws Throwable { + consumerInterceptor.beforeMethod(consumerInstance, null, new Object[]{null}, new Class[0], null); + consumerInterceptor.afterMethod(consumerInstance, null, new Object[]{null}, new Class[0], null); + + List traceSegments = segmentStorage.getTraceSegments(); + assertThat(traceSegments.size(), is(0)); + } + + @Test + public void testConsumerWithMessage() throws Throwable { + consumerInterceptor.beforeMethod(consumerInstance, null, new Object[]{msg}, new Class[0], null); + consumerInterceptor.afterMethod(consumerInstance, null, new Object[]{msg}, new Class[0], null); + + List traceSegments = segmentStorage.getTraceSegments(); + assertThat(traceSegments.size(), is(1)); + + TraceSegment traceSegment = traceSegments.get(0); + List refs = traceSegment.getRefs(); + assertThat(refs.size(), is(1)); + assertTraceSegmentRef(refs.get(0)); + + List spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + assertConsumerSpan(spans.get(0)); + } + + private void assertConsumerSpan(AbstractTracingSpan span) { + SpanAssert.assertLayer(span, SpanLayer.MQ); + SpanAssert.assertComponent(span, PULSAR_CONSUMER); + SpanAssert.assertTagSize(span, 2); + SpanAssert.assertTag(span, 0, "pulsar://localhost:6650"); + SpanAssert.assertTag(span, 1, "persistent://my-tenant/my-ns/my-topic"); + } + + private void assertTraceSegmentRef(TraceSegmentRef ref) { + MatcherAssert.assertThat(SegmentRefHelper.getEntryServiceInstanceId(ref), is(1)); + MatcherAssert.assertThat(SegmentRefHelper.getSpanId(ref), is(3)); + MatcherAssert.assertThat(SegmentRefHelper.getTraceSegmentId(ref).toString(), is("1.234.111")); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptorTest.java new file mode 100644 index 000000000000..ecbf72d0f3fa --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptorTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.pulsar; + +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; +import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; +import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.apache.skywalking.apm.agent.test.tools.SpanAssert; +import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; + +import java.util.List; + +import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.PULSAR_PRODUCER; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +@RunWith(PowerMockRunner.class) +@PowerMockRunnerDelegate(TracingSegmentRunner.class) +public class PulsarProducerInterceptorTest { + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + @Rule + public AgentServiceRule serviceRule = new AgentServiceRule(); + + + private PulsarProducerInterceptor producerInterceptor; + + private Object[] arguments; + private Class[] argumentType; + + private EnhancedInstance pulsarProducerInstance = new EnhancedInstance() { + + + @Override public Object getSkyWalkingDynamicField() { + ProducerEnhanceRequiredInfo requiredInfo = new ProducerEnhanceRequiredInfo(); + requiredInfo.setTopic("persistent://my-tenant/my-ns/my-topic"); + requiredInfo.setServiceUrl("pulsar://localhost:6650"); + return requiredInfo; + } + + @Override public void setSkyWalkingDynamicField(Object value) { + } + }; + + private MessageImpl msg = new MockMessage(); + + @Before + public void setUp() { + producerInterceptor = new PulsarProducerInterceptor(); + arguments = new Object[] {msg, null}; + argumentType = new Class[] {MessageImpl.class}; + } + + @Test + public void testSendMessage() throws Throwable { + producerInterceptor.beforeMethod(pulsarProducerInstance, null, arguments, argumentType, null); + producerInterceptor.afterMethod(pulsarProducerInstance, null, arguments, argumentType, null); + + List traceSegmentList = segmentStorage.getTraceSegments(); + assertThat(traceSegmentList.size(), is(1)); + + TraceSegment segment = traceSegmentList.get(0); + List spans = SegmentHelper.getSpans(segment); + assertThat(spans.size(), is(1)); + + assertMessageSpan(spans.get(0)); + } + + @Test + public void testSendWithNullMessage() throws Throwable { + producerInterceptor.beforeMethod(pulsarProducerInstance, null, new Object[]{null}, argumentType, null); + producerInterceptor.afterMethod(pulsarProducerInstance, null, new Object[]{null}, argumentType, null); + List traceSegmentList = segmentStorage.getTraceSegments(); + assertThat(traceSegmentList.size(), is(0)); + } + + private void assertMessageSpan(AbstractTracingSpan span) { + SpanAssert.assertTag(span, 0, "pulsar://localhost:6650"); + SpanAssert.assertTag(span, 1, "persistent://my-tenant/my-ns/my-topic"); + SpanAssert.assertComponent(span, PULSAR_PRODUCER); + SpanAssert.assertLayer(span, SpanLayer.MQ); + assertThat(span.getOperationName(), is("Pulsar/persistent://my-tenant/my-ns/my-topic/Producer")); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackInterceptorTest.java new file mode 100644 index 000000000000..177f959a3bba --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackInterceptorTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.pulsar; + +import org.apache.skywalking.apm.agent.core.context.MockContextSnapshot; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; +import org.apache.skywalking.apm.agent.test.helper.SpanHelper; +import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.apache.skywalking.apm.agent.test.tools.SegmentRefAssert; +import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; +import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.apache.skywalking.apm.agent.test.tools.SpanAssert; +import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; + +import java.util.List; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +@RunWith(PowerMockRunner.class) +@PowerMockRunnerDelegate(TracingSegmentRunner.class) +public class SendCallbackInterceptorTest { + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + @Rule + public AgentServiceRule serviceRule = new AgentServiceRule(); + + private SendCallbackInterceptor callbackInterceptor; + + private Object[] arguments; + private Object[] argumentsWithException; + private Class[] argumentTypes; + + private EnhancedInstance callBackInstance = new EnhancedInstance() { + + @Override public Object getSkyWalkingDynamicField() { + SendCallbackEnhanceRequiredInfo requiredInfo = new SendCallbackEnhanceRequiredInfo(); + requiredInfo.setTopic("persistent://my-tenant/my-ns/my-topic"); + requiredInfo.setContextSnapshot(MockContextSnapshot.INSTANCE.mockContextSnapshot()); + return requiredInfo; + } + + @Override public void setSkyWalkingDynamicField(Object value) { + + } + }; + + @Before + public void setUp() { + callbackInterceptor = new SendCallbackInterceptor(); + + arguments = new Object[] { + null + }; + argumentsWithException = new Object[] { + new RuntimeException() + }; + + argumentTypes = new Class[] { + Exception.class + }; + } + + @Test + public void testCallbackWithoutException() throws Throwable { + callbackInterceptor.beforeMethod(callBackInstance, null, arguments, argumentTypes, null); + callbackInterceptor.afterMethod(callBackInstance, null, arguments, argumentTypes, null); + + List traceSegments = segmentStorage.getTraceSegments(); + assertThat(traceSegments.size(), is(1)); + TraceSegment traceSegment = traceSegments.get(0); + + List abstractSpans = SegmentHelper.getSpans(traceSegment); + assertThat(abstractSpans.size(), is(1)); + + assertCallbackSpan(abstractSpans.get(0)); + + assertCallbackSegmentRef(traceSegment.getRefs()); + } + + @Test + public void testCallbackWithException() throws Throwable { + callbackInterceptor.beforeMethod(callBackInstance, null, argumentsWithException, argumentTypes, null); + callbackInterceptor.afterMethod(callBackInstance, null, argumentsWithException, argumentTypes, null); + + List traceSegments = segmentStorage.getTraceSegments(); + assertThat(traceSegments.size(), is(1)); + TraceSegment traceSegment = traceSegments.get(0); + + List abstractSpans = SegmentHelper.getSpans(traceSegment); + assertThat(abstractSpans.size(), is(1)); + + assertCallbackSpanWithException(abstractSpans.get(0)); + + assertCallbackSegmentRef(traceSegment.getRefs()); + } + + private void assertCallbackSpanWithException(AbstractTracingSpan span) { + assertCallbackSpan(span); + + SpanAssert.assertException(SpanHelper.getLogs(span).get(0), RuntimeException.class); + assertThat(SpanHelper.getErrorOccurred(span), is(true)); + } + + private void assertCallbackSegmentRef(List refs) { + assertThat(refs.size(), is(1)); + + TraceSegmentRef segmentRef = refs.get(0); + SegmentRefAssert.assertSpanId(segmentRef, 1); + assertThat(segmentRef.getEntryEndpointName(), is("/for-test-entryOperationName")); + } + + private void assertCallbackSpan(AbstractTracingSpan span) { + assertThat(span.getOperationName(), is("Pulsar/Producer/Callback")); + } +} diff --git a/docs/en/setup/service-agent/java-agent/Supported-list.md b/docs/en/setup/service-agent/java-agent/Supported-list.md index 51ddb54cf95d..07db4d841ff3 100644 --- a/docs/en/setup/service-agent/java-agent/Supported-list.md +++ b/docs/en/setup/service-agent/java-agent/Supported-list.md @@ -44,6 +44,7 @@ * [Kafka](http://kafka.apache.org) 0.11.0.0 -> 1.0 * [ActiveMQ](https://github.com/apache/activemq) 5.x * [RabbitMQ](https://www.rabbitmq.com/) 5.x + * [Pulsar](http://pulsar.apache.org) 2.2.x -> 2.4.x * NoSQL * Redis * [Jedis](https://github.com/xetorthio/jedis) 2.x diff --git a/oap-server/server-starter/src/main/resources/component-libraries.yml b/oap-server/server-starter/src/main/resources/component-libraries.yml index 88463767ffad..778ebd1456c8 100755 --- a/oap-server/server-starter/src/main/resources/component-libraries.yml +++ b/oap-server/server-starter/src/main/resources/component-libraries.yml @@ -242,7 +242,15 @@ Cassandra: Light4J: id: 71 languages: Java - +Pulsar: + id: 72 + languages: Java +pulsar-producer: + id: 73 + languages: Java +pulsar-consumer: + id: 74 + languages: Java # .NET/.NET Core components # [3000, 4000) for C#/.NET only @@ -343,4 +351,6 @@ Component-Server-Mappings: Npgsql.EntityFrameworkCore.PostgreSQL: PostgreSQL transport-client: Elasticsearch SolrJ: Solr - cassandra-java-driver: Cassandra \ No newline at end of file + cassandra-java-driver: Cassandra + pulsar-producer: Pulsar + pulsar-consumer: Pulsar \ No newline at end of file