




版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
第python多进程及通信实现异步任务的方法目录一、python多进程及通信基本用法1、多进程的基本实现a、Process重写run方法b、使用Process和target方法c、直接使用Process类2、多进程的通信a、Queueb、Pipe二、python多进程实战1、使用进程池快速抽取数据2、多进程及通信完成数据清洗和保存3、多进程及通信实现异步任务需求写在最前面,说实话python多进程这块儿知识对于很少使用python多进程或者没有实际使用过多python进程解决问题的人来说,还是有一定难度的。本人也是很少接触多进程的场景,对于python多进程的使用也是比较陌生的。在接触了一些多进程的业务场景下,对python多进程的使用进行了学习,觉得很有必要进行一个梳理总结。
一、python多进程及通信基本用法
python中多进程及其通信,是比较重要的一块儿内容,作为python程序员,这块儿内容要基本掌握。
1、多进程的基本实现
python多进程的使用一般是调用multiprocessing包中的Process和Pool(进程池),其中Process的用法又有多种,基本函数
p.start()启动一个已经初始化的进程
p.join()让进程运行完了以后,主进程再执行
a、Process重写run方法
MultiOneProcess类继承了multiprocessing的Process类,然后重写它的run方法,实现具体业务逻辑功能;主程序启动10个进程。
frommultiprocessingimportProcess
count=0
classMultiOneProcess(Process):
def__init__(self,name):
super().__init__()
=name
defrun(self)-None:
globalcount
count+=1
print('processname%sisrunning----count:%d'%(,count))
if__name__=='__main__':
p_list=[]
foriinrange(10):
name='process_%d'%i
p=MultiOneProcess(name=name)
p.start()
p_list.append(p)
forpinp_list:
p.join()
print('thismainprocess')
b、使用Process和target方法
定义一个进程类继承Process类,同时在super()初始化中传入target函数
frommultiprocessingimportProcess
count=0
classMultiTwoProcess(Process):
def__init__(self,name):
super().__init__(target=self.do_fun)
=name
defdo_fun(self):
globalcount
count+=1
print('processname%sisrunning----count:%d'%(name,count))
if__name__=='__main__':
p_list=[]
foriinrange(10):
name='process_%d'%i
p=MultiTwoProcess(name)
p.start()
p_list.append(p)
forpinp_list:
p.join()
print('thismainprocess')
代码中定义了一个类MultiTwoProcess类,类中定义了do_fun函数,把它作为参数传入到target中。
c、直接使用Process类
传入target函数,同时传入args参数,注意args参数是一个元组,切不能省略最后一个逗号
frommultiprocessingimportProcess
count=0
defdo_fun(name):
globalcount
count+=1
print('processname%sisrunning----count:%d'%(name,count))
if__name__=='__main__':
p_list=[]
foriinrange(10):
name='process_%d'%i
p=Process(target=do_fun,args=(name,))
p.start()
p_list.append(p)
forpinp_list:
p.join()
print('thismainprocess')
以上三者运行的结果,是一样的,如下:
2、多进程的通信
进程之间的通信一般都采用Queue和pipe,区别是:pipe只能在两个进程之间调用,而Queue是可以多个进程间调用的;效率上pipe效率更高,Queue是基于pipe实现的,效率比pipe要低一点。
a、Queue
常用API,
存放数据
queue.put(obj,block=True,timeout=None)
当block=False的时候,如果Queue已经满了,那么就会跑出Queue.Full异常;
当block=True且timeout有正值的时候,Queue已经满了,Queue会阻塞timeout时间,超出时间就会抛出同样的异常
获取数据
queue.get(block=True,timeout=None)
当block=False的时候,如果Queue为空,那么就会跑出Queue.Empty异常;
当block=True且timeout有正值的时候,Queue已经为空,Queue会阻塞timeout时间,超出时间就会抛出同样的异常
以上2个API是阻塞;还有两个非堵塞的API
queue.put(obj,block=False)和queue.put_nowait(obj)等效
queue.get(block=False)和queue.get_nowait()等效
简单的实现,一个进程发送数据,另外2个进程接收数据,就可以使用queue通信
frommultiprocessingimportProcess,Queue
defsend(q):
whileTrue:
q.put('发送一个数据')
defreceive1(q):
whileTrue:
s=q.get()
print('receive1:',s)
defreceive2(q):
whileTrue:
s=q.get()
print('receive2:',s)
if__name__=='__main__':
q=Queue()
p1=Process(target=send,args=(q,))
p2=Process(target=receive1,args=(q,))
p3=Process(target=receive2,args=(q,))
p1.start()
p2.start()
p3.start()
p1进程不断的往q中存放数据;p2和p3不停的从q中取数据(有竞争的再取),所以打印结果是无序的
b、Pipe
Pipe(duplex=True)返回2个连通端(p1,p2);当duplex=True时,双向通信,p1发送,p2接收;p2发送,p1接收。
当duplex=True时,单向通信,p1只能发送,p2只能接收。
常用API,pipe.send()pipe.recv()
frommultiprocessingimportProcess,Pipe
deffun2(p):
whileTrue:
s=p.recv()
print('接收一个数据:',s)
deffun1(p):
whileTrue:
print('发送一个数据:pipe')
p.send('pipe')
if__name__=='__main__':
pi1,pi2=Pipe(duplex=True)
p1=Process(target=fun1,args=(pi1,))
p2=Process(target=fun2,args=(pi2,))
p1.start()
p2.start()
结果如下:
二、python多进程实战
不同的业务场景使用多进程的方式和复杂度也不相同,就我遇见过的一些场景进行演示和说明。
1、使用进程池快速抽取数据
场景描述:有1000个Excel文件的数据需要进行抽取和清洗,要把不符合我们需求的数据过滤掉,保留质量很高的数据;每个Excel都有几十万或者上百万的数据,那么怎么快速的完成这个任务呢?
首先整体上而言,可以把单个Excel的处理并行起来;那么可以使用多进程,其次这个需要返回结果,要保留合格的数据,比较简单的就是采用进程池了,它能够很方便的把进程处理的结果进行返回,并且返回的还是一个生成器;如果还需要更快,那么可以把单个Excel中的每条数据的处理并行起来。代码层面上,采用pool进程池来完成这个任务(本文没有对进程池的使用和API做说明),具体的实现方式采取pool.imap()
if__name__=='__main__':
#所有Excel的路径
all_paths=glob('../data/original_data/*')
sysInfo_list=['我通过了好友请求,现在你俩可以开始聊天了','我通过了你的朋友验证请求,现在我们可以开始聊天了','已通过你的朋友验证请求,现在可以开始聊天了','不支持此消息,请在手机上查看',
'微信红包']
interval=25
iflen(all_paths)//interval*intervallen(all_paths):
k=len(all_paths)//interval+1
else:
k=len(all_paths)//interval
#分段处理,每段25个Excel
foriinrange(k):
paths=all_paths[i*interval:(i+1)*interval]
ifi*interval=100andi*interval200:
params=[]
forpathintqdm(paths):
params.append((path,sysInfo_list))
#多进程处理——进程池、以及进度显示
withPool(20)asp:
res=list(tqdm(p.imap(extract_data,params),total=len(params),desc='extract_data'))
all_df=[]
fordfsinres:
iflen(dfs)0:
all_df.extend(dfs)
df=pd.concat(all_df,axis=0)
save_path='../data/weikong_clean_data_'+str(i*interval)+'_'+str(i*interval+len(paths)-1)+'.xlsx'
writer=pd.ExcelWriter(save_path)
df.to_excel(writer,index=False)
writer.save()
writer.close()
2、多进程及通信完成数据清洗和保存
场景描述:从Excel中读取数据,数据格式是整通整通的对话,每通对话有一定的轮数;保存数据到2个txt中,一个是顺序保留,一个是倒序保留;整体对话顺序不变,每通对话内部顺序倒序。
正序:
倒序:
要想实现这样的任务,粗暴的做法是,用两个list,一个保留正序的,一个保留倒序的,然后分别对这两个list进行文件写入操作。但是如果数据量很多在内存有限的时候,只能满足不了两个list的情况下怎么实现呢?
我的实现方式就是开启两个进程,一个进程保留一个正序list,写入文件的同时对每个元素(每通)对话进行倒序,然后把倒序后的数据通过Queue或者Pipe传入到另外一个进程,让另外的进程进行写文件操作。
defsave_mmi_train_data(queue):
withopen('../data/finetune_mmi_data/train.txt','w',encoding='utf-8')asf:
whileTrue:
save_list=queue.get()
iflen(save_list)==0:
break
forlineinsave_list:
f.write(line)
defsave_mmi_val_data(queue):
withopen('../data/finetune_mmi_data/val.txt','w',encoding='utf-8')asf:
whileTrue:
save_list=queue.get()
iflen(save_list)==0:
break
forlineinsave_list:
f.write(line)
defget_funtine_data(paths):
all_groups=[]
forpathintqdm(paths,desc='loaddatafromexcle'):
df=pd.read_excel(path)
df.dropna(inplace=True)
df.drop_duplicates(inplace=True,keep='first')
groups=list(df.groupby(by=['坐席id','客户微信id']))
all_groups.extend(groups)
print('len(all_groups)',len(all_groups))
train,val=train_test_split(all_groups,test_size=10000/len(all_groups),random_state=1)
print('len(train)',len(train))
print('len(val)',len(val))
train_std_path='../data/finetune_std_data/train.txt'
val_std_path='../data/finetune_std_data/val.txt'
train_mmi_queue=Queue()
save_funtine_data(train,train_std_path,train_mmi_queue,save_mmi_train_data)
val_mmi_queue=Queue()
save_funtine_data(val,val_std_path,val_mmi_queue,save_mmi_val_data)
defsave_funtine_data(groups,save_std_path,queue,fun):
p=Process(target=fun,args=(queue,))
p.start()
withopen(save_std_path,'w',encoding='utf-8')asf:
forgroupintqdm(groups,desc='findandsavefuntinedialoguedatas'):
new_df=group[1]
df_roles=new_df['是否客服'].values.tolist()
df_contents=new_df['消息内容'].values.tolist()
roles=[]
contents=[]
forrole,contentinzip(df_roles,df_contents):
content=content.replace('\n','')
content=emoji.replace_emoji(content,'')
iflen(content)0andcontent!="":
roles.append(role)
contents.append(content)
save_list=[]
save_str=""
forindex,roleinenumerate(roles):
content=contents[index].replace('\n','')
content=emoji.replace_emoji(content,'')
ifcontent[-1]notinpunctuations:
content+=';'
ifindex==0:
ifrole=="是":
save_str+="坐席:"+content
else:
save_str+="客户:"+content
else:
ifrole!=roles[index-1]:
f.write(save_str[0:-1]+'\n')
save_list.append(save_str[0:-1]+'\n')
ifrole=="是":
save_str="坐席:"+content
else:
save_str="客户:"+content
else:
save_str+=content
iflen(save_str)1:
save_list.append(save_str[0:-1]+'\n')
f.write(save_str[0:-1]+'\n')
f.write('\n')
#切片反转
save_list=save_list[::-1]
save_list.append('\n')
iflen(save_list)0:
queue.put(save_list)
#注意传入一个空值,让倒序进程结束
queue.put([])
p.join()
要注意的是,倒序进程中使用whileTrue无限循环,需要传入一个空值,能够让它在正序进程结束的同时知道数据写完了,跳出循环。以上代码比较简单就不一一说明了。
3、多进程及通信实现异步任务需求
场景描述:假定一个模型推理系统,网络模块负责接受请求传输的数据,把数据传输给数据处理模块;数据处理模块负责处理数据(比如说语音流或者视频流等,这些数据处理对CPU的消耗很大),处理完后把数据传输给模型推理模块;模型推理模块负责对数据进行推理并把结果返回给网络模块。要求就是网络模块、数据处理模块和模型推理模块是独立的,可以并行的完成自己的任务,3个模块是异步的,其实可以把这个系统简化的使用多进程来实现。
每个模块可以用一个进程来表示,内部的逻辑可以开启子进程来实现,然后模块直接的数据传输就可以使用多进程的通信来实现,同时也创建一个全局的Queue变量,让每个模块的进程按需使用。
画了一个简单的结构和流程图,如下:
注意的是模块之间的数据传输,使用queue传输的时候,数据量越小,效率越高,所以可以在网络模块这端提前把数据进行处理。
函数入口文件
importa
importb
importc
fromwhole_queueimportWholeQueue
importos
if__name__=='__main__':
print("mainprocess:",os.getpid())
whole_queue=WholeQueue()
b_pool_size=2
c_pool_size=6
Module_list=[
a.A(whole_queue,b_pool_size),
b.B(whole_queue,b_pool_size,c_pool_size),
c.C(whole_queue,c_pool_size)
forpinModule_list:
p.start()
公共队列类
classWholeQueue():
def__init__(self):
self.queues=dict()
defregister(self,queuename,queue):
self.queues[queuename]=queue
各个子模块类
a
frommultiprocessingimportProcess,Queue
importtime
importrandom
importos
classA(Process):
def__init__(self,whole_queue,b_pool_size):
super().__init__(target=self.do_run)
self.whole_queue=whole_queue
self.b_pool_size=b_pool_size
self.queue_list=[]
queue=Queue()
self.whole_queue.register('A',queue)
self.queue_list.append(queue)
self.count=0
defdo_run(self):
print("A.do_runprocess:",os.getpid())
a_send_pro=Process(target=self.send)
a_send_pro.start()
a_receive_pro=Process(target=self.receive)
a_receive_pro.start()
defsend(self):
print("A.sendprocess:",os.getpid())
whileTrue:
time.sleep(0.001)
self.whole_queue.queues['B_%d'%(self.count%self.b_pool_size)].put_nowait(self.count)
self.count+=1
defreceive(self):
print("A.receiveprocess:",os.getpid())
whileTrue:
rece=self.whole_queue.queues['A'].get()
print(rece)
b
frommultiprocessingimportProcess,Queue
importtime
importrandom
importos
classB(Process):
def__init__(self,whole_queue,b_pool_size,c_pool_size):
super().__init__(target=self.do_run)
self.whole_queue=whole_queue
self.b_pool_size=b_pool_size
self.c_pool_size=c_pool_size
self.queue_list=[]
foriinrange(self.b_pool_size):
queue=Queue()
self.whole_queue.register('B_%d'%i,queue)
self.queue_list.append(queue)
self.count=0
defdo_run(self):
print("B.do_runprocess:",os.getpid())
foriinrange(self.b_pool_size):
p=Process(target=ponent,args=(self.queue_list[i],))
p.start()
defcomponent(self,queue):
print("B.componentprocess:",os.getpid())
whileTrue:
time.sleep(0.01)
info=queue.get()
componext_info='component_'+str(info)
self.whole_queue.queues['C_%d'%(info%self.c_pool_size)].put(componext_info)
c
frommultiprocessingimportProcess,Queue
frommodelimportModel
importtime
importrandom
importos
classC(Process):
def__init__(self,whole_queue,c_pool_size):
super().__init__(target=self.do_run)
self.whole_queue=whole_queue
self.c_pool_size=c_pool_size
self.queue_list=[]
foriinrange(self.c_pool_size):
queue=Queue()
self.whole_queue.register('C_%d'%i,queue)
self.queue_list.append(queue)
#self.cache_queue=None
#self.result_queue=None
#self.infer_queue=None
defdo_run(self):
cache_queue=Queue()
result_queue=Queue()
infer_queue=Queue()
print("C.do_runprocess:",os.getpid())
foriinra
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2025出版合同翻译协议范本
- 2025国际服务贸易合同主体的范围
- 办公电脑软硬件维护服务合同
- 生物技术农业应用合作合同书
- 纺织设计师考试内容纲要试题及答案
- 浙江国企招聘2025温州泰顺县国有企业社会招聘20人笔试参考题库附带答案详解
- 2025河南郑州空中丝路文化传媒有限公司社会招聘6人笔试参考题库附带答案详解
- 2025年福建省福州市中国冶金地质总局二局招聘8人笔试参考题库附带答案详解
- 2025安徽省科创投资有限公司社会招聘10人笔试参考题库附带答案详解
- 高效备考2024年国际商业美术设计师考试试题及答案
- 运输与配送管理选择题复习题库
- 线性代数矩阵
- S22天天高速安庆至潜山段(凉亭至月山)环境影响报告书
- 某厂蒸汽管道安装吹扫及试运行方案
- 清华大学出版社机械制图习题集参考答案(课堂PPT)
- 安徽金轩科技有限公司 年产60万吨硫磺制酸项目环境影响报告书
- 儿科护理学智慧树知到答案章节测试2023年石河子大学
- 两篇古典英文版成语故事百鸟朝凤英文版
- GB/T 37573-2019露天煤矿边坡稳定性年度评价技术规范
- GB/T 19634-2021体外诊断检验系统自测用血糖监测系统通用技术条件
- GB/T 119.1-2000圆柱销不淬硬钢和奥氏体不锈钢
评论
0/150
提交评论