fix(cli): fix dataset update with external files #3379

Merged
merged 13 commits on May 25, 2023
Changes from 5 commits
76 changes: 56 additions & 20 deletions renku/core/dataset/dataset.py
@@ -35,8 +35,10 @@
is_linked_file_updated,
update_linked_file,
)
from renku.core.dataset.providers.api import AddProviderInterface, ProviderApi
from renku.core.dataset.providers.factory import ProviderFactory
from renku.core.dataset.providers.models import ProviderDataset
from renku.core.dataset.providers.git import GitProvider
from renku.core.dataset.providers.models import DatasetUpdateAction, ProviderDataset
from renku.core.dataset.request_model import ImageRequestModel
from renku.core.dataset.tag import get_dataset_by_tag, prompt_access_token, prompt_tag_selection
from renku.core.interface.dataset_gateway import IDatasetGateway
@@ -714,19 +716,43 @@ def update_datasets(
raise errors.ParameterError("No files matched the criteria.")
return imported_dataset_updates_view_models, []

git_files = []
provider_files: Dict[AddProviderInterface, List[DynamicProxy]] = {}
unique_remotes = set()
linked_files = []
local_files = []

for file in records:
if file.based_on:
git_files.append(file)
unique_remotes.add(file.based_on.url)
if file.based_on or file.source:
if not getattr(file.dataset, "provider", None):
if file.based_on:
uri = file.dataset.same_as.value if file.dataset.same_as else file.based_on.url
else:
uri = file.source
try:
file.dataset.provider = cast(
AddProviderInterface,
ProviderFactory.get_add_provider(uri),
)
except errors.DatasetProviderNotFound:
communication.warn(f"Couldn't find provider for file {file.path} in dataset {file.dataset.name}")
continue

if file.dataset.provider not in provider_files:
provider_files[file.dataset.provider] = []

provider_files[file.dataset.provider].append(file)

if isinstance(file.dataset.provider, GitProvider):
unique_remotes.add(file.based_on.url)
elif file.linked:
linked_files.append(file)
else:
local_files.append(file)
if not getattr(file.dataset, "provider", None):
file.dataset.provider = cast(AddProviderInterface, ProviderFactory.get_add_provider(file.entity.path))

if file.dataset.provider not in provider_files:
provider_files[file.dataset.provider] = []

provider_files[file.dataset.provider].append(file)

if ref and len(unique_remotes) > 1:
raise errors.ParameterError(
@@ -741,18 +767,24 @@ def update_datasets(
updated = update_linked_files(linked_files, dry_run=dry_run)
updated_files.extend(updated)

if git_files and not no_remote:
updated, deleted = update_dataset_git_files(files=git_files, ref=ref, delete=delete, dry_run=dry_run)
updated_files.extend(updated)
deleted_files.extend(deleted)
provider_context: Dict[str, Any] = {}

for provider, files in provider_files.items():
if (no_remote and cast(ProviderApi, provider).is_remote) or (
no_local and not cast(ProviderApi, provider).is_remote
):
continue

if local_files and not no_local:
updated, deleted, new = update_dataset_local_files(
records=local_files, check_data_directory=check_data_directory
results = provider.update_files(
files=files,
dry_run=dry_run,
delete=delete,
context=provider_context,
ref=ref,
check_data_directory=check_data_directory,
)
updated_files.extend(updated)
deleted_files.extend(deleted)
updated_files.extend(new)
updated_files.extend(r.entity for r in results if r.action == DatasetUpdateAction.UPDATE)
deleted_files.extend(r.entity for r in results if r.action == DatasetUpdateAction.DELETE)

if not dry_run:
if deleted_files and not delete:
@@ -1037,12 +1069,16 @@ def _update_datasets_files_metadata(updated_files: List[DynamicProxy], deleted_f
new_file = DatasetFile.from_path(
path=file.entity.path, based_on=file.based_on, source=file.source, checksum=checksums.get(file.entity.path)
)
modified_datasets[file.dataset.name] = file.dataset
modified_datasets[file.dataset.name] = (
file.dataset._subject if isinstance(file.dataset, DynamicProxy) else file.dataset
)
file.dataset.add_or_update_files(new_file)

if delete:
for file in deleted_files:
modified_datasets[file.dataset.name] = file.dataset
modified_datasets[file.dataset.name] = (
file.dataset._subject if isinstance(file.dataset, DynamicProxy) else file.dataset
)
file.dataset.unlink_file(file.entity.path)

datasets_provenance = DatasetsProvenance()
@@ -1230,7 +1266,7 @@ def should_include(filepath: Path) -> bool:
continue

record = DynamicProxy(file)
record.dataset = dataset
record.dataset = DynamicProxy(dataset)
records.append(record)

if not check_data_directory:
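For orientation, here is a minimal, self-contained sketch (not part of this diff) of the dispatch pattern the rewritten `update_datasets` follows: files are grouped by their dataset's provider, each provider's `update_files` is called once per batch, and the returned records are split by action. The stub classes below are illustrative stand-ins, not Renku types.

```python
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum, auto
from typing import Dict, List, Tuple


class UpdateAction(Enum):
    UPDATE = auto()
    DELETE = auto()


@dataclass
class UpdateResult:
    entity: str
    action: UpdateAction


class StubProvider:
    """Stand-in for an AddProviderInterface implementation."""

    is_remote = True

    def update_files(self, files: List[str], **kwargs) -> List[UpdateResult]:
        # A real provider would compare local state against the remote;
        # here every file is simply reported as updated.
        return [UpdateResult(entity=f, action=UpdateAction.UPDATE) for f in files]


def dispatch(files_with_providers: List[Tuple[StubProvider, str]]) -> Tuple[List[str], List[str]]:
    """Group files by provider, call each provider once, split results by action."""
    provider_files: Dict[StubProvider, List[str]] = defaultdict(list)
    for provider, path in files_with_providers:
        provider_files[provider].append(path)

    updated: List[str] = []
    deleted: List[str] = []
    context: Dict[str, object] = {}  # shared cache, e.g. already-cloned repositories
    for provider, batch in provider_files.items():
        for result in provider.update_files(files=batch, dry_run=True, delete=False, context=context):
            (updated if result.action == UpdateAction.UPDATE else deleted).append(result.entity)
    return updated, deleted


provider = StubProvider()
print(dispatch([(provider, "data/a.csv"), (provider, "data/b.csv")]))
```

The shared `context` dictionary is created once per update run so that expensive per-remote state (such as cloned repositories) can be reused across providers and datasets.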
75 changes: 74 additions & 1 deletion renku/core/dataset/providers/api.py
@@ -23,12 +23,19 @@
from renku.core import errors
from renku.core.constant import ProviderPriority
from renku.core.plugin import hookimpl
from renku.core.util import communication
from renku.core.util.os import delete_dataset_file
from renku.core.util.urls import is_uri_subfolder, resolve_uri
from renku.domain_model.constant import NO_VALUE, NoValueType
from renku.domain_model.dataset import RemoteEntity
from renku.domain_model.dataset_provider import IDatasetProviderPlugin
from renku.domain_model.project_context import project_context
from renku.infrastructure.immutable import DynamicProxy

if TYPE_CHECKING:
from renku.core.dataset.providers.models import (
DatasetAddMetadata,
DatasetUpdateMetadata,
ProviderDataset,
ProviderDatasetFile,
ProviderParameter,
@@ -42,12 +49,13 @@ class ProviderApi(IDatasetProviderPlugin):

priority: Optional[ProviderPriority] = None
name: Optional[str] = None
is_remote: Optional[bool] = None

def __init__(self, uri: str, **kwargs):
self._uri: str = uri or ""

def __init_subclass__(cls, **kwargs):
for required_property in ("priority", "name"):
for required_property in ("priority", "name", "is_remote"):
if getattr(cls, required_property, None) is None:
raise NotImplementedError(f"{required_property} must be set for {cls}")

@@ -85,6 +93,13 @@ def get_metadata(self, uri: str, destination: Path, **kwargs) -> List["DatasetAd
"""Get metadata of files that will be added to a dataset."""
raise NotImplementedError

@abc.abstractmethod
def update_files(
self, files: List[DynamicProxy], dry_run: bool, delete: bool, context: Dict[str, Any], **kwargs
) -> List["DatasetUpdateMetadata"]:
"""Update dataset files from the remote provider."""
raise NotImplementedError


class ExportProviderInterface(abc.ABC):
"""Interface defining export providers."""
@@ -143,6 +158,64 @@ def supports_storage(uri: str) -> bool:
"""Whether or not this provider supports a given URI storage."""
raise NotImplementedError

def update_files(
self,
files: List[DynamicProxy],
dry_run: bool,
delete: bool,
context: Dict[str, Any],
**kwargs,
) -> List["DatasetUpdateMetadata"]:
"""Update dataset files from the remote provider."""
from renku.core.dataset.providers.models import DatasetUpdateAction, DatasetUpdateMetadata

progress_text = f"Checking remote files for updates in dataset {files[0].dataset.name}"

results: List[DatasetUpdateMetadata] = []

try:
communication.start_progress(progress_text, len(files))

storage = self.get_storage()
hashes = storage.get_hashes(uri=files[0].dataset.storage)
for file in files:
communication.update_progress(progress_text, 1)
if not file.based_on:
continue

dst = project_context.metadata_path.parent / file.entity.path

hash = next((h for h in hashes if h.uri == file.based_on.url), None)

if hash:
if not dry_run and (
not file.dataset.storage
or not is_uri_subfolder(resolve_uri(file.dataset.storage), file.based_on.url)
):
# Redownload downloaded (not mounted) file
download_storage = self.get_storage()
download_storage.download(file.based_on.url, dst)
file.based_on = RemoteEntity(checksum=hash.hash if hash.hash else "", url=hash.uri, path=hash.path)
results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.UPDATE))
else:
if (
not dry_run
and not delete
and (
not file.dataset.storage
or not is_uri_subfolder(resolve_uri(file.dataset.storage), file.based_on.url)
)
):
# Delete downloaded (not mounted) file
delete_dataset_file(dst, follow_symlinks=True)
project_context.repository.add(dst, force=True)
results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.DELETE))

finally:
communication.finalize_progress(progress_text)

return results


class CloudStorageProviderType(Protocol):
"""Intersection type for ``mypy`` hinting in storage classes."""
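As a hedged illustration of the `__init_subclass__` contract extended above (which now also requires `is_remote`), the toy classes below mirror the check with plain Python; they are not the Renku classes themselves, and `ToyS3Provider` is a hypothetical name.

```python
from typing import Optional


class ToyProviderApi:
    """Mirrors the required-attribute check in ProviderApi.__init_subclass__."""

    priority: Optional[int] = None
    name: Optional[str] = None
    is_remote: Optional[bool] = None

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        for required_property in ("priority", "name", "is_remote"):
            if getattr(cls, required_property, None) is None:
                raise NotImplementedError(f"{required_property} must be set for {cls}")


class ToyS3Provider(ToyProviderApi):  # hypothetical provider, not part of this PR
    priority = 1
    name = "ToyS3"
    is_remote = True


# Leaving out ``is_remote`` fails at class-definition time, which is why every
# concrete provider touched by this PR (Azure, Dataverse, DOI, External, Git)
# now declares it explicitly:
#
# class BrokenProvider(ToyProviderApi):   # raises NotImplementedError
#     priority = 1
#     name = "Broken"
```

Because the check uses `is None`, a local provider can still set `is_remote = False`; only omitting the attribute entirely is rejected.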
1 change: 1 addition & 0 deletions renku/core/dataset/providers/azure.py
@@ -45,6 +45,7 @@ class AzureProvider(ProviderApi, StorageProviderInterface, AddProviderInterface)

priority = ProviderPriority.HIGHEST
name = "Azure"
is_remote = True

def __init__(self, uri: str):
super().__init__(uri=uri)
1 change: 1 addition & 0 deletions renku/core/dataset/providers/dataverse.py
@@ -81,6 +81,7 @@ class DataverseProvider(ProviderApi, ExportProviderInterface, ImportProviderInte

priority = ProviderPriority.HIGH
name = "Dataverse"
is_remote = True

def __init__(self, uri: str, is_doi: bool = False):
super().__init__(uri=uri)
1 change: 1 addition & 0 deletions renku/core/dataset/providers/doi.py
@@ -30,6 +30,7 @@ class DOIProvider(ProviderApi, ImportProviderInterface):

priority = ProviderPriority.HIGHER
name = "DOI"
is_remote = True

def __init__(self, uri: str, headers=None, timeout=3):
super().__init__(uri=uri)
1 change: 1 addition & 0 deletions renku/core/dataset/providers/external.py
@@ -47,6 +47,7 @@ class ExternalProvider(ProviderApi, StorageProviderInterface, AddProviderInterfa

priority = ProviderPriority.HIGHEST
name = "External"
is_remote = True

def __init__(self, uri: str):
super().__init__(uri=get_uri_absolute_path(uri).rstrip("/"))
77 changes: 74 additions & 3 deletions renku/core/dataset/providers/git.py
@@ -17,29 +17,34 @@

import glob
import os
import shutil
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Union

from renku.core import errors
from renku.core.dataset.pointer_file import create_external_file
from renku.core.dataset.providers.api import AddProviderInterface, ProviderApi, ProviderPriority
from renku.core.storage import pull_paths_from_storage
from renku.core.util import communication
from renku.core.util.git import clone_repository, get_cache_directory_for_repository
from renku.core.util.os import get_files, is_subpath
from renku.core.util.metadata import is_linked_file
from renku.core.util.os import delete_dataset_file, get_files, is_subpath
from renku.core.util.urls import check_url, remove_credentials
from renku.domain_model.dataset import RemoteEntity
from renku.domain_model.project_context import project_context
from renku.infrastructure.immutable import DynamicProxy

if TYPE_CHECKING:
from renku.core.dataset.providers.models import DatasetAddMetadata, ProviderParameter
from renku.core.dataset.providers.models import DatasetAddMetadata, DatasetUpdateMetadata, ProviderParameter


class GitProvider(ProviderApi, AddProviderInterface):
"""Git provider."""

priority = ProviderPriority.NORMAL
name = "Git"
is_remote = True

@staticmethod
def supports(uri: str) -> bool:
@@ -178,3 +183,69 @@ def get_file_metadata(src: Path, dst: Path) -> Optional["DatasetAddMetadata"]:
communication.warn(f"The following files overwrite each other in the destination project:/n/t{files_str}")

return results

def update_files(
self,
files: List[DynamicProxy],
dry_run: bool,
delete: bool,
context: Dict[str, Any],
ref: Optional[str] = None,
**kwargs,
) -> List["DatasetUpdateMetadata"]:
"""Update dataset files from the remote provider."""
from renku.core.dataset.providers.models import DatasetUpdateAction, DatasetUpdateMetadata

if "visited_repos" not in context:
context["visited_repos"] = {}

progress_text = "Checking git files for updates"

results: List[DatasetUpdateMetadata] = []

try:
communication.start_progress(progress_text, len(files))
for file in files:
communication.update_progress(progress_text, 1)
if not file.based_on:
continue

based_on = file.based_on
url = based_on.url
if url in context["visited_repos"]:
remote_repository = context["visited_repos"][url]
else:
communication.echo(msg="Cloning remote repository...")
path = get_cache_directory_for_repository(url=url)
remote_repository = clone_repository(url=url, path=path, checkout_revision=ref)
context["visited_repos"][url] = remote_repository

checksum = remote_repository.get_object_hash(path=based_on.path, revision="HEAD")
found = checksum is not None
changed = found and based_on.checksum != checksum

src = remote_repository.path / based_on.path
dst = project_context.metadata_path.parent / file.entity.path

if not found:
if not dry_run and delete:
delete_dataset_file(dst, follow_symlinks=True)
project_context.repository.add(dst, force=True)
results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.DELETE))
elif changed:
if not dry_run:
# Fetch file if it is tracked by Git LFS
pull_paths_from_storage(remote_repository, remote_repository.path / based_on.path)
if is_linked_file(path=src, project_path=remote_repository.path):
delete_dataset_file(dst, follow_symlinks=True)
create_external_file(target=src.resolve(), path=dst)
else:
shutil.copy(src, dst)
file.based_on = RemoteEntity(
checksum=checksum, path=based_on.path, url=based_on.url # type: ignore
)
results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.UPDATE))
finally:
communication.finalize_progress(progress_text)

return results
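A small, self-contained sketch of the clone-caching behaviour that the shared `context` dict provides in `GitProvider.update_files` above: a remote URL is cloned at most once per update run, even across multiple calls. The functions below are toy stand-ins, not Renku's `clone_repository` or repository objects.

```python
from typing import Any, Dict, List


def fake_clone(url: str) -> str:
    """Stand-in for clone_repository(); just records that a clone happened."""
    print(f"cloning {url}")
    return f"<repository for {url}>"


def update_files(urls: List[str], context: Dict[str, Any]) -> None:
    """Sketch of the caching done above: clone each remote at most once."""
    visited = context.setdefault("visited_repos", {})
    for url in urls:
        repository = visited.get(url)
        if repository is None:
            repository = fake_clone(url)
            visited[url] = repository
        # ... a real implementation now compares based_on.checksum against
        # the object hash in ``repository`` and copies or deletes files ...


context: Dict[str, Any] = {}
update_files(["https://example.com/a.git", "https://example.com/a.git"], context)
update_files(["https://example.com/a.git"], context)  # cache hit: no second clone
```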