百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

多线程实现消息推送并可重试3次以及1小时后重试

zhezhongyun 2025-05-05 20:10 26 浏览

# -*- coding: utf-8 -*-
"""
Created on Tue Apr 22 09:05:46 2025

@author: 1
"""

import requests
import time
import schedule
import sched
from datetime import datetime, timedelta
import threading
import pymysql
from concurrent.futures import ThreadPoolExecutor
import logging
from queue import Queue
import os
from pathlib import Path

# 获取虚拟环境目录
venv_dir = os.environ.get('VIRTUAL_ENV', None)
if venv_dir:
    log_path = Path(venv_dir) / "app.log"
else:
    log_path = Path.home() / "app.log"

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler(log_path),
        logging.StreamHandler()
    ]
)

# 信息等级定义
INFO_LEVELS = {
    '紧急': {'retry_interval': 60, 'max_retries': 3},
    '重要': {'retry_interval': 60, 'max_retries': 3},
    '一般': {'retry_interval': 60, 'max_retries': 3}
}

# 数据库连接配置
DB_CONFIG = {
    'host': '************',
    'user': 'root',
    'password': '*******',
    'database': '***********',
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor
}

# 创建线程池
executor = ThreadPoolExecutor(max_workers=20)

class MessageSender:
    def __init__(self):
        self.message_queue = Queue()
        self.lock = threading.RLock()  # 可重入锁
        self.scheduled_messages = {}
        
    def send_message(self, group, message, info_level, attempt=1):
        """发送消息到企业微信机器人"""
        headers = {'Content-Type': 'application/json'}
        payload = {
            "msgtype": "text",
            "text": {
                "content": message
            }
        }
        
        try:
            response = requests.post(group['robot_webhook'], json=payload, headers=headers, timeout=10)
            if response.status_code == 200 and response.json().get('errcode') == 0:
                logging.info(f"成功发送到群 {group['group_name']}")
                return True
            else:
                logging.warning(f"发送到群 {group['group_name']} 失败,状态码: {response.status_code}")
                return False
        except Exception as e:
            logging.error(f"发送到群 {group['group_name']} 异常: {e}")
            return False

    def retry_send(self, group, message, info_level, upno, sendplan, plantime, attempt=1):
        """独立的重试逻辑,不影响其他消息"""
        max_retries = INFO_LEVELS[info_level]['max_retries']
        retry_interval = INFO_LEVELS[info_level]['retry_interval']
        final_retry_delay = 3600  # 1小时(秒)
        
        success = self.send_message(group, message, info_level, attempt)
        
        if not success:
            if attempt <= max_retries:
                # 前3次快速重试
                logging.info(f"将在 {retry_interval} 秒后重试 (尝试 {attempt}/{max_retries})")
                time.sleep(retry_interval)
                self.retry_send(group, message, info_level, upno, sendplan, plantime, attempt + 1)
            else:
                # 3次失败后,1小时后再试
                logging.warning(f"3次重试失败,将在1小时后再次尝试")
                time.sleep(final_retry_delay)
                # 重置尝试次数
                self.retry_send(group, message, info_level, upno, sendplan, plantime, attempt=1)

    def send_to_superior_groups(self, group, message, info_level, upno, sendplan, plantime, attempt=1):
        """向上级群组发送消息,每个上级群组独立处理"""
        current_group = group
        while current_group['parent_group_id'] is not None:
            print('retry5')
            superior = self.find_group_by_id(current_group['parent_group_id'])
            if superior:
                # 为每个上级群组创建独立线程
                print('retry6')
                executor.submit(self.retry_send, superior, message, info_level, upno, sendplan, plantime)
                if attempt == 2:
                    break
                print('retry7')
                attempt = attempt + 1
                if upno == 1:
                    break
                print('retry8')
                current_group = superior
            else:
                break

    def find_group_by_id(self, group_id):
        """根据ID查找群组信息"""
        try:
            connection = pymysql.connect(**DB_CONFIG)  # 不加锁
            with connection.cursor() as cursor:
                with self.lock:  # 只锁住查询部分
                    cursor.execute(
                        "SELECT id, group_name, level, robot_webhook, parent_group_id FROM wechat_groups WHERE id = %s",
                        (group_id,)
                    )
                    return cursor.fetchone()
        except Exception as e:
            logging.error(f"查找群组时出错: {e}")
            return None
        finally:
            if 'connection' in locals() and connection:
                connection.close()  # 确保连接关闭

    def find_group_by_name(self, group_name):
        """根据名称查找群组信息"""
        try:
            with self.lock:
                connection = pymysql.connect(**DB_CONFIG)
                with connection.cursor() as cursor:
                    sql = "SELECT id, group_name, level, robot_webhook, parent_group_id FROM wechat_groups WHERE group_name = %s"
                    cursor.execute(sql, (group_name,))
                    print('按群名查找,',group_name)
                    result = cursor.fetchone()
                    print('执行命令,')
                    print(f'查询结果: {result}')
                    print(1111)
                    if not result:
                        logging.warning(f"未找到群组: {group_name}")
                    return result
        except Exception as e:
            print(f"查找群组时出错: {e}")
            logging.error(f"查找群组时出错: {e}")

            return None
        finally:
            if 'connection' in locals() and connection:
                connection.close()

    def schedule_message(self, row):
        """根据plantime安排消息发送"""
        try:
            plantime = row['plantime']
            sendplan = row['sendplan'] # 1即时发送 2 定时发送
            print('sendplan,',sendplan)
            target_group2 = row['role']
            print('目前消息群,',target_group2)
            if sendplan == 1:
                print('如果sendplan=1,立即发送')
                # 如果sendplan=1,立即发送
                self.process_message(row)
                return
            print('如果sendplan不是1,定时发送')
            # 将plantime转换为datetime对象
            if isinstance(plantime, str):
                send_time = datetime.strptime(plantime, '%Y-%m-%d %H:%M:%S')
            else:
                send_time = plantime
            
            now = datetime.now()
            if send_time <= now:
                # 如果发送时间已过,立即发送
                print('如果发送时间已过,立即发送,',target_group2)
                target_group = self.find_group_by_name(target_group2)
                print('hhh')
                if not target_group:
                    print('jjj')
                    logging.warning(f" 未找到群组 '{row['role']}',跳过此消息")
                    return  # 直接返回,不继续执行

                self.process_message(row)
            else:
                # 计算延迟时间(秒)
                delay = (send_time - now).total_seconds()
                
                # 为消息创建定时任务
                message_id = row['id']
                print('为消息创建定时任务,',message_id)
                if message_id not in self.scheduled_messages:
                    timer = threading.Timer(delay, self.process_message, args=(row,))
                    timer.start()
                    print('已安排')
                    self.scheduled_messages[message_id] = timer
                    logging.info(f"已安排消息 {message_id} 在 {send_time} 发送")
                
        except Exception as e:
            logging.error(f"安排消息发送时出错: {e}")

    def process_message(self, row):
        """处理单条消息"""
        try:
            message = f"通知: {row['result']}"
            target_group = self.find_group_by_name(row['role'])
            
            if not target_group:
                logging.warning(f" 未找到群组 '{row['role']}',跳过此消息")
                return  # 直接返回,不继续执行
            
            upno = row['upno']
            sendplan = row['sendplan']
            plantime = row['plantime']
            
            # 主发送
            self.retry_send(target_group, message, row['levelname'], upno, sendplan, plantime)
            
            # 向上级发送
            self.send_to_superior_groups(target_group, message, row['levelname'], upno, sendplan, plantime)
            
            # 更新数据库状态
            self.update_message_status(row['id'])
            
            # 从预定消息中移除
            if row['id'] in self.scheduled_messages:
                del self.scheduled_messages[row['id']]
                
        except Exception as e:
            logging.error(f" 处理消息时出错: {e}")

    def update_message_status(self, message_id):
        """更新消息状态"""
        try:
            with self.lock:
                connection = pymysql.connect(**DB_CONFIG)
                with connection.cursor() as cursor:
                    update_sql = "UPDATE ai_dayairesult SET isend = 1 WHERE id = %s"
                    cursor.execute(update_sql, (message_id,))
                    connection.commit()
        except Exception as e:
            logging.error(f"更新消息状态时出错: {e}")
        finally:
            if 'connection' in locals() and connection:
                connection.close()

    def check_and_schedule_messages(self):
        """检查并安排消息发送"""
        try:
            with self.lock:
                connection = pymysql.connect(**DB_CONFIG)
                with connection.cursor() as cursor:
                    sql = """SELECT id, sqlcmd, result, createdate, role, details, levelname, upno, sendplan, plantime 
                            FROM ai_dayairesult 
                            WHERE isend = 0"""
                    cursor.execute(sql)
                    results = cursor.fetchall()
                    
                    for row in results:
                        try:
                            self.schedule_message(row)  # 即使某条失败,继续下一条
                        except Exception as e:
                            logging.error(f" 安排消息 {row['id']} 时出错: {e}")
                            
        except Exception as e:
            logging.error(f" 检查并安排消息时出错: {e}")
        finally:
            if 'connection' in locals() and connection:
                connection.close()

def run_scheduler(sender):
    """运行定时任务"""
    # 每分钟检查一次待发送消息
    schedule.every(1).minutes.do(sender.check_and_schedule_messages)
    
    while True:
        schedule.run_pending()
        time.sleep(1)

if __name__ == "__main__":
    sender = MessageSender()

    # 启动调度器线程
    scheduler_thread = threading.Thread(target=run_scheduler, args=(sender,))
    scheduler_thread.daemon = True
    scheduler_thread.start()
    
    # 立即执行一次检查
    sender.check_and_schedule_messages()
    
    # 主线程保持运行
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logging.info("程序退出")
        # 取消所有预定但未发送的消息
        for timer in sender.scheduled_messages.values():
            timer.cancel()

相关推荐

一篇文章带你了解SVG 渐变知识(svg动画效果)

渐变是一种从一种颜色到另一种颜色的平滑过渡。另外,可以把多个颜色的过渡应用到同一个元素上。SVG渐变主要有两种类型:(Linear,Radial)。一、SVG线性渐变<linearGradie...

Vue3 实战指南:15 个高效组件开发技巧解析

Vue.js作为一款流行的JavaScript框架,在前端开发领域占据着重要地位。Vue3的发布,更是带来了诸多令人兴奋的新特性和改进,让开发者能够更高效地构建应用程序。今天,我们就来深入探讨...

CSS渲染性能优化(低阻抗喷油器阻值一般为多少欧)

在当今快节奏的互联网环境中,网页加载速度直接影响用户体验和业务转化率。页面加载时间每增加100毫秒,就会导致显著的流量和收入损失。作为前端开发的重要组成部分,CSS的渲染性能优化不容忽视。为什么CSS...

前端面试题-Vue 项目中,你做过哪些性能优化?

在Vue项目中,以下是我在生产环境中实践过且用户反馈较好的性能优化方案,整理为分类要点:一、代码层面优化1.代码分割与懒加载路由懒加载:使用`()=>import()`动态导入组件,结...

如何通过JavaScript判断Web页面按钮是否置灰?

在JavaScript语言中判断Web页面按钮是否置灰(禁用状态),可以通过以下几种方式实现,其具体情形取决于按钮的禁用方式(原生disabled属性或CSS样式控制):一、检查原生dis...

「图片显示移植-1」 尝试用opengl/GLFW显示图片

GLFW【https://www.glfw.org】调用了opengl来做图形的显示。我最近需要用opengl来显示图像,不能使用opencv等库。看了一个glfw的官网,里面有github:http...

大模型实战:Flask+H5三件套实现大模型基础聊天界面

本文使用Flask和H5三件套(HTML+JS+CSS)实现大模型聊天应用的基本方式话不多说,先贴上实现效果:流式输出:思考输出:聊天界面模型设置:模型设置会话切换:前言大模型的聊天应用从功能...

ae基础知识(二)(ae必学知识)

hi,大家好,我今天要给大家继续分享的还是ae的基础知识,今天主要分享的就是关于ae的路径文字制作步骤(时间关系没有截图)、动态文字的制作知识、以及ae特效的扭曲的一些基本操作。最后再次复习一下ae的...

YSLOW性能测试前端调优23大规则(二十一)---避免过滤器

AlphalmageLoader过滤器是IE浏览器专有的一个关于图片的属性,主要是为了解决半透明真彩色的PNG显示问题。AlphalmageLoader的语法如下:filter:progid:DX...

Chrome浏览器的渲染流程详解(chrome预览)

我们来详细介绍一下浏览器的**渲染流程**。渲染流程是浏览器将从网络获取到的HTML、CSS和JavaScript文件,最终转化为用户屏幕上可见的、可交互的像素画面的过程。它是一个复杂但高度优...

在 WordPress 中如何设置背景色透明度?

最近开始写一些WordPress专业的知识,阅读数奇低,然后我发一些微信昵称技巧,又说我天天发这些小学生爱玩的玩意,写点文章真不容易。那我两天发点专业的东西,两天发点小学生的东西,剩下三天我看着办...

manim 数学动画之旅--图形样式(数学图形绘制)

manim绘制图形时,除了上一节提到的那些必需的参数,还有一些可选的参数,这些参数可以控制图形显示的样式。绘制各类基本图形(点,线,圆,多边形等)时,每个图形都有自己的默认的样式,比如上一节的图形,...

Web页面如此耗电!到了某种程度,会是大损失

现在用户上网大多使用移动设备或者笔记本电脑。对这两者来说,电池寿命都很重要。在这篇文章里,我们将讨论影响电池寿命的因素,以及作为一个web开发者,我们如何让网页耗电更少,以便用户有更多时间来关注我们的...

11.mxGraph的mxCell和Styles样式(graph style)

3.1.3mxCell[翻译]mxCell是顶点和边的单元对象。mxCell复制了模型中可用的许多功能。使用上的关键区别是,使用模型方法会创建适当的事件通知和撤销,而使用单元进行更改时没有更改记...

按钮重复点击:这“简单”问题,为何难住大半面试者与开发者?

在前端开发中,按钮重复点击是一个看似不起眼,实则非常普遍且容易引发线上事故的问题。想象一下:提交表单时,因为网络卡顿或手抖,重复点击导致后端创建了多条冗余数据…这些场景不仅影响用户体验,更可能造成实...