# # (c) 2017 Red Hat Inc. # # This file is part of Ansible # # Ansible is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Ansible is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Ansible. If not, see . # from __future__ import (absolute_import, division, print_function) __metaclass__ = type from abc import ABCMeta, abstractmethod from functools import wraps from ansible.errors import AnsibleError, AnsibleConnectionFailure from ansible.module_utils._text import to_bytes, to_text from ansible.module_utils.six import with_metaclass try: from scp import SCPClient HAS_SCP = True except ImportError: HAS_SCP = False try: from __main__ import display except ImportError: from ansible.utils.display import Display display = Display() def enable_mode(func): @wraps(func) def wrapped(self, *args, **kwargs): prompt = self._connection.get_prompt() if not to_text(prompt, errors='surrogate_or_strict').strip().endswith('#'): raise AnsibleError('operation requires privilege escalation') return func(self, *args, **kwargs) return wrapped class CliconfBase(with_metaclass(ABCMeta, object)): """ A base class for implementing cli connections .. note:: String inputs to :meth:`send_command` will be cast to byte strings within this method and as such are not required to be made byte strings beforehand. Please avoid using literal byte strings (``b'string'``) in :class:`CliConfBase` plugins as this can lead to unexpected errors when running on Python 3 List of supported rpc's: :get_config: Retrieves the specified configuration from the device :edit_config: Loads the specified commands into the remote device :get: Execute specified command on remote device :get_capabilities: Retrieves device information and supported rpc methods :commit: Load configuration from candidate to running :discard_changes: Discard changes to candidate datastore Note: List of supported rpc's for remote device can be extracted from output of get_capabilities() :returns: Returns output received from remote device as byte string Usage: from ansible.module_utils.connection import Connection conn = Connection() conn.get('show lldp neighbors detail'') conn.get_config('running') conn.edit_config(['hostname test', 'netconf ssh']) """ __rpc__ = ['get_config', 'edit_config', 'get_capabilities', 'get', 'enable_response_logging', 'disable_response_logging'] def __init__(self, connection): self._connection = connection self.history = list() self.response_logging = False def _alarm_handler(self, signum, frame): """Alarm handler raised in case of command timeout """ display.display('closing shell due to command timeout (%s seconds).' % self._connection._play_context.timeout, log_only=True) self.close() def send_command(self, command, prompt=None, answer=None, sendonly=False, newline=True, prompt_retry_check=False): """Executes a command over the device connection This method will execute a command over the device connection and return the results to the caller. This method will also perform logging of any commands based on the `nolog` argument. :param command: The command to send over the connection to the device :param prompt: A regex pattern to evalue the expected prompt from the command :param answer: The answer to respond with if the prompt is matched. :param sendonly: Bool value that will send the command but not wait for a result. :param newline: Bool value that will append the newline character to the command :param prompt_retry_check: Bool value for trying to detect more prompts :returns: The output from the device after executing the command """ kwargs = { 'command': to_bytes(command), 'sendonly': sendonly, 'newline': newline, 'prompt_retry_check': prompt_retry_check } if prompt is not None: kwargs['prompt'] = to_bytes(prompt) if answer is not None: kwargs['answer'] = to_bytes(answer) resp = self._connection.send(**kwargs) if not self.response_logging: self.history.append(('*****', '*****')) else: self.history.append((kwargs['command'], resp)) return resp def get_base_rpc(self): """Returns list of base rpc method supported by remote device""" return self.__rpc__ def get_history(self): """ Returns the history file for all commands This will return a log of all the commands that have been sent to the device and all of the output received. By default, all commands and output will be redacted unless explicitly configured otherwise. :return: An ordered list of command, output pairs """ return self.history def reset_history(self): """ Resets the history of run commands :return: None """ self.history = list() def enable_response_logging(self): """Enable logging command response""" self.response_logging = True def disable_response_logging(self): """Disable logging command response""" self.response_logging = False @abstractmethod def get_config(self, source='running', filter=None, format='text'): """Retrieves the specified configuration from the device This method will retrieve the configuration specified by source and return it to the caller as a string. Subsequent calls to this method will retrieve a new configuration from the device :param source: The configuration source to return from the device. This argument accepts either `running` or `startup` as valid values. :param filter: For devices that support configuration filtering, this keyword argument is used to filter the returned configuration. The use of this keyword argument is device dependent adn will be silently ignored on devices that do not support it. :param format: For devices that support fetching different configuration format, this keyword argument is used to specify the format in which configuration is to be retrieved. :return: The device configuration as specified by the source argument. """ pass @abstractmethod def edit_config(self, candidate, check_mode=False, replace=None): """Loads the candidate configuration into the network device This method will load the specified candidate config into the device and merge with the current configuration unless replace is set to True. If the device does not support config replace an errors is returned. :param candidate: The configuration to load into the device and merge with the current running configuration :param check_mode: Boolean value that indicates if the device candidate configuration should be pushed in the running configuration or discarded. :param replace: Specifies the way in which provided config value should replace the configuration running on the remote device. If the device doesn't support config replace, an error is return. :return: Returns response of executing the configuration command received from remote host """ pass @abstractmethod def get(self, command, prompt=None, answer=None, sendonly=False, newline=True): """Execute specified command on remote device This method will retrieve the specified data and return it to the caller as a string. :param command: command in string format to be executed on remote device :param prompt: the expected prompt generated by executing command, this can be a string or a list of strings :param answer: the string to respond to the prompt with :param sendonly: bool to disable waiting for response, default is false :param newline: bool to indicate if newline should be added at end of answer or not :return: """ pass @abstractmethod def get_capabilities(self): """Returns the basic capabilities of the network device This method will provide some basic facts about the device and what capabilities it has to modify the configuration. The minimum return from this method takes the following format. eg: { 'rpc': [list of supported rpcs], 'network_api': , # the name of the transport 'device_info': { 'network_os': , 'network_os_version': , 'network_os_model': , 'network_os_hostname': , 'network_os_image': , 'network_os_platform': , }, 'device_operations': { 'supports_replace': , # identify if config should be merged or replaced is supported 'supports_commit': , # identify if commit is supported by device or not 'supports_rollback': , # identify if rollback is supported or not 'supports_defaults': , # identify if fetching running config with default is supported 'supports_commit_comment': , # identify if adding comment to commit is supported of not 'supports_onbox_diff: , # identify if on box diff capability is supported or not 'supports_generate_diff: , # identify if diff capability is supported within plugin 'supports_multiline_delimiter: , # identify if multiline demiliter is supported within config 'support_match: , # identify if match is supported 'support_diff_ignore_lines: , # identify if ignore line in diff is supported } 'format': [list of supported configuration format], 'match': ['line', 'strict', 'exact', 'none'], 'replace': ['line', 'block', 'config'], } :return: capability as json string """ pass def commit(self, comment=None): """Commit configuration changes This method will perform the commit operation on a previously loaded candidate configuration that was loaded using `edit_config()`. If there is a candidate configuration, it will be committed to the active configuration. If there is not a candidate configuration, this method should just silently return. :return: None """ return self._connection.method_not_found("commit is not supported by network_os %s" % self._play_context.network_os) def discard_changes(self): """Discard candidate configuration This method will discard the current candidate configuration if one is present. If there is no candidate configuration currently loaded, then this method should just silently return :returns: None """ return self._connection.method_not_found("discard_changes is not supported by network_os %s" % self._play_context.network_os) def copy_file(self, source=None, destination=None, proto='scp', timeout=30): """Copies file over scp/sftp to remote device :param source: Source file path :param destination: Destination file path on remote device :param proto: Protocol to be used for file transfer, supported protocol: scp and sftp :param timeout: Specifies the wait time to receive response from remote host before triggering timeout exception :return: None """ ssh = self._connection.paramiko_conn._connect_uncached() if proto == 'scp': if not HAS_SCP: raise AnsibleError("Required library scp is not installed. Please install it using `pip install scp`") with SCPClient(ssh.get_transport(), socket_timeout=timeout) as scp: out = scp.put(source, destination) elif proto == 'sftp': with ssh.open_sftp() as sftp: sftp.put(source, destination) def get_file(self, source=None, destination=None, proto='scp', timeout=30): """Fetch file over scp/sftp from remote device :param source: Source file path :param destination: Destination file path :param proto: Protocol to be used for file transfer, supported protocol: scp and sftp :param timeout: Specifies the wait time to receive response from remote host before triggering timeout exception :return: None """ """Fetch file over scp/sftp from remote device""" ssh = self._connection.paramiko_conn._connect_uncached() if proto == 'scp': if not HAS_SCP: raise AnsibleError("Required library scp is not installed. Please install it using `pip install scp`") with SCPClient(ssh.get_transport(), socket_timeout=timeout) as scp: scp.get(source, destination) elif proto == 'sftp': with ssh.open_sftp() as sftp: sftp.get(source, destination)