Skip to content

Commit

Permalink
Merge pull request #481 from stepan-anokhin/457-celery-upgrade
Browse files Browse the repository at this point in the history
Small fixes
  • Loading branch information
johnhbenetech authored Apr 6, 2022
2 parents 335d5cb + a8b3c14 commit f6fbffe
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 38 deletions.
1 change: 0 additions & 1 deletion .github/workflows/ci-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ jobs:
working-directory: ./server
run: |
python -m pip install --upgrade pip
pip install flake8 'black~=20.8b1'
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Check formatting with black
Expand Down
2 changes: 0 additions & 2 deletions extract_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ def main(config, list_of_files, frame_sampling, save_frames):
if list_of_files is None:
luigi.build(
[
ExifTask(config=config),
SignaturesTask(config=config),
DBSignaturesTask(config=config),
],
Expand All @@ -56,7 +55,6 @@ def main(config, list_of_files, frame_sampling, save_frames):
else:
luigi.build(
[
ExifFileListFileTask(config=config, path_list_file=list_of_files),
SignaturesByPathListFileTask(config=config, path_list_file=list_of_files),
DBSignaturesByPathListFileTask(config=config, path_list_file=list_of_files),
],
Expand Down
20 changes: 17 additions & 3 deletions generate_matches.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
import click
import luigi

from winnow.pipeline.luigi.matches import MatchesReportTask, MatchesByFileListTask
from winnow.pipeline.luigi.matches import (
MatchesReportTask,
MatchesByFileListTask,
DBMatchesTask,
DBMatchesByFileListTask,
)
from winnow.utils.config import resolve_config


Expand Down Expand Up @@ -38,10 +43,19 @@ def main(config, list_of_files, frame_sampling, save_frames):
logging.config.fileConfig("./logging.conf")

if list_of_files is None:
luigi.build([MatchesReportTask(config=config)], local_scheduler=True, workers=1)
luigi.build(
[MatchesReportTask(config=config), DBMatchesTask(config=config)],
local_scheduler=True,
workers=1,
)
else:
luigi.build(
[MatchesByFileListTask(config=config, path_list_file=list_of_files)], local_scheduler=True, workers=1
[
MatchesByFileListTask(config=config, path_list_file=list_of_files),
DBMatchesByFileListTask(config=config, path_list_file=list_of_files),
],
local_scheduler=True,
workers=1,
)


Expand Down
65 changes: 37 additions & 28 deletions server/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Flask-SQLAlchemy==2.4.4
PyYAML==5.4
psycopg2==2.8.6
fire==0.3.1
celery==5.0.5
celery==5.2.3
Flask-SocketIO==5.0.0
eventlet==0.31.0
Werkzeug==2.0.3
Expand All @@ -14,38 +14,47 @@ cryptography==3.3.1
grpcio==1.43.0
grpcio-tools==1.43.0
redis==4.1.2
black==22.3.0
flake8==4.0.1

# Locked dependencies
amqp==5.1.0
attrs==21.4.0
bidict==0.22.0
billiard==3.6.4.0
# dependencies discovered on 2022-04-06 16:27:10.903076
pytest==7.1.1
pyflakes==2.4.0
py==1.11.0
MarkupSafe==2.1.1
pytz==2022.1
six==1.16.0
dnspython==1.16.0
typing-extensions==4.1.1
pycodestyle==2.8.0
pathspec==0.9.0
click==8.1.2
pyparsing==3.0.7
cffi==1.15.0
click==7.1.2
click-didyoumean==0.3.0
click-plugins==1.1.1
click-repl==0.2.0
vine==5.0.0
platformdirs==2.5.1
amqp==5.1.0
Deprecated==1.2.13
dnspython==1.16.0
greenlet==1.1.2
iniconfig==1.1.1
Jinja2==3.1.0
kombu==5.2.4
MarkupSafe==2.1.1
packaging==21.3
pluggy==1.0.0
prompt-toolkit==3.0.28
protobuf==3.19.4
py==1.11.0
mccabe==0.6.1
pycparser==2.21
pyparsing==3.0.7
pytest==7.1.1
python-engineio==4.3.1
click-plugins==1.1.1
packaging==21.3
billiard==3.6.4.0
python-socketio==5.5.2
pytz==2022.1
six==1.16.0
wcwidth==0.2.5
kombu==5.2.4
bidict==0.22.0
mypy-extensions==0.4.3
protobuf==3.20.0
iniconfig==1.1.1
pluggy==1.0.0
Jinja2==3.1.1
termcolor==1.1.0
attrs==21.4.0
click-didyoumean==0.3.0
click-repl==0.2.0
prompt-toolkit==3.0.29
tomli==2.0.1
vine==5.0.0
wcwidth==0.2.5
wrapt==1.14.0
python-engineio==4.3.1
greenlet==1.1.2
2 changes: 1 addition & 1 deletion server/server/api/templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ def validate_frame_dto(data: Dict) -> str:


def handle_create_example_from_frame(template: Template):
"""Create example from the """
"""Create example from the"""

request_payload = request.get_json()
if request_payload is None:
Expand Down
4 changes: 2 additions & 2 deletions server/server/queue/file_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@


class _LogFileStream(LogStream):
def __init__(self, file: TextIO, callback: Callable, finished: bool, chunk_size=100 * 2 ** 10):
def __init__(self, file: TextIO, callback: Callable, finished: bool, chunk_size=100 * 2**10):
self._file: TextIO = file
self._callback: Callable = callback
self._finished = finished
Expand Down Expand Up @@ -57,7 +57,7 @@ def dispose(self):


class FileStreamer:
def __init__(self, timeout: float = 0.1, chunk_size: int = 100 * 2 ** 10):
def __init__(self, timeout: float = 0.1, chunk_size: int = 100 * 2**10):
self._timeout: float = timeout
self._chunk_size: int = chunk_size
self._poll_condition = threading.Condition()
Expand Down
61 changes: 60 additions & 1 deletion winnow/pipeline/luigi/matches.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,65 @@ def requires(self):
)


class DBMatchesByFileListTask(PipelineTask):
"""Populate database with file matches for videos listed in a text file."""

path_list_file: str = luigi.Parameter()
fingerprint_size: int = luigi.IntParameter(default=500)
metric: str = luigi.Parameter(default="angular")
n_trees: int = luigi.IntParameter(default=10)
max_matches: int = luigi.IntParameter(default=20)

# Task logs properties
LOG_TASK_NAME = "MatchFilesTask"
LOG_HAYSTACK_ATTR = "haystack_prefix"
LOG_NEEDLES_ATTR = "needles_prefix"

def run(self):
self.logger.info("Reading paths list from %s", self.path_list_file)
with open(self.path_list_file, "r") as list_file:
paths = list(map(str.strip, list_file.readlines()))
self.progress.increase(0.1)
self.logger.info("%s paths was loaded from %s", len(paths), self.path_list_file)

self.logger.info("Reading fingerprints", len(paths))
file_keys = list(map(self.pipeline.coll.file_key, paths))
signatures = self.pipeline.repr_storage.signature
feature_vectors = [FeatureVector(key=key, features=signatures.read(key)) for key in file_keys]
self.progress.increase(0.1)
self.logger.info("Loaded %s fingerprints", len(feature_vectors))

max_distance = self.config.proc.match_distance
self.logger.info("Performing match detection with max distance=%s.", max_distance)
matching = self.progress.subtask(0.7)
neighbor_matcher = NeighborMatcher(haystack=feature_vectors, max_matches=self.max_matches, metric=self.metric)
matches = neighbor_matcher.find_matches(needles=feature_vectors, max_distance=max_distance, progress=matching)
self.logger.info("Found %s matches", len(matches))

if self.config.proc.filter_dark_videos:
self.logger.info("Filtering dark videos with threshold %s", self.config.proc.filter_dark_videos_thr)
matches, metadata = filter_dark(file_keys, matches, self.pipeline, self.progress.subtask(0.03), self.logger)
result_storage = self.pipeline.result_storage
result_storage.add_metadata((key.path, key.hash, meta) for key, meta in metadata.items())
self.progress.increase(0.03)

def _entry(detected_match: DetectedMatch):
"""Flatten (query_key, match_key, dist) match entry."""
query, match = detected_match.needle_key, detected_match.haystack_key
return query.path, query.hash, match.path, match.hash, detected_match.distance

self.logger.info("Saving %s matches to the database", len(matches))
result_storage = self.pipeline.result_storage
result_storage.add_matches(_entry(match) for match in matches)
self.logger.info("Done!")

def output(self):
return ConstTarget(exists=False)

def requires(self):
SignaturesByPathListFileTask(config=self.config, path_list_file=self.path_list_file)


class MatchesReportTask(PipelineTask):
"""Find file matches and write results into CSV report."""

Expand Down Expand Up @@ -356,7 +415,7 @@ def run(self):
self.logger.info("Loaded %s fingerprints", len(feature_vectors))

max_distance = self.config.proc.match_distance
self.logger.info("Performing match detection wi.")
self.logger.info("Performing match detection with max distance=%s.", max_distance)
matching = self.progress.subtask(0.7)
neighbor_matcher = NeighborMatcher(haystack=feature_vectors, max_matches=self.max_matches, metric=self.metric)
matches = neighbor_matcher.find_matches(needles=feature_vectors, max_distance=max_distance, progress=matching)
Expand Down

0 comments on commit f6fbffe

Please sign in to comment.