Skip to content

Commit

Permalink
refactor: core async (#478)
Browse files Browse the repository at this point in the history
* refactor: cleanup core

test: auto dial on startup

* fix: make hangup work properly

* chore: fix lint

* chore: apply suggestions from code review

Co-Authored-By: Vasco Santos <[email protected]>
  • Loading branch information
jacobheun and vasco-santos authored Nov 19, 2019
1 parent 4adee59 commit 17946fb
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 179 deletions.
7 changes: 2 additions & 5 deletions src/get-peer-info.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,15 @@ function getPeerInfoRemote (peer, libp2p) {
try {
peerInfo = getPeerInfo(peer, libp2p.peerStore)
} catch (err) {
return Promise.reject(errCode(
new Error(`${peer} is not a valid peer type`),
'ERR_INVALID_PEER_TYPE'
))
throw errCode(err, 'ERR_INVALID_PEER_TYPE')
}

// If we don't have an address for the peer, attempt to find it
if (peerInfo.multiaddrs.size < 1) {
return libp2p.peerRouting.findPeer(peerInfo.id)
}

return Promise.resolve(peerInfo)
return peerInfo
}

module.exports = {
Expand Down
204 changes: 62 additions & 142 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,13 @@
'use strict'

const FSM = require('fsm-event')
const { EventEmitter } = require('events')
const debug = require('debug')
const log = debug('libp2p')
log.error = debug('libp2p:error')
const errCode = require('err-code')
const promisify = require('promisify-es6')

const each = require('async/each')

const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')
const Switch = require('./switch')
const Ping = require('./ping')

const { emitFirst } = require('./util')
const peerRouting = require('./peer-routing')
const contentRouting = require('./content-routing')
const dht = require('./dht')
Expand All @@ -34,20 +26,11 @@ const {
multicodecs: IDENTIFY_PROTOCOLS
} = require('./identify')

const notStarted = (action, state) => {
return errCode(
new Error(`libp2p cannot ${action} when not started; state is ${state}`),
codes.ERR_NODE_NOT_STARTED
)
}

/**
* @fires Libp2p#error Emitted when an error occurs
* @fires Libp2p#peer:connect Emitted when a peer is connected to this node
* @fires Libp2p#peer:disconnect Emitted when a peer disconnects from this node
* @fires Libp2p#peer:discovery Emitted when a peer is discovered
* @fires Libp2p#start Emitted when the node and its services has started
* @fires Libp2p#stop Emitted when the node and its services has stopped
*/
class Libp2p extends EventEmitter {
constructor (_options) {
Expand All @@ -67,9 +50,6 @@ class Libp2p extends EventEmitter {

this.peerStore = new PeerStore()

// create the switch, and listen for errors
this._switch = new Switch(this.peerInfo, this.peerStore, this._options.switch)

// Setup the Upgrader
this.upgrader = new Upgrader({
localPeer: this.peerInfo.id,
Expand Down Expand Up @@ -158,63 +138,7 @@ class Libp2p extends EventEmitter {
this.contentRouting = contentRouting(this)
this.dht = dht(this)

// Mount default protocols
Ping.mount(this._switch)

this.state = new FSM('STOPPED', {
STOPPED: {
start: 'STARTING',
stop: 'STOPPED',
done: 'STOPPED'
},
STARTING: {
done: 'STARTED',
abort: 'STOPPED',
stop: 'STOPPING'
},
STARTED: {
stop: 'STOPPING',
start: 'STARTED'
},
STOPPING: {
stop: 'STOPPING',
done: 'STOPPED'
}
})
this.state.on('STARTING', () => {
log('libp2p is starting')
this._onStarting()
})
this.state.on('STOPPING', () => {
log('libp2p is stopping')
})
this.state.on('STARTED', () => {
log('libp2p has started')
this.emit('start')
})
this.state.on('STOPPED', () => {
log('libp2p has stopped')
this.emit('stop')
})
this.state.on('error', (err) => {
log.error(err)
this.emit('error', err)
})

// Once we start, emit and dial any peers we may have already discovered
this.state.on('STARTED', () => {
for (const peerInfo of this.peerStore.peers) {
this.emit('peer:discovery', peerInfo)
this._maybeConnect(peerInfo)
}
})

this._peerDiscovered = this._peerDiscovered.bind(this)

// promisify all instance methods
;['start', 'hangUp', 'ping'].forEach(method => {
this[method] = promisify(this[method], { context: this })
})
}

/**
Expand All @@ -233,14 +157,23 @@ class Libp2p extends EventEmitter {
}

/**
* Starts the libp2p node and all sub services
* Starts the libp2p node and all its subsystems
*
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
*/
start (callback = () => {}) {
emitFirst(this, ['error', 'start'], callback)
this.state('start')
async start () {
log('libp2p is starting')
try {
await this._onStarting()
await this._onDidStart()
log('libp2p has started')
} catch (err) {
this.emit('error', err)
log.error('An error occurred starting libp2p', err)
await this.stop()
throw err
}
this._isStarted = true
}

/**
Expand All @@ -249,23 +182,22 @@ class Libp2p extends EventEmitter {
* @returns {void}
*/
async stop () {
this.state('stop')
log('libp2p is stopping')

try {
this.pubsub && await this.pubsub.stop()
await this.transportManager.close()
await this._switch.stop()
} catch (err) {
if (err) {
log.error(err)
this.emit('error', err)
}
}
this.state('done')
log('libp2p has stopped')
}

isStarted () {
return this.state ? this.state._state === 'STARTED' : false
return this._isStarted
}

/**
Expand Down Expand Up @@ -319,36 +251,30 @@ class Libp2p extends EventEmitter {
}

/**
* Disconnects from the given peer
* Disconnects all connections to the given `peer`
*
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to ping
* @param {function(Error)} callback
* @returns {void}
* @param {PeerId} peer The PeerId to close connections to
* @returns {Promise<void>}
*/
hangUp (peer, callback) {
getPeerInfoRemote(peer, this)
.then(peerInfo => {
this._switch.hangUp(peerInfo, callback)
}, callback)
hangUp (peer) {
return Promise.all(
this.registrar.connections.get(peer.toB58String()).map(connection => {
return connection.close()
})
)
}

/**
* Pings the provided peer
*
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to ping
* @param {function(Error, Ping)} callback
* @returns {void}
*/
ping (peer, callback) {
if (!this.isStarted()) {
return callback(notStarted('ping', this.state._state))
}

getPeerInfoRemote(peer, this)
.then(peerInfo => {
callback(null, new Ping(this._switch, peerInfo))
}, callback)
}
// TODO: Update ping
// /**
// * Pings the provided peer
// *
// * @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to ping
// * @returns {Promise<Ping>}
// */
// ping (peer) {
// const peerInfo = await getPeerInfoRemote(peer, this)
// return new Ping(this._switch, peerInfo)
// }

/**
* Registers the `handler` for each protocol
Expand Down Expand Up @@ -379,32 +305,25 @@ class Libp2p extends EventEmitter {
}

async _onStarting () {
if (!this._modules.transport) {
this.emit('error', new Error('no transports were present'))
return this.state('abort')
}

const multiaddrs = this.peerInfo.multiaddrs.toArray()

// Start parallel tasks
const tasks = [
this.transportManager.listen(multiaddrs)
]
await this.transportManager.listen(multiaddrs)

if (this._config.pubsub.enabled) {
this.pubsub && this.pubsub.start()
}
}

try {
await Promise.all(tasks)
} catch (err) {
log.error(err)
this.emit('error', err)
return this.state('stop')
/**
* Called when libp2p has started and before it returns
* @private
*/
_onDidStart () {
// Once we start, emit and dial any peers we may have already discovered
for (const peerInfo of this.peerStore.peers.values()) {
this.emit('peer:discovery', peerInfo)
this._maybeConnect(peerInfo)
}

// libp2p has started
this.state('done')
}

/**
Expand Down Expand Up @@ -435,15 +354,18 @@ class Libp2p extends EventEmitter {
* @private
* @param {PeerInfo} peerInfo
*/
_maybeConnect (peerInfo) {
// If auto dialing is on, check if we should dial
if (this._config.peerDiscovery.autoDial === true && !peerInfo.isConnected()) {
async _maybeConnect (peerInfo) {
// If auto dialing is on and we have no connection to the peer, check if we should dial
if (this._config.peerDiscovery.autoDial === true && !this.registrar.connections.get(peerInfo)) {
const minPeers = this._options.connectionManager.minPeers || 0
if (minPeers > Object.keys(this._switch.connection.connections).length) {
// TODO: This does not account for multiple connections to a peer
if (minPeers > this.registrar.connections.size) {
log('connecting to discovered peer')
this._switch.dialer.connect(peerInfo, (err) => {
err && log.error('could not connect to discovered peer', err)
})
try {
await this.dialer.connectToPeer(peerInfo)
} catch (err) {
log.error('could not connect to discovered peer', err)
}
}
}
}
Expand All @@ -452,9 +374,9 @@ class Libp2p extends EventEmitter {
* Initializes and starts peer discovery services
*
* @private
* @param {function(Error)} callback
* @returns {Promise<void>}
*/
_setupPeerDiscovery (callback) {
_setupPeerDiscovery () {
for (const DiscoveryService of this._modules.peerDiscovery) {
let config = {
enabled: true // on by default
Expand All @@ -480,9 +402,7 @@ class Libp2p extends EventEmitter {
}
}

each(this._discovery, (d, cb) => {
d.start(cb)
}, callback)
return this._discovery.map(d => d.start())
}
}

Expand Down
18 changes: 18 additions & 0 deletions src/transport-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ class TransportManager {
* @param {Multiaddr[]} addrs
*/
async listen (addrs) {
if (addrs.length === 0) {
log('no addresses were provided for listening, this node is dial only')
return
}

const couldNotListen = []
for (const [key, transport] of this._transports.entries()) {
const supportedAddrs = transport.filter(addrs)
const tasks = []
Expand All @@ -133,6 +139,12 @@ class TransportManager {
tasks.push(listener.listen(addr))
}

// Keep track of transports we had no addresses for
if (tasks.length === 0) {
couldNotListen.push(key)
continue
}

const results = await pSettle(tasks)
// If we are listening on at least 1 address, succeed.
// TODO: we should look at adding a retry (`p-retry`) here to better support
Expand All @@ -143,6 +155,12 @@ class TransportManager {
throw errCode(new Error(`Transport (${key}) could not listen on any available address`), codes.ERR_NO_VALID_ADDRESSES)
}
}

// If no transports were able to listen, throw an error. This likely
// means we were given addresses we do not have transports for
if (couldNotListen.length === this._transports.size) {
throw errCode(new Error(`no valid addresses were provided for transports [${couldNotListen}]`), codes.ERR_NO_VALID_ADDRESSES)
}
}

/**
Expand Down
Loading

0 comments on commit 17946fb

Please sign in to comment.