博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于RabbitMQ RPC实现的主机异步管理
阅读量:7096 次
发布时间:2019-06-28

本文共 8368 字,大约阅读时间需要 27 分钟。

README:

1、需求- [ ] 利用RibbitMQ进行数据交互- [ ] 可以对多台服务器进行批量操作- [ ] 执行命令后不等待命令的执行结果,而是直接让输入下一条命令,结果出来后自动打印- [ ] 实现异步操作 备注- [ ] RabbitMQ队列:                    ①执行命令时,队列名为“rpc_queue2”                    ②查询数据时,用的是回调时随机生成的callback_queue名                            ③conf/settings——Rabbitmq地址“192.168.17.102”,端口:5672,用户名:admin,密码:admin- [ ] SSH:                RPC_Server/server.py——paramiko操作连接的测试Linux默认端口22,用户名:root,密码:123456- [ ] threading多线程:                    实现命令执行后不等待执行结果,依然可以输入新的指令- [ ] 执行命令格式:                 -->>run ifconfig host 192.168.20.22 192.168.20.23                        dir     server端要执行的命令                        host    host后可跟一个或多个可以通过rabbitMQ的服务器地址- [ ] 查看后台所有的TASK_ID信息:                 -->>check_task     显示结果样式:TASK_ID【76786】    HOST【192.168.20.22】    COMMAND【dir】                  TASK_ID【10307】    HOST【192.168.20.23】    COMMAND【dir】- [ ] 查看TASK_ID对应的执行结果:                 -->>get_task 10307程序目录结构:├── README.md├── RPC_Client│   ├── bin│   │   ├── __init__.py│   │   └── start.py  #客户端启动程序│   ├── conf│   │   ├── __init__.py│   │   ├── __pycache__│   │   │   ├── __init__.cpython-36.pyc│   │   │   └── settings.cpython-36.pyc│   │   └── settings.py│   ├── core│   │   ├── __init__.py│   │   ├── main.py│   │   └── __pycache__│   │       ├── __init__.cpython-36.pyc│   │       └── main.cpython-36.pyc│   └── modules│       ├── client.py│       ├── __init__.py│       └── __pycache__│           ├── client.cpython-36.pyc│           └── __init__.cpython-36.pyc└── RPC_Server    ├── conf    │   ├── __pycache__    │   │   └── settings.cpython-36.pyc    │   └── settings.py    └── server.py #server端启动程序程序启动:    客户端启动:RPC_Client/bin/start.py    服务端启动:RPC_Server/server.py

基于RabbitMQ RPC实现的主机异步管理

RPC 客户端bin目录start.py

import os,sys,platformif platform.system() == 'Windows':    BASE_DIR = '\\'.join(os.path.abspath(os.path.dirname(__file__)).split('\\')[:-1])else:    BASE_DIR = '/'.join(os.path.abspath(os.path.dirname(__file__)).split('/')[:-1])sys.path.append(BASE_DIR)from core import mainif __name__ == "__main__":    handle = main.Handle()    handle.start()

RPC客户度conf目录 settings.py

import os,sys,platformif platform.system() == 'Windows':    BASE_DIR = '\\'.join(os.path.abspath(os.path.dirname(__file__)).split('\\')[:-1])else:    BASE_DIR = '/'.join(os.path.abspath(os.path.dirname(__file__)).split('/')[:-1])sys.path.append(BASE_DIR)RabbitmqHost = '192.168.17.102'RabbitmqUser = 'admin'RabbitmqPwd = 'admin'credentails = pika.PlainCredentials(RabbitmqUser,RabbitmqPwd)

RPC客户端主程序core/main.py

import pikaimport randomimport threadingfrom modules import clientfrom conf import settingsclass Handle(object):    def __init__(self):        # 建立连接,指定服务器的ip地址        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=settings.RabbitmqHost,credentials=settings.credentails))        # 建立一个会话,每个channel代表一个会话任务        self.channel = self.connection.channel()    def run_cmd(self,cmd, host):        rpc_client = client.Client(self.connection,self.channel)        task_id = str(random.randint(1000,9999))  #生成4位的Correlation id        response = rpc_client.call(cmd, host)        self.corr_id = response[1]        print('Task_ID',task_id)        self.info[task_id] = [self.corr_id,host,cmd,response[0],response[1]]    def start(self):        self.info = {}  #task 返回任务信息字典        help = '''        命令格式            执行系统命令:run command host  eg: run ls 10.10.0.10            查看所有执行任务:check_task            查看指定任务结果:get_task id eg:get_task 6723        '''        print(help)        while True:            msg = input(">> ").strip()            if msg.startswith('run') and len(msg.split()) >= 3:                cmd = msg.split()[1]                #多线程运行                th_join = []                for host in msg.split()[2:]:                    th = threading.Thread(target=self.run_cmd,args=(cmd,host,))                    th.start()                    th_join.append(th)                for t in th_join:                    t.join()            elif msg == 'check_task':                if not self.info:                    print("没有任务队列")                    continue                else:                    for taskid,task in self.info.items():                        print('TaskID [%s]     Host [%s]     COMMAND [%s]'%(taskid,task[1],task[2]))            elif msg.startswith('get_task'):                rpc_client = client.Client(self.connection,self.channel)                if msg.split()[1] in self.info:                    task_id = msg.split()[1]                    callback_queue = self.info[task_id][3]                    correlation_id = self.info[task_id][4]                    task_result = rpc_client.get_task(callback_queue,correlation_id)                    del self.info[task_id]                    print(task_result.decode().strip())                else:                    print('输入的task ID不存在!')                    continue            elif not msg:                continue            else:                print('输入错误,请重新输入!')                continue

RPC客户端modules

import pikaimport randomimport uuidclass Client(object):    def __init__(self,connection,channel):        self.connection = connection        self.channel = channel    # 对回调队列中的响应进行处理的函数    def on_response(self, ch, method, props, body):        if self.correlation_id == props.correlation_id:            self.response = body        ch.basic_ack(delivery_tag=method.delivery_tag)    def get_task(self,callback_queue,correlation_id):        # 初始化 response        self.response = None        self.correlation_id = correlation_id        # 客户端订阅回调队列,当回调队列中有响应时,调用`on_response`方法对响应进行处理;        self.channel.basic_consume(self.on_response,queue=callback_queue)        while self.response is None:            self.connection.process_data_events()        return self.response    # 发出RPC请求    def call(self,cmd,host):        # 声明回调队列,再次声明的原因是,服务器和客户端可能先后开启,该声明是幂等的,多次声明,但只生效一次        result = self.channel.queue_declare(exclusive=True)        # 将次队列指定为当前客户端的回调队列        self.callback_queue = result.method.queue        msg = cmd + " " + "".join(host)        self.corr_id = str(uuid.uuid4())        #self.corr_id = corr_id        # 发送RPC请求内容到RPC请求队列`rpc_queue`,同时发送的还有`reply_to`和`correlation_id`        self.channel.basic_publish(exchange='',                                   routing_key='rpc_queue2',                                   properties=pika.BasicProperties(                                       reply_to=self.callback_queue,                                       correlation_id=self.corr_id,                                   ),                                   body=msg)        return self.callback_queue,self.corr_id

RPC服务器settings.py

RabbitmqHost = '192.168.17.102'RabbitmqUser = 'admin'RabbitmqPwd = 'admin'credentails = pika.PlainCredentials(RabbitmqUser,RabbitmqPwd)

RPC服务端主程序:

#!/usr/bin/env pythonimport pikaimport paramikoimport os,sys,platformif platform.system() == 'Windows':    BASE_DIR = '\\'.join(os.path.abspath(os.path.dirname(__file__)).split('\\')[:-1])else:    BASE_DIR = '/'.join(os.path.abspath(os.path.dirname(__file__)).split('/')[:-1])sys.path.append(BASE_DIR)from conf import  settings# 建立连接,服务器地址为localhost,可指定ip地址connection = pika.BlockingConnection(pika.ConnectionParameters(host=settings.RabbitmqHost,credentials=settings.credentails))# 建立会话channel = connection.channel()# 声明RPC请求队列channel.queue_declare(queue='rpc_queue2')# 数据处理方法def exec_cmd(cmd,host):    ssh = paramiko.SSHClient()    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())    ssh.connect(hostname=host, port=22, username='root', password='123456',timeout=10)    stdin, stdout, stderr = ssh.exec_command(cmd)    stdout_result = stdout.read()    stderr_result = stderr.read()    result = stdout_result if stdout_result else stderr_result    return result.decode()    ssh.close()# 对RPC请求队列中的请求进行处理def on_request(ch, method, props, body):    cmd = body.split()[0]    host = body.split()[1]    # 调用数据处理方法    response = exec_cmd(cmd,host)    # 将处理结果(响应)发送到回调队列    ch.basic_publish(exchange='',                     routing_key=props.reply_to,                     properties=pika.BasicProperties(correlation_id = props.correlation_id),                     body=str(response))    ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_consume(on_request, queue='rpc_queue2')print(" [x] Awaiting RPC requests")channel.start_consuming()

转载于:https://blog.51cto.com/baiying/2065436

你可能感兴趣的文章
git安装和使用案例
查看>>
cisco路由器进入rommon模式
查看>>
awk Tips
查看>>
继承抽象类
查看>>
(摘)Excel 2007查询操作中的函数应用
查看>>
使用SCOM资源工具包来协助MP开发及调试
查看>>
从Linux终端管理进程:10个你必须知道的命令
查看>>
centos7 命令补全
查看>>
JAVA递归中的垃圾回收
查看>>
PyTables Windows平台安装说明
查看>>
Linux crontab定时执行任务 命令格式与详细例子
查看>>
IDC: 2018年智能家居将打破设备孤岛瓶颈
查看>>
Redis的三种启动方式
查看>>
python多线程之线程锁三(同一时间允许多个线程)
查看>>
PinPoint分布式全链路监控
查看>>
【Flume】HDFSSink配置参数说明
查看>>
面向对象学习
查看>>
CentOS7修改主机名
查看>>
python端口扫描报警
查看>>
差异备份的一个实现--总论和数据结构
查看>>