百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

多线程实现消息推送并可重试3次以及1小时后重试

zhezhongyun 2025-05-05 20:10 2 浏览

# -*- coding: utf-8 -*-
"""
Created on Tue Apr 22 09:05:46 2025

@author: 1
"""

import requests
import time
import schedule
import sched
from datetime import datetime, timedelta
import threading
import pymysql
from concurrent.futures import ThreadPoolExecutor
import logging
from queue import Queue
import os
from pathlib import Path

# 获取虚拟环境目录
venv_dir = os.environ.get('VIRTUAL_ENV', None)
if venv_dir:
    log_path = Path(venv_dir) / "app.log"
else:
    log_path = Path.home() / "app.log"

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler(log_path),
        logging.StreamHandler()
    ]
)

# 信息等级定义
INFO_LEVELS = {
    '紧急': {'retry_interval': 60, 'max_retries': 3},
    '重要': {'retry_interval': 60, 'max_retries': 3},
    '一般': {'retry_interval': 60, 'max_retries': 3}
}

# 数据库连接配置
DB_CONFIG = {
    'host': '************',
    'user': 'root',
    'password': '*******',
    'database': '***********',
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor
}

# 创建线程池
executor = ThreadPoolExecutor(max_workers=20)

class MessageSender:
    def __init__(self):
        self.message_queue = Queue()
        self.lock = threading.RLock()  # 可重入锁
        self.scheduled_messages = {}
        
    def send_message(self, group, message, info_level, attempt=1):
        """发送消息到企业微信机器人"""
        headers = {'Content-Type': 'application/json'}
        payload = {
            "msgtype": "text",
            "text": {
                "content": message
            }
        }
        
        try:
            response = requests.post(group['robot_webhook'], json=payload, headers=headers, timeout=10)
            if response.status_code == 200 and response.json().get('errcode') == 0:
                logging.info(f"成功发送到群 {group['group_name']}")
                return True
            else:
                logging.warning(f"发送到群 {group['group_name']} 失败,状态码: {response.status_code}")
                return False
        except Exception as e:
            logging.error(f"发送到群 {group['group_name']} 异常: {e}")
            return False

    def retry_send(self, group, message, info_level, upno, sendplan, plantime, attempt=1):
        """独立的重试逻辑,不影响其他消息"""
        max_retries = INFO_LEVELS[info_level]['max_retries']
        retry_interval = INFO_LEVELS[info_level]['retry_interval']
        final_retry_delay = 3600  # 1小时(秒)
        
        success = self.send_message(group, message, info_level, attempt)
        
        if not success:
            if attempt <= max_retries:
                # 前3次快速重试
                logging.info(f"将在 {retry_interval} 秒后重试 (尝试 {attempt}/{max_retries})")
                time.sleep(retry_interval)
                self.retry_send(group, message, info_level, upno, sendplan, plantime, attempt + 1)
            else:
                # 3次失败后,1小时后再试
                logging.warning(f"3次重试失败,将在1小时后再次尝试")
                time.sleep(final_retry_delay)
                # 重置尝试次数
                self.retry_send(group, message, info_level, upno, sendplan, plantime, attempt=1)

    def send_to_superior_groups(self, group, message, info_level, upno, sendplan, plantime, attempt=1):
        """向上级群组发送消息,每个上级群组独立处理"""
        current_group = group
        while current_group['parent_group_id'] is not None:
            print('retry5')
            superior = self.find_group_by_id(current_group['parent_group_id'])
            if superior:
                # 为每个上级群组创建独立线程
                print('retry6')
                executor.submit(self.retry_send, superior, message, info_level, upno, sendplan, plantime)
                if attempt == 2:
                    break
                print('retry7')
                attempt = attempt + 1
                if upno == 1:
                    break
                print('retry8')
                current_group = superior
            else:
                break

    def find_group_by_id(self, group_id):
        """根据ID查找群组信息"""
        try:
            connection = pymysql.connect(**DB_CONFIG)  # 不加锁
            with connection.cursor() as cursor:
                with self.lock:  # 只锁住查询部分
                    cursor.execute(
                        "SELECT id, group_name, level, robot_webhook, parent_group_id FROM wechat_groups WHERE id = %s",
                        (group_id,)
                    )
                    return cursor.fetchone()
        except Exception as e:
            logging.error(f"查找群组时出错: {e}")
            return None
        finally:
            if 'connection' in locals() and connection:
                connection.close()  # 确保连接关闭

    def find_group_by_name(self, group_name):
        """根据名称查找群组信息"""
        try:
            with self.lock:
                connection = pymysql.connect(**DB_CONFIG)
                with connection.cursor() as cursor:
                    sql = "SELECT id, group_name, level, robot_webhook, parent_group_id FROM wechat_groups WHERE group_name = %s"
                    cursor.execute(sql, (group_name,))
                    print('按群名查找,',group_name)
                    result = cursor.fetchone()
                    print('执行命令,')
                    print(f'查询结果: {result}')
                    print(1111)
                    if not result:
                        logging.warning(f"未找到群组: {group_name}")
                    return result
        except Exception as e:
            print(f"查找群组时出错: {e}")
            logging.error(f"查找群组时出错: {e}")

            return None
        finally:
            if 'connection' in locals() and connection:
                connection.close()

    def schedule_message(self, row):
        """根据plantime安排消息发送"""
        try:
            plantime = row['plantime']
            sendplan = row['sendplan'] # 1即时发送 2 定时发送
            print('sendplan,',sendplan)
            target_group2 = row['role']
            print('目前消息群,',target_group2)
            if sendplan == 1:
                print('如果sendplan=1,立即发送')
                # 如果sendplan=1,立即发送
                self.process_message(row)
                return
            print('如果sendplan不是1,定时发送')
            # 将plantime转换为datetime对象
            if isinstance(plantime, str):
                send_time = datetime.strptime(plantime, '%Y-%m-%d %H:%M:%S')
            else:
                send_time = plantime
            
            now = datetime.now()
            if send_time <= now:
                # 如果发送时间已过,立即发送
                print('如果发送时间已过,立即发送,',target_group2)
                target_group = self.find_group_by_name(target_group2)
                print('hhh')
                if not target_group:
                    print('jjj')
                    logging.warning(f" 未找到群组 '{row['role']}',跳过此消息")
                    return  # 直接返回,不继续执行

                self.process_message(row)
            else:
                # 计算延迟时间(秒)
                delay = (send_time - now).total_seconds()
                
                # 为消息创建定时任务
                message_id = row['id']
                print('为消息创建定时任务,',message_id)
                if message_id not in self.scheduled_messages:
                    timer = threading.Timer(delay, self.process_message, args=(row,))
                    timer.start()
                    print('已安排')
                    self.scheduled_messages[message_id] = timer
                    logging.info(f"已安排消息 {message_id} 在 {send_time} 发送")
                
        except Exception as e:
            logging.error(f"安排消息发送时出错: {e}")

    def process_message(self, row):
        """处理单条消息"""
        try:
            message = f"通知: {row['result']}"
            target_group = self.find_group_by_name(row['role'])
            
            if not target_group:
                logging.warning(f" 未找到群组 '{row['role']}',跳过此消息")
                return  # 直接返回,不继续执行
            
            upno = row['upno']
            sendplan = row['sendplan']
            plantime = row['plantime']
            
            # 主发送
            self.retry_send(target_group, message, row['levelname'], upno, sendplan, plantime)
            
            # 向上级发送
            self.send_to_superior_groups(target_group, message, row['levelname'], upno, sendplan, plantime)
            
            # 更新数据库状态
            self.update_message_status(row['id'])
            
            # 从预定消息中移除
            if row['id'] in self.scheduled_messages:
                del self.scheduled_messages[row['id']]
                
        except Exception as e:
            logging.error(f" 处理消息时出错: {e}")

    def update_message_status(self, message_id):
        """更新消息状态"""
        try:
            with self.lock:
                connection = pymysql.connect(**DB_CONFIG)
                with connection.cursor() as cursor:
                    update_sql = "UPDATE ai_dayairesult SET isend = 1 WHERE id = %s"
                    cursor.execute(update_sql, (message_id,))
                    connection.commit()
        except Exception as e:
            logging.error(f"更新消息状态时出错: {e}")
        finally:
            if 'connection' in locals() and connection:
                connection.close()

    def check_and_schedule_messages(self):
        """检查并安排消息发送"""
        try:
            with self.lock:
                connection = pymysql.connect(**DB_CONFIG)
                with connection.cursor() as cursor:
                    sql = """SELECT id, sqlcmd, result, createdate, role, details, levelname, upno, sendplan, plantime 
                            FROM ai_dayairesult 
                            WHERE isend = 0"""
                    cursor.execute(sql)
                    results = cursor.fetchall()
                    
                    for row in results:
                        try:
                            self.schedule_message(row)  # 即使某条失败,继续下一条
                        except Exception as e:
                            logging.error(f" 安排消息 {row['id']} 时出错: {e}")
                            
        except Exception as e:
            logging.error(f" 检查并安排消息时出错: {e}")
        finally:
            if 'connection' in locals() and connection:
                connection.close()

def run_scheduler(sender):
    """运行定时任务"""
    # 每分钟检查一次待发送消息
    schedule.every(1).minutes.do(sender.check_and_schedule_messages)
    
    while True:
        schedule.run_pending()
        time.sleep(1)

if __name__ == "__main__":
    sender = MessageSender()

    # 启动调度器线程
    scheduler_thread = threading.Thread(target=run_scheduler, args=(sender,))
    scheduler_thread.daemon = True
    scheduler_thread.start()
    
    # 立即执行一次检查
    sender.check_and_schedule_messages()
    
    # 主线程保持运行
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logging.info("程序退出")
        # 取消所有预定但未发送的消息
        for timer in sender.scheduled_messages.values():
            timer.cancel()

相关推荐

前端面试:聊聊 meta 标签?(meta标签用法)

提供给页面的一些元信息(名称/值对),有助于SEO。Meta标签是HTML中用于定义文档类型声明的标签。它们通常被用在head标签中,与文档的body标签相关联。在一个有head...

web网页性能分析系列(网页性能指标有哪些)

在前端开发中,App或者WebPage性能的好坏和响应速度,尤其是App端显得格外重要,一直都是前端很头疼的问题。专业的测试工具可以知道自己的网页还有哪些需要优化的地方,总体的评分是多少,是否合乎用...

[抓狂瞬间] 5 大差异 + 布局绝招!前端元素面试通关秘籍

刚入行的搬砖工程师,面对面试必问题“行内元素和块级元素有啥区别”,是不是心跳加速,大脑却一片空白?别担心!这看似基础的问题,实则是面试逆袭的突破口。今天就带你深入剖析,用5个关键差异,解锁前端布...

python中Django视图(view)的详解(附示例)

本篇文章给大家带来的内容是关于python中Django视图(view)的详解(附示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。一个视图函数(类),简称视图,是一个简单的Pyt...

MySQL进行整库数据备份「表(结构+数据)、视图、函数、事件」

  前言  通常情况下,我们需要改什么地方就备份什么地方就可以了,但也免不了需要整库备份的时候,本文记录实现MySQL使用脚本进行整库数据备份【表(结构+数据)、视图、函数、事件】  主要是使用mys...

python入门-day14-周末小项目(python周末培训班哪个好)

周末小项目-简易记事本的内容。这是一个综合练习,结合之前学过的函数、文件操作和异常处理,设计一个简单的命令行记事本程序。我会用清晰的步骤和代码带你实现添加、查看、删除笔记并保存到文件的功能,确保...

Python必会的50个代码操作(python代码介绍)

学习Python时,掌握一些常用的程序操作非常重要。以下是50个Python必会的程序操作,主要包括基础语法、数据结构、函数和文件操作等。1.HelloWorldprint("Hello,...

ScalersTalk成长会Python小组第7周学习笔记

Scalers点评:在2015年,ScalersTalk成长会完成Python小组完成了《Python核心编程》第1轮的学习。到2016年,我们开始第二轮的学习,并且将重点放在章节的习题上。Pytho...

电脑CMD命令与电脑工作效率提升(cmd使用提升命令)

在日常使用电脑同时按“windows+R”,可以弹出来一个框然后输入CMD弹出以下框:在这个框中输入一些内容可以简化一些我们的电脑操作好的,以下是一些最常用的CMD命令,按功能分类整理,适合日常使用和...

多线程实现消息推送并可重试3次以及1小时后重试

#-*-coding:utf-8-*-"""CreatedonTueApr2209:05:462025@author:1""&#...

RBAC权限模型(rbac权限模型的优点)

RBAC权限模型RBAC权限模型(Role-BasedAccessControl)即:基于角色的权限控制。这是目前最常被开发者使用也是相对易用、通用权限模型。准备工作CREATETABLE`s...

如何使用PIL生成验证码?(pixivic验证码)

web项目中遇到使用验证码的情况有很多,进行介绍下使用PIL生成验证码的方法。安装开始安装PIL的过程确实麻烦各种问题层出不绝,不过不断深入后就没有这方面的困扰了:windows安装:直接安装Pil...

技术是这样应用的(一)(技术的运用)

WindowsServer2003路由与远程访问在计算机网络教室使用过程中网络访问控制的实现目前很多计算机网络教室采用双网卡服务器的路由和远程访问功能,通过NAT地址转换实现教室内学生机的互联网访...

Flask-RESTful 用法指南(flask写restful接口)

Flask-RESTful是一个Flask扩展,用于快速构建RESTfulAPI。它提供了简单的语法来创建资源路由,并内置了请求解析和响应格式化功能。##安装首先安装Flask-REST...

Python办公自动化系列篇之三:PowerPoint演示文稿(.pptx)

作为高效办公自动化领域的主流编程语言,Python凭借其优雅的语法结构、完善的技术生态及成熟的第三方工具库集合,已成为企业数字化转型过程中提升运营效率的理想选择。该语言在结构化数据处理、自动化文档生成...