Skip to content

Commit

Permalink
p2p/adapters: Use RPC with SimNodes
Browse files Browse the repository at this point in the history
Signed-off-by: Lewis Marshall <[email protected]>
  • Loading branch information
lmars committed Apr 27, 2017
1 parent 5c003a3 commit 71d8590
Show file tree
Hide file tree
Showing 35 changed files with 417 additions and 425 deletions.
2 changes: 1 addition & 1 deletion cmd/swarm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func getPassPhrase(prompt string, i int, passwords []string) string {
return password
}

func injectBootnodes(srv *p2p.Server, nodes []string) {
func injectBootnodes(srv p2p.Server, nodes []string) {
for _, url := range nodes {
n, err := discover.ParseNode(url)
if err != nil {
Expand Down
26 changes: 12 additions & 14 deletions cmd/wnode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const quitCommand = "~Q"

// singletons
var (
server *p2p.Server
server p2p.Server
shh *whisper.Whisper
done chan struct{}
mailServer mailserver.WMailServer
Expand Down Expand Up @@ -253,19 +253,17 @@ func initialize() {
maxPeers = 800
}

server = &p2p.Server{
Config: p2p.Config{
PrivateKey: nodeid,
MaxPeers: maxPeers,
Name: common.MakeName("wnode", "5.0"),
Protocols: shh.Protocols(),
ListenAddr: *argIP,
NAT: nat.Any(),
BootstrapNodes: peers,
StaticNodes: peers,
TrustedNodes: peers,
},
}
server = p2p.NewServer(p2p.Config{
PrivateKey: nodeid,
MaxPeers: maxPeers,
Name: common.MakeName("wnode", "5.0"),
Protocols: shh.Protocols(),
ListenAddr: *argIP,
NAT: nat.Any(),
BootstrapNodes: peers,
StaticNodes: peers,
TrustedNodes: peers,
})
}

func startServer() {
Expand Down
2 changes: 1 addition & 1 deletion contracts/release/release.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (r *ReleaseService) Protocols() []p2p.Protocol { return nil }
func (r *ReleaseService) APIs() []rpc.API { return nil }

// Start spawns the periodic version checker goroutine
func (r *ReleaseService) Start(server *p2p.Server) error {
func (r *ReleaseService) Start(server p2p.Server) error {
go r.checker()
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import (
)

type LesServer interface {
Start(srvr *p2p.Server)
Start(srvr p2p.Server)
Stop()
Protocols() []p2p.Protocol
}
Expand Down Expand Up @@ -362,7 +362,7 @@ func (s *Ethereum) Protocols() []p2p.Protocol {

// Start implements node.Service, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *Ethereum) Start(srvr *p2p.Server) error {
func (s *Ethereum) Start(srvr p2p.Server) error {
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())

s.protocolManager.Start()
Expand Down
4 changes: 2 additions & 2 deletions ethstats/ethstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const historyUpdateRange = 50
type Service struct {
stack *node.Node // Temporary workaround, remove when API finalized

server *p2p.Server // Peer-to-peer server to retrieve networking infos
server p2p.Server // Peer-to-peer server to retrieve networking infos
eth *eth.Ethereum // Full Ethereum service if monitoring a full node
les *les.LightEthereum // Light Ethereum service if monitoring a light node
engine consensus.Engine // Consensus engine to retrieve variadic block fields
Expand Down Expand Up @@ -101,7 +101,7 @@ func (s *Service) Protocols() []p2p.Protocol { return nil }
func (s *Service) APIs() []rpc.API { return nil }

// Start implements node.Service, starting up the monitoring and reporting daemon.
func (s *Service) Start(server *p2p.Server) error {
func (s *Service) Start(server p2p.Server) error {
s.server = server
go s.loop()

Expand Down
4 changes: 2 additions & 2 deletions internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1434,12 +1434,12 @@ func (api *PrivateDebugAPI) SetHead(number hexutil.Uint64) {

// PublicNetAPI offers network related RPC methods
type PublicNetAPI struct {
net *p2p.Server
net p2p.Server
networkVersion int
}

// NewPublicNetAPI creates a new net API instance.
func NewPublicNetAPI(net *p2p.Server, networkVersion int) *PublicNetAPI {
func NewPublicNetAPI(net p2p.Server, networkVersion int) *PublicNetAPI {
return &PublicNetAPI{net, networkVersion}
}

Expand Down
2 changes: 1 addition & 1 deletion les/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (s *LightEthereum) Protocols() []p2p.Protocol {

// Start implements node.Service, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *LightEthereum) Start(srvr *p2p.Server) error {
func (s *LightEthereum) Start(srvr p2p.Server) error {
log.Warn("Light client mode is an experimental feature")
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.netVersionId)
s.protocolManager.Start(srvr)
Expand Down
10 changes: 7 additions & 3 deletions les/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ const (
disableClientRemovePeer = false
)

type discV5Server interface {
DiscV5() *discv5.Network
}

// errIncompatibleConfig is returned if the requested protocols and configs are
// not compatible (low protocol version restrictions and high requirements).
var errIncompatibleConfig = errors.New("incompatible configuration")
Expand Down Expand Up @@ -256,10 +260,10 @@ func (pm *ProtocolManager) removePeer(id string) {
}
}

func (pm *ProtocolManager) Start(srvr *p2p.Server) {
func (pm *ProtocolManager) Start(srvr p2p.Server) {
var topicDisc *discv5.Network
if srvr != nil {
topicDisc = srvr.DiscV5
if v, ok := srvr.(discV5Server); ok {
topicDisc = v.DiscV5()
}
lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
if pm.lightSync {
Expand Down
2 changes: 1 addition & 1 deletion les/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *LesServer) Protocols() []p2p.Protocol {
}

// Start starts the LES server
func (s *LesServer) Start(srvr *p2p.Server) {
func (s *LesServer) Start(srvr p2p.Server) {
s.protocolManager.Start(srvr)
}

Expand Down
8 changes: 4 additions & 4 deletions les/serverpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ const (
type serverPool struct {
db ethdb.Database
dbKey []byte
server *p2p.Server
server p2p.Server
quit chan struct{}
wg *sync.WaitGroup
connWg sync.WaitGroup
Expand All @@ -118,7 +118,7 @@ type serverPool struct {
}

// newServerPool creates a new serverPool instance
func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic discv5.Topic, quit chan struct{}, wg *sync.WaitGroup) *serverPool {
func newServerPool(db ethdb.Database, dbPrefix []byte, server p2p.Server, topic discv5.Topic, quit chan struct{}, wg *sync.WaitGroup) *serverPool {
pool := &serverPool{
db: db,
dbKey: append(dbPrefix, []byte(topic)...),
Expand All @@ -139,11 +139,11 @@ func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic
pool.loadNodes()
pool.checkDial()

if pool.server.DiscV5 != nil {
if srv, ok := pool.server.(discV5Server); ok && srv.DiscV5() != nil {
pool.discSetPeriod = make(chan time.Duration, 1)
pool.discNodes = make(chan *discv5.Node, 100)
pool.discLookups = make(chan bool, 100)
go pool.server.DiscV5.SearchTopic(topic, pool.discSetPeriod, pool.discNodes, pool.discLookups)
go srv.DiscV5().SearchTopic(topic, pool.discSetPeriod, pool.discNodes, pool.discLookups)
}

go pool.eventLoop()
Expand Down
2 changes: 1 addition & 1 deletion node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func NewPublicWeb3API(stack *Node) *PublicWeb3API {

// ClientVersion returns the node name
func (s *PublicWeb3API) ClientVersion() string {
return s.stack.Server().Name
return s.stack.serverConfig.Name
}

// Sha3 applies the ethereum sha3 implementation on the input.
Expand Down
8 changes: 4 additions & 4 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Node struct {
instanceDirLock storage.Storage // prevents concurrent use of instance directory

serverConfig p2p.Config
server *p2p.Server // Currently running P2P networking layer
server p2p.Server // Currently running P2P networking layer

serviceFuncs []ServiceConstructor // Service constructors (in dependency order)
services map[reflect.Type]Service // Currently running services
Expand Down Expand Up @@ -165,7 +165,7 @@ func (n *Node) Start() error {
if n.serverConfig.NodeDatabase == "" {
n.serverConfig.NodeDatabase = n.config.NodeDB()
}
running := &p2p.Server{Config: n.serverConfig}
running := p2p.NewServer(n.serverConfig)
log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)

// Otherwise copy and specialize the P2P configuration
Expand Down Expand Up @@ -194,7 +194,7 @@ func (n *Node) Start() error {
}
// Gather the protocols and start the freshly assembled P2P server
for _, service := range services {
running.Protocols = append(running.Protocols, service.Protocols()...)
n.serverConfig.Protocols = append(n.serverConfig.Protocols, service.Protocols()...)
}
if err := running.Start(); err != nil {
if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] {
Expand Down Expand Up @@ -581,7 +581,7 @@ func (n *Node) RPCHandler() (*rpc.Server, error) {
// Server retrieves the currently running P2P network layer. This method is meant
// only to inspect fields of the currently running server, life cycle management
// should be left to this Node entity.
func (n *Node) Server() *p2p.Server {
func (n *Node) Server() p2p.Server {
n.lock.RLock()
defer n.lock.RUnlock()

Expand Down
2 changes: 1 addition & 1 deletion node/node_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type SampleService struct{}

func (s *SampleService) Protocols() []p2p.Protocol { return nil }
func (s *SampleService) APIs() []rpc.API { return nil }
func (s *SampleService) Start(*p2p.Server) error { fmt.Println("Service starting..."); return nil }
func (s *SampleService) Start(p2p.Server) error { fmt.Println("Service starting..."); return nil }
func (s *SampleService) Stop() error { fmt.Println("Service stopping..."); return nil }

func ExampleService() {
Expand Down
10 changes: 5 additions & 5 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestServiceLifeCycle(t *testing.T) {
id := id // Closure for the constructor
constructor := func(*ServiceContext) (Service, error) {
return &InstrumentedService{
startHook: func(*p2p.Server) { started[id] = true },
startHook: func(p2p.Server) { started[id] = true },
stopHook: func() { stopped[id] = true },
}, nil
}
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestServiceRestarts(t *testing.T) {
running = false

return &InstrumentedService{
startHook: func(*p2p.Server) {
startHook: func(p2p.Server) {
if running {
panic("already running")
}
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestServiceConstructionAbortion(t *testing.T) {
id := id // Closure for the constructor
constructor := func(*ServiceContext) (Service, error) {
return &InstrumentedService{
startHook: func(*p2p.Server) { started[id] = true },
startHook: func(p2p.Server) { started[id] = true },
}, nil
}
if err := stack.Register(maker(constructor)); err != nil {
Expand Down Expand Up @@ -299,7 +299,7 @@ func TestServiceStartupAbortion(t *testing.T) {
id := id // Closure for the constructor
constructor := func(*ServiceContext) (Service, error) {
return &InstrumentedService{
startHook: func(*p2p.Server) { started[id] = true },
startHook: func(p2p.Server) { started[id] = true },
stopHook: func() { stopped[id] = true },
}, nil
}
Expand Down Expand Up @@ -352,7 +352,7 @@ func TestServiceTerminationGuarantee(t *testing.T) {
id := id // Closure for the constructor
constructor := func(*ServiceContext) (Service, error) {
return &InstrumentedService{
startHook: func(*p2p.Server) { started[id] = true },
startHook: func(p2p.Server) { started[id] = true },
stopHook: func() { stopped[id] = true },
}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type Service interface {

// Start is called after all services have been constructed and the networking
// layer was also initialized to spawn any goroutines required by the service.
Start(server *p2p.Server) error
Start(server p2p.Server) error

// Stop terminates all goroutines belonging to the service, blocking until they
// are all terminated.
Expand Down
6 changes: 3 additions & 3 deletions node/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type NoopService struct{}

func (s *NoopService) Protocols() []p2p.Protocol { return nil }
func (s *NoopService) APIs() []rpc.API { return nil }
func (s *NoopService) Start(*p2p.Server) error { return nil }
func (s *NoopService) Start(p2p.Server) error { return nil }
func (s *NoopService) Stop() error { return nil }

func NewNoopService(*ServiceContext) (Service, error) { return new(NoopService), nil }
Expand All @@ -57,7 +57,7 @@ type InstrumentedService struct {
stop error

protocolsHook func()
startHook func(*p2p.Server)
startHook func(p2p.Server)
stopHook func()
}

Expand All @@ -74,7 +74,7 @@ func (s *InstrumentedService) APIs() []rpc.API {
return s.apis
}

func (s *InstrumentedService) Start(server *p2p.Server) error {
func (s *InstrumentedService) Start(server p2p.Server) error {
if s.startHook != nil {
s.startHook(server)
}
Expand Down
Loading

0 comments on commit 71d8590

Please sign in to comment.