百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

30天学会Python编程:12. Python并发编程

zhezhongyun 2025-06-10 04:04 19 浏览

12.1 并发编程基础

12.1.1 并发模型对比

12.1.2 GIL全局解释器锁

关键影响

  • 同一时间只有一个线程执行Python字节码
  • 对CPU密集型任务影响显著
  • I/O密集型任务仍可受益于多线程

12.2 多线程编程

12.2.1 threading模块

基本使用

import threading

def worker(num):
    print(f"Worker {num} 开始执行")
    # 模拟工作
    import time
    time.sleep(1)
    print(f"Worker {num} 执行完成")

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

12.2.2 线程同步

锁机制示例

import threading

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()
    
    def increment(self):
        with self.lock:  # 自动获取和释放锁
            self.value += 1

counter = Counter()

def increment_worker():
    for _ in range(100000):
        counter.increment()

threads = [threading.Thread(target=increment_worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"最终计数: {counter.value}")  # 应为200000

12.3 多进程编程

12.3.1 multiprocessing模块

基本使用

from multiprocessing import Process
import os

def task(name):
    print(f"子进程 {name} (PID: {os.getpid()}) 执行中...")
    # CPU密集型计算
    result = sum(i*i for i in range(1000000))
    print(f"子进程 {name} 完成")

if __name__ == '__main__':
    processes = []
    for i in range(4):  # 4核CPU常用
        p = Process(target=task, args=(i,))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()

12.3.2 进程池

from multiprocessing import Pool

def cpu_intensive(n):
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    with Pool(4) as pool:  # 4个工作进程
        # map方法并行处理
        results = pool.map(cpu_intensive, range(10000, 10010))
        print(results)

12.4 异步编程(asyncio)

12.4.1 协程基础

import asyncio

async def fetch_data(url):
    print(f"开始获取 {url}")
    await asyncio.sleep(2)  # 模拟IO操作
    print(f"完成获取 {url}")
    return f"{url} 的数据"

async def main():
    task1 = asyncio.create_task(fetch_data("url1"))
    task2 = asyncio.create_task(fetch_data("url2"))
    
    results = await asyncio.gather(task1, task2)
    print(results)

asyncio.run(main())

12.4.2 异步IO操作

import aiohttp
import asyncio

async def fetch_page(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = [
        "http://example.com",
        "http://example.org",
        "http://example.net"
    ]
    tasks = [fetch_page(url) for url in urls]
    pages = await asyncio.gather(*tasks)
    print(f"获取了 {len(pages)} 个页面")

asyncio.run(main())

12.5 并发工具

12.5.1 concurrent.futures

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request

URLS = ['http://example.com', 'http://example.org']

def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

with ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {
        executor.submit(load_url, url, 60): url 
        for url in URLS
    }
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url} 页面长度为 {len(data)}")
        except Exception as e:
            print(f"{url} 获取失败: {e}")

12.5.2 队列通信

from queue import Queue
from threading import Thread

def producer(q, items):
    for item in items:
        print(f"生产: {item}")
        q.put(item)
    q.put(None)  # 结束信号

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"消费: {item}")

q = Queue()
producer_thread = Thread(target=producer, args=(q, [1,2,3]))
consumer_thread = Thread(target=consumer, args=(q,))

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

12.6 应用举例

案例1:并发Web爬虫

import aiohttp
import asyncio
from urllib.parse import urljoin
from bs4 import BeautifulSoup

async def crawl(start_url, max_depth=2):
    visited = set()
    queue = [(start_url, 0)]
    
    async with aiohttp.ClientSession() as session:
        while queue:
            url, depth = queue.pop(0)
            if url in visited or depth > max_depth:
                continue
                
            try:
                print(f"抓取: {url}")
                async with session.get(url) as response:
                    html = await response.text()
                    visited.add(url)
                    
                    if depth < max_depth:
                        soup = BeautifulSoup(html, 'html.parser')
                        tasks = []
                        for link in soup.find_all('a', href=True):
                            next_url = urljoin(url, link['href'])
                            if next_url not in visited:
                                queue.append((next_url, depth + 1))
            except Exception as e:
                print(f"抓取失败 {url}: {e}")

asyncio.run(crawl("http://example.com"))

案例2:实时数据处理管道

import threading
import queue
import random
import time

class DataPipeline:
    def __init__(self):
        self.raw_data_queue = queue.Queue()
        self.processed_data = []
        self.lock = threading.Lock()
    
    def data_source(self):
        """模拟数据源"""
        while True:
            data = random.randint(1, 100)
            self.raw_data_queue.put(data)
            time.sleep(0.1)
    
    def data_processor(self):
        """数据处理工作线程"""
        while True:
            data = self.raw_data_queue.get()
            # 模拟处理延迟
            time.sleep(0.2)
            result = data * 2
            
            with self.lock:
                self.processed_data.append(result)
                print(f"处理数据: {data} -> {result} (队列大小: {self.raw_data_queue.qsize()})")
    
    def start(self):
        """启动处理管道"""
        threads = [
            threading.Thread(target=self.data_source, daemon=True),
            threading.Thread(target=self.data_processor, daemon=True),
            threading.Thread(target=self.data_processor, daemon=True)
        ]
        
        for t in threads:
            t.start()
        
        try:
            while True:
                time.sleep(1)
                with self.lock:
                    print(f"当前处理结果数: {len(self.processed_data)}")
        except KeyboardInterrupt:
            print("停止管道")

if __name__ == '__main__':
    pipeline = DataPipeline()
    pipeline.start()

12.7 学习路线图

12.8 学习总结

  1. 核心要点
  • 理解GIL的影响和应对策略
  • 掌握线程同步原语的使用
  • 区分CPU密集和IO密集任务的并发方案
  • 熟悉async/await编程模型
  1. 实践建议
  • IO密集型使用多线程或异步
  • CPU密集型使用多进程
  • 共享数据必须加锁保护
  • 合理控制并发数量
  1. 进阶方向
  • 分布式任务队列(Celery)
  • 基于事件的驱动架构
  • 异步数据库驱动
  • 协程与生成器的深度结合
  1. 常见陷阱
  • 多线程中的竞态条件
  • 忘记释放锁导致的死锁
  • 异步函数中阻塞调用
  • 进程间通信的性能瓶颈

持续更新Python编程学习日志与技巧,敬请关注!


#编程# #学习# #在头条记录我的2025# #python#


相关推荐

Chinese vice premier calls for multilateralism at Davos

DAVOS,Switzerland,Jan.21(Xinhua)--ChineseVicePremierDingXuexiangdeliveredaspeechatthe...

用C++ Qt手把手打造炫酷汽车仪表盘

一、项目背景与核心价值在车载HMI(人机交互界面)开发领域,虚拟仪表盘是智能座舱的核心组件。本项目基于C++Qt框架实现一个具备专业级效果的时速表模块,涵盖以下技术要点:Qt图形绘制核心机制(QPa...

系列专栏(八):JS的第七种基本类型Symbols

ES6作为新一代JavaScript标准,已正式与广大前端开发者见面。为了让大家对ES6的诸多新特性有更深入的了解,MozillaWeb开发者博客推出了《ES6InDepth》系列文章。CSDN...

MFC界面开发工具BCG v31.1 - 增强功能区、工具箱功能

点击“了解更多”获取工具亲爱的BCGSoft用户,我们非常高兴地宣布BCGControlBarProfessionalforMFC和BCGSuiteforMFCv31.2正式发布!新版本支...

雅居乐上调出售吉隆坡项目保留金,预计亏损扩大至6.64亿元

1月2日,雅居乐集团(03383.HK)发布有关出售一家附属公司股权披露交易的补充公告。此前雅居乐集团曾公告,2023年11月8日(交易时段后),集团子公司AgileRealEstateDeve...

Full text: Address by Vice Premier Ding Xuexiang&#39;s at World Economic Forum Annual Meeting 2025

DAVOS,Switzerland,Jan.21(Xinhua)--ChineseVicePremierDingXuexiangonTuesdaydeliveredasp...

手机性能好不好 GPU玄学曲线告诉你

前言各位在看测试者对手机进行评测时或许会见过“安卓玄学曲线”,所谓中的安卓玄学曲线真名为“ProfileGPURendering”。大多数情况下,在系统“开发者选项中被称为“GPU显示配置文件”或...

小迈科技 X Hologres:高可用的百亿级广告实时数仓建设

通过本文,我们将会介绍小迈科技如何通过Hologres搭建高可用的实时数仓。一、业务介绍小迈科技成立于2015年1月,是一家致力以数字化领先为优势,实现业务高质量自增长的移动互联网科技公司。始...

vue3新特征和所有的属性,方法汇总及其对应源码分析

vue3新特征汇总与源码分析(备注:vue3使用typescript编写)何为应用?constapp=Vue.createApp({})app就是一个应用。应用的配置和应用的API就是app应用...

China&#39;s stability redefines global trade in a volatile era

ContainersareunloadedatQingdaoPort,eastChina'sShandongProvince,December10,2024.[Photo/X...

QML 实现图片帧渐隐渐显轮播

前言所谓图片帧渐隐渐显轮播就是,一组图片列表,当前图片逐渐改变透明度隐藏,同时下一张图片逐渐改变透明度显示,依次循环,达到渐隐渐显的效果,该效果常用于图片展示,相比左右自动切换的轮播方式来说,这种方式...

前端惊魂夜:我竟在CSS里写出了JavaScript?

凌晨两点,写字楼里只剩下我工位上的一盏孤灯。咖啡杯见底,屏幕的光映在疲惫的眼镜片上。为了实现一个极其复杂的动态渐变效果,我翻遍了MDN文档,试遍了所有已知的CSS技巧,却始终差那么一口气。“要是CSS...

10 个派上用场的 Flutter 小部件

尝试学习一门新语言可能会令人恐惧和厌烦。很多时候,我们希望我们知道早先存在的某些功能。在今天的文章中,我将告诉你我希望早点知道的最方便的颤振小部件。SpacerSpacer创建一个可调整的空白空...

让我的 Flutter 代码整洁 10 倍的 5 种

如果你曾在Flutter中使用过SingleTickerProviderStateMixin来制作动画,猜猜怎么着?你已经使用过Mixin了——恭喜你,你已经处于一段你甚至不知道的关...

daisyUI - 主题漂亮、代码纯净!免费开源的 Tailwind CSS 组件库

漂亮有特色的CSS组件库,组件代码非常简洁,也支持深度定制主题、定制组件,可以搭配Vue/React等框架使用。关于daisyUIdaisyUI是一款极为流行的CSSUI组件库,...