Skip to content

Commit

Permalink
Enhance: share netty handler (alibaba#635)
Browse files Browse the repository at this point in the history
* It's highly recommended to share one stateless handler with all the server-side channels.

* comment for prepareSharableHandlers

* Simplify comment

* remove comment

* supply test case
  • Loading branch information
Erik1288 authored and zongtanghu committed Jul 11, 2019
1 parent 5b29b73 commit 9226292
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.rocketmq.remoting.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.ByteBuffer;
Expand All @@ -26,6 +27,7 @@
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -82,6 +83,12 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private static final String TLS_HANDLER_NAME = "sslHandler";
private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";

// sharable handlers
private HandshakeHandler handshakeHandler;
private NettyEncoder encoder;
private NettyConnectManageHandler connectionManageHandler;
private NettyServerHandler serverHandler;

public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
}
Expand Down Expand Up @@ -186,6 +193,8 @@ public Thread newThread(Runnable r) {
}
});

prepareSharableHandlers();

ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
Expand All @@ -200,14 +209,13 @@ public Thread newThread(Runnable r) {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
connectionManageHandler,
serverHandler
);
}
});
Expand Down Expand Up @@ -334,6 +342,14 @@ public ExecutorService getCallbackExecutor() {
return this.publicExecutor;
}

private void prepareSharableHandlers() {
handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
encoder = new NettyEncoder();
connectionManageHandler = new NettyConnectManageHandler();
serverHandler = new NettyServerHandler();
}

@ChannelHandler.Sharable
class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {

private final TlsMode tlsMode;
Expand Down Expand Up @@ -396,6 +412,7 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Excep
}
}

@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

@Override
Expand All @@ -404,6 +421,7 @@ protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) thro
}
}

@ChannelHandler.Sharable
class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.mockito.junit.MockitoJUnitRunner;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
Expand Down Expand Up @@ -90,4 +93,18 @@ public void operationComplete(final ResponseFuture responseFuture) {
semaphore.acquire(1);
assertThat(semaphore.availablePermits()).isEqualTo(0);
}

@Test
public void testScanResponseTable() {
int dummyId = 1;
// mock timeout
ResponseFuture responseFuture = new ResponseFuture(null,dummyId, -1000, new InvokeCallback() {
@Override
public void operationComplete(final ResponseFuture responseFuture) {
}
}, null);
remotingAbstract.responseTable.putIfAbsent(dummyId, responseFuture);
remotingAbstract.scanResponseTable();
assertNull(remotingAbstract.responseTable.get(dummyId));
}
}

0 comments on commit 9226292

Please sign in to comment.