Skip to content

多线程

多任务:同时可以运行多个任务

一个程序可以同时执行多个事件, 就是一个多任务的程序, 实际是程序之间的快速轮转

python
from time import sleep
import threading

def sing():
    for i in range(5):
        print("Singing...")
        sleep(1)

def dance():
    for i in range(5):
        print("Dancing...")
        sleep(1)

if __name__ == "__main__":
    t1 = threading.Thread(target=sing)
    t2 = threading.Thread(target=dance)
    t1.start() # start the thread
    t2.start()
    t1.join() # wait for the thread to finish
    t2.join()
    print("Main thread is over.")

并发: 假的多任务, 快速切换实现

并行: 真的多任务

python里面有一个thread模块, 但是比较底层, 建议使用threading的封装

threading使用

基础使用

python
t1 = threading.Thread(target=dance) # 获取一个线程, 这一个线程执行dance这一个函数
t1.start() # start the thread

可以使用Thread这一个类创建一个线程, 创建以后不会立即执行, 需要使用start进行启动, 这一个线程会使用创建的时候target参数指定的执行的代码

主线程的代码结束以后不会立即结束, 而是会等待子线程的结束, 为了回收垃圾, 子线程执行完以后则会直接结束

获取线程数量

python
print(threading.enumerate())
bash
[<_MainThread(MainThread, started 16076)>, <Thread(Thread-1 (dance), started 104)>, <Thread(Thread-2 (dance), started 
35924)>, <Thread(Thread-3 (dance), started 5528)>, <Thread(Thread-4 (dance), started 11448)>, <Thread(Thread-5 (dance), started 
33788)>]

获取一个列表, 这一个列表里面有现在的线程

传递参数

python
t = threading.Thread(target=dance, args=("John", i + 18))

这一个args是一个元组, 这一个元组会被拆包传递给这一个函数

还可以通过字典的方式进行传递参数

python
t = threading.Thread(target=dance, kwargs={"name": "Alice", "age": i+18})

继承的方式创建

python
from time import sleep
import threading

class MyThread(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name

    def run(self):
        print(f"Thread {self.name} started")
        sleep(2)
        print(f"Thread {self.name} finished")


if __name__ == "__main__":
    threads = []
    for i in range(3):
        thread = MyThread(name=f"Thread-{i}")
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    print("Main thread finished")

在调用start函数以后会执行这一个类里面的run方法

这一个类结束的时候会调用__del__可以在这一个类里面处理需要关闭的信息等

互斥锁

使用全局变量的时候需要使用互斥锁进行保护

python
mutex = threading.Lock()
mutex.acquire()
mutex.release()

这一个是不可以重入的锁

python
acquire(blocking=True, timeout=-1) # 时间是按秒计算的
python
mutex = threading.RLock()

这一个锁是可以重入的

栅栏对象

在所有的线程都调用with以后这一个线程才会被允许继续进行

python
b = Barrier(2, timeout=5) # 两个, 5秒以后超时

def server():
    start_server()
    b.wait()
    while True:
        connection = accept_connection()
        process_server_connection(connection)

def client():
    b.wait()
    while True:
        connection = make_connection()
        process_client_connection(connection)

多进程

python
import multiprocessing as mp
import time 

def test():
    """This function will be run by a separate process"""
    while True:
        print("Hello from the child process")
        time.sleep(1)

if __name__ == "__main__":
    # Create a new process
    p = mp.Process(target=test)
    # Start the process
    p.start()
    while True:
        print("Hello from the main process")
        time.sleep(1)
python
import multiprocessing as mp
import time 

class MyProcess(mp.Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        while True:
            print(f"Process {self.name} is running")
            time.sleep(1)
            print(f"Process {self.name} is done")


if __name__ == "__main__":
    p1 = MyProcess("p1")
    p2 = MyProcess("p2")

    p1.start()
    p2.start()

可以使用以上的两种方法实现一个进程

参数传递

python
def test(name, age, m):
    pass
mp.Process(target=test, args=("jiao", 21), kwargs={"m", 20})

变量

进程里面的全局变量是不共享的

进程间通信IPC

可以使用文件, 管道, 共享内存之类的方式实现

python里面可以使用队列进行通信

基础使用

python
import multiprocessing 
import time

q = multiprocessing.Queue(3) # 3 is the maximum number of items that can be stored in the queue
q.put(1)
q.put(2)
print(q.full()) # False
q.put(3)
print(q.full()) # True

try:
    q.put(4, timeout=1) # This will raise a Full exception
except:
    print('The queue is full! now we have %d items in the queue' % q.qsize())

try:
    q.put_nowait(4) # This will raise a Full exception
except:
    print('The queue is full! now we have %d items in the queue' % q.qsize())

try:
    q.put(4, block=False) # This will raise a Full exception
except:
    print('The queue is full! now we have %d items in the queue' % q.qsize())

if not q.empty():
    for i in range(q.qsize()):
        print(q.get()) 	# 这一个函数也有timeout参数以及nowait函数
bash
PS E:\JHY\python\2024-5-10-multithread> python -u "e:\JHY\python\2024-5-10-multithread\main.py"
False
True
The queue is full! now we have 3 items in the queue
The queue is full! now we have 3 items in the queue
The queue is full! now we have 3 items in the queue
1
2
3

进程间通信

python
from multiprocessing import Process, Queue
import time
import random
def task1(q):
    for value in ['a', 'b', 'c', 'd', 'e']:
        print("Task1 is putting value: ", value)
        time.sleep(random.random())
        q.put(value)

def task2(q):
    while True:
        if not q.empty():
            value = q.get()
            print("task2 is get value: ", value)
            time.sleep(random.random())
    
if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=task1, args=(q,))
    p2 = Process(target=task2, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("Done")

进程池

方式开启的进程的数量过多的时候会导致开启的进程的数量过多导致大量的资源浪费在切换任务

python
from multiprocessing import Pool
import time
import os
import random

def worker(num):
    for i in range(5):
        print(f'pid = %d, num = %d' % (os.getpid(), num))
        time.sleep(random.random())

if __name__ == '__main__':
    p = Pool(3)
    for i in range(10):
        p.apply_async(worker, args=(i,)) # 添加任务
    p.close()	# 以后不会添加任务
    p.join()	# 所有的子进程结束以后开启主进程, 这一个前面必须close, 否则报错
    print('All done')
python
apply_async(func[, args[, kwds[, callback[, error_callback]]]])

如果指定了 callback , 它必须是一个接受单个参数的可调用对象。当执行成功时, callback 会被用于处理执行后的返回结果,否则,调用 error_callback 。会将抛出的异常对象作为参数传递给 error_callback 执行。

通信

在使用进程池的时候通信不能使用之前的那一队列了, 要使用multiprocessing模块里面的Manager创建一个对象, 使用这个对象的Queue方法创建一个新的队列

python
from multiprocessing import Manager, Pool
from time import sleep

def reader(q):
    print('Reader waiting')
    for i in range(q.qsize()):
        print('Read from queue:', q.get())

def writer(q): 
    print('Writer waiting')
    for i in "Hello World":
        q.put(i)

if __name__ == '__main__':
    q = Manager().Queue()
    po = Pool()
    po.apply_async(writer, (q,))
    sleep(1)
    po.apply_async(reader, (q,))
    po.close()
    po.join()
    print('Main process end')

常用的函数

apply/apply_async

这两个函数的参数不同

pyhon
apply(func[, args[, kwds]])
apply_async(func[, args[, kwds[, callback[, error_callback]]]])

第一个函数会等待传入的函数的结束返回结果, 第二个不会, apply_async方法返回一个AsyncResult对象。可以通过AsyncResult对象的get()方法获取函数的返回值。

协程

另一个实现多任务的额方式, 使用的执行单元比线程还要小

可以理解为在一个线程里面开多任务, 线程里面的某一个函数在一个地方记录自己的信息, 之后切换到另一个函数里面执行, 切换的次数以及时间是开发者自己决定的

线程和协程都是处理并发的方法,但是它们之间有一些重要的区别:

  1. 调度方式:线程是由操作系统进行调度,通过时间片轮转或者优先级调度来分配CPU资源。而协程是由代码自己进行调度,通过yield、await等关键字来主动释放CPU资源。

  2. 内存消耗:每个线程都需要独立的堆栈和上下文切换开销,因此创建大量线程会消耗大量内存。而协程在一个线程内执行,并且共享线程的堆栈,因此创建大量协程的内存消耗较小。

  3. 并发能力:线程能够利用多核处理器并行执行,因为每个线程都有自己的CPU核心。而协程在单个线程上执行,因此无法利用多核处理器进行并行操作。但是使用协程可以避免线程之间的上下文切换开销,从而提高并发性能。

  4. 错误处理:由于线程是由操作系统调度的,因此当一个线程出现异常时,整个进程可能会崩溃。而协程是由代码自己调度的,可以在代码中捕获和处理异常,从而提高程序的健壮性。

综上所述,线程适合于CPU密集型任务,并且需要并行执行时,而协程适合于IO密集型任务,并且需要高并发能力时。因此在实际开发中,可以根据具体的需求选择合适的并发处理方式。

python
import time

def work1():
    while True:
        print('work1')
        yield
        time.sleep(1)


def work2():
    while True:
        print('work2')
        yield
        time.sleep(1)

def main():
    w1 = work1()
    w2 = work2()
    while True:
        next(w1)
        next(w2)

if __name__ == '__main__':
    main()

可以使用这一个方式实现多任务类似的形式

使用greenlet实现

python
from greenlet import greenlet
import time


def test1():
    while True:
        print("---A---")
        gr2.switch()
        time.sleep(0.5)

def test2():
    while True:
        print("---B---")
        gr1.switch()
        time.sleep(0.5)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

使用gevent

在网络里面IO一般会使用比较长的时间, 可以使用gevent库实现自动切换

这一个库可以在遇到IO(文件, 网络)的时候自动进行切换其他的gevent

python
import gevent

def f(n):
    for i in range(n):
        print(gevent.getcurrent(), i)
        gevent.sleep(0)

g1 = gevent.spawn(f, 5)
g2 = gevent.spawn(f, 5)
g3 = gevent.spawn(f, 5)
g1.join()  # 这是一个耗时操作, 所以会开始执行协程
g2.join()
g3.join()
bash
PS E:\JHY\python\2024-5-10-multithread> python -u "e:\JHY\python\2024-5-10-multithread\main.py"
<Greenlet at 0x1e10a9f4b80: f(5)> 0
<Greenlet at 0x1e10c5944a0: f(5)> 0
<Greenlet at 0x1e10c594540: f(5)> 0
<Greenlet at 0x1e10a9f4b80: f(5)> 1
<Greenlet at 0x1e10c5944a0: f(5)> 1
<Greenlet at 0x1e10c594540: f(5)> 1
<Greenlet at 0x1e10a9f4b80: f(5)> 2
<Greenlet at 0x1e10c5944a0: f(5)> 2
<Greenlet at 0x1e10c594540: f(5)> 2
<Greenlet at 0x1e10a9f4b80: f(5)> 3
<Greenlet at 0x1e10c5944a0: f(5)> 3
<Greenlet at 0x1e10c594540: f(5)> 3
<Greenlet at 0x1e10a9f4b80: f(5)> 4
<Greenlet at 0x1e10c5944a0: f(5)> 4
<Greenlet at 0x1e10c594540: f(5)> 4

在使用这一个库的时候需要使用这一个库里面的函数才可以进行切换, 但是这样不方便

修饰其他IO函数

python
from gevent import monkey
# 有耗时操作时需要打补丁
monkey.patch_all()

Make the standard library cooperative.

回收

python
import gevent
import time
from gevent import monkey
# 有耗时操作时需要打补丁
monkey.patch_all()

def f(n):
    for i in range(n):
        print(gevent.getcurrent(), i)
        time.sleep(1)

gevent.joinall([
    gevent.spawn(f, 5),
    gevent.spawn(f, 5),
    gevent.spawn(f, 5),
])

选择

  • 计算密集型

多进程: 使用多个核进行计算

  • IO密集型

多线程或者协程