Source code for adles.utils

import json
import logging
import logging.handlers
import os
import sys
import timeit
from typing import Callable, List, Optional, Tuple

    import tqdm
    TQDM = True

[docs] class TqdmHandler(logging.StreamHandler): def __init__(self, level=logging.NOTSET): super(self.__class__, self).__init__(level)
[docs] def emit(self, record): try: msg = self.format(record) tqdm.tqdm.write(msg) self.flush() except (KeyboardInterrupt, SystemExit): raise except: # noqa self.handleError(record)
except ImportError: TQDM = False # Credit to:
[docs]def time_execution(func: Callable) -> Callable: """Function decorator to time the execution of a function and log to debug. :param func: The function to time execution of :return: The decorated function""" def wrapper(*args, **kwargs): start_time = timeit.default_timer() ret = func(*args, **kwargs) end_time = timeit.default_timer() logging.debug("Elapsed time for %s: %f seconds", func.__name__, float(end_time - start_time)) return ret return wrapper
# From: list_dc_datastore_info in pyvmomi-community-samples #
[docs]def sizeof_fmt(num: float) -> str: """Generates the human-readable version of a file size. >>> sizeof_fmt(512) 512bytes >>> sizeof_fmt(2048) 2KB :param num: Robot-readable file size in bytes :return: Human-readable file size""" for item in ['bytes', 'KB', 'MB', 'GB']: if num < 1024.0: return "%3.1f%s" % (num, item) num /= 1024.0 return "%3.1f%s" % (num, 'TB')
[docs]def pad(value: int, length: int = 2) -> str: """ Adds leading and trailing zeros to value ("pads" the value). >>> pad(5) 05 >>> pad(9, 3) 009 :param value: integer value to pad :param length: Length to pad to :return: string of padded value""" return "{0:0>{width}}".format(value, width=length)
[docs]def read_json(filename: str) -> Optional[dict]: """Reads input from a JSON file and returns the contents. :param filename: Path to JSON file to read :return: Contents of the JSON file""" try: with open(filename) as json_file: return json.load(fp=json_file) except ValueError as message: logging.error("Syntax Error in JSON file '%s': %s", filename, str(message)) except FileNotFoundError: logging.error("Could not find file %s", filename) except Exception as message: logging.error("Could not open JSON file '%s': %s", filename, str(message)) return None
[docs]def split_path(path: str) -> Tuple[List[str], str]: """Splits a filepath. >>> split_path('/path/To/A/f1le') (['path', 'To', 'A'], 'file') :param path: Path to split :return: Path, basename""" folder_path, name = os.path.split(path.lower()) # Separate basename folder_path = folder_path.split('/') # Transform path into list if folder_path[0] == '': del folder_path[0] return folder_path, name
[docs]def handle_keyboard_interrupt(func: Callable) -> Callable: """Function decorator to handle keyboard interrupts in a consistent manner.""" # Based on: def wrapper(*args, **kwargs): try: ret = func(*args, **kwargs) except KeyboardInterrupt: # Output a blank line for readability print() # noqa: T001"Exiting...") sys.exit(0) return ret return wrapper
[docs]def setup_logging(filename: str, colors: bool = True, console_verbose: bool = False, server: Tuple[str, int] = None, show_progress: bool = True): """Configures the logging interface used by everything for output. :param filename: Name of file that logs should be saved to :param colors: Color the terminal output :param console_verbose: Print DEBUG logs to terminal :param server: SysLog server to forward logs to :param show_progress: Show live status as operations progress""" # Prepend spaces to separate logs from previous runs with open(filename, 'a', encoding='utf-8') as logfile: logfile.write(2 * '\n') # Format log output so it's human readable yet verbose base_format = "%(asctime)s %(levelname)-8s %(name)-7s %(message)s" time_format = "%H:%M:%S" # %Y-%m-%d formatter = logging.Formatter(fmt=base_format, datefmt=time_format) # Configures the base logger to append to a file logging.basicConfig(level=logging.DEBUG, format=base_format, datefmt=time_format, filename=filename, filemode='a') # Get the global root logger # Handlers added to this will propagate to all loggers logger = logging.root # Configure logging to a SysLog server # This prevents students from simply deleting the log files if server is not None: syslog = logging.handlers.SysLogHandler(address=server) syslog.setLevel(logging.DEBUG) syslog.setFormatter(formatter) logger.addHandler(syslog) logging.debug("Configured logging to SysLog server %s:%d", server[0], server[1]) # Record system information to aid in auditing and debugging # We do this before configuring console output to reduce verbosity from getpass import getuser from platform import python_version, system, release, node from datetime import date from adles import __version__ as adles_version logging.debug("Initialized logging, saving logs to %s", filename) logging.debug("Date %s", str( logging.debug("OS %s", str(system() + " " + release())) logging.debug("Hostname %s", str(node())) logging.debug("Username %s", str(getuser())) logging.debug("Directory %s", str(os.getcwd())) logging.debug("Python version %s", str(python_version())) logging.debug("Adles version %s", str(adles_version)) # If any of the libraries we're using have warnings, capture and log them logging.captureWarnings(capture=True) # Configure console output if TQDM and show_progress: console = TqdmHandler() else: console = logging.StreamHandler(stream=sys.stdout) if colors and "NO_COLOR" not in os.environ: try: from colorlog import ColoredFormatter formatter = ColoredFormatter(fmt="%(log_color)s" + base_format, datefmt=time_format, reset=True) logging.debug("Configured COLORED console logging output") except ImportError: logging.error("Colorlog is not installed. " "Using STANDARD console output...") console.setFormatter(formatter) console.setLevel((logging.DEBUG if console_verbose else logging.INFO)) logger.addHandler(console) # Warn if using old Python version if python_version() < '3.6': logger.error("Python version %s is unsupported. " "Please use Python 3.6+ instead. " "Proceed at your own risk!")
[docs]def get_vlan() -> int: """Generates a globally unique VLAN tags. :return: VLAN tag""" for i in range(2000, 4096): yield i
@handle_keyboard_interrupt def user_input(prompt: str, obj_name: str, func: Callable) -> Tuple[object, str]: """Continually prompts a user for input until the specified object is found. :param prompt: Prompt to bother user with :param obj_name: Name of the type of the object that we seek :param func: The function that shalt be called to discover the object :return: The discovered object (vimtype) and it's human name""" while True: item_name = input(prompt) item = func(item_name) if item:"Found %s: %s", obj_name, return item, item_name else: print("Couldn't find a %s with name %s. Perhaps try another? " # noqa: T001 % (obj_name, item_name)) @handle_keyboard_interrupt def default_prompt(prompt: str, default: str = None) -> Optional[str]: """Prompt the user for input. If they press enter, return the default. :param prompt: Prompt to display to user (do not include default value) :param default: Default return value :return: Value entered or default""" def_prompt = " [default: %s]: " % ('' if default is None else default) value = input(prompt + def_prompt) return default if value == '' else value