百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

30天学会Python编程:12. Python并发编程

zhezhongyun 2025-06-10 04:04 5 浏览

12.1 并发编程基础

12.1.1 并发模型对比

12.1.2 GIL全局解释器锁

关键影响

  • 同一时间只有一个线程执行Python字节码
  • 对CPU密集型任务影响显著
  • I/O密集型任务仍可受益于多线程

12.2 多线程编程

12.2.1 threading模块

基本使用

import threading

def worker(num):
    print(f"Worker {num} 开始执行")
    # 模拟工作
    import time
    time.sleep(1)
    print(f"Worker {num} 执行完成")

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

12.2.2 线程同步

锁机制示例

import threading

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()
    
    def increment(self):
        with self.lock:  # 自动获取和释放锁
            self.value += 1

counter = Counter()

def increment_worker():
    for _ in range(100000):
        counter.increment()

threads = [threading.Thread(target=increment_worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"最终计数: {counter.value}")  # 应为200000

12.3 多进程编程

12.3.1 multiprocessing模块

基本使用

from multiprocessing import Process
import os

def task(name):
    print(f"子进程 {name} (PID: {os.getpid()}) 执行中...")
    # CPU密集型计算
    result = sum(i*i for i in range(1000000))
    print(f"子进程 {name} 完成")

if __name__ == '__main__':
    processes = []
    for i in range(4):  # 4核CPU常用
        p = Process(target=task, args=(i,))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()

12.3.2 进程池

from multiprocessing import Pool

def cpu_intensive(n):
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    with Pool(4) as pool:  # 4个工作进程
        # map方法并行处理
        results = pool.map(cpu_intensive, range(10000, 10010))
        print(results)

12.4 异步编程(asyncio)

12.4.1 协程基础

import asyncio

async def fetch_data(url):
    print(f"开始获取 {url}")
    await asyncio.sleep(2)  # 模拟IO操作
    print(f"完成获取 {url}")
    return f"{url} 的数据"

async def main():
    task1 = asyncio.create_task(fetch_data("url1"))
    task2 = asyncio.create_task(fetch_data("url2"))
    
    results = await asyncio.gather(task1, task2)
    print(results)

asyncio.run(main())

12.4.2 异步IO操作

import aiohttp
import asyncio

async def fetch_page(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = [
        "http://example.com",
        "http://example.org",
        "http://example.net"
    ]
    tasks = [fetch_page(url) for url in urls]
    pages = await asyncio.gather(*tasks)
    print(f"获取了 {len(pages)} 个页面")

asyncio.run(main())

12.5 并发工具

12.5.1 concurrent.futures

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request

URLS = ['http://example.com', 'http://example.org']

def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

with ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {
        executor.submit(load_url, url, 60): url 
        for url in URLS
    }
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url} 页面长度为 {len(data)}")
        except Exception as e:
            print(f"{url} 获取失败: {e}")

12.5.2 队列通信

from queue import Queue
from threading import Thread

def producer(q, items):
    for item in items:
        print(f"生产: {item}")
        q.put(item)
    q.put(None)  # 结束信号

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"消费: {item}")

q = Queue()
producer_thread = Thread(target=producer, args=(q, [1,2,3]))
consumer_thread = Thread(target=consumer, args=(q,))

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

12.6 应用举例

案例1:并发Web爬虫

import aiohttp
import asyncio
from urllib.parse import urljoin
from bs4 import BeautifulSoup

async def crawl(start_url, max_depth=2):
    visited = set()
    queue = [(start_url, 0)]
    
    async with aiohttp.ClientSession() as session:
        while queue:
            url, depth = queue.pop(0)
            if url in visited or depth > max_depth:
                continue
                
            try:
                print(f"抓取: {url}")
                async with session.get(url) as response:
                    html = await response.text()
                    visited.add(url)
                    
                    if depth < max_depth:
                        soup = BeautifulSoup(html, 'html.parser')
                        tasks = []
                        for link in soup.find_all('a', href=True):
                            next_url = urljoin(url, link['href'])
                            if next_url not in visited:
                                queue.append((next_url, depth + 1))
            except Exception as e:
                print(f"抓取失败 {url}: {e}")

asyncio.run(crawl("http://example.com"))

案例2:实时数据处理管道

import threading
import queue
import random
import time

class DataPipeline:
    def __init__(self):
        self.raw_data_queue = queue.Queue()
        self.processed_data = []
        self.lock = threading.Lock()
    
    def data_source(self):
        """模拟数据源"""
        while True:
            data = random.randint(1, 100)
            self.raw_data_queue.put(data)
            time.sleep(0.1)
    
    def data_processor(self):
        """数据处理工作线程"""
        while True:
            data = self.raw_data_queue.get()
            # 模拟处理延迟
            time.sleep(0.2)
            result = data * 2
            
            with self.lock:
                self.processed_data.append(result)
                print(f"处理数据: {data} -> {result} (队列大小: {self.raw_data_queue.qsize()})")
    
    def start(self):
        """启动处理管道"""
        threads = [
            threading.Thread(target=self.data_source, daemon=True),
            threading.Thread(target=self.data_processor, daemon=True),
            threading.Thread(target=self.data_processor, daemon=True)
        ]
        
        for t in threads:
            t.start()
        
        try:
            while True:
                time.sleep(1)
                with self.lock:
                    print(f"当前处理结果数: {len(self.processed_data)}")
        except KeyboardInterrupt:
            print("停止管道")

if __name__ == '__main__':
    pipeline = DataPipeline()
    pipeline.start()

12.7 学习路线图

12.8 学习总结

  1. 核心要点
  • 理解GIL的影响和应对策略
  • 掌握线程同步原语的使用
  • 区分CPU密集和IO密集任务的并发方案
  • 熟悉async/await编程模型
  1. 实践建议
  • IO密集型使用多线程或异步
  • CPU密集型使用多进程
  • 共享数据必须加锁保护
  • 合理控制并发数量
  1. 进阶方向
  • 分布式任务队列(Celery)
  • 基于事件的驱动架构
  • 异步数据库驱动
  • 协程与生成器的深度结合
  1. 常见陷阱
  • 多线程中的竞态条件
  • 忘记释放锁导致的死锁
  • 异步函数中阻塞调用
  • 进程间通信的性能瓶颈

持续更新Python编程学习日志与技巧,敬请关注!


#编程# #学习# #在头条记录我的2025# #python#


相关推荐

JavaScript做个贪吃蛇小游戏(过关-加速),无需网络直接玩。

JavaScript做个贪吃蛇小游戏(过关-则加速)在浏览器打开文件,无需网络直接玩。<!DOCTYPEhtml><htmllang="en"><...

大模型部署加速方法简单总结(大模型 ai)

以下对大模型部署、压缩、加速的方法做一个简单总结,为后续需要备查。llama.cppGithub:https://github.com/ggerganov/llama.cppLLaMA.cpp项...

安徽医大第一医院应用VitaFlow Liberty(R)Flex为患者焕然一“心”

近日,在安徽医科大学第一附属医院心血管内科负责人暨北京安贞医院安徽医院业务副院长喻荣辉教授的鼎力支持和卓越带领下,凭借着先进的VitaFlowLiberty(R)Flex经导管主动脉瓣可回收可...

300 多行代码搞定微信 8.0 的「炸」「裂」特效!

微信8.0更新的一大特色就是支持动画表情,如果发送的消息只有一个内置的表情图标,这个表情会有一段简单的动画,一些特殊的表情还有全屏特效,例如烟花表情有全屏放烟花的特效,炸弹表情有爆炸动画并且消息和...

让div填充屏幕剩余高度的方法(div填充20px)

技术背景在前端开发中,经常会遇到需要让某个div元素填充屏幕剩余高度的需求,比如创建具有固定头部和底部,中间内容区域自适应填充剩余空间的布局。随着CSS技术的发展,有多种方法可以实现这一需求。实现步骤...

css之div内容居中(css中div怎么居中)

div中的内容居中显示,包括水平和垂直2个方向。<html><head><styletype="text/css">...

使用uniapp开发小程序遇到的一些问题及解决方法

1、swiper组件自定义知识点swiper组件的指示点默认是圆圈,想要自己设置指示点,需要获得当前索引,然后赋给当前索引不同的样式,然后在做个动画就可以了。*关键点用change方法,然后通过e.d...

微信小程序主页面排版(怎样设置小程序的排版)

开发小程序的话首先要了解里面的每个文件的作用小程序没有DOM对象,一切基于组件化小程序的四个重要的文件*.js*.wxml--->view结构---->html*.wxss--...

Vue动态组件的实践与原理探究(vue动态组件component原理)

我司有一个工作台搭建产品,允许通过拖拽小部件的方式来搭建一个工作台页面,平台内置了一些常用小部件,另外也允许自行开发小部件上传使用,本文会从实践的角度来介绍其实现原理。ps.本文项目使用VueCLI...

【HarmonyOS Next之旅】兼容JS的类Web开发(四) -> tabs

目录1->创建Tabs2->设置Tabs方向3->设置样式4->显示页签索引5->场景示例编辑1->创建Tabs在pages/index目录...

CSS:前端必会的flex布局,我把布局代码全部展示出来了

进入我的主页,查看更多CSS的分享!首先呢,先去看文档,了解flex是什么,这里不做赘述。当然,可以看下面的代码示例,辅助你理解。一、row将子元素在水平方向进行布局:1.垂直方向靠顶部,水平方向靠...

【HarmonyOS Next之旅】兼容JS的类Web开发(四) -> swiper

目录1->创建Swiper组件2->添加属性3->设置样式4->绑定事件5->场景示例编辑1->创建Swiper组件在pages/index...

CSS:Flex布局,网页排版神器!(css3 flex布局)

还在为网页排版抓狂?别担心,CSS的flex布局来了,让你轻松玩转各种页面布局,实现网页设计自由!什么是Flex布局?Flex布局,也称为弹性布局,是CSS中的一种强大布局方式,它能够让你...

移动WEB开发之flex布局,附携程网首页案例制作

一、flex布局体验传统布局兼容性好布局繁琐局限性,不能再移动端很好的布局1.1flex弹性布局:操作方便,布局极为简单,移动端应用很广泛PC端浏览器支持情况较差IE11或更低版本,不支持或仅部...

2024最新升级–前端内功修炼 5大主流布局系统进阶(mk分享)

2024最新升级–前端内功修炼5大主流布局系统进阶(mk分享)获课》789it.top/14658/前端布局是网页设计中至关重要的一环,它决定了网页的结构和元素的排列方式。随着前端技术的不断发展,现...