fix(cli): fix dataset update with external files #3379

Merged 13 commits on May 25, 2023
Changes from 10 commits
211 changes: 52 additions & 159 deletions renku/core/dataset/dataset.py
@@ -18,6 +18,7 @@
import os
import shutil
import urllib
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast

@@ -29,22 +30,19 @@
from renku.core import errors
from renku.core.config import get_value, remove_value, set_value
from renku.core.dataset.datasets_provenance import DatasetsProvenance
from renku.core.dataset.pointer_file import (
    create_external_file,
    delete_external_file,
    is_linked_file_updated,
    update_linked_file,
)
from renku.core.dataset.pointer_file import delete_external_file, is_linked_file_updated, update_linked_file
from renku.core.dataset.providers.api import AddProviderInterface, ProviderApi
from renku.core.dataset.providers.factory import ProviderFactory
from renku.core.dataset.providers.models import ProviderDataset
from renku.core.dataset.providers.git import GitProvider
from renku.core.dataset.providers.models import DatasetUpdateAction, ProviderDataset
from renku.core.dataset.request_model import ImageRequestModel
from renku.core.dataset.tag import get_dataset_by_tag, prompt_access_token, prompt_tag_selection
from renku.core.interface.dataset_gateway import IDatasetGateway
from renku.core.storage import check_external_storage, pull_paths_from_storage, track_paths_in_storage
from renku.core.storage import check_external_storage, track_paths_in_storage
from renku.core.util import communication
from renku.core.util.datetime8601 import local_now
from renku.core.util.git import clone_repository, get_cache_directory_for_repository, get_git_user
from renku.core.util.metadata import is_linked_file, prompt_for_credentials, read_credentials, store_credentials
from renku.core.util.git import get_git_user
from renku.core.util.metadata import prompt_for_credentials, read_credentials, store_credentials
from renku.core.util.os import (
    create_symlink,
    delete_dataset_file,
@@ -72,7 +70,6 @@

if TYPE_CHECKING:
    from renku.core.interface.storage import IStorage
    from renku.infrastructure.repository import Repository


@validate_arguments(config=dict(arbitrary_types_allowed=True))
@@ -571,6 +568,7 @@ def remove_files(dataset):
        total_size=calculate_total_size(importer.provider_dataset_files),
        clear_files_before=True,
        datadir=datadir,
        storage=provider_dataset.storage,
    )

    new_dataset.update_metadata_from(provider_dataset)
@@ -714,19 +712,32 @@ def update_datasets(
            raise errors.ParameterError("No files matched the criteria.")
        return imported_dataset_updates_view_models, []

    git_files = []
    provider_files: Dict[AddProviderInterface, List[DynamicProxy]] = defaultdict(list)
    unique_remotes = set()
    linked_files = []
    local_files = []

    for file in records:
        if file.based_on:
            git_files.append(file)
            unique_remotes.add(file.based_on.url)
        elif file.linked:
        if file.linked:
            linked_files.append(file)
        else:
            local_files.append(file)
            if not getattr(file, "provider", None):
                if file.based_on:
                    uri = file.dataset.same_as.value if file.dataset.same_as else file.based_on.url
                else:
                    uri = file.source
                try:
                    file.provider = cast(
                        AddProviderInterface,
                        ProviderFactory.get_add_provider(uri),
                    )
                except errors.DatasetProviderNotFound:
                    communication.warn(f"Couldn't find provider for file {file.path} in dataset {file.dataset.name}")
                    continue

            provider_files[file.provider].append(file)

            if isinstance(file.provider, GitProvider):
                unique_remotes.add(file.based_on.url)

    if ref and len(unique_remotes) > 1:
        raise errors.ParameterError(
@@ -741,18 +752,24 @@
        updated = update_linked_files(linked_files, dry_run=dry_run)
        updated_files.extend(updated)

    if git_files and not no_remote:
        updated, deleted = update_dataset_git_files(files=git_files, ref=ref, delete=delete, dry_run=dry_run)
        updated_files.extend(updated)
        deleted_files.extend(deleted)
    provider_context: Dict[str, Any] = {}

    for provider, files in provider_files.items():
        if (no_remote and cast(ProviderApi, provider).is_remote) or (
            no_local and not cast(ProviderApi, provider).is_remote
        ):
            continue

    if local_files and not no_local:
        updated, deleted, new = update_dataset_local_files(
            records=local_files, check_data_directory=check_data_directory
        results = provider.update_files(
            files=files,
            dry_run=dry_run,
            delete=delete,
            context=provider_context,
            ref=ref,
            check_data_directory=check_data_directory,
        )
        updated_files.extend(updated)
        deleted_files.extend(deleted)
        updated_files.extend(new)
        updated_files.extend(r.entity for r in results if r.action == DatasetUpdateAction.UPDATE)
        deleted_files.extend(r.entity for r in results if r.action == DatasetUpdateAction.DELETE)

    if not dry_run:
        if deleted_files and not delete:
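For context, the rewritten loop above no longer special-cases Git versus local files: each provider that implements the add interface receives its own batch of records through update_files and answers with results tagged UPDATE or DELETE, which the caller folds back into updated_files and deleted_files. The following is a minimal sketch of that contract, not renku's implementation; UpdateResult and ChecksumProvider are illustrative stand-ins, and only the keyword arguments and result attributes visible in this diff are assumed.

from dataclasses import dataclass
from enum import Enum, auto
from typing import Any, Dict, List, Optional


class DatasetUpdateAction(Enum):
    # Outcomes a provider can report for a single file (mirrors the two actions used in the loop above).
    UPDATE = auto()
    DELETE = auto()


@dataclass
class UpdateResult:
    # Hypothetical result carrier with the two attributes the caller reads: entity and action.
    entity: Any
    action: DatasetUpdateAction


class ChecksumProvider:
    # Toy provider: flags a file as deleted when its checksum is gone, updated when it changed.
    is_remote = False

    def update_files(
        self,
        files: List[Any],
        dry_run: bool,
        delete: bool,
        context: Dict[str, Any],
        ref: Optional[str] = None,
        check_data_directory: bool = False,
    ) -> List[UpdateResult]:
        checksums = context.setdefault("checksums", {})  # "context" lets providers share state across calls
        results = []
        for file in files:
            current = checksums.get(file.entity.path)
            if current is None:
                results.append(UpdateResult(file, DatasetUpdateAction.DELETE))
            elif current != file.entity.checksum:
                results.append(UpdateResult(file, DatasetUpdateAction.UPDATE))
        return results

The dry_run, delete, and ref arguments are simply passed through here because their meaning is provider-specific; a Git-backed provider would presumably use ref to pick the remote revision, as the removed update_dataset_git_files below did.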
@@ -974,154 +991,30 @@ def move_files(dataset_gateway: IDatasetGateway, files: Dict[Path, Path], to_dat
        datasets_provenance.add_or_update(to_dataset, creator=creator)


def update_dataset_local_files(
    records: List[DynamicProxy], check_data_directory: bool
) -> Tuple[List[DynamicProxy], List[DynamicProxy], List[DynamicProxy]]:
    """Update files metadata from the git history.

    Args:
        records(List[DynamicProxy]): File records to update.
        check_data_directory(bool): Whether to check the dataset's data directory for new files.
    Returns:
        Tuple[List[DynamicProxy], List[DynamicProxy]]: Tuple of updated and deleted file records.
    """
    updated_files: List[DynamicProxy] = []
    deleted_files: List[DynamicProxy] = []
    new_files: List[DynamicProxy] = []
    progress_text = "Checking for local updates"

    try:
        communication.start_progress(progress_text, len(records))
        check_paths = []
        records_to_check = []

        for file in records:
            communication.update_progress(progress_text, 1)

            if file.based_on or file.linked:
                continue

            if not (project_context.path / file.entity.path).exists():
                deleted_files.append(file)
                continue

            check_paths.append(file.entity.path)
            records_to_check.append(file)

        checksums = project_context.repository.get_object_hashes(check_paths)

        for file in records_to_check:
            current_checksum = checksums.get(file.entity.path)
            if not current_checksum:
                deleted_files.append(file)
            elif current_checksum != file.entity.checksum:
                updated_files.append(file)
            elif check_data_directory and not any(file.entity.path == f.entity.path for f in file.dataset.files):
                datadir = file.dataset.get_datadir()
                try:
                    get_safe_relative_path(file.entity.path, datadir)
                except ValueError:
                    continue

                new_files.append(file)
    finally:
        communication.finalize_progress(progress_text)

    return updated_files, deleted_files, new_files


def _update_datasets_files_metadata(updated_files: List[DynamicProxy], deleted_files: List[DynamicProxy], delete: bool):
    modified_datasets = {}
    checksums = project_context.repository.get_object_hashes([file.entity.path for file in updated_files])
    for file in updated_files:
        new_file = DatasetFile.from_path(
            path=file.entity.path, based_on=file.based_on, source=file.source, checksum=checksums.get(file.entity.path)
        )
        modified_datasets[file.dataset.name] = file.dataset
        modified_datasets[file.dataset.name] = (
            file.dataset._subject if isinstance(file.dataset, DynamicProxy) else file.dataset
        )
        file.dataset.add_or_update_files(new_file)

    if delete:
        for file in deleted_files:
            modified_datasets[file.dataset.name] = file.dataset
            modified_datasets[file.dataset.name] = (
                file.dataset._subject if isinstance(file.dataset, DynamicProxy) else file.dataset
            )
            file.dataset.unlink_file(file.entity.path)

    datasets_provenance = DatasetsProvenance()
    for dataset in modified_datasets.values():
        datasets_provenance.add_or_update(dataset, creator=get_git_user(repository=project_context.repository))


def update_dataset_git_files(
    files: List[DynamicProxy], ref: Optional[str], delete: bool, dry_run: bool
) -> Tuple[List[DynamicProxy], List[DynamicProxy]]:
    """Update files and dataset metadata according to their remotes.

    Args:
        files(List[DynamicProxy]): List of files to be updated.
        ref(Optional[str]): Reference to use for update.
        delete(bool, optional): Indicates whether to delete files or not (Default value = False).
        dry_run(bool): Whether to perform update or only print changes.

    Returns:
        Tuple[List[DynamicProxy], List[DynamicProxy]]: Tuple of updated and deleted file records.
    """
    visited_repos: Dict[str, "Repository"] = {}
    updated_files: List[DynamicProxy] = []
    deleted_files: List[DynamicProxy] = []

    progress_text = "Checking files for updates"

    try:
        communication.start_progress(progress_text, len(files))
        for file in files:
            communication.update_progress(progress_text, 1)
            if not file.based_on:
                continue

            based_on = file.based_on
            url = based_on.url
            if url in visited_repos:
                remote_repository = visited_repos[url]
            else:
                communication.echo(msg="Cloning remote repository...")
                path = get_cache_directory_for_repository(url=url)
                remote_repository = clone_repository(url=url, path=path, checkout_revision=ref)
                visited_repos[url] = remote_repository

            checksum = remote_repository.get_object_hash(path=based_on.path, revision="HEAD")
            found = checksum is not None
            changed = found and based_on.checksum != checksum

            src = remote_repository.path / based_on.path
            dst = project_context.metadata_path.parent / file.entity.path

            if not found:
                if not dry_run and delete:
                    delete_dataset_file(dst, follow_symlinks=True)
                    project_context.repository.add(dst, force=True)
                deleted_files.append(file)
            elif changed:
                if not dry_run:
                    # Fetch file if it is tracked by Git LFS
                    pull_paths_from_storage(remote_repository, remote_repository.path / based_on.path)
                    if is_linked_file(path=src, project_path=remote_repository.path):
                        delete_dataset_file(dst, follow_symlinks=True)
                        create_external_file(target=src.resolve(), path=dst)
                    else:
                        shutil.copy(src, dst)
                    file.based_on = RemoteEntity(
                        checksum=checksum, path=based_on.path, url=based_on.url  # type: ignore
                    )
                updated_files.append(file)
    finally:
        communication.finalize_progress(progress_text)

    if not updated_files and (not delete or not deleted_files):
        # Nothing to commit or update
        return [], deleted_files

    return updated_files, deleted_files


def update_linked_files(records: List[DynamicProxy], dry_run: bool) -> List[DynamicProxy]:
"""Update files linked to other files in the project.

@@ -1230,7 +1123,7 @@ def should_include(filepath: Path) -> bool:
                continue

            record = DynamicProxy(file)
            record.dataset = dataset
            record.dataset = DynamicProxy(dataset)
            records.append(record)

        if not check_data_directory:
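One note on the record.dataset = DynamicProxy(dataset) change just above: the update code attaches transient bookkeeping (such as the provider attribute set in update_datasets) to proxy objects rather than to the persisted metadata, and _update_datasets_files_metadata now unwraps file.dataset._subject before registering the modified dataset. Below is a rough sketch of that proxy idea; it assumes nothing about renku's actual DynamicProxy beyond attribute pass-through and an underlying _subject.

class AttributeProxy:
    """Simplified stand-in for a dynamic proxy: reads fall through to the wrapped
    object, while writes stay on the proxy so the wrapped metadata is never mutated."""

    def __init__(self, subject):
        object.__setattr__(self, "_subject", subject)
        object.__setattr__(self, "_overrides", {})

    def __getattr__(self, name):
        # Only called when normal lookup fails, i.e. for anything not stored on the proxy itself.
        overrides = object.__getattribute__(self, "_overrides")
        if name in overrides:
            return overrides[name]
        return getattr(object.__getattribute__(self, "_subject"), name)

    def __setattr__(self, name, value):
        # Transient attributes such as ``provider`` land here, not on the wrapped object.
        object.__getattribute__(self, "_overrides")[name] = value

With a proxy like this, setting record.provider in the update loop never leaks into the dataset metadata that ultimately gets persisted, which appears to be why the metadata update above switches to file.dataset._subject before handing datasets to DatasetsProvenance.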