Skip to content

Commit

Permalink
I have already fix my code as suggestion.Please help to review them.[…
Browse files Browse the repository at this point in the history
…ISSUE alibaba#525].
  • Loading branch information
zongtanghu committed Nov 28, 2018
1 parent e04db85 commit 47556d6
Show file tree
Hide file tree
Showing 36 changed files with 501 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ public TopicConfigManager(BrokerController brokerController) {
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
// MixAll.RMQ_SYS_TRACK_TRACE_TOPIC
if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) {
String topic = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
Expand Down Expand Up @@ -165,6 +164,10 @@ public TopicConfig createTopicInSendMessageMethod(final String topic, final Stri
if (topicConfig != null)
return topicConfig;

if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) {
return topicConfig;
}

TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
if (defaultTopicConfig != null) {
if (defaultTopic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Default constructor.
*/
public DefaultMQPushConsumer() {
this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely(), false);
this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
}

/**
Expand All @@ -275,12 +275,26 @@ public DefaultMQPushConsumer() {
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}

/**
* Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param msgTraceSwitch switch flag instance for message track trace.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean msgTraceSwitch) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
//if client open the message track trace feature
if (msgTraceSwitch) {
try {
Properties tempProperties = new Properties();
Expand All @@ -307,16 +321,26 @@ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultMQPushConsumer(RPCHook rpcHook) {
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely(),false);
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
}

/**
* Constructor specifying consumer group.
*
* @param consumerGroup Consumer group.
* @param msgTraceSwitch switch flag instance for message track trace.
*/
public DefaultMQPushConsumer(final String consumerGroup, boolean msgTraceSwitch) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(),msgTraceSwitch);
this(consumerGroup, null, new AllocateMessageQueueAveragely(), msgTraceSwitch);
}

/**
* Constructor specifying consumer group.
*
* @param consumerGroup Consumer group.
*/
public DefaultMQPushConsumer(final String consumerGroup) {
this(consumerGroup, null, new AllocateMessageQueueAveragely());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String cli

this.rebalanceService = new RebalanceService(this);

this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP,false);
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);

this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,18 @@ public DefaultMQProducer() {
*
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
* @param msgTraceSwitch switch flag instance for message track trace
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}

/**
* Constructor specifying both producer group and RPC hook.
*
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
* @param msgTraceSwitch switch flag instance for message track trace.
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean msgTraceSwitch) {
this.producerGroup = producerGroup;
Expand Down Expand Up @@ -177,8 +188,18 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean ms
*
* @param producerGroup Producer group, see the name-sake field.
*/
public DefaultMQProducer(final String producerGroup) {
this(producerGroup, null);
}

/**
* Constructor specifying producer group.
*
* @param producerGroup Producer group, see the name-sake field.
* @param msgTraceSwitch switch flag instance for message track trace.
*/
public DefaultMQProducer(final String producerGroup, boolean msgTraceSwitch) {
this(producerGroup, null,msgTraceSwitch);
this(producerGroup, null, msgTraceSwitch);
}

/**
Expand All @@ -187,7 +208,7 @@ public DefaultMQProducer(final String producerGroup, boolean msgTraceSwitch) {
* @param rpcHook RPC hook to execute per each remoting command execution.
*/
public DefaultMQProducer(RPCHook rpcHook) {
this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook,false);
this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ public TransactionMQProducer() {
}

public TransactionMQProducer(final String producerGroup) {
super(producerGroup,false);
super(producerGroup);
}

public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
super(producerGroup, rpcHook,false);
super(producerGroup, rpcHook);
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
*/
package org.apache.rocketmq.client.trace.core.common;

import org.apache.rocketmq.client.trace.core.Utils.TrackTraceUtils;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageType;

public class TrackTraceBean {
private static final String LOCAL_ADDRESS = TrackTraceUtils.getLocalAddress();
private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());
private String topic = "";
private String msgId = "";
private String offsetMsgId = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
*/
package org.apache.rocketmq.client.trace.core.common;

/**
* Created by zongtanghu on 2018/11/6.
*/
public enum TrackTraceType {
Pub,
SubBefore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public class DefaultMQPushConsumerTest {
@Before
public void init() throws Exception {
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup,false);
pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);

Expand Down Expand Up @@ -252,7 +252,7 @@ public void testCheckConfig() {
}

private DefaultMQPushConsumer createPushConsumer() {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup,false);
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void checkConfigTest() throws MQClientException {
//test message
thrown.expectMessage("consumeThreadMin (10) is larger than consumeThreadMax (9)");

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group",false);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group");

consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(9);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
@RunWith(MockitoJUnitRunner.class)
public class RebalancePushImplTest {
@Spy
private DefaultMQPushConsumerImpl defaultMQPushConsumer = new DefaultMQPushConsumerImpl(new DefaultMQPushConsumer("RebalancePushImplTest",false), null);
private DefaultMQPushConsumerImpl defaultMQPushConsumer = new DefaultMQPushConsumerImpl(new DefaultMQPushConsumer("RebalancePushImplTest"), null);
@Mock
private MQClientInstance mqClientInstance;
@Mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class DefaultMQProducerTest {
@Before
public void init() throws Exception {
String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
producer = new DefaultMQProducer(producerGroupTemp,false);
producer = new DefaultMQProducer(producerGroupTemp);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setCompressMsgBodyOverHowmuch(16);
message = new Message(topic, new byte[] {'a'});
Expand Down Expand Up @@ -309,7 +309,7 @@ public void run() {
@Test
public void testSetCallbackExecutor() throws MQClientException {
String producerGroupTemp = "testSetCallbackExecutor_" + System.currentTimeMillis();
producer = new DefaultMQProducer(producerGroupTemp,false);
producer = new DefaultMQProducer(producerGroupTemp);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();

Expand Down
Loading

0 comments on commit 47556d6

Please sign in to comment.