Source code for adles.interfaces.interface

import logging
from abc import ABC, abstractmethod


[docs]class Interface(ABC): """Base class for all Interfaces.""" # Names/prefixes master_prefix = "(MASTER) " master_root_name = "MASTER-FOLDERS" def __init__(self, infra, spec): """ :param dict infra: Full infrastructure configuration :param dict spec: Full exercise specification """ self._log = logging.getLogger(self.__class__.__name__) self.infra = infra # Save the infrastructure configuration self.spec = spec # Save the exercise specification self.metadata = spec["metadata"] # Save the exercise spec metadata self.services = spec["services"] self.networks = spec["networks"] # Networks for platforms self.folders = spec["folders"] self.thresholds = {} # Thresholds for platforms self.groups = {} # Groups for platforms
[docs] @abstractmethod def create_masters(self): """Master creation phase.""" pass
[docs] @abstractmethod def deploy_environment(self): """Environment deployment phase.""" pass
[docs] @abstractmethod def cleanup_masters(self, network_cleanup=False): """ Cleans up master instances. :param bool network_cleanup: If networks should be cleaned up """ pass
[docs] @abstractmethod def cleanup_environment(self, network_cleanup=False): """ Cleans up a deployed environment. :param bool network_cleanup: If networks should be cleaned up """ pass
def _instances_handler(self, spec, obj_name, obj_type): """ Determines number of instances and optional prefix using specification. :param dict spec: Dict of folder :param str obj_name: Name of the thing being handled :param str obj_type: Type of the thing being handled (folder | service) :return: Number of instances, Prefix :rtype: tuple(int, str) """ num = 1 prefix = "" if "instances" in spec: if isinstance(spec["instances"], int): num = int(spec["instances"]) else: if "prefix" in spec["instances"]: prefix = str(spec["instances"]["prefix"]) if "number" in spec["instances"]: num = int(spec["instances"]["number"]) elif "size-of" in spec["instances"]: # size_of = spec["instances"]["size-of"]) # num = int(self._get_group(size_of).size # if num < 1: num = 1 # WORKAROUND FOR AD-GROUPS else: self._log.error("Unknown instances specification: %s", str(spec["instances"])) num = 0 # Check if the number of instances exceeds # the configured thresholds for the interface thr = self.thresholds[obj_type] if num > thr["error"]: self._log.error("%d instances of %s '%s' is beyond the " "configured %s threshold of %d", num, obj_type, obj_name, self.__name__, thr["error"]) raise Exception("Threshold exceeded") elif num > thr["warn"]: self._log.warning("%d instances of %s '%s' is beyond the " "configured %s threshold of %d", num, obj_type, obj_name, self.__name__, thr["warn"]) return num, prefix def _path(self, path, name): """ Generates next step of the path for deployment of Masters. :param str path: Current path :param str name: Name to add to the path :return: The updated path :rtype: str """ return str(path + '/' + self.master_prefix + name) @staticmethod def _is_enabled(spec): """ Determines if a spec is enabled. :param dict spec: Specification to check :return: If the spec is enabled :rtype: bool """ if "enabled" in spec: return bool(spec["enabled"]) else: return True def _determine_net_type(self, network_label): """ Determines the type of a network. :param str network_label: Name of network to determine type of :return: Type of the network ("generic-networks" | "unique-networks") :rtype: str """ for net_name, net_value in self.networks.items(): vals = {k for k in net_value} if network_label in vals: return net_name self._log.error("Could not find type for network '%s'", network_label) return "" def _get_group(self, group_name): """ Provides a uniform way to get information about normal groups and template groups. :param str group_name: Name of the group :return: Group object :rtype: :class:`Group` """ from adles.group import Group if group_name in self.groups: group = self.groups[group_name] if isinstance(group, Group): # Normal groups return group elif isinstance(group, list): # Template groups return group[0] else: self._log.error("Unknown type for group '%s': %s", str(group_name), str(type(group))) else: self._log.error("Could not get group '%s' from groups", group_name) def __repr__(self): return "%s(%s, %s)" % (str(self.__class__), str(self.spec), str(self.infra)) def __str__(self): return str([x for x in self.infra.keys()]) def __hash__(self): return hash(str(self)) def __eq__(self, other): return isinstance(other, self.__class__) and \ self.infra == other.infra and \ self.spec == other.spec