This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Fix memory leak and optimize memory usage #88

Merged: 3 commits, Feb 6, 2020
2 changes: 2 additions & 0 deletions bin/kop
@@ -118,6 +118,8 @@ if [ -z "$KOP_LOG_CONF" ]; then
KOP_LOG_CONF=$DEFAULT_LOG_CONF
fi

KOP_EXTRA_OPTS=${KOP_EXTRA_OPTS:-" -Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g -XX:+UseG1GC -XX:MaxGCPauseMillis=10 -XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC -XX:-ResizePLAB -Dpulsar.allocator.exit_on_oom=true -Dio.netty.recycler.maxCapacity.default=1000 -Dio.netty.recycler.linkCapacity=1024"}

KOP_CLASSPATH="$KOP_JAR:$KOP_CLASSPATH"
KOP_CLASSPATH="$(dirname $KOP_LOG_CONF):$KOP_CLASSPATH"
OPTS="$OPTS -Dlog4j.configurationFile=$(basename $KOP_LOG_CONF)"
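The new KOP_EXTRA_OPTS default caps the heap at 2 GB and direct (Netty) memory at 4 GB, and tunes G1 for short pauses. A quick way to confirm those limits took effect in a running JVM, as a standalone sketch using Netty's PlatformDependent helper, not part of this PR:

import io.netty.util.internal.PlatformDependent;

// Standalone sketch (not part of this PR): print the effective heap and
// direct-memory ceilings so the -Xmx2g / -XX:MaxDirectMemorySize=4g flags
// from KOP_EXTRA_OPTS can be verified after startup.
public final class MemoryLimitsCheck {
    public static void main(String[] args) {
        System.out.printf("heap max:   %d MiB%n",
                Runtime.getRuntime().maxMemory() >> 20);
        System.out.printf("direct max: %d MiB (as seen by Netty)%n",
                PlatformDependent.maxDirectMemory() >> 20);
    }
}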
8 changes: 8 additions & 0 deletions conf/kop.conf
@@ -125,6 +125,14 @@ clusterName=kafka-cluster

### --- General broker settings --- ###

# Max message publish rate (messages per second) allowed for a broker when broker publish rate limiting is enabled
# (a value of 0 disables message rate limiting)
brokerPublisherThrottlingMaxMessageRate=0

# Max byte publish rate (bytes per second) allowed for a broker when broker publish rate limiting is enabled.
# (a value of 0 disables byte rate limiting)
brokerPublisherThrottlingMaxByteRate=0

# Zookeeper quorum connection string
zookeeperServers=localhost:2181
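The two brokerPublisherThrottling settings above gate publishing per broker over one-second windows, with 0 meaning unlimited. A rough sketch of those semantics, using a hypothetical class rather than KoP's or Pulsar's actual implementation:

// Rough sketch of per-second publish throttling semantics (hypothetical
// class, not KoP's or Pulsar's implementation). Counters reset each second;
// a publish passes only while both limits (0 = unlimited) are unmet.
public final class PublishRateGate {
    private final long maxMessageRate; // messages per second, 0 = unlimited
    private final long maxByteRate;    // bytes per second, 0 = unlimited
    private long windowStart = System.nanoTime();
    private long messages;
    private long bytes;

    public PublishRateGate(long maxMessageRate, long maxByteRate) {
        this.maxMessageRate = maxMessageRate;
        this.maxByteRate = maxByteRate;
    }

    public synchronized boolean tryPublish(long messageBytes) {
        long now = System.nanoTime();
        if (now - windowStart >= 1_000_000_000L) { // new one-second window
            windowStart = now;
            messages = 0;
            bytes = 0;
        }
        if (maxMessageRate > 0 && messages + 1 > maxMessageRate) {
            return false; // message-rate limit reached
        }
        if (maxByteRate > 0 && bytes + messageBytes > maxByteRate) {
            return false; // byte-rate limit reached
        }
        messages++;
        bytes += messageBytes;
        return true;
    }

    public static void main(String[] args) {
        PublishRateGate gate = new PublishRateGate(2, 0); // 2 msg/s, bytes unlimited
        System.out.println(gate.tryPublish(100)); // true
        System.out.println(gate.tryPublish(100)); // true
        System.out.println(gate.tryPublish(100)); // false: third message this second
    }
}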

8 changes: 8 additions & 0 deletions conf/kop_standalone.conf
@@ -122,6 +122,14 @@ brokerDeleteInactiveTopicsEnabled=false

### --- General broker settings --- ###

# Max message publish rate (messages per second) allowed for a broker when broker publish rate limiting is enabled
# (a value of 0 disables message rate limiting)
brokerPublisherThrottlingMaxMessageRate=0

# Max byte publish rate (bytes per second) allowed for a broker when broker publish rate limiting is enabled.
# (a value of 0 disables byte rate limiting)
brokerPublisherThrottlingMaxByteRate=0

# Zookeeper quorum connection string
zookeeperServers=

33 changes: 16 additions & 17 deletions src/main/java/io/streamnative/kop/KafkaCommandDecoder.java
@@ -15,11 +15,9 @@

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.kafka.common.protocol.ApiKeys.API_VERSIONS;
import static org.apache.kafka.common.protocol.ApiKeys.METADATA;

import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -40,6 +38,7 @@
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.requests.ResponseUtils;


/**
@@ -110,13 +109,14 @@ protected ByteBuf responseToByteBuf(AbstractResponse response, KafkaHeaderAndReq
try (KafkaHeaderAndResponse kafkaHeaderAndResponse =
KafkaHeaderAndResponse.responseForRequest(request, response)) {

ByteBuffer serialized = kafkaHeaderAndResponse
.getResponse()
.serialize(kafkaHeaderAndResponse.getApiVersion(), kafkaHeaderAndResponse.getHeader());

// Already converted the ByteBuf into ByteBuffer now, release ByteBuf
kafkaHeaderAndResponse.buffer.release();
return Unpooled.wrappedBuffer(serialized);
return ResponseUtils.serializeResponse(
kafkaHeaderAndResponse.getApiVersion(),
kafkaHeaderAndResponse.getHeader(),
kafkaHeaderAndResponse.getResponse()
);
} finally {
// the request is not needed any more.
request.close();
}
}

@@ -133,7 +133,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

CompletableFuture<AbstractResponse> responseFuture;

try (KafkaHeaderAndRequest kafkaHeaderAndRequest = byteBufToRequest(buffer, remoteAddress)){
KafkaHeaderAndRequest kafkaHeaderAndRequest = byteBufToRequest(buffer, remoteAddress);
try {
if (log.isDebugEnabled()) {
log.debug("[{}] Received kafka cmd {}, the request content is: {}",
ctx.channel() != null ? ctx.channel().remoteAddress() : "Null channel",
@@ -219,6 +220,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
} catch (Exception e) {
log.error("error while handle command:", e);
close();
} finally {
// the kafkaHeaderAndRequest already holds its own reference to the buffer.
buffer.release();
}
}
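The reworked channelRead drops the try-with-resources and instead releases the inbound ByteBuf in a finally block: byteBufToRequest retains its own reference for the request handling, so the handler can drop its reference as soon as parsing is done. A minimal, self-contained sketch of that retain/release contract with plain Netty, not KoP code:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

// Minimal sketch of the ownership rule applied above (plain Netty, not KoP
// code): the "parser" keeps its own retained reference, and the caller
// releases its reference in finally instead of waiting for the response.
public final class RefCountSketch {
    public static void main(String[] args) {
        ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(16);
        buffer.writeInt(42);                     // refCnt == 1, owned by caller

        ByteBuf parsed = buffer.retainedSlice(); // "request" retains: refCnt == 2
        try {
            System.out.println("payload = " + parsed.getInt(0));
        } finally {
            buffer.release();                    // caller drops its reference
        }
        parsed.release();                        // "request.close()": back to pool
        System.out.println("fully released = " + (buffer.refCnt() == 0));
    }
}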

@@ -399,16 +403,13 @@ static class KafkaHeaderAndResponse implements Closeable {
private final short apiVersion;
private final ResponseHeader header;
private final AbstractResponse response;
private final ByteBuf buffer;

private KafkaHeaderAndResponse(short apiVersion,
ResponseHeader header,
AbstractResponse response,
ByteBuf buffer) {
AbstractResponse response) {
this.apiVersion = apiVersion;
this.header = header;
this.response = response;
this.buffer = buffer.retain();
}

public short getApiVersion() {
@@ -427,8 +428,7 @@ static KafkaHeaderAndResponse responseForRequest(KafkaHeaderAndRequest request,
return new KafkaHeaderAndResponse(
request.getHeader().apiVersion(),
request.getHeader().toResponseHeader(),
response,
request.getBuffer());
response);
}

public String toString() {
@@ -438,7 +438,6 @@ public String toString() {

@Override
public void close() {
this.buffer.release();
}
}

@@ -193,6 +193,7 @@ public static ByteBuf recordsToByteBuf(MemoryRecords records, int size) {

messageMetaBuilder.recycle();
msgMetadata.recycle();
batchedMessageMetadataAndPayload.release();

return buf;
}
@@ -266,8 +267,10 @@ public static MemoryRecords entriesToRecords(List<org.apache.bookkeeper.mledger.

SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.build();

// TODO: optimize this to avoid memory copy
byte[] data = new byte[singleMessagePayload.readableBytes()];
singleMessagePayload.readBytes(data);
singleMessagePayload.release();
Header[] headers = getHeadersFromMetadata(singleMessageMetadata.getPropertiesList());

builder.appendWithOffset(
@@ -279,6 +282,7 @@ public static MemoryRecords entriesToRecords(List<org.apache.bookkeeper.mledger.
singleMessageMetadataBuilder.recycle();
}
} else {
// TODO: optimize this to avoid memory copy
byte[] data = new byte[payload.readableBytes()];
payload.readBytes(data);
Header[] headers = getHeadersFromMetadata(msgMetadata.getPropertiesList());
@@ -290,6 +294,9 @@ public static MemoryRecords entriesToRecords(List<org.apache.bookkeeper.mledger.
data,
headers);
}

payload.release();
entry.release();
}
return builder.build();
} catch (IOException ioe){
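Both hunks add the same fix: copy each refcounted payload into a heap array that Kafka's MemoryRecords builder owns, then release the pooled buffer (and the entry) immediately instead of leaking it. A minimal sketch of that copy-then-release pattern with plain Netty, not KoP code:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;

// Minimal sketch of the copy-then-release pattern added above (plain Netty,
// not KoP code): drain the refcounted payload into a heap array, then return
// the buffer to its pool right away.
public final class CopyThenRelease {
    static byte[] drain(ByteBuf payload) {
        try {
            byte[] data = new byte[payload.readableBytes()];
            payload.readBytes(data);
            return data; // the caller owns a plain heap copy
        } finally {
            payload.release(); // the pooled buffer is no longer referenced
        }
    }

    public static void main(String[] args) {
        ByteBuf payload = Unpooled.copiedBuffer("hello", StandardCharsets.UTF_8);
        System.out.println(new String(drain(payload), StandardCharsets.UTF_8));
    }
}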
55 changes: 55 additions & 0 deletions src/main/java/org/apache/kafka/common/requests/ResponseUtils.java
@@ -0,0 +1,55 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.nio.ByteBuffer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.protocol.types.Struct;

/**
 * Utility methods that access protected fields in Kafka's request and response structures.
 */
@Slf4j
public class ResponseUtils {

/**
 * Serialize a Kafka response into a Netty ByteBuf.
 *
 * @param version the API version used to serialize the response body
 * @param responseHeader the header of the response
 * @param response the response body
 * @return a pooled ByteBuf containing the serialized header and body
 */
public static ByteBuf serializeResponse(short version,
ResponseHeader responseHeader,
AbstractResponse response) {
return serialize(
responseHeader.toStruct(),
response.toStruct(version)
);
}

public static ByteBuf serialize(Struct headerStruct, Struct bodyStruct) {
int size = headerStruct.sizeOf() + bodyStruct.sizeOf();
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(size, size);
buf.writerIndex(buf.readerIndex() + size);
ByteBuffer buffer = buf.nioBuffer();
headerStruct.writeTo(buffer);
bodyStruct.writeTo(buffer);
buffer.rewind();
return buf;
}

}
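ResponseUtils.serialize sizes a pooled buffer exactly, marks the whole region as written, and lets the Kafka Structs fill the underlying memory through a ByteBuffer view, so the response is serialized without the intermediate heap buffer and wrap that responseToByteBuf used before. The same trick in isolation, as a sketch with plain Netty types rather than KoP code:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.nio.ByteBuffer;

// Sketch of the write-through-a-view trick used by ResponseUtils.serialize
// (plain Netty, not KoP code): allocate one pooled buffer of the exact size,
// advance the writer index, and fill the shared memory via an NIO view.
public final class PooledWriteSketch {
    public static void main(String[] args) {
        int size = Integer.BYTES + Long.BYTES;     // "header" + "body"
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(size, size);
        buf.writerIndex(buf.readerIndex() + size); // mark the region as written
        ByteBuffer view = buf.nioBuffer();         // view over the same memory
        view.putInt(7);                            // header write
        view.putLong(42L);                         // body write
        System.out.println(buf.readInt() + " / " + buf.readLong()); // 7 / 42
        buf.release();
    }
}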
19 changes: 19 additions & 0 deletions src/main/java/org/apache/kafka/common/requests/package-info.java
@@ -0,0 +1,19 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
 * Kafka request and response helper classes.
 *
 * <p>The classes under this package are ported from Kafka.
 */
package org.apache.kafka.common.requests;