百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

30天学会Python编程:12. Python并发编程

zhezhongyun 2025-06-10 04:04 33 浏览

12.1 并发编程基础

12.1.1 并发模型对比

12.1.2 GIL全局解释器锁

关键影响

  • 同一时间只有一个线程执行Python字节码
  • 对CPU密集型任务影响显著
  • I/O密集型任务仍可受益于多线程

12.2 多线程编程

12.2.1 threading模块

基本使用

import threading

def worker(num):
    print(f"Worker {num} 开始执行")
    # 模拟工作
    import time
    time.sleep(1)
    print(f"Worker {num} 执行完成")

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

12.2.2 线程同步

锁机制示例

import threading

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()
    
    def increment(self):
        with self.lock:  # 自动获取和释放锁
            self.value += 1

counter = Counter()

def increment_worker():
    for _ in range(100000):
        counter.increment()

threads = [threading.Thread(target=increment_worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"最终计数: {counter.value}")  # 应为200000

12.3 多进程编程

12.3.1 multiprocessing模块

基本使用

from multiprocessing import Process
import os

def task(name):
    print(f"子进程 {name} (PID: {os.getpid()}) 执行中...")
    # CPU密集型计算
    result = sum(i*i for i in range(1000000))
    print(f"子进程 {name} 完成")

if __name__ == '__main__':
    processes = []
    for i in range(4):  # 4核CPU常用
        p = Process(target=task, args=(i,))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()

12.3.2 进程池

from multiprocessing import Pool

def cpu_intensive(n):
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    with Pool(4) as pool:  # 4个工作进程
        # map方法并行处理
        results = pool.map(cpu_intensive, range(10000, 10010))
        print(results)

12.4 异步编程(asyncio)

12.4.1 协程基础

import asyncio

async def fetch_data(url):
    print(f"开始获取 {url}")
    await asyncio.sleep(2)  # 模拟IO操作
    print(f"完成获取 {url}")
    return f"{url} 的数据"

async def main():
    task1 = asyncio.create_task(fetch_data("url1"))
    task2 = asyncio.create_task(fetch_data("url2"))
    
    results = await asyncio.gather(task1, task2)
    print(results)

asyncio.run(main())

12.4.2 异步IO操作

import aiohttp
import asyncio

async def fetch_page(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = [
        "http://example.com",
        "http://example.org",
        "http://example.net"
    ]
    tasks = [fetch_page(url) for url in urls]
    pages = await asyncio.gather(*tasks)
    print(f"获取了 {len(pages)} 个页面")

asyncio.run(main())

12.5 并发工具

12.5.1 concurrent.futures

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request

URLS = ['http://example.com', 'http://example.org']

def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

with ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {
        executor.submit(load_url, url, 60): url 
        for url in URLS
    }
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url} 页面长度为 {len(data)}")
        except Exception as e:
            print(f"{url} 获取失败: {e}")

12.5.2 队列通信

from queue import Queue
from threading import Thread

def producer(q, items):
    for item in items:
        print(f"生产: {item}")
        q.put(item)
    q.put(None)  # 结束信号

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"消费: {item}")

q = Queue()
producer_thread = Thread(target=producer, args=(q, [1,2,3]))
consumer_thread = Thread(target=consumer, args=(q,))

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

12.6 应用举例

案例1:并发Web爬虫

import aiohttp
import asyncio
from urllib.parse import urljoin
from bs4 import BeautifulSoup

async def crawl(start_url, max_depth=2):
    visited = set()
    queue = [(start_url, 0)]
    
    async with aiohttp.ClientSession() as session:
        while queue:
            url, depth = queue.pop(0)
            if url in visited or depth > max_depth:
                continue
                
            try:
                print(f"抓取: {url}")
                async with session.get(url) as response:
                    html = await response.text()
                    visited.add(url)
                    
                    if depth < max_depth:
                        soup = BeautifulSoup(html, 'html.parser')
                        tasks = []
                        for link in soup.find_all('a', href=True):
                            next_url = urljoin(url, link['href'])
                            if next_url not in visited:
                                queue.append((next_url, depth + 1))
            except Exception as e:
                print(f"抓取失败 {url}: {e}")

asyncio.run(crawl("http://example.com"))

案例2:实时数据处理管道

import threading
import queue
import random
import time

class DataPipeline:
    def __init__(self):
        self.raw_data_queue = queue.Queue()
        self.processed_data = []
        self.lock = threading.Lock()
    
    def data_source(self):
        """模拟数据源"""
        while True:
            data = random.randint(1, 100)
            self.raw_data_queue.put(data)
            time.sleep(0.1)
    
    def data_processor(self):
        """数据处理工作线程"""
        while True:
            data = self.raw_data_queue.get()
            # 模拟处理延迟
            time.sleep(0.2)
            result = data * 2
            
            with self.lock:
                self.processed_data.append(result)
                print(f"处理数据: {data} -> {result} (队列大小: {self.raw_data_queue.qsize()})")
    
    def start(self):
        """启动处理管道"""
        threads = [
            threading.Thread(target=self.data_source, daemon=True),
            threading.Thread(target=self.data_processor, daemon=True),
            threading.Thread(target=self.data_processor, daemon=True)
        ]
        
        for t in threads:
            t.start()
        
        try:
            while True:
                time.sleep(1)
                with self.lock:
                    print(f"当前处理结果数: {len(self.processed_data)}")
        except KeyboardInterrupt:
            print("停止管道")

if __name__ == '__main__':
    pipeline = DataPipeline()
    pipeline.start()

12.7 学习路线图

12.8 学习总结

  1. 核心要点
  • 理解GIL的影响和应对策略
  • 掌握线程同步原语的使用
  • 区分CPU密集和IO密集任务的并发方案
  • 熟悉async/await编程模型
  1. 实践建议
  • IO密集型使用多线程或异步
  • CPU密集型使用多进程
  • 共享数据必须加锁保护
  • 合理控制并发数量
  1. 进阶方向
  • 分布式任务队列(Celery)
  • 基于事件的驱动架构
  • 异步数据库驱动
  • 协程与生成器的深度结合
  1. 常见陷阱
  • 多线程中的竞态条件
  • 忘记释放锁导致的死锁
  • 异步函数中阻塞调用
  • 进程间通信的性能瓶颈

持续更新Python编程学习日志与技巧,敬请关注!


#编程# #学习# #在头条记录我的2025# #python#


相关推荐

Python入门学习记录之一:变量_python怎么用变量

写这个,主要是对自己学习python知识的一个总结,也是加深自己的印象。变量(英文:variable),也叫标识符。在python中,变量的命名规则有以下三点:>变量名只能包含字母、数字和下划线...

python变量命名规则——来自小白的总结

python是一个动态编译类编程语言,所以程序在运行前不需要如C语言的先行编译动作,因此也只有在程序运行过程中才能发现程序的问题。基于此,python的变量就有一定的命名规范。python作为当前热门...

Python入门学习教程:第 2 章 变量与数据类型

2.1什么是变量?在编程中,变量就像一个存放数据的容器,它可以存储各种信息,并且这些信息可以被读取和修改。想象一下,变量就如同我们生活中的盒子,你可以把东西放进去,也可以随时拿出来看看,甚至可以换成...

绘制学术论文中的“三线表”具体指导

在科研过程中,大家用到最多的可能就是“三线表”。“三线表”,一般主要由三条横线构成,当然在变量名栏里也可以拆分单元格,出现更多的线。更重要的是,“三线表”也是一种数据记录规范,以“三线表”形式记录的数...

Python基础语法知识--变量和数据类型

学习Python中的变量和数据类型至关重要,因为它们构成了Python编程的基石。以下是帮助您了解Python中的变量和数据类型的分步指南:1.变量:变量在Python中用于存储数据值。它们充...

一文搞懂 Python 中的所有标点符号

反引号`无任何作用。传说Python3中它被移除是因为和单引号字符'太相似。波浪号~(按位取反符号)~被称为取反或补码运算符。它放在我们想要取反的对象前面。如果放在一个整数n...

Python变量类型和运算符_python中变量的含义

别再被小名词坑哭了:Python新手常犯的那些隐蔽错误,我用同事的真实bug拆给你看我记得有一次和同事张姐一起追查一个看似随机崩溃的脚本,最后发现罪魁祸首竟然是她把变量命名成了list。说实话...

从零开始:深入剖析 Spring Boot3 中配置文件的加载顺序

在当今的互联网软件开发领域,SpringBoot无疑是最为热门和广泛应用的框架之一。它以其强大的功能、便捷的开发体验,极大地提升了开发效率,成为众多开发者构建Web应用程序的首选。而在Spr...

Python中下划线 ‘_’ 的用法,你知道几种

Python中下划线()是一个有特殊含义和用途的符号,它可以用来表示以下几种情况:1在解释器中,下划线(_)表示上一个表达式的值,可以用来进行快速计算或测试。例如:>>>2+...

解锁Shell编程:变量_shell $变量

引言:开启Shell编程大门Shell作为用户与Linux内核之间的桥梁,为我们提供了强大的命令行交互方式。它不仅能执行简单的文件操作、进程管理,还能通过编写脚本实现复杂的自动化任务。无论是...

一文学会Python的变量命名规则!_python的变量命名有哪些要求

目录1.变量的命名原则3.内置函数尽量不要做变量4.删除变量和垃圾回收机制5.结语1.变量的命名原则①由英文字母、_(下划线)、或中文开头②变量名称只能由英文字母、数字、下画线或中文字所组成。③英文字...

更可靠的Rust-语法篇-区分语句/表达式,略览if/loop/while/for

src/main.rs://函数定义fnadd(a:i32,b:i32)->i32{a+b//末尾表达式}fnmain(){leta:i3...

C++第五课:变量的命名规则_c++中变量的命名规则

变量的命名不是想怎么起就怎么起的,而是有一套固定的规则的。具体规则:1.名字要合法:变量名必须是由字母、数字或下划线组成。例如:a,a1,a_1。2.开头不能是数字。例如:可以a1,但不能起1a。3....

Rust编程-核心篇-不安全编程_rust安全性

Unsafe的必要性Rust的所有权系统和类型系统为我们提供了强大的安全保障,但在某些情况下,我们需要突破这些限制来:与C代码交互实现底层系统编程优化性能关键代码实现某些编译器无法验证的安全操作Rus...

探秘 Python 内存管理:背后的神奇机制

在编程的世界里,内存管理就如同幕后的精密操控者,确保程序的高效运行。Python作为一种广泛使用的编程语言,其内存管理机制既巧妙又复杂,为开发者们提供了便利的同时,也展现了强大的底层控制能力。一、P...