Python多任务教程:进程、线程、协程
1.进程
进程是一个具有一定独立功能的程序在一个数据集上的一次动态执行的过程,是操作系统进行资源分配和调度的一个独立单位,是应用程序运行的载体。进程是一种抽象的概念,从来没有统一的标准定义。进程一般由程序、数据集合和进程控制块三部分组成。程序用于描述进程要完成的功能,是控制进程执行的指令集;数据集合是程序在执行时所需要的数据和工作区;程序控制块包含进程的描述信息和控制信息是进程存在的唯一标志。
进程具有的特征:
- 动态性:进程是程序的一次执行过程,是临时的,有生命期的,是动态产生,动态消亡的。
- 并发性:任何进程都可以同其他进程一起并发执行。
- 独立性:进程是系统进行资源分配和调度的一个独立单位。
- 结构性:进程由程序、数据和进程控制块三部分组成。
实现多进程
import multiprocessing
import time
def run1(sleep_time):
while True:
print("-- 1 --")
time.sleep(sleep_time)
def run2(sleep_time):
while True:
print("-- 2 --")
time.sleep(sleep_time)
def main():
# 创建进程对象。
# target:指定线程调用的函数名。注:等号后跟方法名不能加括号,如果加了也能执行函数但threading功能无效
# args:指定调用函数时传递的参数。注:args是一个数组变量参数,只传一个参数时,需要在参数后面添加逗号
p1 = multiprocessing.Process(target=run1, args=(1,))
p2 = multiprocessing.Process(target=run2, args=(1,))
# 启用子进程
p1.start()
p2.start()
# join方法等待子进程执行结束
p1.join()
p2.join()
print("子进程结束")
if __name__ == "__main__":
main()
运行上面代码,查看任务管理器python的启动进程数。
代码中只启动了两个子进程,但是为什么有3个python进程?这是因为,python会创建一个主进程(第1个进程),当运行到p1.start()时会创建一个子进程(第2个进程),当运行到p2.start()时又会创建一个子进程(第3个进程)
2.进程池
进程的创建和删除是需要消耗计算机资源的,如果有大量任务需要多进程完成,则可能需要频繁的创建删除进程,这会给计算机带来较多的资源消耗。进程池的出现解决了这个问题,它的原理是创建适当的进程放入进程池,等待待处理的事件,当处理完事件后进程不会销毁,仍然在进程池中等待处理其他事件,直到事件全部处理完毕,进程退出。 进程的复用降低了资源的消耗。
实现进程池
import time, os
from multiprocessing import Pool
def worker(msg):
start_time = time.time()
print(F"{msg}开始执行,进程pid为{os.getpid()}")
time.sleep(1)
end_time = time.time()
print(F"{msg}执行完毕,耗时{end_time - start_time}")
def main():
po = Pool(3) # 定义进程池最大进程数为3
for i in range(10):
# 每次循环会用空闲出的子进程调用目标
po.apply_async(worker, args=(i,)) # 若调用的函数报错,进程池中不会打印报错信息
po.close() # 关闭进程池,关闭后,不再接收新的目标
po.join() # 等待进程池中所有子进程执行完,必须放在close()之后。若没有join()操作,主进程执行完后直接关闭
print("--end--")
if __name__ == "__main__":
main()
3.线程
在早期的操作系统中并没有线程的概念,进程是拥有资源和独立运行的最小单位,也是程序执行的最小单位。任务调度采用的是时间片轮转的抢占式调度方式,而进程是任务调度的最小单位,每个进程有各自独立的一块内存,使得各个进程之间内存地址相互隔离。后来,随着计算机的发展,对CPU的要求越来越高,进程之间的切换开销较大,已经无法满足越来越复杂的程序的要求了。于是就发明了线程,线程是程序执行中一个单一的顺序控制流程,是程序执行流的最小单元,是处理器调度和分派的基本单位。一个进程可以有一个或多个线程,各个线程之间共享程序的内存空间(也就是所在进程的内存空间)。一个标准的线程由线程ID,当前指令指针PC,寄存器和堆栈组成。而进程由内存空间(代码,数据,进程空间,打开的文件)和一个或多个线程组成。
实现多线程
import time
import threading
def say(sleep_time):
for i in range(5):
print(f"说{i+1}下")
time.sleep(sleep_time)
def dance():
for i in range(10):
print(f"跳{i+1}下")
time.sleep(1)
def main():
# 创建线程对象
# target:指定线程调用的函数名。注:等号后跟方法名不能加括号,如果加了也能执行函数但threading功能无效
# args:指定调用函数时传递的参数。注:args是一个数组变量参数,只传一个参数时,需要在参数后面添加逗号
t1 = threading.Thread(target=say, args=(1,))
t2 = threading.Thread(target=dance)
# 启动线程
t1.start()
t2.start()
# 查看正在运行的线程
while True:
now_threading = threading.enumerate()
print(now_threading)
# 当子线程全部运行结束后,仅剩1个主线程
if len(now_threading) <= 1:
break
time.sleep(1)
if __name__ == "__main__":
main()
多线程的资源竞争问题
因为多线程共享全局变量,当线程还没执行完当前任务,操作系统就自动轮流调度执行其他任务,就可能会产生资源竞争的问题。
比如下例中,执行 g_num+=1 时,会将其分成3步执行:1.取值;2.运算;3.保存运算结果,在CPU执行任务时,若刚运行1 2 步就交替执行下一个任务,再返回来保存结果,因为共享全局变量,此时运算结果可能已被重新赋值。
import time
import threading
g_num = 0
def sum1(num):
global g_num
for i in range(num):
g_num += 1
print(F"sum1:{g_num}")
def sum2(num):
global g_num
for i in range(num):
g_num += 1
print(F"sum2:{g_num}")
def main():
t1 = threading.Thread(target=sum1, args=(1000000,))
t2 = threading.Thread(target=sum2, args=(1000000,))
t1.start()
t2.start()
time.sleep(2)
print(g_num) # 执行后,预期结果为2000000;实际不是
if __name__ == "__main__":
main()
执行结果
从结果可以看出,sum1和sum2不为1000000,总和不为2000000,这就是上面说的资源竞争问题
互斥锁解决资源竞争问题
import threading
import time
# 定义一个全局变量
g_num = 0
# 创建一个互斥锁,默认是没有上锁的
mutex = threading.Lock()
def sum1(num):
global g_num
# mutex.acquire() # 若在此处上锁,要等下面循环执行完才会解锁,若循环时间太长,会导致另外的线程堵塞等待。
for i in range(num):
# 上锁,如果之前没有被上锁,那么此时上锁成功。 上锁原则:一般对产生资源竞争的代码上锁。如果上锁之前 已经被上锁了,那么此时会堵塞在这里,直到 这个锁被解开为止。
mutex.acquire()
g_num += 1
# 解锁
mutex.release()
print("-----in test1 g_num=%d----" % g_num)
def sum2(num):
global g_num
for i in range(num):
mutex.acquire()
g_num += 1
mutex.release()
print("-----in test2 g_num=%d=----" % g_num)
def main():
t1 = threading.Thread(target=sum1, args=(1000000,))
t2 = threading.Thread(target=sum2, args=(1000000,))
t1.start()
t2.start()
# 等待上面的2个线程执行完毕....
time.sleep(2)
print("-----in main Thread g_num = %d---" % g_num)
if __name__ == "__main__":
main()
运行结果
死锁
在线程间共享多个资源的时候,如果两个线程分别占用部分资源并且同时等待对方的资源,就会造成死锁。尽管死锁很少发生,但一旦发生就会造成应用停止响应。下面看一个死锁例子。
import time
import threading
# 创建多个锁
mutexA = threading.Lock()
mutexB = threading.Lock()
def print1():
mutexA.acquire()
time.sleep(2) # 等待B锁稳定
print("打印A1")
mutexB.acquire()
print("打印B1")
mutexB.release()
mutexA.release()
def print2():
mutexB.acquire()
time.sleep(1) # 等待A锁稳定
print("打印B2")
mutexA.acquire()
print("打印A2")
mutexA.release()
mutexB.release()
def main():
t1 = threading.Thread(target=print1)
t2 = threading.Thread(target=print2)
t1.start()
t2.start()
if __name__ == "__main__":
main()
执行结果
避免死索办法:1、添加超时时间;2、银行家算法(让锁按预期上锁和解锁)
4.协程
协程,又称微线程。协程的作用是在执行函数A时可以随时中断去执行函数B,然后中断函数B继续执行函数A(可以自由切换)。但这一过程并不是函数调用,这一整个过程看似像多线程,然而协程只有一个线程执行。
协程的优势:
- 执行效率极高,因为子程序切换(函数)不是线程切换,由程序自身控制,没有切换线程的开销。所以与多线程相比,线程的数量越多,协程性能的优势越明显。
- 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在控制共享资源时也不需要加锁,因此执行效率高很多。
gevent
gevent是第三方库,通过 greenlet 实现 coroutine,创建、调度的开销比 线程(thread) 还小,因此程序内部的 执行流 效率高。
其基本思想是:当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
gevent常用方法:
- gevent.spawn() 创建一个普通的Greenlet对象并切换
- gevent.spawn_later(seconds=3) 延时创建一个普通的Greenlet对象并切换
- gevent.spawn_raw() 创建的协程对象属于一个组
- gevent.getcurrent() 返回当前正在执行的greenlet
- gevent.joinall(jobs) 将协程任务添加到事件循环,接收一个任务列表
- gevent.wait() 可以替代join函数等待循环结束,也可以传入协程对象列表
- gevent.kill() 杀死一个协程
- gevent.killall() 杀死一个协程列表里的所有协程
- monkey.patch_all() 非常重要,会自动将python的一些标准模块替换成gevent框架
import gevent
def task(n):
for i in range(n):
print(gevent.getcurrent(), i)
if __name__ == '__main__':
g1 = gevent.spawn(task, 3)
g2 = gevent.spawn(task, 3)
g3 = gevent.spawn(task, 3)
g1.join()
g2.join()
g3.join()
运行结果
可以看到3个greenlet是依次运行而不是交替运行。要让greenlet交替运行,可以通过gevent.sleep()交出控制权:
import gevent
def task(n):
for i in range(n):
print(gevent.getcurrent(), i)
gevent.sleep(1)
if __name__ == '__main__':
g1 = gevent.spawn(task, 3)
g2 = gevent.spawn(task, 3)
g3 = gevent.spawn(task, 3)
g1.join()
g2.join()
g3.join()
运行结果
当然在实际的代码里,我们不会用gevent.sleep()去切换协程,而是在执行到IO操作时gevent会自动完成,所以gevent需要将Python自带的一些标准库的运行方式由阻塞式调用变为协作式运行。这一过程在启动时通过monkey patch完成:
import time
import gevent
from gevent import monkey
# 猴子补丁,会自动将python的一些标准模块替换成gevent框架。慎用,它创造了“隐式的副作用”,如果出现问题 它很多时候是极难调试的。
monkey.patch_all() # 注意:若导出的模块函数不会被替换,比如from time import sleep,sleep不会被替换
def task(n):
for i in range(n):
print(gevent.getcurrent(), i)
time.sleep(1) # 会被gevent自动替换为gevent.sleep()
if __name__ == '__main__':
g1 = gevent.spawn(task, 3)
g2 = gevent.spawn(task, 3)
g3 = gevent.spawn(task, 3)
g1.join()
g2.join()
g3.join()
执行结果
上面的流程看起来比较繁琐,可以使用 gevent.joinall() 方法简化流程:
import time
import gevent
from gevent import monkey
# 猴子补丁,会自动将python的一些标准模块替换成gevent框架。慎用,它创造了“隐式的副作用”,如果出现问题 它很多时候是极难调试的。
monkey.patch_all() # 注意:若导出的模块函数不会被替换,比如from time import sleep,sleep不会被替换
'''
学习中遇到问题没人解答?小编创建了一个Python学习交流群:711312441
寻找有志同道合的小伙伴,互帮互助,群里还有不错的视频学习教程和PDF电子书!
'''
def task(n):
for i in range(n):
print(gevent.getcurrent(), i)
time.sleep(1) # 会被gevent自动替换为gevent.sleep()
if __name__ == '__main__':
gevent.joinall([
gevent.spawn(task, 4),
gevent.spawn(task, 4),
gevent.spawn(task, 4),
])
执行结果