21天学习挑战赛——Python 多进程

发布于:2022-08-08 ⋅ 阅读:(496) ⋅ 点赞:(0)

目录

​1. 进程概述

1.1 程序与进程

1.2 进程的状态

2. 进程创建——multiprocessing

2.1 Process类

2.2 用法举例

2.2.1 两个while循环一起执行

2.2.2 进程pid

2.2.3 子进程-指定函数-传参

2.2.4 进程间不同享全局变量

3. 进程间同步——Queue

3.1 Queue类方法介绍

3.2 Queue的使用

3.3 Queue用法举例

4. 进程间同步——Lock

4.1 程序不加锁

4.2 程序加锁

5. 进程池Pool

5.1 Pool类介绍

5.2 Pool用法举例 

5.3 进程池中的Queue

6. 进程和线程对比

6.1 功能

6.2 区别

 6.3 优缺点


活动地址:CSDN21天学习挑战赛

​1. 进程概述

1.1 程序与进程

  • 程序:例如 xxx.py 这是就是程序,是静态的。
  • 进程:一个程序运行起来后,“代码+用到的资源”称之为进程,它是操作系统分配资源的基本单元。不仅可以通过线程完成多任务,进程也可以。

1.2 进程的状态

        工作中,任务数往往大于 CPU 的核数,即一定有一些任务正在执行,而另外一些任务再等待 CPU 进行执行,因此导致了不同的状态。

  • 就绪态:运行的条件都已经满足,正在等待 CPU 执行
  • 执行态:CPU 正在执行其功能
  • 等待态:等待某些条件满足,例如一个程序执行到 sleep ,此时就处于等待态

2. 进程创建——multiprocessing

2.1 Process类

        mutiprocessing 模块通过创建一个 Process 对象然后调用它的 start() 方法来生成进程,Processthreading.Thread API(Application Programming interface即应用程序编程接口) 相同。

函数语法:mutiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

参数说明:

  • group:指定进程组,大多数情况下用不到
  • target:如果传递了函数的引用,任务这个子进程就执行函数里的代码
  • name:给进程设定一个名字,可以不设定
  • args:给 target 指定的函数传递的参数,以元组的方式进行传递
  • kwargs:给 target 指定的函数传递命名参数
mutiprocessing.Process 对象方法和属性
方法名/属性 说明
run() 进程具体执行的方法
start() 启动子进程实例(创建子进程)
joint([timeout]) 如果可选参数 timeout 是默认值 None,则将阻塞至 joint() 方法的进程终止;如果 timeout 是一个正数,则最多会阻塞 timeout 秒
name 当前进程的别名,默认为 Process-N,N为从1开始递增的整数
pid 当前进程的 pid(进程号)
is_alive() 判断进程、子进程是否还活着
exitcode 子进程的退出代码
daemon 进程的守护标志,是一个布尔值
authkey 进程的身份验证密钥
sentinel 系统对象的数字句柄,当进程结束时将变为 ready
terminate() 不管任务是否完成,立即终止子进程
kill() 与 terminate() 相同,但在 Unix 上使用 SIGKILL 信号
close() 关闭 Process 对象,释放与之关联的所有资源

2.2 用法举例

2.2.1 两个while循环一起执行

from multiprocessing import  Process
import time

def run_proc():
    """子进程要执行的代码"""
    while True:
        print('---2---')
        time.sleep(1)

if __name__ == '__main__':
    p = Process(target=run_proc)
    p.start()
    while True:
        print('---1---')
        time.sleep(1)

运行结果:

 分析:创建子进程时,只需要传入一个执行参数的名称(不加括号)和函数的参数,创建一个 Process 实例,用 start() 方法启动。

2.2.2 进程pid

from multiprocessing import Process
import os
import time

def run_proc():
    """子进程要执行的代码"""
    print('子进程运行中,pid=%d...'%os.getpid()) #os.getpid()获取当前进程的进程号
    print('子进程将要结束...')

if __name__ == '__main__':
    print('父进程pid:%d'%os.getpid()) #os.getpid()获取当前进程的进程号
    p = Process(target=run_proc)
    p.start()

运行结果:

 

 2.2.3 子进程-指定函数-传参

from multiprocessing import Process
import os
from time import sleep

def run_proc(name, age, **kwargs):
    """子进程要执行的代码"""
    for i in range(10):
        print('子进程运行中,name=%s,age=%d,pid=%d...'%(name, age, os.getpid())) #os.getpid()获取当前进程的进程号
        print(kwargs)
        sleep(0.2)

if __name__ == '__main__':
    p = Process(target=run_proc, args=('test',18), kwargs={'m':20})
    p.start()
    sleep(1)
    p.terminate()
    p.join()

运行结果:

 2.2.4 进程间不同享全局变量

from multiprocessing import Process
import os
from time import sleep

nums = [11,22]

def work1():
    """子进程要执行的代码"""
    print("in process1 pid=%d, nums=%s"%(os.getpid(), nums))
    for i in range(3):
        nums.append(1)
        sleep(1)
        print("in process1 pid=%d, nums=%s"%(os.getpid(), nums))

def work2():
    """子进程要执行的代码"""
    print("in process2 pid=%d, nums=%s"%(os.getpid(), nums))

if __name__ == '__main__':
    p1 = Process(target=work1)
    p1.start()
    p1.join()

    p2 = Process(target=work2)
    p2.start()

 运行结果:

 分析:由于以上代码中加入了 joint 方法,将会阻塞其他的方法,直到调用 joint 方法的进程结束。

3. 进程间同步——Queue

        Process 之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。

3.1 Queue类方法介绍

Queue类方法
方法名 说明
q=Queue() 初始化 Queue() 对象,若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头)
Queue.qsize() 返回当前队列包含的消息数量
Queue.empty() 如果队列为空,返回 True,否则返回 False
Queue.full() 如果队列满了,返回 True,否则返回 Fasle
Queue.get([block[, timeout]]) 获取队列中的一条消息,然后将其从队列中移除,block 的默认值为 True。1.如果 block 使用默认值,且没有设置 timeout(单位为秒),消息队列如果为空,此时程序将被阻塞(停在读取状态),直到凑够消息队列读到消息为止,如果设置了 timeout,则会等到 timeout 秒,若还没读取到任何消息,则抛出“Queue Empty”异常。2.如果 block 值为 False,消息队列如果为空,则会立即抛出“Queue Empty”异常
Queue.get_nowait() 相当 Queue.get(False)
Queue.put(item, [block[, timeout]]) 将 item 消息写入队列,block 默认值为 True。1.如果 block 使用默认值,且没有设置 timeout(单位为秒),消息队列如果没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息队列腾出空间为止,如果设置了 timeout ,则会等待 timeout 秒,若还没有空间,则抛出“Queue Full”异常。2.如果 block 值为 False,消息队列如果没有空间可写入,则会立即抛出“Queue Full”异常
Queue.put_nowait(item) 相当于 Queue,put(item, False)

3.2 Queue的使用

        可以使用 multiprocessing模块 的Queue实现多进程之间的数据传递,Queue 本身是一个消息队列程序。首先用一个小实例来演示一下 Queue 的工作原理:

from  multiprocessing import Queue

q = Queue(3) #初始化一个Queue对象,最多可接受三条put消息
q.put('消息1')
q.put('消息2')
print(q.full()) #输出 False
q.put('消息3')
print(q.full()) #输出 True
#由于消息队列已满,下面的try会抛出异常,第一个try会等待2秒后再抛出异常,第二个try会立刻抛出异常
try:
    q.put('消息4', True, 2)
except:
    print('消息队列已满,现有消息数量:%s'%q.qsize())
#输出:消息队列已满,现有消息数量:3
try:
    q.put_nowait('消息4')
except:
    print('消息队列已满,现有消息数量:%s'%q.qsize())
#输出:消息队列已满,现有消息数量:3
#推荐使用方式:先判断消息队列是否已满,再写入
if not q.full():
    q.put_nowait('消息4')
#读取数据时,先判断消息队列是否为空,再读取
if not q.empty():
    for i in range(q.qsize()):
        print(q.get_nowait())
# 输出:
#     消息1
#     消息2
#     消息3

 3.3 Queue用法举例

        以 Queue 为例,在父进程中创建两个子进程,一个往 Queue 里写数据,一个从 Queue 里读数据,并且在进程中进行阻塞其他进程。

from  multiprocessing import Queue,Process
import os, time, random

#写数据进程执行的代码
def write(q):
    for value in ['A','B','C']:
        print('Put %s to queue...'%value)
        q.put(value)
        time.sleep(random.random())
#读数据进程进行的代码
def read(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print('Get %s from queue.'% value)
            time.sleep(random.random())
        else:
            break
if __name__ == '__main__':
    #父进程创建Queue,并传给各个子进程
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    #启动子进程pw,写入
    pw.start()
    #等待pw结果
    pw.join()
    #启动子进程pr,读取
    pr.start()
    pr.join()
    print('所有数据都写入并且读取完毕')
# 输出:
#     Put A to queue...
#     Put B to queue...
#     Put C to queue...
#     Get A from queue.
#     Get B from queue.
#     Get C from queue.

4. 进程间同步——Lock

        锁是为了确保数据一致性,比如读写锁,每个进程给一个变量增加1,但是如果在一个进程读取但还没写入的时候,另外的进程也同时读取了,并写入该值,则最后写入的值是错误的,这时候就需要加锁来保持数据一致性。

        通过使用 Lock 来控制一段代码在同一时间只能被一个进程执行。Lock 对象的两个方法,acquire() 来获取锁,release() 用来释放锁。当一个进程调用 acquire() 时,如果锁的状态为 unlocked,那么会立即修改为 locked 并返回,这时该进程获得了锁。如果锁的状态为 locked,那么调用 acquire() 的进程将阻塞。

Lock 语法说明
lock = mutiprocessing.Lock() 创建一个锁
lock.acquire() 获取锁
lock.release() 释放锁
with lock 自动获取、释放锁,类似于 with open() as f:

4.1 程序不加锁

import  multiprocessing
import time

def add(num, value):
    print('add{0}:num={1}'.format(value, num))
    for i in range(0,2):
        num += value
        print('add{0}:num={1}'.format(value, num))
        time.sleep(1)

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    num = 0
    p1 = multiprocessing.Process(target=add, args=(num,1))
    p2 = multiprocessing.Process(target=add, args=(num,2))
    p1.start()
    p2.start()
# 输出:
#     add1:num=0
#     add1:num=1
#     add2:num=0
#     add2:num=2
#     add1:num=2
#     add2:num=4

分析:由于在以上程序中没有加锁,所以程序运行时,没有顺序,两个进程交替执行。

4.2 程序加锁

import  multiprocessing
import time

def add(num, value, lock):
    try:
        lock.acquire()
        print('add{0}:num={1}'.format(value, num))
        for i in range(0,2):
            num += value
            print('add{0}:num={1}'.format(value, num))
            time.sleep(1)
    except Exception as err:
        raise err
    finally:
        lock.release()

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    num = 0
    p1 = multiprocessing.Process(target=add, args=(num,1,lock))
    p2 = multiprocessing.Process(target=add, args=(num,2,lock))
    p1.start()
    p2.start()
# 输出:
#     add1:num=0
#     add1:num=1
#     add1:num=2
#     add2:num=0
#     add2:num=2
#     add2:num=4

分析:由于以上代码中加了锁,所以会执行完其中的进程,才会执行其他的进程,并且谁抢到锁谁先执行。

5. 进程池Pool

        当需要创建的子进程数量不多时,可以之间利用 multiprocessing 中的 Process 动态生成多个进程,但如果时上百甚至上千个目标,手动地去创建进程地工作量巨大,此时就可以用到 multiprocessing 模块提供地 Pool 方法。

5.1 Pool类介绍

函数语法:multipeocessing.pool.Pool([processing[, initialize[, initargs[, maxtasksperchild[, context]]]]])

参数说明:

  • processing : 工作进程数目,如果 processing 为 None,则使用 os.cpu_count() 返回的值。
  • initializer : 如果 initializer 不为 None,则每个工作进程会在启动时调用 initializer(*initargs)。
  • maxtaskperchild : 一个工作进程在它退出或被一个新的工作进程代替之前能完成的任务数量,为例释放未使用的资源。
  • context : 用于指定启动的工作进程的上下文。

向进程提交任务的两种方式: 

  • apply(func[, args[, kwds]]) : 阻塞方式。
  • apply_async(func[, args[, kwds]]) : 非阻塞方式。使用非阻塞方式调用 func (并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args 为传递给 func 的参数列表,kwds 为传递给 func 的关键字参数列表。
multiprocessing.Pool 常用函数
方法名 说明
close() 关闭 Pool(),使其不再接受新的任务
terminate() 不管任务是否完成,立即终止
join() 主进程阻塞,等待子进程的退出,必须在 close() 或 terminate() 之后使用

5.2 Pool用法举例 

        初始化 Pool 时,可以指定一个最大进程数,当有新的请求提交到 Pool 中时,如果还有池没满,那么就会创建一个新的进程来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务,具体看以下示例:

from multiprocessing import Pool
import os, time, random

def worker(msg):
    t_start = time.time()
    print('%s 开始执行,进程号为%d'%(msg,os.getpid()))
    #random.random()随机生成0-1之间的浮点数
    time.sleep(random.random()*2)
    t_stop = time.time()
    print(msg,'执行完毕,耗时%0.2f'%(t_stop-t_start))

if __name__ == '__main__':
    po = Pool(3) #定义一个进程池,最大进程数为3
    for i in range(0,10):
        #Pool().apply_async(要调用的目标,(传递给目标的参数元组,))
        #每次循环将会用空闲出来的子进程去调用目标
        po.apply_async(worker,(i,))
    print('---start---')
    po.close() #关闭进程池,关闭后po不再接收到新的请求
    po.join() #等待po中所有子进程执行完成,必须放在close语句之后
    print('---end---')
# 输出:
#     ---start---
#     0 开始执行,进程号为3456
#     1 开始执行,进程号为20900
#     2 开始执行,进程号为12724
#     1 执行完毕,耗时0.54
#     3 开始执行,进程号为20900
#     0 执行完毕,耗时1.18
#     4 开始执行,进程号为3456
#     4 执行完毕,耗时0.49
#     5 开始执行,进程号为3456
#     3 执行完毕,耗时1.19
#     6 开始执行,进程号为20900
#     2 执行完毕,耗时1.82
#     7 开始执行,进程号为12724
#     5 执行完毕,耗时0.40
#     8 开始执行,进程号为3456
#     8 执行完毕,耗时0.69
#     9 开始执行,进程号为3456
#     6 执行完毕,耗时1.04
#     7 执行完毕,耗时1.41
#     9 执行完毕,耗时0.92
#     ---end---

分析:在执行 po = Pool(3) 代码之前,必须判断一下是否为‘__main__’,否则将报错。

5.3 进程池中的Queue

        如果要使用 Pool 创建进行,就需要使用 mutiprocessing.Manager() 中的 Queue(),而不是 multiprocessing.Queue(),否则会得到一条报错信息: RuntimeError:Queue objects should only be shared between peocesses through inheritance.

进程池中进程通信用法用法举例:

from multiprocessing import Pool,Manager
import os, time, random

def reader(q):
    print('reader启动(%s),父进程为(%s)'%(os.getpid(),os.getpid()))
    for i in range(q.qsize()):
        print('reader从Queue获取到消息:%s'%q.get(True))
def writer(q):
    print('writer启动(%s),父进程为(%s)'%(os.getpid(),os.getpid()))
    for i in 'XiaoLang':
        q.put(i)

if __name__ == '__main__':
    print('(%s) start'%os.getpid())
    q = Manager().Queue() #使用Manager中的Queue
    po = Pool()
    po.apply_async(writer,(q,))

    time.sleep(1) #先让上面的任务向Queue存入数据,然后再让下面的任务开始从中读数据

    po.apply_async(reader,(q,))
    po.close()
    po.join()
    print('(%s) end'%os.getpid())
# 输出:
#     (7200) start
#     writer启动(12988),父进程为(12988)
#     reader启动(21440),父进程为(21440)
#     reader从Queue获取到消息:X
#     reader从Queue获取到消息:i
#     reader从Queue获取到消息:a
#     reader从Queue获取到消息:o
#     reader从Queue获取到消息:L
#     reader从Queue获取到消息:a
#     reader从Queue获取到消息:n
#     reader从Queue获取到消息:g
#     (7200) end

注意:再获取线程池的大小时,使用到的是 q.qsize() 而不是 q.size() 。

6. 进程和线程对比

6.1 功能

  • 进程:能够完成多任务,比如在一台电脑上运动多个QQ。
  • 线程:能够完成多任务,比如一个QQ中的多个聊天窗口。

定义不同:

  • 进程是系统进行资源分配和调度的一个独立单位。
  • 线程是进程的一个实体,是 CPU 调度和分配的基本单位。它是比进程更小的能独立运行的基本单位,线程自己基本上不拥有系统资源,只拥有一点在运行中不可少的资源(如程序计数器、一组寄存器和栈),但是它可以与同属一个进程的其他线程共享所拥有的全部资源。

6.2 区别

  • 一个程序只要有一个进程,一个进程至少有一个线程。线程的划分尺度效于进程(资源比进程少),使得多线程程序的并发性高;进程执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。
  • 线程不能够独立执行,必须依存在进程中。
  • 可以将进程理解为工厂中的一条流水线,而其中的线程就是流水线上的工人。

 6.3 优缺点

  • 线程:执行开销小,但不利于资源的管理和保护。
  • 进程:执行开销大,但利于资源的管理和保护。

 

本文含有隐藏内容,请 开通VIP 后查看