本文共 8368 字,大约阅读时间需要 27 分钟。
README:
1、需求- [ ] 利用RibbitMQ进行数据交互- [ ] 可以对多台服务器进行批量操作- [ ] 执行命令后不等待命令的执行结果,而是直接让输入下一条命令,结果出来后自动打印- [ ] 实现异步操作 备注- [ ] RabbitMQ队列: ①执行命令时,队列名为“rpc_queue2” ②查询数据时,用的是回调时随机生成的callback_queue名 ③conf/settings——Rabbitmq地址“192.168.17.102”,端口:5672,用户名:admin,密码:admin- [ ] SSH: RPC_Server/server.py——paramiko操作连接的测试Linux默认端口22,用户名:root,密码:123456- [ ] threading多线程: 实现命令执行后不等待执行结果,依然可以输入新的指令- [ ] 执行命令格式: -->>run ifconfig host 192.168.20.22 192.168.20.23 dir server端要执行的命令 host host后可跟一个或多个可以通过rabbitMQ的服务器地址- [ ] 查看后台所有的TASK_ID信息: -->>check_task 显示结果样式:TASK_ID【76786】 HOST【192.168.20.22】 COMMAND【dir】 TASK_ID【10307】 HOST【192.168.20.23】 COMMAND【dir】- [ ] 查看TASK_ID对应的执行结果: -->>get_task 10307程序目录结构:├── README.md├── RPC_Client│ ├── bin│ │ ├── __init__.py│ │ └── start.py #客户端启动程序│ ├── conf│ │ ├── __init__.py│ │ ├── __pycache__│ │ │ ├── __init__.cpython-36.pyc│ │ │ └── settings.cpython-36.pyc│ │ └── settings.py│ ├── core│ │ ├── __init__.py│ │ ├── main.py│ │ └── __pycache__│ │ ├── __init__.cpython-36.pyc│ │ └── main.cpython-36.pyc│ └── modules│ ├── client.py│ ├── __init__.py│ └── __pycache__│ ├── client.cpython-36.pyc│ └── __init__.cpython-36.pyc└── RPC_Server ├── conf │ ├── __pycache__ │ │ └── settings.cpython-36.pyc │ └── settings.py └── server.py #server端启动程序程序启动: 客户端启动:RPC_Client/bin/start.py 服务端启动:RPC_Server/server.py
import os,sys,platformif platform.system() == 'Windows': BASE_DIR = '\\'.join(os.path.abspath(os.path.dirname(__file__)).split('\\')[:-1])else: BASE_DIR = '/'.join(os.path.abspath(os.path.dirname(__file__)).split('/')[:-1])sys.path.append(BASE_DIR)from core import mainif __name__ == "__main__": handle = main.Handle() handle.start()
RPC客户度conf目录 settings.py
import os,sys,platformif platform.system() == 'Windows': BASE_DIR = '\\'.join(os.path.abspath(os.path.dirname(__file__)).split('\\')[:-1])else: BASE_DIR = '/'.join(os.path.abspath(os.path.dirname(__file__)).split('/')[:-1])sys.path.append(BASE_DIR)RabbitmqHost = '192.168.17.102'RabbitmqUser = 'admin'RabbitmqPwd = 'admin'credentails = pika.PlainCredentials(RabbitmqUser,RabbitmqPwd)
RPC客户端主程序core/main.py
import pikaimport randomimport threadingfrom modules import clientfrom conf import settingsclass Handle(object): def __init__(self): # 建立连接,指定服务器的ip地址 self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=settings.RabbitmqHost,credentials=settings.credentails)) # 建立一个会话,每个channel代表一个会话任务 self.channel = self.connection.channel() def run_cmd(self,cmd, host): rpc_client = client.Client(self.connection,self.channel) task_id = str(random.randint(1000,9999)) #生成4位的Correlation id response = rpc_client.call(cmd, host) self.corr_id = response[1] print('Task_ID',task_id) self.info[task_id] = [self.corr_id,host,cmd,response[0],response[1]] def start(self): self.info = {} #task 返回任务信息字典 help = ''' 命令格式 执行系统命令:run command host eg: run ls 10.10.0.10 查看所有执行任务:check_task 查看指定任务结果:get_task id eg:get_task 6723 ''' print(help) while True: msg = input(">> ").strip() if msg.startswith('run') and len(msg.split()) >= 3: cmd = msg.split()[1] #多线程运行 th_join = [] for host in msg.split()[2:]: th = threading.Thread(target=self.run_cmd,args=(cmd,host,)) th.start() th_join.append(th) for t in th_join: t.join() elif msg == 'check_task': if not self.info: print("没有任务队列") continue else: for taskid,task in self.info.items(): print('TaskID [%s] Host [%s] COMMAND [%s]'%(taskid,task[1],task[2])) elif msg.startswith('get_task'): rpc_client = client.Client(self.connection,self.channel) if msg.split()[1] in self.info: task_id = msg.split()[1] callback_queue = self.info[task_id][3] correlation_id = self.info[task_id][4] task_result = rpc_client.get_task(callback_queue,correlation_id) del self.info[task_id] print(task_result.decode().strip()) else: print('输入的task ID不存在!') continue elif not msg: continue else: print('输入错误,请重新输入!') continue
RPC客户端modules
import pikaimport randomimport uuidclass Client(object): def __init__(self,connection,channel): self.connection = connection self.channel = channel # 对回调队列中的响应进行处理的函数 def on_response(self, ch, method, props, body): if self.correlation_id == props.correlation_id: self.response = body ch.basic_ack(delivery_tag=method.delivery_tag) def get_task(self,callback_queue,correlation_id): # 初始化 response self.response = None self.correlation_id = correlation_id # 客户端订阅回调队列,当回调队列中有响应时,调用`on_response`方法对响应进行处理; self.channel.basic_consume(self.on_response,queue=callback_queue) while self.response is None: self.connection.process_data_events() return self.response # 发出RPC请求 def call(self,cmd,host): # 声明回调队列,再次声明的原因是,服务器和客户端可能先后开启,该声明是幂等的,多次声明,但只生效一次 result = self.channel.queue_declare(exclusive=True) # 将次队列指定为当前客户端的回调队列 self.callback_queue = result.method.queue msg = cmd + " " + "".join(host) self.corr_id = str(uuid.uuid4()) #self.corr_id = corr_id # 发送RPC请求内容到RPC请求队列`rpc_queue`,同时发送的还有`reply_to`和`correlation_id` self.channel.basic_publish(exchange='', routing_key='rpc_queue2', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body=msg) return self.callback_queue,self.corr_id
RPC服务器settings.py
RabbitmqHost = '192.168.17.102'RabbitmqUser = 'admin'RabbitmqPwd = 'admin'credentails = pika.PlainCredentials(RabbitmqUser,RabbitmqPwd)
RPC服务端主程序:
#!/usr/bin/env pythonimport pikaimport paramikoimport os,sys,platformif platform.system() == 'Windows': BASE_DIR = '\\'.join(os.path.abspath(os.path.dirname(__file__)).split('\\')[:-1])else: BASE_DIR = '/'.join(os.path.abspath(os.path.dirname(__file__)).split('/')[:-1])sys.path.append(BASE_DIR)from conf import settings# 建立连接,服务器地址为localhost,可指定ip地址connection = pika.BlockingConnection(pika.ConnectionParameters(host=settings.RabbitmqHost,credentials=settings.credentails))# 建立会话channel = connection.channel()# 声明RPC请求队列channel.queue_declare(queue='rpc_queue2')# 数据处理方法def exec_cmd(cmd,host): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname=host, port=22, username='root', password='123456',timeout=10) stdin, stdout, stderr = ssh.exec_command(cmd) stdout_result = stdout.read() stderr_result = stderr.read() result = stdout_result if stdout_result else stderr_result return result.decode() ssh.close()# 对RPC请求队列中的请求进行处理def on_request(ch, method, props, body): cmd = body.split()[0] host = body.split()[1] # 调用数据处理方法 response = exec_cmd(cmd,host) # 将处理结果(响应)发送到回调队列 ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_consume(on_request, queue='rpc_queue2')print(" [x] Awaiting RPC requests")channel.start_consuming()
转载于:https://blog.51cto.com/baiying/2065436