Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vdk-plugins: add new error handling methods #2750

Merged
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
024b9c7
replace occurances in vdk-csv and vdk-duckdb
duyguHsnHsn Oct 2, 2023
ddb49ec
replace occurances in vdk-impala
duyguHsnHsn Oct 2, 2023
d966ebd
replace in test in vdk-impala
duyguHsnHsn Oct 2, 2023
1b716cb
replace in vdk-ingest-http
duyguHsnHsn Oct 3, 2023
4bd961d
replace in vdk-kerbweos-auth
duyguHsnHsn Oct 3, 2023
bb87699
replace in vdk-sqlite
duyguHsnHsn Oct 3, 2023
5582f2a
replace in vdk-trino
duyguHsnHsn Oct 3, 2023
37b3257
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 3, 2023
ab26064
fix impala tests
duyguHsnHsn Oct 3, 2023
4352222
fix impala tests
duyguHsnHsn Oct 3, 2023
eacf5af
fix impala tests
duyguHsnHsn Oct 3, 2023
5ed7806
merge main
duyguHsnHsn Oct 4, 2023
83fca28
get impala tests green
duyguHsnHsn Oct 4, 2023
6453357
replace log and rethrow
duyguHsnHsn Oct 4, 2023
2d200b5
remove impala changes
duyguHsnHsn Oct 4, 2023
e6b112f
remove resolvableByActual
duyguHsnHsn Oct 4, 2023
b43ecc2
use ResolvableBy instead of errors.ResolvableBy
duyguHsnHsn Oct 4, 2023
03d679f
Merge branch 'main' into person/hduygu/vdk-plugins-add-new-error-hand…
duyguHsnHsn Oct 4, 2023
182ec23
remove unused import
duyguHsnHsn Oct 4, 2023
9ef6a71
add resolvableByActual
duyguHsnHsn Oct 4, 2023
ae61ed1
remove unused import
duyguHsnHsn Oct 4, 2023
6bffa71
remove print
duyguHsnHsn Oct 5, 2023
9303de8
Merge branch 'main' into person/hduygu/vdk-plugins-add-new-error-hand…
duyguHsnHsn Oct 5, 2023
ead504e
add log warns
duyguHsnHsn Oct 5, 2023
e705ab7
add log warns
duyguHsnHsn Oct 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions projects/vdk-plugins/vdk-csv/src/vdk/plugin/csv/csv_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from vdk.api.plugin.hook_markers import hookimpl
from vdk.internal.builtin_plugins.run.cli_run import run
from vdk.internal.core import errors
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.csv.csv_export_job import csv_export_step
from vdk.plugin.csv.csv_ingest_job import csv_ingest_step

Expand Down Expand Up @@ -123,13 +124,13 @@ def ingest_csv(ctx: Context, file: str, table_name: str, options: str) -> None:
@click.pass_context
def export_csv(ctx: click.Context, query: str, file: str):
if os.path.exists(file):
errors.log_and_throw(
errors.ResolvableBy.USER_ERROR,
log,
"Cannot create the result csv file.",
f"""{file} already exists. """,
"Will not proceed with exporting",
"Use another name or choose another location for the file",
errors.report_and_throw(
UserCodeError(
"Cannot create the result csv file.",
f"""{file} already exists. """,
"Will not proceed with exporting",
"Use another name or choose another location for the file",
)
)
args = dict(query=query, fullpath=file)
ctx.invoke(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def test_csv_export_with_nonexistent_table(tmpdir):
"result3.csv",
]
)
print(result.exception)
assert isinstance(result.exception, OperationalError)
assert hasattr(result.exception, "_vdk_resolvable_actual")
assert (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import duckdb
from vdk.internal.builtin_plugins.ingestion.ingester_base import IIngesterPlugin
from vdk.internal.core import errors
from vdk.internal.core.errors import ResolvableBy
from vdk.internal.core.errors import UserCodeError
from vdk.plugin.duckdb.duckdb_configuration import DuckDBConfiguration
from vdk.plugin.duckdb.duckdb_connection import DuckDBConnection

Expand Down Expand Up @@ -39,13 +41,13 @@ def ingest_payload(
"""
target = target or self.conf.get_duckdb_file()
if not target:
errors.log_and_throw(
errors.ResolvableBy.USER_ERROR,
log,
"Failed to proceed with ingestion.",
"Target was not supplied as a parameter.",
"Will not proceed with ingestion.",
"Set the correct target parameter.",
errors.report_and_throw(
UserCodeError(
"Failed to proceed with ingestion.",
"Target was not supplied as a parameter.",
"Will not proceed with ingestion.",
"Set the correct target parameter.",
)
)
if not payload:
log.debug(
Expand Down Expand Up @@ -76,29 +78,20 @@ def __ingest_payload(
cur.execute(query, obj)
log.debug(f"{obj} ingested.")
except Exception as e:
errors.log_and_rethrow(
errors.ResolvableBy.PLATFORM_ERROR,
log,
"Failed to sent payload",
"Unknown error. Error message was : " + str(e),
"Will not be able to send the payload for ingestion",
"See error message for help ",
e,
wrap_in_vdk_error=True,
)
errors.report_and_rethrow(ResolvableBy.PLATFORM_ERROR, e)

def __check_destination_table_exists(
self, destination_table: str, cur: duckdb.cursor
) -> None:
if not self._check_if_table_exists(destination_table, cur):
errors.log_and_throw(
errors.ResolvableBy.USER_ERROR,
log,
"Cannot send payload for ingestion to DuckDB database.",
"destination_table does not exist in the target database.",
"Will not be able to send the payloads and will throw exception."
"Likely the job would fail",
"Make sure the destination_table exists in the target DuckDB database.",
errors.report_and_throw(
UserCodeError(
"Cannot send payload for ingestion to DuckDB database.",
"destination_table does not exist in the target database.",
"Will not be able to send the payloads and will throw exception."
"Likely the job would fail",
"Make sure the destination_table exists in the target DuckDB database.",
)
)

@staticmethod
Expand Down Expand Up @@ -130,18 +123,18 @@ def __create_query(

for obj in payload:
if collections.Counter(fields) != collections.Counter(obj.keys()):
errors.log_and_throw(
errors.ResolvableBy.USER_ERROR,
log,
"Failed to sent payload",
f"""
One or more column names in the input data did NOT
match corresponding column names in the database.
Input Table Columns: {list(obj.keys())}
Database Table Columns: {fields}
""",
"Will not be able to send the payload for ingestion",
"See error message for help ",
errors.report_and_throw(
UserCodeError(
"Failed to sent payload",
f"""
One or more column names in the input data did NOT
match corresponding column names in the database.
Input Table Columns: {list(obj.keys())}
Database Table Columns: {fields}
""",
"Will not be able to send the payload for ingestion",
"See error message for help ",
)
)

def __create_table_if_not_exists(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,7 @@ def ingest_payload(
connection.commit()
_log.debug("Payload was ingested.")
except Exception as e:
errors.log_and_rethrow(
errors.find_whom_to_blame_from_exception(e),
_log,
"Failed to send payload",
"Unknown error. Error message was : " + str(e),
"Will not be able to send the payload for ingestion",
"See error message for help ",
e,
wrap_in_vdk_error=True,
)
errors.log_and_rethrow(errors.find_whom_to_blame_from_exception(e), e)

@staticmethod
def _populate_query_parameters_tuple(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import pytest
from vdk.internal.core import errors
from vdk.internal.core.errors import ResolvableBy
from vdk.plugin.impala import impala_plugin
from vdk.plugin.test_utils.util_funcs import cli_assert
from vdk.plugin.test_utils.util_funcs import cli_assert_equal
Expand Down Expand Up @@ -694,7 +695,7 @@ def just_throw(*_, **kwargs):
res = self._run_job(template_name, template_args)
assert expected_why_it_happened_msg in res.output
errors.log_and_throw.assert_called_once_with(
to_be_fixed_by=errors.ResolvableBy.USER_ERROR,
to_be_fixed_by=ResolvableBy.USER_ERROR,
log=ANY,
what_happened="Data loading has failed.",
why_it_happened=(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
from vdk.internal.builtin_plugins.ingestion.ingester_base import IIngesterPlugin
from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.core import errors
from vdk.internal.core.errors import PlatformServiceError
from vdk.internal.core.errors import ResolvableBy
from vdk.internal.core.errors import UserCodeError
from vdk.internal.core.errors import VdkConfigurationError

log = logging.getLogger(__name__)
IngestionResult = NewType("IngestionResult", Dict)
Expand Down Expand Up @@ -140,17 +144,17 @@ def ingest_payload(
@staticmethod
def __verify_target(target):
if not target:
errors.log_and_throw(
errors.ResolvableBy.CONFIG_ERROR,
log,
what_happened="Cannot send payload for ingestion over http.",
why_it_happened="target has not been provided to the plugin. "
"Most likely it has been mis-configured",
consequences="Will not be able to send the payloads and will throw exception."
"Likely the job would fail",
countermeasures="Make sure you have set correct target - "
"either as VDK_INGEST_TARGET_DEFAULT configuration variable "
"or passed target to send_**for_ingestion APIs",
errors.report_and_throw(
VdkConfigurationError(
"Cannot send payload for ingestion over http.",
"Target has not been provided to the plugin. "
"Most likely it has been mis-configured",
"Will not be able to send the payloads and will throw exception."
"Likely the job would fail",
"Make sure you have set correct target - "
"either as VDK_INGEST_TARGET_DEFAULT configuration variable "
"or passed target to send_**for_ingestion APIs",
)
)

@staticmethod
Expand All @@ -159,14 +163,15 @@ def __amend_payload(payload, destination_table):
# TODO: Move all ingestion formatting logic to a separate plugin.
if not ("@table" in obj):
if not destination_table:
errors.log_and_throw(
errors.ResolvableBy.USER_ERROR,
log,
"Corrupt payload",
"""destination_table argument is empty, or @table key is
missing from payload.""",
"Payload would not be ingested, and data job may fail.",
"Re-send payload by including @table key/value pair, or pass a destination_table parameter to the ingestion method called.",
errors.report_and_throw(
UserCodeError(
"Corrupt payload",
"""destination_table argument is empty, or @table key is
missing from payload.""",
"Payload would not be ingested, and data job may fail.",
"Re-send payload by including @table key/value pair, or pass a destination_table "
"parameter to the ingestion method called.",
)
)
else:
obj["@table"] = destination_table
Expand Down Expand Up @@ -194,22 +199,22 @@ def __send_data(self, data, http_url, headers) -> IngestionResult:
verify=self._verify,
)
if 400 <= req.status_code < 500:
errors.log_and_throw(
errors.ResolvableBy.USER_ERROR,
log,
"Failed to sent payload",
f"HTTP Client error. status is {req.status_code} and message was : {req.text}",
"Will not be able to send the payload for ingestion",
"Fix the error and try again ",
errors.report_and_throw(
UserCodeError(
"Failed to sent payload",
f"HTTP Client error. status is {req.status_code} and message was : {req.text}",
"Will not be able to send the payload for ingestion",
"Fix the error and try again ",
)
)
if req.status_code >= 500:
errors.log_and_throw(
errors.ResolvableBy.PLATFORM_ERROR,
log,
"Failed to sent payload",
f"HTTP Server error. status is {req.status_code} and message was : {req.text}",
"Will not be able to send the payload for ingestion",
"Re-try the operation again. If error persist contact support team. ",
errors.report_and_throw(
PlatformServiceError(
"Failed to sent payload",
f"HTTP Server error. status is {req.status_code} and message was : {req.text}",
"Will not be able to send the payload for ingestion",
"Re-try the operation again. If error persist contact support team. ",
)
)
log.debug(
"Payload was ingested. Request Details: "
Expand All @@ -223,13 +228,4 @@ def __send_data(self, data, http_url, headers) -> IngestionResult:
}
)
except Exception as e:
errors.log_and_rethrow(
errors.ResolvableBy.PLATFORM_ERROR,
log,
"Failed to sent payload",
"Unknown error. Error message was : " + str(e),
"Will not be able to send the payload for ingestion",
"See error message for help ",
e,
wrap_in_vdk_error=True,
)
errors.report_and_rethrow(ResolvableBy.PLATFORM_ERROR, e)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Optional

from vdk.internal.core import errors
from vdk.internal.core.errors import VdkConfigurationError
from vdk.plugin.kerberos.base_authenticator import BaseAuthenticator
from vdk.plugin.kerberos.kerberos_configuration import KerberosPluginConfiguration
from vdk.plugin.kerberos.kinit_authenticator import KinitGSSAPIAuthenticator
Expand Down Expand Up @@ -39,12 +40,12 @@ def create_authenticator(
log.debug("No Kerberos authentication specified")
return None

errors.log_and_throw(
to_be_fixed_by=errors.ResolvableBy.CONFIG_ERROR,
log=log,
what_happened="Provided environment variable VDK_KRB_AUTH has invalid value.",
why_it_happened=f"VDK was run with environment variable VDK_KRB_AUTH={authentication_type}, "
f"however '{authentication_type}' is invalid value for this variable.",
consequences=errors.MSG_CONSEQUENCE_DELEGATING_TO_CALLER__LIKELY_EXECUTION_FAILURE,
countermeasures="Provide either 'minikerberos' or 'kinit' for environment variable VDK_KRB_AUTH.",
errors.report_and_throw(
VdkConfigurationError(
"Provided environment variable VDK_KRB_AUTH has invalid value.",
f"VDK was run with environment variable VDK_KRB_AUTH={authentication_type}, "
f"however '{authentication_type}' is invalid value for this variable.",
errors.MSG_CONSEQUENCE_DELEGATING_TO_CALLER__LIKELY_EXECUTION_FAILURE,
"Provide either 'minikerberos' or 'kinit' for environment variable VDK_KRB_AUTH.",
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from abc import abstractmethod

from vdk.internal.core import errors
from vdk.internal.core.errors import VdkConfigurationError

log = logging.getLogger(__name__)

Expand All @@ -21,14 +22,14 @@ def __init__(
):
if not os.path.isfile(keytab_pathname):
f = os.path.abspath(keytab_pathname)
errors.log_and_throw(
to_be_fixed_by=errors.ResolvableBy.CONFIG_ERROR,
log=log,
what_happened=f"Cannot locate keytab file {keytab_pathname}.",
why_it_happened=f"Keytab file at {f} does not exist",
consequences="Kerberos authentication is impossible. "
"Subsequent operation that require authentication will fail.",
countermeasures=f"Ensure a keytab file is located at {f}.",
errors.report_and_throw(
VdkConfigurationError(
f"Cannot locate keytab file {keytab_pathname}.",
f"Keytab file at {f} does not exist",
"Kerberos authentication is impossible. "
"Subsequent operation that require authentication will fail.",
f"Ensure a keytab file is located at {f}.",
)
)
self._krb5_conf_filename = krb5_conf_filename
self._keytab_pathname = os.path.abspath(keytab_pathname)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from vdk.internal.core import errors
from vdk.internal.core.config import Configuration
from vdk.internal.core.config import ConfigurationBuilder
from vdk.internal.core.errors import VdkConfigurationError

KRB_AUTH = "KRB_AUTH"
KEYTAB_FOLDER = "KEYTAB_FOLDER"
Expand Down Expand Up @@ -51,15 +52,15 @@ def keytab_pathname(self):
keytab_folder = self.keytab_folder()
keytab_filename = self.keytab_filename()
if not keytab_filename:
errors.log_and_throw(
to_be_fixed_by=errors.ResolvableBy.CONFIG_ERROR,
log=logging.getLogger(__name__),
what_happened="Cannot find keytab file location.",
why_it_happened="Keytab filename cannot be inferred from configuration.",
consequences=errors.MSG_CONSEQUENCE_DELEGATING_TO_CALLER__LIKELY_EXECUTION_FAILURE,
countermeasures="Provide configuration variables KEYTAB_FILENAME KEYTAB_FOLDER. "
"During vdk run they are automatically inferred from data job location "
"but for other commands they need to be explicitly set.",
errors.report_and_throw(
VdkConfigurationError(
"Cannot find keytab file location.",
"Keytab filename cannot be inferred from configuration.",
errors.MSG_CONSEQUENCE_DELEGATING_TO_CALLER__LIKELY_EXECUTION_FAILURE,
"Provide configuration variables KEYTAB_FILENAME KEYTAB_FOLDER. "
"During vdk run they are automatically inferred from data job location "
"but for other commands they need to be explicitly set.",
)
)
if keytab_folder:
return os.path.join(keytab_folder, keytab_filename)
Expand Down
Loading