paramiko 与 ansible_runner 在 Python 环境中实现类似流式输出
1. paramiko
1.1. 依赖
1.2. 代码
example_paramiko.py |
---|
| import paramiko
import time
def run():
host_ip = '192.168.1.1'
ssh_user = 'root'
ssh_port = 22
command = """
echo "hello world"
whoami
sleep 10s
ifconfig
"""
n_bytes = 1024
timeout = 600 # 超过10分钟未结束,强制结束
try:
with paramiko.SSHClient() as ssh_client:
# 自动添加主机密钥(如果不添加,首次连接时会提示是否信任主机)
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接到远程主机
ssh_client.connect(hostname=host_ip, port=ssh_port, username=ssh_user, timeout=3)
# 执行命令
stdin, stdout, stderr = ssh_client.exec_command(command)
channel = stdout.channel
start_time = time.time()
while not channel.exit_status_ready():
# 读取标准输出
if channel.recv_ready():
data = channel.recv(n_bytes)
if data.decode():
has_data = True
print(data.decode())
# 读取标准错误
if channel.recv_stderr_ready():
error_data = channel.recv_stderr(n_bytes)
print(error_data.decode())
if time.time() - start_time >= 600: # 超过10分钟未结束,强制结束
channel.send('exit'.encode())
print('\033[31mTask execution time out (need <=10min)\033[0m')
break
time.sleep(0.5)
# 读取剩余的输出
while channel.recv_ready():
data = channel.recv(n_bytes)
print(data.decode())
while channel.recv_stderr_ready():
error_data = channel.recv_stderr(n_bytes)
print(error_data.decode())
except Exception as e:
print(e)
if __name__ == '__main__':
run()
|
1.3. 预览
2. ansible_runner
2.1. 依赖
依赖包pip install ansible ansible-runner
2.2. 代码
example_ansible_runner.py |
---|
| import tempfile
import ansible_runner
def run():
host_ip = '192.168.1.1'
ssh_user = 'root'
ssh_port = 22
playbook = """
---
- hosts: all
gather_facts: no
tasks:
- name: "test"
shell: echo "Hello World"
- name: "ls"
shell: ls -lh
- name: "sleep"
shell: sleep 5s
- name: "show ifconfig"
shell: ifconfig
"""
def event_handler(event):
print(event['stdout'])
with tempfile.TemporaryDirectory() as private_data_dir:
with open(f'{private_data_dir}/playbook.yml', 'w') as f:
f.write(playbook)
ansible_result = ansible_runner.run(
inventory={
'all': {
'hosts': {host_ip: {'ansible_user': ssh_user, 'ansible_port': ssh_port}},
}
},
playbook=f'{private_data_dir}/playbook.yml',
private_data_dir=private_data_dir,
host_pattern='all',
verbosity=1, # 详细输出作为参数传入 [等效于 -v(0=默认,1=-v,2=-vv,3=-vvv)]
quiet=True,
event_handler=event_handler,
envvars={
'ANSIBLE_STDOUT_CALLBACK': 'yaml',
'ANSIBLE_FORCE_COLOR': 'true'
},
timeout=600, # 任务超过10分钟未结束,强制结束
)
if ansible_result.status == 'timeout':
print('\033[31mPlaybook execution timed out (need <=10min)\033[0m')
if __name__ == '__main__':
run()
|
2.3. 预览