Skip to content

Commit

Permalink
fix: improve connection maintenance with circuit
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobheun committed Nov 28, 2019
1 parent a0abe0f commit 08cb39d
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 37 deletions.
7 changes: 5 additions & 2 deletions src/circuit/circuit/hop.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ const { validateAddrs } = require('./utils')
const StreamHandler = require('./stream-handler')
const { CircuitRelay: CircuitPB } = require('../protocol')
const pipe = require('it-pipe')
const errCode = require('err-code')
const { codes: Errors } = require('../../errors')

const { stop } = require('./stop')

const multicodec = require('./../multicodec')

const log = debug('libp2p:circuit:hop')
log.err = debug('libp2p:circuit:hop:error')
log.error = debug('libp2p:circuit:hop:error')

module.exports.handleHop = async function handleHop ({
connection,
Expand Down Expand Up @@ -112,7 +114,8 @@ module.exports.hop = async function hop ({
}

log('hop request failed with code %d, closing stream', response.code)
return streamHandler.close()
streamHandler.close()
throw errCode(new Error(`HOP request failed with code ${response.code}`), Errors.ERR_HOP_REQUEST_FAILED)
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/circuit/circuit/stream-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class StreamHandler {
const msg = await this.decoder.next()
if (msg.value) {
const value = CircuitPB.decode(msg.value.slice())
log('read', value)
log('read message type', value.type)
return value
}

Expand All @@ -47,7 +47,7 @@ class StreamHandler {
* @param {*} msg An unencoded CircuitRelay protobuf message
*/
write (msg) {
log('write', msg)
log('write message type %s', msg.type)
this.shake.write(lp.encode.single(CircuitPB.encode(msg)))
}

Expand Down
48 changes: 29 additions & 19 deletions src/circuit/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const mafmt = require('mafmt')
const multiaddr = require('multiaddr')
const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const withIs = require('class-is')
const { CircuitRelay: CircuitPB } = require('./protocol')

Expand Down Expand Up @@ -103,26 +104,33 @@ class Circuit {
const addrs = ma.toString().split('/p2p-circuit')
const relayAddr = multiaddr(addrs[0])
const destinationAddr = multiaddr(addrs[addrs.length - 1])

const relayPeer = PeerId.createFromCID(relayAddr.getPeerId())
const destinationPeer = PeerId.createFromCID(destinationAddr.getPeerId())
const relayConnection = await this._dialer.connectToMultiaddr(relayAddr, options)
const virtualConnection = await hop({
connection: relayConnection,
circuit: this,
request: {
type: CircuitPB.Type.HOP,
srcPeer: {
id: this.peerInfo.id.toBytes(),
addrs: this.peerInfo.multiaddrs.toArray().map(addr => addr.buffer)
},
dstPeer: {
id: destinationPeer.toBytes(),
addrs: [multiaddr(destinationAddr).buffer]

let disconnectOnFailure = false
let relayConnection = this._registrar.getConnection(new PeerInfo(relayPeer))
if (!relayConnection) {
relayConnection = await this._dialer.connectToMultiaddr(relayAddr, options)
disconnectOnFailure = true
}

try {
const virtualConnection = await hop({
connection: relayConnection,
circuit: this,
request: {
type: CircuitPB.Type.HOP,
srcPeer: {
id: this.peerInfo.id.toBytes(),
addrs: this.peerInfo.multiaddrs.toArray().map(addr => addr.buffer)
},
dstPeer: {
id: destinationPeer.toBytes(),
addrs: [multiaddr(destinationAddr).buffer]
}
}
}
})
})

if (virtualConnection) {
const localAddr = relayAddr.encapsulate(`/p2p-circuit/p2p/${this.peerInfo.id.toB58String()}`)
const maConn = toConnection({
stream: virtualConnection,
Expand All @@ -132,8 +140,10 @@ class Circuit {
log('new outbound connection %s', maConn.remoteAddr)

return this._upgrader.upgradeOutbound(maConn)
} else {
// TODO: throw an error
} catch (err) {
log.error('Circuit relay dial failed', err)
disconnectOnFailure && await relayConnection.close()
throw err
}
}

Expand Down
13 changes: 5 additions & 8 deletions src/circuit/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ const log = debug('libp2p:circuit:listener')
log.err = debug('libp2p:circuit:error:listener')

/**
* @param {object} properties
* @param {Dialer} properties.dialer
* @param {object} properties.options
* @param {*} circuit
* @returns {Listener} a transport listener
*/
module.exports = (circuit, options) => {
module.exports = (circuit) => {
const listener = new EventEmitter()
const listeningAddrs = new Map()

Expand All @@ -24,12 +23,10 @@ module.exports = (circuit, options) => {
* @return {void}
*/
listener.listen = async (addr) => {
let [addrString] = String(addr).split('/p2p-circuit').slice(-1)
const [addrString] = String(addr).split('/p2p-circuit').slice(-1)

const relayConn = await circuit._dialer.connectToMultiaddr(multiaddr(addrString))
const relayedAddr = relayConn.remoteAddr.encapsulate(`/p2p-circuit`)

console.log('Relayed addr %s', String(relayedAddr))
const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit')

listeningAddrs.set(relayConn.remotePeer.toB58String(), relayedAddr)
listener.emit('listening')
Expand Down
1 change: 1 addition & 0 deletions src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ exports.codes = {
ERR_DISCOVERED_SELF: 'ERR_DISCOVERED_SELF',
ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT',
ERR_ENCRYPTION_FAILED: 'ERR_ENCRYPTION_FAILED',
ERR_HOP_REQUEST_FAILED: 'ERR_HOP_REQUEST_FAILED',
ERR_INVALID_KEY: 'ERR_INVALID_KEY',
ERR_INVALID_MESSAGE: 'ERR_INVALID_MESSAGE',
ERR_INVALID_PEER: 'ERR_INVALID_PEER',
Expand Down
101 changes: 95 additions & 6 deletions test/dialing/relay.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
const chai = require('chai')
chai.use(require('dirty-chai'))
const { expect } = chai
const sinon = require('sinon')

const multiaddr = require('multiaddr')
const { collect } = require('streaming-iterables')
const pipe = require('it-pipe')
const { createPeerInfoFromFixture } = require('../utils/creators/peer')
const baseOptions = require('../utils/base-options')
const Libp2p = require('../../src')
const { codes: Errors } = require('../../src/errors')

describe('Dialing (via relay, TCP)', () => {
let srcLibp2p
Expand Down Expand Up @@ -51,16 +53,10 @@ describe('Dialing (via relay, TCP)', () => {
.encapsulate(`/p2p/${relayIdString}`)
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)

// Connect the target peer and the relay, since the relay is not active
const destToRelayConn = await dstLibp2p.dial(relayAddr)
expect(destToRelayConn).to.exist()

const tcpAddrs = dstLibp2p.transportManager.getAddrs()
await dstLibp2p.transportManager.listen([multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)])
expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])

dstLibp2p.transportManager.getAddrs().forEach(addr => console.log(String(addr)))

const connection = await srcLibp2p.dial(dialAddr)
expect(connection).to.exist()
expect(connection.remotePeer.toBytes()).to.eql(dstLibp2p.peerInfo.id.toBytes())
Expand All @@ -82,4 +78,97 @@ describe('Dialing (via relay, TCP)', () => {
)
expect(output.slice()).to.eql(input)
})

it('should fail to connect to a peer over a relay with inactive connections', async () => {
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
const relayIdString = relayLibp2p.peerInfo.id.toString()

const dialAddr = relayAddr
.encapsulate(`/p2p/${relayIdString}`)
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)

try {
await srcLibp2p.dial(dialAddr)
expect.fail('Dial should have failed')
} catch (err) {
expect(err).to.exist()
expect(err).to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
}
})

it('should not stay connected to a relay when not already connected and HOP fails', async () => {
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
const relayIdString = relayLibp2p.peerInfo.id.toString()

const dialAddr = relayAddr
.encapsulate(`/p2p/${relayIdString}`)
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)

try {
await srcLibp2p.dial(dialAddr)
expect.fail('Dial should have failed')
} catch (err) {
expect(err).to.exist()
expect(err).to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
}

// We should not be connected to the relay, because we weren't before the dial
const srcToRelayConn = srcLibp2p.registrar.getConnection(relayLibp2p.peerInfo)
expect(srcToRelayConn).to.not.exist()
})

it('dialer should stay connected to an already connected relay on hop failure', async () => {
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
const relayIdString = relayLibp2p.peerInfo.id.toString()

const dialAddr = relayAddr
.encapsulate(`/p2p/${relayIdString}`)
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)

// Connect to the relay first
await srcLibp2p.dial(relayAddr)

try {
await srcLibp2p.dial(dialAddr)
expect.fail('Dial should have failed')
} catch (err) {
expect(err).to.exist()
expect(err).to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
}

const srcToRelayConn = srcLibp2p.registrar.getConnection(relayLibp2p.peerInfo)
expect(srcToRelayConn).to.exist()
expect(srcToRelayConn.stat.status).to.equal('open')
})

it('destination peer should stay connected to an already connected relay on hop failure', async () => {
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
const relayIdString = relayLibp2p.peerInfo.id.toString()

const dialAddr = relayAddr
.encapsulate(`/p2p/${relayIdString}`)
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)

// Connect the destination peer and the relay
const tcpAddrs = dstLibp2p.transportManager.getAddrs()
await dstLibp2p.transportManager.listen([multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)])
expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])

// Tamper with the our multiaddrs for the circuit message
sinon.stub(srcLibp2p.peerInfo.multiaddrs, 'toArray').returns([{
buffer: Buffer.from('an invalid multiaddr')
}])

try {
await srcLibp2p.dial(dialAddr)
expect.fail('Dial should have failed')
} catch (err) {
expect(err).to.exist()
expect(err).to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
}

const dstToRelayConn = dstLibp2p.registrar.getConnection(relayLibp2p.peerInfo)
expect(dstToRelayConn).to.exist()
expect(dstToRelayConn.stat.status).to.equal('open')
})
})

0 comments on commit 08cb39d

Please sign in to comment.