多线程实现消息推送并可重试3次以及1小时后重试
zhezhongyun 2025-05-05 20:10 39 浏览
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 22 09:05:46 2025
@author: 1
"""
import requests
import time
import schedule
import sched
from datetime import datetime, timedelta
import threading
import pymysql
from concurrent.futures import ThreadPoolExecutor
import logging
from queue import Queue
import os
from pathlib import Path
# 获取虚拟环境目录
venv_dir = os.environ.get('VIRTUAL_ENV', None)
if venv_dir:
log_path = Path(venv_dir) / "app.log"
else:
log_path = Path.home() / "app.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler(log_path),
logging.StreamHandler()
]
)
# 信息等级定义
INFO_LEVELS = {
'紧急': {'retry_interval': 60, 'max_retries': 3},
'重要': {'retry_interval': 60, 'max_retries': 3},
'一般': {'retry_interval': 60, 'max_retries': 3}
}
# 数据库连接配置
DB_CONFIG = {
'host': '************',
'user': 'root',
'password': '*******',
'database': '***********',
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor
}
# 创建线程池
executor = ThreadPoolExecutor(max_workers=20)
class MessageSender:
def __init__(self):
self.message_queue = Queue()
self.lock = threading.RLock() # 可重入锁
self.scheduled_messages = {}
def send_message(self, group, message, info_level, attempt=1):
"""发送消息到企业微信机器人"""
headers = {'Content-Type': 'application/json'}
payload = {
"msgtype": "text",
"text": {
"content": message
}
}
try:
response = requests.post(group['robot_webhook'], json=payload, headers=headers, timeout=10)
if response.status_code == 200 and response.json().get('errcode') == 0:
logging.info(f"成功发送到群 {group['group_name']}")
return True
else:
logging.warning(f"发送到群 {group['group_name']} 失败,状态码: {response.status_code}")
return False
except Exception as e:
logging.error(f"发送到群 {group['group_name']} 异常: {e}")
return False
def retry_send(self, group, message, info_level, upno, sendplan, plantime, attempt=1):
"""独立的重试逻辑,不影响其他消息"""
max_retries = INFO_LEVELS[info_level]['max_retries']
retry_interval = INFO_LEVELS[info_level]['retry_interval']
final_retry_delay = 3600 # 1小时(秒)
success = self.send_message(group, message, info_level, attempt)
if not success:
if attempt <= max_retries:
# 前3次快速重试
logging.info(f"将在 {retry_interval} 秒后重试 (尝试 {attempt}/{max_retries})")
time.sleep(retry_interval)
self.retry_send(group, message, info_level, upno, sendplan, plantime, attempt + 1)
else:
# 3次失败后,1小时后再试
logging.warning(f"3次重试失败,将在1小时后再次尝试")
time.sleep(final_retry_delay)
# 重置尝试次数
self.retry_send(group, message, info_level, upno, sendplan, plantime, attempt=1)
def send_to_superior_groups(self, group, message, info_level, upno, sendplan, plantime, attempt=1):
"""向上级群组发送消息,每个上级群组独立处理"""
current_group = group
while current_group['parent_group_id'] is not None:
print('retry5')
superior = self.find_group_by_id(current_group['parent_group_id'])
if superior:
# 为每个上级群组创建独立线程
print('retry6')
executor.submit(self.retry_send, superior, message, info_level, upno, sendplan, plantime)
if attempt == 2:
break
print('retry7')
attempt = attempt + 1
if upno == 1:
break
print('retry8')
current_group = superior
else:
break
def find_group_by_id(self, group_id):
"""根据ID查找群组信息"""
try:
connection = pymysql.connect(**DB_CONFIG) # 不加锁
with connection.cursor() as cursor:
with self.lock: # 只锁住查询部分
cursor.execute(
"SELECT id, group_name, level, robot_webhook, parent_group_id FROM wechat_groups WHERE id = %s",
(group_id,)
)
return cursor.fetchone()
except Exception as e:
logging.error(f"查找群组时出错: {e}")
return None
finally:
if 'connection' in locals() and connection:
connection.close() # 确保连接关闭
def find_group_by_name(self, group_name):
"""根据名称查找群组信息"""
try:
with self.lock:
connection = pymysql.connect(**DB_CONFIG)
with connection.cursor() as cursor:
sql = "SELECT id, group_name, level, robot_webhook, parent_group_id FROM wechat_groups WHERE group_name = %s"
cursor.execute(sql, (group_name,))
print('按群名查找,',group_name)
result = cursor.fetchone()
print('执行命令,')
print(f'查询结果: {result}')
print(1111)
if not result:
logging.warning(f"未找到群组: {group_name}")
return result
except Exception as e:
print(f"查找群组时出错: {e}")
logging.error(f"查找群组时出错: {e}")
return None
finally:
if 'connection' in locals() and connection:
connection.close()
def schedule_message(self, row):
"""根据plantime安排消息发送"""
try:
plantime = row['plantime']
sendplan = row['sendplan'] # 1即时发送 2 定时发送
print('sendplan,',sendplan)
target_group2 = row['role']
print('目前消息群,',target_group2)
if sendplan == 1:
print('如果sendplan=1,立即发送')
# 如果sendplan=1,立即发送
self.process_message(row)
return
print('如果sendplan不是1,定时发送')
# 将plantime转换为datetime对象
if isinstance(plantime, str):
send_time = datetime.strptime(plantime, '%Y-%m-%d %H:%M:%S')
else:
send_time = plantime
now = datetime.now()
if send_time <= now:
# 如果发送时间已过,立即发送
print('如果发送时间已过,立即发送,',target_group2)
target_group = self.find_group_by_name(target_group2)
print('hhh')
if not target_group:
print('jjj')
logging.warning(f" 未找到群组 '{row['role']}',跳过此消息")
return # 直接返回,不继续执行
self.process_message(row)
else:
# 计算延迟时间(秒)
delay = (send_time - now).total_seconds()
# 为消息创建定时任务
message_id = row['id']
print('为消息创建定时任务,',message_id)
if message_id not in self.scheduled_messages:
timer = threading.Timer(delay, self.process_message, args=(row,))
timer.start()
print('已安排')
self.scheduled_messages[message_id] = timer
logging.info(f"已安排消息 {message_id} 在 {send_time} 发送")
except Exception as e:
logging.error(f"安排消息发送时出错: {e}")
def process_message(self, row):
"""处理单条消息"""
try:
message = f"通知: {row['result']}"
target_group = self.find_group_by_name(row['role'])
if not target_group:
logging.warning(f" 未找到群组 '{row['role']}',跳过此消息")
return # 直接返回,不继续执行
upno = row['upno']
sendplan = row['sendplan']
plantime = row['plantime']
# 主发送
self.retry_send(target_group, message, row['levelname'], upno, sendplan, plantime)
# 向上级发送
self.send_to_superior_groups(target_group, message, row['levelname'], upno, sendplan, plantime)
# 更新数据库状态
self.update_message_status(row['id'])
# 从预定消息中移除
if row['id'] in self.scheduled_messages:
del self.scheduled_messages[row['id']]
except Exception as e:
logging.error(f" 处理消息时出错: {e}")
def update_message_status(self, message_id):
"""更新消息状态"""
try:
with self.lock:
connection = pymysql.connect(**DB_CONFIG)
with connection.cursor() as cursor:
update_sql = "UPDATE ai_dayairesult SET isend = 1 WHERE id = %s"
cursor.execute(update_sql, (message_id,))
connection.commit()
except Exception as e:
logging.error(f"更新消息状态时出错: {e}")
finally:
if 'connection' in locals() and connection:
connection.close()
def check_and_schedule_messages(self):
"""检查并安排消息发送"""
try:
with self.lock:
connection = pymysql.connect(**DB_CONFIG)
with connection.cursor() as cursor:
sql = """SELECT id, sqlcmd, result, createdate, role, details, levelname, upno, sendplan, plantime
FROM ai_dayairesult
WHERE isend = 0"""
cursor.execute(sql)
results = cursor.fetchall()
for row in results:
try:
self.schedule_message(row) # 即使某条失败,继续下一条
except Exception as e:
logging.error(f" 安排消息 {row['id']} 时出错: {e}")
except Exception as e:
logging.error(f" 检查并安排消息时出错: {e}")
finally:
if 'connection' in locals() and connection:
connection.close()
def run_scheduler(sender):
"""运行定时任务"""
# 每分钟检查一次待发送消息
schedule.every(1).minutes.do(sender.check_and_schedule_messages)
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
sender = MessageSender()
# 启动调度器线程
scheduler_thread = threading.Thread(target=run_scheduler, args=(sender,))
scheduler_thread.daemon = True
scheduler_thread.start()
# 立即执行一次检查
sender.check_and_schedule_messages()
# 主线程保持运行
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
logging.info("程序退出")
# 取消所有预定但未发送的消息
for timer in sender.scheduled_messages.values():
timer.cancel()
相关推荐
- perl基础——循环控制_principle循环
-
在编程中,我们往往需要进行不同情况的判断,选择,重复操作。这些时候我们需要对简单语句来添加循环控制变量或者命令。if/unless我们需要在满足特定条件下再执行的语句,可以通过if/unle...
- CHAPTER 2 The Antechamber of M de Treville 第二章 特雷维尔先生的前厅
-
CHAPTER1TheThreePresentsofD'ArtagnantheElderCHAPTER2TheAntechamber...
- CHAPTER 5 The King'S Musketeers and the Cardinal'S Guards 第五章 国王的火枪手和红衣主教的卫士
-
CHAPTER3TheAudienceCHAPTER5TheKing'SMusketeersandtheCardinal'SGuard...
- CHAPTER 3 The Audience 第三章 接见
-
CHAPTER3TheAudienceCHAPTER3TheAudience第三章接见M.DeTrévillewasatt...
- 别搞印象流!数据说明谁才是外线防守第一人!
-
来源:Reddit译者:@assholeeric编辑:伯伦WhoarethebestperimeterdefendersintheNBA?Here'sagraphofStea...
- V-Day commemorations prove anti-China claims hollow
-
People'sLiberationArmyhonorguardstakepartinthemilitaryparademarkingthe80thanniversary...
- EasyPoi使用_easypoi api
-
EasyPoi的主要特点:1.设计精巧,使用简单2.接口丰富,扩展简单3.默认值多,writelessdomore4.springmvc支持,web导出可以简单明了使用1.easypoi...
- 关于Oracle数据库12c 新特性总结_oracle数据库12514
-
概述今天主要简单介绍一下Oracle12c的一些新特性,仅供参考。参考:http://docs.oracle.com/database/121/NEWFT/chapter12102.htm#NEWFT...
- 【开发者成长】JAVA 线上故障排查完整套路!
-
线上故障主要会包括CPU、磁盘、内存以及网络问题,而大多数故障可能会包含不止一个层面的问题,所以进行排查时候尽量四个方面依次排查一遍。同时例如jstack、jmap等工具也是不囿于一个方面的问题...
- 使用 Python 向多个地址发送电子邮件
-
在本文中,我们将演示如何使用Python编程语言向使用不同电子邮件地址的不同收件人发送电子邮件。具体来说,我们将向许多不同的人发送电子邮件。使用Python向多个地址发送电子邮件Python...
- 提高工作效率的--Linux常用命令,能够决解95%以上的问题
-
点击上方关注,第一时间接受干货转发,点赞,收藏,不如一次关注评论区第一条注意查看回复:Linux命令获取linux常用命令大全pdf+Linux命令行大全pdf为什么要学习Linux命令?1、因为Li...
- linux常用系统命令_linux操作系统常用命令
-
系统信息arch显示机器的处理器架构dmidecode-q显示硬件系统部件-(SMBIOS/DMI)hdparm-i/dev/hda罗列一个磁盘的架构特性hdparm-tT/dev/s...
- 小白入门必知必会-PostgreSQL-15.2源码编译安装
-
一PostgreSQL编译安装1.1下载源码包在PostgreSQL官方主页https://www.postgresql.org/ftp/source/下载区选择所需格式的源码包下载。cd/we...
- Linux操作系统之常用命令_linux系统常用命令详解
-
Linux操作系统一、常用命令1.系统(1)系统信息arch显示机器的处理器架构uname-m显示机器的处理器架构uname-r显示正在使用的内核版本dmidecode-q显示硬件系...
- linux网络命名空间简介_linux 网络相关命令
-
此篇会以例子的方式介绍下linux网络命名空间。此例中会创建两个networknamespace:nsa、nsb,一个网桥bridge0,nsa、nsb中添加网络设备veth,网络设备间...
- 一周热门
- 最近发表
-
- perl基础——循环控制_principle循环
- CHAPTER 2 The Antechamber of M de Treville 第二章 特雷维尔先生的前厅
- CHAPTER 5 The King'S Musketeers and the Cardinal'S Guards 第五章 国王的火枪手和红衣主教的卫士
- CHAPTER 3 The Audience 第三章 接见
- 别搞印象流!数据说明谁才是外线防守第一人!
- V-Day commemorations prove anti-China claims hollow
- EasyPoi使用_easypoi api
- 关于Oracle数据库12c 新特性总结_oracle数据库12514
- 【开发者成长】JAVA 线上故障排查完整套路!
- 使用 Python 向多个地址发送电子邮件
- 标签列表
-
- HTML 教程 (33)
- HTML 简介 (35)
- HTML 实例/测验 (32)
- HTML 测验 (32)
- JavaScript 和 HTML DOM 参考手册 (32)
- HTML 拓展阅读 (30)
- HTML文本框样式 (31)
- HTML滚动条样式 (34)
- HTML5 浏览器支持 (33)
- HTML5 新元素 (33)
- HTML5 WebSocket (30)
- HTML5 代码规范 (32)
- HTML5 标签 (717)
- HTML5 标签 (已废弃) (75)
- HTML5电子书 (32)
- HTML5开发工具 (34)
- HTML5小游戏源码 (34)
- HTML5模板下载 (30)
- HTTP 状态消息 (33)
- HTTP 方法:GET 对比 POST (33)
- 键盘快捷键 (35)
- 标签 (226)
- HTML button formtarget 属性 (30)
- opacity 属性 (32)
- transition 属性 (33)