From dd6b029f5bdf8d832c3594a7e11ce040c6b3c505 Mon Sep 17 00:00:00 2001 From: Georgi Grigorov <72125420+Grigorov-Georgi@users.noreply.github.com> Date: Thu, 27 Feb 2025 15:44:06 +0200 Subject: [PATCH] 769 implement beefy networking (#794) # Description - Add all components needed to receive and send messages on the libp2p substream "/beefy/2" - Implement abstract classes for the network layer in order to reduce code duplication Fixes https://github.com/LimeChain/Fruzhin/issues/769 --- .../limechain/network/ConnectionManager.java | 83 +++++-- .../com/limechain/network/NetworkService.java | 21 +- .../network/PeerMessageCoordinator.java | 3 + .../com/limechain/network/ProtocolUtils.java | 19 +- .../com/limechain/network/dto/PeerInfo.java | 14 +- .../network/dto/ProtocolStreamType.java | 2 +- .../network/protocol/base/BaseController.java | 24 +++ .../network/protocol/base/BaseEngine.java | 16 ++ .../network/protocol/base/BaseProtocol.java | 49 +++++ .../network/protocol/beefy/Beefy.java | 12 ++ .../protocol/beefy/BeefyController.java | 18 ++ .../network/protocol/beefy/BeefyEngine.java | 119 +++++++++++ .../protocol/beefy/BeefyMessageHandler.java | 12 ++ .../network/protocol/beefy/BeefyProtocol.java | 71 ++++++ .../network/protocol/beefy/BeefyService.java | 34 +++ .../beefy/messages/BeefyMessageType.java | 26 +++ .../BlockAnnounceController.java | 11 +- .../blockannounce/BlockAnnounceEngine.java | 84 ++++---- .../blockannounce/BlockAnnounceProtocol.java | 40 +--- .../BlockAnnounceHandshakeBuilder.java | 2 +- .../protocol/grandpa/GrandpaController.java | 14 +- .../protocol/grandpa/GrandpaEngine.java | 202 +++++++++--------- .../protocol/grandpa/GrandpaProtocol.java | 44 +--- .../protocol/grandpa/GrandpaService.java | 1 + .../grandpa/messages/GrandpaMessageType.java | 10 +- .../transaction/TransactionController.java | 14 +- .../transaction/TransactionEngine.java | 78 +++---- .../transaction/TransactionMessages.java | 2 +- ...Protocol.java => TransactionProtocol.java} | 45 +--- .../transaction/TransactionsService.java | 2 +- .../limechain/network/protocol/BaseUtils.java | 37 ++++ .../protocol/beefy/BeefyControllerTest.java | 39 ++++ .../protocol/beefy/BeefyProtocolTest.java | 104 +++++++++ .../BlockAnnounceControllerTest.java | 6 +- .../BlockAnnounceEngineTest.java | 24 ++- .../BlockAnnounceProtocolTest.java | 67 +++--- .../grandpa/GrandpaControllerTest.java | 6 +- .../protocol/grandpa/GrandpaEngineTest.java | 27 +-- .../protocol/grandpa/GrandpaProtocolTest.java | 38 ++-- .../TransactionControllerTest.java | 46 ++++ .../transaction/TransactionProtocolTest.java | 104 +++++++++ 41 files changed, 1138 insertions(+), 432 deletions(-) create mode 100644 src/main/java/com/limechain/network/protocol/base/BaseController.java create mode 100644 src/main/java/com/limechain/network/protocol/base/BaseEngine.java create mode 100644 src/main/java/com/limechain/network/protocol/base/BaseProtocol.java create mode 100644 src/main/java/com/limechain/network/protocol/beefy/Beefy.java create mode 100644 src/main/java/com/limechain/network/protocol/beefy/BeefyController.java create mode 100644 src/main/java/com/limechain/network/protocol/beefy/BeefyEngine.java create mode 100644 src/main/java/com/limechain/network/protocol/beefy/BeefyMessageHandler.java create mode 100644 src/main/java/com/limechain/network/protocol/beefy/BeefyProtocol.java create mode 100644 src/main/java/com/limechain/network/protocol/beefy/BeefyService.java create mode 100644 src/main/java/com/limechain/network/protocol/beefy/messages/BeefyMessageType.java rename src/main/java/com/limechain/network/protocol/transaction/{TransactionsProtocol.java => TransactionProtocol.java} (58%) create mode 100644 src/test/java/com/limechain/network/protocol/BaseUtils.java create mode 100644 src/test/java/com/limechain/network/protocol/beefy/BeefyControllerTest.java create mode 100644 src/test/java/com/limechain/network/protocol/beefy/BeefyProtocolTest.java create mode 100644 src/test/java/com/limechain/network/protocol/transaction/TransactionControllerTest.java create mode 100644 src/test/java/com/limechain/network/protocol/transaction/TransactionProtocolTest.java diff --git a/src/main/java/com/limechain/network/ConnectionManager.java b/src/main/java/com/limechain/network/ConnectionManager.java index 68acbfed..04fe2a3d 100644 --- a/src/main/java/com/limechain/network/ConnectionManager.java +++ b/src/main/java/com/limechain/network/ConnectionManager.java @@ -25,6 +25,7 @@ @Log @NoArgsConstructor(access = AccessLevel.PROTECTED) public class ConnectionManager { + private static ConnectionManager instance; protected final Map peers = new HashMap<>(); @@ -45,6 +46,14 @@ public PeerInfo getPeerInfo(PeerId peerId) { return peers.get(peerId); } + /** + * Adds a Transaction stream to the peer info. Peer id is retrieved from the stream. + * @param stream stream to be added + */ + public void addTransactionsStream(Stream stream) { + addStream(stream, ProtocolStreamType.TRANSACTIONS); + } + /** * Adds a Block Announce stream to the peer info. Peer id is retrieved from the stream. * @@ -63,8 +72,12 @@ public void addGrandpaStream(Stream stream) { addStream(stream, ProtocolStreamType.GRANDPA); } - public void addTransactionsStream(Stream stream) { - addStream(stream, ProtocolStreamType.TRANSACTIONS); + /** + * Adds a BEEFY stream to the peer info. Peer id is retrieved from the stream. + * @param stream stream to be added + */ + public void addBeefyStream(Stream stream) { + addStream(stream, ProtocolStreamType.BEEFY); } private void addStream(Stream stream, ProtocolStreamType type) { @@ -94,15 +107,6 @@ public PeerInfo addNewPeer(PeerId peerId) { return peerInfo; } - /** - * Removes a GRANDPA stream from the peer info. Peer id is retrieved from the stream. - * - * @param stream stream to be closed - */ - public void closeGrandpaStream(Stream stream) { - closeStream(stream, ProtocolStreamType.GRANDPA); - } - /** * Removes a Transactions stream from the peer info. Peer id is retrieved from the stream. * @@ -121,6 +125,23 @@ public void closeBlockAnnounceStream(Stream stream) { closeStream(stream, ProtocolStreamType.BLOCK_ANNOUNCE); } + /** + * Removes a GRANDPA stream from the peer info. Peer id is retrieved from the stream. + * + * @param stream stream to be closed + */ + public void closeGrandpaStream(Stream stream) { + closeStream(stream, ProtocolStreamType.GRANDPA); + } + + /** + * Removes a BEEFY stream from the peer info. Peer id is retrieved from the stream + * @param stream stream to be closed + */ + public void closeBeefyStream(Stream stream) { + closeStream(stream, ProtocolStreamType.BEEFY); + } + private void closeStream(Stream stream, ProtocolStreamType type) { PeerInfo peerInfo = peers.get(stream.remotePeerId()); if (peerInfo == null) { @@ -179,16 +200,6 @@ public void updatePeer(PeerId peerId, BlockAnnounceMessage blockAnnounceMessage) } } - /** - * Checks if we have an open GRANDPA responder stream with a peer. - * - * @param peerId peer to check - * @return do peer info and GRANDPA responder stream exist - */ - public boolean isGrandpaConnected(PeerId peerId) { - return peers.containsKey(peerId) && peers.get(peerId).getGrandpaStreams().getResponder() != null; - } - /** * Checks if we have an open Transactions responder stream with a peer. * @@ -209,6 +220,25 @@ public boolean isBlockAnnounceConnected(PeerId peerId) { return peers.containsKey(peerId) && peers.get(peerId).getBlockAnnounceStreams().getResponder() != null; } + /** + * Checks if we have an open GRANDPA responder stream with a peer. + * + * @param peerId peer to check + * @return do peer info and GRANDPA responder stream exist + */ + public boolean isGrandpaConnected(PeerId peerId) { + return peers.containsKey(peerId) && peers.get(peerId).getGrandpaStreams().getResponder() != null; + } + + /** + * Checks if we have an open BEEFY responder steam with a peer + * @param peerId peer to check + * @return do peer info and BEEFY responder steam exist + */ + public boolean isBeefyConnected(PeerId peerId) { + return peers.containsKey(peerId) && peers.get(peerId).getBeefyStreams().getResponder() != null; + } + /** * Gets the ids of all peers with open connections. * Open connection means either Grandpa or Block Announce stream has been opened. @@ -223,10 +253,14 @@ public Set getPeerIds() { * Closes conneciton to all the connected peers and removes them from the peersList. */ public void removeAllPeers() { - peers.forEach((peerId, peerInfo) -> { + + peers.values().forEach(peerInfo -> { + closeProtocolStream(peerInfo.getTransactionsStreams()); closeProtocolStream(peerInfo.getBlockAnnounceStreams()); closeProtocolStream(peerInfo.getGrandpaStreams()); + closeProtocolStream(peerInfo.getBeefyStreams()); }); + peers.clear(); } @@ -236,10 +270,15 @@ public void removeAllPeers() { * @param peerId peerId of the peer to be removed */ public void removePeer(PeerId peerId) { + if (peers.containsKey(peerId)) { + PeerInfo peerInfo = peers.get(peerId); + closeProtocolStream(peerInfo.getTransactionsStreams()); closeProtocolStream(peerInfo.getBlockAnnounceStreams()); closeProtocolStream(peerInfo.getGrandpaStreams()); + closeProtocolStream(peerInfo.getBeefyStreams()); + peers.remove(peerId); } } diff --git a/src/main/java/com/limechain/network/NetworkService.java b/src/main/java/com/limechain/network/NetworkService.java index 39773ac8..d478b9f0 100644 --- a/src/main/java/com/limechain/network/NetworkService.java +++ b/src/main/java/com/limechain/network/NetworkService.java @@ -7,6 +7,7 @@ import com.limechain.config.HostConfig; import com.limechain.constants.GenesisBlockHash; import com.limechain.network.kad.KademliaService; +import com.limechain.network.protocol.beefy.BeefyService; import com.limechain.network.protocol.blockannounce.BlockAnnounceService; import com.limechain.network.protocol.blockannounce.NodeRole; import com.limechain.network.protocol.grandpa.GrandpaService; @@ -70,9 +71,10 @@ public class NetworkService implements NodeService { private WarpSyncService warpSyncService; private LightMessagesService lightMessagesService; + private TransactionsService transactionsService; private BlockAnnounceService blockAnnounceService; private GrandpaService grandpaService; - private TransactionsService transactionsService; + private BeefyService beefyService; private Ping ping; @@ -104,6 +106,7 @@ public NetworkService(ChainService chainService, HostConfig hostConfig, KVReposi private void initializeProtocols(ChainService chainService, GenesisBlockHash genesisBlockHash, HostConfig hostConfig, KVRepository repository, CliArguments cliArgs) { + boolean isLocalEnabled = hostConfig.getChain() == Chain.LOCAL; boolean clientMode = true; @@ -118,26 +121,31 @@ private void initializeProtocols(ChainService chainService, GenesisBlockHash gen String pingProtocol = ProtocolUtils.PING_PROTOCOL; String chainId = chainService.getChainSpec().getProtocolId(); boolean legacyProtocol = !cliArgs.noLegacyProtocols(); - String protocolId = legacyProtocol ? chainId : - StringUtils.remove0xPrefix(genesisBlockHash.getGenesisHash().toString()); + String genesisBlockHashWithoutPrefix = StringUtils.remove0xPrefix(genesisBlockHash.getGenesisHash().toString()); + String protocolId = legacyProtocol ? + chainId : + genesisBlockHashWithoutPrefix; + String kadProtocolId = ProtocolUtils.getKadProtocol(chainId); String warpProtocolId = ProtocolUtils.getWarpSyncProtocol(protocolId); String lightProtocolId = ProtocolUtils.getLightMessageProtocol(protocolId); String syncProtocolId = ProtocolUtils.getSyncProtocol(protocolId); String stateProtocolId = ProtocolUtils.getStateProtocol(protocolId); + String transactionsProtocolId = ProtocolUtils.getTransactionsProtocol(protocolId); String blockAnnounceProtocolId = ProtocolUtils.getBlockAnnounceProtocol(protocolId); String grandpaProtocolId = ProtocolUtils.getGrandpaProtocol(protocolId, legacyProtocol); - String transactionsProtocolId = ProtocolUtils.getTransactionsProtocol(protocolId); + String beefyProtocolId = ProtocolUtils.getBeefyProtocol(genesisBlockHashWithoutPrefix); kademliaService = new KademliaService(kadProtocolId, hostId, isLocalEnabled, clientMode); lightMessagesService = new LightMessagesService(lightProtocolId); warpSyncService = new WarpSyncService(warpProtocolId); syncService = new SyncService(syncProtocolId); stateService = new StateService(stateProtocolId); + transactionsService = new TransactionsService(transactionsProtocolId); blockAnnounceService = new BlockAnnounceService(blockAnnounceProtocolId); grandpaService = new GrandpaService(grandpaProtocolId); + beefyService = new BeefyService(beefyProtocolId); ping = new Ping(pingProtocol, new PingProtocol()); - transactionsService = new TransactionsService(transactionsProtocolId); hostBuilder.addProtocols( List.of( @@ -148,7 +156,8 @@ private void initializeProtocols(ChainService chainService, GenesisBlockHash gen syncService.getProtocol(), stateService.getProtocol(), blockAnnounceService.getProtocol(), - grandpaService.getProtocol() + grandpaService.getProtocol(), + beefyService.getProtocol() ) ); diff --git a/src/main/java/com/limechain/network/PeerMessageCoordinator.java b/src/main/java/com/limechain/network/PeerMessageCoordinator.java index 5de1fa4f..62616952 100644 --- a/src/main/java/com/limechain/network/PeerMessageCoordinator.java +++ b/src/main/java/com/limechain/network/PeerMessageCoordinator.java @@ -42,6 +42,9 @@ public void handshakePeers() { asyncExecutor.executeAndForget(() -> network.getGrandpaService().sendHandshake(network.getHost(), peerId)); + asyncExecutor.executeAndForget(() -> + network.getBeefyService().sendHandshake(network.getHost(), peerId)); + if (network.getNodeRole().equals(NodeRole.AUTHORING)) { asyncExecutor.executeAndForget(() -> network.getTransactionsService().sendHandshake(network.getHost(), peerId)); diff --git a/src/main/java/com/limechain/network/ProtocolUtils.java b/src/main/java/com/limechain/network/ProtocolUtils.java index acd86596..6754946d 100644 --- a/src/main/java/com/limechain/network/ProtocolUtils.java +++ b/src/main/java/com/limechain/network/ProtocolUtils.java @@ -23,19 +23,26 @@ public static String getStateProtocol(String chainId) { return String.format("/%s/state/2", chainId); } - public static String getBlockAnnounceProtocol(String chainId) { - return String.format("/%s/block-announces/1", chainId); - } - public static String getKadProtocol(String chainId) { return String.format("/%s/kad", chainId); } + public static String getTransactionsProtocol(String chainId) { + return String.format("/%s/transactions/1", chainId); + } + + public static String getBlockAnnounceProtocol(String chainId) { + return String.format("/%s/block-announces/1", chainId); + } + public static String getGrandpaProtocol(String chainId, boolean legacyProtocol) { return String.format("/%s/grandpa/1", legacyProtocol ? "paritytech" : chainId); } - public static String getTransactionsProtocol(String chainId) { - return String.format("/%s/transactions/1", chainId); + // NOTE: Beefy was likely not part of the original protocols and therefore + // only operates with the genesis hash. As a result, it does not support + // the {chainId}/beefy/2 format. + public static String getBeefyProtocol(String genesisBlockHash) { + return String.format("/%s/beefy/2", genesisBlockHash); } } \ No newline at end of file diff --git a/src/main/java/com/limechain/network/dto/PeerInfo.java b/src/main/java/com/limechain/network/dto/PeerInfo.java index 5850a1bc..67e1b815 100644 --- a/src/main/java/com/limechain/network/dto/PeerInfo.java +++ b/src/main/java/com/limechain/network/dto/PeerInfo.java @@ -12,15 +12,18 @@ @Data @NoArgsConstructor public class PeerInfo { + + private final ProtocolStreams transactionsStreams = new ProtocolStreams(); + private final ProtocolStreams blockAnnounceStreams = new ProtocolStreams(); + private final ProtocolStreams grandpaStreams = new ProtocolStreams(); + private final ProtocolStreams beefyStreams = new ProtocolStreams(); + private PeerId peerId; private int nodeRole; private BigInteger bestBlock; private Hash256 bestBlockHash; private Hash256 genesisBlockHash; private BigInteger latestBlock = BigInteger.ZERO; - private final ProtocolStreams blockAnnounceStreams = new ProtocolStreams(); - private final ProtocolStreams grandpaStreams = new ProtocolStreams(); - private final ProtocolStreams transactionsStreams = new ProtocolStreams(); public String getNodeRoleName(){ return Arrays @@ -33,9 +36,10 @@ public String getNodeRoleName(){ public ProtocolStreams getProtocolStreams(ProtocolStreamType type) { return switch (type) { - case GRANDPA -> grandpaStreams; - case BLOCK_ANNOUNCE -> blockAnnounceStreams; case TRANSACTIONS -> transactionsStreams; + case BLOCK_ANNOUNCE -> blockAnnounceStreams; + case GRANDPA -> grandpaStreams; + case BEEFY -> beefyStreams; }; } diff --git a/src/main/java/com/limechain/network/dto/ProtocolStreamType.java b/src/main/java/com/limechain/network/dto/ProtocolStreamType.java index 97119875..6e78a540 100644 --- a/src/main/java/com/limechain/network/dto/ProtocolStreamType.java +++ b/src/main/java/com/limechain/network/dto/ProtocolStreamType.java @@ -1,5 +1,5 @@ package com.limechain.network.dto; public enum ProtocolStreamType { - GRANDPA, BLOCK_ANNOUNCE, TRANSACTIONS + TRANSACTIONS, BLOCK_ANNOUNCE, GRANDPA, BEEFY } diff --git a/src/main/java/com/limechain/network/protocol/base/BaseController.java b/src/main/java/com/limechain/network/protocol/base/BaseController.java new file mode 100644 index 00000000..673e2b20 --- /dev/null +++ b/src/main/java/com/limechain/network/protocol/base/BaseController.java @@ -0,0 +1,24 @@ +package com.limechain.network.protocol.base; + +import io.libp2p.core.Stream; + +/** + * An abstract controller for sending message on a specific stream + */ +public abstract class BaseController { + + protected final Stream stream; + protected final E engine; + + protected BaseController(Stream stream, E engine) { + this.stream = stream; + this.engine = engine; + } + + /** + * Sends a handshake message over the controller stream. + */ + public void sendHandshake() { + engine.writeHandshakeToStream(stream, stream.remotePeerId()); + } +} diff --git a/src/main/java/com/limechain/network/protocol/base/BaseEngine.java b/src/main/java/com/limechain/network/protocol/base/BaseEngine.java new file mode 100644 index 00000000..9f0c9f15 --- /dev/null +++ b/src/main/java/com/limechain/network/protocol/base/BaseEngine.java @@ -0,0 +1,16 @@ +package com.limechain.network.protocol.base; + +import io.libp2p.core.PeerId; +import io.libp2p.core.Stream; + +/** + * Abstract engine for handling transactions on specific streams + */ +public interface BaseEngine { + + void handleHandshake(byte[] message, PeerId peerId, Stream stream); + + void receiveRequest(byte[] message, Stream stream); + + void writeHandshakeToStream(Stream stream, PeerId peerId); +} \ No newline at end of file diff --git a/src/main/java/com/limechain/network/protocol/base/BaseProtocol.java b/src/main/java/com/limechain/network/protocol/base/BaseProtocol.java new file mode 100644 index 00000000..56ac0613 --- /dev/null +++ b/src/main/java/com/limechain/network/protocol/base/BaseProtocol.java @@ -0,0 +1,49 @@ +package com.limechain.network.protocol.base; + +import com.limechain.network.encoding.Leb128LengthFrameDecoder; +import com.limechain.network.encoding.Leb128LengthFrameEncoder; +import io.libp2p.core.Stream; +import io.libp2p.protocol.ProtocolHandler; +import io.libp2p.protocol.ProtocolMessageHandler; +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.bytes.ByteArrayEncoder; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.CompletableFuture; + +/** + * Base protocol class to reduce code duplication in protocol implementations + * @param The controller type + * @param The notification handler type that must extend T and implement ProtocolMessageHandler + */ +public abstract class BaseProtocol> extends ProtocolHandler { + + protected BaseProtocol(long initiatorTrafficLimit, long responderTrafficLimit) { + super(initiatorTrafficLimit, responderTrafficLimit); + } + + protected abstract E createNotificationHandler(Stream stream); + + @NotNull + @Override + public CompletableFuture onStartInitiator(Stream stream) { + return onStartStream(stream); + } + + @NotNull + @Override + public CompletableFuture onStartResponder(Stream stream) { + return onStartStream(stream); + } + + protected CompletableFuture onStartStream(Stream stream) { + stream.pushHandler(new Leb128LengthFrameDecoder()); + stream.pushHandler(new Leb128LengthFrameEncoder()); + stream.pushHandler(new ByteArrayEncoder()); + + E handler = createNotificationHandler(stream); + stream.pushHandler(handler); + + return CompletableFuture.completedFuture((T) handler); + } +} diff --git a/src/main/java/com/limechain/network/protocol/beefy/Beefy.java b/src/main/java/com/limechain/network/protocol/beefy/Beefy.java new file mode 100644 index 00000000..5b6a724e --- /dev/null +++ b/src/main/java/com/limechain/network/protocol/beefy/Beefy.java @@ -0,0 +1,12 @@ +package com.limechain.network.protocol.beefy; + +import com.limechain.network.StrictProtocolBinding; + +/** + * BEEFY protocol binding + */ +public class Beefy extends StrictProtocolBinding { + public Beefy(String protocolId, BeefyProtocol protocol) { + super(protocolId, protocol); + } +} diff --git a/src/main/java/com/limechain/network/protocol/beefy/BeefyController.java b/src/main/java/com/limechain/network/protocol/beefy/BeefyController.java new file mode 100644 index 00000000..f76d21d5 --- /dev/null +++ b/src/main/java/com/limechain/network/protocol/beefy/BeefyController.java @@ -0,0 +1,18 @@ +package com.limechain.network.protocol.beefy; + +import com.limechain.network.protocol.base.BaseController; +import io.libp2p.core.Stream; + +/** + * A controller for sending message on a BEEFY stream + */ +public class BeefyController extends BaseController { + + public BeefyController(Stream stream) { + super(stream, new BeefyEngine()); + } + + public void sendVoteMessage() { + //TODO + } +} diff --git a/src/main/java/com/limechain/network/protocol/beefy/BeefyEngine.java b/src/main/java/com/limechain/network/protocol/beefy/BeefyEngine.java new file mode 100644 index 00000000..c95e397a --- /dev/null +++ b/src/main/java/com/limechain/network/protocol/beefy/BeefyEngine.java @@ -0,0 +1,119 @@ +package com.limechain.network.protocol.beefy; + +import com.limechain.network.ConnectionManager; +import com.limechain.network.protocol.base.BaseEngine; +import com.limechain.network.protocol.beefy.messages.BeefyMessageType; +import com.limechain.rpc.server.AppBean; +import com.limechain.state.AbstractState; +import com.limechain.sync.SyncMode; +import io.libp2p.core.PeerId; +import io.libp2p.core.Stream; +import lombok.extern.java.Log; + +import java.util.logging.Level; + +/** + * Engine for handling transactions on BEEFY streams + */ +@Log +public class BeefyEngine implements BaseEngine { + + private static final int HANDSHAKE_LENGTH = 1; + + protected ConnectionManager connectionManager; + protected BeefyMessageHandler beefyMessageHandler; + + public BeefyEngine() { + connectionManager = ConnectionManager.getInstance(); + beefyMessageHandler = AppBean.getBean(BeefyMessageHandler.class); + } + + @Override + public void handleHandshake(byte[] message, PeerId peerId, Stream stream) { + if (connectionManager.isBeefyConnected(peerId)) { + log.log(Level.INFO, "Received existing beefy handshake from " + peerId); + stream.close(); + } else { + connectionManager.addBeefyStream(stream); + connectionManager.getPeerInfo(peerId).setNodeRole(message[0]); + log.log(Level.INFO, "Received beefy handshake from " + peerId); + writeHandshakeToStream(stream, peerId); + } + } + + @Override + public void receiveRequest(byte[] message, Stream stream) { + BeefyMessageType messageType = getBeefyMessageType(message); + + if (messageType == null) { + log.log(Level.WARNING, String.format("Unknown beefy message type \"%d\" from Peer %s", + message[0], stream.remotePeerId())); + return; + } + + if (stream.isInitiator()) { + handleInitiatorStreamMessage(messageType, stream); + } else { + handleResponderStreamMessage(message, messageType, stream); + } + } + + /** + * Send our BEEFY handshake on a given initiator stream + * @param stream initiator stream to write the message to + * @param peerId peer to send to + */ + @Override + public void writeHandshakeToStream(Stream stream, PeerId peerId) { + byte[] handshake = new byte[]{}; + log.log(Level.INFO, "Sending beefy handshake to " + peerId); + stream.writeAndFlush(handshake); + } + + private void handleInitiatorStreamMessage(BeefyMessageType messageType, Stream stream) { + PeerId peerId = stream.remotePeerId(); + if (messageType != BeefyMessageType.HANDSHAKE) { + stream.close(); + log.log(Level.WARNING, "Non handshake message on initiator beefy steam from peer " + peerId); + return; + } + connectionManager.addBeefyStream(stream); + log.log(Level.INFO, "Received beefy handshake from " + peerId); + } + + private void handleResponderStreamMessage(byte[] message, BeefyMessageType messageType, Stream stream) { + PeerId peerId = stream.remotePeerId(); + boolean connectedToPeer = connectionManager.isBeefyConnected(peerId); + + if (!connectedToPeer && messageType != BeefyMessageType.HANDSHAKE) { + log.log(Level.WARNING, "No handshake for beefy message from peer " + peerId); + stream.close(); + return; + } + + if (!SyncMode.HEAD.equals(AbstractState.getSyncMode())) { + log.fine("Skipping beefy message before we reach head of chain."); + return; + } + + switch (messageType) { + case HANDSHAKE -> handleHandshake(message, peerId, stream); + case VOTE -> handleVoteMessage(message, peerId); + case SIGNED_COMMITMENT -> handleSignedCommitmentMessage(message, peerId); + } + } + + private void handleVoteMessage(byte[] message, PeerId peerId) { + //TODO + } + + private void handleSignedCommitmentMessage(byte[] message, PeerId peerId) { + //TODO + } + + private BeefyMessageType getBeefyMessageType(byte[] message) { + return message.length == HANDSHAKE_LENGTH ? + BeefyMessageType.HANDSHAKE : + BeefyMessageType.getByType(message[0]); + } +} diff --git a/src/main/java/com/limechain/network/protocol/beefy/BeefyMessageHandler.java b/src/main/java/com/limechain/network/protocol/beefy/BeefyMessageHandler.java new file mode 100644 index 00000000..f7463394 --- /dev/null +++ b/src/main/java/com/limechain/network/protocol/beefy/BeefyMessageHandler.java @@ -0,0 +1,12 @@ +package com.limechain.network.protocol.beefy; + +import lombok.RequiredArgsConstructor; +import lombok.extern.java.Log; +import org.springframework.stereotype.Component; + +@Log +@RequiredArgsConstructor +@Component +public class BeefyMessageHandler { + //TODO +} diff --git a/src/main/java/com/limechain/network/protocol/beefy/BeefyProtocol.java b/src/main/java/com/limechain/network/protocol/beefy/BeefyProtocol.java new file mode 100644 index 00000000..0347b5a0 --- /dev/null +++ b/src/main/java/com/limechain/network/protocol/beefy/BeefyProtocol.java @@ -0,0 +1,71 @@ +package com.limechain.network.protocol.beefy; + +import com.limechain.network.ConnectionManager; +import com.limechain.network.protocol.base.BaseProtocol; +import io.libp2p.core.Stream; +import io.libp2p.protocol.ProtocolMessageHandler; +import io.netty.buffer.ByteBuf; +import lombok.extern.java.Log; +import org.jetbrains.annotations.NotNull; + +import java.util.logging.Level; + +/** + * Handler for BEEFY protocol messages and streams + */ +@Log +public class BeefyProtocol extends BaseProtocol { + + private static final long TRAFFIC_LIMIT = Long.MAX_VALUE; + + /** + * Creates a handler with {@link Long#MAX_VALUE} traffic limit. + * This is a global decreasing limit for the protocol, that gets reduced by the size of each message. + * In the future it should be changed to a per-message limit + */ + public BeefyProtocol() { + super(TRAFFIC_LIMIT, TRAFFIC_LIMIT); + } + + @Override + protected BeefyProtocol.NotificationHandler createNotificationHandler(Stream stream) { + return new BeefyProtocol.NotificationHandler(stream); + } + + /** + * Handler for notifications received on the BEEFY protocol + */ + static class NotificationHandler extends BeefyController implements ProtocolMessageHandler { + + ConnectionManager connectionManager = ConnectionManager.getInstance(); + + public NotificationHandler(Stream stream) { + super(stream); + } + + @Override + public void onMessage(@NotNull Stream stream, ByteBuf msg) { + byte[] messageBytes = new byte[msg.readableBytes()]; + msg.readBytes(messageBytes); + engine.receiveRequest(messageBytes, stream); + } + + @Override + public void onClosed(Stream stream) { + connectionManager.closeBeefyStream(stream); + log.log(Level.INFO, "Beefy stream closed for peer " + stream.remotePeerId()); + ProtocolMessageHandler.super.onClosed(stream); + } + + @Override + public void onException(Throwable cause) { + connectionManager.closeBeefyStream(stream); + if (cause != null) { + log.log(Level.WARNING, "Beefy Exception: " + cause.getMessage()); + } else { + log.log(Level.WARNING, "Beefy Exception with unknown cause"); + } + ProtocolMessageHandler.super.onException(cause); + } + } +} diff --git a/src/main/java/com/limechain/network/protocol/beefy/BeefyService.java b/src/main/java/com/limechain/network/protocol/beefy/BeefyService.java new file mode 100644 index 00000000..739b64ec --- /dev/null +++ b/src/main/java/com/limechain/network/protocol/beefy/BeefyService.java @@ -0,0 +1,34 @@ +package com.limechain.network.protocol.beefy; + +import com.limechain.network.ConnectionManager; +import com.limechain.network.protocol.NetworkService; +import io.libp2p.core.Host; +import io.libp2p.core.PeerId; +import lombok.extern.java.Log; + +/** + * Service for sending messages on {@link Beefy} protocol. + */ +@Log +public class BeefyService extends NetworkService { + + ConnectionManager connectionManager = ConnectionManager.getInstance(); + + public BeefyService(String protocolId) { + this.protocol = new Beefy(protocolId, new BeefyProtocol()); + } + + //TODO: write doc + public void sendVoteMessage(Host us, PeerId peerId, byte[] encodedMessage) { + //TODO + } + + public void sendHandshake(Host us, PeerId peerId) { + try { + BeefyController controller = this.protocol.dialPeer(us, peerId, us.getAddressBook()); + controller.sendHandshake(); + } catch (Exception e) { + log.warning("Failed to send Beefy handshake to " + peerId); + } + } +} diff --git a/src/main/java/com/limechain/network/protocol/beefy/messages/BeefyMessageType.java b/src/main/java/com/limechain/network/protocol/beefy/messages/BeefyMessageType.java new file mode 100644 index 00000000..9cf4cb25 --- /dev/null +++ b/src/main/java/com/limechain/network/protocol/beefy/messages/BeefyMessageType.java @@ -0,0 +1,26 @@ +package com.limechain.network.protocol.beefy.messages; + +import lombok.Getter; + +import java.util.Arrays; + +@Getter +public enum BeefyMessageType { + + HANDSHAKE(-1), + VOTE(0), + SIGNED_COMMITMENT(1); + + private final int type; + + BeefyMessageType(int type) { + this.type = type; + } + + public static BeefyMessageType getByType(int type) { + return Arrays.stream(values()) + .filter(t -> t.type == type) + .findFirst() + .orElse(null); + } +} diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceController.java b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceController.java index 34b609ac..4929a40e 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceController.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceController.java @@ -1,17 +1,12 @@ package com.limechain.network.protocol.blockannounce; +import com.limechain.network.protocol.base.BaseController; import io.libp2p.core.Stream; -public class BlockAnnounceController { - protected BlockAnnounceEngine engine = new BlockAnnounceEngine(); - protected final Stream stream; +public class BlockAnnounceController extends BaseController { public BlockAnnounceController(Stream stream) { - this.stream = stream; - } - - public void sendHandshake() { - engine.writeHandshakeToStream(stream, stream.remotePeerId()); + super(stream, new BlockAnnounceEngine()); } /** diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java index 183bb759..eb9bea1f 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java @@ -2,6 +2,7 @@ import com.limechain.exception.scale.ScaleEncodingException; import com.limechain.network.ConnectionManager; +import com.limechain.network.protocol.base.BaseEngine; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshake; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshakeBuilder; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage; @@ -17,8 +18,6 @@ import io.emeraldpay.polkaj.scale.ScaleCodecWriter; import io.libp2p.core.PeerId; import io.libp2p.core.Stream; -import lombok.AccessLevel; -import lombok.AllArgsConstructor; import lombok.extern.java.Log; import java.io.ByteArrayOutputStream; @@ -27,15 +26,14 @@ import java.util.logging.Level; @Log -@AllArgsConstructor(access = AccessLevel.PROTECTED) -public class BlockAnnounceEngine { +public class BlockAnnounceEngine implements BaseEngine { public static final int HANDSHAKE_LENGTH = 69; protected ConnectionManager connectionManager; protected WarpSyncState warpSyncState; - private BlockHandler blockHandler; protected BlockAnnounceHandshakeBuilder handshakeBuilder; + private BlockHandler blockHandler; public BlockAnnounceEngine() { connectionManager = ConnectionManager.getInstance(); @@ -44,10 +42,27 @@ public BlockAnnounceEngine() { handshakeBuilder = new BlockAnnounceHandshakeBuilder(); } - public void receiveRequest(byte[] msg, Stream stream) { + @Override + public void handleHandshake(byte[] message, PeerId peerId, Stream stream) { + if (connectionManager.isBlockAnnounceConnected(peerId)) { + log.log(Level.INFO, "Received existing handshake from " + peerId); + stream.close(); + } + + ScaleCodecReader reader = new ScaleCodecReader(message); + BlockAnnounceHandshake handshake = reader.read(BlockAnnounceHandshakeScaleReader.getInstance()); + connectionManager.addBlockAnnounceStream(stream); + connectionManager.updatePeer(peerId, handshake); + log.log(Level.INFO, "Received handshake from " + peerId + "\n" + handshake); + + writeHandshakeToStream(stream, peerId); + } + + @Override + public void receiveRequest(byte[] message, Stream stream) { PeerId peerId = stream.remotePeerId(); boolean connectedToPeer = connectionManager.isBlockAnnounceConnected(peerId); - boolean isHandshake = msg.length == HANDSHAKE_LENGTH; + boolean isHandshake = message.length == HANDSHAKE_LENGTH; if (!connectedToPeer && !isHandshake) { log.log(Level.WARNING, "No handshake for block announce message from Peer " + peerId); @@ -55,31 +70,33 @@ public void receiveRequest(byte[] msg, Stream stream) { } if (isHandshake) { - handleHandshake(msg, peerId, stream, connectedToPeer); + handleHandshake(message, peerId, stream); } else { - handleBlockAnnounce(msg, peerId); + handleBlockAnnounce(message, peerId); } //TODO: Send message to network? module } - private void handleHandshake(byte[] msg, PeerId peerId, Stream stream, boolean connectedToPeer) { - /* We might not need to send a second handshake. - If we already have stored the key it means that we have processed the handshake once. - This might be caused by using a different stream for sending and receiving in libp2p. - */ - if (connectedToPeer) { - log.log(Level.INFO, "Received existing handshake from " + peerId); - stream.close(); - } else { - ScaleCodecReader reader = new ScaleCodecReader(msg); - BlockAnnounceHandshake handshake = reader.read(BlockAnnounceHandshakeScaleReader.getInstance()); - connectionManager.addBlockAnnounceStream(stream); - connectionManager.updatePeer(peerId, handshake); - log.log(Level.INFO, "Received handshake from " + peerId + "\n" + - handshake); - writeHandshakeToStream(stream, peerId); + @Override + public void writeHandshakeToStream(Stream stream, PeerId peerId) { + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + try (ScaleCodecWriter writer = new ScaleCodecWriter(buf)) { + writer.write( + BlockAnnounceHandshakeScaleWriter.getInstance(), + handshakeBuilder.getBlockAnnounceHandshake() + ); + } catch (IOException e) { + throw new ScaleEncodingException(e); } + + log.log(Level.INFO, "Sending handshake to " + peerId); + stream.writeAndFlush(buf.toByteArray()); + } + + public void writeBlockAnnounceMessage(Stream stream, PeerId peerId, byte[] encodedBlockAnnounceMessage) { + log.log(Level.FINE, "Sending Block Announce message to peer " + peerId); + stream.writeAndFlush(encodedBlockAnnounceMessage); } private void handleBlockAnnounce(byte[] msg, PeerId peerId) { @@ -98,21 +115,4 @@ private void handleBlockAnnounce(byte[] msg, PeerId peerId) { blockHandler.handleAnnounced(announce.getHeader(), Instant.now(), peerId); } } - - public void writeHandshakeToStream(Stream stream, PeerId peerId) { - ByteArrayOutputStream buf = new ByteArrayOutputStream(); - try (ScaleCodecWriter writer = new ScaleCodecWriter(buf)) { - writer.write(BlockAnnounceHandshakeScaleWriter.getInstance(), handshakeBuilder.getBlockAnnounceHandshake()); - } catch (IOException e) { - throw new ScaleEncodingException(e); - } - - log.log(Level.INFO, "Sending handshake to " + peerId); - stream.writeAndFlush(buf.toByteArray()); - } - - public void writeBlockAnnounceMessage(Stream stream, PeerId peerId, byte[] encodedBlockAnnounceMessage) { - log.log(Level.FINE, "Sending Block Announce message to peer " + peerId); - stream.writeAndFlush(encodedBlockAnnounceMessage); - } } diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceProtocol.java b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceProtocol.java index 8ee7dc15..b23ea7b7 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceProtocol.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceProtocol.java @@ -1,50 +1,31 @@ package com.limechain.network.protocol.blockannounce; import com.limechain.network.ConnectionManager; -import com.limechain.network.encoding.Leb128LengthFrameDecoder; -import com.limechain.network.encoding.Leb128LengthFrameEncoder; +import com.limechain.network.protocol.base.BaseProtocol; import io.libp2p.core.Stream; -import io.libp2p.protocol.ProtocolHandler; import io.libp2p.protocol.ProtocolMessageHandler; import io.netty.buffer.ByteBuf; -import io.netty.handler.codec.bytes.ByteArrayEncoder; import lombok.extern.java.Log; import org.jetbrains.annotations.NotNull; -import java.util.concurrent.CompletableFuture; import java.util.logging.Level; +/** + * Handler for BlockAnnounce protocol messages and streams + */ @Log -public class BlockAnnounceProtocol extends ProtocolHandler { - public static final int MAX_HANDSHAKE_SIZE = 1024 * 1024; - public static final int MAX_NOTIFICATION_SIZE = 1024 * 1024; +public class BlockAnnounceProtocol extends BaseProtocol { + + public static final long MAX_HANDSHAKE_SIZE = 1024L * 1024L; + public static final long MAX_NOTIFICATION_SIZE = 1024L * 1024L; public BlockAnnounceProtocol() { super(MAX_HANDSHAKE_SIZE, MAX_NOTIFICATION_SIZE); } - @NotNull - @Override - protected CompletableFuture onStartInitiator(Stream stream) { - stream.pushHandler(new Leb128LengthFrameDecoder()); - stream.pushHandler(new Leb128LengthFrameEncoder()); - - stream.pushHandler(new ByteArrayEncoder()); - NotificationHandler handler = new NotificationHandler(stream); - stream.pushHandler(handler); - return CompletableFuture.completedFuture(handler); - } - - @NotNull @Override - protected CompletableFuture onStartResponder(Stream stream) { - stream.pushHandler(new Leb128LengthFrameDecoder()); - stream.pushHandler(new Leb128LengthFrameEncoder()); - - stream.pushHandler(new ByteArrayEncoder()); - NotificationHandler handler = new NotificationHandler(stream); - stream.pushHandler(handler); - return CompletableFuture.completedFuture(handler); + protected BlockAnnounceProtocol.NotificationHandler createNotificationHandler(Stream stream) { + return new BlockAnnounceProtocol.NotificationHandler(stream); } static class NotificationHandler extends BlockAnnounceController implements ProtocolMessageHandler { @@ -72,7 +53,6 @@ public void onException(Throwable cause) { connectionManager.closeBlockAnnounceStream(stream); if (cause != null) { log.log(Level.WARNING, "Block Announce Exception: " + cause.getMessage()); - cause.printStackTrace(); } else { log.log(Level.WARNING, "Block Announce Exception with unknown cause"); } diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/messages/BlockAnnounceHandshakeBuilder.java b/src/main/java/com/limechain/network/protocol/blockannounce/messages/BlockAnnounceHandshakeBuilder.java index c413614b..79485c08 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/messages/BlockAnnounceHandshakeBuilder.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/messages/BlockAnnounceHandshakeBuilder.java @@ -28,6 +28,7 @@ public BlockAnnounceHandshake getBlockAnnounceHandshake() { Hash256 blockHash = lastFinalizedBlockHash == null ? genesisBlockHash : lastFinalizedBlockHash; + return new BlockAnnounceHandshake( nodeRole.getValue(), lastFinalizedBlockNumber, @@ -35,5 +36,4 @@ public BlockAnnounceHandshake getBlockAnnounceHandshake() { genesisBlockHash ); } - } diff --git a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaController.java b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaController.java index 0919df05..9f1816cb 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaController.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaController.java @@ -1,23 +1,15 @@ package com.limechain.network.protocol.grandpa; +import com.limechain.network.protocol.base.BaseController; import io.libp2p.core.Stream; /** * A controller for sending message on a GRANDPA stream. */ -public class GrandpaController { - protected GrandpaEngine engine = new GrandpaEngine(); - protected final Stream stream; +public class GrandpaController extends BaseController { public GrandpaController(Stream stream) { - this.stream = stream; - } - - /** - * Sends a handshake message over the controller stream. - */ - public void sendHandshake() { - engine.writeHandshakeToStream(stream, stream.remotePeerId()); + super(stream, new GrandpaEngine()); } /** diff --git a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java index fcd8709f..a1f722b2 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java @@ -1,9 +1,11 @@ package com.limechain.network.protocol.grandpa; +import com.limechain.config.HostConfig; import com.limechain.exception.scale.ScaleEncodingException; import com.limechain.grandpa.GrandpaService; import com.limechain.network.ConnectionManager; -import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshakeBuilder; +import com.limechain.network.protocol.base.BaseEngine; +import com.limechain.network.protocol.blockannounce.NodeRole; import com.limechain.network.protocol.grandpa.messages.GrandpaMessageType; import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessage; import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessageScaleReader; @@ -25,8 +27,6 @@ import io.emeraldpay.polkaj.scale.ScaleCodecWriter; import io.libp2p.core.PeerId; import io.libp2p.core.Stream; -import lombok.AccessLevel; -import lombok.AllArgsConstructor; import lombok.extern.java.Log; import java.io.ByteArrayOutputStream; @@ -37,17 +37,31 @@ * Engine for handling transactions on GRANDPA streams. */ @Log -@AllArgsConstructor(access = AccessLevel.PROTECTED) -public class GrandpaEngine { +public class GrandpaEngine implements BaseEngine { + private static final int HANDSHAKE_LENGTH = 1; + protected ConnectionManager connectionManager; - protected BlockAnnounceHandshakeBuilder handshakeBuilder; protected GrandpaMessageHandler grandpaMessageHandler; + protected HostConfig hostConfig; public GrandpaEngine() { connectionManager = ConnectionManager.getInstance(); - handshakeBuilder = new BlockAnnounceHandshakeBuilder(); grandpaMessageHandler = AppBean.getBean(GrandpaMessageHandler.class); + hostConfig = AppBean.getBean(HostConfig.class); + } + + @Override + public void handleHandshake(byte[] message, PeerId peerId, Stream stream) { + if (connectionManager.isGrandpaConnected(peerId)) { + log.log(Level.INFO, "Received existing grandpa handshake from " + peerId); + stream.close(); + } else { + connectionManager.addGrandpaStream(stream); + connectionManager.getPeerInfo(peerId).setNodeRole(message[0]); + log.log(Level.INFO, "Received grandpa handshake from " + peerId); + writeHandshakeToStream(stream, peerId); + } } /** @@ -65,6 +79,7 @@ public GrandpaEngine() { * @param message received message as byre array * @param stream stream, where the request was received */ + @Override public void receiveRequest(byte[] message, Stream stream) { GrandpaMessageType messageType = getGrandpaMessageType(message); @@ -75,13 +90,93 @@ public void receiveRequest(byte[] message, Stream stream) { } if (stream.isInitiator()) { - handleInitiatorStreamMessage(message, messageType, stream); + handleInitiatorStreamMessage(messageType, stream); } else { handleResponderStreamMessage(message, messageType, stream); } } - private void handleInitiatorStreamMessage(byte[] message, GrandpaMessageType messageType, Stream stream) { + /** + * Send our GRANDPA handshake on a given initiator stream. + * + * @param stream initiator stream to write the message to + * @param peerId peer to send to + */ + @Override + public void writeHandshakeToStream(Stream stream, PeerId peerId) { + NodeRole nodeRole = hostConfig.getNodeRole(); + + byte[] handshake = new byte[]{ + nodeRole.getValue().byteValue() + }; + + log.log(Level.INFO, "Sending grandpa handshake to " + peerId); + stream.writeAndFlush(handshake); + } + + /** + * Send our GRANDPA neighbour message from {@link WarpSyncState} on a given responder stream. + * + * @param stream responder stream to write the message to + * @param peerId peer to send to + */ + public void writeNeighbourMessage(Stream stream, PeerId peerId) { + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + try (ScaleCodecWriter writer = new ScaleCodecWriter(buf)) { + writer.write(NeighbourMessageScaleWriter.getInstance(), ProtocolMessageBuilder.buildNeighbourMessage()); + } catch (IOException e) { + throw new ScaleEncodingException(e); + } + + log.log(Level.FINE, "Sending neighbour message to Peer " + peerId); + stream.writeAndFlush(buf.toByteArray()); + } + + /** + * Send our GRANDPA commit message from {@link GrandpaService} on a given responder stream. + * + * @param stream responder stream to write the message to + * @param encodedCommitMessage scale encoded CommitMessage object + */ + public void writeCommitMessage(Stream stream, byte[] encodedCommitMessage) { + log.log(Level.FINE, "Sending commit message to Peer " + stream.remotePeerId()); + stream.writeAndFlush(encodedCommitMessage); + } + + /** + * Send our GRANDPA catch-up request message on a given responder stream. + * + * @param stream responder stream to write the message to + * @param encodedCatchUpReqMessage scale encoded CatchUpRequestMessage object + */ + public void writeCatchUpRequest(Stream stream, byte[] encodedCatchUpReqMessage) { + log.log(Level.FINE, "Sending catch up request to Peer " + stream.remotePeerId()); + stream.writeAndFlush(encodedCatchUpReqMessage); + } + + /** + * Send our GRANDPA catch-up response message on a given responder stream. + * + * @param stream responder stream to write the message to + * @param encodedCatchUpResMessage scale encoded CatchUpResMessage object + */ + public void writeCatchUpResponse(Stream stream, byte[] encodedCatchUpResMessage) { + log.log(Level.FINE, "Sending catch up response to Peer " + stream.remotePeerId()); + stream.writeAndFlush(encodedCatchUpResMessage); + } + + /** + * Send our GRANDPA vote message from {@link GrandpaService} on a given responder stream. + * + * @param stream responder stream to write the message to + * @param encodedVoteMessage scale encoded VoteMessage object + */ + public void writeVoteMessage(Stream stream, byte[] encodedVoteMessage) { + log.log(Level.FINE, "Sending vote message to peer " + stream.remotePeerId()); + stream.writeAndFlush(encodedVoteMessage); + } + + private void handleInitiatorStreamMessage(GrandpaMessageType messageType, Stream stream) { PeerId peerId = stream.remotePeerId(); if (messageType != GrandpaMessageType.HANDSHAKE) { stream.close(); @@ -136,18 +231,6 @@ private GrandpaMessageType getGrandpaMessageType(byte[] message) { return GrandpaMessageType.getByType(message[0]); } - private void handleHandshake(byte[] message, PeerId peerId, Stream stream) { - if (connectionManager.isGrandpaConnected(peerId)) { - log.log(Level.INFO, "Received existing grandpa handshake from " + peerId); - stream.close(); - } else { - connectionManager.addGrandpaStream(stream); - connectionManager.getPeerInfo(peerId).setNodeRole(message[0]); - log.log(Level.INFO, "Received grandpa handshake from " + peerId); - writeHandshakeToStream(stream, peerId); - } - } - private void handleNeighbourMessage(byte[] message, PeerId peerId) { ScaleCodecReader reader = new ScaleCodecReader(message); NeighbourMessage neighbourMessage = reader.read(NeighbourMessageScaleReader.getInstance()); @@ -188,81 +271,4 @@ private void handleCatchupResponseMessage(byte[] message, PeerId peerId) { grandpaMessageHandler.handleCatchUpResponse(peerId, catchUpResMessage, connectionManager::getPeerIds); } - - /** - * Send our GRANDPA handshake on a given initiator stream. - * - * @param stream initiator stream to write the message to - * @param peerId peer to send to - */ - public void writeHandshakeToStream(Stream stream, PeerId peerId) { - byte[] handshake = new byte[]{ - (byte) handshakeBuilder.getBlockAnnounceHandshake().getNodeRole() - }; - - log.log(Level.INFO, "Sending grandpa handshake to " + peerId); - stream.writeAndFlush(handshake); - } - - /** - * Send our GRANDPA neighbour message from {@link WarpSyncState} on a given responder stream. - * - * @param stream responder stream to write the message to - * @param peerId peer to send to - */ - public void writeNeighbourMessage(Stream stream, PeerId peerId) { - ByteArrayOutputStream buf = new ByteArrayOutputStream(); - try (ScaleCodecWriter writer = new ScaleCodecWriter(buf)) { - writer.write(NeighbourMessageScaleWriter.getInstance(), ProtocolMessageBuilder.buildNeighbourMessage()); - } catch (IOException e) { - throw new ScaleEncodingException(e); - } - - log.log(Level.FINE, "Sending neighbour message to Peer " + peerId); - stream.writeAndFlush(buf.toByteArray()); - } - - /** - * Send our GRANDPA commit message from {@link GrandpaService} on a given responder stream. - * - * @param stream responder stream to write the message to - * @param encodedCommitMessage scale encoded CommitMessage object - */ - public void writeCommitMessage(Stream stream, byte[] encodedCommitMessage) { - log.log(Level.FINE, "Sending commit message to Peer " + stream.remotePeerId()); - stream.writeAndFlush(encodedCommitMessage); - } - - /** - * Send our GRANDPA catch-up request message on a given responder stream. - * - * @param stream responder stream to write the message to - * @param encodedCatchUpReqMessage scale encoded CatchUpRequestMessage object - */ - public void writeCatchUpRequest(Stream stream, byte[] encodedCatchUpReqMessage) { - log.log(Level.FINE, "Sending catch up request to Peer " + stream.remotePeerId()); - stream.writeAndFlush(encodedCatchUpReqMessage); - } - - /** - * Send our GRANDPA catch-up response message on a given responder stream. - * - * @param stream responder stream to write the message to - * @param encodedCatchUpResMessage scale encoded CatchUpResMessage object - */ - public void writeCatchUpResponse(Stream stream, byte[] encodedCatchUpResMessage) { - log.log(Level.FINE, "Sending catch up response to Peer " + stream.remotePeerId()); - stream.writeAndFlush(encodedCatchUpResMessage); - } - - /** - * Send our GRANDPA vote message from {@link GrandpaService} on a given responder stream. - * - * @param stream responder stream to write the message to - * @param encodedVoteMessage scale encoded VoteMessage object - */ - public void writeVoteMessage(Stream stream, byte[] encodedVoteMessage) { - log.log(Level.FINE, "Sending vote message to peer " + stream.remotePeerId()); - stream.writeAndFlush(encodedVoteMessage); - } } diff --git a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaProtocol.java b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaProtocol.java index db9da94a..518a05b8 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaProtocol.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaProtocol.java @@ -1,24 +1,21 @@ package com.limechain.network.protocol.grandpa; import com.limechain.network.ConnectionManager; -import com.limechain.network.encoding.Leb128LengthFrameDecoder; -import com.limechain.network.encoding.Leb128LengthFrameEncoder; +import com.limechain.network.protocol.base.BaseProtocol; import io.libp2p.core.Stream; -import io.libp2p.protocol.ProtocolHandler; import io.libp2p.protocol.ProtocolMessageHandler; import io.netty.buffer.ByteBuf; -import io.netty.handler.codec.bytes.ByteArrayEncoder; import lombok.extern.java.Log; import org.jetbrains.annotations.NotNull; -import java.util.concurrent.CompletableFuture; import java.util.logging.Level; /** * Handler for GRANDPA protocol messages and streams. */ @Log -public class GrandpaProtocol extends ProtocolHandler { +public class GrandpaProtocol extends BaseProtocol { + private static final long TRAFFIC_LIMIT = Long.MAX_VALUE; /** @@ -30,44 +27,16 @@ public GrandpaProtocol() { super(TRAFFIC_LIMIT, TRAFFIC_LIMIT); } - /** - * Handles a new opened initiator stream and adds channel and notification handlers to it. - * - * @param stream stream opened - * @return async controller for the stream - */ - @NotNull - @Override - protected CompletableFuture onStartInitiator(Stream stream) { - return onStartStream(stream); - } - - /** - * Handles a new opened responder stream and adds channel and notification handlers to it. - * - * @param stream stream opened - * @return async controller for the stream - */ - @NotNull @Override - protected CompletableFuture onStartResponder(Stream stream) { - return onStartStream(stream); - } - - private CompletableFuture onStartStream(Stream stream) { - stream.pushHandler(new Leb128LengthFrameDecoder()); - stream.pushHandler(new Leb128LengthFrameEncoder()); - - stream.pushHandler(new ByteArrayEncoder()); - GrandpaProtocol.NotificationHandler handler = new GrandpaProtocol.NotificationHandler(stream); - stream.pushHandler(handler); - return CompletableFuture.completedFuture(handler); + protected GrandpaProtocol.NotificationHandler createNotificationHandler(Stream stream) { + return new GrandpaProtocol.NotificationHandler(stream); } /** * Handler for notifications received on the GRANDPA protocol. */ static class NotificationHandler extends GrandpaController implements ProtocolMessageHandler { + ConnectionManager connectionManager = ConnectionManager.getInstance(); public NotificationHandler(Stream stream) { @@ -93,7 +62,6 @@ public void onException(Throwable cause) { connectionManager.closeGrandpaStream(stream); if (cause != null) { log.log(Level.WARNING, "Grandpa Exception: " + cause.getMessage()); - cause.printStackTrace(); } else { log.log(Level.WARNING, "Grandpa Exception with unknown cause"); } diff --git a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaService.java b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaService.java index 0fadc8af..9a59aa01 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaService.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaService.java @@ -14,6 +14,7 @@ */ @Log public class GrandpaService extends NetworkService { + ConnectionManager connectionManager = ConnectionManager.getInstance(); public GrandpaService(String protocolId) { diff --git a/src/main/java/com/limechain/network/protocol/grandpa/messages/GrandpaMessageType.java b/src/main/java/com/limechain/network/protocol/grandpa/messages/GrandpaMessageType.java index 41e3b0ad..eae4a6a3 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/messages/GrandpaMessageType.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/messages/GrandpaMessageType.java @@ -4,10 +4,16 @@ import java.util.Arrays; +@Getter public enum GrandpaMessageType { - HANDSHAKE(-1), VOTE(0), COMMIT(1), NEIGHBOUR(2), CATCH_UP_REQUEST(3), CATCH_UP_RESPONSE(4); - @Getter + HANDSHAKE(-1), + VOTE(0), + COMMIT(1), + NEIGHBOUR(2), + CATCH_UP_REQUEST(3), + CATCH_UP_RESPONSE(4); + private final int type; GrandpaMessageType(int type) { diff --git a/src/main/java/com/limechain/network/protocol/transaction/TransactionController.java b/src/main/java/com/limechain/network/protocol/transaction/TransactionController.java index b50b0bd8..8f8e3d8c 100644 --- a/src/main/java/com/limechain/network/protocol/transaction/TransactionController.java +++ b/src/main/java/com/limechain/network/protocol/transaction/TransactionController.java @@ -1,23 +1,15 @@ package com.limechain.network.protocol.transaction; +import com.limechain.network.protocol.base.BaseController; import io.libp2p.core.Stream; /** * A controller for sending message on a Transactions stream. */ -public class TransactionController { - protected final TransactionEngine engine = new TransactionEngine(); - protected final Stream stream; +public class TransactionController extends BaseController { public TransactionController(Stream stream) { - this.stream = stream; - } - - /** - * Sends a handshake message over the controller stream. - */ - public void sendHandshake() { - engine.writeHandshakeToStream(stream, stream.remotePeerId()); + super(stream, new TransactionEngine()); } /** diff --git a/src/main/java/com/limechain/network/protocol/transaction/TransactionEngine.java b/src/main/java/com/limechain/network/protocol/transaction/TransactionEngine.java index 6d95d287..20665030 100644 --- a/src/main/java/com/limechain/network/protocol/transaction/TransactionEngine.java +++ b/src/main/java/com/limechain/network/protocol/transaction/TransactionEngine.java @@ -1,6 +1,7 @@ package com.limechain.network.protocol.transaction; import com.limechain.network.ConnectionManager; +import com.limechain.network.protocol.base.BaseEngine; import com.limechain.network.protocol.transaction.scale.TransactionReader; import com.limechain.rpc.server.AppBean; import com.limechain.state.AbstractState; @@ -19,7 +20,7 @@ * Engine for handling transactions on Transactions streams. */ @Log -public class TransactionEngine { +public class TransactionEngine implements BaseEngine { //TODO Network improvements: We need a static lock as it seems we create new instances of this engine on // each incoming protocol thread. I am not sure that that is optimal, but I could be wrong. @@ -35,6 +36,19 @@ public TransactionEngine() { transactionProcessor = AppBean.getBean(TransactionProcessor.class); } + @Override + public void handleHandshake(byte[] message, PeerId peerId, Stream stream) { + if (connectionManager.isTransactionsConnected(peerId)) { + log.log(Level.INFO, "Received existing transactions handshake from " + peerId); + stream.close(); + } + + connectionManager.addTransactionsStream(stream); + log.log(Level.INFO, "Received transactions handshake from " + peerId); + + writeHandshakeToStream(stream, peerId); + } + /** * Handles an incoming request as follows: * @@ -50,6 +64,7 @@ public TransactionEngine() { * @param message received message as byre array * @param stream stream, where the request was received */ + @Override public void receiveRequest(byte[] message, Stream stream) { if (message == null || message.length == 0) { log.log(Level.WARNING, @@ -65,6 +80,30 @@ public void receiveRequest(byte[] message, Stream stream) { } } + /** + * Send our Transactions handshake on a given initiator stream. + * + * @param stream initiator stream to write the message to + * @param peerId peer to send to + */ + @Override + public void writeHandshakeToStream(Stream stream, PeerId peerId) { + byte[] handshake = new byte[]{}; + log.log(Level.INFO, "Sending transactions handshake to " + peerId); + stream.writeAndFlush(handshake); + } + + /** + * Send our Transactions message from {@link WarpSyncState} on a given responder stream. + * + * @param stream responder stream to write the message to + * @param encodedTransactionMessage scale encoded transaction message + */ + public void writeTransactionsMessage(Stream stream, byte[] encodedTransactionMessage) { + log.log(Level.INFO, "Sending transaction message to peer " + stream.remotePeerId()); + stream.writeAndFlush(encodedTransactionMessage); + } + private void handleInitiatorStreamMessage(byte[] message, Stream stream) { PeerId peerId = stream.remotePeerId(); @@ -95,24 +134,12 @@ private void handleResponderStreamMessage(byte[] message, Stream stream) { } if (isHandshake(message)) { - handleHandshake(peerId, stream); + handleHandshake(message, peerId, stream); } else { handleTransactionMessage(message, stream); } } - private void handleHandshake(PeerId peerId, Stream stream) { - if (connectionManager.isTransactionsConnected(peerId)) { - log.log(Level.INFO, "Received existing transactions handshake from " + peerId); - stream.close(); - } - - connectionManager.addTransactionsStream(stream); - log.log(Level.INFO, "Received transactions handshake from " + peerId); - - writeHandshakeToStream(stream, peerId); - } - private void handleTransactionMessage(byte[] message, Stream stream) { ScaleCodecReader reader = new ScaleCodecReader(message); ExtrinsicArray transactions = reader.read(TransactionReader.getInstance()); @@ -124,29 +151,6 @@ private void handleTransactionMessage(byte[] message, Stream stream) { } } - /** - * Send our Transactions handshake on a given initiator stream. - * - * @param stream initiator stream to write the message to - * @param peerId peer to send to - */ - public void writeHandshakeToStream(Stream stream, PeerId peerId) { - byte[] handshake = new byte[]{}; - log.log(Level.INFO, "Sending transactions handshake to " + peerId); - stream.writeAndFlush(handshake); - } - - /** - * Send our Transactions message from {@link WarpSyncState} on a given responder stream. - * - * @param stream responder stream to write the message to - * @param encodedTransactionMessage scale encoded transaction message - */ - public void writeTransactionsMessage(Stream stream, byte[] encodedTransactionMessage) { - log.log(Level.INFO, "Sending transaction message to peer " + stream.remotePeerId()); - stream.writeAndFlush(encodedTransactionMessage); - } - private boolean isHandshake(byte[] message) { return message.length == HANDSHAKE_LENGTH; } diff --git a/src/main/java/com/limechain/network/protocol/transaction/TransactionMessages.java b/src/main/java/com/limechain/network/protocol/transaction/TransactionMessages.java index d5ad354e..51a89b23 100644 --- a/src/main/java/com/limechain/network/protocol/transaction/TransactionMessages.java +++ b/src/main/java/com/limechain/network/protocol/transaction/TransactionMessages.java @@ -6,7 +6,7 @@ * Transactions protocol binding */ public class TransactionMessages extends StrictProtocolBinding { - public TransactionMessages(String protocolId, TransactionsProtocol protocol) { + public TransactionMessages(String protocolId, TransactionProtocol protocol) { super(protocolId, protocol); } } diff --git a/src/main/java/com/limechain/network/protocol/transaction/TransactionsProtocol.java b/src/main/java/com/limechain/network/protocol/transaction/TransactionProtocol.java similarity index 58% rename from src/main/java/com/limechain/network/protocol/transaction/TransactionsProtocol.java rename to src/main/java/com/limechain/network/protocol/transaction/TransactionProtocol.java index f028e50a..a5e53f3f 100644 --- a/src/main/java/com/limechain/network/protocol/transaction/TransactionsProtocol.java +++ b/src/main/java/com/limechain/network/protocol/transaction/TransactionProtocol.java @@ -1,24 +1,21 @@ package com.limechain.network.protocol.transaction; import com.limechain.network.ConnectionManager; -import com.limechain.network.encoding.Leb128LengthFrameDecoder; -import com.limechain.network.encoding.Leb128LengthFrameEncoder; +import com.limechain.network.protocol.base.BaseProtocol; import io.libp2p.core.Stream; -import io.libp2p.protocol.ProtocolHandler; import io.libp2p.protocol.ProtocolMessageHandler; import io.netty.buffer.ByteBuf; -import io.netty.handler.codec.bytes.ByteArrayEncoder; import lombok.extern.java.Log; import org.jetbrains.annotations.NotNull; -import java.util.concurrent.CompletableFuture; import java.util.logging.Level; /** * Handler for Transactions protocol messages and streams. */ @Log -public class TransactionsProtocol extends ProtocolHandler { +public class TransactionProtocol extends BaseProtocol { + private static final long TRAFFIC_LIMIT = Long.MAX_VALUE; /** @@ -26,48 +23,20 @@ public class TransactionsProtocol extends ProtocolHandler * This is a global decreasing limit for the protocol, that gets reduced by the size of each message. * In the future it should be changed to a per-message limit */ - public TransactionsProtocol() { + public TransactionProtocol() { super(TRAFFIC_LIMIT, TRAFFIC_LIMIT); } - /** - * Handles a new opened initiator stream and adds channel and notification handlers to it. - * - * @param stream stream opened - * @return async controller for the stream - */ - @NotNull - @Override - protected CompletableFuture onStartInitiator(Stream stream) { - return onStartStream(stream); - } - - /** - * Handles a new opened responder stream and adds channel and notification handlers to it. - * - * @param stream stream opened - * @return async controller for the stream - */ - @NotNull @Override - protected CompletableFuture onStartResponder(Stream stream) { - return onStartStream(stream); - } - - private CompletableFuture onStartStream(Stream stream) { - stream.pushHandler(new Leb128LengthFrameDecoder()); - stream.pushHandler(new Leb128LengthFrameEncoder()); - - stream.pushHandler(new ByteArrayEncoder()); - TransactionsProtocol.NotificationHandler handler = new TransactionsProtocol.NotificationHandler(stream); - stream.pushHandler(handler); - return CompletableFuture.completedFuture(handler); + protected TransactionProtocol.NotificationHandler createNotificationHandler(Stream stream) { + return new TransactionProtocol.NotificationHandler(stream); } /** * Handler for notifications received on the Transactions protocol. */ static class NotificationHandler extends TransactionController implements ProtocolMessageHandler { + ConnectionManager connectionManager = ConnectionManager.getInstance(); public NotificationHandler(Stream stream) { diff --git a/src/main/java/com/limechain/network/protocol/transaction/TransactionsService.java b/src/main/java/com/limechain/network/protocol/transaction/TransactionsService.java index 75dc5bcc..2288901f 100644 --- a/src/main/java/com/limechain/network/protocol/transaction/TransactionsService.java +++ b/src/main/java/com/limechain/network/protocol/transaction/TransactionsService.java @@ -13,7 +13,7 @@ public class TransactionsService extends NetworkService { ConnectionManager connectionManager = ConnectionManager.getInstance(); public TransactionsService(String protocolId) { - this.protocol = new TransactionMessages(protocolId, new TransactionsProtocol()); + this.protocol = new TransactionMessages(protocolId, new TransactionProtocol()); } /** diff --git a/src/test/java/com/limechain/network/protocol/BaseUtils.java b/src/test/java/com/limechain/network/protocol/BaseUtils.java new file mode 100644 index 00000000..2393c4be --- /dev/null +++ b/src/test/java/com/limechain/network/protocol/BaseUtils.java @@ -0,0 +1,37 @@ +package com.limechain.network.protocol; + +import com.limechain.network.protocol.base.BaseController; +import com.limechain.network.protocol.base.BaseEngine; +import com.limechain.network.protocol.base.BaseProtocol; +import io.libp2p.core.Stream; +import lombok.experimental.UtilityClass; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +@UtilityClass +public class BaseUtils { + + public void setProtectedEngineField(BaseController controller, BaseEngine engine) throws NoSuchFieldException, IllegalAccessException { + Field engineField = BaseController.class.getDeclaredField("engine"); + engineField.setAccessible(true); + engineField.set(controller, engine); + } + + public Object callProtectedMethod(BaseProtocol protocol, Stream stream, String methodName) + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + + Method method = BaseProtocol.class.getDeclaredMethod(methodName, Stream.class); + method.setAccessible(true); + return method.invoke(protocol, stream); + } + + public Stream getProtectedStreamField(BaseController controller) + throws NoSuchFieldException, IllegalAccessException { + + Field streamField = BaseController.class.getDeclaredField("stream"); + streamField.setAccessible(true); + return (Stream) streamField.get(controller); + } +} diff --git a/src/test/java/com/limechain/network/protocol/beefy/BeefyControllerTest.java b/src/test/java/com/limechain/network/protocol/beefy/BeefyControllerTest.java new file mode 100644 index 00000000..cd5fec52 --- /dev/null +++ b/src/test/java/com/limechain/network/protocol/beefy/BeefyControllerTest.java @@ -0,0 +1,39 @@ +package com.limechain.network.protocol.beefy; + +import com.limechain.network.protocol.BaseUtils; +import io.libp2p.core.PeerId; +import io.libp2p.core.Stream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class BeefyControllerTest { + + @InjectMocks + private BeefyController beefyController; + @Mock + private Stream stream; + @Mock + private PeerId peerId; + @Mock + private BeefyEngine engine; + + @BeforeEach + void setup() throws NoSuchFieldException, IllegalAccessException { + BaseUtils.setProtectedEngineField(beefyController, engine); + } + + @Test + void sendHandshake() { + when(stream.remotePeerId()).thenReturn(peerId); + beefyController.sendHandshake(); + verify(engine).writeHandshakeToStream(stream, peerId); + } +} diff --git a/src/test/java/com/limechain/network/protocol/beefy/BeefyProtocolTest.java b/src/test/java/com/limechain/network/protocol/beefy/BeefyProtocolTest.java new file mode 100644 index 00000000..7ee529f7 --- /dev/null +++ b/src/test/java/com/limechain/network/protocol/beefy/BeefyProtocolTest.java @@ -0,0 +1,104 @@ +package com.limechain.network.protocol.beefy; + +import com.limechain.network.ConnectionManager; +import com.limechain.network.encoding.Leb128LengthFrameDecoder; +import com.limechain.network.encoding.Leb128LengthFrameEncoder; +import com.limechain.network.protocol.BaseUtils; +import io.libp2p.core.Stream; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.bytes.ByteArrayEncoder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.CompletableFuture; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +class BeefyProtocolTest { + + @InjectMocks + private BeefyProtocol beefyProtocol; + @InjectMocks + private BeefyProtocol.NotificationHandler notificationHandler; + @Mock + private BeefyEngine beefyEngine; + @Mock + private Stream stream; + @Mock + private ConnectionManager connectionManager; + + @Test + void onStartInitiator() + throws InvocationTargetException, NoSuchMethodException, IllegalAccessException, NoSuchFieldException { + + Object result = BaseUtils.callProtectedMethod(beefyProtocol, stream, "onStartInitiator"); + BeefyController actualResult = ((CompletableFuture) result).join(); + + verify(stream).pushHandler(any(Leb128LengthFrameEncoder.class)); + verify(stream).pushHandler(any(Leb128LengthFrameDecoder.class)); + verify(stream).pushHandler(any(ByteArrayEncoder.class)); + verify(stream).pushHandler(any(BeefyProtocol.NotificationHandler.class)); + + assertEquals(stream, BaseUtils.getProtectedStreamField(actualResult)); + } + + @Test + void onStartResponder() + throws NoSuchFieldException, IllegalAccessException, InvocationTargetException, NoSuchMethodException { + + Object result = BaseUtils.callProtectedMethod(beefyProtocol, stream, "onStartResponder"); + BeefyController actualResult = ((CompletableFuture) result).join(); + + verify(stream).pushHandler(any(Leb128LengthFrameEncoder.class)); + verify(stream).pushHandler(any(Leb128LengthFrameDecoder.class)); + verify(stream).pushHandler(any(ByteArrayEncoder.class)); + verify(stream).pushHandler(any(BeefyProtocol.NotificationHandler.class)); + + assertEquals(stream, BaseUtils.getProtectedStreamField(actualResult)); + } + + @Test + void onMessage() throws NoSuchFieldException, IllegalAccessException { + + byte[] message = new byte[] { 1, 2, 3 }; + ByteBuf byteBuf = Unpooled.copiedBuffer(message); + + BaseUtils.setProtectedEngineField(notificationHandler, beefyEngine); + notificationHandler.connectionManager = connectionManager; + + notificationHandler.onMessage(stream, byteBuf); + + verify(beefyEngine).receiveRequest(message, stream); + } + + @Test + void onClosed() throws NoSuchFieldException, IllegalAccessException { + + BaseUtils.setProtectedEngineField(notificationHandler, beefyEngine); + notificationHandler.connectionManager = connectionManager; + + notificationHandler.onClosed(stream); + + verify(connectionManager).closeBeefyStream(stream); + } + + @Test + void onException() throws NoSuchFieldException, IllegalAccessException { + + BaseUtils.setProtectedEngineField(notificationHandler, beefyEngine); + notificationHandler.connectionManager = connectionManager; + + notificationHandler.onException(mock(Throwable.class)); + + verify(connectionManager).closeBeefyStream(stream); + } +} diff --git a/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceControllerTest.java b/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceControllerTest.java index 21314c8b..ad56cb49 100644 --- a/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceControllerTest.java +++ b/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceControllerTest.java @@ -1,5 +1,6 @@ package com.limechain.network.protocol.blockannounce; +import com.limechain.network.protocol.BaseUtils; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshakeBuilder; import io.libp2p.core.PeerId; import io.libp2p.core.Stream; @@ -15,6 +16,7 @@ @ExtendWith(MockitoExtension.class) class BlockAnnounceControllerTest { + @InjectMocks private BlockAnnounceController blockAnnounceController; @Mock @@ -27,8 +29,8 @@ class BlockAnnounceControllerTest { private BlockAnnounceHandshakeBuilder blockAnnounceHandshakeBuilder; @BeforeEach - void setup() { - blockAnnounceController.engine = engine; + void setup() throws NoSuchFieldException, IllegalAccessException { + BaseUtils.setProtectedEngineField(blockAnnounceController, engine); engine.handshakeBuilder = blockAnnounceHandshakeBuilder; } diff --git a/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java b/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java index 8b18967b..4d206462 100644 --- a/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java +++ b/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java @@ -14,6 +14,7 @@ import com.limechain.sync.warpsync.WarpSyncState; import io.emeraldpay.polkaj.scale.ScaleCodecReader; import io.emeraldpay.polkaj.scale.ScaleCodecWriter; +import io.emeraldpay.polkaj.types.Hash256; import io.libp2p.core.PeerId; import io.libp2p.core.Stream; import org.junit.jupiter.api.Test; @@ -25,6 +26,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import java.io.IOException; +import java.math.BigInteger; import java.util.Arrays; import static org.mockito.ArgumentMatchers.any; @@ -40,6 +42,7 @@ @SuppressWarnings("unused") @ExtendWith(MockitoExtension.class) class BlockAnnounceEngineTest { + @InjectMocks private BlockAnnounceEngine blockAnnounceEngine; @@ -59,8 +62,6 @@ class BlockAnnounceEngineTest { @Mock private DigestHelper digestHelper; - @Mock - private BlockAnnounceHandshake handshake; @Mock private BlockAnnounceHandshakeBuilder handshakeBuilder; @@ -85,10 +86,10 @@ void receiveHandshakeRequestWhenNotConnectedShouldAddStreamToConnection() { Arrays.fill(message, (byte) 1); when(stream.remotePeerId()).thenReturn(peerId); when(connectionManager.isBlockAnnounceConnected(peerId)).thenReturn(false); - when(handshakeBuilder.getBlockAnnounceHandshake()).thenReturn(handshake); + when(handshakeBuilder.getBlockAnnounceHandshake()).thenReturn(getBlockAnnounceHandshake()); try ( MockedConstruction readerMock = mockConstruction(ScaleCodecReader.class, - (mock, context) -> when(mock.read(any())).thenReturn(handshake)); + (mock, context) -> when(mock.read(any())).thenReturn(getBlockAnnounceHandshake())); MockedConstruction writerMock = mockConstruction(ScaleCodecWriter.class) ) { blockAnnounceEngine.receiveRequest(message, stream); @@ -103,7 +104,7 @@ void receiveHandshakeRequestWhenNotConnectedShouldSendHandshakeBack() throws IOE Arrays.fill(message, (byte) 1); when(stream.remotePeerId()).thenReturn(peerId); when(connectionManager.isBlockAnnounceConnected(peerId)).thenReturn(false); - when(handshakeBuilder.getBlockAnnounceHandshake()).thenReturn(handshake); + when(handshakeBuilder.getBlockAnnounceHandshake()).thenReturn(getBlockAnnounceHandshake()); try ( MockedConstruction readerMock = mockConstruction(ScaleCodecReader.class); MockedConstruction writerMock = mockConstruction(ScaleCodecWriter.class) @@ -111,7 +112,7 @@ void receiveHandshakeRequestWhenNotConnectedShouldSendHandshakeBack() throws IOE blockAnnounceEngine.receiveRequest(message, stream); ScaleCodecWriter writer = writerMock.constructed().getFirst(); - verify(writer).write(any(BlockAnnounceHandshakeScaleWriter.class), eq(handshake)); + verify(writer).write(any(BlockAnnounceHandshakeScaleWriter.class), eq(getBlockAnnounceHandshake())); verify(stream).writeAndFlush(any()); } } @@ -122,6 +123,8 @@ void receiveHandshakeRequestWhenAlreadyConnectedShouldCloseStream() { Arrays.fill(message, (byte) 1); when(stream.remotePeerId()).thenReturn(peerId); when(connectionManager.isBlockAnnounceConnected(peerId)).thenReturn(true); + when(handshakeBuilder.getBlockAnnounceHandshake()).thenReturn(getBlockAnnounceHandshake()); + try (MockedConstruction readerMock = mockConstruction(ScaleCodecReader.class)) { blockAnnounceEngine.receiveRequest(message, stream); @@ -183,4 +186,13 @@ void writeBlockAnnounceMessage() { verify(stream).writeAndFlush(message); } + + private BlockAnnounceHandshake getBlockAnnounceHandshake() { + return new BlockAnnounceHandshake( + NodeRole.AUTHORING.getValue(), + BigInteger.ONE, + Hash256.empty(), + Hash256.empty() + ); + } } \ No newline at end of file diff --git a/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceProtocolTest.java b/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceProtocolTest.java index af4eec20..b628b6c5 100644 --- a/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceProtocolTest.java +++ b/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceProtocolTest.java @@ -3,7 +3,7 @@ import com.limechain.network.ConnectionManager; import com.limechain.network.encoding.Leb128LengthFrameDecoder; import com.limechain.network.encoding.Leb128LengthFrameEncoder; -import com.limechain.rpc.server.AppBean; +import com.limechain.network.protocol.BaseUtils; import io.libp2p.core.Stream; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -12,10 +12,11 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; import org.mockito.Mock; -import org.mockito.MockedStatic; -import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.CompletableFuture; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -23,6 +24,7 @@ @ExtendWith(MockitoExtension.class) class BlockAnnounceProtocolTest { + @InjectMocks private BlockAnnounceProtocol blockAnnounceProtocol; @InjectMocks @@ -35,40 +37,37 @@ class BlockAnnounceProtocolTest { private ConnectionManager connectionManager; @Test - void onStartInitiator() { - try (MockedStatic appBean = Mockito.mockStatic(AppBean.class)) { - appBean.when(() -> AppBean.getBean(BlockAnnounceEngine.class)).thenReturn(blockAnnounceEngine); - BlockAnnounceController result = blockAnnounceProtocol.onStartInitiator(stream).join(); - - verify(stream).pushHandler(any(Leb128LengthFrameEncoder.class)); - verify(stream).pushHandler(any(Leb128LengthFrameDecoder.class)); - verify(stream).pushHandler(any(ByteArrayEncoder.class)); - verify(stream).pushHandler(any(BlockAnnounceProtocol.NotificationHandler.class)); - - assertEquals(stream, result.stream); - } + void onStartInitiator() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, NoSuchFieldException { + Object result = BaseUtils.callProtectedMethod(blockAnnounceProtocol, stream, "onStartInitiator"); + BlockAnnounceController actualResult = ((CompletableFuture) result).join(); + + verify(stream).pushHandler(any(Leb128LengthFrameEncoder.class)); + verify(stream).pushHandler(any(Leb128LengthFrameDecoder.class)); + verify(stream).pushHandler(any(ByteArrayEncoder.class)); + verify(stream).pushHandler(any(BlockAnnounceProtocol.NotificationHandler.class)); + + assertEquals(stream, BaseUtils.getProtectedStreamField(actualResult)); } @Test - void onStartResponder() { - try (MockedStatic appBean = Mockito.mockStatic(AppBean.class)) { - appBean.when(() -> AppBean.getBean(BlockAnnounceEngine.class)).thenReturn(blockAnnounceEngine); - BlockAnnounceController result = blockAnnounceProtocol.onStartResponder(stream).join(); - - verify(stream).pushHandler(any(Leb128LengthFrameEncoder.class)); - verify(stream).pushHandler(any(Leb128LengthFrameDecoder.class)); - verify(stream).pushHandler(any(ByteArrayEncoder.class)); - verify(stream).pushHandler(any(BlockAnnounceProtocol.NotificationHandler.class)); - - assertEquals(stream, result.stream); - } + void onStartResponder() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException, NoSuchMethodException { + Object result = BaseUtils.callProtectedMethod(blockAnnounceProtocol, stream, "onStartResponder"); + BlockAnnounceController actualResult = ((CompletableFuture) result).join(); + + verify(stream).pushHandler(any(Leb128LengthFrameEncoder.class)); + verify(stream).pushHandler(any(Leb128LengthFrameDecoder.class)); + verify(stream).pushHandler(any(ByteArrayEncoder.class)); + verify(stream).pushHandler(any(BlockAnnounceProtocol.NotificationHandler.class)); + + assertEquals(stream, BaseUtils.getProtectedStreamField(actualResult)); } @Test - void onMessage() { + void onMessage() throws NoSuchFieldException, IllegalAccessException { byte[] message = new byte[]{1, 2, 3}; ByteBuf byteBuf = Unpooled.copiedBuffer(message); - notificationHandler.engine = blockAnnounceEngine; + + BaseUtils.setProtectedEngineField(notificationHandler, blockAnnounceEngine); notificationHandler.connectionManager = connectionManager; notificationHandler.onMessage(stream, byteBuf); @@ -77,8 +76,9 @@ void onMessage() { } @Test - void onClosed() { - notificationHandler.engine = blockAnnounceEngine; + void onClosed() throws NoSuchFieldException, IllegalAccessException { + + BaseUtils.setProtectedEngineField(notificationHandler, blockAnnounceEngine); notificationHandler.connectionManager = connectionManager; notificationHandler.onClosed(stream); @@ -87,8 +87,9 @@ void onClosed() { } @Test - void onException() { - notificationHandler.engine = blockAnnounceEngine; + void onException() throws NoSuchFieldException, IllegalAccessException { + + BaseUtils.setProtectedEngineField(notificationHandler, blockAnnounceEngine); notificationHandler.connectionManager = connectionManager; notificationHandler.onException(mock(Throwable.class)); diff --git a/src/test/java/com/limechain/network/protocol/grandpa/GrandpaControllerTest.java b/src/test/java/com/limechain/network/protocol/grandpa/GrandpaControllerTest.java index a0fabd43..10d5e448 100644 --- a/src/test/java/com/limechain/network/protocol/grandpa/GrandpaControllerTest.java +++ b/src/test/java/com/limechain/network/protocol/grandpa/GrandpaControllerTest.java @@ -1,5 +1,6 @@ package com.limechain.network.protocol.grandpa; +import com.limechain.network.protocol.BaseUtils; import io.libp2p.core.PeerId; import io.libp2p.core.Stream; import org.junit.jupiter.api.BeforeEach; @@ -14,6 +15,7 @@ @ExtendWith(MockitoExtension.class) class GrandpaControllerTest { + @InjectMocks private GrandpaController grandpaController; @Mock @@ -24,8 +26,8 @@ class GrandpaControllerTest { private GrandpaEngine engine; @BeforeEach - void setup() { - grandpaController.engine = engine; + void setup() throws NoSuchFieldException, IllegalAccessException { + BaseUtils.setProtectedEngineField(grandpaController, engine); } @Test diff --git a/src/test/java/com/limechain/network/protocol/grandpa/GrandpaEngineTest.java b/src/test/java/com/limechain/network/protocol/grandpa/GrandpaEngineTest.java index 43d00312..fb3be9a5 100644 --- a/src/test/java/com/limechain/network/protocol/grandpa/GrandpaEngineTest.java +++ b/src/test/java/com/limechain/network/protocol/grandpa/GrandpaEngineTest.java @@ -1,11 +1,10 @@ package com.limechain.network.protocol.grandpa; +import com.limechain.config.HostConfig; import com.limechain.grandpa.state.GrandpaSetState; import com.limechain.network.ConnectionManager; import com.limechain.network.dto.PeerInfo; import com.limechain.network.protocol.blockannounce.NodeRole; -import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshake; -import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshakeBuilder; import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessage; import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessageScaleReader; import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpResMessage; @@ -45,6 +44,7 @@ @SuppressWarnings("unused") @ExtendWith(MockitoExtension.class) class GrandpaEngineTest { + @InjectMocks private GrandpaEngine grandpaEngine; @Mock @@ -58,7 +58,7 @@ class GrandpaEngineTest { @Mock private GrandpaSetState grandpaSetState; @Mock - private BlockAnnounceHandshakeBuilder blockAnnounceHandshakeBuilder; + private HostConfig hostConfig; private final NeighbourMessage neighbourMessage = new NeighbourMessage(1, BigInteger.ONE, BigInteger.TWO, BigInteger.TEN); @@ -68,7 +68,6 @@ class GrandpaEngineTest { private final byte[] encodedCommitMessage = new byte[]{2, 1, 1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 10, 0, 0, 0}; - @Test void receiveRequestWithUnknownGrandpaTypeShouldLogAndIgnore() { byte[] unknownTypeMessage = new byte[]{7, 1, 1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 10, 0, 0, 0}; @@ -161,7 +160,7 @@ void receiveHandshakeRequestOnResponderStreamWhenNotConnectedShouldAddStreamToCo when(stream.remotePeerId()).thenReturn(peerId); when(connectionManager.isGrandpaConnected(peerId)).thenReturn(false); when(connectionManager.getPeerInfo(peerId)).thenReturn(mock(PeerInfo.class)); - when(blockAnnounceHandshakeBuilder.getBlockAnnounceHandshake()).thenReturn(mock(BlockAnnounceHandshake.class)); + when(hostConfig.getNodeRole()).thenReturn(NodeRole.AUTHORING); grandpaEngine.receiveRequest(message, stream); @@ -174,19 +173,15 @@ void receiveHandshakeRequestOnResponderStreamWhenNotConnectedShouldSendHandshake try (MockedStatic mockedState = mockStatic(AbstractState.class)) { mockedState.when(AbstractState::getSyncMode).thenReturn(SyncMode.HEAD); byte[] message = new byte[]{2}; - Integer role = NodeRole.LIGHT.getValue(); - when(stream.isInitiator()).thenReturn(false); when(stream.remotePeerId()).thenReturn(peerId); when(connectionManager.isGrandpaConnected(peerId)).thenReturn(false); when(connectionManager.getPeerInfo(peerId)).thenReturn(mock(PeerInfo.class)); - BlockAnnounceHandshake handshake = mock(BlockAnnounceHandshake.class); - when(blockAnnounceHandshakeBuilder.getBlockAnnounceHandshake()).thenReturn(handshake); - when(handshake.getNodeRole()).thenReturn(role); + when(hostConfig.getNodeRole()).thenReturn(NodeRole.LIGHT); grandpaEngine.receiveRequest(message, stream); - verify(stream).writeAndFlush(new byte[]{role.byteValue()}); + verify(stream).writeAndFlush(new byte[]{NodeRole.LIGHT.getValue().byteValue()}); } } @@ -283,7 +278,6 @@ void receiveCatchUpResponseMessageOnResponderStreamShouldLogAndIgnore() { ) { grandpaEngine.receiveRequest(message, stream); - verifyNoMoreInteractions(connectionManager); verifyNoInteractions(grandpaMessageHandler); } } @@ -291,14 +285,9 @@ void receiveCatchUpResponseMessageOnResponderStreamShouldLogAndIgnore() { // WRITE @Test void writeHandshakeToStream() { - Integer role = NodeRole.LIGHT.getValue(); - BlockAnnounceHandshake handshake = mock(BlockAnnounceHandshake.class); - when(blockAnnounceHandshakeBuilder.getBlockAnnounceHandshake()).thenReturn(handshake); - when(handshake.getNodeRole()).thenReturn(role); - + when(hostConfig.getNodeRole()).thenReturn(NodeRole.LIGHT); grandpaEngine.writeHandshakeToStream(stream, peerId); - - verify(stream).writeAndFlush(new byte[]{role.byteValue()}); + verify(stream).writeAndFlush(new byte[]{NodeRole.LIGHT.getValue().byteValue()}); } @Test diff --git a/src/test/java/com/limechain/network/protocol/grandpa/GrandpaProtocolTest.java b/src/test/java/com/limechain/network/protocol/grandpa/GrandpaProtocolTest.java index 7dff1549..2dabf6aa 100644 --- a/src/test/java/com/limechain/network/protocol/grandpa/GrandpaProtocolTest.java +++ b/src/test/java/com/limechain/network/protocol/grandpa/GrandpaProtocolTest.java @@ -3,6 +3,7 @@ import com.limechain.network.ConnectionManager; import com.limechain.network.encoding.Leb128LengthFrameDecoder; import com.limechain.network.encoding.Leb128LengthFrameEncoder; +import com.limechain.network.protocol.BaseUtils; import io.libp2p.core.Stream; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -13,6 +14,9 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.CompletableFuture; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -20,6 +24,7 @@ @ExtendWith(MockitoExtension.class) class GrandpaProtocolTest { + @InjectMocks private GrandpaProtocol grandpaProtocol; @InjectMocks @@ -32,34 +37,41 @@ class GrandpaProtocolTest { private ConnectionManager connectionManager; @Test - void onStartInitiator() { - GrandpaController result = grandpaProtocol.onStartInitiator(stream).join(); + void onStartInitiator() + throws InvocationTargetException, NoSuchMethodException, IllegalAccessException, NoSuchFieldException { + + Object result = BaseUtils.callProtectedMethod(grandpaProtocol, stream, "onStartInitiator"); + GrandpaController actualResult = ((CompletableFuture) result).join(); verify(stream).pushHandler(any(Leb128LengthFrameEncoder.class)); verify(stream).pushHandler(any(Leb128LengthFrameDecoder.class)); verify(stream).pushHandler(any(ByteArrayEncoder.class)); verify(stream).pushHandler(any(GrandpaProtocol.NotificationHandler.class)); - assertEquals(stream, result.stream); + assertEquals(stream, BaseUtils.getProtectedStreamField(actualResult)); } @Test - void onStartResponder() { - GrandpaController result = grandpaProtocol.onStartResponder(stream).join(); + void onStartResponder() + throws NoSuchFieldException, IllegalAccessException, InvocationTargetException, NoSuchMethodException { + + Object result = BaseUtils.callProtectedMethod(grandpaProtocol, stream, "onStartResponder"); + GrandpaController actualResult = ((CompletableFuture) result).join(); verify(stream).pushHandler(any(Leb128LengthFrameEncoder.class)); verify(stream).pushHandler(any(Leb128LengthFrameDecoder.class)); verify(stream).pushHandler(any(ByteArrayEncoder.class)); verify(stream).pushHandler(any(GrandpaProtocol.NotificationHandler.class)); - assertEquals(stream, result.stream); + assertEquals(stream, BaseUtils.getProtectedStreamField(actualResult)); } @Test - void onMessage() { + void onMessage() throws NoSuchFieldException, IllegalAccessException { byte[] message = new byte[] { 1, 2, 3 }; ByteBuf byteBuf = Unpooled.copiedBuffer(message); - notificationHandler.engine = grandpaEngine; + + BaseUtils.setProtectedEngineField(notificationHandler, grandpaEngine); notificationHandler.connectionManager = connectionManager; notificationHandler.onMessage(stream, byteBuf); @@ -68,8 +80,9 @@ void onMessage() { } @Test - void onClosed() { - notificationHandler.engine = grandpaEngine; + void onClosed() throws NoSuchFieldException, IllegalAccessException { + + BaseUtils.setProtectedEngineField(notificationHandler, grandpaEngine); notificationHandler.connectionManager = connectionManager; notificationHandler.onClosed(stream); @@ -78,8 +91,9 @@ void onClosed() { } @Test - void onException() { - notificationHandler.engine = grandpaEngine; + void onException() throws NoSuchFieldException, IllegalAccessException { + + BaseUtils.setProtectedEngineField(notificationHandler, grandpaEngine); notificationHandler.connectionManager = connectionManager; notificationHandler.onException(mock(Throwable.class)); diff --git a/src/test/java/com/limechain/network/protocol/transaction/TransactionControllerTest.java b/src/test/java/com/limechain/network/protocol/transaction/TransactionControllerTest.java new file mode 100644 index 00000000..c6cf0ff3 --- /dev/null +++ b/src/test/java/com/limechain/network/protocol/transaction/TransactionControllerTest.java @@ -0,0 +1,46 @@ +package com.limechain.network.protocol.transaction; + +import com.limechain.network.protocol.BaseUtils; +import io.libp2p.core.PeerId; +import io.libp2p.core.Stream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class TransactionControllerTest { + + @InjectMocks + private TransactionController transactionController; + @Mock + private Stream stream; + @Mock + private PeerId peerId; + @Mock + private TransactionEngine engine; + + @BeforeEach + void setup() throws NoSuchFieldException, IllegalAccessException { + BaseUtils.setProtectedEngineField(transactionController, engine); + } + + @Test + void sendHandshake() { + when(stream.remotePeerId()).thenReturn(peerId); + transactionController.sendHandshake(); + verify(engine).writeHandshakeToStream(stream, peerId); + } + + @Test + void sendTransactionsMessage() { + byte[] encodedCommitMessage = {1, 0, 0, 0, 2, 0, 1, 1, 1, 1, 0, 0, 0, 1, 2, 0}; + transactionController.sendTransactionsMessage(encodedCommitMessage); + verify(engine).writeTransactionsMessage(stream, encodedCommitMessage); + } +} diff --git a/src/test/java/com/limechain/network/protocol/transaction/TransactionProtocolTest.java b/src/test/java/com/limechain/network/protocol/transaction/TransactionProtocolTest.java new file mode 100644 index 00000000..6f196c54 --- /dev/null +++ b/src/test/java/com/limechain/network/protocol/transaction/TransactionProtocolTest.java @@ -0,0 +1,104 @@ +package com.limechain.network.protocol.transaction; + +import com.limechain.network.ConnectionManager; +import com.limechain.network.encoding.Leb128LengthFrameDecoder; +import com.limechain.network.encoding.Leb128LengthFrameEncoder; +import com.limechain.network.protocol.BaseUtils; +import io.libp2p.core.Stream; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.bytes.ByteArrayEncoder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.CompletableFuture; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +class TransactionProtocolTest { + + @InjectMocks + private TransactionProtocol transactionProtocol; + @InjectMocks + private TransactionProtocol.NotificationHandler notificationHandler; + @Mock + private TransactionEngine transactionEngine; + @Mock + private Stream stream; + @Mock + private ConnectionManager connectionManager; + + @Test + void onStartInitiator() + throws InvocationTargetException, NoSuchMethodException, IllegalAccessException, NoSuchFieldException { + + Object result = BaseUtils.callProtectedMethod(transactionProtocol, stream, "onStartInitiator"); + TransactionController actualResult = ((CompletableFuture) result).join(); + + verify(stream).pushHandler(any(Leb128LengthFrameEncoder.class)); + verify(stream).pushHandler(any(Leb128LengthFrameDecoder.class)); + verify(stream).pushHandler(any(ByteArrayEncoder.class)); + verify(stream).pushHandler(any(TransactionProtocol.NotificationHandler.class)); + + assertEquals(stream, BaseUtils.getProtectedStreamField(actualResult)); + } + + @Test + void onStartResponder() + throws NoSuchFieldException, IllegalAccessException, InvocationTargetException, NoSuchMethodException { + + Object result = BaseUtils.callProtectedMethod(transactionProtocol, stream, "onStartResponder"); + TransactionController actualResult = ((CompletableFuture) result).join(); + + verify(stream).pushHandler(any(Leb128LengthFrameEncoder.class)); + verify(stream).pushHandler(any(Leb128LengthFrameDecoder.class)); + verify(stream).pushHandler(any(ByteArrayEncoder.class)); + verify(stream).pushHandler(any(TransactionProtocol.NotificationHandler.class)); + + assertEquals(stream, BaseUtils.getProtectedStreamField(actualResult)); + } + + @Test + void onMessage() throws NoSuchFieldException, IllegalAccessException { + + byte[] message = new byte[] { 1, 2, 3 }; + ByteBuf byteBuf = Unpooled.copiedBuffer(message); + + BaseUtils.setProtectedEngineField(notificationHandler, transactionEngine); + notificationHandler.connectionManager = connectionManager; + + notificationHandler.onMessage(stream, byteBuf); + + verify(transactionEngine).receiveRequest(message, stream); + } + + @Test + void onClosed() throws NoSuchFieldException, IllegalAccessException { + + BaseUtils.setProtectedEngineField(notificationHandler, transactionEngine); + notificationHandler.connectionManager = connectionManager; + + notificationHandler.onClosed(stream); + + verify(connectionManager).closeTransactionsStream(stream); + } + + @Test + void onException() throws NoSuchFieldException, IllegalAccessException { + + BaseUtils.setProtectedEngineField(notificationHandler, transactionEngine); + notificationHandler.connectionManager = connectionManager; + + notificationHandler.onException(mock(Throwable.class)); + + verify(connectionManager).closeTransactionsStream(stream); + } +}