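Background:
Now that the first version of remote release execution, which calls scripts remotely through the paramiko module, has stabilized, work has begun on a second version that runs remote release tasks with Ansible Runner.
Pros and cons of Paramiko and Ansible Runner: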
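- Paramiko:
  - Paramiko is a Python SSH library used to run commands on remote hosts.
  - Pros:
    - Connects to the remote host directly over SSH, with no extra intermediate layer.
    - Allows fine-grained control over remote command execution.
  - Cons:
    - Requires more code to handle the details of connecting and running commands.
    - Complex remote task management and control must be implemented yourself.
- Ansible Runner:
  - Ansible Runner is a component of the Ansible ecosystem for running Ansible playbooks and tasks programmatically.
  - Pros:
    - Built on the Ansible model, it provides high-level task execution, host management, and variable handling.
    - Tasks and the execution environment can be defined easily through configuration files and parameters.
    - Supports concurrent execution and is extensible and configurable.
  - Cons:
    - Ansible Runner has to be installed and configured.
    - Requires some familiarity with writing and organizing Ansible playbooks.
    - For simple remote command execution, it may add some overhead compared with Paramiko.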
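In short, Ansible Runner is better suited to more complex tasks, while Paramiko is a good fit for simple remote command execution.
As for using paramiko, below is a simple wrapper class around paramiko.SSHClient():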
# -*- coding:UTF-8 -*-
import paramiko


class SSHClient(object):
    """Connect to a remote server."""

    def __init__(self, host, user, port, password=None, pkey=None, timeout=30):
        self.ssh = paramiko.SSHClient()  # create the SSH client object
        # allow connecting to hosts that are not listed in known_hosts
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        if password:
            self.ssh.connect(hostname=host, username=user, port=port, password=password, timeout=timeout)
        elif pkey:
            private_key = paramiko.RSAKey.from_private_key_file(pkey)
            self.ssh.connect(hostname=host, username=user, port=port, pkey=private_key, timeout=timeout)
        else:
            raise ValueError("either password or pkey is required")
        self.err_flag = False
        self.chan = None
        self.tran = self.ssh.get_transport()
        # self.sftp = self.tran.open_sftp()  # open an SFTP channel
        self.sftp = paramiko.SFTPClient.from_transport(self.tran)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # self.sftp.close()
        if self.chan:
            self.chan.close()
        self.tran.close()
        self.ssh.close()

    def exe_command(self, cmd, timeout=None, callback=None, logname=None):
        """Run a command and return its stdout as a string."""
        self.err_flag = False
        self.chan = self.tran.open_session()
        if timeout:
            # settimeout is a method; assigning to it would silently disable the timeout
            self.chan.settimeout(timeout)
        self.chan.exec_command(cmd)
        stdout = ""
        while True:
            try:
                recv = self.chan.recv(128).decode("utf8", "ignore")
            except Exception as e:
                # e.g. socket.timeout; stop reading so that recv is never used unset
                print(e)
                self.err_flag = True
                break
            # an empty read means the remote side has closed the stream
            if len(recv) == 0:
                break
            print(recv, end="")
            stdout += recv
            # write to the log file
            if callback:
                callback(recv, logname)
        return stdout

    def get_recv_status(self):
        """Return the exit status of the last command, or -1 if reading failed."""
        # call this after exe_command has drained the output, because
        # recv_exit_status() blocks until the remote command exits
        return self.chan.recv_exit_status() if not self.err_flag else -1

    def get_sftp(self):
        return self.sftp
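exe_command calls its callback as callback(recv, logname) for every chunk it reads. As a minimal sketch of what such a callback could look like, the hypothetical append_to_log below appends each chunk to a log file; the function name and the log file name are assumptions for illustration, and the demonstration feeds chunks by hand so that it needs no SSH connection.

```python
import os
import tempfile


def append_to_log(chunk, logname):
    """Append one chunk of command output to the file named by logname."""
    if not logname:
        return
    with open(logname, "a", encoding="utf-8") as f:
        f.write(chunk)


# Local demonstration without SSH: feed two chunks by hand.
demo_log = os.path.join(tempfile.mkdtemp(), "deploy.log")
for chunk in ("step 1 ok\n", "step 2 ok\n"):
    append_to_log(chunk, demo_log)

with open(demo_log, encoding="utf-8") as f:
    demo_text = f.read()
print(demo_text, end="")
```

With a reachable host, the same function would be passed to the wrapper as exe_command(cmd, callback=append_to_log, logname="deploy.log") inside a with SSHClient(...) block.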
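Now for the main topic, Ansible Runner.
Installation
To make the source easier to browse, I chose to download the ansible runner module on Windows. However, this module cannot be installed directly with pip on Windows, so I downloaded the whl file for Ansible Runner 2.2.1. Link: ansible-runner · PyPI
Install ansible runner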
pip install ansible_runner-2.2.1-py3-none-any.whl
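Environment
Set up non-interactive SSH login
Ansible manages devices over SSH, and SSH supports two authentication methods: password authentication and key-pair authentication. The former requires interacting with the system, while the latter allows non-interactive login. To manage devices automatically with Ansible, the managed devices should be configured for non-interactive login.
1. On the control machine, use ssh-keygen to generate the private and public key files
2. Use ssh-copy-id to copy the public key to the remote machine
3. Result: ssh connects to the remote host directly, without any interaction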
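The three steps above can be sketched as the commands they correspond to. The helper names, the ed25519 key type, the empty passphrase, and the key path are assumptions for illustration; the commands are only built and printed here, not executed.

```python
import shlex


def keygen_command(key_path):
    """Step 1: create a key pair (ed25519 and an empty passphrase are assumptions)."""
    return ["ssh-keygen", "-t", "ed25519", "-f", key_path, "-N", ""]


def copy_id_command(key_path, user, host, port=22):
    """Step 2: install the public key on the remote machine."""
    return ["ssh-copy-id", "-i", key_path + ".pub", "-p", str(port), f"{user}@{host}"]


def login_check_command(user, host, port=22):
    """Step 3: BatchMode=yes makes ssh fail instead of prompting for a password."""
    return ["ssh", "-o", "BatchMode=yes", "-p", str(port), f"{user}@{host}", "true"]


key = "/root/.ssh/id_ed25519"  # hypothetical key path
for cmd in (
    keygen_command(key),
    copy_id_command(key, "root", "10.0.0.91", 59878),
    login_check_command("root", "10.0.0.91", 59878),
):
    print(shlex.join(cmd))
```

Each list can be handed to subprocess.run on the control machine; ssh-copy-id will still ask for the remote password once, since key login is not yet in place at that point.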
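Reading the source
The run function in detail
The project mainly uses ansible_runner.interface.run()
Open the third-party package file
The source code after opening it: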
def init_runner(**kwargs):
    '''
    Initialize the Runner() instance
    This function will properly initialize both run() and run_async()
    functions in the same way and return a value instance of Runner.
    See parameters given to :py:func:`ansible_runner.interface.run`
    '''
    # If running via the transmit-worker-process method, we must only extract things as read-only
    # inside of one of these commands. That could be either transmit or worker.
    if kwargs.get('streamer') not in ('worker', 'process'):
        dump_artifacts(kwargs)
    if kwargs.get('streamer'):
        # undo any full paths that were dumped by dump_artifacts above in the streamer case
        private_data_dir = kwargs['private_data_dir']
        project_dir = os.path.join(private_data_dir, 'project')
        playbook_path = kwargs.get('playbook') or ''
        if os.path.isabs(playbook_path) and playbook_path.startswith(project_dir):
            kwargs['playbook'] = os.path.relpath(playbook_path, project_dir)
        inventory_path = kwargs.get('inventory') or ''
        if os.path.isabs(inventory_path) and inventory_path.startswith(private_data_dir):
            kwargs['inventory'] = os.path.relpath(inventory_path, private_data_dir)
        roles_path = kwargs.get('envvars', {}).get('ANSIBLE_ROLES_PATH') or ''
        if os.path.isabs(roles_path) and roles_path.startswith(private_data_dir):
            kwargs['envvars']['ANSIBLE_ROLES_PATH'] = os.path.relpath(roles_path, private_data_dir)
    debug = kwargs.pop('debug', None)
    logfile = kwargs.pop('logfile', None)
    if not kwargs.pop("ignore_logging", True):
        output.configure()
        if debug in (True, False):
            output.set_debug('enable' if debug is True else 'disable')
        if logfile:
            output.set_logfile(logfile)
    event_callback_handler = kwargs.pop('event_handler', None)
    status_callback_handler = kwargs.pop('status_handler', None)
    artifacts_handler = kwargs.pop('artifacts_handler', None)
    cancel_callback = kwargs.pop('cancel_callback', None)
    if cancel_callback is None:
        # attempt to load signal handler.
        # will return None if we are not in the main thread
        cancel_callback = signal_handler()
    finished_callback = kwargs.pop('finished_callback', None)
    streamer = kwargs.pop('streamer', None)
    if streamer:
        if streamer == 'transmit':
            stream_transmitter = Transmitter(**kwargs)
            return stream_transmitter
        if streamer == 'worker':
            stream_worker = Worker(**kwargs)
            return stream_worker
        if streamer == 'process':
            stream_processor = Processor(event_handler=event_callback_handler,
                                         status_handler=status_callback_handler,
                                         artifacts_handler=artifacts_handler,
                                         cancel_callback=cancel_callback,
                                         finished_callback=finished_callback,
                                         **kwargs)
            return stream_processor
    if kwargs.get("process_isolation", False):
        pi_executable = kwargs.get("process_isolation_executable", "podman")
        if not check_isolation_executable_installed(pi_executable):
            print(f'Unable to find process isolation executable: {pi_executable}')
            sys.exit(1)
    kwargs.pop('_input', None)
    kwargs.pop('_output', None)
    rc = RunnerConfig(**kwargs)
    rc.prepare()
    return Runner(rc,
                  event_handler=event_callback_handler,
                  status_handler=status_callback_handler,
                  artifacts_handler=artifacts_handler,
                  cancel_callback=cancel_callback,
                  finished_callback=finished_callback)
def run(**kwargs):
    '''
    Run an Ansible Runner task in the foreground and return a Runner object when complete.
    :param str private_data_dir: The directory containing all runner metadata needed to invoke the runner
                                 module. Output artifacts will also be stored here for later consumption.
    :param str ident: The run identifier for this invocation of Runner. Will be used to create and name
                      the artifact directory holding the results of the invocation.
    :param bool json_mode: Store event data in place of stdout on the console and in the stdout file
    :param str or list playbook: The playbook (either a list or dictionary of plays, or as a path relative to
                                 ``private_data_dir/project``) that will be invoked by runner when executing Ansible.
    :param str module: The module that will be invoked in ad-hoc mode by runner when executing Ansible.
    :param str module_args: The module arguments that will be supplied to ad-hoc mode.
    :param str host_pattern: The host pattern to match when running in ad-hoc mode.
    :param str or dict or list inventory: Overrides the inventory directory/file (supplied at ``private_data_dir/inventory``) with
        a specific host or list of hosts. This can take the form of:
            - Path to the inventory file in the ``private_data_dir``
            - Native python dict supporting the YAML/json inventory structure
            - A text INI formatted string
            - A list of inventory sources, or an empty list to disable passing inventory
    :param str role: Name of the role to execute.
    :param dict or list roles_path: Directory or list of directories to assign to ANSIBLE_ROLES_PATH
    :param dict envvars: Environment variables to be used when running Ansible. Environment variables will also be
                         read from ``env/envvars`` in ``private_data_dir``
    :param dict extravars: Extra variables to be passed to Ansible at runtime using ``-e``. Extra vars will also be
                           read from ``env/extravars`` in ``private_data_dir``.
    :param dict passwords: A dictionary containing password prompt patterns and response values used when processing output from
                           Ansible. Passwords will also be read from ``env/passwords`` in ``private_data_dir``.
    :param dict settings: A dictionary containing settings values for the ``ansible-runner`` runtime environment. These will also
                          be read from ``env/settings`` in ``private_data_dir``.
    :param str ssh_key: The ssh private key passed to ``ssh-agent`` as part of the ansible-playbook run.
    :param str cmdline: Command line options passed to Ansible read from ``env/cmdline`` in ``private_data_dir``
    :param bool suppress_env_files: Disable the writing of files into the ``env`` which may store sensitive information
    :param str limit: Matches ansible's ``--limit`` parameter to further constrain the inventory to be used
    :param int forks: Control Ansible parallel concurrency
    :param int verbosity: Control how verbose the output of ansible-playbook is
    :param bool quiet: Disable all output
    :param str artifact_dir: The path to the directory where artifacts should live, this defaults to 'artifacts' under the private data dir
    :param str project_dir: The path to the playbook content, this defaults to 'project' within the private data dir
    :param int rotate_artifacts: Keep at most n artifact directories, disable with a value of 0 which is the default
    :param int timeout: The timeout value in seconds that will be passed to either ``pexpect`` of ``subprocess`` invocation
                        (based on ``runner_mode`` selected) while executing command. It the timeout is triggered it will force cancel the
                        execution.
    :param str streamer: Optionally invoke ansible-runner as one of the steps in the streaming pipeline
    :param io.FileIO _input: An optional file or file-like object for use as input in a streaming pipeline
    :param io.FileIO _output: An optional file or file-like object for use as output in a streaming pipeline
    :param Callable event_handler: An optional callback that will be invoked any time an event is received by Runner itself, return True to keep the event
    :param Callable cancel_callback: An optional callback that can inform runner to cancel (returning True) or not (returning False)
    :param Callable finished_callback: An optional callback that will be invoked at shutdown after process cleanup.
    :param Callable status_handler: An optional callback that will be invoked any time the status changes (e.g...started, running, failed, successful, timeout)
    :param Callable artifacts_handler: An optional callback that will be invoked at the end of the run to deal with the artifacts from the run.
    :param bool process_isolation: Enable process isolation, using either a container engine (e.g. podman) or a sandbox (e.g. bwrap).
    :param str process_isolation_executable: Process isolation executable or container engine used to isolate execution. (default: podman)
    :param str process_isolation_path: Path that an isolated playbook run will use for staging. (default: /tmp)
    :param str or list process_isolation_hide_paths: A path or list of paths on the system that should be hidden from the playbook run.
    :param str or list process_isolation_show_paths: A path or list of paths on the system that should be exposed to the playbook run.
    :param str or list process_isolation_ro_paths: A path or list of paths on the system that should be exposed to the playbook run as read-only.
    :param str container_image: Container image to use when running an ansible task (default: quay.io/ansible/ansible-runner:devel)
    :param list container_volume_mounts: List of bind mounts in the form 'host_dir:/container_dir. (default: None)
    :param list container_options: List of container options to pass to execution engine.
    :param str directory_isolation_base_path: An optional path will be used as the base path to create a temp directory, the project contents will be
                                              copied to this location which will then be used as the working directory during playbook execution.
    :param str fact_cache: A string that will be used as the name for the subdirectory of the fact cache in artifacts directory.
                           This is only used for 'jsonfile' type fact caches.
    :param str fact_cache_type: A string of the type of fact cache to use. Defaults to 'jsonfile'.
    :param bool omit_event_data: Omits extra ansible event data from event payload (stdout and event still included)
    :param bool only_failed_event_data: Omits extra ansible event data unless it's a failed event (stdout and event still included)
    :param bool check_job_event_data: Check if job events data is completely generated. If event data is not completely generated and if
                                      value is set to 'True' it will raise 'AnsibleRunnerException' exception,
                                      if set to 'False' it log a debug message and continue execution. Default value is 'False'
    :returns: A :py:class:`ansible_runner.runner.Runner` object, or a simple object containing ``rc`` if run remotely
    '''
    r = init_runner(**kwargs)
    r.run()
    return r
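The code above is a function named run. It runs an Ansible Runner task in the foreground and returns a Runner object.
**kwargs accepts arbitrary keyword arguments, which are passed on to init_runner. init_runner initializes a Runner instance: in the non-streaming case it configures the Runner from the arguments it receives and returns the instance. run then calls the Runner object's run() method and finally returns the Runner object.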
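There are a lot of parameters above, so I will only cover some of the more important ones:
playbook: path of the playbook to run
module: the module to invoke in ad-hoc mode. The most commonly used are shell (run shell commands) and copy (file management: copy local files or directories to the target machine)
module_args: the module arguments passed in ad-hoc mode; put simply, the command to run. For example:
ansible web -m shell -a "ifconfig eth0|grep addr" Here module_args corresponds to the command after -a, which fetches the network interface information
host_pattern: the host pattern to match in ad-hoc mode; put simply, it defines which hosts Ansible runs against. It can refer to a single host, an IP address, an inventory group, a set of groups, or all hosts in the inventory. For example:
ansible all -m shell -a "uptime" Here all is the host_pattern, meaning every host defined in /etc/ansible/hosts runs the uptime command
ansible 10.0.0.91 -m shell -a "uptime" Here 10.0.0.91 is the host_pattern
host_pattern can also be a hostname, a regular expression, a wildcard, and so on; they are not all listed here
inventory: overrides the default inventory directory/file path. According to the docstring above, this value can be a string, a dict, or a list
That covers the more important parameters for now; add other parameters as needed in a real project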
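To make the mapping between the ad-hoc command line and the run() parameters concrete, here is a minimal sketch. The helpers adhoc_kwargs and adhoc_cli are hypothetical names introduced for illustration and are not part of ansible_runner; only the keyword names host_pattern, module, module_args, and inventory come from the docstring above. The inventory path in the list form is also a made-up example.

```python
import shlex


def adhoc_kwargs(host_pattern, module, module_args, inventory=None):
    """Build the keyword arguments for an ad-hoc ansible_runner.run() call."""
    kwargs = {
        "host_pattern": host_pattern,
        "module": module,
        "module_args": module_args,
    }
    if inventory is not None:
        kwargs["inventory"] = inventory
    return kwargs


def adhoc_cli(host_pattern, module, module_args):
    """Render the equivalent `ansible` command line for comparison."""
    return shlex.join(["ansible", host_pattern, "-m", module, "-a", module_args])


# Three of the inventory forms listed in the run() docstring.
inventory_as_dict = {"web": {"hosts": {"10.0.0.91": {}}}}
inventory_as_ini = "[web]\n10.0.0.91\n"
inventory_as_list = ["inventory/hosts"]  # hypothetical path

print(adhoc_cli("all", "shell", "uptime"))
print(adhoc_kwargs("web", "shell", "uptime", inventory=inventory_as_dict))
```

The resulting dict would be passed as ansible_runner.run(**kwargs), so the same three values drive both the command line and the Python call.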
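Next, r = init_runner(**kwargs) and r.run() are executed, so let's trace the code into this run() function
This screenshot only captures part of it
Roughly, this run function chooses how to run based on self.runner_mode ('subprocess' or 'pexpect') and then executes the task through the corresponding mechanism. Before the task runs, it performs a series of preparation steps, including creating the working directory, setting environment variables, creating the output files, and handling password patterns. After the task runs, it closes the relevant file handles depending on whether output files are suppressed in the configuration, and finally calls the status callback to signal that the task has finished
Before run is called, the Runner class is instantiated and its constructor __init__ is executed, which contains this code
Because the actual call in the project does not specify runner_mode in the configuration, it defaults to pexpect, and the task is run with the pexpect module
A brief introduction to pexpect: it is a Python module for automating and controlling interactive processes. By monitoring and controlling a child process's output, it interacts with the child process automatically
In the ansible runner source, the run function in runner.py uses the pexpect module as follows (partial excerpt):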
try:
    child = pexpect.spawn(
        command[0],
        command[1:],
        cwd=cwd,
        env=env,
        ignore_sighup=True,
        encoding='utf-8',
        codec_errors='replace',
        echo=False,
        use_poll=self.config.pexpect_use_poll,
    )
    child.logfile_read = stdout_handle
except pexpect.exceptions.ExceptionPexpect as e:
    child = collections.namedtuple(
        'MissingProcess', 'exitstatus isalive close'
    )(
        exitstatus=127,
        isalive=lambda: False,
        close=lambda: None,
    )
    def _decode(x):
        return x.decode('utf-8') if six.PY2 else x
    # create the events directory (the callback plugin won't run, so it
    # won't get created)
    events_directory = os.path.join(self.config.artifact_dir, 'job_events')
    if not os.path.exists(events_directory):
        os.mkdir(events_directory, 0o700)
    stdout_handle.write(_decode(str(e)))
    stdout_handle.write(_decode('\n'))
job_start = time.time()
while child.isalive():
    result_id = child.expect(password_patterns, timeout=self.config.pexpect_timeout, searchwindowsize=100)
    password = password_values[result_id]
    if password is not None:
        child.sendline(password)
        self.last_stdout_update = time.time()
    if self.cancel_callback:
        try:
            self.canceled = self.cancel_callback()
        except Exception as e:
            # TODO: logger.exception('Could not check cancel callback - cancelling immediately')
            # if isinstance(extra_update_fields, dict):
            #     extra_update_fields['job_explanation'] = "System error during job execution, check system logs"
            raise CallbackError("Exception in Cancel Callback: {}".format(e))
    if self.config.job_timeout and not self.canceled and (time.time() - job_start) > self.config.job_timeout:
        self.timed_out = True
        # if isinstance(extra_update_fields, dict):
        #     extra_update_fields['job_explanation'] = "Job terminated due to timeout"
    if self.canceled or self.timed_out or self.errored:
        self.kill_container()
        Runner.handle_termination(child.pid, is_cancel=self.canceled)
    if self.config.idle_timeout and (time.time() - self.last_stdout_update) > self.config.idle_timeout:
        self.kill_container()
        Runner.handle_termination(child.pid, is_cancel=False)
        self.timed_out = True
stdout_handle.flush()
stdout_handle.close()
child.close()
self.rc = child.exitstatus if not (self.timed_out or self.canceled) else 254
if self.canceled:
    self.status_callback('canceled')
elif self.rc == 0 and not self.timed_out:
    self.status_callback('successful')
elif self.timed_out:
    self.status_callback('timeout')
else:
    self.status_callback('failed')
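1. First, pexpect.spawn() creates a child process with the relevant parameters. If creating the child process fails, the exception is caught: a placeholder object with exitstatus 127 stands in for the child, and the error message is written to stdout_handle
2. The loop keeps checking the child process and performing the relevant operations until the child is no longer alive. Inside the loop it answers password prompts and, if a cancel callback is set, calls it to find out whether the task should be canceled. If a job timeout is configured, the task has not been canceled, and the elapsed time exceeds the timeout, self.timed_out is set to True. If the task has been canceled, has timed out, or has errored, the child process is terminated; the same happens when an idle timeout is configured and no output has arrived for longer than that
3. Finally, the file handle is flushed and closed and the child process is closed. The most important value is self.rc: if the task neither timed out nor was canceled, self.rc equals the child's exit status (exitstatus); otherwise it is 254. There is also self.status; both are return values of the run function. self.status is set through the status_callback call according to the child's exit status and the outcome, and takes one of the values 'canceled', 'successful', 'timeout', 'failed'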
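The decision at the end of the excerpt can be isolated into a small pure function to see which status and rc each combination produces. This is a sketch that mirrors the branches of the excerpt; final_result is a hypothetical name and not part of ansible_runner.

```python
def final_result(exitstatus, canceled=False, timed_out=False):
    """Mirror the rc/status branches at the end of the excerpt above."""
    # rc is forced to 254 whenever the run was canceled or timed out
    rc = exitstatus if not (timed_out or canceled) else 254
    if canceled:
        status = "canceled"
    elif rc == 0 and not timed_out:
        status = "successful"
    elif timed_out:
        status = "timeout"
    else:
        status = "failed"
    return status, rc


for args in ((0, False, False), (2, False, False), (0, False, True), (0, True, True)):
    print(args, "->", final_result(*args))
```

Note that cancellation takes precedence: a run that was both canceled and timed out is reported as 'canceled'.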
Testing the run function
import ansible_runner

r = ansible_runner.run(
    inventory={
        "test": {
            "hosts": {
                "host01": {
                    "ansible_host": "10.0.0.91",
                    "ansible_port": 59878,
                    "ansible_user": "root",
                    "ansible_ssh_pass": "1234"
                },
            }
        }
    },
    module="shell",
    module_args="ip addr| grep 'inet 10.0'|awk '{print $2}'",
    host_pattern="test",
)
print("{}: {}".format(r.status, r.rc))
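Of course, if you do not use a password and SSH trust has already been set up between the local machine and the remote host, replace ansible_ssh_pass with ansible_ssh_private_key_file, followed by the path to the private key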
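A small helper can make the switch between password and key authentication explicit when building the inventory dict. This is a sketch: host_entry is a hypothetical helper name, and the key path is an assumed example, not a path from the project.

```python
def host_entry(address, port, user, password=None, key_file=None):
    """Build one host's connection variables for a dict inventory."""
    entry = {
        "ansible_host": address,
        "ansible_port": port,
        "ansible_user": user,
    }
    if key_file:
        # key-based login: no password is stored in the inventory
        entry["ansible_ssh_private_key_file"] = key_file
    elif password:
        entry["ansible_ssh_pass"] = password
    return entry


inventory = {
    "test": {
        "hosts": {
            # hypothetical private key path
            "host01": host_entry("10.0.0.91", 59878, "root", key_file="/root/.ssh/id_rsa"),
        }
    }
}
print(inventory)
```

The resulting inventory dict can be passed to ansible_runner.run() exactly like the one in the test script above.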
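Output: status and rc
stats in detail
The project code needs to analyze the ansible runner result, which requires the stats property of the Runner instance. The corresponding code is in runner.py in the Ansible Runner source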
@property
def stats(self):
    '''
    Returns the final high level stats from the Ansible run
    Example:
        {'dark': {}, 'failures': {}, 'skipped': {}, 'ok': {u'localhost': 2}, 'processed': {u'localhost': 1}}
    '''
    last_event = list(filter(lambda x: 'event' in x and x['event'] == 'playbook_on_stats',
                             self.events))
    if not last_event:
        return None
    last_event = last_event[0]['event_data']
    return dict(skipped=last_event.get('skipped', {}),
                ok=last_event.get('ok', {}),
                dark=last_event.get('dark', {}),
                failures=last_event.get('failures', {}),
                ignored=last_event.get('ignored', {}),
                rescued=last_event.get('rescued', {}),
                processed=last_event.get('processed', {}),
                changed=last_event.get('changed', {}))
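@property lets the stats function be accessed like an attribute; because of this decorator, stats can be read directly as <Runner instance>.stats
Inside the method, filter() with a lambda selects the events that contain an event key whose value is playbook_on_stats. If no event matches, last_event is an empty list and None is returned; otherwise the event_data field of the first match is extracted, and last_event.get is used to read each statistic, defaulting to an empty dict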
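The same filtering can be tried outside of Runner on a hand-written event list. This is a sketch: extract_stats is a hypothetical stand-alone function, and sample_events is made-up data shaped like runner events, not captured output.

```python
STAT_KEYS = ("skipped", "ok", "dark", "failures",
             "ignored", "rescued", "processed", "changed")


def extract_stats(events):
    """Return the counters of the first playbook_on_stats event, or None."""
    matches = [e for e in events if e.get("event") == "playbook_on_stats"]
    if not matches:
        return None
    data = matches[0]["event_data"]
    # every counter falls back to an empty dict, as in the stats property
    return {key: data.get(key, {}) for key in STAT_KEYS}


# made-up events for illustration only
sample_events = [
    {"event": "runner_on_ok", "event_data": {"host": "host01"}},
    {"event": "playbook_on_stats",
     "event_data": {"ok": {"host01": 1}, "dark": {"host02": 1}}},
]

print(extract_stats(sample_events))
print(extract_stats([]))
```

An empty event list yields None, which is why code that indexes r.stats["ok"] directly has to guard against a run that never produced a playbook_on_stats event.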
Testing stats
import ansible_runner

r = ansible_runner.run(
    inventory={
        "test": {
            "hosts": {
                "host01": {
                    "ansible_host": "10.0.0.91",
                    "ansible_port": 59878,
                    "ansible_user": "root",
                    "ansible_ssh_pass": "1234"
                },
                "host02": {
                    "ansible_host": "10.0.39",
                    "ansible_port": 59878,
                    "ansible_user": "root",
                    "ansible_ssh_pass": "1234"
                },
            }
        }
    },
    module="shell",
    module_args="ip addr| grep 'inet 10.0'|awk '{print $2}'",
    host_pattern="test",
)
print(r.stats)
# check the ansible execution result
success = True
result = {}
for h in ("host01", "host02"):  # iterate over the host names in the inventory
    try:
        if h in r.stats["ok"]:
            result[h] = True
        else:
            result[h] = False
            success = False
    except Exception as e:
        success = False
        print(str(e))
print({"success": success, "msg": result})
Output:
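The per-host check in the test script can also be packaged as a reusable helper that tolerates stats being None. This is a sketch under that assumption; summarize is a hypothetical name, and the sample stats dicts are made up for illustration rather than taken from a real run.

```python
def summarize(stats, hosts):
    """Mark each host True if it appears under 'ok' in the stats dict."""
    ok_hosts = (stats or {}).get("ok", {})  # stats is None when no stats event exists
    result = {h: h in ok_hosts for h in hosts}
    return {"success": all(result.values()), "msg": result}


# made-up stats for illustration only
print(summarize({"ok": {"host01": 1}, "dark": {"host02": 1}}, ("host01", "host02")))
print(summarize(None, ("host01", "host02")))
```

Unlike the try/except version, a missing stats object marks every host as failed instead of leaving the result dict empty.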