diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 78467ce9456..548c4a62d9b 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -11,6 +11,7 @@ import ( blocks "github.com/ipfs/go-ipfs/blocks" blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil" + decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision" tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet" mockrouting "github.com/ipfs/go-ipfs/routing/mock" delay "github.com/ipfs/go-ipfs/thirdparty/delay" @@ -489,3 +490,165 @@ func TestWantlistCleanup(t *testing.T) { t.Fatal("should only have keys[0] in wantlist") } } + +func assertLedgerMatch(ra, rb *decision.Receipt) error { + if ra.Sent != rb.Recv { + return fmt.Errorf("mismatch in ledgers (exchanged bytes): %d sent vs %d recvd", ra.Sent, rb.Recv) + } + + if ra.Recv != rb.Sent { + return fmt.Errorf("mismatch in ledgers (exchanged bytes): %d recvd vs %d sent", ra.Recv, rb.Sent) + } + + if ra.Exchanged != rb.Exchanged { + return fmt.Errorf("mismatch in ledgers (exchanged blocks): %d vs %d ", ra.Exchanged, rb.Exchanged) + } + + return nil +} + +func assertLedgerEqual(ra, rb *decision.Receipt) error { + if ra.Value != rb.Value { + return fmt.Errorf("mismatch in ledgers (value/debt ratio): %f vs %f ", ra.Value, rb.Value) + } + + if ra.Sent != rb.Sent { + return fmt.Errorf("mismatch in ledgers (sent bytes): %d vs %d", ra.Sent, rb.Sent) + } + + if ra.Recv != rb.Recv { + return fmt.Errorf("mismatch in ledgers (recvd bytes): %d vs %d", ra.Recv, rb.Recv) + } + + if ra.Exchanged != rb.Exchanged { + return fmt.Errorf("mismatch in ledgers (exchanged blocks): %d vs %d ", ra.Exchanged, rb.Exchanged) + } + + return nil +} + +func newReceipt(sent, recv, exchanged uint64) *decision.Receipt { + return &decision.Receipt{ + Peer: "test", + Value: float64(sent) / (1 + float64(recv)), + Sent: sent, + Recv: recv, + Exchanged: exchanged, + } +} + +func TestBitswapLedgerOneWay(t *testing.T) { + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + sg := NewTestSessionGenerator(net) + defer sg.Close() + bg := blocksutil.NewBlockGenerator() + + t.Log("Test ledgers match when one peer sends block to another") + + instances := sg.Instances(2) + blocks := bg.Blocks(1) + err := instances[0].Exchange.HasBlock(blocks[0]) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid()) + if err != nil { + t.Fatal(err) + } + + ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer) + rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer) + + // compare peer ledger receipts + err = assertLedgerMatch(ra, rb) + if err != nil { + t.Fatal(err) + } + + // check that receipts have intended values + ratest := newReceipt(1, 0, 1) + err = assertLedgerEqual(ratest, ra) + if err != nil { + t.Fatal(err) + } + rbtest := newReceipt(0, 1, 1) + err = assertLedgerEqual(rbtest, rb) + if err != nil { + t.Fatal(err) + } + + t.Log(blk) + for _, inst := range instances { + err := inst.Exchange.Close() + if err != nil { + t.Fatal(err) + } + } +} + +func TestBitswapLedgerTwoWay(t *testing.T) { + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + sg := NewTestSessionGenerator(net) + defer sg.Close() + bg := blocksutil.NewBlockGenerator() + + t.Log("Test ledgers match when two peers send one block to each other") + + instances := sg.Instances(2) + blocks := bg.Blocks(2) + err := instances[0].Exchange.HasBlock(blocks[0]) + if err != nil { + t.Fatal(err) + } + + err = instances[1].Exchange.HasBlock(blocks[1]) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid()) + if err != nil { + t.Fatal(err) + } + + ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + blk, err = instances[0].Exchange.GetBlock(ctx, blocks[1].Cid()) + if err != nil { + t.Fatal(err) + } + + ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer) + rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer) + + // compare peer ledger receipts + err = assertLedgerMatch(ra, rb) + if err != nil { + t.Fatal(err) + } + + // check that receipts have intended values + rtest := newReceipt(1, 1, 2) + err = assertLedgerEqual(rtest, ra) + if err != nil { + t.Fatal(err) + } + + err = assertLedgerEqual(rtest, rb) + if err != nil { + t.Fatal(err) + } + + t.Log(blk) + for _, inst := range instances { + err := inst.Exchange.Close() + if err != nil { + t.Fatal(err) + } + } +} diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index f4b17080098..6c1a9e93653 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -286,6 +286,9 @@ func (e *Engine) AddBlock(block blocks.Block) { func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error { l := e.findOrCreate(p) + l.lk.Lock() + defer l.lk.Unlock() + for _, block := range m.Blocks() { l.SentBytes(len(block.RawData())) l.wantList.Remove(block.Cid()) diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 6c6fe0e8be2..028b9735d88 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -6,6 +6,8 @@ import ( "sync" "time" + bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" + process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" @@ -63,6 +65,12 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { "Block": envelope.Block.Cid().String(), }) + // update the BS ledger to reflect sent message + // TODO: Should only track *useful* messages in ledger + outgoing := bsmsg.New(false) + outgoing.AddBlock(envelope.Block) + bs.engine.MessageSent(envelope.Peer, outgoing) + bs.wm.SendBlock(ctx, envelope) bs.counterLk.Lock() bs.blocksSent++