Source code for friendlyshell.base_shell

"""Common shell interaction logic shared between different shells"""
from __future__ import print_function
import os
import sys
import inspect
import subprocess
import pyparsing as pp
from six.moves import input
from friendlyshell.command_parsers import default_line_parser

# Path where configuration data is stored for friendly shells
CONFIG_FOLDER = os.path.expanduser(os.path.join("~", ".friendlyshell"))


# pylint: disable=no-member
[docs]class BaseShell(object): """Common base class for all Friendly Shells Defines basic IO and interactive shell logic common to all Friendly Shells """ def __init__(self, *args, **kwargs): super(BaseShell, self).__init__(*args, **kwargs) # characters preceding the cursor when prompting for command entry self.prompt = '> ' # text to be displayed upon launch of the shell, before displaying # the interactive prompt self.banner_text = None # Flag indicating whether this shell should be closed after the current # command finishes processing self._done = False # Command parser API for parsing tokens from command lines self._parser = default_line_parser() # input redirection to use instead of the default stdin self._input_stream = None # parent Friendly Shell this shell runs under # only used for nested sub-shells self._parent = None # default comment delimiter self.comment_delimiter = "#" # error code / return value produced by this shell self._return_code = 0 @property def return_code(self): """error / return code generated by operations run by this shell :rtype: :class:`int` """ return self._return_code @property def _config_folder(self): """Gets the folder where config and log files should be stored :rtype: :class:`str` """ # Create our config folder with restricted access to everyone but the # owner. This is just in case we write secrets to a log / history file # by accident then only the current user can see it. if not os.path.exists(CONFIG_FOLDER): os.makedirs(CONFIG_FOLDER, 0o700) return CONFIG_FOLDER def _process_error(self, err_code): """Helper method used to track error codes When running in batch mode this helper method with terminate the running shell session under the assumption that any subsequent operations that may be performed by the shell will likely fail as a result of the first failure anyway. :param int err_code: return code to set in the shell instance""" self._return_code = err_code # See if we're running in batch mode and terminate the shell if we are if self._input_stream: self.do_exit() def _get_input(self): """Gets input to be processed from the appropriate source :returns: the input line retrieved from the source :rtype: :class:`str` """ try: if self._input_stream: line = self._input_stream.readline() if not line: self._input_stream = None else: line = line.strip() self.info(self.prompt + line) else: line = input(self.prompt) if line: line = line.strip() return line except KeyboardInterrupt: # When the user enters CTRL+C to terminate the shell, we just # terminate the currently running shell. That way if there is # a parent shell in play control can be returned to it so the # user can attempt to recover from whatever operation they # tried to abort self._done = True return None except Exception as err: # pylint: disable=broad-except self.error( 'Unexpected error during input sequence: %s', err ) # Reserve the detailed debug info / stack trace to the debug # output only. This avoids spitting out lots of technical # garbage to the user self.debug(err, exc_info=True) self._done = True self._process_error(1) return None def _execute_command(self, func, parser): """Calls a command function with a set of parsed parameters :param func: the command function to execute :param parser: The parsed command parameters to pass to the command """ try: if not parser.params: func() return params_to_pass = parser.params num_params_total = self._count_params(func) if len(params_to_pass) > num_params_total: # If we have more tokens than parameters on the command function # we concatenate the extraneous tokens with the last parameter # assuming the command function is going to parse the tokens # itself or otherwise perform it's logic on the unparsed # input self.debug("too many tokens - concatenating extras") num_params_to_compress = \ len(params_to_pass) - num_params_total + 1 self.debug("params to compress %s", num_params_to_compress) compressed = " ".join(params_to_pass[-num_params_to_compress:]) self.debug("compressed params: %s", compressed) params_to_pass = params_to_pass[:-num_params_to_compress] params_to_pass.append(compressed) func(*params_to_pass) except Exception as err: # pylint: disable=broad-except # Log summary info about the error to standard error output self.error('Unknown error detected: %s', err) # Reserve the detailed debug info / stack trace to the debug # output only. This avoids spitting out lots of technical # garbage to the user self.debug(str(err), exc_info=True) self._process_error(1) except KeyboardInterrupt: self.debug("User interrupted operation...") # Typically, when a user cancels an operation there will be at # least some partial output generated by the command so we # write out a blank to ensure the interactive prompt appears on # the line below self.info("")
[docs] def do_native_shell(self, cmd): """Executes a shell command within the Friendly Shell environment""" self.debug("Running shell command %s", cmd) try: output = subprocess.check_output( cmd, shell=True, stderr=subprocess.STDOUT) self.info(output.decode("utf-8")) except subprocess.CalledProcessError as err: self.info("Failed to run command %s: %s", err.cmd, err.returncode) self.info(err.output) self._process_error(1) except KeyboardInterrupt: self.debug("User interrupted operation...") # Typically, when a user cancels an operation there will be at # least some partial output generated by the command so we # write out a blank to ensure the interactive prompt appears on # the line below self.info("")
[docs] @staticmethod def alias_native_shell(): """Gets the shorthand character for the 'native_shell' command :rtype: :class:`str` """ return "!"
[docs] def run_subshell(self, subshell): """Launches a child process for another shell under this one :param subshell: the new Friendly Shell to be launched""" subshell.run(input_stream=self._input_stream, parent=self) # save the return code from our child shell here in the parent so it # may be propagated back to the shell if needed self._return_code = subshell.return_code
[docs] def run(self, *_args, **kwargs): """Main entry point function that launches our command line interpreter This method will wait for input to be given via the command line, and process each command provided until a request to terminate the shell is given. :param input_stream: optional Python input stream object where commands should be loaded from. Typically this will be a file-like object containing commands to be run, but any input stream object should work. If not provided, input will be read from stdin using :meth:`input` :param parent: Optional parent shell which owns this shell. If none provided this shell is assumed to be a parent or first level shell session with no ancestry """ self._input_stream = \ kwargs.pop("input_stream") if "input_stream" in kwargs else None self._parent = kwargs.pop("parent") if "parent" in kwargs else None if self.banner_text: self.info(self.banner_text) while not self._done: line = self._get_input() if not line: continue if line.startswith(self.comment_delimiter): self.debug("Skipping comment line %s", line) continue # Before we process our command input, see if we need to # substitute any environment variables that may be used line = os.path.expandvars(line) parser = self._parse_line(line) if parser is None: self._process_error(1) continue func = self._find_command(parser.command) if not func: self.error("Command not found: %s", parser.command) self._process_error(1) continue if not self._check_params(func, parser): self._process_error(1) continue self._execute_command(func, parser)
def _check_params(self, func, parser): """Are there sufficient tokens to populate command parameters :param func: command function to be called :param parser: parsed tokens rom the shell :returns: true if there are sufficient parameters to call the command, false if not :rtype: :class:`bool` """ num_tokens = len(parser.params) if parser.params else 0 num_required_params = self._count_required_params(func) total_num_params = self._count_params(func) if total_num_params == 0 and num_tokens != 0: msg = "Command %s accepts no parameters but %s provided." self.error( msg, func.__name__.replace("do_", ""), num_tokens ) return False if num_tokens < num_required_params: msg = 'Command %s requires %s parameters but %s provided.' self.error( msg, func.__name__.replace("do_", ""), num_required_params, num_tokens) return False return True def _count_required_params(self, cmd_method): """Gets the number of required parameters from a command method :param cmd_method: :class:`inspect.Signature` for method to analyse :returns: Number of required parameters (ie: parameters without default values) for the given method :rtype: :class:`int` """ if sys.version_info < (3, 3): params = inspect.getargspec(cmd_method) # pylint: disable=deprecated-method self.debug( 'Command %s params are: %s', cmd_method.__name__, params) tmp = params.args if 'self' in tmp: tmp.remove('self') return len(tmp) - (len(params.defaults) if params.defaults else 0) func_sig = inspect.signature(cmd_method) # pylint: disable=no-member retval = 0 for cur_param in func_sig.parameters.values(): if cur_param.default is inspect.Parameter.empty: # pylint: disable=no-member retval += 1 return retval def _count_params(self, cmd_method): """Gets the total number of parameters from a command method :param cmd_method: :class:`inspect.Signature` for method to analyse :returns: Number of parameters supported by the given method :rtype: :class:`int` """ if sys.version_info < (3, 3): params = inspect.getargspec(cmd_method) # pylint: disable=deprecated-method self.debug( 'Command %s params are: %s', cmd_method.__name__, params) tmp = params.args if 'self' in tmp: tmp.remove('self') return len(tmp) func_sig = inspect.signature(cmd_method) # pylint: disable=no-member return len(func_sig.parameters) def _parse_line(self, line): """Parses a single line of command text and returns the parsed output :param str line: line of command text to be parsed :returns: Parser object describing all of the parsed command tokens :rtype: :class:`pyparsing.ParseResults`""" self.debug('Parsing command input "%s"...', line) try: retval = self._parser.parseString(line, parseAll=True) except pp.ParseException as err: self.error('Parsing error:') self.error('\t%s', err.pstr) self.error('\t%s^', ' ' * (err.col-1)) self.debug('Details: %s', err) return None self.debug('Parsed command line is "%s"', retval) return retval def _find_command(self, command_name): """Attempts to locate the command handler for a given command :param str command_name: The name of the command to find the handler for :returns: Reference to the method to be called to execute the command Returns None if no command method found :rtype: :class:`meth` """ self.debug("looking for command method...") # Gather all class methods, including static methods all_methods = inspect.getmembers(self, inspect.ismethod) all_methods.extend(inspect.getmembers(self, inspect.isfunction)) # See if we can find a 'do_' method for our command... for cur_method in all_methods: self.debug("Processing %s", cur_method) if cur_method[0] == 'do_' + command_name: self.debug("command method found: %s", cur_method[0]) return cur_method[1] # if no command method can be found for the specified token, # try looking up an alias for the command as well: self.debug("Looking for alias...") for cur_method in all_methods: if not cur_method[0].startswith("alias_"): continue self.debug("Found alias method %s", cur_method[0]) if cur_method[1]() == command_name: orig_cmd_name = cur_method[0][len("alias_"):] self.debug("Recursing to find alias command %s", orig_cmd_name) return self._find_command(orig_cmd_name) self.debug("No command found with name " + command_name) return None
[docs] def do_exit(self): """Terminates the command interpreter""" self.debug('Terminating interpreter...') self._done = True # See if our shell has any parents, and force them to quit too if self._parent: self._parent.do_exit()
[docs] def do_close(self): """Terminates the currently running shell""" self.debug( 'Closing shell %s (%s)', self.__class__.__name__, self.prompt) # Return control back to the parent Friendly Shell or the console, # whichever comes next in the shell's ancestry self._done = True
[docs] @staticmethod def help_close(): """Extended help for close method""" return """If the current shell is a sub-shell spawned by another """\ """Friendly Shell instance, control will return to the """\ """parent shell which will continue running"""
[docs] @staticmethod def info(message, *args, **_kwargs): """Displays an info message to the default output stream Default implementation just directs output to stdout. Use a logging mixin class to customize this behavior. See :class:`friendlyshell.basic_logger_mixin.BasicLoggerMixin` for examples. :param str message: text to be displayed""" print(message % args)
[docs] @staticmethod def warning(message, *args, **_kwargs): """Displays a non-critical warning message to the default output stream Default implementation just directs output to stdout. Use a logging mixin class to customize this behavior. See :class:`friendlyshell.basic_logger_mixin.BasicLoggerMixin` for examples. :param str message: text to be displayed""" print(message % args)
[docs] @staticmethod def error(message, *args, **_kwargs): """Displays a critical error message to the default output stream Default implementation just directs output to stdout. Use a logging mixin class to customize this behavior. See :class:`friendlyshell.basic_logger_mixin.BasicLoggerMixin` for examples. :param str message: text to be displayed""" print(message % args)
[docs] @staticmethod def debug(message, *args, **_kwargs): """Displays an internal-use-only debug message to verbose log file Default implementation hides all debug output. Use a logging mixin class to customize this behavior. See :class:`friendlyshell.basic_logger_mixin.BasicLoggerMixin` for examples. :param str message: text to be displayed"""
if __name__ == "__main__": pass