Skip to content

Commit

Permalink
chore: added ctx to most of the vault domain
Browse files Browse the repository at this point in the history
  • Loading branch information
aryanjassal committed Dec 10, 2024
1 parent e90e6e8 commit 7b14e96
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 124 deletions.
7 changes: 6 additions & 1 deletion src/client/handlers/VaultsSecretsRemove.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { ContextCancellable } from '@matrixai/contexts';
import type { DB } from '@matrixai/db';
import type { ResourceAcquire } from '@matrixai/resources';
import type { JSONValue } from '@matrixai/rpc';
import type {
ClientRPCRequestParams,
ClientRPCResponseResult,
Expand Down Expand Up @@ -31,6 +33,9 @@ class VaultsSecretsRemove extends DuplexHandler<
SecretsRemoveHeaderMessage | SecretIdentifierMessageTagged
>
>,
_cancel: (reason?: any) => void,
_meta: Record<string, JSONValue>,
ctx: ContextCancellable,
): AsyncGenerator<ClientRPCResponseResult<SuccessOrErrorMessage>> {
const { db, vaultManager }: { db: DB; vaultManager: VaultManager } =
this.container;
Expand Down Expand Up @@ -59,7 +64,7 @@ class VaultsSecretsRemove extends DuplexHandler<
}
const acquire = await vaultManager.withVaults(
[vaultId],
async (vault) => vault.acquireWrite(),
async (vault) => vault.acquireWrite(ctx),
);
vaultAcquires.push(acquire);
}
Expand Down
2 changes: 1 addition & 1 deletion src/git/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ async function* generatePackRequest(
wants,
haves,
},
// ctx,
ctx,
);
// Reply that we have no common history and that we need to send everything
yield packetLineBuffer(gitUtils.NAK_BUFFER);
Expand Down
55 changes: 30 additions & 25 deletions src/git/utils.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { ContextCancellable } from '@matrixai/contexts';
import type {
Capability,
CapabilityList,
Expand All @@ -13,7 +14,6 @@ import git from 'isomorphic-git';
import { requestTypes } from './types';
import * as utils from '../utils';
import * as validationErrors from '../validation/errors';
import {ContextCancellable} from "@matrixai/contexts";

// Constants
// Total number of bytes per pack line minus the 4 size bytes and 1 channel byte
Expand Down Expand Up @@ -68,15 +68,18 @@ const DUMMY_PROGRESS_BUFFER = Buffer.from('progress is at 50%', BUFFER_FORMAT);
* This will generate references and the objects they point to as a tuple.
* `HEAD` is always yielded first along with all branches.
*/
async function* listReferencesGenerator({
efs,
dir,
gitDir,
}: {
efs: EncryptedFS;
dir: string;
gitDir: string;
}, ctx: ContextCancellable): AsyncGenerator<[Reference, ObjectId], void, void> {
async function* listReferencesGenerator(
{
efs,
dir,
gitDir,
}: {
efs: EncryptedFS;
dir: string;
gitDir: string;
},
ctx: ContextCancellable,
): AsyncGenerator<[Reference, ObjectId], void, void> {
const refs: Array<[string, Promise<string>]> = await git
.listBranches({
fs: efs,
Expand Down Expand Up @@ -146,28 +149,30 @@ async function referenceCapability({
* The walk is preformed recursively and concurrently using promises.
* Inspecting the git data structure objects is done using `isomorphic-git`.
*/
async function listObjects({
efs,
dir,
gitDir,
wants,
haves,
}: {
efs: EncryptedFS;
dir: string;
gitDir: string;
wants: ObjectIdList;
haves: ObjectIdList;
}/*, ctx: ContextCancellable*/): Promise<ObjectIdList> {
// TODO: add support for ctx
async function listObjects(
{
efs,
dir,
gitDir,
wants,
haves,
}: {
efs: EncryptedFS;
dir: string;
gitDir: string;
wants: ObjectIdList;
haves: ObjectIdList;
},
ctx: ContextCancellable,
): Promise<ObjectIdList> {
const commits = new Set<string>();
const trees = new Set<string>();
const blobs = new Set<string>();
const tags = new Set<string>();
const havesSet: Set<string> = new Set(haves);

async function walk(objectId: ObjectId, type: ObjectType): Promise<void> {
// ctx.signal.throwIfAborted();
ctx.signal.throwIfAborted();
// If object was listed as a have then we don't need to walk over it
if (havesSet.has(objectId)) return;
switch (type) {
Expand Down
70 changes: 47 additions & 23 deletions src/vaults/VaultInternal.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { ReadCommitResult } from 'isomorphic-git';
import type { EncryptedFS } from 'encryptedfs';
import type { ContextCancellable } from '@matrixai/contexts';
import type { DB, DBTransaction, LevelPath } from '@matrixai/db';
import type { RPCClient } from '@matrixai/rpc';
import type { ResourceAcquire, ResourceRelease } from '@matrixai/resources';
Expand Down Expand Up @@ -27,11 +28,12 @@ import {
ready,
} from '@matrixai/async-init/dist/CreateDestroyStartStop';
import { withF, withG } from '@matrixai/resources';
import { context, cancellable } from '@matrixai/contexts/dist/decorators';
import { RWLockWriter } from '@matrixai/async-locks';
import { tagLast } from './types';
import * as vaultsUtils from './utils';
import * as vaultsErrors from './errors';
import * as vaultsEvents from './events';
import { tagLast } from './types';
import * as ids from '../ids';
import * as nodesUtils from '../nodes/utils';
import * as gitUtils from '../git/utils';
Expand Down Expand Up @@ -441,13 +443,20 @@ class VaultInternal {
/**
* With context handler for using a vault in a writable context.
*/
public async writeF(
f: (fs: FileSystemWritable) => Promise<void>,
ctx?: Partial<ContextCancellable>,
tran?: DBTransaction,
): Promise<void>;
@ready(new vaultsErrors.ErrorVaultNotRunning())
@cancellable(true)
public async writeF(
f: (fs: FileSystemWritable) => Promise<void>,
@context ctx: ContextCancellable,
tran?: DBTransaction,
): Promise<void> {
if (tran == null) {
return this.db.withTransactionF((tran) => this.writeF(f, tran));
return this.db.withTransactionF((tran) => this.writeF(f, ctx, tran));
}

return withF([this.lock.write()], async () => {
Expand Down Expand Up @@ -475,7 +484,7 @@ class VaultInternal {
try {
await f(this.efsVault);
// After doing mutation we need to commit the new history
await this.createCommit();
await this.createCommit(ctx);
} catch (e) {
// Error implies dirty state
await this.cleanWorkingDirectory();
Expand All @@ -494,15 +503,16 @@ class VaultInternal {
@ready(new vaultsErrors.ErrorVaultNotRunning())
public writeG<T, TReturn, TNext>(
g: (fs: FileSystemWritable) => AsyncGenerator<T, TReturn, TNext>,
ctx: ContextCancellable,
tran?: DBTransaction,
): AsyncGenerator<T, TReturn, TNext> {
if (tran == null) {
return this.db.withTransactionG((tran) => this.writeG(g, tran));
return this.db.withTransactionG((tran) => this.writeG(g, ctx, tran));
}

const efsVault = this.efsVault;
const vaultMetadataDbPath = this.vaultMetadataDbPath;
const createCommit = () => this.createCommit();
const createCommit = () => this.createCommit(ctx);
const cleanWorkingDirectory = () => this.cleanWorkingDirectory();
return withG([this.lock.write()], async function* () {
if (
Expand Down Expand Up @@ -559,6 +569,7 @@ class VaultInternal {
*/
@ready(new vaultsErrors.ErrorVaultNotRunning())
public acquireWrite(
ctx: ContextCancellable,
tran?: DBTransaction,
): ResourceAcquire<FileSystemWritable> {
return async () => {
Expand Down Expand Up @@ -593,7 +604,7 @@ class VaultInternal {
if (e == null) {
try {
// After doing mutation we need to commit the new history
await this.createCommit();
await this.createCommit(ctx);
} catch (e_) {
e = e_;
// Error implies dirty state
Expand Down Expand Up @@ -968,7 +979,7 @@ class VaultInternal {
* and the old history is removed from the old canonical head to the branch point. This is to maintain the strict
* non-branching linear history.
*/
protected async createCommit() {
protected async createCommit(ctx: ContextCancellable) {
// Forced wait for 1 ms to allow difference in mTime between file changes
await utils.sleep(1);
// Checking if commit is appending or branching
Expand Down Expand Up @@ -1080,7 +1091,7 @@ class VaultInternal {
});
// We clean old history if a commit was made on previous version
if (headRef !== masterRef) {
await this.garbageCollectGitObjectsLocal(masterRef, headRef);
await this.garbageCollectGitObjectsLocal(masterRef, headRef, ctx);
}
}
}
Expand Down Expand Up @@ -1131,7 +1142,13 @@ class VaultInternal {
* This will walk the current canonicalBranch history and delete any objects that are not a part of it.
* This is costly since it will compare the walked tree with all existing objects.
*/
protected async garbageCollectGitObjectsGlobal() {
protected async garbageCollectGitObjectsGlobal(
ctx?: Partial<ContextCancellable>,
): Promise<void>;
@cancellable(true)
protected async garbageCollectGitObjectsGlobal(
@context ctx: ContextCancellable,
) {
const objectIdsAll = await gitUtils.listObjectsAll({
fs: this.efs,
gitDir: this.vaultGitDir,
Expand All @@ -1143,13 +1160,16 @@ class VaultInternal {
gitdir: this.vaultGitDir,
ref: vaultsUtils.canonicalBranch,
});
const reachableObjects = await gitUtils.listObjects({
efs: this.efs,
dir: this.vaultDataDir,
gitDir: this.vaultGitDir,
wants: [masterRef],
haves: [],
});
const reachableObjects = await gitUtils.listObjects(
{
efs: this.efs,
dir: this.vaultDataDir,
gitDir: this.vaultGitDir,
wants: [masterRef],
haves: [],
},
ctx,
);
// Walk from head to all reachable objects
for (const objectReachable of reachableObjects) {
objects.delete(objectReachable);
Expand All @@ -1172,14 +1192,18 @@ class VaultInternal {
protected async garbageCollectGitObjectsLocal(
startId: string,
stopId: string,
ctx: ContextCancellable,
) {
const objects = await gitUtils.listObjects({
efs: this.efs,
dir: this.vaultDataDir,
gitDir: this.vaultGitDir,
wants: [startId],
haves: [stopId],
});
const objects = await gitUtils.listObjects(
{
efs: this.efs,
dir: this.vaultDataDir,
gitDir: this.vaultGitDir,
wants: [startId],
haves: [stopId],
},
ctx,
);
const deletePs: Array<Promise<void>> = [];
for (const objectId of objects) {
deletePs.push(
Expand Down
Loading

0 comments on commit 7b14e96

Please sign in to comment.