Skip to content

Commit

Permalink
Add support for GetCountFromStartKey for InMemoryDbStore (#7047)
Browse files Browse the repository at this point in the history
- Currently InMemoryDbStore does not implement the method GetCountFromOffset from interface IDbStore. This results in NotImplemented exception when using InMemoryDbStore. 
- The method GetCountFromOffset should be called GetCountFromStartKey, as the DbStore does not have a concept of offset. 
- Added a test for the GetCountFromStartKey method
  • Loading branch information
varunpuranik authored Dec 6, 2023
1 parent c38aa54 commit f1a9da3
Show file tree
Hide file tree
Showing 11 changed files with 56 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ public Task IterateBatch(int batchSize, Func<byte[], byte[], Task> callback, Can

public Task<ulong> Count() => Task.FromResult((ulong)Math.Max(Interlocked.Read(ref this.count), 0));

public Task<ulong> GetCountFromOffset(byte[] offset)
public Task<ulong> GetCountFromStartKey(byte[] startKey)
{
var iterator = this.db.NewIterator(this.Handle);
iterator.Seek(offset);
iterator.Seek(startKey);

ulong count = 0;
while (iterator.Valid())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,6 @@ public Task Remove(byte[] key, CancellationToken cancellationToken)

public Task<ulong> Count() => this.dbStore.Count();

public Task<ulong> GetCountFromOffset(byte[] offset) => this.dbStore.GetCountFromOffset(offset);
public Task<ulong> GetCountFromStartKey(byte[] startKey) => this.dbStore.GetCountFromStartKey(startKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ await decryptedValue.ForEachAsync(

public Task<ulong> Count() => this.entityStore.Count();

public Task<ulong> GetCountFromOffset(TK offset) => this.entityStore.GetCountFromOffset(offset);
public Task<ulong> GetCountFromStartKey(TK startKey) => this.entityStore.GetCountFromStartKey(startKey);

public void Dispose()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public Task<bool> Contains(TK key, CancellationToken cancellationToken)

public Task<ulong> Count() => this.dbStore.Count();

public Task<ulong> GetCountFromOffset(TK offset) => this.dbStore.GetCountFromOffset(offset);
public Task<ulong> GetCountFromStartKey(TK startKey) => this.dbStore.GetCountFromStartKey(startKey);

public void Dispose()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ public interface IKeyValueStore<TK, TV> : IDisposable

Task<ulong> Count();

Task<ulong> GetCountFromOffset(TK offset);
Task<ulong> GetCountFromStartKey(TK startKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,22 @@ public async Task Remove(byte[] key, CancellationToken cancellationToken)

public Task<ulong> Count() => Task.FromResult((ulong)this.keyValues.Count);

public Task<ulong> GetCountFromOffset(byte[] offset) => throw new NotImplementedException();
public async Task<ulong> GetCountFromStartKey(byte[] startKey)
{
List<(byte[] key, byte[] value)> snapshot = await this.GetSnapshot(CancellationToken.None);
int count = 0;
for (int i = 0; i < snapshot.Count; i++)
{
byte[] key = snapshot[i].key;
if (key.SequenceEqual(startKey))
{
count = snapshot.Count - i;
break;
}
}

return (ulong)count;
}

public void Dispose()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public Task IterateBatch(int batchSize, Func<TK, TV, Task> callback, Cancellatio

public Task<ulong> Count() => this.underlyingStore.Count();

public Task<ulong> GetCountFromOffset(TK offset) => this.underlyingStore.GetCountFromOffset(this.keyMapper.From(offset));
public Task<ulong> GetCountFromStartKey(TK startKey) => this.underlyingStore.GetCountFromStartKey(this.keyMapper.From(startKey));

Task IterateBatch(Option<TK> startKey, int batchSize, Func<TK, TV, Task> callback, CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,6 @@ public void Dispose()

public Task<ulong> Count() => Task.FromResult(0UL);

public Task<ulong> GetCountFromOffset(TK offset) => Task.FromResult(0UL);
public Task<ulong> GetCountFromStartKey(TK startKey) => Task.FromResult(0UL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ await this.entityStore.IterateBatch(

public Task<ulong> Count() => this.entityStore.Count();

public Task<ulong> GetCountFromOffset(long offset) => this.entityStore.GetCountFromOffset(StoreUtils.GetKeyFromOffset(offset));
public Task<ulong> GetCountFromOffset(long offset) => this.entityStore.GetCountFromStartKey(StoreUtils.GetKeyFromOffset(offset));

public void Dispose()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,6 @@ public Task IterateBatch(TK startKey, int batchSize, Func<TK, TV, Task> perEntit

public Task<ulong> Count() => this.underlyingKeyValueStore.Count();

public Task<ulong> GetCountFromOffset(TK offset) => this.underlyingKeyValueStore.GetCountFromOffset(offset);
public Task<ulong> GetCountFromStartKey(TK startKey) => this.underlyingKeyValueStore.GetCountFromStartKey(startKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,36 @@ public async Task GetBatchTest(Option<long> defaultHeadOffset)
}
}

[Theory]
[MemberData(nameof(GetDefaultHeadOffset))]
public async Task GetCountFromOffsetBatch(Option<long> defaultHeadOffset)
{
// Arrange
string entityId = $"getCountFromOffset{Guid.NewGuid().ToString()}";
long startOffset = defaultHeadOffset.GetOrElse(0);
ISequentialStore<Item> sequentialStore = await this.GetSequentialStore(entityId, defaultHeadOffset);

// Try to get the batch, should return empty batch.
ulong count = await sequentialStore.GetCountFromOffset(startOffset);
Assert.Equal(0UL, count);

// Add 100 elements
for (int i = 0; i < 100; i++)
{
long offset = await sequentialStore.Append(new Item { Prop1 = i });
Assert.Equal(i + startOffset, offset);
}

count = await sequentialStore.GetCountFromOffset(startOffset);
Assert.Equal(100UL, count);

count = await sequentialStore.GetCountFromOffset(startOffset + 50);
Assert.Equal(50UL, count);

count = await sequentialStore.GetCountFromOffset(startOffset + 100);
Assert.Equal(0UL, count);
}

protected abstract IEntityStore<byte[], TV> GetEntityStore<TV>(string entityName);

public static IEnumerable<object[]> GetDefaultHeadOffset()
Expand Down

0 comments on commit f1a9da3

Please sign in to comment.