欢迎访问www.showerlee.com, 您的支持就是我前进的动力.

[PYTHON] 核心编程笔记(18.多线程编程)

showerlee 2013-12-31 11:20 Programming, PYTHON 阅读 (9,909) 1条评论

18.1 引言/动机

18.2 线程和进程

18.2.1 什么是进程(重量级进程)?

计算机程序只不过是磁盘中可执行的,二进制(或其他类型)的数据,他们只有在被读取到内存中,被操作系统调用时才开始他们的生命期,进程是程序的一次执行,每个进程都有自己的地址空间,内存,数据栈以及其他记录其运行轨迹的赋值数据,操作系统管理在其上运行的所有进程,并为这些进程公平地分配时间,进程可以通过fork和spawn操作来完成其他任务,其之间用进程间通讯(IPC)

18.2.2 什么是线程(轻量级进程)?

线程跟进程有些相似,不同的是,所有线程运行在同一个进程中,共享相同运行环境,他们可以想象成是在主进程或"主线程"中并行运行的"迷你进程".

一个进程中的各个线程之间共享同一片数据空间,从而更方便的共享数据和相互通讯,线程一般都是并行执行.

18.3 Python,线程和全局解释器锁

18.3.1 全局解释器锁(GIL)

Python代码的执行由Python虚拟机(也叫解释器主循环)来控制,对虚拟机的访问由卷曲解释器锁(GIL)来控制,正式这个锁能保证同一时刻只有一个线程在运行

在多线程环境中,Python虚拟机按一下方式执行:

1.设置GIL

2.切换到一个线程去运行

3.运行:1.执行数量的字节码指令 或2.线程主动让出控制(调用time.sleep(0))

4.把线程设置为睡眠状态

5.解锁GIL

6.再次重复以上所有步骤

18.3.2 退出线程

当一个线程结束计算,它就退出了,线程可以调用thread.exit()之类的退出函数.或标准的sys.exit()或抛出一个SystemExit异常等,不过你不可以直接杀掉一个线程

主线程应该是一个好的管理者,它了解每个线程都做些什么事,线程都需要什么数据和参数,以及在线程结束的时候,他们提供了什么结果,这样主线程就可以把各个线程的结果组成一个有意义的最后结果.

18.3.3 在Python中使用线程

在解释器里判断线程是否可用,只要导入thread模块后未报错即表示线程可用

>>> import thread

>>>

18.3.4 没有线程支持的情况

例,单线程中运行的循环()

在单线程中顺序执行两个循环,一定要一个循环结束,另一个才能开始,总时间是各个循环运行时间之和

# vi onethr.py

-----------------------------------------

#!/usr/bin/env python

from time import sleep,ctime

def loop0():

   print 'start loop 0 at:',ctime()

   sleep(4)

   print 'loop 0 done at:',ctime()

def loop1():

   print 'start loop 1 at:',ctime()

   sleep(2)

   print 'loop 1 done at:',ctime()

def main():

   print 'starting at:',ctime()

   loop0()

   loop1()

   print 'all DONE at:', ctime()

if __name__ == '__main__':

   main()

-----------------------------------------

输出:

# python onethr.py

starting at: Wed Dec  4 07:01:26 2013

start loop 0 at: Wed Dec  4 07:01:26 2013

loop 0 done at: Wed Dec  4 07:01:30 2013

start loop 1 at: Wed Dec  4 07:01:30 2013

loop 1 done at: Wed Dec  4 07:01:32 2013

all DONE at: Wed Dec  4 07:01:32 2013

假定loop0()和loop1()例做的不是睡眠,而是各自独立,不相关的运算,各自运算结果到最后将汇总成一个最终的结果

18.3.5 Python的threading模块

Python提供了thread,threading和Queue等多线程编程模块

注:避免使用thread模块

因为使用thread模块里的属性有可能会与threading出现冲突,threading较thread对线程支持更加完善'

18.4 thread模块

thread模块和锁对象

函数描述

thread模块函数

start_new_thread(function,

args, kwargs=None)产生一个新的进程,在新线程中用指定的参数和可选kwargs来调用这个函数

allocate_lock()分配一个LookType类型的锁对象

exit()让线程退出

LockType类型锁对象方法

acquire(wait=None)尝试获得锁对象

locked()如果获取了锁对象返回True,否则返回False

release()释放锁

例,这儿执行的是和onethr.py中一样的循环,不同的是,这次我们使用的是thread模块提供的简单的多线程机制,两个循环

# vi mtsleep1.py

------------------------------

#!/usr/bin/env python

import thread

from time import sleep,ctime

def loop0():

   print 'start loop 0 at:', ctime()

   sleep(4)

   print 'loop 0 done at:',ctime()

def loop1():

   print 'start loop 1 at:', ctime()

   sleep(2)

   print 'loop 1 done at:', ctime()

def main():

   print 'starting at:',ctime()

   thread.start_new_thread(loop0, ())

   thread.start_new_thread(loop1, ())

   sleep(6)

   print 'all DONE at:', ctime()

if __name__=='__main__':

   main()

-------------------------------

这个程序的输出与之前的输出大不相同,之前是运行了6,7秒,现在是4秒

因为睡眠4秒和2秒的代码现在是并发执行的,这样就使得运行时间被缩短了

在这里,我们使用了sleep()函数作为我们的同步机制

# python mtsleep1.py

-----------------------------------------

starting at: Fri Dec 20 23:56:42 2013

start loop 1 at: Fri Dec 20 23:56:42 2013

start loop 0 at: Fri Dec 20 23:56:42 2013

loop 1 done at: Fri Dec 20 23:56:44 2013

loop 0 done at: Fri Dec 20 23:56:46 2013

all DONE at: Fri Dec 20 23:56:48 2013

------------------------------------------

例,通过使用锁来完成任务

这里,使用锁比mtsleep1.py那里在主线程中使用sleep()函数更合理

# vi mtsleep2.py

------------------------------------

#!/usr/bin/env python

import thread

from time import sleep,ctime

loops = [4,2]

def loop(nloop,nsec,lock):

   print 'start loop', nloop, 'at:', ctime()

   sleep(nsec)

   print 'loop', nloop, 'done at:', ctime()

   lock.release()

def main():

   print 'starting at:',ctime()

   locks = []

   nloops = range(len(loops))

   for i in nloops:

       lock = thread.allocate_lock()

       lock.acquire()

       locks.append(lock)

   for i in nloops:

       thread.start_new_thread(loop, (i, loops[i], locks[i]))

   for i in nloops:

       while locks[i].locked(): pass

   print 'all DONE at:', ctime()

if __name__ == '__main__':

   main()

------------------------------------

# python mtsleep2.py

---------------------------

starting at: Sat Dec 21 01:13:06 2013

start loop 1 at: Sat Dec 21 01:13:06 2013

start loop 0 at: Sat Dec 21 01:13:06 2013

loop 1 done at: Sat Dec 21 01:13:08 2013

loop 0 done at: Sat Dec 21 01:13:10 2013

all DONE at: Sat Dec 21 01:13:10 2013

---------------------------

在线程结束时,线程要自己去做解锁操作,最后一个循环只是坐在那一直等,直到两个锁都被解锁为止才继续运行,由于我们顺序检查每一个锁,所以我们可能会要长时间地等待运行时间长且放在前面的线程,当这些线程的锁释放后,后面的锁可能早就释放了,结果主线程只能毫不停歇地完成对后面这些锁的检查

thread模块只是作为演示,在使用多线程程序应该使用更高级别的模块,如threading等

18.5 threading模块

接下来,我们要介绍的是更高级别的threading模块,他不仅提供了Thread类,还提供了各种非常好用的同步机制

这里我们不在提到锁原语,而Thread类也有某种同步机制

threading模块对象

threading模块对象描述

Thread标识一个线程的执行对象

RLock锁原语对象(跟thread模块里的锁对象相同)

Condition条件变量对象能让一个线程停下来,等待其他线程满足了某个条件,如,状态的改变或值的改变

Event通用的条件变量,多个线程可以等待某个事件的发生,在事件发生后,所有的线程都会被激活.

Semaphore为等待锁的线程提供一个类似"等候室"的结构

BoundedSemaphore与Semaphpre类似,只是它不允许超过初始值

Timer与Thread相似,只是,它要等待一段时间后才开始运行.

核心提示:守护线程

守护线程一般是一个等待客户请求的服务器,如果没有客户提出请求,它就在那等着,如果你设定一个线程为守护线程,就标识你在说这个线程是不重要的,在进程退出的时候,不用等待这个线程退出

如果你的主线程要退出的时候,不用等待那些子线程完成,那就设定这些线程的daemon属性

如果你想要等待子线程完成再退出,那就什么都不用做

整个Python会在所有的非守护线程退出后才会结束,即进程中没有非守护线程存在的时候才结束

18.5.1 Thread类

用Thread类,你可以用多种方法来创建线程:

1.创建一个Thread的实例,传给它一个函数

2.创建一个Thread的实例,传给它一个可调用的类对象

3.从Thread派生出一个子类,创建一个这个子类的实例

函数描述

start()开始线程的执行

run()定义线程的功能的函数(一般会北子类重写)

join(timeout=None)程序挂起,直到线程结束:如果给了timeout,则最多阻塞timeout秒

getName()返回线程的名字

setName(name)设置线程的名字

isAlive()布尔标志,标识这个线程是否还在运行中

isDaemon()返回线程的daemon标志

setDaemon(daemonic)把线程的daemon标志设为daemonic

创建一个Thread的实例,传给它一个函数

例,使用thread模块,threading模块的Thread类有一个join()函数,允许主线程等待线程的结束

# vi mtsleep3.py

-------------------------------

#!/usr/bin/env python

import threading

from time import sleep,ctime

loops = [4,2]

def loop(nloop,nsec):

   print 'start loop', nloop, 'at:', ctime()

#!/usr/bin/env python

import threading

from time import sleep,ctime

loops = [4,2]

def loop(nloop,nsec):

   print 'start loop', nloop, 'at:', ctime()

   sleep(nsec)

   print 'loop',nloop, 'done at:', ctime()

def main():

   print 'starting at:', ctime()

   threads = []

   nloops = range(len(loops))

   for i in nloops:

       t = threading.Thread(target=loop,args=(i,loops[i]))

       threads.append(t)

   for i in nloops:

       threads[i].start()

   for i in nloops:

       threads[i].join()

   print 'all DONE at:', ctime()

if __name__ == '__main__':

   main()

-------------------------------

# python mtsleep3.py

-------------------------------

starting at: Sat Dec 21 12:14:57 2013

start loop 0 at: Sat Dec 21 12:14:57 2013

start loop 1 at: Sat Dec 21 12:14:57 2013

loop 1 done at: Sat Dec 21 12:14:59 2013

loop 0 done at: Sat Dec 21 12:15:01 2013

all DONE at: Sat Dec 21 12:15:01 2013

--------------------------------

所有线程都创建了之后,再一起调用start()函数启动,而不是创建一个启动一个,而且不用再管理一堆锁,只要简单对每个线程调用join()函数就可以

创建一个Thread实例,传给它一个可调用的类对象

此例中,我们传了一个可调用的类的实例,而不是仅传一个函数,相对mtsleep3.py中的方法来说,这样做更具面向对象的概念

# vi mtsleep4.py

------------------------------

#!/usr/bin/env python

import threading

from time import sleep,ctime

loops = [4,2]

class ThreadFunc(object):

   def __init__(self,func,args,name=''):

       self.name = name

       self.func = func

       self.args = args

   def __call__(self):

       apply(self.func, self.args)

def loop(nloop, nsec):

   print 'start loop', nloop, 'at:', ctime()

   sleep(nsec)

   print 'loop',nloop,'done at:',ctime()

def main():

   print 'starting at:',ctime()

   threads = []

   nloops = range(len(loops))

   for i in nloops:

       t = threading.Thread(target=ThreadFunc(loop,(i,loops[i]),loop.__name__))

       threads.append(t)

   for i in nloops:

       threads[i].start()

   for i in nloops:

       threads[i].join()

   print 'all DONE at:', ctime()

if __name__ == '__main__':

   main()

------------------------------

# python mtsleep4.py

---------------------------

starting at: Sat Dec 21 18:51:07 2013

start loop 0 at: Sat Dec 21 18:51:07 2013

start loop 1 at: Sat Dec 21 18:51:07 2013

loop 1 done at: Sat Dec 21 18:51:09 2013

loop 0 done at: Sat Dec 21 18:51:11 2013

all DONE at: Sat Dec 21 18:51:11 2013

----------------------------

18.5.4 斐波那契,阶乘和累加和

从Thread派生出一个子类,创建一个这个子类的实例

例,我们现在要子类化Thread类,而不是创建它的实例

# vi mtsleep5.py

-------------------------------

#!/usr/bin/env python

import threading

from time import sleep, time, ctime

loops = [ 4, 2 ]

class MyThread(threading.Thread):

   '''

   MyThread derives from the threading.Thread baseclass.

   Rather than a callable class, the run() method is

   automatically invoked when the thread begins execution.

   '''

   # constructor

   def __init__(self, func, args, name=''):

       threading.Thread.__init__(self)

       self.name = name

       self.setFunc(func, args)

   def setFunc(self, func, args):

       '''

       setFunc() is the method that sets the function

       to be called as well as its arguments

       '''

       self.func = func

       self.args = args

   def getResult(self):

       '''

       getResult() provides the return value

       from the function call

       '''

       return self.res

   def run(self):

       '''

       save the return value from the function call

       into an instance var rather than returning it

       '''

       # * 1.6 * self.res = self.func(*self.args)

       self.res = apply(self.func, self.args)

# loop() is the same as before

def loop(nloop, nsec):

   print 'start loop', nloop, 'at:', ctime(time())

   sleep(nsec)

   print 'loop', nloop, 'done at:', ctime(time())

def main():

   # variable setup

   print 'starting threads...'

   threads = []

   nloops = range(len(loops))

   # allocate (but do not start) threads

   for i in nloops:

       t = MyThread(loop, (i, loops[i]), loop.__name__)

       threads.append(t)

   # start threads

   for i in nloops:

       threads[i].start()

   # wait for all threads to complete

   for i in nloops:

       threads[i].join()

   print 'all DONE'

if __name__ == '__main__':

   main()

-------------------------------

# python mtsleep5.py

------------------------------

starting threads...

start loop 0 at: Sat Dec 21 23:44:07 2013

start loop 1 at: Sat Dec 21 23:44:07 2013

loop 1 done at: Sat Dec 21 23:44:09 2013

loop 0 done at: Sat Dec 21 23:44:11 2013

all DONE

-----------------------------

为了让mtsleep5.py中,Thread的子类更为通用,我们把子类单独放在一个模块中,加上一个getResult()函数用以返回函数的运行结果

例,myThread子类化Thread

# vi myThread.py

------------------------------------

#!/usr/bin/env python

import threading

from time import time, ctime

class MyThread(threading.Thread):

   def __init__(self, func, args, name=''):

       threading.Thread.__init__(self)

       self.name = name

       self.func = func

       self.args = args

   def getResult(self):

       return self.res

   def run(self):

       print 'starting', self.name, 'at:', \

   ctime()

       self.res = apply(self.func, self.args)

       print self.name, 'finished at:', \

   ctime()

------------------------------------

例,斐波那契,阶乘和累加和

在这个多线程程序中,我们会分别在单线程和多线程环境中,运行三个递归函数

# vi mtfacfib.py

------------------------------

#!/usr/bin/env python

from myThread import MyThread

from time import time, ctime, sleep

def fib(x):

   sleep(0.005)

   if x < 2: return 1

   return (fib(x-2) + fib(x-1))

def fac(x):

   sleep(0.1)

   if x < 2: return 1

   return (x * fac(x-1))

def sum(x):

   sleep(0.1)

   if x < 2: return 1

   return (x + sum(x-1))

funcs = (fib, fac, sum)

n = 12

def main():

   nfuncs = range(len(funcs))

   print '*** SINGLE THREAD'

   for i in nfuncs:

       print 'starting', funcs[i].__name__, \

   'at:', ctime(time())

       print funcs[i](n)

       print funcs[i].__name__, 'finished at:', \

   ctime(time())

   print '\n*** MULTIPLE THREADS'

   threads = []

   for i in nfuncs:

       t = MyThread(funcs[i], (n,),

   funcs[i].__name__)

       threads.append(t)

   for i in nfuncs:

       threads[i].start()

   for i in nfuncs:

       threads[i].join()

       print threads[i].getResult()

   print 'all DONE at:', ctime(time())

if __name__ == '__main__':

   main()

------------------------------

# python mtfacfib.py

-----------------------------------------

*** SINGLE THREAD

starting fib at: Sat Dec 21 23:55:31 2013

233

fib finished at: Sat Dec 21 23:55:34 2013

starting fac at: Sat Dec 21 23:55:34 2013

479001600

fac finished at: Sat Dec 21 23:55:35 2013

starting sum at: Sat Dec 21 23:55:35 2013

78

sum finished at: Sat Dec 21 23:55:37 2013

*** MULTIPLE THREADS

starting fib at: Sat Dec 21 23:55:37 2013

starting fac at: Sat Dec 21 23:55:37 2013

starting sum at: Sat Dec 21 23:55:37 2013

fac finished at: Sat Dec 21 23:55:38 2013

sum finished at: Sat Dec 21 23:55:38 2013

fib finished at: Sat Dec 21 23:55:39 2013

233

479001600

78

all DONE at: Sat Dec 21 23:55:39 2013

------------------------------------------

18.5.5 threading模块中的其他函数

函数描述

activeCount()当前活动的线程对象的数量

currentThread()返回当前线程对象

enumerate()返回当前活动线程的列表

settrace(func)为所有线程设置一个跟踪函数

setprofile(func) 为所有线程设置一个profile函数

18.5.5 生产者-消费者问题和Queue模块

这个实现中使用了Queue对象和随机地生产(和消耗)货物的方式,生产者和消费者相互独立并且并发的运行

# vi prodcons.py

----------------------------------

#!/usr/bin/env python

from random import randint

from time import time, ctime, sleep

from Queue import Queue

from myThread import MyThread

def writeQ(queue):

   print 'producing object for Q...',

   queue.put('xxx', 1)

   print "size now", queue.qsize()

def readQ(queue):

   val = queue.get(1)

   print 'consumed object from Q... size now', \

       queue.qsize()

def writer(queue, loops):

   for i in range(loops):

       writeQ(queue)

       sleep(randint(1, 3))

def reader(queue, loops):

   for i in range(loops):

       readQ(queue)

       sleep(randint(2, 5))

funcs = (writer, reader)

nfuncs = range(len(funcs))

def main():

   nloops = randint(2, 5)

   q = Queue(32)

   threads = []

   for i in nfuncs:

       t = MyThread(funcs[i], (q, nloops), \

           funcs[i].__name__)

       threads.append(t)

   for i in nfuncs:

       threads[i].start()

   for i in nfuncs:

       threads[i].join()

   print 'all DONE'

if __name__ == '__main__':

   main()

----------------------------------

# python prodcons.py

-----------------------------------------

starting writer at: Sun Dec 22 00:00:36 2013

producing object for Q... size now 1

starting reader at: Sun Dec 22 00:00:36 2013

consumed object from Q... size now 0

producing object for Q... size now 1

consumed object from Q... size now 0

producing object for Q... size now 1

consumed object from Q... size now 0

writer finished at: Sun Dec 22 00:00:42 2013

reader finished at: Sun Dec 22 00:00:46 2013

all DONE

-------------------------------------------

本例中,一个要完成多项任务的程序,可以考虑没有任务使用一个线程,这样的程序在设计上相对于单线程做所有事的程序来说,更为清晰

18.6 相关模块

模块描述

thread基本的,底级别的线程模块

threading高级别的线程和同步对象

Queue供多线程使用的同步先进先出(FIFO)队列

mutex互斥对象

SocketServer具有线程控制的TCP和UDP管理

正文部分到此结束
版权声明:除非注明,本文由(showerlee)原创,转载请保留文章出处!
本文链接:http://www.showerlee.com/archives/1055

继续浏览:PYTHON

目前有1条大神的评论

loading
  1. 沙发
    yinjiweb2014年1月3日下午3:40 回复

    新年快乐,博客“马上”红火。

发表评论

icon_wink.gif icon_neutral.gif icon_mad.gif icon_twisted.gif icon_smile.gif icon_eek.gif icon_sad.gif icon_rolleyes.gif icon_razz.gif icon_redface.gif icon_surprised.gif icon_mrgreen.gif icon_lol.gif icon_idea.gif icon_biggrin.gif icon_evil.gif icon_cry.gif icon_cool.gif icon_arrow.gif icon_confused.gif icon_question.gif icon_exclaim.gif