Skip to content

Commit

Permalink
Merge pull request alibaba#600 from zongtanghu/msg_track
Browse files Browse the repository at this point in the history
[ISSUE alibaba#525] Support the message track,add the function which supports trace topic name value configurable by users.
  • Loading branch information
dongeforever authored Dec 19, 2018
2 parents c50ada6 + d480c81 commit d572ffa
Show file tree
Hide file tree
Showing 18 changed files with 96 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public TopicConfigManager(BrokerController brokerController) {
}
{
if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) {
String topic = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC;
String topic = this.brokerController.getBrokerConfig().getMsgTrackTopicName();
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,10 @@ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param msgTraceSwitch switch flag instance for message track trace.
* @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean msgTraceSwitch) {
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean msgTraceSwitch, final String traceTopicName) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
Expand All @@ -303,6 +304,11 @@ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100");
tempProperties.put(TrackTraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE, TrackTraceDispatcherType.CONSUMER.name());
if (!UtilAll.isBlank(traceTopicName)) {
tempProperties.put(TrackTraceConstants.TRACE_TOPIC, traceTopicName);
} else {
tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
}
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
Expand All @@ -329,9 +335,10 @@ public DefaultMQPushConsumer(RPCHook rpcHook) {
*
* @param consumerGroup Consumer group.
* @param msgTraceSwitch switch flag instance for message track trace.
* @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup, boolean msgTraceSwitch) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), msgTraceSwitch);
public DefaultMQPushConsumer(final String consumerGroup, boolean msgTraceSwitch, final String traceTopicName) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), msgTraceSwitch, traceTopicName);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher;
import org.apache.rocketmq.client.trace.core.hook.SendMessageTrackHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
Expand Down Expand Up @@ -138,7 +139,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* Default constructor.
*/
public DefaultMQProducer() {
this(MixAll.DEFAULT_PRODUCER_GROUP, null,false);
this(MixAll.DEFAULT_PRODUCER_GROUP, null);
}

/**
Expand All @@ -158,8 +159,9 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
* @param msgTraceSwitch switch flag instance for message track trace.
* @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name.
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean msgTraceSwitch) {
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean msgTraceSwitch,final String traceTopicName) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
//if client open the message track trace feature
Expand All @@ -171,6 +173,11 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean ms
tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100");
tempProperties.put(TrackTraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE, TrackTraceDispatcherType.PRODUCER.name());
if (!UtilAll.isBlank(traceTopicName)) {
tempProperties.put(TrackTraceConstants.TRACE_TOPIC, traceTopicName);
} else {
tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
}
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
Expand All @@ -197,9 +204,10 @@ public DefaultMQProducer(final String producerGroup) {
*
* @param producerGroup Producer group, see the name-sake field.
* @param msgTraceSwitch switch flag instance for message track trace.
* @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name.
*/
public DefaultMQProducer(final String producerGroup, boolean msgTraceSwitch) {
this(producerGroup, null, msgTraceSwitch);
public DefaultMQProducer(final String producerGroup, boolean msgTraceSwitch, final String traceTopicName) {
this(producerGroup, null, msgTraceSwitch, traceTopicName);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.rocketmq.client.trace.core.common;

import org.apache.rocketmq.common.MixAll;

public class TrackTraceConstants {
public static final String NAMESRV_ADDR = "NAMESRV_ADDR";
public static final String ADDRSRV_URL = "ADDRSRV_URL";
Expand All @@ -27,7 +25,7 @@ public class TrackTraceConstants {
public static final String WAKE_UP_NUM = "WakeUpNum";
public static final String MAX_MSG_SIZE = "MaxMsgSize";
public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
public static final String TRACE_TOPIC = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC;
public static final String TRACE_TOPIC = "TRACK_TRACE_TOPIC_NAME";
public static final char CONTENT_SPLITOR = (char) 1;
public static final char FIELD_SPLITOR = (char) 2;
public static final String TRACE_DISPATCHER_TYPE = "DispatcherType";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
private DefaultMQPushConsumerImpl hostConsumer;
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName;

public AsyncArrayDispatcher(Properties properties) throws MQClientException {
dispatcherType = properties.getProperty(TrackTraceConstants.TRACE_DISPATCHER_TYPE);
Expand All @@ -83,7 +84,7 @@ public AsyncArrayDispatcher(Properties properties) throws MQClientException {
this.discardCount = new AtomicLong(0L);
traceContextQueue = new ArrayBlockingQueue<TrackTraceContext>(1024);
appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);

traceTopicName = properties.getProperty(TrackTraceConstants.TRACE_TOPIC);
this.traceExecuter = new ThreadPoolExecutor(//
10, //
20, //
Expand All @@ -94,6 +95,14 @@ public AsyncArrayDispatcher(Properties properties) throws MQClientException {
traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties);
}

public String getTraceTopicName() {
return traceTopicName;
}

public void setTraceTopicName(String traceTopicName) {
this.traceTopicName = traceTopicName;
}

public DefaultMQProducer getTraceProducer() {
return traceProducer;
}
Expand All @@ -115,7 +124,7 @@ public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) {
}

public void start(Properties properties) throws MQClientException {
TrackTraceProducerFactory.registerTraceDispatcher(dispatcherId,properties.getProperty(TrackTraceConstants.NAMESRV_ADDR));
TrackTraceProducerFactory.registerTraceDispatcher(dispatcherId, properties.getProperty(TrackTraceConstants.NAMESRV_ADDR));
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncArrayDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
Expand Down Expand Up @@ -247,16 +256,14 @@ public void sendTraceData(List<TrackTraceContext> contextList) {
transBeanList.add(traceData);
}
for (Map.Entry<String, List<TrackTraceTransferBean>> entry : transBeanMap.entrySet()) {
//key -> dataTopic(Not trace Topic)
String dataTopic = entry.getKey();
flushData(entry.getValue(), dataTopic);
flushData(entry.getValue());
}
}

/**
* batch sending data actually
*/
private void flushData(List<TrackTraceTransferBean> transBeanList, String topic) {
private void flushData(List<TrackTraceTransferBean> transBeanList) {
if (transBeanList.size() == 0) {
return;
}
Expand Down Expand Up @@ -292,7 +299,7 @@ private void flushData(List<TrackTraceTransferBean> transBeanList, String topic)
* @param data the message track trace data in this batch
*/
private void sendTraceDataByMQ(Set<String> keySet, final String data) {
String topic = TrackTraceConstants.TRACE_TOPIC;
String topic = traceTopicName;
final Message message = new Message(topic, data.getBytes());

//keyset of message track trace includes msgId of or original message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.core.common.TrackTraceBean;
import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants;
import org.apache.rocketmq.client.trace.core.common.TrackTraceContext;
import org.apache.rocketmq.client.trace.core.common.TrackTraceType;
import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher;
import java.util.ArrayList;

public class SendMessageTrackHookImpl implements SendMessageHook {
Expand All @@ -43,7 +42,7 @@ public String hookName() {
@Override
public void sendMessageBefore(SendMessageContext context) {
//if it is message track trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(MixAll.SYSTEM_TOPIC_PREFIX)) {
if (context == null || context.getMessage().getTopic().startsWith(((AsyncArrayDispatcher) localDispatcher).getTraceTopicName())) {
return;
}
//build the context content of TuxeTraceContext
Expand All @@ -67,7 +66,7 @@ public void sendMessageBefore(SendMessageContext context) {
@Override
public void sendMessageAfter(SendMessageContext context) {
//if it is message track trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(TrackTraceConstants.TRACE_TOPIC)
if (context == null || context.getMessage().getTopic().startsWith(((AsyncArrayDispatcher) localDispatcher).getTraceTopicName())
|| context.getMqTraceContext() == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,21 +100,23 @@ public class DefaultMQConsumerWithTraceTest {
private RebalancePushImpl rebalancePushImpl;
private DefaultMQPushConsumer pushConsumer;
private DefaultMQPushConsumer normalPushConsumer;
private DefaultMQPushConsumer customTraceTopicpushConsumer;


private AsyncArrayDispatcher asyncArrayDispatcher;
private MQClientInstance mQClientTraceFactory;
@Mock
private MQClientAPIImpl mQClientTraceAPIImpl;
private DefaultMQProducer traceProducer;

private String customerTraceTopic = "rmq_track_trace_topic_12345";

@Before
public void init() throws Exception {
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup,true);
pushConsumer = new DefaultMQPushConsumer(consumerGroup,true,"");
consumerGroupNormal = "FooBarGroup" + System.currentTimeMillis();
normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal,false);

normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal,false,"");
customTraceTopicpushConsumer = new DefaultMQPushConsumer(consumerGroup,true,customerTraceTopic);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class DefaultMQProducerWithTraceTest {
private AsyncArrayDispatcher asyncArrayDispatcher;

private DefaultMQProducer producer;
private DefaultMQProducer customTraceTopicproducer;
private DefaultMQProducer traceProducer;
private DefaultMQProducer normalProducer;

Expand All @@ -84,16 +85,22 @@ public class DefaultMQProducerWithTraceTest {
private String producerGroupPrefix = "FooBar_PID";
private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC + System.currentTimeMillis();

private String customerTraceTopic = "rmq_track_trace_topic_12345";

@Before
public void init() throws Exception {

normalProducer = new DefaultMQProducer(producerGroupTemp,false);
producer = new DefaultMQProducer(producerGroupTemp,true);
customTraceTopicproducer = new DefaultMQProducer(producerGroupTemp,false, customerTraceTopic);
normalProducer = new DefaultMQProducer(producerGroupTemp,false,"");
producer = new DefaultMQProducer(producerGroupTemp,true,"");
producer.setNamesrvAddr("127.0.0.1:9876");
normalProducer.setNamesrvAddr("127.0.0.1:9877");
customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878");
message = new Message(topic, new byte[] {'a', 'b' ,'c'});
asyncArrayDispatcher = (AsyncArrayDispatcher)producer.getTraceDispatcher();
asyncArrayDispatcher.setTraceTopicName(customerTraceTopic);
asyncArrayDispatcher.getHostProducer();
asyncArrayDispatcher.getHostConsumer();
traceProducer = asyncArrayDispatcher.getTraceProducer();

producer.start();
Expand Down
10 changes: 10 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class BrokerConfig {
private String messageStorePlugIn = "";
@ImportantField
private boolean autoTraceBrokerEnable = false;
@ImportantField
private String msgTrackTopicName = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC;
/**
* thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default
* value is 1.
Expand Down Expand Up @@ -741,4 +743,12 @@ public boolean isAutoTraceBrokerEnable() {
public void setAutoTraceBrokerEnable(boolean autoTraceBrokerEnable) {
this.autoTraceBrokerEnable = autoTraceBrokerEnable;
}

public String getMsgTrackTopicName() {
return msgTrackTopicName;
}

public void setMsgTrackTopicName(String msgTrackTopicName) {
this.msgTrackTopicName = msgTrackTopicName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,24 @@ public void testConsumerFallBehindThresholdOverflow() {
long expect = 1024L * 1024 * 1024 * 16;
assertThat(new BrokerConfig().getConsumerFallbehindThreshold()).isEqualTo(expect);
}

@Test
public void testBrokerConfigAttribute() {
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setNamesrvAddr("127.0.0.1:9876");
brokerConfig.setAutoCreateTopicEnable(false);
brokerConfig.setAutoTraceBrokerEnable(true);
brokerConfig.setBrokerName("broker-a");
brokerConfig.setBrokerId(0);
brokerConfig.setBrokerClusterName("DefaultCluster");
brokerConfig.setMsgTrackTopicName("RMQ_SYS_TRACK_TRACE_TOPIC4");
assertThat(brokerConfig.getBrokerClusterName()).isEqualTo("DefaultCluster");
assertThat(brokerConfig.getNamesrvAddr()).isEqualTo("127.0.0.1:9876");
assertThat(brokerConfig.getMsgTrackTopicName()).isEqualTo("RMQ_SYS_TRACK_TRACE_TOPIC4");
assertThat(brokerConfig.getBrokerId()).isEqualTo(0);
assertThat(brokerConfig.getBrokerName()).isEqualTo("broker-a");
assertThat(brokerConfig.isAutoCreateTopicEnable()).isEqualTo(false);
assertThat(brokerConfig.isAutoTraceBrokerEnable()).isEqualTo(true);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class TraceProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true, "");
producer.start();

for (int i = 0; i < 128; i++)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@

public class TracePushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true);
//here,we use the default message track trace topic name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true, "");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public MQProducer getInstance(String nameServerAddress, String group) throws MQC
return p;
}

DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group,false);
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group);
defaultMQProducer.setNamesrvAddr(nameServerAddress);
MQProducer beforeProducer = null;
beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class AbstractTestCase {

@Before
public void mockLoggerAppender() throws Exception {
DefaultMQProducer defaultMQProducer = spy(new DefaultMQProducer("loggerAppender",false));
DefaultMQProducer defaultMQProducer = spy(new DefaultMQProducer("loggerAppender"));
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void create() {
}

public void create(boolean useTLS) {
consumer = new DefaultMQPushConsumer(consumerGroup,false);
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setInstanceName(RandomUtil.getStringByUUID());
consumer.setNamesrvAddr(nsAddr);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
public class ProducerFactory {

public static DefaultMQProducer getRMQProducer(String ns) {
DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID(),false);
DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID());
producer.setNamesrvAddr(ns);
try {
producer.start();
Expand Down
Loading

0 comments on commit d572ffa

Please sign in to comment.