-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(app): long term metric storage (#173)
Co-authored-by: Ralf Grubenmann <[email protected]>
- Loading branch information
Showing
17 changed files
with
1,167 additions
and
466 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
from dataclasses import dataclass, field | ||
import dataconf | ||
import json | ||
from typing import Optional, Union, List | ||
|
||
|
||
@dataclass | ||
class S3Config: | ||
"""The configuration needed to upload metrics to S3.""" | ||
endpoint: str | ||
bucket: str | ||
path_prefix: str | ||
access_key_id: str | ||
secret_access_key: str | ||
rotation_period_seconds: Union[str, int] = 86400 | ||
|
||
def __post_init__(self): | ||
if type(self.rotation_period_seconds) is str: | ||
self.rotation_period_seconds = int(self.rotation_period_seconds) | ||
|
||
|
||
@dataclass | ||
class MetricsBaseConfig: | ||
"""Base metrics/auditlog configuration.""" | ||
enabled: Union[str, bool] = False | ||
extra_labels: Union[str, List[str]] = field(default_factory=list) | ||
|
||
def __post_init__(self): | ||
if type(self.enabled) is str: | ||
self.enabled = self.enabled.lower() == "true" | ||
if type(self.extra_labels) is str: | ||
self.extra_labels = json.loads(self.extra_labels) | ||
|
||
|
||
@dataclass | ||
class AuditlogConfig(MetricsBaseConfig): | ||
"""The configuration used for the auditlogs.""" | ||
s3: Optional[S3Config] = None | ||
|
||
def __post_init__(self): | ||
super().__post_init__() | ||
if self.enabled and not self.s3: | ||
raise ValueError("If auditlog is enabled then the S3 configuration has to be provided.") | ||
|
||
@classmethod | ||
def dataconf_from_env(cls, prefix="AUDITLOG_"): | ||
return dataconf.env(prefix, cls) | ||
|
||
|
||
@dataclass | ||
class PrometheusMetricsConfig(MetricsBaseConfig): | ||
"""The configuration for prometheus metrics""" | ||
port: Union[str, int] = 8765 | ||
|
||
def __post_init__(self): | ||
super().__post_init__() | ||
if type(self.port) is str: | ||
self.port = int(self.port) | ||
|
||
@classmethod | ||
def dataconf_from_env(cls, prefix="METRICS_"): | ||
return dataconf.env(prefix, cls) |
This file was deleted.
Oops, something went wrong.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
from abc import ABC, abstractmethod | ||
from dataclasses import dataclass | ||
from datetime import datetime | ||
from dateutil import parser | ||
from typing import Any, Dict, Optional | ||
|
||
from controller.server_status_enum import ServerStatusEnum | ||
|
||
|
||
@dataclass | ||
class MetricEvent: | ||
"""Every element in the metrics queue that is created by | ||
amalthea and consumed by the metrics handlers conforms to this | ||
structure.""" | ||
event_timestamp: datetime | ||
session: Dict[str, Any] | ||
sessionCreationTimestamp: Optional[datetime] = None | ||
old_status: Optional[ServerStatusEnum] = None | ||
status: Optional[ServerStatusEnum] = None | ||
|
||
def __post_init__(self): | ||
if self.status and type(self.status) is str: | ||
self.status = ServerStatusEnum(self.status) | ||
if self.old_status and type(self.old_status) is str: | ||
self.old_status = ServerStatusEnum(self.old_status) | ||
if self.session.get("metadata", {}).get("creationTimestamp"): | ||
self.sessionCreationTimestamp = parser.isoparse( | ||
self.session.get("metadata", {}).get("creationTimestamp") | ||
) | ||
|
||
def __repr__(self) -> str: | ||
return ( | ||
f"MetricEvent(event_timestamp={self.event_timestamp}, old_status={self.old_status}, " | ||
f"status={self.status}, sessionCreationTimestamp={self.sessionCreationTimestamp}, " | ||
"session=<redacted>)" | ||
) | ||
|
||
|
||
class MetricEventHandler(ABC): | ||
"""Abstract class for the queue workers that will | ||
be doing the final publishing or persisting of any metrics | ||
that are generated by amalthea in the metrics queue.""" | ||
@abstractmethod | ||
def publish(self, metric_event: MetricEvent): | ||
pass |
Oops, something went wrong.