Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pulsar apm plugin #3476

Merged
merged 32 commits into from
Sep 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
91ebca5
Add pulsar apm plugin.
codelipenghui Sep 17, 2019
5b6e792
Fix check style
codelipenghui Sep 17, 2019
e562e51
Merge branch 'master' into pulsar-plugin
wu-sheng Sep 17, 2019
76e9b8e
Fix pulsar consumer component define.
codelipenghui Sep 18, 2019
2d09d59
Merge remote-tracking branch 'origin/pulsar-plugin' into pulsar-plugin
codelipenghui Sep 18, 2019
3167425
Merge branch 'master' into pulsar-plugin
wu-sheng Sep 18, 2019
44bb61d
Add pulsar to component-libraries.yml
codelipenghui Sep 19, 2019
a8bda15
Merge branch 'master' into pulsar-plugin
codelipenghui Sep 19, 2019
0222726
Fix error interceptor class.
codelipenghui Sep 19, 2019
bf2e90a
Add pulsar to agent support list.
codelipenghui Sep 19, 2019
6e8c09f
Merge branch 'master' into pulsar-plugin
wu-sheng Sep 19, 2019
ec7b752
Add Pulsar to ComponentsDefine and component-libraries.yml
codelipenghui Sep 19, 2019
144d639
Merge remote-tracking branch 'origin/pulsar-plugin' into pulsar-plugin
codelipenghui Sep 19, 2019
67383b9
Move create entry span log of consumer from after method to before me…
codelipenghui Sep 19, 2019
88a4a1d
Fix send callback issue when exception cause.
codelipenghui Sep 19, 2019
ddc3b9e
Fix test issues
codelipenghui Sep 19, 2019
4da8118
Move pulsar plugin to optional plugins
codelipenghui Sep 20, 2019
c215cb3
Add none messages tests for interceptor of producer and consumer.
codelipenghui Sep 20, 2019
087c3d1
Merge branch 'master' into pulsar-plugin
codelipenghui Sep 20, 2019
faba185
Merge branch 'master' into pulsar-plugin
codelipenghui Sep 20, 2019
17c6e83
Remove unused comments.
codelipenghui Sep 20, 2019
5811942
Move pulsar plugin back to the apm-sdk-plugin
codelipenghui Sep 20, 2019
8bc54eb
Fix comments
codelipenghui Sep 23, 2019
8da65ca
Merge branch 'master' into pulsar-plugin
codelipenghui Sep 23, 2019
ef9136f
remove set startTime for entry span(default is set by System.currentT…
codelipenghui Sep 23, 2019
a217106
Merge branch 'master' into pulsar-plugin
wu-sheng Sep 23, 2019
53f2e76
Merge branch 'master' into pulsar-plugin
wu-sheng Sep 23, 2019
e0eb229
Merge branch 'master' into pulsar-plugin
wu-sheng Sep 24, 2019
11e3bb1
Merge branch 'master' into pulsar-plugin
wu-sheng Sep 24, 2019
69bba9b
Fix comments
codelipenghui Sep 24, 2019
88e1ef3
Merge remote-tracking branch 'origin/pulsar-plugin' into pulsar-plugin
codelipenghui Sep 24, 2019
2d18940
Merge branch 'master' into pulsar-plugin
wu-sheng Sep 24, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ public class ComponentsDefine {
public static final OfficialComponent PLAY = new OfficialComponent(68, "Play");

public static final OfficialComponent CASSANDRA_JAVA_DRIVER = new OfficialComponent(69, "cassandra-java-driver");

public static final OfficialComponent LIGHT_4J = new OfficialComponent(71, "Light4J");

public static final OfficialComponent PULSAR_PRODUCER = new OfficialComponent(73, "pulsar-producer");

public static final OfficialComponent PULSAR_CONSUMER = new OfficialComponent(74, "pulsar-consumer");
}
1 change: 1 addition & 0 deletions apm-sniffer/apm-sdk-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
<module>solrj-7.x-plugin</module>
<module>cassandra-java-driver-3.x-plugin</module>
<module>light4j-plugins</module>
<module>pulsar-plugin</module>
</modules>
<packaging>pom</packaging>

Expand Down
42 changes: 42 additions & 0 deletions apm-sniffer/apm-sdk-plugin/pulsar-plugin/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-sdk-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.5.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>apm-pulsar-plugin</artifactId>

<properties>
<pulsar.version>2.4.0</pulsar.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.pulsar;

import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;

/**
* Interceptor of pulsar consumer constructor.
*
* The interceptor create {@link ConsumerEnhanceRequiredInfo} which is required by instance method interceptor,
* So use it to update the skywalking dynamic field of pulsar consumer enhanced instance.
* So that the instance methods can get the {@link ConsumerEnhanceRequiredInfo}
*
* @author penghui
*/
public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor {

@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
PulsarClientImpl pulsarClient = (PulsarClientImpl) allArguments[0];
String topic = (String) allArguments[1];
ConsumerConfigurationData consumerConfigurationData = (ConsumerConfigurationData) allArguments[2];
ConsumerEnhanceRequiredInfo requireInfo = new ConsumerEnhanceRequiredInfo();
/*
* Pulsar url can specify with specific URL or a service url provider, use pulsarClient.getLookup().getServiceUrl()
* can handle the service url provider which use a dynamic service url
*/
requireInfo.setServiceUrl(pulsarClient.getLookup().getServiceUrl());
requireInfo.setTopic(topic);
requireInfo.setSubscriptionName(consumerConfigurationData.getSubscriptionName());
objInst.setSkyWalkingDynamicField(requireInfo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.pulsar;

/**
* Pulsar consumer enhance required info is required by consumer enhanced object method interceptor
*
* @author penghui
*/
public class ConsumerEnhanceRequiredInfo {

/**
* service url of the consumer
*/
private String serviceUrl;

/**
* topic name of the consumer
*/
private String topic;

/**
* subscription name of the consumer
*/
private String subscriptionName;

public String getServiceUrl() {
return serviceUrl;
}

public void setServiceUrl(String serviceUrl) {
this.serviceUrl = serviceUrl;
}

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public String getSubscriptionName() {
return subscriptionName;
}

public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.pulsar;

import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;

/**
* Interceptor of pulsar producer constructor.
*
* The interceptor create {@link ProducerEnhanceRequiredInfo} which is required by instance method interceptor,
* So use it to update the skywalking dynamic field of pulsar producer enhanced instance.
* So that the instance methods can get the {@link ProducerEnhanceRequiredInfo}
*
* @author penghui
*/
public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor {

@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
PulsarClientImpl pulsarClient = (PulsarClientImpl) allArguments[0];
String topic = (String) allArguments[1];
ProducerEnhanceRequiredInfo producerEnhanceRequiredInfo = new ProducerEnhanceRequiredInfo();
producerEnhanceRequiredInfo.setTopic(topic);
/*
* Pulsar url can specify with specific URL or a service url provider, use pulsarClient.getLookup().getServiceUrl()
* can handle the service url provider which use a dynamic service url
*/
producerEnhanceRequiredInfo.setServiceUrl(pulsarClient.getLookup().getServiceUrl());
objInst.setSkyWalkingDynamicField(producerEnhanceRequiredInfo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.pulsar;

/**
* Pulsar producer enhance required info is required by producer enhanced object method interceptor
*
* @author penghui
*/
public class ProducerEnhanceRequiredInfo {

/**
* service url of the pulsar producer
*/
private String serviceUrl;

/**
* topic name of the pulsar producer
*/
private String topic;

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public String getServiceUrl() {
return serviceUrl;
}

public void setServiceUrl(String serviceUrl) {
this.serviceUrl = serviceUrl;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.pulsar;

import org.apache.pulsar.client.api.Message;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;

import java.lang.reflect.Method;

/**
* Interceptor for pulsar consumer enhanced instance
*
* Here is the intercept process steps:
*
* <pre>
* 1. Get the @{@link ConsumerEnhanceRequiredInfo} and record the service url, topic name and subscription name
* 2. Create the entry span when call <code>messageProcessed</code> method
* 3. Extract all the <code>Trace Context</code> when call <code>messageProcessed</code> method
* 4. Stop the entry span when <code>messageProcessed</code> method finished.
* </pre>
*
* @author penghui
*/
public class PulsarConsumerInterceptor implements InstanceMethodsAroundInterceptor {

public static final String OPERATE_NAME_PREFIX = "Pulsar/";
public static final String CONSUMER_OPERATE_NAME = "/Consumer/";

@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
if (allArguments[0] != null) {
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
Message msg = (Message) allArguments[0];
ContextCarrier carrier = new ContextCarrier();
CarrierItem next = carrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(msg.getProperty(next.getHeadKey()));
}
AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX +
requiredInfo.getTopic() + CONSUMER_OPERATE_NAME + requiredInfo.getSubscriptionName(), carrier);
activeSpan.setComponent(ComponentsDefine.PULSAR_CONSUMER);
SpanLayer.asMQ(activeSpan);
Tags.MQ_BROKER.set(activeSpan, requiredInfo.getServiceUrl());
Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopic());
}
}

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
if (allArguments[0] != null) {
ContextManager.stopSpan();
}
return ret;
}

@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
if (allArguments[0] != null) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
}
Loading