Skip to content

Commit

Permalink
769 implement beefy networking (#794)
Browse files Browse the repository at this point in the history
# Description

- Add all components needed to receive and send messages on the libp2p
substream "/beefy/2"
- Implement abstract classes for the network layer in order to reduce
code duplication

Fixes #769
  • Loading branch information
Grigorov-Georgi authored Feb 27, 2025
1 parent f4fed72 commit dd6b029
Show file tree
Hide file tree
Showing 41 changed files with 1,138 additions and 432 deletions.
83 changes: 61 additions & 22 deletions src/main/java/com/limechain/network/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
@Log
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class ConnectionManager {

private static ConnectionManager instance;
protected final Map<PeerId, PeerInfo> peers = new HashMap<>();

Expand All @@ -45,6 +46,14 @@ public PeerInfo getPeerInfo(PeerId peerId) {
return peers.get(peerId);
}

/**
* Adds a Transaction stream to the peer info. Peer id is retrieved from the stream.
* @param stream stream to be added
*/
public void addTransactionsStream(Stream stream) {
addStream(stream, ProtocolStreamType.TRANSACTIONS);
}

/**
* Adds a Block Announce stream to the peer info. Peer id is retrieved from the stream.
*
Expand All @@ -63,8 +72,12 @@ public void addGrandpaStream(Stream stream) {
addStream(stream, ProtocolStreamType.GRANDPA);
}

public void addTransactionsStream(Stream stream) {
addStream(stream, ProtocolStreamType.TRANSACTIONS);
/**
* Adds a BEEFY stream to the peer info. Peer id is retrieved from the stream.
* @param stream stream to be added
*/
public void addBeefyStream(Stream stream) {
addStream(stream, ProtocolStreamType.BEEFY);
}

private void addStream(Stream stream, ProtocolStreamType type) {
Expand Down Expand Up @@ -94,15 +107,6 @@ public PeerInfo addNewPeer(PeerId peerId) {
return peerInfo;
}

/**
* Removes a GRANDPA stream from the peer info. Peer id is retrieved from the stream.
*
* @param stream stream to be closed
*/
public void closeGrandpaStream(Stream stream) {
closeStream(stream, ProtocolStreamType.GRANDPA);
}

/**
* Removes a Transactions stream from the peer info. Peer id is retrieved from the stream.
*
Expand All @@ -121,6 +125,23 @@ public void closeBlockAnnounceStream(Stream stream) {
closeStream(stream, ProtocolStreamType.BLOCK_ANNOUNCE);
}

/**
* Removes a GRANDPA stream from the peer info. Peer id is retrieved from the stream.
*
* @param stream stream to be closed
*/
public void closeGrandpaStream(Stream stream) {
closeStream(stream, ProtocolStreamType.GRANDPA);
}

/**
* Removes a BEEFY stream from the peer info. Peer id is retrieved from the stream
* @param stream stream to be closed
*/
public void closeBeefyStream(Stream stream) {
closeStream(stream, ProtocolStreamType.BEEFY);
}

private void closeStream(Stream stream, ProtocolStreamType type) {
PeerInfo peerInfo = peers.get(stream.remotePeerId());
if (peerInfo == null) {
Expand Down Expand Up @@ -179,16 +200,6 @@ public void updatePeer(PeerId peerId, BlockAnnounceMessage blockAnnounceMessage)
}
}

/**
* Checks if we have an open GRANDPA responder stream with a peer.
*
* @param peerId peer to check
* @return do peer info and GRANDPA responder stream exist
*/
public boolean isGrandpaConnected(PeerId peerId) {
return peers.containsKey(peerId) && peers.get(peerId).getGrandpaStreams().getResponder() != null;
}

/**
* Checks if we have an open Transactions responder stream with a peer.
*
Expand All @@ -209,6 +220,25 @@ public boolean isBlockAnnounceConnected(PeerId peerId) {
return peers.containsKey(peerId) && peers.get(peerId).getBlockAnnounceStreams().getResponder() != null;
}

/**
* Checks if we have an open GRANDPA responder stream with a peer.
*
* @param peerId peer to check
* @return do peer info and GRANDPA responder stream exist
*/
public boolean isGrandpaConnected(PeerId peerId) {
return peers.containsKey(peerId) && peers.get(peerId).getGrandpaStreams().getResponder() != null;
}

/**
* Checks if we have an open BEEFY responder steam with a peer
* @param peerId peer to check
* @return do peer info and BEEFY responder steam exist
*/
public boolean isBeefyConnected(PeerId peerId) {
return peers.containsKey(peerId) && peers.get(peerId).getBeefyStreams().getResponder() != null;
}

/**
* Gets the ids of all peers with open connections.
* Open connection means either Grandpa or Block Announce stream has been opened.
Expand All @@ -223,10 +253,14 @@ public Set<PeerId> getPeerIds() {
* Closes conneciton to all the connected peers and removes them from the peersList.
*/
public void removeAllPeers() {
peers.forEach((peerId, peerInfo) -> {

peers.values().forEach(peerInfo -> {
closeProtocolStream(peerInfo.getTransactionsStreams());
closeProtocolStream(peerInfo.getBlockAnnounceStreams());
closeProtocolStream(peerInfo.getGrandpaStreams());
closeProtocolStream(peerInfo.getBeefyStreams());
});

peers.clear();
}

Expand All @@ -236,10 +270,15 @@ public void removeAllPeers() {
* @param peerId peerId of the peer to be removed
*/
public void removePeer(PeerId peerId) {

if (peers.containsKey(peerId)) {

PeerInfo peerInfo = peers.get(peerId);
closeProtocolStream(peerInfo.getTransactionsStreams());
closeProtocolStream(peerInfo.getBlockAnnounceStreams());
closeProtocolStream(peerInfo.getGrandpaStreams());
closeProtocolStream(peerInfo.getBeefyStreams());

peers.remove(peerId);
}
}
Expand Down
21 changes: 15 additions & 6 deletions src/main/java/com/limechain/network/NetworkService.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.limechain.config.HostConfig;
import com.limechain.constants.GenesisBlockHash;
import com.limechain.network.kad.KademliaService;
import com.limechain.network.protocol.beefy.BeefyService;
import com.limechain.network.protocol.blockannounce.BlockAnnounceService;
import com.limechain.network.protocol.blockannounce.NodeRole;
import com.limechain.network.protocol.grandpa.GrandpaService;
Expand Down Expand Up @@ -70,9 +71,10 @@ public class NetworkService implements NodeService {
private WarpSyncService warpSyncService;
private LightMessagesService lightMessagesService;

private TransactionsService transactionsService;
private BlockAnnounceService blockAnnounceService;
private GrandpaService grandpaService;
private TransactionsService transactionsService;
private BeefyService beefyService;

private Ping ping;

Expand Down Expand Up @@ -104,6 +106,7 @@ public NetworkService(ChainService chainService, HostConfig hostConfig, KVReposi
private void initializeProtocols(ChainService chainService, GenesisBlockHash genesisBlockHash,
HostConfig hostConfig,
KVRepository<String, Object> repository, CliArguments cliArgs) {

boolean isLocalEnabled = hostConfig.getChain() == Chain.LOCAL;
boolean clientMode = true;

Expand All @@ -118,26 +121,31 @@ private void initializeProtocols(ChainService chainService, GenesisBlockHash gen
String pingProtocol = ProtocolUtils.PING_PROTOCOL;
String chainId = chainService.getChainSpec().getProtocolId();
boolean legacyProtocol = !cliArgs.noLegacyProtocols();
String protocolId = legacyProtocol ? chainId :
StringUtils.remove0xPrefix(genesisBlockHash.getGenesisHash().toString());
String genesisBlockHashWithoutPrefix = StringUtils.remove0xPrefix(genesisBlockHash.getGenesisHash().toString());
String protocolId = legacyProtocol ?
chainId :
genesisBlockHashWithoutPrefix;

String kadProtocolId = ProtocolUtils.getKadProtocol(chainId);
String warpProtocolId = ProtocolUtils.getWarpSyncProtocol(protocolId);
String lightProtocolId = ProtocolUtils.getLightMessageProtocol(protocolId);
String syncProtocolId = ProtocolUtils.getSyncProtocol(protocolId);
String stateProtocolId = ProtocolUtils.getStateProtocol(protocolId);
String transactionsProtocolId = ProtocolUtils.getTransactionsProtocol(protocolId);
String blockAnnounceProtocolId = ProtocolUtils.getBlockAnnounceProtocol(protocolId);
String grandpaProtocolId = ProtocolUtils.getGrandpaProtocol(protocolId, legacyProtocol);
String transactionsProtocolId = ProtocolUtils.getTransactionsProtocol(protocolId);
String beefyProtocolId = ProtocolUtils.getBeefyProtocol(genesisBlockHashWithoutPrefix);

kademliaService = new KademliaService(kadProtocolId, hostId, isLocalEnabled, clientMode);
lightMessagesService = new LightMessagesService(lightProtocolId);
warpSyncService = new WarpSyncService(warpProtocolId);
syncService = new SyncService(syncProtocolId);
stateService = new StateService(stateProtocolId);
transactionsService = new TransactionsService(transactionsProtocolId);
blockAnnounceService = new BlockAnnounceService(blockAnnounceProtocolId);
grandpaService = new GrandpaService(grandpaProtocolId);
beefyService = new BeefyService(beefyProtocolId);
ping = new Ping(pingProtocol, new PingProtocol());
transactionsService = new TransactionsService(transactionsProtocolId);

hostBuilder.addProtocols(
List.of(
Expand All @@ -148,7 +156,8 @@ private void initializeProtocols(ChainService chainService, GenesisBlockHash gen
syncService.getProtocol(),
stateService.getProtocol(),
blockAnnounceService.getProtocol(),
grandpaService.getProtocol()
grandpaService.getProtocol(),
beefyService.getProtocol()
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public void handshakePeers() {
asyncExecutor.executeAndForget(() ->
network.getGrandpaService().sendHandshake(network.getHost(), peerId));

asyncExecutor.executeAndForget(() ->
network.getBeefyService().sendHandshake(network.getHost(), peerId));

if (network.getNodeRole().equals(NodeRole.AUTHORING)) {
asyncExecutor.executeAndForget(() ->
network.getTransactionsService().sendHandshake(network.getHost(), peerId));
Expand Down
19 changes: 13 additions & 6 deletions src/main/java/com/limechain/network/ProtocolUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,26 @@ public static String getStateProtocol(String chainId) {
return String.format("/%s/state/2", chainId);
}

public static String getBlockAnnounceProtocol(String chainId) {
return String.format("/%s/block-announces/1", chainId);
}

public static String getKadProtocol(String chainId) {
return String.format("/%s/kad", chainId);
}

public static String getTransactionsProtocol(String chainId) {
return String.format("/%s/transactions/1", chainId);
}

public static String getBlockAnnounceProtocol(String chainId) {
return String.format("/%s/block-announces/1", chainId);
}

public static String getGrandpaProtocol(String chainId, boolean legacyProtocol) {
return String.format("/%s/grandpa/1", legacyProtocol ? "paritytech" : chainId);
}

public static String getTransactionsProtocol(String chainId) {
return String.format("/%s/transactions/1", chainId);
// NOTE: Beefy was likely not part of the original protocols and therefore
// only operates with the genesis hash. As a result, it does not support
// the {chainId}/beefy/2 format.
public static String getBeefyProtocol(String genesisBlockHash) {
return String.format("/%s/beefy/2", genesisBlockHash);
}
}
14 changes: 9 additions & 5 deletions src/main/java/com/limechain/network/dto/PeerInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@
@Data
@NoArgsConstructor
public class PeerInfo {

private final ProtocolStreams transactionsStreams = new ProtocolStreams();
private final ProtocolStreams blockAnnounceStreams = new ProtocolStreams();
private final ProtocolStreams grandpaStreams = new ProtocolStreams();
private final ProtocolStreams beefyStreams = new ProtocolStreams();

private PeerId peerId;
private int nodeRole;
private BigInteger bestBlock;
private Hash256 bestBlockHash;
private Hash256 genesisBlockHash;
private BigInteger latestBlock = BigInteger.ZERO;
private final ProtocolStreams blockAnnounceStreams = new ProtocolStreams();
private final ProtocolStreams grandpaStreams = new ProtocolStreams();
private final ProtocolStreams transactionsStreams = new ProtocolStreams();

public String getNodeRoleName(){
return Arrays
Expand All @@ -33,9 +36,10 @@ public String getNodeRoleName(){

public ProtocolStreams getProtocolStreams(ProtocolStreamType type) {
return switch (type) {
case GRANDPA -> grandpaStreams;
case BLOCK_ANNOUNCE -> blockAnnounceStreams;
case TRANSACTIONS -> transactionsStreams;
case BLOCK_ANNOUNCE -> blockAnnounceStreams;
case GRANDPA -> grandpaStreams;
case BEEFY -> beefyStreams;
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package com.limechain.network.dto;

public enum ProtocolStreamType {
GRANDPA, BLOCK_ANNOUNCE, TRANSACTIONS
TRANSACTIONS, BLOCK_ANNOUNCE, GRANDPA, BEEFY
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.limechain.network.protocol.base;

import io.libp2p.core.Stream;

/**
* An abstract controller for sending message on a specific stream
*/
public abstract class BaseController<E extends BaseEngine> {

protected final Stream stream;
protected final E engine;

protected BaseController(Stream stream, E engine) {
this.stream = stream;
this.engine = engine;
}

/**
* Sends a handshake message over the controller stream.
*/
public void sendHandshake() {
engine.writeHandshakeToStream(stream, stream.remotePeerId());
}
}
16 changes: 16 additions & 0 deletions src/main/java/com/limechain/network/protocol/base/BaseEngine.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.limechain.network.protocol.base;

import io.libp2p.core.PeerId;
import io.libp2p.core.Stream;

/**
* Abstract engine for handling transactions on specific streams
*/
public interface BaseEngine {

void handleHandshake(byte[] message, PeerId peerId, Stream stream);

void receiveRequest(byte[] message, Stream stream);

void writeHandshakeToStream(Stream stream, PeerId peerId);
}
Loading

0 comments on commit dd6b029

Please sign in to comment.