Skip to content

Commit

Permalink
feat(dscache): Remove operations update the dscache if it exists
Browse files Browse the repository at this point in the history
  • Loading branch information
dustmop committed Apr 28, 2020
1 parent 91494bd commit 3212add
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 41 deletions.
1 change: 0 additions & 1 deletion base/ref.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ func RenameDatasetRef(ctx context.Context, r repo.Repo, curr, next dsref.Ref) (*
Path: next.Path,
}


// Renaming the pretty name of a dataset
if currRef.Path != "" || nextRef.Path != "" || currRef.ProfileID != "" || nextRef.ProfileID != "" {
return nil, fmt.Errorf("can only rename using references that are human-friendly")
Expand Down
159 changes: 159 additions & 0 deletions cmd/save_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,165 @@ func TestSaveDscacheExistingDataset(t *testing.T) {
}
}

func TestSaveDscacheThenRemoveAll(t *testing.T) {
run := NewTestRunner(t, "test_peer", "qri_test_save_dscache_remove")
defer run.Delete()

// Save a dataset with one version.
run.MustExec(t, "qri save --body testdata/movies/body_two.json me/movie_ds")

// Save another dataset.
run.MustExec(t, "qri save --body testdata/movies/body_ten.csv me/another_ds")

// List with the --use-dscache flag, which builds the dscache from the logbook.
run.MustExec(t, "qri list --use-dscache")

// Access the dscache
repo, err := run.RepoRoot.Repo()
if err != nil {
t.Fatal(err)
}
cache := repo.Dscache()

// Dscache should have two references, one for each save operation.
actual := cache.VerboseString(false)
expect := `Dscache:
Dscache.Users:
0) user=test_peer profileID=QmeL2mdVka1eahKENjehK6tBxkkpk5dNQ1qMcgWi7Hrb4B
Dscache.Refs:
0) initID = wkr66hbcqitgufnn7sp4iablkvogdwiqcin3pzugdb2fnngmct4q
profileID = QmeL2mdVka1eahKENjehK6tBxkkpk5dNQ1qMcgWi7Hrb4B
topIndex = 1
cursorIndex = 1
prettyName = another_ds
bodySize = 224
bodyRows = 8
commitTime = 978311101
numErrors = 1
headRef = /ipfs/QmPkT97iEFn7JQaoqdWsheuYcL4fh8adyG7gdC88sPrcds
1) initID = vkys37xzcxpmw5zexzhyhpok3whl2vfeep2tyeegwnm2cxrr3umq
profileID = QmeL2mdVka1eahKENjehK6tBxkkpk5dNQ1qMcgWi7Hrb4B
topIndex = 1
cursorIndex = 1
prettyName = movie_ds
bodySize = 79
bodyRows = 2
commitTime = 978310921
headRef = /ipfs/QmQUBYGKBJGp1R5tCURnMJL6Bb7v1v3N32gpkqci6VcM98
`
if diff := cmp.Diff(expect, actual); diff != "" {
t.Errorf("result mismatch (-want +got):%s\n", diff)
}

// Remove one of those datasets.
run.MustExec(t, "qri remove --all me/another_ds")

// Because this test is using a memrepo, but the command runner instantiates its own repo
// the dscache is not reloaded. Manually reload it here by constructing a dscache from the
// same filename.
fs := localfs.NewFS()
cacheFilename := cache.Filename
ctx := context.Background()
cache = dscache.NewDscache(ctx, fs, nil, cacheFilename)

// Dscache should now have one reference.
actual = cache.VerboseString(false)
expect = `Dscache:
Dscache.Users:
0) user=test_peer profileID=QmeL2mdVka1eahKENjehK6tBxkkpk5dNQ1qMcgWi7Hrb4B
Dscache.Refs:
0) initID = vkys37xzcxpmw5zexzhyhpok3whl2vfeep2tyeegwnm2cxrr3umq
profileID = QmeL2mdVka1eahKENjehK6tBxkkpk5dNQ1qMcgWi7Hrb4B
topIndex = 1
cursorIndex = 1
prettyName = movie_ds
bodySize = 79
bodyRows = 2
commitTime = 978310921
headRef = /ipfs/QmQUBYGKBJGp1R5tCURnMJL6Bb7v1v3N32gpkqci6VcM98
`
if diff := cmp.Diff(expect, actual); diff != "" {
t.Errorf("result mismatch (-want +got):%s\n", diff)
}
}

func TestSaveDscacheThenRemoveVersions(t *testing.T) {
run := NewTestRunner(t, "test_peer", "qri_test_save_dscache_remove")
defer run.Delete()

// Save a dataset with one version.
run.MustExec(t, "qri save --body testdata/movies/body_ten.csv me/movie_ds")

// Save another version.
run.MustExec(t, "qri save --body testdata/movies/body_twenty.csv me/movie_ds")

// Save yet another version.
run.MustExec(t, "qri save --body testdata/movies/body_thirty.csv me/movie_ds")

// List with the --use-dscache flag, which builds the dscache from the logbook.
run.MustExec(t, "qri list --use-dscache")

// Access the dscache
repo, err := run.RepoRoot.Repo()
if err != nil {
t.Fatal(err)
}
cache := repo.Dscache()

// Dscache should have one reference. It has three commits.
actual := cache.VerboseString(false)
expect := `Dscache:
Dscache.Users:
0) user=test_peer profileID=QmeL2mdVka1eahKENjehK6tBxkkpk5dNQ1qMcgWi7Hrb4B
Dscache.Refs:
0) initID = vkys37xzcxpmw5zexzhyhpok3whl2vfeep2tyeegwnm2cxrr3umq
profileID = QmeL2mdVka1eahKENjehK6tBxkkpk5dNQ1qMcgWi7Hrb4B
topIndex = 3
cursorIndex = 3
prettyName = movie_ds
bodySize = 720
bodyRows = 28
commitTime = 978311161
numErrors = 1
headRef = /ipfs/QmQmCH7vaUTwijpErVKtqcPbu1PKW89aRCx8DJQaQjhYnB
`
if diff := cmp.Diff(expect, actual); diff != "" {
t.Errorf("result mismatch (-want +got):%s\n", diff)
}

// Remove one of those commits, keeping 1.
run.MustExec(t, "qri remove --revisions=1 me/movie_ds")

// Because this test is using a memrepo, but the command runner instantiates its own repo
// the dscache is not reloaded. Manually reload it here by constructing a dscache from the
// same filename.
fs := localfs.NewFS()
cacheFilename := cache.Filename
ctx := context.Background()
cache = dscache.NewDscache(ctx, fs, nil, cacheFilename)

// Dscache should now have one reference.
actual = cache.VerboseString(false)
expect = `Dscache:
Dscache.Users:
0) user=test_peer profileID=QmeL2mdVka1eahKENjehK6tBxkkpk5dNQ1qMcgWi7Hrb4B
Dscache.Refs:
0) initID = vkys37xzcxpmw5zexzhyhpok3whl2vfeep2tyeegwnm2cxrr3umq
profileID = QmeL2mdVka1eahKENjehK6tBxkkpk5dNQ1qMcgWi7Hrb4B
topIndex = 2
cursorIndex = 2
prettyName = movie_ds
bodySize = 224
bodyRows = 28
commitTime = 978310921
numErrors = 1
headRef = /ipfs/QmXte9h1Ztm1nyd4G1CUjWnkL82T2eY7qomMfY4LUXsn3Z
`
if diff := cmp.Diff(expect, actual); diff != "" {
t.Errorf("result mismatch (-want +got):%s\n", diff)
}
}

func TestSaveBadCaseCantBeUsedForNewDatasets(t *testing.T) {
run := NewTestRunner(t, "test_peer", "qri_save_bad_case")
defer run.Delete()
Expand Down
57 changes: 42 additions & 15 deletions dscache/dscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,16 @@ func (d *Dscache) update(act *logbook.Action) {
if err := d.updateInitDataset(act); err != nil && err != ErrNoDscache {
log.Error(err)
}
case logbook.ActionDatasetChange:
if err := d.updateMoveCursor(act); err != nil && err != ErrNoDscache {
case logbook.ActionDatasetCommitChange:
if err := d.updateChangeCursor(act); err != nil && err != ErrNoDscache {
log.Error(err)
}
case logbook.ActionDatasetDeleteAll:
if err := d.updateDeleteDataset(act); err != nil && err != ErrNoDscache {
log.Error(err)
}
case logbook.ActionDatasetRename:
// TODO(dustmop): Handle renames
}
}

Expand Down Expand Up @@ -271,8 +277,8 @@ func (d *Dscache) updateInitDataset(act *logbook.Action) error {
return nil
}

// Update modifies the dscache according to the provided action.
func (d *Dscache) updateMoveCursor(act *logbook.Action) error {
// Copy the entire dscache, except for the matching entry, rebuild that one to modify it
func (d *Dscache) updateChangeCursor(act *logbook.Action) error {
if d.IsEmpty() {
return ErrNoDscache
}
Expand All @@ -282,30 +288,28 @@ func (d *Dscache) updateMoveCursor(act *logbook.Action) error {
users := d.copyUserAssociationList(builder)
refs := d.copyReferenceListWithReplacement(
builder,
// Function to match the entry we're looking to replace
func(r *dscachefb.RefEntryInfo) bool {
return string(r.InitID()) == act.InitID
},
// Function to replace the matching entry
func(refStartMutationFunc func(builder *flatbuffers.Builder)) {
var metaTitle flatbuffers.UOffsetT
if act.Dataset != nil && act.Dataset.Meta != nil {
metaTitle = builder.CreateString(act.Dataset.Meta.Title)
if act.Info != nil {
metaTitle = builder.CreateString(act.Info.MetaTitle)
}
hashRef := builder.CreateString(string(act.HeadRef))
// Start building a ref object, by mutating an existing ref object.
refStartMutationFunc(builder)
// Add only the fields we want to change.
dscachefb.RefEntryInfoAddTopIndex(builder, int32(act.TopIndex))
dscachefb.RefEntryInfoAddCursorIndex(builder, int32(act.TopIndex))
if act.Dataset != nil && act.Dataset.Meta != nil {
if act.Info != nil {
dscachefb.RefEntryInfoAddMetaTitle(builder, metaTitle)
}
if act.Dataset != nil && act.Dataset.Commit != nil {
dscachefb.RefEntryInfoAddCommitTime(builder, act.Dataset.Commit.Timestamp.Unix())
}
if act.Dataset != nil && act.Dataset.Structure != nil {
dscachefb.RefEntryInfoAddBodySize(builder, int64(act.Dataset.Structure.Length))
dscachefb.RefEntryInfoAddBodyRows(builder, int32(act.Dataset.Structure.Entries))
dscachefb.RefEntryInfoAddNumErrors(builder, int32(act.Dataset.Structure.ErrCount))
dscachefb.RefEntryInfoAddCommitTime(builder, act.Info.CommitTime.Unix())
dscachefb.RefEntryInfoAddBodySize(builder, int64(act.Info.BodySize))
dscachefb.RefEntryInfoAddBodyRows(builder, int32(act.Info.BodyRows))
dscachefb.RefEntryInfoAddNumErrors(builder, int32(act.Info.NumErrors))
}
dscachefb.RefEntryInfoAddHeadRef(builder, hashRef)
// Don't call RefEntryInfoEnd, that is handled by copyReferenceListWithReplacement
Expand All @@ -317,6 +321,29 @@ func (d *Dscache) updateMoveCursor(act *logbook.Action) error {
return d.save()
}

// Copy the entire dscache, except leave out the matching entry.
func (d *Dscache) updateDeleteDataset(act *logbook.Action) error {
if d.IsEmpty() {
return ErrNoDscache
}
// Flatbuffers for go do not allow mutation (for complex types like strings). So we construct
// a new flatbuffer entirely, copying the old one while omitting the entry we want to remove.
builder := flatbuffers.NewBuilder(0)
users := d.copyUserAssociationList(builder)
refs := d.copyReferenceListWithReplacement(
builder,
func(r *dscachefb.RefEntryInfo) bool {
return string(r.InitID()) == act.InitID
},
// Pass a nil function, so the matching entry is not replaced, it is omitted
nil,
)
root, serialized := d.finishBuilding(builder, users, refs)
d.Root = root
d.Buffer = serialized
return d.save()
}

func convertEntryToVersionInfo(r *dscachefb.RefEntryInfo) dsref.VersionInfo {
return dsref.VersionInfo{
InitID: string(r.InitID()),
Expand Down
10 changes: 7 additions & 3 deletions dscache/mutator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ func (d *Dscache) copyUserAssociationList(builder *flatbuffers.Builder) flatbuff
return builder.EndVector(len(userList))
}

// For each entry in the dscache, copy it to the builder, unless it matches according to our
// findMatchFunc, in which case, replace it by calling replaceRefFunc.
func (d *Dscache) copyReferenceListWithReplacement(
builder *flatbuffers.Builder,
findMatchFunc func(*dscachefb.RefEntryInfo) bool,
Expand All @@ -41,9 +43,11 @@ func (d *Dscache) copyReferenceListWithReplacement(
startRefBuildFunc := func(_ *flatbuffers.Builder) {
d.copyReference(builder, &r)
}
replaceRefFunc(startRefBuildFunc)
ref := dscachefb.RefEntryInfoEnd(builder)
refList = append(refList, ref)
if replaceRefFunc != nil {
replaceRefFunc(startRefBuildFunc)
ref := dscachefb.RefEntryInfoEnd(builder)
refList = append(refList, ref)
}
continue
}
d.copyReference(builder, &r)
Expand Down
2 changes: 1 addition & 1 deletion lib/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (r *RemoteMethods) Fetch(p *FetchParams, res *[]DatasetLogItem) error {
}
}

items := logbook.Items(logs, reporef.ConvertToDsref(ref), 0, -1)
items := logbook.ConvertLogsToItems(logs, reporef.ConvertToDsref(ref))
log.Debugf("found %d items: %v", len(items), items)
if len(items) == 0 {
return repo.ErrNoHistory
Expand Down
12 changes: 8 additions & 4 deletions logbook/action.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package logbook

import (
"github.com/qri-io/dataset"
"github.com/qri-io/qri/dsref"
)

// ActionType is the type of action that a logbook just completed
Expand All @@ -10,8 +10,12 @@ type ActionType byte
const (
// ActionDatasetNameInit is an action that inits a dataset name
ActionDatasetNameInit ActionType = iota
// ActionDatasetChange is an action for when a dataset changes
ActionDatasetChange
// ActionDatasetCommitChange is an action for when a dataset changes its newest commit
ActionDatasetCommitChange
// ActionDatasetDeleteAll is an action for when a dataset is entirely deleted
ActionDatasetDeleteAll
// ActionDatasetRename is when a dataset is renamed
ActionDatasetRename
)

// Action represents the result of an action that logbook just completed
Expand All @@ -23,5 +27,5 @@ type Action struct {
Username string
PrettyName string
HeadRef string
Dataset *dataset.Dataset
Info *dsref.VersionInfo
}
Loading

0 comments on commit 3212add

Please sign in to comment.