Source code for adles.parser

import ipaddress
import logging
import os
import sys
from io import TextIOWrapper
from typing import Optional, Tuple

from yaml import YAMLError, load

try:  # Attempt to use C-based YAML parser if it's available
    from yaml import CLoader as Loader
except ImportError:  # Fallback to using pure Python YAML parser
    from yaml import Loader  # noqa: T484

from adles import utils


# PyYAML Reference: http://pyyaml.org/wiki/PyYAMLDocumentation
[docs]def parse_yaml_file(file: TextIOWrapper) -> Optional[dict]: """Parses a YAML file and returns a nested dictionary containing its contents. :param file: Handle of file to parse :return: Parsed file contents""" try: # Parses the YAML file into a dict return load(file, Loader=Loader) except YAMLError as exc: logging.critical("Could not parse YAML file %s", file.name) if hasattr(exc, 'problem_mark'): # Tell user exactly where the syntax error is mark = exc.problem_mark logging.error("Error position: (%s:%s)", mark.line + 1, mark.column + 1) else: logging.error("Error: %s", exc) return None
# PyYAML Reference: http://pyyaml.org/wiki/PyYAMLDocumentation
[docs]def parse_yaml(filename: str) -> Optional[dict]: """Parses a YAML file and returns a nested dictionary containing its contents. :param filename: Name of YAML file to parse :return: Parsed file contents""" try: # Enables use of stdin if '-' is specified with sys.stdin if filename == '-' else open(filename) as f: try: # Parses the YAML file into a dict return load(f, Loader=Loader) except YAMLError as exc: logging.critical("Could not parse YAML file %s", filename) if hasattr(exc, 'problem_mark'): # Tell user exactly where the syntax error is mark = exc.problem_mark logging.error("Error position: (%s:%s)", mark.line + 1, mark.column + 1) else: logging.error("Error: %s", exc) return None except FileNotFoundError: logging.critical("Could not find YAML file for parsing: %s", filename) return None
def _checker(value_list: list, source: str, data: dict, flag: str) -> int: """Checks if values in the list are in data (Syntax warnings or errors). :param value_list: List of values to check :param source: Name of source that's being checked :param data: Data being checked :param flag: What to do if value not found ("warnings" | "errors") :return: Number of hits (warnings/errors)""" num_hits = 0 for value in value_list: if value not in data: if flag == "warnings": logging.warning("Missing %s in %s", value, source) elif flag == "errors": logging.error("Missing %s in %s", value, source) else: logging.error("Invalid flag for _checker: %s", flag) num_hits += 1 if num_hits > 0: logging.info("Total number of %s in %s: %d", flag, source, num_hits) return num_hits def _verify_exercise_metadata_syntax(metadata: dict) -> Tuple[int, int]: """Verifies that the syntax for exercise metadata matches the specification. :param metadata: metadata :return: Number of errors, Number of warnings""" warnings = ["description", "version", "folder-name"] errors = ["name", "prefix", "infra-file"] num_warnings = _checker(warnings, "metadata", metadata, "warnings") num_errors = _checker(errors, "metadata", metadata, "errors") if "infra-file" in metadata: infra_file = metadata["infra-file"] if not os.path.exists(infra_file): logging.error("Could not open infra-file '%s'", infra_file) num_errors += 1 else: err, warn = verify_infra_syntax(parse_yaml(infra_file)) num_errors += err num_warnings += warn return num_errors, num_warnings def _verify_groups_syntax(groups: dict) -> Tuple[int, int]: """Verifies that the syntax for groups matches the specification. :param groups: groups :return: Number of errors, Number of warnings""" num_warnings = 0 num_errors = 0 for key, value in groups.items(): if "instances" in value: # Template groups if not isinstance(value["instances"], int): logging.error("Instances must be an Integer for group %s", key) num_errors += 1 if "ad-group" in value: if not isinstance(value["ad-group"], str): logging.error("AD group must be a string") num_errors += 1 elif "filename" in value: e, w = _check_group_file(value["filename"]) num_errors += e num_warnings += w else: logging.error("Invalid user specification method for " "template group %s", key) num_errors += 1 else: # Regular groups (not templates) if "ad-group" in value: if not isinstance(value["ad-group"], str): logging.error("AD group must be a string") num_errors += 1 elif "filename" in value: e, w = _check_group_file(value["filename"]) num_errors += e num_warnings += w elif "user-list" in value: if not isinstance(value["user-list"], list): logging.error("Username specification must be a " "list for group %s", key) num_errors += 1 else: logging.error("Invalid user specification method " "for group %s", key) num_errors += 1 return num_errors, num_warnings def _check_group_file(filename: str) -> Tuple[int, int]: """Verifies user info file for a group. :param filename: Name of user info JSON file :return: Number of errors, Number of warnings""" num_warnings = 0 num_errors = 0 if utils.read_json(filename) is None: logging.error("Invalid user info file %s", filename) num_errors += 1 return num_errors, num_warnings def _verify_services_syntax(services: dict) -> Tuple[int, int]: """Verifies that the syntax for services matches the specification. :param dict services: services :return: Number of errors, Number of warnings""" num_warnings = 0 num_errors = 0 for key, value in services.items(): if "network-interfaces" in value and \ not isinstance(value["network-interfaces"], list): logging.error("Network interfaces must be a list for " "service %s", key) num_errors += 1 if "provisioner" in value: num_errors += _checker(["name", "file"], "provisioner for service %s" % key, value["provisioner"], "errors") if "note" in value and not isinstance(value["note"], str): logging.error("Note must be a string for service %s", key) num_errors += 1 if "template" in value: pass elif "image" in value or "dockerfile" in value: pass elif "compose-file" in value: pass else: logging.error("Invalid service definition: %s", key) num_errors += 1 return num_errors, num_warnings def _verify_resources_syntax(resources: dict) -> Tuple[int, int]: """Verifies that the syntax for resources matches the specification. :param dict resources: resources :return: Number of errors, Number of warnings""" warnings = [] errors = ["lab", "resource"] num_warnings = _checker(warnings, "resources", resources, "warnings") num_errors = _checker(errors, "resources", resources, "errors") return num_errors, num_warnings def _verify_networks_syntax(networks: dict) -> Tuple[int, int]: """Verifies that the syntax for networks matches the specification. :param networks: networks :return: Number of errors, Number of warnings""" num_warnings = 0 num_errors = 0 net_types = ["unique-networks", "generic-networks"] if not any(net in networks for net in net_types): logging.error("Network specification exists but is empty!") num_errors += 1 else: for name, network in networks.items(): err, warn = _verify_network(name, network) num_errors += err num_warnings += warn return num_errors, num_warnings def _verify_network(name: str, network: dict) -> Tuple[int, int]: """Verifies syntax of a specific network. :param name: Name of network :param network: the network :return: Number of errors, Number of warnings""" num_warnings = 0 num_errors = 0 for key, value in network.items(): # Subnet verification if "subnet" not in value: logging.warning("No subnet specified for %s %s", name, key) num_warnings += 1 else: try: subnet = ipaddress.ip_network(value["subnet"]) except ValueError as err: logging.error("Invalid format for subnet '%s': %s", str(value["subnet"]), str(err)) num_errors += 1 else: if subnet.is_reserved \ or subnet.is_link_local \ or subnet.is_multicast \ or subnet.is_loopback: logging.error("%s %s is in a invalid IP address space", name, key) num_errors += 1 elif not subnet.is_private: logging.warning("Non-private subnet used for %s %s", name, key) num_warnings += 1 # VLAN verification if "vlan" in value: if name == "unique-networks" and int(value["vlan"]) >= 2000: logging.error("VLAN must be less than 2000 for network %s", key) num_errors += 1 elif name == "generic-networks": logging.error("VLAN specification is not allowed " "for network %s", key) num_errors += 1 # Increment verification if "increment" in value: if name == "unique-networks": logging.error("Increment cannot be used for network %s", key) num_errors += 1 elif not isinstance(value["increment"], bool): logging.error("Increment must be a boolean for network %s", key) num_errors += 1 return num_errors, num_warnings def _verify_folders_syntax(folders: dict) -> Tuple[int, int]: """Verifies that the syntax for folders matches the specification. :param folders: folders :return: Number of errors, Number of warnings""" num_warnings = 0 num_errors = 0 keywords = ["group", "master-group", "instances", "description", "enabled"] for key, value in folders.items(): if key in keywords: continue if not isinstance(value, dict): logging.error("Invalid configuration %s", str(key)) num_errors += 1 continue # Check instances syntax, regardless of parent or base if "instances" in value: if not isinstance(value["instances"], int): pass elif "number" in value["instances"]: if not isinstance(value["instances"]["number"], int): logging.error("Number of instances for folder '%s' " "must be an Integer", key) num_errors += 1 elif "size-of" in value["instances"]: pass else: logging.error("Must specify number of instances " "for folder '%s'", key) num_errors += 1 # Check if parent or base if "services" in value: # It's a base folder if "group" not in value: logging.error("No group specified for folder '%s'", key) num_errors += 1 for skey, svalue in value["services"].items(): if "service" not in svalue: logging.error("Service %s is unnamed in folder '%s'", skey, key) num_errors += 1 if "networks" in svalue and \ not isinstance(svalue["networks"], list): logging.error("Network specifications must be a list " "for service '%s' " "in folder '%s'", skey, key) num_errors += 1 if "scoring" in svalue: err, warn = _verify_scoring_syntax(skey, svalue["scoring"]) num_errors += err num_warnings += warn else: # It's a parent folder if not isinstance(value, dict): logging.error("Could not verify syntax of folder '%s': " "'%s' is not a folder!", str(key), str(value)) num_errors += 1 else: err, warn = _verify_folders_syntax(value) num_errors += err num_warnings += warn return num_errors, num_warnings def _verify_scoring_syntax(service_name: str, scoring: dict) -> Tuple[int, int]: """Verifies syntax for the scoring definition of a service. :param service_name: Name of the service for which the scoring specification applies :param scoring: scoring parameters :return: Number of errors, Number of warnings""" warnings = ["ports", "protocols"] errors = ["criteria"] num_warnings = _checker(warnings, "service %s" % service_name, scoring, "warnings") num_errors = _checker(errors, "service %s" % service_name, scoring, "errors") return num_errors, num_warnings
[docs]def verify_infra_syntax(infra: dict) -> Tuple[int, int]: """Verifies the syntax of an infrastructure specification. :param infra: infrastructure :return: Number of errors, Number of warnings""" num_warnings = 0 num_errors = 0 warnings = [] errors = [] for platform, config in infra.items(): if platform == "vmware-vsphere": # VMware vSphere configurations warnings = ["port", "login-file", "datacenter", "datastore", "server-root", "vswitch"] errors = ["hostname", "template-folder"] if "login-file" in config and \ utils.read_json(config["login-file"]) is None: logging.error("Invalid vSphere infrastructure login-file: %s", config["login-file"]) num_errors += 1 if "host-list" in config and \ not isinstance(config["host-list"], list): logging.error("Invalid type for vSphere host-list: %s", type(config["host-list"])) num_errors += 1 if "thresholds" in config: num_errors += _checker(["folder", "service"], "infrastructure", config["thresholds"], "errors") elif platform == "docker": # Docker configurations warnings = ["url"] errors = [] if "registry" in config: num_errors += _checker(["url", "login-file"], "infrastructure", config["registry"], "errors") elif platform in ["cloud"]: logging.info("Platform %s is not yet implemented", platform) else: logging.error("Unknown infrastructure platform: %s", str(platform)) num_warnings += 1 continue # Skip the syntax verification of unknown platforms num_warnings += _checker(warnings, "infrastructure", config, "warnings") num_errors += _checker(errors, "infrastructure", config, "errors") return num_errors, num_warnings
[docs]def verify_exercise_syntax(spec: dict) -> Tuple[int, int]: """Verifies the syntax of an environment specification. :param spec: Dictionary of environment specification :return: Number of errors, Number of warnings""" num_warnings = 0 num_errors = 0 funcs = {"metadata": _verify_exercise_metadata_syntax, "groups": _verify_groups_syntax, "services": _verify_services_syntax, "resources": _verify_resources_syntax, "networks": _verify_networks_syntax, "folders": _verify_folders_syntax} required = ["metadata", "groups", "services", "networks", "folders"] optional = ["resources"] for key, func in funcs.items(): if key not in spec: if key in required: logging.error("Required definition %s was not found", key) num_errors += 1 elif key in optional: logging.info('Optional definition "%s" was not found', key) else: logging.warning("Unknown definition found: %s", key) num_warnings += 1 else: err, warn = func(spec[key]) num_errors += err num_warnings += warn return num_errors, num_warnings
[docs]def verify_package_syntax(package: dict) -> Tuple[int, int]: """Verifies the syntax of an package specification. :param package: Dictionary representation of the package specification :return: Number of errors, Number of warnings""" num_warnings = 0 num_errors = 0 # Check syntax of metadata section if "metadata" not in package: logging.error("Metadata section not specified for package!") num_errors += 1 else: metadata_warnings = ["name", "description", "version"] metadata_errors = ["timestamp", "tag"] num_warnings += _checker(metadata_warnings, "metadata", package["metadata"], "warnings") num_errors += _checker(metadata_errors, "metadata", package["metadata"], "errors") # Check syntax of contents section if "contents" not in package: logging.error("Contents section not specified for package!") num_errors += 1 else: content_warnings = ["infrastructure", "scoring", "results", "templates", "materials"] content_errors = ["environment"] num_warnings += _checker(content_warnings, "contents", package["contents"], "warnings") num_errors += _checker(content_errors, "contents", package["contents"], "errors") return num_errors, num_warnings
[docs]def check_syntax(specfile_path: str, spec_type: str = "exercise") -> Optional[dict]: """Checks the syntax of a specification file. :param specfile_path: Path to the YAML specification file :param spec_type: Type of specification file (exercise | package | infra) :return: The specification """ spec = parse_yaml(specfile_path) if spec is None: logging.critical("Failed to ingest specification file %s", os.path.basename(specfile_path)) return None logging.info("Successfully ingested specification file '%s'", os.path.basename(specfile_path)) if spec_type == "exercise": logging.info("Checking exercise syntax...") errors, warnings = verify_exercise_syntax(spec) elif spec_type == "package": logging.info("Checking package syntax...") errors, warnings = verify_package_syntax(spec) elif spec_type == "infra": logging.info("Checking infrastructure syntax...") errors, warnings = verify_infra_syntax(spec) else: logging.error("Unknown specification type in for check_syntax: %s", str(spec_type)) return None if errors == 0 and warnings == 0: logging.info("Syntax check successful!") return spec elif errors == 0: logging.warning("Syntax check successful, but there were %d warnings", warnings) return spec else: logging.error("Syntax check failed! Errors: %d\tWarnings: %d", errors, warnings) return None