diff --git a/provider/provider_test.go b/provider/provider_test.go index 14cd68521464..7ef007b03a70 100644 --- a/provider/provider_test.go +++ b/provider/provider_test.go @@ -19,6 +19,15 @@ type mockRouting struct { provided chan cid.Cid } +func (r *mockRouting) Provide(ctx context.Context, cid cid.Cid, recursive bool) error { + r.provided <- cid + return nil +} + +func (r *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, timeout int) <-chan pstore.PeerInfo { + return nil +} + func mockContentRouting() *mockRouting { r := mockRouting{} r.provided = make(chan cid.Cid) @@ -68,13 +77,3 @@ func TestAnnouncement(t *testing.T) { } } } - -func (r *mockRouting) Provide(ctx context.Context, cid cid.Cid, recursive bool) error { - r.provided <- cid - return nil -} - -// Search for peers who are able to provide a given key -func (r *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, timeout int) <-chan pstore.PeerInfo { - return nil -} diff --git a/provider/queue.go b/provider/queue.go index 1220779547b0..8d89d2ffa540 100644 --- a/provider/queue.go +++ b/provider/queue.go @@ -2,13 +2,14 @@ package provider import ( "context" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - "github.com/ipfs/go-datastore/query" "math" "strconv" "strings" + + cid "github.com/ipfs/go-cid" + datastore "github.com/ipfs/go-datastore" + namespace "github.com/ipfs/go-datastore/namespace" + query "github.com/ipfs/go-datastore/query" ) // Queue provides a durable, FIFO interface to the datastore for storing cids @@ -100,11 +101,11 @@ func (q *Queue) nextEntry() (datastore.Key, cid.Cid) { // Run dequeues and enqueues when available. func (q *Queue) work() { go func() { - for { k, c := q.nextEntry() var dequeue chan cid.Cid + // If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue if c != cid.Undef { dequeue = q.dequeue } diff --git a/provider/queue_test.go b/provider/queue_test.go index 0dd3bab09cad..29f482fb97ba 100644 --- a/provider/queue_test.go +++ b/provider/queue_test.go @@ -52,7 +52,7 @@ func TestBasicOperation(t *testing.T) { assertOrdered(cids, queue, t) } -func TestInitialization(t *testing.T) { +func TestSparseDatastore(t *testing.T) { ctx := context.Background() defer ctx.Done() @@ -63,7 +63,59 @@ func TestInitialization(t *testing.T) { } cids := makeCids(10) + for _, c := range cids { + queue.Enqueue(c) + } + + // remove entries in the middle + err = queue.ds.Delete(queue.queueKey(5)) + if err != nil { + t.Fatal(err) + } + + err = queue.ds.Delete(queue.queueKey(6)) + if err != nil { + t.Fatal(err) + } + + expected := append(cids[:5], cids[7:]...) + assertOrdered(expected, queue, t) +} + +func TestMangledData(t *testing.T) { + ctx := context.Background() + defer ctx.Done() + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue, err := NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + + cids := makeCids(10) + for _, c := range cids { + queue.Enqueue(c) + } + + // remove entries in the middle + err = queue.ds.Put(queue.queueKey(5), []byte("borked")) + + expected := append(cids[:5], cids[6:]...) + assertOrdered(expected, queue, t) +} + + +func TestInitialization(t *testing.T) { + ctx := context.Background() + defer ctx.Done() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue, err := NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + + cids := makeCids(10) for _, c := range cids { queue.Enqueue(c) }