-
-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
Copy pathauditlogentry.py
84 lines (70 loc) · 2.8 KB
/
auditlogentry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import re
from django.db.models import prefetch_related_objects
from sentry_sdk import capture_exception
from sentry import audit_log
from sentry.api.serializers import Serializer, register, serialize
from sentry.models.auditlogentry import AuditLogEntry
def fix(data):
    """Reduce an audit log entry's ``data`` payload to the frontend-safe fields.

    Args:
        data: The mapping stored on an audit log entry. ``None`` or an empty
            mapping is tolerated and yields an empty dict.

    Returns:
        A new dict holding only the allow-listed keys that are present in
        ``data``.

    Side effects:
        When ``data["teams"]`` is a sequence whose first element has an ``id``
        attribute, it is replaced in place by the list of those ids.
    """
    if not data:
        return {}

    # There was a point in time where full Team objects
    # got serialized into our AuditLogEntry.data, so these
    # values need to be stripped and reduced down to integers
    teams = data.get("teams")
    if teams:
        try:
            first_team = teams[0]
        except (TypeError, KeyError, IndexError):
            # "teams" is not indexable by position (e.g. a number or a
            # mapping), so there is nothing to normalize.
            first_team = None
        if hasattr(first_team, "id"):
            data["teams"] = [t.id for t in teams]

    # Only return fields in the `data` field that are actually used by the frontend
    # Do not add fields to this list that contain sensitive information
    required_fields = {"id", "slug", "old_slug", "new_slug", "name"}
    return {k: v for k, v in data.items() if k in required_fields}
def override_actor_id(user):
    """Match the user's display name against the SCIM integration name format.

    The expected format is the ``scim-internal-integration-`` prefix followed
    by dash-separated hexadecimal groups of 6, 8, 4, 4, 4 and 7 characters.
    Matching is anchored at the start of the display name only.

    Args:
        user: Any object exposing ``get_display_name()``.

    Returns:
        The ``re.Match`` object when the display name has that format,
        otherwise ``None``.
    """
    # overrides the usage of actor_id only to make SCIM token
    # name more readable (for now)
    scim_prefix = "scim-internal-integration-"
    hex_groups = (
        r"[0-9a-fA-F]{6}\-[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{7}"
    )
    pattern = re.compile(scim_prefix + hex_groups)
    display_name = user.get_display_name()
    return pattern.match(display_name)
@register(AuditLogEntry)
class AuditLogEntrySerializer(Serializer):
    """Serializer registered for ``AuditLogEntry`` objects."""

    def get_attrs(self, item_list, user, **kwargs):
        """Resolve the actor and target-user attributes for each entry.

        Prefetches the ``actor`` and ``target_user`` relations, serializes the
        distinct users referenced by the entries, and maps every entry to a
        dict with ``"actor"`` and ``"targetUser"`` keys.
        """
        # TODO(dcramer); assert on relations
        for relation in ("actor", "target_user"):
            prefetch_related_objects(item_list, relation)

        actors = {entry.actor for entry in item_list if entry.actor_id}
        targets = {entry.target_user for entry in item_list if entry.target_user_id}

        # Index the serialized users by their serialized "id" value.
        users_by_id = {}
        for serialized_user in serialize(actors | targets, user):
            users_by_id[serialized_user["id"]] = serialized_user

        attrs = {}
        for entry in item_list:
            if entry.actor_id and not override_actor_id(entry.actor):
                actor = users_by_id[str(entry.actor_id)]
            else:
                # No actor id, or the actor's display name matched the SCIM
                # pattern: expose only the entry's actor name.
                actor = {"name": entry.get_actor_name()}

            # Fall back to the raw id when no serialized user is available.
            target_user = users_by_id.get(str(entry.target_user_id)) or entry.target_user_id

            attrs[entry] = {"actor": actor, "targetUser": target_user}
        return attrs

    def serialize(self, obj, attrs, user, **kwargs):
        """Build the API payload for a single audit log entry.

        A ``KeyError`` raised while rendering the note is reported through
        ``capture_exception`` and the note falls back to an empty string.
        """
        event = audit_log.get(obj.event)
        try:
            note = event.render(obj)
        except KeyError as err:
            capture_exception(err)
            note = ""

        payload = {
            "id": str(obj.id),
            "actor": attrs["actor"],
            "event": event.api_name,
            "ipAddress": obj.ip_address,
            "note": note,
            "targetObject": obj.target_object,
            "targetUser": attrs["targetUser"],
            "data": fix(obj.data),
            "dateCreated": obj.datetime,
        }
        return payload