Skip to content

Commit

Permalink
feat(RequestDatasetLog): add back functionality for getting a peer's …
Browse files Browse the repository at this point in the history
…dataset log over p2p

Modelled after the `RequestDataset` and `handleDataset` functions.

There is some repeat code used between `actions.log` and `handleDatasetLog`, this spawned a discussion about needing another layer of functions, perhaps called `core` that would be shared between actions and p2p. Just noting this here for posterity.

Commit also adds back clearing the "Backoff" indicator when explicitly connecting to another peer.
  • Loading branch information
ramfox committed Oct 4, 2018
1 parent 639566a commit 813bf0d
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 100 deletions.
220 changes: 128 additions & 92 deletions p2p/log.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package p2p

import (
"encoding/json"
"fmt"

"github.com/ipfs/go-datastore"
"github.com/qri-io/dataset/dsfs"
"github.com/qri-io/qri/repo"
)

Expand All @@ -9,100 +14,131 @@ const (
MtDatasetLog = MsgType("dataset_log")
)

// DatasetLogRequestBody encapsulates options for requesting dataset history
type DatasetLogRequestBody struct {
Ref repo.DatasetRef
Limit int
Offset int
}

// DatasetLogResponseBody encapsulates option for responding to a dataset history request
type DatasetLogResponseBody struct {
History []repo.DatasetRef
Err error
}

// RequestDatasetLog gets the log information of Peer's dataset
func (n *QriNode) RequestDatasetLog(ref repo.DatasetRef, limit, offset int) ([]repo.DatasetRef, error) {
// id, err := n.Repo.Peers().IPFSPeerID(ref.Peername)
// if err != nil {
// return nil, fmt.Errorf("error getting peer IPFS id: %s", err.Error())
// }
// res, err := n.SendMessage(id, &Message{
// Type: MtDatasetLog,
// Phase: MpRequest,
// Payload: ref,
// })
// if err != nil {
// log.Debugf("send dataset log message error: %s", err.Error())
// return nil, err
// }

// data, err := json.Marshal(res.Payload)
// if err != nil {
// log.Debug(err.Error())
// return nil, err
// }

resref := []repo.DatasetRef{}
// err = json.Unmarshal(data, &resref)
// if len(resref) == 0 && err != nil {
// err = fmt.Errorf("no log found")
// }

return resref, nil

// get a list of peers to whom we will send the request
pids := n.ClosestConnectedPeers(ref.ProfileID, 15)
if len(pids) == 0 {
return nil, fmt.Errorf("no connected peers")
}

// create a channel on which to send the requests and receive the responses
replies := make(chan Message)

// create body of message:
body := DatasetLogRequestBody{
Ref: ref,
Limit: limit,
Offset: offset,
}

// create the request
req, err := NewJSONBodyMessage(n.ID, MtDatasetLog, body)
req = req.WithHeaders("phase", "request")
if err != nil {
log.Debug(err.Error())
return nil, err
}

// iterate through the peer ids
for _, pid := range pids {

if err = n.SendMessage(req, replies, pid); err != nil {
log.Debugf("%s err: %s", pid, err.Error())
continue
}

res := <-replies
dlrb := DatasetLogResponseBody{}
if err = json.Unmarshal(res.Body, &dlrb); err == nil {
if dlrb.Err != nil {
log.Debugf("%s err: %s", pid, err.Error())
continue
}
if len(dlrb.History) != 0 {
return dlrb.History, nil
}
}
}
return nil, fmt.Errorf("unable to locate dataset log for %s", ref)
}

func (n *QriNode) datasetsHistoryHandler(ws *WrappedStream, msg Message) (hangup bool) {
// data, err := json.Marshal(msg.Payload)
// if err != nil {
// log.Debug(err.Error())
// }

return false

// ref := repo.DatasetRef{}
// if err = json.Unmarshal(data, &ref); err != nil {
// log.Infof(err.Error())
// return &Message{
// Type: MtDatasetLog,
// Phase: MpError,
// Payload: err,
// }
// }

// ref, err = n.Repo.GetRef(ref)
// if err != nil {
// return &Message{
// Type: MtDatasetLog,
// Phase: MpError,
// Payload: err,
// }
// }
// // TODO: probably shouldn't write over ref.Path if ref.Path is set, but
// // until we make the changes to the way we use hashes to make them
// // more consistent, this feels safer.
// // ref.Path = path.String()

// log := []repo.DatasetRef{}
// limit := 50

// for {
// ref.Dataset, err = n.Repo.GetDataset(datastore.NewKey(ref.Path))
// if err != nil {
// return &Message{
// Type: MtDatasetLog,
// Phase: MpError,
// Payload: err,
// }
// }
// log = append(log, ref)

// limit--
// if limit == 0 || ref.Dataset.PreviousPath == "" {
// break
// }

// ref, err = repo.ParseDatasetRef(ref.Dataset.PreviousPath)

// if err != nil {
// return &Message{
// Type: MtDatasetLog,
// Phase: MpError,
// Payload: err,
// }
// }
// }
// return &Message{
// Type: MtDatasetLog,
// Phase: MpResponse,
// Payload: &log,
// }
func (n *QriNode) handleDatasetLog(ws *WrappedStream, req Message) (hangup bool) {
hangup = true

switch req.Header("phase") {
case "request":

resBody := DatasetLogRequestBody{}
if err := json.Unmarshal(req.Body, &resBody); err != nil {
log.Debug(err.Error())
return
}

ref := resBody.Ref
limit := resBody.Limit
offset := resBody.Offset

local := true

err := n.ResolveDatasetRef(&ref)
if err != nil {
local = false
}

reqBody := DatasetLogResponseBody{}
history := []repo.DatasetRef{}

if local {
for {
dataset, err := dsfs.LoadDataset(n.Repo.Store(), datastore.NewKey(ref.Path))
if err != nil {
reqBody.Err = err
break
}
ref.Dataset = dataset.Encode()

offset--
if offset > 0 {
continue
}

history = append(history, ref)

limit--
if limit == 0 || ref.Dataset.PreviousPath == "" {
break
}
ref.Path = ref.Dataset.PreviousPath
}
}

reqBody.History = history
res, err := req.UpdateJSON(reqBody)
if err != nil {
log.Debug(err.Error())
return
}

res = res.WithHeaders("phase", "response")
if err := ws.sendMessage(res); err != nil {
log.Debug(err.Error())
return
}
}
return
}
1 change: 1 addition & 0 deletions p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,5 +393,6 @@ func MakeHandlers(n *QriNode) map[MsgType]HandlerFunc {
MtEvents: n.handleEvents,
MtConnected: n.handleConnected,
MtResolveDatasetRef: n.handleResolveDatasetRef,
MtDatasetLog: n.handleDatasetLog,
}
}
14 changes: 6 additions & 8 deletions p2p/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
pstore "gx/ipfs/QmZR2XWVVBCtbgBWnQhWk2xcQfaR3W8faQPriAiaaj7rsr/go-libp2p-peerstore"
peer "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer"
swarm "gx/ipfs/QmemVjhp1UuWPQqrWSvPcaqH3QJRMjMqNm4T2RULMkDDQe/go-libp2p-swarm"
)

// ConnectedQriProfiles lists all connected peers that support the qri protocol
Expand Down Expand Up @@ -172,16 +173,13 @@ func (n *QriNode) ConnectToPeer(ctx context.Context, p PeerConnectionParams) (*p
return nil, err
}

// TODO - restore
// snet, ok := n.Host.Network().(*swarm.Network)
// if !ok {
// return nil, fmt.Errorf("peerhost network was not swarm")
// }
// // clear backoff b/c we're explicitly dialing this peer
// snet.Swarm().Backoff().Clear(pinfo.ID)
if swarm, ok := n.Host.Network().(*swarm.Swarm); ok {
// clear backoff b/c we're explicitly dialing this peer
swarm.Backoff().Clear(pinfo.ID)
}

if err := n.Host.Connect(ctx, pinfo); err != nil {
return nil, fmt.Errorf("connect %s failure: %s", pinfo.ID.Pretty(), err)
return nil, fmt.Errorf("host connect %s failure: %s", pinfo.ID.Pretty(), err)
}

if err := n.AddQriPeer(pinfo); err != nil {
Expand Down

0 comments on commit 813bf0d

Please sign in to comment.