十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
这篇文章将为大家详细讲解有关python如何实现多进程,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
创新互联是专业的右玉网站建设公司,右玉接单;提供做网站、成都网站设计,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行右玉网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!
多进程的含义
进程(Process)是具有一定独立功能的程序关于某个数据集合上的一次运行活动,是系统进行资源分配和调度的一个独立单位。
顾名思义,多进程就是启用多个进程同时运行。由于进程是线程的集合,而且进程是由一个或多个线程构成的,所以多进程的运行意味着有大于或等于进程数量的线程在运行。
由于进程中 GIL 的存在,Python 中的多线程并不能很好地发挥多核优势,一个进程中的多个线程,在同一时刻只能有一个线程运行。
而对于多进程来说,每个进程都有属于自己的 GIL,所以,在多核处理器下,多进程的运行是不会受 GIL 的影响的。因此,多进程能更好地发挥多核的优势。
当然,对于爬虫这种 IO 密集型任务来说,多线程和多进程影响差别并不大。对于计算密集型任务来说,Python 的多进程相比多线程,其多核运行效率会有成倍的提升。
总的来说,Python 的多进程整体来看是比多线程更有优势的。所以,在条件允许的情况下,能用多进程就尽量用多进程。
不过值得注意的是,由于进程是系统进行资源分配和调度的一个独立单位,所以各个进程之间的数据是无法共享的,如多个进程无法共享一个全局变量,
进程之间的数据共享需要有单独的机制来实现。
在Python中也有内置的库来实现多进程,它就是multiprocessing。multiprocessing提供了一系列的组件,如Process(进程)、Queue(队列)、
Semaphore(信号量)、Pipe(管道)、Lock(锁)、Pool(进程池)等。
import multiprocessing import time now = lambda: time.time() def work(index): print(index) time.sleep(index) def main(): start_dt = now() for i in range(5): p = multiprocessing.Process(target=work, args=(i,)) p.start() print(f"Time: {now() - start_dt}") if __name__ == "__main__": main()
import multiprocessingimport timenow = lambda: time.time()def work(i, index): print(f'{i}进程启动') time.sleep(index) print(f'{i}进程结束')def main(): start_dt = now() for i in range(5): p = multiprocessing.Process(target=work, args=(i, 5)) p.start() # 查看本机的 cpu 数量 print(f"CPU Numbers: {multiprocessing.cpu_count()}") # 查看全部活跃子进程的名称以及pid for p in multiprocessing.active_children(): print(p.name, p.pid) print(f"Time End: {now() - start_dt}")if __name__ == "__main__": main()
import multiprocessing import time class MyProcess(multiprocessing.Process): def __init__(self, loop): super(MyProcess, self).__init__() self.loop = loop def run(self): for count in range(self.loop): time.sleep(1) print(f"Pid: {self.pid}, Name: {self.name}") def main(): for i in range(2, 5): p = MyProcess(i) p.start() if __name__ == "__main__": main()
在多进程中,同样存在守护进程的概念,如果一个进程被设置为守护进程,当父进程结束后,子进程会自动被终止,我们可以通过设置daemon属性来控制是否为守护进程。
还是原来的例子,增加了deamon属性的设置:
import multiprocessing import time class MyProcess(multiprocessing.Process): def __init__(self, loop): super(MyProcess, self).__init__() self.loop = loop def run(self): for count in range(self.loop): time.sleep(1) print(f"Loop: {self.loop}, Pid: {self.pid}, Loop Count: {count}") def main(): for i in range(2, 5): p = MyProcess(i) p.daemon = True p.start() if __name__ == "__main__": main() # 主进程没有做任何事情 直接输入后结束 同时也终止了子进程的运行。 print("Main End.")
这样可以有效防止无控制地生成子进程。这样的写法可以让我们在主进程运行结束后无需额外担心子进程是否关闭,避免了独立子进程的运行。
import multiprocessing import time class MyProcess(multiprocessing.Process): def __init__(self, loop): super(MyProcess, self).__init__() self.loop = loop def run(self): for count in range(self.loop): time.sleep(1) print(f"Loop: {self.loop}, Pid: {self.pid}, Loop Count: {count}") def main(): processes = [] for i in range(2, 5): p = MyProcess(i) p.daemon = True p.start() processes.append(p) for p in processes: p.join() if __name__ == "__main__": main() # 主进程没有做任何事情 直接输入后结束 同时也终止了子进程的运行。 print("Main End.")
默认情况下,join是无限期的。也就是说,如果有子进程没有运行完毕,主进程会一直等待。这种情况下,如果子进程出现问题陷入了死循环,主进程也会无限等待下去。
怎么解决这个问题呢?可以给 join 方法传递一个超时参数,代表最长等待秒数。如果子进程没有在这个指定秒数之内完成,会被强制返回,主进程不再会等待。
也就是说这个参数设置了主进程等待该子进程的最长时间。
import multiprocessing import time class MyProcess(multiprocessing.Process): def __init__(self, loop): super(MyProcess, self).__init__() self.loop = loop def run(self): for count in range(self.loop): time.sleep(1) print(f"Loop: {self.loop}, Pid: {self.pid}, Loop Count: {count}") def main(): processes = [] for i in range(2, 5): p = MyProcess(i) p.daemon = True p.start() processes.append(p) for p in processes: # 主进程最多等待改进程 1 s p.join(1) if __name__ == "__main__": main() # 主进程没有做任何事情 直接输入后结束 同时也终止了子进程的运行。 print("Main End.")
当然,终止进程不止有守护进程这一种做法,我们也可以通过 terminate 方法来终止某个子进程,另外我们还可以通过 is_alive 方法判断进程是否还在运行。
import multiprocessing import time def task(): print("1") time.sleep(5) print("2") if __name__ == "__main__": p = multiprocessing.Process(target=task) # 使用 is_alive 判断当前进程进程是否在运行 print(f"First: {p}, {p.is_alive()}") p.start() print(f"During: {p}, {p.is_alive()}") p.terminate() # 即使此时已经调用了 terminate 进程的状态还是 True, 即运行状态 # 在调用 join 之后才变成了终止状态 print(f"After: {p}, {p.is_alive()}") p.join() print(f"Joined: {p}, {p.is_alive()}")
我们发现,有的输出结果没有换行。这是什么原因造成的呢?这种情况是由多个进程并行执行导致的,两个进程同时进行了输出,结果第一个进程的换行没有来得及输出,
第二个进程就输出了结果,导致最终输出没有换行。
那如何来避免这种问题?如果我们能保证,多个进程运行期间的任一时间,只能一个进程输出,其他进程等待,等刚才那个进程输出完毕之后,另一个进程再进行输出,
这样就不会出现输出没有换行的现象了。
这种解决方案实际上就是实现了进程互斥,避免了多个进程同时抢占临界区(输出)资源。我们可以通过multiprocessing中的Lock来实现。Lock,即锁,
在一个进程输出时,加锁,其他进程等待。等此进程执行结束后,释放锁,其他进程可以进行输出。
首先是一个不加锁的实例:
import multiprocessing import time class MyProcess(multiprocessing.Process): def __init__(self, loop, lock: multiprocessing.Lock): super(MyProcess, self).__init__() self.loop = loop self.lock = lock def run(self): for count in range(self.loop): time.sleep(0.1) # self.lock.acquire() print(f"Pid: {self.pid}, LoopCount: {count}") # self.lock.release() def main(): lock = multiprocessing.Lock() for i in range(10, 15): p = MyProcess(i, lock) p.start() if __name__ == "__main__": main()
然后取消注释再次运行。
进程互斥锁可以使同一时刻只有一个进程能访问共享资源,如上面的例子所展示的那样,在同一时刻只能有一个进程输出结果。
但有时候我们需要允许多个进程来访问共享资源,同时还需要限制能访问共享资源的进程的数量。
这种需求该如何实现呢?可以用信号量,信号量是进程同步过程中一个比较重要的角色。它可以控制临界资源的数量,实现多个进程同时访问共享资源,限制进程的并发量。
我们可以用 multiprocessing 库中的 Semaphore 来实现信号量。
那么接下来我们就用一个实例来演示一下进程之间利用 Semaphore 做到多个进程共享资源,同时又限制同时可访问的进程数量,代码如下:
import multiprocessing import time ''' Semaphore管理一个内置的计数器, 每当调用acquire()时内置计数器-1; 调用release() 时内置计数器+1; 计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程(进程)调用release()。 ''' buffer = multiprocessing.Queue(10) empty = multiprocessing.Semaphore(2) # 缓冲区闲适区空余数 full = multiprocessing.Semaphore(0) # 缓冲区占用区占用数 lock = multiprocessing.Lock() class Consumer(multiprocessing.Process): def run(self): global buffer, empty, full, lock while True: full.acquire() lock.acquire() buffer.get() print("Consumer pop an element.") time.sleep(1) lock.release() empty.release() class Producer(multiprocessing.Process): def __init__(self, name): super(Producer, self).__init__() self.name = name def run(self): global buffer, empty, full, lock # 生产者 Producer 使用 acquire 方法来占用一个缓冲区位置,缓冲区空闲区大小减 1,接下来进行加锁,对缓冲区进行操作, # 然后释放锁,最后让代表占用的缓冲区位置数量加 1,消费者则相反。 # 通过 Semaphore 我们很好地控制了进程对资源的并发访问数量。 while True: empty.acquire() lock.acquire() buffer.put(1) print(f"{self.name} Producer put an element.") time.sleep(1) lock.release() full.release() def main(): lst = [] for i in range(3): p = Producer(str(i)) p.daemon = True p.start() lst.append(p) c = Consumer() c.daemon = True c.start() c.join() for p in lst: p.join() print("Main End.") if __name__ == "__main__": main()
在上面的例子中我们使用Queue作为进程通信的共享队列使用。而如果我们把上面程序中的Queue换成普通的list,是完全起不到效果的,因为进程和进程之间的资源是不共享的。
即使在一个进程中改变了这个list,在另一个进程也不能获取到这个list的状态,所以声明全局变量对多进程是没有用处的。那进程如何共享数据呢?可以用Queue,即队列。
当然这里的队列指的是 multiprocessing 里面的 Queue。
刚才我们使用Queue实现了进程间的数据共享,那么进程之间直接通信,如收发信息,用什么比较好呢?可以用Pipe,管道。管道,我们可以把它理解为两个进程之间通信的通道。
管道可以是单向的,即half-duplex:一个进程负责发消息,另一个进程负责收消息;也可以是双向的duplex,即互相收发消息。
默认声明Pipe对象是双向管道,如果要创建单向管道,可以在初始化的时候传入 deplex 参数为 False。
import multiprocessing class Consumer(multiprocessing.Process): def __init__(self, pipe): super(Consumer, self).__init__() self.pipe = pipe def run(self): self.pipe.send("Consumer Words.") print(f"Consumer Recv: {self.pipe.recv()}") class Producer(multiprocessing.Process): def __init__(self, pipe): super(Producer, self).__init__() self.pipe = pipe def run(self): self.pipe.send("Producer Words.") print(f"Producer Recv: {self.pipe.recv()}") def main(): # 声明了一个默认为双向的管道,然后将管道的两端分别传给两个进程。 # 管道 Pipe 就像进程之间搭建的桥梁,利用它我们就可以很方便地实现进程间通信了。 pipe = multiprocessing.Pipe() c = Consumer(pipe[0]) p = Producer(pipe[1]) c.daemon = True p.daemon = True c.start() p.start() c.join() p.join() print("Main Process Ended.") if __name__ == "__main__": main()
我们讲了可以使用Process来创建进程,同时也讲了如何用Semaphore来控制进程的并发执行数量。假如现在我们遇到这么一个问题,我有10000个任务,
每个任务需要启动一个进程来执行,并且一个进程运行完毕之后要紧接着启动下一个进程,同时我还需要控制进程的并发数量,不能并发太高,
不然CPU处理不过来(如果同时运行的进程能维持在一个最高恒定值当然利用率是最高的)。那么我们该如何来实现这个需求呢?
用Process和Semaphore可以实现,但是实现起来比较烦琐。而这种需求在平时又是非常常见的。此时,我们就可以派上进程池了,即 multiprocessing 中的 Pool。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,就会创建一个新的进程用来执行该请求;
但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行它。
进程池1:
import multiprocessing import time def function(index): print(f"{index} start") time.sleep(3) print(f"{index} end") def main(): # 声明了一个大小为 3 的进程池,通过 processes 参数来指定,如果不指定,那么会自动根据处理器内核来分配进程数。 pool = multiprocessing.Pool(processes=3) for i in range(4): # 使用 apply_async 方法将进程添加进去,args 可以用来传递参数。 pool.apply_async(function, args=(i, )) print("Main start.") # 关闭进程池 使之不再接受新任务 pool.close() pool.join() print("Main end.") if __name__ == "__main__": main()
再介绍进程池一个更好用的map方法,可以将上述写法简化很多。map方法是怎么用的呢?第一个参数就是要启动的进程对应的执行方法,
第2个参数是一个可迭代对象,其中的每个元素会被传递给这个执行方法。
举个例子:现在我们有一个list,里面包含了很多URL,另外我们也定义了一个方法用来抓取每个URL内容并解析,
那么我们可以直接在map的第一个参数传入方法名,第 2 个参数传入 URL 数组。
进程池2:
import multiprocessing import requests def scrape(url): try: ret = requests.get(url).text print(ret[:10]) print() except: print("Get Error") def main(): pool = multiprocessing.Pool(processes=3) urls = [ 'http://data.eastmoney.com/hsgt/index.html', 'http://data.eastmoney.com/hsgtcg/gzcglist.html', 'https://www.runoob.com/MySQL/mysql-alter.html', 'https://blog.csdn.net/weixin_42329277/article/details/80735009?depth_1-utm_source=distribute.pc_relevant.none-task&utm_source=distribute.pc_relevant.none-task', ] pool.map(scrape, urls) pool.close() # pool.join() print("Main End.") if __name__ == "__main__": main()