Skip to content

Commit

Permalink
Downgrades error levels and improves some logic (#3340)
Browse files Browse the repository at this point in the history
* Downgrades error levels and improves some logic

* Update src/dispatch/plugins/dispatch_google/drive/drive.py

Co-authored-by: Will Sheldon <[email protected]>

---------

Co-authored-by: Will Sheldon <[email protected]>
  • Loading branch information
mvilanova and wssheldon authored May 2, 2023
1 parent 465bbeb commit 7244a56
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 51 deletions.
53 changes: 23 additions & 30 deletions src/dispatch/document/scheduled.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
from sqlalchemy import func

from dispatch.database.core import SessionLocal
from dispatch.nlp import build_phrase_matcher, build_term_vocab, extract_terms_from_text
from dispatch.decorators import scheduled_project_task
from dispatch.project.models import Project
from dispatch.nlp import build_phrase_matcher, build_term_vocab, extract_terms_from_text
from dispatch.plugin import service as plugin_service
from dispatch.project.models import Project
from dispatch.scheduler import scheduler
from dispatch.term.models import Term
from dispatch.term import service as term_service
from dispatch.term.models import Term

from .service import get_all

Expand All @@ -21,46 +21,39 @@
@scheduled_project_task
def sync_document_terms(db_session: SessionLocal, project: Project):
"""Performs term extraction from known documents."""
p = plugin_service.get_active_instance(
plugin = plugin_service.get_active_instance(
db_session=db_session, plugin_type="storage", project_id=project.id
)

if not p:
log.debug("Tried to sync document terms but couldn't find any active storage plugins.")
if not plugin:
log.warn(f"Document terms not synced. No storage plugin enabled in {project.name} project.")
return

terms = term_service.get_all(db_session=db_session, project_id=project.id).all()
log.debug(f"Fetched {len(terms)} terms from database.")

term_strings = [t.text.lower() for t in terms if t.discoverable]
phrases = build_term_vocab(term_strings)
matcher = build_phrase_matcher("dispatch-term", phrases)

documents = get_all(db_session=db_session)
for doc in documents:
log.debug(f"Processing document. Name: {doc.name}")
for document in documents:
mime_type = "text/plain"
if "sheet" in document.resource_type:
mime_type = "text/csv"

try:
if "sheet" in doc.resource_type:
mime_type = "text/csv"
else:
mime_type = "text/plain"

doc_text = p.instance.get(doc.resource_id, mime_type)
extracted_terms = list(set(extract_terms_from_text(doc_text, matcher)))

matched_terms = (
db_session.query(Term)
.filter(func.upper(Term.text).in_([func.upper(t) for t in extracted_terms]))
.all()
)
document_text = plugin.instance.get(document.resource_id, mime_type)
except Exception as e:
log.warn(e)
continue

log.debug(f"Extracted the following terms from {doc.weblink}. Terms: {extracted_terms}")
extracted_terms = list(set(extract_terms_from_text(document_text, matcher)))

if matched_terms:
doc.terms = matched_terms
db_session.commit()
matched_terms = (
db_session.query(Term)
.filter(func.upper(Term.text).in_([func.upper(t) for t in extracted_terms]))
.all()
)

except Exception as e:
# even if one document fails we don't want them to all fail
log.exception(e)
if matched_terms:
document.terms = matched_terms
db_session.commit()
2 changes: 1 addition & 1 deletion src/dispatch/plugins/dispatch_google/drive/drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def download_google_document(client: Any, file_id: str, mime_type: str = "text/p
_, response = downloader.next_chunk()
return fp.getvalue().decode("utf-8")
except (HttpError, OSError):
# Do no retry. Log the error fail.
# Do not retry; raise an exception instead.
raise Exception(f"Failed to export the file. Id: {file_id} MimeType: {mime_type}") from None


Expand Down
8 changes: 4 additions & 4 deletions src/dispatch/signal/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,18 @@ def create_signal_instance(
signal_instance_in.signal = signal

if not signal:
msg = f"No signal definition found. Id: {external_id} Variant: {variant}"
log.error(msg)
msg = f"No signal definition found. External Id: {external_id} Variant: {variant}"
log.warn(msg)
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=[{"msg": msg}],
) from None

if not signal.enabled:
msg = f"Signal definition not enabled. SignalName: {signal.name}"
msg = f"Signal definition not enabled. Signal Name: {signal.name}"
log.info(msg)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
status_code=status.HTTP_403_FORBIDDEN,
detail=[{"msg": msg}],
) from None

Expand Down
28 changes: 18 additions & 10 deletions src/dispatch/workflow/scheduled.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,35 @@
import logging

from schedule import every
from dispatch.database.core import SessionLocal
from sqlalchemy.orm import Session

from dispatch.decorators import scheduled_project_task
from dispatch.messaging.strings import (
INCIDENT_WORKFLOW_COMPLETE_NOTIFICATION,
INCIDENT_WORKFLOW_UPDATE_NOTIFICATION,
)
from dispatch.plugin import service as plugin_service
from dispatch.plugin.models import PluginInstance
from dispatch.project.models import Project
from dispatch.scheduler import scheduler
from dispatch.workflow import service as workflow_service

from .enums import WorkflowInstanceStatus
from .flows import send_workflow_notification
from .models import WorkflowInstanceUpdate

from .models import WorkflowInstance, WorkflowInstanceUpdate

log = logging.getLogger(__name__)

WORKFLOW_SYNC_INTERVAL = 30 # seconds


def sync_workflow(db_session, project, workflow_plugin, instance, notify: bool = False):
def sync_workflow(
db_session: Session,
project: Project,
workflow_plugin: PluginInstance,
instance: WorkflowInstance,
notify: bool = False,
):
"""Performs workflow sync."""
log.debug(
f"Processing workflow instance. Instance: {instance.parameters} Workflow: {instance.workflow.name}"
Expand Down Expand Up @@ -88,18 +94,20 @@ def sync_workflow(db_session, project, workflow_plugin, instance, notify: bool =

@scheduler.add(every(WORKFLOW_SYNC_INTERVAL).seconds, name="workflow-sync")
@scheduled_project_task
def sync_all_workflows(db_session: SessionLocal, project: Project):
"""Syncs incident workflows."""
def sync_all_workflows(db_session: Session, project: Project):
"""Syncs all incident workflows."""
workflow_plugin = plugin_service.get_active_instance(
db_session=db_session, project_id=project.id, plugin_type="workflow"
)

if not workflow_plugin:
log.warning(
f"No workflow plugin is enabled. Project: {project.name}. Organization: {project.organization.name}"
f"Workflows not synced. No workflow plugin enabled in {project.name} project and {project.organization.name} organization."
)
return

instances = workflow_service.get_running_instances(db_session=db_session)
for i in instances:
sync_workflow(db_session, project, workflow_plugin, i)
workflow_instances = workflow_service.get_running_instances(
db_session=db_session, project=project
)
for instance in workflow_instances:
sync_workflow(db_session, project, workflow_plugin, instance)
16 changes: 10 additions & 6 deletions src/dispatch/workflow/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@

from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import true

from pydantic.error_wrappers import ErrorWrapper, ValidationError

from dispatch.case import service as case_service
from dispatch.config import DISPATCH_UI_URL
from dispatch.document import service as document_service
from dispatch.exceptions import NotFoundError
from dispatch.project import service as project_service
from dispatch.plugin import service as plugin_service
from dispatch.incident import service as incident_service
from dispatch.case import service as case_service
from dispatch.signal import service as signal_service
from dispatch.participant import service as participant_service
from dispatch.document import service as document_service
from dispatch.plugin import service as plugin_service
from dispatch.project import service as project_service
from dispatch.project.models import Project
from dispatch.signal import service as signal_service
from dispatch.workflow.enums import WorkflowInstanceStatus

from .models import (
Expand Down Expand Up @@ -119,10 +122,11 @@ def get_instance(*, db_session, instance_id: int) -> WorkflowInstance:
)


def get_running_instances(*, db_session) -> List[WorkflowInstance]:
def get_running_instances(*, db_session, project: Project) -> List[WorkflowInstance]:
"""Fetches all running instances."""
return (
db_session.query(WorkflowInstance)
.filter(WorkflowInstance.workflow.project.id == project.id)
.filter(
WorkflowInstance.status.in_(
(
Expand Down

0 comments on commit 7244a56

Please sign in to comment.