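Preface
A thread is the smallest unit of execution that the operating system can schedule. Threads are contained in a process and are the units that actually do the work inside it. Because of CPython's GIL (Global Interpreter Lock), only one thread executes Python bytecode at any given moment. Multithreading therefore does not speed up CPU-bound code and is mostly used for IO-bound tasks: a thread blocked on IO releases the GIL and lets the other threads run.
In Python, multithreaded programming is usually done with the standard library module threading.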
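To make the IO-bound point concrete, the following minimal sketch simulates blocking IO with time.sleep(), which releases the GIL while it waits. The helper names fake_io and run_io_tasks, the task count and the delay are illustrative assumptions, and exact timings will vary by machine.

```python
import threading
import time


def fake_io(delay: float) -> None:
    # time.sleep stands in for a blocking IO call; CPython releases the GIL while sleeping
    time.sleep(delay)


def run_io_tasks(n: int, delay: float, threaded: bool) -> float:
    """Run n simulated IO tasks and return the elapsed wall-clock time in seconds."""
    start = time.perf_counter()
    if threaded:
        threads = [threading.Thread(target=fake_io, args=(delay,)) for _ in range(n)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    else:
        for _ in range(n):
            fake_io(delay)
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {run_io_tasks(5, 0.2, threaded=False):.2f}s")  # roughly 1.0s
    print(f"threaded:   {run_io_tasks(5, 0.2, threaded=True):.2f}s")   # roughly 0.2s
```

The waits overlap when threaded, so the total is close to a single delay rather than the sum of all delays.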
Basic usage
Method 1: create instances of the threading.Thread class

```python
import threading
import time


def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    num = counter
    while num > 0:
        time.sleep(3)  # each iteration simulates 3 seconds of IO
        num -= 1
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")


if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # target is the function to run in the new thread, args is its argument tuple
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))
    # start() launches each thread
    t1.start()
    t2.start()
    t3.start()
    # join() blocks the main thread until that thread has finished
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
```
Sample output (the three threads run concurrently, so the program takes about 21 s, the length of the longest task, instead of 3 × (7 + 5 + 3) = 45 s):

```text
main thread: MainThread, start time: 2024-10-26 12:42:37
thread: Thread-1 (task1), args: 7, start time: 2024-10-26 12:42:37
thread: Thread-2 (task1), args: 5, start time: 2024-10-26 12:42:37
thread: Thread-3 (task1), args: 3, start time: 2024-10-26 12:42:37
thread: Thread-3 (task1), args: 3, end time: 2024-10-26 12:42:46
thread: Thread-2 (task1), args: 5, end time: 2024-10-26 12:42:52
thread: Thread-1 (task1), args: 7, end time: 2024-10-26 12:42:58
main thread: MainThread, end time: 2024-10-26 12:42:58
```
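threading.Thread also accepts name= and kwargs= arguments, and it discards whatever target returns. A common workaround is to have each task write its result into a shared container. The sketch below assumes a lock-protected dict keyed by thread name is an acceptable way to collect results; the function square and the worker names are illustrative.

```python
import threading

results: dict[str, int] = {}
results_lock = threading.Lock()


def square(x: int, *, scale: int = 1) -> None:
    # Thread ignores the return value of target, so store the result instead
    value = x * x * scale
    with results_lock:
        results[threading.current_thread().name] = value


if __name__ == "__main__":
    threads = [
        threading.Thread(target=square, args=(n,), kwargs={"scale": 10}, name=f"worker-{n}")
        for n in range(3)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(results)  # insertion order depends on thread scheduling
```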
Method 2: subclass threading.Thread and override run() and __init__()

```python
import threading
import time


class MyThread(threading.Thread):
    def __init__(self, counter: int):
        super().__init__()  # initialize the Thread base class first
        self.counter = counter

    def run(self):
        # start() calls run() in the new thread
        print(f"thread: {threading.current_thread().name}, args: {self.counter}, start time: {time.strftime('%F %T')}")
        num = self.counter
        while num > 0:
            time.sleep(3)
            num -= 1
        print(f"thread: {threading.current_thread().name}, args: {self.counter}, end time: {time.strftime('%F %T')}")


if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    t1 = MyThread(7)
    t2 = MyThread(5)
    t3 = MyThread(3)
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
```
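When subclassing, run() is also a natural place to capture a result: store it on the instance and read it after join(). A minimal sketch, where the class name SumThread and the attribute result are illustrative assumptions. The threading documentation requires an overridden __init__ to call Thread.__init__() before doing anything else with the thread.

```python
import threading
from typing import Optional


class SumThread(threading.Thread):
    """Hypothetical subclass: sums 0..upto in run() and keeps the result on the instance."""

    def __init__(self, upto: int) -> None:
        super().__init__()  # must run before the thread object is used
        self.upto = upto
        self.result: Optional[int] = None

    def run(self) -> None:
        # runs in the new thread after start() is called
        self.result = sum(range(self.upto + 1))


if __name__ == "__main__":
    t = SumThread(100)
    t.start()
    t.join()  # once join() returns, run() has finished and result is set
    print(t.result)  # 5050
```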
A subclass of threading.Thread can also be written like this, calling an external function:

```python
import threading
import time


def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    num = counter
    while num > 0:
        time.sleep(3)
        num -= 1
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")


class MyThread(threading.Thread):
    def __init__(self, target, args: tuple):
        super().__init__()
        self.target = target  # the function to run
        self.args = args      # its positional arguments

    def run(self):
        # delegate the thread's work to the external function
        self.target(*self.args)


if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    t1 = MyThread(target=task1, args=(7,))
    t2 = MyThread(target=task1, args=(5,))
    t3 = MyThread(target=task1, args=(3,))
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
```
Thread synchronization
If several threads modify the same data, the outcome can be unpredictable, so some synchronization mechanism is needed. For example, the result of the following code is nondeterministic. (On my computer, Python 3.13 printed 0 every time, while the older Python 3.6 really did produce random results. This is likely because newer CPython releases only switch threads at specific points in the bytecode, an implementation detail that hides this particular race but should not be relied on.)

```python
import threading
import time

num = 0


def task1(counter: int):
    global num
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    for _ in range(100000000):
        # unsynchronized read-modify-write on a shared global: updates can be lost
        num = num + counter
        num = num - counter
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")


if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))
    t4 = threading.Thread(target=task1, args=(6,))
    t5 = threading.Thread(target=task1, args=(8,))
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()
    t1.join()
    t2.join()
    t3.join()
    t4.join()
    t5.join()
    print(f"num: {num}")  # 0 only if no update was lost
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
```
Lock
A mutex (mutual-exclusion lock) lets one thread access the data while other threads are kept out until the lock is released. The threading module (not threading.Thread) provides locks through Lock() and RLock(); an RLock is reentrant, meaning the thread that holds it may acquire it again.

```python
import threading
import time

num = 0
mutex = threading.Lock()


def task1(counter: int):
    global num
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    # "with mutex:" is equivalent to mutex.acquire() ... mutex.release() in a try/finally.
    # The lock is held for the whole loop, so the threads run the loop one after another.
    with mutex:
        for _ in range(100000):
            num = num + counter
            num = num - counter
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")


if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()
    print(f"num: {num}")  # always 0 with the lock
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
```
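The practical difference between Lock and RLock shows up when the same thread tries to acquire a lock it already holds. A plain Lock would block forever (a self-deadlock), while an RLock counts acquisitions. The sketch below uses a non-blocking second acquire so the Lock case returns False instead of hanging; the helper name try_reacquire is illustrative.

```python
import threading


def try_reacquire(lock) -> bool:
    """Acquire lock, then try to acquire it again from the same thread without blocking."""
    lock.acquire()
    try:
        second = lock.acquire(blocking=False)  # False means it would have blocked
        if second:
            lock.release()  # an RLock must be released once per acquire
        return second
    finally:
        lock.release()


if __name__ == "__main__":
    print("Lock:", try_reacquire(threading.Lock()))    # Lock: False
    print("RLock:", try_reacquire(threading.RLock()))  # RLock: True
```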
Semaphore
A mutex lets only one thread access shared data at a time, whereas a semaphore lets up to a fixed number of threads do so. Think of a bank with 5 counters: 5 customers can be served at once, and everyone else has to wait until a counter becomes free.

```python
import threading
import time
from random import randint

# at most 5 threads may hold the semaphore at the same time (5 bank counters)
semaphore = threading.BoundedSemaphore(5)


def business(name: str):
    with semaphore:  # acquire() on entry, release() on exit
        print(f"{time.strftime('%F %T')} {name} is handling")
        time.sleep(randint(3, 10))
        print(f"{time.strftime('%F %T')} {name} is done")


if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    threads = []
    for i in range(10):
        t = threading.Thread(target=business, args=(f"thread-{i}",))
        threads.append(t)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
```
Output (at most 5 threads are handling at any time; a waiting thread starts only when another one is done):

```text
main thread: MainThread, start time: 2024-10-26 17:40:10
2024-10-26 17:40:10 thread-0 is handling
2024-10-26 17:40:10 thread-1 is handling
2024-10-26 17:40:10 thread-2 is handling
2024-10-26 17:40:10 thread-3 is handling
2024-10-26 17:40:10 thread-4 is handling
2024-10-26 17:40:15 thread-2 is done
2024-10-26 17:40:15 thread-5 is handling
2024-10-26 17:40:16 thread-0 is done
2024-10-26 17:40:16 thread-6 is handling
2024-10-26 17:40:19 thread-3 is done
2024-10-26 17:40:19 thread-4 is done
2024-10-26 17:40:19 thread-7 is handling
2024-10-26 17:40:19 thread-8 is handling
2024-10-26 17:40:20 thread-1 is done
2024-10-26 17:40:20 thread-9 is handling
2024-10-26 17:40:21 thread-6 is done
2024-10-26 17:40:23 thread-7 is done
2024-10-26 17:40:24 thread-5 is done
2024-10-26 17:40:24 thread-8 is done
2024-10-26 17:40:30 thread-9 is done
main thread: MainThread, end time: 2024-10-26 17:40:30
```
Condition
A Condition object lets a thread A stop and wait for other threads; once another thread notifies it, thread A continues. A Condition is always tied to a lock: wait() releases the lock while it blocks, and after being notified the thread must reacquire the lock before wait() returns.

```python
import threading
import time
import random


class Employee(threading.Thread):
    def __init__(self, username: str, cond: threading.Condition):
        super().__init__()  # initialize the Thread base class first
        self.username = username
        self.cond = cond

    def run(self):
        with self.cond:  # acquire the condition's lock
            print(f"{time.strftime('%F %T')} {self.username} arrives at the office")
            self.cond.wait()  # release the lock and block until notified
            # the lock is held again from here, so only one employee works at a time
            print(f"{time.strftime('%F %T')} {self.username} starts working")
            time.sleep(random.randint(1, 5))
            print(f"{time.strftime('%F %T')} {self.username} finishes work")


class Boss(threading.Thread):
    def __init__(self, username: str, cond: threading.Condition):
        super().__init__()
        self.username = username
        self.cond = cond

    def run(self):
        with self.cond:
            print(f"{time.strftime('%F %T')} {self.username} sends the notice")
            # wakes only the threads that are already blocked in wait()
            self.cond.notify_all()
        time.sleep(2)


if __name__ == "__main__":
    cond = threading.Condition()
    boss = Boss("Lao Wang", cond)
    employees = []
    for i in range(5):
        employees.append(Employee(f"employee{i}", cond))
    for employee in employees:
        employee.start()
    boss.start()
    boss.join()
    for employee in employees:
        employee.join()
```
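Output (after the notice, the employees start working one at a time, because each one holds the condition's lock while working):

```text
2024-10-26 21:16:20 employee0 arrives at the office
2024-10-26 21:16:20 employee1 arrives at the office
2024-10-26 21:16:20 employee2 arrives at the office
2024-10-26 21:16:20 employee3 arrives at the office
2024-10-26 21:16:20 employee4 arrives at the office
2024-10-26 21:16:20 Lao Wang sends the notice
2024-10-26 21:16:20 employee4 starts working
2024-10-26 21:16:23 employee4 finishes work
2024-10-26 21:16:23 employee1 starts working
2024-10-26 21:16:28 employee1 finishes work
2024-10-26 21:16:28 employee2 starts working
2024-10-26 21:16:30 employee2 finishes work
2024-10-26 21:16:30 employee0 starts working
2024-10-26 21:16:31 employee0 finishes work
2024-10-26 21:16:31 employee3 starts working
2024-10-26 21:16:32 employee3 finishes work
```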
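One caveat with the Condition example: notify_all() only wakes threads that are already inside wait(). If the boss thread happened to notify before an employee reached wait(), that employee would block forever. A common remedy is to pair the condition with shared state and wait with Condition.wait_for(predicate), which checks the predicate before blocking. A minimal sketch, where the flag open_for_work and the list started are illustrative assumptions:

```python
import threading

cond = threading.Condition()
open_for_work = False  # shared state, only read or written while holding cond
started: list[str] = []


def employee(name: str) -> None:
    with cond:
        # wait_for returns at once if the predicate is already true, so an early notify is not lost
        cond.wait_for(lambda: open_for_work)
        started.append(name)


def boss() -> None:
    global open_for_work
    with cond:
        open_for_work = True  # change the state first...
        cond.notify_all()     # ...then wake everyone waiting on it


if __name__ == "__main__":
    boss()  # notifying before any employee starts is now harmless
    workers = [threading.Thread(target=employee, args=(f"employee{i}",)) for i in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(sorted(started))  # ['employee0', 'employee1', 'employee2']
```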
Event
In Python's threading module, Event is a thread synchronization primitive for simple communication between threads. An Event object manages an internal flag: threads can block in wait() until another thread calls set() to set the flag to True. Once the flag is True, all waiting threads are woken up and continue.
Main methods of Event:
- set(): sets the internal flag to True and wakes all waiting threads.
- clear(): resets the internal flag to False.
- is_set(): returns whether the internal flag is True.
- wait(timeout=None): if the internal flag is False, blocks the current thread until the flag is set to True or the timeout (if one is given) expires. It returns True unless it timed out.

```python
import threading
import time
import random


class Employee(threading.Thread):
    def __init__(self, username: str, cond: threading.Event):
        super().__init__()
        self.username = username
        self.cond = cond

    def run(self):
        print(f"{time.strftime('%F %T')} {self.username} arrives at the office")
        self.cond.wait()  # blocks until the flag is set; returns at once if it already is
        print(f"{time.strftime('%F %T')} {self.username} starts working")
        time.sleep(random.randint(1, 5))
        print(f"{time.strftime('%F %T')} {self.username} finishes work")


class Boss(threading.Thread):
    def __init__(self, username: str, cond: threading.Event):
        super().__init__()
        self.username = username
        self.cond = cond

    def run(self):
        print(f"{time.strftime('%F %T')} {self.username} sends the notice")
        self.cond.set()  # set the flag and wake every waiting employee


if __name__ == "__main__":
    cond = threading.Event()
    boss = Boss("Lao Wang", cond)
    employees = []
    for i in range(5):
        employees.append(Employee(f"employee{i}", cond))
    for employee in employees:
        employee.start()
    boss.start()
    boss.join()
    for employee in employees:
        employee.join()
```
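Output (all employees start working at the same moment, because no lock is held while working):

```text
2024-10-26 21:22:28 employee0 arrives at the office
2024-10-26 21:22:28 employee1 arrives at the office
2024-10-26 21:22:28 employee2 arrives at the office
2024-10-26 21:22:28 employee3 arrives at the office
2024-10-26 21:22:28 employee4 arrives at the office
2024-10-26 21:22:28 Lao Wang sends the notice
2024-10-26 21:22:28 employee0 starts working
2024-10-26 21:22:28 employee1 starts working
2024-10-26 21:22:28 employee3 starts working
2024-10-26 21:22:28 employee4 starts working
2024-10-26 21:22:28 employee2 starts working
2024-10-26 21:22:30 employee3 finishes work
2024-10-26 21:22:31 employee4 finishes work
2024-10-26 21:22:31 employee2 finishes work
2024-10-26 21:22:32 employee0 finishes work
2024-10-26 21:22:32 employee1 finishes work
```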
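Unlike a Condition notification, an Event's flag stays True after set(), so an employee that reaches wait() after the boss has already called set() does not block at all. The return values of the methods listed above can be checked with a small sketch; the function name event_demo is illustrative.

```python
import threading


def event_demo() -> tuple[bool, bool, bool, bool]:
    event = threading.Event()
    initially = event.is_set()           # False: the flag starts cleared
    timed_out = event.wait(timeout=0.1)  # False: the wait timed out
    event.set()
    after_set = event.wait()             # True: returns immediately while the flag is set
    event.clear()
    after_clear = event.is_set()         # False: later wait() calls will block again
    return initially, timed_out, after_set, after_clear


if __name__ == "__main__":
    print(event_demo())  # (False, False, True, False)
```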
Using a queue
Python's queue module provides synchronized, thread-safe queue classes. The following example implements the producer-consumer model with a queue. Producers stop as soon as one of them finds the queue full (they share the done_p event), and consumers stop once one of them has found the queue empty three times in a row (they share done_c). Checking empty() or full() and then calling get() or put() separately is racy, because another thread can act in between; a consumer could block forever in get() after another consumer took the last item. The code therefore uses get_nowait() and put_nowait() and handles queue.Empty and queue.Full.

```python
import threading
import time
import random
import queue


class Producer(threading.Thread):
    """Multithreaded producer."""

    def __init__(self, tname: str, channel: queue.Queue, done: threading.Event):
        super().__init__()
        self.tname = tname
        self.channel = channel
        self.done = done

    def run(self) -> None:
        """Method representing the thread's activity."""
        while True:
            if self.done.is_set():
                print(f"{time.strftime('%F %T')} {self.tname} received the stop event")
                break
            num = random.randint(100, 1000)
            try:
                # put_nowait() raises queue.Full instead of blocking
                self.channel.put_nowait(f"{self.tname}-{num}")
            except queue.Full:
                print(f"{time.strftime('%F %T')} {self.tname} report: queue is full, stopping all production")
                self.done.set()
                continue
            print(f"{time.strftime('%F %T')} {self.tname} produced {num}, queue size: {self.channel.qsize()}")
            time.sleep(random.randint(1, 5))


class Consumer(threading.Thread):
    """Multithreaded consumer."""

    def __init__(self, tname: str, channel: queue.Queue, done: threading.Event):
        super().__init__()
        self.tname = tname
        self.channel = channel
        self.done = done
        self.counter = 0  # consecutive times this consumer found the queue empty

    def run(self) -> None:
        """Method representing the thread's activity."""
        while True:
            if self.done.is_set():
                print(f"{time.strftime('%F %T')} {self.tname} received the stop event")
                break
            if self.counter >= 3:
                print(f"{time.strftime('%F %T')} {self.tname} report: stopping all consumption")
                self.done.set()
                continue
            try:
                # get_nowait() raises queue.Empty instead of blocking forever
                data = self.channel.get_nowait()
            except queue.Empty:
                print(f"{time.strftime('%F %T')} {self.tname} report: queue is empty, counter: {self.counter}")
                self.counter += 1
                time.sleep(1)
                continue
            print(f"{time.strftime('%F %T')} {self.tname} consumed {data}, queue size: {self.channel.qsize()}")
            time.sleep(random.randint(1, 5))
            self.counter = 0


if __name__ == "__main__":
    done_p = threading.Event()  # shared stop signal for all producers
    done_c = threading.Event()  # shared stop signal for all consumers
    channel = queue.Queue(30)   # bounded queue holding at most 30 items
    threads_producer = []
    threads_consumer = []
    for i in range(8):
        threads_producer.append(Producer(f"producer-{i}", channel, done_p))
    for i in range(6):
        threads_consumer.append(Consumer(f"consumer-{i}", channel, done_c))
    for t in threads_producer:
        t.start()
    for t in threads_consumer:
        t.start()
    for t in threads_producer:
        t.join()
    for t in threads_consumer:
        t.join()
```
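A widely used alternative to stop events, sketched here under the assumption that each consumer may simply exit when it receives a special marker, is to put one sentinel per consumer into the queue and use task_done()/join() to wait until every item has been processed. The names SENTINEL, producer, consumer and run, and the doubling "work", are illustrative.

```python
import queue
import threading

SENTINEL = None  # hypothetical "no more work" marker


def producer(q: queue.Queue, items: list[int]) -> None:
    for item in items:
        q.put(item)  # blocks while the queue is full


def consumer(q: queue.Queue, out: list[int], out_lock: threading.Lock) -> None:
    while True:
        item = q.get()  # blocks until an item is available
        try:
            if item is SENTINEL:
                return  # this consumer is finished
            with out_lock:
                out.append(item * 2)
        finally:
            q.task_done()  # mark the item (or sentinel) as processed


def run(num_consumers: int = 3) -> list[int]:
    q: queue.Queue = queue.Queue(maxsize=10)
    out: list[int] = []
    out_lock = threading.Lock()
    consumers = [
        threading.Thread(target=consumer, args=(q, out, out_lock)) for _ in range(num_consumers)
    ]
    for c in consumers:
        c.start()
    p = threading.Thread(target=producer, args=(q, list(range(20))))
    p.start()
    p.join()
    for _ in consumers:
        q.put(SENTINEL)  # one stop marker per consumer
    q.join()  # returns once every put() item has been marked task_done()
    for c in consumers:
        c.join()
    return sorted(out)


if __name__ == "__main__":
    print(run())
```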
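Thread pool
In object-oriented programming, creating and destroying objects is expensive, because each new object has to acquire memory or other resources. The same holds for threads: creating a thread, destroying it and then creating another one is inefficient. Pooling threads, that is, a thread pool, exists to solve exactly this.
Tasks are submitted to the pool, which automatically hands each one to an idle thread; when all threads are busy, a task waits until a thread becomes free. In Python, a thread pool can be created with Pool from the multiprocessing.dummy module, which replicates the multiprocessing API on top of threads. (multiprocessing.Pool itself is a process pool.)

```python
import time
from multiprocessing.dummy import Pool as ThreadPool


def foo(n):
    time.sleep(2)  # simulate a 2-second IO-bound task


if __name__ == "__main__":
    start = time.time()
    for n in range(5):
        foo(n)
    print("single thread time:", time.time() - start)  # about 10 s

    start = time.time()
    t_pool = ThreadPool(processes=5)
    rst = t_pool.map(foo, range(5))  # runs foo on 5 worker threads
    t_pool.close()  # no more tasks will be submitted
    t_pool.join()   # wait for the worker threads to exit
    print("thread pool time:", time.time() - start)  # about 2 s
```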
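Pool.map also returns the task results as a list in input order. multiprocessing.dummy.Pool returns an instance of multiprocessing.pool.ThreadPool, which can be imported directly and used as a context manager. A minimal sketch, where the task fetch and its 0.2 s delay are illustrative assumptions:

```python
import time
from multiprocessing.pool import ThreadPool  # what multiprocessing.dummy.Pool returns


def fetch(n: int) -> int:
    time.sleep(0.2)  # simulated IO
    return n * n


if __name__ == "__main__":
    start = time.time()
    with ThreadPool(processes=5) as pool:
        results = pool.map(fetch, range(5))  # blocks until all tasks finish
    print(results)  # [0, 1, 4, 9, 16]
    print(f"elapsed: {time.time() - start:.2f}s")  # roughly 0.2s rather than 1.0s
```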
Thread pool executor
Python's built-in concurrent.futures module provides the ThreadPoolExecutor class. It combines the advantages of threads and queues and can be used to run tasks concurrently.

```python
import time
from random import randint
from concurrent.futures import ThreadPoolExecutor


def foo() -> int:
    time.sleep(2)  # simulate a 2-second IO-bound task
    return randint(1, 100)


if __name__ == "__main__":
    start = time.time()
    futures = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        for n in range(10):
            # submit() schedules foo and immediately returns a Future
            futures.append(executor.submit(foo))
    # leaving the with block calls shutdown(wait=True), so all tasks are done here
    for future in futures:
        print(future.result())
    print("thread pool executor time:", time.time() - start)
```
Output (10 two-second tasks on 5 workers run in two rounds, about 4 s in total):

```text
44
19
86
48
35
74
59
99
58
53
thread pool executor time: 4.001955032348633
```
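Besides submit(), ThreadPoolExecutor offers map(), which yields results in input order, and the module function as_completed(), which yields futures in the order they finish. A small sketch, where the task work and its sleep times (chosen so later inputs finish sooner) are illustrative assumptions:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(n: int) -> int:
    time.sleep(0.1 * (3 - n))  # later inputs finish sooner
    return n


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        # map() yields results in input order, like looping over futures with result()
        print(list(executor.map(work, range(3))))  # [0, 1, 2]

        # as_completed() yields each future as soon as it finishes
        futures = [executor.submit(work, n) for n in range(3)]
        print([f.result() for f in as_completed(futures)])  # completion order, fastest first
```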
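The biggest advantage of ThreadPoolExecutor is this: when the caller hands it a task through submit, the caller gets back a Future instance for that task, and when the caller fetches the result with that instance's result method, any exception raised while the task ran is automatically re-raised in the caller. The drawback of ThreadPoolExecutor is limited IO parallelism: every worker is a full OS thread, so even with max_workers set to 100 it does not scale efficiently to large numbers of concurrent IO tasks. For more demanding IO workloads, consider an asynchronous coroutine approach instead.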
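The exception propagation described above can be seen in a short sketch: a task that raises inside a worker thread surfaces the same exception when result() is called. The functions risky and collect and the failing input 2 are illustrative assumptions.

```python
from concurrent.futures import ThreadPoolExecutor


def risky(n: int) -> int:
    if n == 2:
        raise ValueError(f"bad input: {n}")
    return n * 10


def collect(inputs: list[int]) -> list[str]:
    outcomes = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(risky, n) for n in inputs]
        for fut in futures:
            try:
                # result() re-raises the exception raised in the worker thread
                outcomes.append(f"ok {fut.result()}")
            except ValueError as exc:
                outcomes.append(f"error {exc}")
    return outcomes


if __name__ == "__main__":
    print(collect([1, 2, 3]))  # ['ok 10', 'error bad input: 2', 'ok 30']
```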
Reposted from https://www.cnblogs.com/XY-Heruo/p/18514316
This article was edited on 2025/1/17 10:06:25.