Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --exit-when option #280

Merged
merged 1 commit into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 70 additions & 3 deletions src/main/java/com/rabbitmq/perf/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
import com.rabbitmq.perf.PerfTest.EXIT_WHEN;
import java.time.Duration;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,6 +45,8 @@ public class Consumer extends AgentBase implements Runnable {
private static final AckNackOperation NACK_OPERATION =
(ch, envelope, multiple, requeue) -> ch.basicNack(envelope.getDeliveryTag(), multiple, requeue);
static final String STOP_REASON_CONSUMER_REACHED_MESSAGE_LIMIT = "Consumer reached message limit";
static final String STOP_REASON_CONSUMER_IDLE = "Consumer is idle for more than 1 second";
static final String STOP_REASON_CONSUMER_QUEUE_EMPTY = "Consumer queue(s) empty";

private volatile ConsumerImpl q;
private final Channel channel;
Expand Down Expand Up @@ -76,6 +82,8 @@ public class Consumer extends AgentBase implements Runnable {

private final Map<String, Object> consumerArguments;

private final EXIT_WHEN exitWhen;

public Consumer(ConsumerParameters parameters) {
this.channel = parameters.getChannel();
this.id = parameters.getId();
Expand All @@ -91,6 +99,7 @@ public Consumer(ConsumerParameters parameters) {
this.polling = parameters.isPolling();
this.pollingInterval = parameters.getPollingInterval();
this.consumerArguments = parameters.getConsumerArguments();
this.exitWhen = parameters.getExitWhen();

this.queueNames.set(new ArrayList<>(parameters.getQueueNames()));
this.initialQueueNames = new ArrayList<>(parameters.getQueueNames());
Expand Down Expand Up @@ -133,7 +142,7 @@ public Consumer(ConsumerParameters parameters) {
}


this.state = new ConsumerState(parameters.getRateLimit());
this.state = new ConsumerState(parameters.getRateLimit(), timestampProvider);
this.recoveryProcess = parameters.getRecoveryProcess();
this.recoveryProcess.init(this);
}
Expand Down Expand Up @@ -232,9 +241,10 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie

void handleMessage(Envelope envelope, BasicProperties properties, byte[] body, Channel ch) throws IOException {
int currentMessageCount = state.incrementMessageCount();
long nowTimestamp = timestampProvider.getCurrentTime();
state.setLastActivityTimestamp(nowTimestamp);
if (msgLimit == 0 || currentMessageCount <= msgLimit) {
long messageTimestamp = timestampExtractor.apply(properties, body);
long nowTimestamp = timestampProvider.getCurrentTime();
long diff_time = timestampProvider.getDifference(nowTimestamp, messageTimestamp);

stats.handleRecv(id.equals(envelope.getRoutingKey()) ? diff_time : 0L);
Expand Down Expand Up @@ -339,6 +349,47 @@ public void recover(TopologyRecording topologyRecording) {
}
}

void maybeStopIfNoActivityOrQueueEmpty() {
LOGGER.debug("Checking consumer activity");
if (this.exitWhen == EXIT_WHEN.NEVER) {
return;
}
TimestampProvider tp = state.getTimestampProvider();
long lastActivityTimestamp = state.getLastActivityTimestamp();
if (lastActivityTimestamp == -1) {
// this avoids not terminating a consumer that never consumes
state.setLastActivityTimestamp(tp.getCurrentTime());
return;
}
Duration idleDuration = tp.difference(tp.getCurrentTime(), lastActivityTimestamp);
if (idleDuration.toMillis() > 1000) {
LOGGER.debug("Consumer idle for {}", idleDuration);
List<String> queues = queueNames.get();
if (this.exitWhen == EXIT_WHEN.IDLE) {
LOGGER.debug("Terminating consumer {} because of inactivity", this);
countDown(STOP_REASON_CONSUMER_IDLE);
} else if (this.exitWhen == EXIT_WHEN.EMPTY){
LOGGER.debug("Checking content of consumer queue(s)");
boolean empty = false;
for (String queue : queues) {
try {
DeclareOk declareOk = this.channel.queueDeclarePassive(queue);
LOGGER.debug("Message count for queue {}: {}", queue, declareOk.getMessageCount());
if (declareOk.getMessageCount() == 0) {
empty = true;
}
} catch (IOException e) {
LOGGER.info("Error when calling queue.declarePassive({}) in consumer {}", queue, this);
}
}
if (empty) {
LOGGER.debug("Terminating consumer {} because its queue(s) is (are) empty", this);
countDown(STOP_REASON_CONSUMER_QUEUE_EMPTY);
}
}
}
}

private static String queueName(TopologyRecording recording, String queue) {
TopologyRecording.RecordedQueue queueRecord = recording.queue(queue);
// The recording is missing when using pre-declared, so just using the initial name.
Expand All @@ -352,10 +403,14 @@ private static class ConsumerState implements AgentState {

private final float rateLimit;
private volatile long lastStatsTime;
private volatile long lastActivityTimestamp = -1;
private final AtomicInteger msgCount = new AtomicInteger(0);
private final TimestampProvider timestampProvider;

protected ConsumerState(float rateLimit) {
protected ConsumerState(float rateLimit,
TimestampProvider timestampProvider) {
this.rateLimit = rateLimit;
this.timestampProvider = timestampProvider;
}

public float getRateLimit() {
Expand All @@ -370,10 +425,22 @@ protected void setLastStatsTime(long lastStatsTime) {
this.lastStatsTime = lastStatsTime;
}

public void setLastActivityTimestamp(long lastActivityTimestamp) {
this.lastActivityTimestamp = lastActivityTimestamp;
}

public long getLastActivityTimestamp() {
return lastActivityTimestamp;
}

public int getMsgCount() {
return msgCount.get();
}

public TimestampProvider getTimestampProvider() {
return timestampProvider;
}

protected void setMsgCount(int msgCount) {
this.msgCount.set(msgCount);
}
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/rabbitmq/perf/ConsumerParameters.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.rabbitmq.client.Channel;

import com.rabbitmq.perf.PerfTest.EXIT_WHEN;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -47,6 +48,8 @@ public class ConsumerParameters {

private int pollingInterval;

private EXIT_WHEN exitWhen = EXIT_WHEN.NEVER;

private Map<String, Object> consumerArguments = null;

public Channel getChannel() {
Expand Down Expand Up @@ -219,4 +222,13 @@ public ConsumerParameters setConsumerArguments(Map<String, Object> consumerArgum
public Map<String, Object> getConsumerArguments() {
return consumerArguments;
}

public ConsumerParameters setExitWhen(EXIT_WHEN exitWhen) {
this.exitWhen = exitWhen;
return this;
}

public EXIT_WHEN getExitWhen() {
return exitWhen;
}
}
16 changes: 15 additions & 1 deletion src/main/java/com/rabbitmq/perf/MulticastParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ShutdownSignalException;

import com.rabbitmq.perf.PerfTest.EXIT_WHEN;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -120,6 +121,8 @@ public class MulticastParams {

private Map<String, Object> consumerArguments = null;

private EXIT_WHEN exitWhen = EXIT_WHEN.NEVER;

// for random JSON body generation
private AtomicReference<MessageBodySource> messageBodySourceReference = new AtomicReference<>();

Expand Down Expand Up @@ -285,6 +288,10 @@ public void setConsumerArguments(Map<String, Object> consumerArguments) {
this.consumerArguments = consumerArguments;
}

public void setExitWhen(EXIT_WHEN exitWhen) {
this.exitWhen = exitWhen;
}

public int getConsumerCount() {
return consumerCount;
}
Expand Down Expand Up @@ -422,6 +429,10 @@ public List<String> getConsumerLatencies() {
return consumerLatencies;
}

public EXIT_WHEN getExitWhen() {
return exitWhen;
}

public void setPolling(boolean polling) {
this.polling = polling;
}
Expand Down Expand Up @@ -546,6 +557,7 @@ public Consumer createConsumer(Connection connection,
.setNack(this.nack)
.setRequeue(this.requeue)
.setConsumerArguments(this.consumerArguments)
.setExitWhen(this.exitWhen)
);
this.topologyHandler.next();
return consumer;
Expand Down Expand Up @@ -599,7 +611,9 @@ private static boolean queueExists(Connection connection, final String queueName
}

public boolean hasLimit() {
return this.timeLimit > 0 || this.consumerMsgCount > 0 || this.producerMsgCount > 0;
return this.timeLimit > 0 || this.consumerMsgCount > 0 || this.producerMsgCount > 0
|| this.exitWhen == EXIT_WHEN.EMPTY
|| this.exitWhen == EXIT_WHEN.IDLE;
}

public void setExclusive(boolean exclusive) {
Expand Down
66 changes: 53 additions & 13 deletions src/main/java/com/rabbitmq/perf/MulticastSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,46 @@

package com.rabbitmq.perf;

import com.rabbitmq.client.*;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.rabbitmq.perf.Utils.isRecoverable;
import static java.lang.Math.min;
import static java.lang.String.format;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import com.rabbitmq.perf.PerfTest.EXIT_WHEN;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.IntStream;

import static com.rabbitmq.perf.Utils.isRecoverable;
import static java.lang.Math.min;
import static java.lang.String.format;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MulticastSet {

Expand Down Expand Up @@ -165,7 +186,7 @@ public void run(boolean announceStartup)

this.params.resetTopologyHandler();

Runnable[] consumerRunnables = new Runnable[params.getConsumerThreadCount()];
Consumer[] consumerRunnables = new Consumer[params.getConsumerThreadCount()];
Connection[] consumerConnections = new Connection[params.getConsumerCount()];
Function<Integer, ExecutorService> consumersExecutorsFactory;
consumersExecutorsFactory = createConsumersExecutorsFactory();
Expand All @@ -188,6 +209,24 @@ public void run(boolean announceStartup)
startConsumers(consumerRunnables);
startProducers(producerStates);

if (params.getExitWhen() == EXIT_WHEN.EMPTY || params.getExitWhen() == EXIT_WHEN.IDLE) {
ScheduledExecutorService scheduledExecutorService =
this.threadingHandler.scheduledExecutorService(
"perf-test-queue-empty-consumer-idle-scheduler", 1);
scheduledExecutorService.scheduleAtFixedRate(
() -> {
for (Consumer consumer : consumerRunnables) {
try {
consumer.maybeStopIfNoActivityOrQueueEmpty();
} catch (Exception e) {
LOGGER.info("Error while checking exit-when for consumer {}: {}", consumer, e.getMessage());
}
}
},
2,
1,
TimeUnit.SECONDS);
}

AutoCloseable shutdownSequence;
int shutdownTimeout = this.params.getShutdownTimeout();
Expand Down Expand Up @@ -313,7 +352,7 @@ private Function<Integer, ExecutorService> createConsumersExecutorsFactory() {
private void createConsumers(boolean announceStartup,
Runnable[] consumerRunnables,
Connection[] consumerConnections,
Function<Integer, ExecutorService> consumersExecutorsFactory) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException, IOException, TimeoutException {
Function<Integer, ExecutorService> consumersExecutorsFactory) throws IOException, TimeoutException {
for (int i = 0; i < consumerConnections.length; i++) {
if (announceStartup) {
System.out.println("id: " + testID + ", starting consumer #" + i);
Expand All @@ -327,7 +366,8 @@ private void createConsumers(boolean announceStartup,
if (announceStartup) {
System.out.println("id: " + testID + ", starting consumer #" + i + ", channel #" + j);
}
consumerRunnables[(i * params.getConsumerChannelCount()) + j] = params.createConsumer(consumerConnection, stats, this.consumerLatencyIndicator, this.completionHandler, executorService);
Consumer consumer = params.createConsumer(consumerConnection, stats, this.consumerLatencyIndicator, this.completionHandler, executorService);
consumerRunnables[(i * params.getConsumerChannelCount()) + j] = consumer;
}
}
}
Expand Down
Loading