Python使用Condition对象实现多线程同步

2017-05-06 董付国 Python小屋 Python小屋

使用Condition对象可以在某些事件触发后才处理数据或执行特定的功能代码,可以用于不同线程之间的通信或通知,以实现更高级别的同步。在内部实现上,Condition对象总是与某种锁对象相关联。

Condition对象除了具有acquire()和release()方法之外,还有wait()、wait_for()、notify()、notify_all()等方法:

  • wait(timeout=None)方法会释放锁,并阻塞当前线程直到超时或其他线程针对同一个Condition对象调用了notify()/notify_all()方法,被唤醒之后当前线程会重新尝试获取锁并在成功获取锁之后结束wait()方法,然后继续执行;

  • wait_for(predicate, timeout=None)方法阻塞当前线程直到超时或者指定条件得到满足;

  • notify(n=1)唤醒等待该Condition对象的一个或多个线程,该方法并不负责释放锁;

  • notify_all()方法会唤醒等待该Condition对象的所有线程。


本文代码模拟了经典的生产者-消费者问题,使用列表模拟物品池,生产者往里放物品,消费者从中获取物品,物品池满时生产者等待,空时消费者等待。

import threading

from random import randint

from time import sleep


#自定义生产者线程类

class Producer(threading.Thread):

    def __init__(self, threadname):

        threading.Thread.__init__(self,name=threadname)

        

    def run(self):

        global x

        

        while True:

            sleep(1)

            #获取锁

            con.acquire()

            #假设共享列表中最多能容纳5个元素

            if len(x) == 5:

                #如果共享列表已满,生产者等待

                print('Producer is waiting.....')

                con.wait()

            else:

                r = randint(1, 1000)

                print('Produced:', r)

                #产生新元素,添加至共享列表

                x.append(r)

                #唤醒等待条件的线程

                con.notify()

                

            #释放锁

            con.release()


#自定义消费者线程类

class Consumer(threading.Thread):

    def __init__(self, threadname):

        threading.Thread.__init__(self, name =threadname)

        

    def run(self):

        global x

        

        while True:

            sleep(2)

            #获取锁

            con.acquire()

            if not x:

                #等待

                print('Consumer is waiting.....')

                con.wait()

            else:

                print('Consumed:', x.pop(0))

                con.notify()

                

            con.release()

        

#创建Condition对象以及生产者线程和消费者线程

con = threading.Condition()


x = []


Producer('Producer').start()

Consumer('Consumer').start()


某次运行部分结果如下

Produced: 696

Produced: 970

Consumed: 696

Produced: 546

Produced: 30

Consumed: 970

Produced: 824

Produced: 68

Consumed: 546

Produced: 409

Produced: 172

Consumed: 30

Produced: 820

Producer is waiting.....

Consumed: 824

Produced: 2

Producer is waiting.....

Consumed: 68

Produced: 473

Producer is waiting.....

Consumed: 409

Produced: 167

Consumed: 172

Produced: 192

Producer is waiting.....

Consumed: 820

Produced: 789

Consumed: 2

Produced: 855

Producer is waiting.....

Consumed: 473

Produced: 754

Producer is waiting.....

Consumed: 167

Produced: 549

Consumed: 192

Produced: 100

Producer is waiting.....

Consumed: 789

Produced: 862

Consumed: 855

Produced: 537

Producer is waiting.....

Consumed: 754

Produced: 715

Consumed: 549

Produced: 79

Producer is waiting.....

Consumed: 100

Produced: 445

Consumed: 862

Produced: 64

Producer is waiting.....

Consumed: 537

Produced: 755

Consumed: 715

Produced: 8

^C(按Crtl+Break退出)