超碰人人人人人,亚洲AV午夜福利精品一区二区,亚洲欧美综合区丁香五月1区,日韩欧美亚洲系列

LOGO OA教程 ERP教程 模切知識交流 PMS教程 CRM教程 開發(fā)文檔 其他文檔  
 
網(wǎng)站管理員

Python多線程快速入門

freeflydom
2025年1月17日 9:57 本文熱度 376

前言

線程是操作系統(tǒng)能夠進行運算調(diào)度的最小單位,它被包含在進程之中,是進程中的實際運作單位。由于CPython的GIL限制,多線程實際為單線程,大多只用來處理IO密集型任務(wù)。

Python一般用標(biāo)準(zhǔn)庫threading來進行多線程編程。

基本使用

  • 方式1,創(chuàng)建threading.Thread類的示例
import threading
import time
def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    num = counter
    while num > 0:
        time.sleep(3)
        num -= 1
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")
if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # 創(chuàng)建三個線程
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))
    # 啟動線程
    t1.start()
    t2.start()
    t3.start()
    # join() 用于阻塞主線程, 等待子線程執(zhí)行完畢
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

執(zhí)行輸出示例

main thread: MainThread, start time: 2024-10-26 12:42:37
thread: Thread-1 (task1), args: 7, start time: 2024-10-26 12:42:37
thread: Thread-2 (task1), args: 5, start time: 2024-10-26 12:42:37
thread: Thread-3 (task1), args: 3, start time: 2024-10-26 12:42:37
thread: Thread-3 (task1), args: 3, end time: 2024-10-26 12:42:46
thread: Thread-2 (task1), args: 5, end time: 2024-10-26 12:42:52
thread: Thread-1 (task1), args: 7, end time: 2024-10-26 12:42:58
main thread: MainThread, end time: 2024-10-26 12:42:58
  • 方式2,繼承threading.Thread類,重寫run()__init__()方法
import threading
import time
class MyThread(threading.Thread):
    def __init__(self, counter: int):
        super().__init__()
        self.counter = counter
    def run(self):
        print(f"thread: {threading.current_thread().name}, args: {self.counter}, start time: {time.strftime('%F %T')}")
        num = self.counter
        while num > 0:
            time.sleep(3)
            num -= 1
        print(f"thread: {threading.current_thread().name}, args: {self.counter}, end time: {time.strftime('%F %T')}")
if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # 創(chuàng)建三個線程
    t1 = MyThread(7)
    t2 = MyThread(5)
    t3 = MyThread(3)
    # 啟動線程
    t1.start()
    t2.start()
    t3.start()
    # join() 用于阻塞主線程, 等待子線程執(zhí)行完畢
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

繼承threading.Thread類也可以寫成這樣,調(diào)用外部函數(shù)。

import threading
import time
def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    num = counter
    while num > 0:
        time.sleep(3)
        num -= 1
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")
class MyThread(threading.Thread):
    def __init__(self, target, args: tuple):
        super().__init__()
        self.target = target
        self.args = args
    
    def run(self):
        self.target(*self.args)
if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # 創(chuàng)建三個線程
    t1 = MyThread(target=task1, args=(7,))
    t2 = MyThread(target=task1, args=(5,))
    t3 = MyThread(target=task1, args=(3,))
    # 啟動線程
    t1.start()
    t2.start()
    t3.start()
    # join() 用于阻塞主線程, 等待子線程執(zhí)行完畢
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

多線程同步

如果多個線程共同對某個數(shù)據(jù)修改,則可能出現(xiàn)不可預(yù)料的后果,這時候就需要某些同步機制。比如如下代碼,結(jié)果是隨機的(個人電腦用python3.13實測結(jié)果都是0,而低版本的python3.6運行結(jié)果的確是隨機的)

import threading
import time
num = 0
def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    global num
    for _ in range(100000000):
        num = num + counter
        num = num - counter
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")
if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # 創(chuàng)建三個線程
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))
    t4 = threading.Thread(target=task1, args=(6,))
    t5 = threading.Thread(target=task1, args=(8,))
    # 啟動線程
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()
    # join() 用于阻塞主線程, 等待子線程執(zhí)行完畢
    t1.join()
    t2.join()
    t3.join()
    t4.join()
    t5.join()
    
    print(f"num: {num}")
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

Lock-鎖

使用互斥鎖可以在一個線程訪問數(shù)據(jù)時,拒絕其它線程訪問,直到解鎖。threading.Thread中的Lock()Rlock()可以提供鎖功能。

import threading
import time
num = 0
mutex = threading.Lock()
def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    global num
    mutex.acquire()
    for _ in range(100000):
        num = num + counter
        num = num - counter
    mutex.release()
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")
if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # 創(chuàng)建三個線程
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))
    # 啟動線程
    t1.start()
    t2.start()
    t3.start()
    # join() 用于阻塞主線程, 等待子線程執(zhí)行完畢
    t1.join()
    t2.join()
    t3.join()
    
    print(f"num: {num}")
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

Semaphore-信號量

互斥鎖是只允許一個線程訪問共享數(shù)據(jù),而信號量是同時允許一定數(shù)量的線程訪問共享數(shù)據(jù)。比如銀行有5個窗口,允許同時有5個人辦理業(yè)務(wù),后面的人只能等待,待柜臺有空閑才可以進入。

import threading
import time
from random import randint
semaphore = threading.BoundedSemaphore(5)
def business(name: str):
    semaphore.acquire()
    print(f"{time.strftime('%F %T')} {name} is handling")
    time.sleep(randint(3, 10))
    print(f"{time.strftime('%F %T')} {name} is done")
    semaphore.release()
if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    threads = []
    for i in range(10):
        t = threading.Thread(target=business, args=(f"thread-{i}",))
        threads.append(t)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

執(zhí)行輸出

main thread: MainThread, start time: 2024-10-26 17:40:10
2024-10-26 17:40:10 thread-0 is handling
2024-10-26 17:40:10 thread-1 is handling
2024-10-26 17:40:10 thread-2 is handling
2024-10-26 17:40:10 thread-3 is handling
2024-10-26 17:40:10 thread-4 is handling
2024-10-26 17:40:15 thread-2 is done
2024-10-26 17:40:15 thread-5 is handling
2024-10-26 17:40:16 thread-0 is done
2024-10-26 17:40:16 thread-6 is handling
2024-10-26 17:40:19 thread-3 is done
2024-10-26 17:40:19 thread-4 is done
2024-10-26 17:40:19 thread-7 is handling
2024-10-26 17:40:19 thread-8 is handling
2024-10-26 17:40:20 thread-1 is done
2024-10-26 17:40:20 thread-9 is handling
2024-10-26 17:40:21 thread-6 is done
2024-10-26 17:40:23 thread-7 is done
2024-10-26 17:40:24 thread-5 is done
2024-10-26 17:40:24 thread-8 is done
2024-10-26 17:40:30 thread-9 is done
main thread: MainThread, end time: 2024-10-26 17:40:30

Condition-條件對象

Condition對象能讓一個線程A停下來,等待其他線程,其他線程通知后線程A繼續(xù)運行。

import threading
import time
import random
class Employee(threading.Thread):
    def __init__(self, username: str, cond: threading.Condition):
        self.username = username
        self.cond = cond
        super().__init__()
    def run(self):
        with self.cond:
            print(f"{time.strftime('%F %T')} {self.username} 到達公司")
            self.cond.wait()  # 等待通知
            print(f"{time.strftime('%F %T')} {self.username} 開始工作")
            time.sleep(random.randint(1, 5))
            print(f"{time.strftime('%F %T')} {self.username} 工作完成")
class Boss(threading.Thread):
    def __init__(self, username: str, cond: threading.Condition):
        self.username = username
        self.cond = cond
        super().__init__()
    def run(self):
        with self.cond:
            print(f"{time.strftime('%F %T')} {self.username} 發(fā)出通知")
            self.cond.notify_all()  # 通知所有線程
        time.sleep(2)
if __name__ == "__main__":
    cond = threading.Condition()
    boss = Boss("老王", cond)
    
    employees = []
    for i in range(5):
        employees.append(Employee(f"員工{i}", cond))
    for employee in employees:
        employee.start()
    boss.start()
    boss.join()
    for employee in employees:
        employee.join()

執(zhí)行輸出

2024-10-26 21:16:20 員工0 到達公司
2024-10-26 21:16:20 員工1 到達公司
2024-10-26 21:16:20 員工2 到達公司
2024-10-26 21:16:20 員工3 到達公司
2024-10-26 21:16:20 員工4 到達公司
2024-10-26 21:16:20 老王 發(fā)出通知
2024-10-26 21:16:20 員工4 開始工作
2024-10-26 21:16:23 員工4 工作完成
2024-10-26 21:16:23 員工1 開始工作
2024-10-26 21:16:28 員工1 工作完成
2024-10-26 21:16:28 員工2 開始工作
2024-10-26 21:16:30 員工2 工作完成
2024-10-26 21:16:30 員工0 開始工作
2024-10-26 21:16:31 員工0 工作完成
2024-10-26 21:16:31 員工3 開始工作
2024-10-26 21:16:32 員工3 工作完成

Event-事件

在 Python 的 threading 模塊中,Event 是一個線程同步原語,用于在多個線程之間進行簡單的通信。Event 對象維護一個內(nèi)部標(biāo)志,線程可以使用 wait() 方法阻塞,直到另一個線程調(diào)用 set() 方法將標(biāo)志設(shè)置為 True。一旦標(biāo)志被設(shè)置為 True,所有等待的線程將被喚醒并繼續(xù)執(zhí)行。

Event 的主要方法

  1. set():將事件的內(nèi)部標(biāo)志設(shè)置為 True,并喚醒所有等待的線程。
  2. clear():將事件的內(nèi)部標(biāo)志設(shè)置為 False
  3. is_set():返回事件的內(nèi)部標(biāo)志是否為 True。
  4. wait(timeout=None):如果事件的內(nèi)部標(biāo)志為 False,則阻塞當(dāng)前線程,直到標(biāo)志被設(shè)置為 True 或超時(如果指定了 timeout)。
import threading
import time
import random
class Employee(threading.Thread):
    def __init__(self, username: str, cond: threading.Event):
        self.username = username
        self.cond = cond
        super().__init__()
    def run(self):
        print(f"{time.strftime('%F %T')} {self.username} 到達公司")
        self.cond.wait()  # 等待事件標(biāo)志為True
        print(f"{time.strftime('%F %T')} {self.username} 開始工作")
        time.sleep(random.randint(1, 5))
        print(f"{time.strftime('%F %T')} {self.username} 工作完成")
class Boss(threading.Thread):
    def __init__(self, username: str, cond: threading.Event):
        self.username = username
        self.cond = cond
        super().__init__()
    def run(self):
        print(f"{time.strftime('%F %T')} {self.username} 發(fā)出通知")
        self.cond.set()
if __name__ == "__main__":
    cond = threading.Event()
    boss = Boss("老王", cond)
    
    employees = []
    for i in range(5):
        employees.append(Employee(f"員工{i}", cond))
    for employee in employees:
        employee.start()
    boss.start()
    boss.join()
    for employee in employees:
        employee.join()

執(zhí)行輸出

2024-10-26 21:22:28 員工0 到達公司
2024-10-26 21:22:28 員工1 到達公司
2024-10-26 21:22:28 員工2 到達公司
2024-10-26 21:22:28 員工3 到達公司
2024-10-26 21:22:28 員工4 到達公司
2024-10-26 21:22:28 老王 發(fā)出通知
2024-10-26 21:22:28 員工0 開始工作
2024-10-26 21:22:28 員工1 開始工作
2024-10-26 21:22:28 員工3 開始工作
2024-10-26 21:22:28 員工4 開始工作
2024-10-26 21:22:28 員工2 開始工作
2024-10-26 21:22:30 員工3 工作完成
2024-10-26 21:22:31 員工4 工作完成
2024-10-26 21:22:31 員工2 工作完成
2024-10-26 21:22:32 員工0 工作完成
2024-10-26 21:22:32 員工1 工作完成

使用隊列

Python的queue模塊提供同步、線程安全的隊列類。以下示例為使用queue實現(xiàn)的生產(chǎn)消費者模型

import threading
import time
import random
import queue
class Producer(threading.Thread):
    """多線程生產(chǎn)者類."""
    def __init__(
        self, tname: str, channel: queue.Queue, done: threading.Event
    ):
        self.tname = tname
        self.channel = channel
        self.done = done
        super().__init__()
    def run(self) -> None:
        """Method representing the thread's activity."""
        while True:
            if self.done.is_set():
                print(
                    f"{time.strftime('%F %T')} {self.tname} 收到停止信號事件"
                )
                break
            if self.channel.full():
                print(
                    f"{time.strftime('%F %T')} {self.tname} report: 隊列已滿, 全部停止生產(chǎn)"
                )
                self.done.set()
            else:
                num = random.randint(100, 1000)
                self.channel.put(f"{self.tname}-{num}")
                print(
                    f"{time.strftime('%F %T')} {self.tname} 生成數(shù)據(jù) {num}, queue size: {self.channel.qsize()}"
                )
                time.sleep(random.randint(1, 5))
class Consumer(threading.Thread):
    """多線程消費者類."""
    def __init__(
        self, tname: str, channel: queue.Queue, done: threading.Event
    ):
        self.tname = tname
        self.channel = channel
        self.done = done
        self.counter = 0
        super().__init__()
    def run(self) -> None:
        """Method representing the thread's activity."""
        while True:
            if self.done.is_set():
                print(
                    f"{time.strftime('%F %T')} {self.tname} 收到停止信號事件"
                )
                break
            if self.counter >= 3:
                print(
                    f"{time.strftime('%F %T')} {self.tname} report: 全部停止消費"
                )
                self.done.set()
                continue
            if self.channel.empty():
                print(
                    f"{time.strftime('%F %T')} {self.tname} report: 隊列為空, counter: {self.counter}"
                )
                self.counter += 1
                time.sleep(1)
                continue
            else:
                data = self.channel.get()
                print(
                    f"{time.strftime('%F %T')} {self.tname} 消費數(shù)據(jù) {data}, queue size: {self.channel.qsize()}"
                )
                time.sleep(random.randint(1, 5))
                self.counter = 0
if __name__ == "__main__":
    done_p = threading.Event()
    done_c = threading.Event()
    channel = queue.Queue(30)
    threads_producer = []
    threads_consumer = []
    for i in range(8):
        threads_producer.append(Producer(f"producer-{i}", channel, done_p))
    for i in range(6):
        threads_consumer.append(Consumer(f"consumer-{i}", channel, done_c))
    for t in threads_producer:
        t.start()
    for t in threads_consumer:
        t.start()
    for t in threads_producer:
        t.join()
    for t in threads_consumer:
        t.join()

線程池

在面向?qū)ο缶幊讨?,?chuàng)建和銷毀對象是很費時間的,因為創(chuàng)建一個對象要獲取內(nèi)存資源或其他更多資源。在多線程程序中,生成一個新線程之后銷毀,然后再創(chuàng)建一個,這種方式就很低效。池化多線程,也就是線程池就為此而生。

將任務(wù)添加到線程池中,線程池會自動指定一個空閑的線程去執(zhí)行任務(wù),當(dāng)超過最大線程數(shù)時,任務(wù)需要等待有新的空閑線程才會被執(zhí)行。Python一般可以使用multiprocessing模塊中的Pool來創(chuàng)建線程池。

import time
from multiprocessing.dummy import Pool as ThreadPool
def foo(n):
    time.sleep(2)
if __name__ == "__main__":
    start = time.time()
    for n in range(5):
        foo(n)
    print("single thread time: ", time.time() - start)
    start = time.time()
    t_pool = ThreadPool(processes=5)  # 創(chuàng)建線程池, 指定池中的線程數(shù)為5(默認(rèn)為CPU數(shù))
    rst = t_pool.map(foo, range(5))  # 使用map為每個元素應(yīng)用到foo函數(shù)
    t_pool.close()  # 阻止任何新的任務(wù)提交到線程池
    t_pool.join()  # 等待所有已提交的任務(wù)完成
    print("thread pool time: ", time.time() - start)

線程池執(zhí)行器

python的內(nèi)置模塊concurrent.futures提供了ThreadPoolExecutor類。這個類結(jié)合了線程和隊列的優(yōu)勢,可以用來平行執(zhí)行任務(wù)。

import time
from random import randint
from concurrent.futures import ThreadPoolExecutor
def foo() -> None:
    time.sleep(2)
    return randint(1,100)
if __name__ == "__main__":
    start = time.time()
    futures = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        for n in range(10):
            futures.append(executor.submit(foo))  # Fan out
            
    for future in futures:  # Fan in
        print(future.result())
    print("thread pool executor time: ", time.time() - start)

執(zhí)行輸出

44
19
86
48
35
74
59
99
58
53
thread pool executor time:  4.001955032348633

ThreadPoolExecutor類的最大優(yōu)點在于:如果調(diào)用者通過submit方法把某項任務(wù)交給它執(zhí)行,那么會獲得一個與該任務(wù)相對應(yīng)的Future實例,當(dāng)調(diào)用者在這個實例上通過result方法獲取執(zhí)行結(jié)果時,ThreadPoolExecutor會把它在執(zhí)行任務(wù)的過程中所遇到的異常自動拋給調(diào)用者。而ThreadPoolExecutor類的缺點是IO并行能力不高,即便把max_worker設(shè)為100,也無法高效處理任務(wù)。更高需求的IO任務(wù)可以考慮換異步協(xié)程方案。

轉(zhuǎn)自https://www.cnblogs.com/XY-Heruo/p/18514316


該文章在 2025/1/17 10:06:25 編輯過
關(guān)鍵字查詢
相關(guān)文章
正在查詢...
點晴ERP是一款針對中小制造業(yè)的專業(yè)生產(chǎn)管理軟件系統(tǒng),系統(tǒng)成熟度和易用性得到了國內(nèi)大量中小企業(yè)的青睞。
點晴PMS碼頭管理系統(tǒng)主要針對港口碼頭集裝箱與散貨日常運作、調(diào)度、堆場、車隊、財務(wù)費用、相關(guān)報表等業(yè)務(wù)管理,結(jié)合碼頭的業(yè)務(wù)特點,圍繞調(diào)度、堆場作業(yè)而開發(fā)的。集技術(shù)的先進性、管理的有效性于一體,是物流碼頭及其他港口類企業(yè)的高效ERP管理信息系統(tǒng)。
點晴WMS倉儲管理系統(tǒng)提供了貨物產(chǎn)品管理,銷售管理,采購管理,倉儲管理,倉庫管理,保質(zhì)期管理,貨位管理,庫位管理,生產(chǎn)管理,WMS管理系統(tǒng),標(biāo)簽打印,條形碼,二維碼管理,批號管理軟件。
點晴免費OA是一款軟件和通用服務(wù)都免費,不限功能、不限時間、不限用戶的免費OA協(xié)同辦公管理系統(tǒng)。
Copyright 2010-2025 ClickSun All Rights Reserved