跳转至

在 Python 中使用 paramiko 与 ansible_runner 实现类似流式的输出

1. paramiko

1.1. 依赖

pip install paramiko

1.2. 代码

example_paramiko.py
import paramiko
import time


def run():
    """Run a shell snippet on a remote host over SSH and echo its output as it arrives.

    Connects with paramiko, starts the command with ``exec_command`` and polls
    the channel, printing stdout and stderr chunks until the remote command
    reports an exit status or ``timeout`` seconds have elapsed. Whatever is
    still buffered is printed afterwards. Any exception raised along the way
    is printed rather than propagated.
    """
    import codecs  # local import: only this function needs it

    host_ip = '192.168.1.1'
    ssh_user = 'root'
    ssh_port = 22
    command = """
    echo "hello world"
    whoami
    sleep 10s
    ifconfig
    """
    n_bytes = 1024
    timeout = 600  # seconds; give up if the command has not finished after 10 minutes
    poll_interval = 0.5  # seconds to sleep when neither stream had data

    # Incremental decoders keep a multi-byte UTF-8 character that is split
    # across two recv() chunks intact instead of raising UnicodeDecodeError.
    make_decoder = codecs.getincrementaldecoder('utf-8')
    out_decoder = make_decoder(errors='replace')
    err_decoder = make_decoder(errors='replace')

    def pump(channel):
        """Print at most one pending chunk per stream; return True if anything was read."""
        got_data = False
        if channel.recv_ready():
            # end='' because the remote output already carries its own newlines.
            print(out_decoder.decode(channel.recv(n_bytes)), end='', flush=True)
            got_data = True
        if channel.recv_stderr_ready():
            print(err_decoder.decode(channel.recv_stderr(n_bytes)), end='', flush=True)
            got_data = True
        return got_data

    try:
        with paramiko.SSHClient() as ssh_client:
            # Accept unknown host keys automatically; otherwise the first
            # connection to a host that is not in known_hosts is rejected.
            ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

            # Connect to the remote host.
            ssh_client.connect(hostname=host_ip, port=ssh_port, username=ssh_user, timeout=3)

            # Start the command; only the underlying channel is used below.
            _stdin, stdout, _stderr = ssh_client.exec_command(command)
            channel = stdout.channel

            start_time = time.monotonic()
            timed_out = False

            while not channel.exit_status_ready():
                got_data = pump(channel)

                if time.monotonic() - start_time >= timeout:
                    timed_out = True
                    break

                # Only sleep when idle so a chatty command is not throttled
                # to one chunk per poll interval.
                if not got_data:
                    time.sleep(poll_interval)

            # Print whatever is still buffered locally.
            while pump(channel):
                pass

            # Flush any incomplete trailing byte sequence held by the decoders.
            print(out_decoder.decode(b'', final=True), end='', flush=True)
            print(err_decoder.decode(b'', final=True), end='', flush=True)

            if timed_out:
                print('\033[31mTask execution time out (need <=10min)\033[0m')
                # Close the channel explicitly instead of writing 'exit' to the
                # command's stdin, which a non-interactive exec does not act on.
                # NOTE(review): closing the channel may not kill the remote
                # process itself - confirm if that is required.
                channel.close()

    except Exception as e:
        # Demo-level boundary: report the failure instead of crashing.
        print(e)


# Run the paramiko demo only when this file is executed as a script.
if __name__ == '__main__':
    run()

1.3. 预览

2. ansible_runner

2.1. 依赖

依赖包
pip install ansible ansible-runner

2.2. 代码

example_ansible_runner.py
import tempfile

import ansible_runner


def run():
    """Run a small inline playbook with ansible_runner and print event output as it arrives.

    The playbook is written to a temporary private data directory, executed
    against a single host through ``ansible_runner.run`` and every event that
    carries text is printed by ``event_handler``. A message is printed if the
    run ends with status ``'timeout'``.
    """
    host_ip = '192.168.1.1'
    ssh_user = 'root'
    ssh_port = 22
    playbook = """
---
- hosts: all
  gather_facts: no
  tasks:
    - name: "test"
      shell: echo "Hello World"
    - name: "ls"
      shell: ls -lh
    - name: "sleep"
      shell: sleep 5s
    - name: "show ifconfig"
      shell: ifconfig
    """

    def event_handler(event):
        """Print the text of one runner event, skipping events that have none."""
        # Use .get() so an event without a 'stdout' key cannot raise KeyError,
        # and skip empty text so such events do not print blank lines.
        text = event.get('stdout')
        if text:
            print(text, flush=True)

    with tempfile.TemporaryDirectory() as private_data_dir:
        playbook_path = f'{private_data_dir}/playbook.yml'
        # Explicit encoding so the file does not depend on the platform default.
        with open(playbook_path, 'w', encoding='utf-8') as f:
            f.write(playbook)
        ansible_result = ansible_runner.run(
            inventory={
                'all': {
                    'hosts': {host_ip: {'ansible_user': ssh_user, 'ansible_port': ssh_port}},
                }
            },
            playbook=playbook_path,
            private_data_dir=private_data_dir,
            host_pattern='all',
            # Verbosity passed as an argument, equivalent to -v
            # (0=default, 1=-v, 2=-vv, 3=-vvv).
            verbosity=1,
            quiet=True,
            event_handler=event_handler,
            envvars={
                'ANSIBLE_STDOUT_CALLBACK': 'yaml',
                'ANSIBLE_FORCE_COLOR': 'true'
            },
            timeout=600,  # force-stop the run if it has not finished within 10 minutes
        )
        if ansible_result.status == 'timeout':
            print('\033[31mPlaybook execution timed out (need <=10min)\033[0m')


# Run the ansible_runner demo only when this file is executed as a script.
if __name__ == '__main__':
    run()

2.3. 预览