Skip to content

Commit

Permalink
feat: add baseEngine class
Browse files Browse the repository at this point in the history
  • Loading branch information
Grigorov-Georgi committed Feb 25, 2025
1 parent 0361e4b commit 15b2f56
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 207 deletions.
16 changes: 16 additions & 0 deletions src/main/java/com/limechain/network/protocol/BaseEngine.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.limechain.network.protocol;

import com.limechain.network.ConnectionManager;
import io.libp2p.core.PeerId;
import io.libp2p.core.Stream;

public abstract class BaseEngine {

protected final ConnectionManager connectionManager = ConnectionManager.getInstance();

protected abstract void handleHandshake(byte[] message, PeerId peerId, Stream stream);

public abstract void receiveRequest(byte[] message, Stream stream);

public abstract void writeHandshakeToStream(Stream stream, PeerId peerId);
}
55 changes: 28 additions & 27 deletions src/main/java/com/limechain/network/protocol/beefy/BeefyEngine.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.limechain.network.protocol.beefy;

import com.limechain.network.ConnectionManager;
import com.limechain.network.protocol.BaseEngine;
import com.limechain.network.protocol.beefy.messages.BeefyMessageType;
import com.limechain.rpc.server.AppBean;
import com.limechain.state.AbstractState;
Expand All @@ -15,18 +15,30 @@
* Engine for handling transactions on BEEFY streams
*/
@Log
public class BeefyEngine {
public class BeefyEngine extends BaseEngine {

private static final int HANDSHAKE_LENGTH = 1;

protected ConnectionManager connectionManager;
protected BeefyMessageHandler beefyMessageHandler;

public BeefyEngine() {
connectionManager = ConnectionManager.getInstance();
beefyMessageHandler = AppBean.getBean(BeefyMessageHandler.class);
}

@Override
protected void handleHandshake(byte[] message, PeerId peerId, Stream stream) {
if (connectionManager.isBeefyConnected(peerId)) {
log.log(Level.INFO, "Received existing beefy handshake from " + peerId);
stream.close();
} else {
connectionManager.addBeefyStream(stream);
connectionManager.getPeerInfo(peerId).setNodeRole(message[0]);
log.log(Level.INFO, "Received beefy handshake from " + peerId);
writeHandshakeToStream(stream, peerId);
}
}

@Override
public void receiveRequest(byte[] message, Stream stream) {
BeefyMessageType messageType = getBeefyMessageType(message);

Expand All @@ -43,6 +55,18 @@ public void receiveRequest(byte[] message, Stream stream) {
}
}

/**
* Send our BEEFY handshake on a given initiator stream
* @param stream initiator stream to write the message to
* @param peerId peer to send to
*/
@Override
public void writeHandshakeToStream(Stream stream, PeerId peerId) {
byte[] handshake = new byte[]{};
log.log(Level.INFO, "Sending beefy handshake to " + peerId);
stream.writeAndFlush(handshake);
}

private void handleInitiatorStreamMessage(BeefyMessageType messageType, Stream stream) {
PeerId peerId = stream.remotePeerId();
if (messageType != BeefyMessageType.HANDSHAKE) {
Expand Down Expand Up @@ -76,18 +100,6 @@ private void handleResponderStreamMessage(byte[] message, BeefyMessageType messa
}
}

private void handleHandshake(byte[] message, PeerId peerId, Stream stream) {
if (connectionManager.isBeefyConnected(peerId)) {
log.log(Level.INFO, "Received existing beefy handshake from " + peerId);
stream.close();
} else {
connectionManager.addBeefyStream(stream);
connectionManager.getPeerInfo(peerId).setNodeRole(message[0]);
log.log(Level.INFO, "Received beefy handshake from " + peerId);
writeHandshakeToStream(stream, peerId);
}
}

private void handleVoteMessage(byte[] message, PeerId peerId) {
//TODO
}
Expand All @@ -96,17 +108,6 @@ private void handleSignedCommitmentMessage(byte[] message, PeerId peerId) {
//TODO
}

/**
* Send our BEEFY handshake on a given initiator stream
* @param stream initiator stream to write the message to
* @param peerId peer to send to
*/
public void writeHandshakeToStream(Stream stream, PeerId peerId) {
byte[] handshake = new byte[]{};
log.log(Level.INFO, "Sending beefy handshake to " + peerId);
stream.writeAndFlush(handshake);
}

private BeefyMessageType getBeefyMessageType(byte[] message) {
return message.length == HANDSHAKE_LENGTH ?
BeefyMessageType.HANDSHAKE :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public void onException(Throwable cause) {
connectionManager.closeBeefyStream(stream);
if (cause != null) {
log.log(Level.WARNING, "Beefy Exception: " + cause.getMessage());
cause.printStackTrace();
} else {
log.log(Level.WARNING, "Beefy Exception with unknown cause");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.limechain.network.protocol.blockannounce;

import com.limechain.exception.scale.ScaleEncodingException;
import com.limechain.network.ConnectionManager;
import com.limechain.network.protocol.BaseEngine;
import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshake;
import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshakeBuilder;
import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage;
Expand All @@ -28,77 +28,57 @@

@Log
@AllArgsConstructor(access = AccessLevel.PROTECTED)
public class BlockAnnounceEngine {
public class BlockAnnounceEngine extends BaseEngine {

public static final int HANDSHAKE_LENGTH = 69;

protected ConnectionManager connectionManager;
protected WarpSyncState warpSyncState;
private BlockHandler blockHandler;
protected BlockAnnounceHandshakeBuilder handshakeBuilder;
private BlockHandler blockHandler;

public BlockAnnounceEngine() {
connectionManager = ConnectionManager.getInstance();
warpSyncState = AppBean.getBean(WarpSyncState.class);
blockHandler = AppBean.getBean(BlockHandler.class);
handshakeBuilder = new BlockAnnounceHandshakeBuilder();
}

public void receiveRequest(byte[] msg, Stream stream) {
@Override
protected void handleHandshake(byte[] message, PeerId peerId, Stream stream) {
if (connectionManager.isBlockAnnounceConnected(peerId)) {
log.log(Level.INFO, "Received existing handshake from " + peerId);
stream.close();
}

ScaleCodecReader reader = new ScaleCodecReader(message);
BlockAnnounceHandshake handshake = reader.read(BlockAnnounceHandshakeScaleReader.getInstance());
connectionManager.addBlockAnnounceStream(stream);
connectionManager.updatePeer(peerId, handshake);
log.log(Level.INFO, "Received handshake from " + peerId + "\n" + handshake);

writeHandshakeToStream(stream, peerId);
}

@Override
public void receiveRequest(byte[] message, Stream stream) {
PeerId peerId = stream.remotePeerId();
boolean connectedToPeer = connectionManager.isBlockAnnounceConnected(peerId);
boolean isHandshake = msg.length == HANDSHAKE_LENGTH;
boolean isHandshake = message.length == HANDSHAKE_LENGTH;

if (!connectedToPeer && !isHandshake) {
log.log(Level.WARNING, "No handshake for block announce message from Peer " + peerId);
return;
}

if (isHandshake) {
handleHandshake(msg, peerId, stream, connectedToPeer);
handleHandshake(message, peerId, stream);
} else {
handleBlockAnnounce(msg, peerId);
handleBlockAnnounce(message, peerId);
}

//TODO: Send message to network? module
}

private void handleHandshake(byte[] msg, PeerId peerId, Stream stream, boolean connectedToPeer) {
/* We might not need to send a second handshake.
If we already have stored the key it means that we have processed the handshake once.
This might be caused by using a different stream for sending and receiving in libp2p.
*/
if (connectedToPeer) {
log.log(Level.INFO, "Received existing handshake from " + peerId);
stream.close();
} else {
ScaleCodecReader reader = new ScaleCodecReader(msg);
BlockAnnounceHandshake handshake = reader.read(BlockAnnounceHandshakeScaleReader.getInstance());
connectionManager.addBlockAnnounceStream(stream);
connectionManager.updatePeer(peerId, handshake);
log.log(Level.INFO, "Received handshake from " + peerId + "\n" +
handshake);
writeHandshakeToStream(stream, peerId);
}
}

private void handleBlockAnnounce(byte[] msg, PeerId peerId) {
BlockAnnounceMessage announce = ScaleUtils.Decode.decode(msg, BlockAnnounceMessageScaleReader.getInstance());
connectionManager.updatePeer(peerId, announce);
//TODO Yordan: Do we actually need this since each block has a runtime?
warpSyncState.syncBlockAnnounce(announce);
log.log(Level.FINE, "Received block announce for block #" + announce.getHeader().getBlockNumber() +
" from " + peerId +
" with hash:" + announce.getHeader().getHash() +
" parentHash:" + announce.getHeader().getParentHash() +
" stateRoot:" + announce.getHeader().getStateRoot());

if (AppBean.getBean(BlockState.class).isInitialized()) {
//TODO Network improvements: Block requests should be sent to the peer that announced the block itself.
blockHandler.handleAnnounced(announce.getHeader(), Instant.now(), peerId);
}
}

@Override
public void writeHandshakeToStream(Stream stream, PeerId peerId) {
ByteArrayOutputStream buf = new ByteArrayOutputStream();
try (ScaleCodecWriter writer = new ScaleCodecWriter(buf)) {
Expand All @@ -118,4 +98,21 @@ public void writeBlockAnnounceMessage(Stream stream, PeerId peerId, byte[] encod
log.log(Level.FINE, "Sending Block Announce message to peer " + peerId);
stream.writeAndFlush(encodedBlockAnnounceMessage);
}

private void handleBlockAnnounce(byte[] msg, PeerId peerId) {
BlockAnnounceMessage announce = ScaleUtils.Decode.decode(msg, BlockAnnounceMessageScaleReader.getInstance());
connectionManager.updatePeer(peerId, announce);
//TODO Yordan: Do we actually need this since each block has a runtime?
warpSyncState.syncBlockAnnounce(announce);
log.log(Level.FINE, "Received block announce for block #" + announce.getHeader().getBlockNumber() +
" from " + peerId +
" with hash:" + announce.getHeader().getHash() +
" parentHash:" + announce.getHeader().getParentHash() +
" stateRoot:" + announce.getHeader().getStateRoot());

if (AppBean.getBean(BlockState.class).isInitialized()) {
//TODO Network improvements: Block requests should be sent to the peer that announced the block itself.
blockHandler.handleAnnounced(announce.getHeader(), Instant.now(), peerId);
}
}
}
Loading

0 comments on commit 15b2f56

Please sign in to comment.