Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

RFC: Change Blockstore to work with Multihashes instead of CIDs. #13

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions arc_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
metrics "github.com/ipfs/go-metrics-interface"
mh "github.com/multiformats/go-multihash"
)

type cacheHave bool
Expand Down Expand Up @@ -35,13 +36,13 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
return c, nil
}

func (b *arccache) DeleteBlock(k cid.Cid) error {
func (b *arccache) Delete(k mh.Multihash) error {
if has, _, ok := b.hasCached(k); ok && !has {
return ErrNotFound
}

b.arc.Remove(k) // Invalidate cache before deleting.
err := b.blockstore.DeleteBlock(k)
b.arc.Remove(string(k)) // Invalidate cache before deleting.
err := b.blockstore.Delete(k)
switch err {
case nil, ErrNotFound:
b.cacheHave(k, false)
Expand All @@ -53,16 +54,16 @@ func (b *arccache) DeleteBlock(k cid.Cid) error {

// if ok == false has is inconclusive
// if ok == true then has respons to question: is it contained
func (b *arccache) hasCached(k cid.Cid) (has bool, size int, ok bool) {
func (b *arccache) hasCached(k mh.Multihash) (has bool, size int, ok bool) {
b.total.Inc()
if !k.Defined() {
if k == nil {
log.Error("undefined cid in arccache")
// Return cache invalid so the call to blockstore happens
// in case of invalid key and correct error is created.
return false, -1, false
}

h, ok := b.arc.Get(k.KeyString())
h, ok := b.arc.Get(string(k))
if ok {
b.hits.Inc()
switch h := h.(type) {
Expand All @@ -75,7 +76,7 @@ func (b *arccache) hasCached(k cid.Cid) (has bool, size int, ok bool) {
return false, -1, false
}

func (b *arccache) Has(k cid.Cid) (bool, error) {
func (b *arccache) Has(k mh.Multihash) (bool, error) {
if has, _, ok := b.hasCached(k); ok {
return has, nil
}
Expand All @@ -87,7 +88,7 @@ func (b *arccache) Has(k cid.Cid) (bool, error) {
return has, nil
}

func (b *arccache) GetSize(k cid.Cid) (int, error) {
func (b *arccache) GetSize(k mh.Multihash) (int, error) {
if _, blockSize, ok := b.hasCached(k); ok {
return blockSize, nil
}
Expand All @@ -106,27 +107,27 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
return nil, ErrNotFound
}

if has, _, ok := b.hasCached(k); ok && !has {
if has, _, ok := b.hasCached(k.Hash()); ok && !has {
return nil, ErrNotFound
}

bl, err := b.blockstore.Get(k)
if bl == nil && err == ErrNotFound {
b.cacheHave(k, false)
b.cacheHave(k.Hash(), false)
} else if bl != nil {
b.cacheSize(k, len(bl.RawData()))
b.cacheSize(k.Hash(), len(bl.RawData()))
}
return bl, err
}

func (b *arccache) Put(bl blocks.Block) error {
if has, _, ok := b.hasCached(bl.Cid()); ok && has {
if has, _, ok := b.hasCached(bl.Cid().Hash()); ok && has {
return nil
}

err := b.blockstore.Put(bl)
if err == nil {
b.cacheSize(bl.Cid(), len(bl.RawData()))
b.cacheSize(bl.Cid().Hash(), len(bl.RawData()))
}
return err
}
Expand All @@ -136,7 +137,7 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
for _, block := range bs {
// call put on block if result is inconclusive or we are sure that
// the block isn't in storage
if has, _, ok := b.hasCached(block.Cid()); !ok || (ok && !has) {
if has, _, ok := b.hasCached(block.Cid().Hash()); !ok || (ok && !has) {
good = append(good, block)
}
}
Expand All @@ -145,7 +146,7 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
return err
}
for _, block := range good {
b.cacheSize(block.Cid(), len(block.RawData()))
b.cacheSize(block.Cid().Hash(), len(block.RawData()))
}
return nil
}
Expand All @@ -154,15 +155,15 @@ func (b *arccache) HashOnRead(enabled bool) {
b.blockstore.HashOnRead(enabled)
}

func (b *arccache) cacheHave(c cid.Cid, have bool) {
b.arc.Add(c.KeyString(), cacheHave(have))
func (b *arccache) cacheHave(k mh.Multihash, have bool) {
b.arc.Add(string(k), cacheHave(have))
}

func (b *arccache) cacheSize(c cid.Cid, blockSize int) {
b.arc.Add(c.KeyString(), cacheSize(blockSize))
func (b *arccache) cacheSize(k mh.Multihash, blockSize int) {
b.arc.Add(string(k), cacheSize(blockSize))
}

func (b *arccache) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
func (b *arccache) AllKeysChan(ctx context.Context) (<-chan mh.Multihash, error) {
return b.blockstore.AllKeysChan(ctx)
}

Expand Down
39 changes: 19 additions & 20 deletions arc_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestRemoveCacheEntryOnDelete(t *testing.T) {
writeHitTheDatastore = true
})

arc.DeleteBlock(exampleBlock.Cid())
arc.Delete(exampleBlock.Cid().Hash())
arc.Put(exampleBlock)
if !writeHitTheDatastore {
t.Fail()
Expand All @@ -76,9 +76,9 @@ func TestElideDuplicateWrite(t *testing.T) {
func TestHasRequestTriggersCache(t *testing.T) {
arc, _, cd := createStores(t)

arc.Has(exampleBlock.Cid())
arc.Has(exampleBlock.Cid().Hash())
trap("has hit datastore", cd, t)
if has, err := arc.Has(exampleBlock.Cid()); has || err != nil {
if has, err := arc.Has(exampleBlock.Cid().Hash()); has || err != nil {
t.Fatal("has was true but there is no such block")
}

Expand All @@ -90,7 +90,7 @@ func TestHasRequestTriggersCache(t *testing.T) {

trap("has hit datastore", cd, t)

if has, err := arc.Has(exampleBlock.Cid()); !has || err != nil {
if has, err := arc.Has(exampleBlock.Cid().Hash()); !has || err != nil {
t.Fatal("has returned invalid result")
}
}
Expand All @@ -104,10 +104,10 @@ func TestGetFillsCache(t *testing.T) {

trap("has hit datastore", cd, t)

if has, err := arc.Has(exampleBlock.Cid()); has || err != nil {
if has, err := arc.Has(exampleBlock.Cid().Hash()); has || err != nil {
t.Fatal("has was true but there is no such block")
}
if blockSize, err := arc.GetSize(exampleBlock.Cid()); blockSize > -1 || err != nil {
if blockSize, err := arc.GetSize(exampleBlock.Cid().Hash()); blockSize > -1 || err != nil {
t.Fatal("getsize was true but there is no such block")
}

Expand All @@ -119,27 +119,27 @@ func TestGetFillsCache(t *testing.T) {

trap("has hit datastore", cd, t)

if has, err := arc.Has(exampleBlock.Cid()); !has || err != nil {
if has, err := arc.Has(exampleBlock.Cid().Hash()); !has || err != nil {
t.Fatal("has returned invalid result")
}
if blockSize, err := arc.GetSize(exampleBlock.Cid()); blockSize == -1 || err != nil {
if blockSize, err := arc.GetSize(exampleBlock.Cid().Hash()); blockSize == -1 || err != nil {
t.Fatal("getsize returned invalid result")
}
}

func TestGetAndDeleteFalseShortCircuit(t *testing.T) {
arc, _, cd := createStores(t)

arc.Has(exampleBlock.Cid())
arc.GetSize(exampleBlock.Cid())
arc.Has(exampleBlock.Cid().Hash())
arc.GetSize(exampleBlock.Cid().Hash())

trap("get hit datastore", cd, t)

if bl, err := arc.Get(exampleBlock.Cid()); bl != nil || err != ErrNotFound {
t.Fatal("get returned invalid result")
}

if arc.DeleteBlock(exampleBlock.Cid()) != ErrNotFound {
if arc.Delete(exampleBlock.Cid().Hash()) != ErrNotFound {
t.Fatal("expected ErrNotFound error")
}
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func TestHasAfterSucessfulGetIsCached(t *testing.T) {
arc.Get(exampleBlock.Cid())

trap("has hit datastore", cd, t)
arc.Has(exampleBlock.Cid())
arc.Has(exampleBlock.Cid().Hash())
}

func TestGetSizeAfterSucessfulGetIsCached(t *testing.T) {
Expand All @@ -182,7 +182,7 @@ func TestGetSizeAfterSucessfulGetIsCached(t *testing.T) {
arc.Get(exampleBlock.Cid())

trap("has hit datastore", cd, t)
arc.GetSize(exampleBlock.Cid())
arc.GetSize(exampleBlock.Cid().Hash())
}

func TestGetSizeMissingZeroSizeBlock(t *testing.T) {
Expand All @@ -195,20 +195,19 @@ func TestGetSizeMissingZeroSizeBlock(t *testing.T) {
arc.Get(emptyBlock.Cid())

trap("has hit datastore", cd, t)
if blockSize, err := arc.GetSize(emptyBlock.Cid()); blockSize != 0 || err != nil {
if blockSize, err := arc.GetSize(emptyBlock.Cid().Hash()); blockSize != 0 || err != nil {
t.Fatal("getsize returned invalid result")
}
untrap(cd)

arc.Get(missingBlock.Cid())

trap("has hit datastore", cd, t)
if blockSize, err := arc.GetSize(missingBlock.Cid()); blockSize != -1 || err != nil {
if blockSize, err := arc.GetSize(missingBlock.Cid().Hash()); blockSize != -1 || err != nil {
t.Fatal("getsize returned invalid result")
}
}


func TestDifferentKeyObjectsWork(t *testing.T) {
arc, bs, cd := createStores(t)

Expand All @@ -224,18 +223,18 @@ func TestDifferentKeyObjectsWork(t *testing.T) {
t.Fatal(err)
}

arc.Has(ncid)
arc.Has(ncid.Hash())
}

func TestPutManyCaches(t *testing.T) {
arc, _, cd := createStores(t)
arc.PutMany([]blocks.Block{exampleBlock})

trap("has hit datastore", cd, t)
arc.Has(exampleBlock.Cid())
arc.GetSize(exampleBlock.Cid())
arc.Has(exampleBlock.Cid().Hash())
arc.GetSize(exampleBlock.Cid().Hash())
untrap(cd)
arc.DeleteBlock(exampleBlock.Cid())
arc.Delete(exampleBlock.Cid().Hash())

arc.Put(exampleBlock)
trap("PunMany has hit datastore", cd, t)
Expand Down
27 changes: 14 additions & 13 deletions blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
dsq "github.com/ipfs/go-datastore/query"
dshelp "github.com/ipfs/go-ipfs-ds-help"
logging "github.com/ipfs/go-log"
mh "github.com/multiformats/go-multihash"
)

var log = logging.Logger("blockstore")
Expand All @@ -32,12 +33,12 @@ var ErrNotFound = errors.New("blockstore: block not found")
// Blockstore wraps a Datastore block-centered methods and provides a layer
// of abstraction which allows to add different caching strategies.
type Blockstore interface {
DeleteBlock(cid.Cid) error
Has(cid.Cid) (bool, error)
Delete(mh.Multihash) error
Has(mh.Multihash) (bool, error)
Get(cid.Cid) (blocks.Block, error)

// GetSize returns the CIDs mapped BlockSize
GetSize(cid.Cid) (int, error)
GetSize(mh.Multihash) (int, error)

// Put puts a given block to the underlying datastore
Put(blocks.Block) error
Expand All @@ -49,7 +50,7 @@ type Blockstore interface {
// AllKeysChan returns a channel from which
// the CIDs in the Blockstore can be read. It should respect
// the given context, closing the channel if it becomes Done.
AllKeysChan(ctx context.Context) (<-chan cid.Cid, error)
AllKeysChan(ctx context.Context) (<-chan mh.Multihash, error)

// HashOnRead specifies if every read block should be
// rehashed to make sure it matches its CID.
Expand Down Expand Up @@ -173,12 +174,12 @@ func (bs *blockstore) PutMany(blocks []blocks.Block) error {
return t.Commit()
}

func (bs *blockstore) Has(k cid.Cid) (bool, error) {
return bs.datastore.Has(dshelp.CidToDsKey(k))
func (bs *blockstore) Has(k mh.Multihash) (bool, error) {
return bs.datastore.Has(dshelp.MultihashToDsKey(k))
}

func (bs *blockstore) GetSize(k cid.Cid) (int, error) {
bdata, err := bs.datastore.Get(dshelp.CidToDsKey(k))
func (bs *blockstore) GetSize(k mh.Multihash) (int, error) {
bdata, err := bs.datastore.Get(dshelp.MultihashToDsKey(k))
if err == ds.ErrNotFound {
return -1, ErrNotFound
}
Expand All @@ -188,8 +189,8 @@ func (bs *blockstore) GetSize(k cid.Cid) (int, error) {
return len(bdata), nil
}

func (bs *blockstore) DeleteBlock(k cid.Cid) error {
err := bs.datastore.Delete(dshelp.CidToDsKey(k))
func (bs *blockstore) Delete(k mh.Multihash) error {
err := bs.datastore.Delete(dshelp.MultihashToDsKey(k))
if err == ds.ErrNotFound {
return ErrNotFound
}
Expand All @@ -200,7 +201,7 @@ func (bs *blockstore) DeleteBlock(k cid.Cid) error {
// this is very simplistic, in the future, take dsq.Query as a param?
//
// AllKeysChan respects context.
func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan mh.Multihash, error) {

// KeysOnly, because that would be _a lot_ of data.
q := dsq.Query{KeysOnly: true}
Expand All @@ -209,7 +210,7 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, err
}

output := make(chan cid.Cid, dsq.KeysOnlyBufSize)
output := make(chan mh.Multihash, dsq.KeysOnlyBufSize)
go func() {
defer func() {
res.Close() // ensure exit (signals early exit, too)
Expand All @@ -227,7 +228,7 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
}

// need to convert to key.Key using key.KeyFromDsKey.
k, err := dshelp.DsKeyToCid(ds.RawKey(e.Key))
k, err := dshelp.DsKeyToMultihash(ds.RawKey(e.Key))
if err != nil {
log.Warningf("error parsing key from DsKey: %s", err)
continue
Expand Down
Loading