Skip to content

Commit

Permalink
script/eve-parity: add script for checking eve/keyword parity
Browse files Browse the repository at this point in the history
Currently this script has two commands: "missing" and "having".

"missing" will show eve fields that do not map to any keywords.

"having" will show eve fields along with their keyword mappings,
while also validating that those keywords really exist.

Related to tickets: OISF#6463, OISF#4772
  • Loading branch information
jasonish committed Feb 21, 2025
1 parent 0389e9b commit 68e1839
Showing 1 changed file with 164 additions and 0 deletions.
164 changes: 164 additions & 0 deletions scripts/eve-parity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
#! /usr/bin/env python3
#
# Tool for checking parity between the EVE schema and Suricata
# keywords.
#
# Usage: ./scripts/eve-parity.py [mapped-fields|unmapped-keywords|unmapped-fields]
#
# ## unmapped-keywords
#
# Display all known keywords that are not mapped to an EVE field.
#
# ## unmapped-fields
#
# Display all eve fields that do not have a keyword mapping.
#
# ## mapped-fields
#
# Display all EVE fields that have a keyword mapping.


import sys
import subprocess
import json
import argparse


def main():
    """Parse the command line and run the requested parity report.

    The known keywords are loaded first, then the schema fields, and both
    are handed to the handler selected by the ``command`` argument.
    """
    # Command name -> report function; every handler takes (keywords, keys).
    handlers = {
        "mapped-fields": mapped_fields,
        "unmapped-keywords": unmapped_keywords,
        "unmapped-fields": unmapped_fields,
    }

    parser = argparse.ArgumentParser(description="EVE Parity Check Tool")
    parser.add_argument("command", choices=list(handlers))
    args = parser.parse_args()

    keywords = load_known_keywords()
    keys = load_schema()

    handlers[args.command](keywords, keys)


def unmapped_keywords(keywords, keys):
    """Report known keywords that are not mapped to an EVE field.

    ``keywords`` is the set of known keyword names and ``keys`` maps each
    EVE field name to its metadata dict. Every known keyword that no field
    lists under a non-empty "keywords" entry is printed, one per line, in
    sorted order.
    """
    referenced = set()
    for meta in keys.values():
        # Missing, empty, or False "keywords" entries contribute nothing.
        referenced.update(meta.get("keywords") or ())

    for name in sorted(keywords - referenced):
        print(name)


def unmapped_fields(keywords, keys):
    """Print, in sorted order, every EVE field that has no "keywords" entry.

    A field is only reported when the "keywords" key is absent from its
    metadata; a present key counts as mapped whatever its value. The
    ``keywords`` argument is not used here; it is accepted so that this
    function can be called the same way as the other report functions.
    """
    missing = (field for field, meta in keys.items() if "keywords" not in meta)
    for field in sorted(missing):
        print(field)


def mapped_fields(keywords, keys):
    """Print each EVE field that has a non-empty keyword mapping.

    Output is one ``field -> [kw1, kw2]`` line per mapped field, in the
    iteration order of ``keys``. Any mapped keyword that is not in the
    ``keywords`` collection is reported through ``errprint`` before the
    field's line is printed.
    """
    for field, meta in keys.items():
        mapped = meta.get("keywords")
        if not mapped:
            # No mapping, an empty mapping, or keywords explicitly disabled.
            continue

        for name in mapped:
            if name not in keywords:
                errprint("ERROR: Unknown keyword: {}".format(name))

        print("{} -> [{}]".format(field, ", ".join(mapped)))


def load_schema(schema_path="etc/schema.json"):
    """Flatten the EVE JSON schema into a map of field name to metadata.

    The schema is walked depth-first starting at its top-level
    "properties". Nested objects, and arrays whose items declare
    "properties", are descended into; everything else becomes an entry
    whose name is the dot-joined path of property names.

    Args:
        schema_path: Path of the JSON schema file to read. Defaults to
            "etc/schema.json", relative to the current directory.

    Returns:
        A dict mapping each field name to its metadata dict. Scalar
        fields ("string", "integer", "boolean", "number") carry the
        schema's "suricata" object, or {} when there is none. Objects
        without "properties" and arrays that are not descended into map
        to {}.

    Raises:
        Exception: If a "$ref" cannot be resolved or a property has an
            unsupported "type".
    """
    # Close the file deterministically instead of leaking the handle.
    with open(schema_path, encoding="utf-8") as schema_file:
        schema = json.load(schema_file)

    # LIFO stack of (schema node, path of property names leading to it).
    # Pushing and popping at the end gives the same traversal order as
    # working at the front of the list, without shifting every element.
    stack = [(schema, [])]
    keys = {}

    while stack:
        current, path = stack.pop()

        for name, props in current["properties"].items():
            if "$ref" in props:
                target = props["$ref"]
                try:
                    ref = find_ref(schema, target)
                except (LookupError, TypeError) as err:
                    # A path segment that does not exist in the schema
                    # surfaces as a lookup error; report it as a missing
                    # reference instead of a bare KeyError.
                    raise Exception("$ref not found: {}".format(target)) from err
                if not ref:
                    raise Exception("$ref not found: {}".format(target))
                props = ref

            key = ".".join(path + [name])
            kind = props["type"]

            if kind in ["string", "integer", "boolean", "number"]:
                # End of the line...
                keys[key] = props.get("suricata", {})
            elif kind == "object":
                # An object can set "suricata.keywords" to false to
                # disable descending into it. For example, "stats".
                if props.get("suricata", {}).get("keywords") is False:
                    continue

                if "properties" in props:
                    stack.append((props, path + [name]))
                else:
                    # May want to warn that this object has no properties.
                    keys[key] = {}
            elif kind == "array":
                items = props.get("items", {})
                if "type" in items and "properties" in items:
                    # Array elements are not given their own path
                    # segment: item fields appear directly under the
                    # array's name.
                    stack.append((items, path + [name]))
                else:
                    # May want to warn that this array has no items, or
                    # that its items have no properties.
                    keys[key] = {}
            else:
                raise Exception("Unsupported type: {}".format(kind))

    return keys


def load_known_keywords(command=("./src/suricata", "--list-keywords=csv")):
    """Return the set of keyword names listed by Suricata.

    Args:
        command: Argument vector of the program to run. Its standard
            output must be semicolon-separated lines, with a header on
            the first line and the keyword name in the first column.
            Defaults to the in-tree build, ``./src/suricata
            --list-keywords=csv``.

    Returns:
        A set of keyword name strings.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero.
        OSError: If the command cannot be started.
    """
    output = subprocess.check_output(list(command)).decode()

    keywords = set()
    # Skip first line, as it's a header line.
    for line in output.split("\n")[1:]:
        name = line.split(";", 1)[0].strip()
        # str.split() never returns an empty list, so blank lines (such
        # as the one after the trailing newline) must be filtered here or
        # "" ends up in the set as a bogus keyword.
        if name:
            keywords.add(name)
    return keywords


def errprint(*args, **kwargs):
    """Print to standard error; takes the same arguments as ``print``."""
    stream = sys.stderr
    print(*args, file=stream, **kwargs)


def find_ref(schema: dict, ref: str) -> dict:
    """Resolve a local "#/a/b/c" style reference inside ``schema``.

    The reference is split on "/" and each segment after the leading "#"
    is used as a key into the current node, starting from ``schema``.

    Raises:
        Exception: If the reference does not start with "#".
        KeyError: If a segment is not present in the node it indexes.
    """
    root, *segments = ref.split("/")
    if root != "#":
        raise Exception("Unsupported reference: {}".format(ref))

    node = schema
    for segment in segments:
        node = node[segment]
    return node


# Run the tool only when executed as a script; whatever main() returns is
# used as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())

0 comments on commit 68e1839

Please sign in to comment.