Files
contraband_manager/update_security_ranking_v2.py
OpenClaw Agent b4b7f8ff1e feat: 优化补录逻辑和班次时间
修复:
- 修复 send_notification 函数中 now.strftime() 调用错误

新功能:
- 补录命令支持指定时间: 补录 日期 姓名 内容 [时间]
- 不指定时间则使用班次下班前2分钟
- 班次统一为 18:00-02:00(次日),默认补录时间 01:58

代码优化:
- 添加文件头注释
- 函数添加 docstring
- 移除调试代码
2026-02-07 18:09:16 +00:00

291 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
违禁品查获排行榜系统 - 数据管理脚本
支持业务日期自动校准、WebDAV备份、企业微信通知
"""
import json
import os
import re
from datetime import datetime, timedelta, timezone
import requests
import sys
import subprocess
# ================= 配置管理 =================
def load_config():
"""加载配置文件"""
config_file = os.path.join(os.path.dirname(__file__), 'config.json')
try:
with open(config_file, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
print(f"错误: 配置文件未找到: {config_file}")
sys.exit(1)
except json.JSONDecodeError as e:
print(f"错误: 配置文件解析失败: {e}")
sys.exit(1)
# 加载配置
CONFIG = load_config()
# 从配置文件中获取配置项
VERSION = CONFIG.get('version', '')
WEBHOOK_URL = CONFIG.get('webhook_url', '')
# 强制使用核心数据文件(脚本所在目录的绝对路径)
DATA_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'security_data_v2.json')
BEIJING_TZ = timezone(timedelta(hours=8)) # 时区配置也可以从配置文件读取,此处保持不变
WEBDAV_BASE = CONFIG.get('webdav', {}).get('base', '')
WEBDAV_AUTH = CONFIG.get('webdav', {}).get('auth', '')
# ================= 核心工具 =================
def get_now():
return datetime.now(BEIJING_TZ)
def upload_to_webdav(local_path, remote_sub_path):
remote_url = f"{WEBDAV_BASE}{remote_sub_path}"
if "/" in remote_sub_path:
parent_dir = "/".join(remote_sub_path.split("/")[:-1])
mkdir_cmd = f"curl -u '{WEBDAV_AUTH}' -X MKCOL '{WEBDAV_BASE}{parent_dir}/' >/dev/null 2>&1"
subprocess.run(mkdir_cmd, shell=True)
cmd = f"curl -u '{WEBDAV_AUTH}' -T '{local_path}' '{remote_url}'"
subprocess.Popen(cmd, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def load_data():
data = {"meta": {"script_version": VERSION}, "members": {}, "logs": [], "last_id": -1}
if os.path.exists(DATA_FILE):
with open(DATA_FILE, 'r', encoding='utf-8') as f:
loaded = json.load(f)
if "meta" not in loaded: loaded["meta"] = data["meta"]
return loaded
return data
def save_data(data, action_text="系统自动备份"):
if "meta" not in data: data["meta"] = {}
data["meta"]["script_version"] = VERSION
data["meta"]["last_update"] = get_now().isoformat()
with open(DATA_FILE, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
# 异步备份
upload_to_webdav(DATA_FILE, "latest_security_v2.json")
upload_to_webdav(__file__, f"script_history/script_{VERSION}.py")
# 触发新版快报推送
send_notification(action_text, data)
def format_name(name):
"""两个字姓名中间加4个空格以对齐三字姓名"""
return f"{name[0]} {name[1]}" if len(name) == 2 else name
def send_notification(action_text, data):
"""发送企业微信通知 - v2.3.8 对齐增强与删除功能 + 新业务日期"""
if not WEBHOOK_URL: return
now = get_now()
now_str = now.strftime("%Y-%m-%d %H:%M")
biz_month = now.strftime("%Y-%m")
# 1. 统计当月排名 (过滤0票)
stats = {}
for log in data.get("logs", []):
# 只有未删除的记录才参与排名
if log.get("biz_month") == biz_month and not log.get("is_deleted", False):
n = log["name"]
if n in data["members"] and data["members"][n]["status"] == "active":
stats[n] = stats.get(n, 0) + 1
ranking = sorted(stats.items(), key=lambda x: x[1], reverse=True)
# 此处调用 format_name 进行名字对齐
ranking_text = "\n".join([f"{i+1}. {format_name(m[0])}: {m[1]} 票({data['members'][m[0]].get('shift', '未知')})" for i, m in enumerate(ranking)])
# 2. 获取最近动态 (只显示未删除的)
recent_records = [l for l in data.get('logs', []) if not l.get('is_deleted', False)][-3:]
recent_records.reverse()
recent_text = ""
for r in recent_records:
r_time = datetime.fromisoformat(r['actual_time']).astimezone(BEIJING_TZ).strftime("%m-%d %H:%M")
# 此处也应用对齐逻辑
recent_text += f"- [{r_time}] {format_name(r['name'])}: {r['content']}\n"
content = f"""安检团队查获快报
更新时间:{now}
最新操作: {action_text}
排名明细:
{ranking_text if ranking_text else "暂无记录"}
(仅显示已查获成员0 记录成员已隐藏)
最近动态:
{recent_text if recent_text else "暂无记录"}"""
payload = {"msgtype": "text", "text": {"content": content}}
try: requests.post(WEBHOOK_URL, json=payload, timeout=5)
except: pass
def add_member(name_with_shift):
m = re.match(r"^(.*?)\((.*?)\)$", name_with_shift)
if not m: return "错误:格式应为 '新增 姓名(班次)'"
name, shift = m.groups()
data = load_data()
if name in data["members"] and data["members"][name]["status"] == "active":
return f" 人员 {name} 已存在且为活跃状态。"
data["members"][name] = {"shift": shift, "status": "active"}
action_text = f"新增成员 {format_name(name)} ({shift})"
save_data(data, action_text)
return f"✅ 已录入人员:{name} ({shift})"
def remove_member(name):
data = load_data()
if name not in data["members"] or data["members"][name]["status"] == "offboarded":
return f" 未找到活跃人员 '{name}' 或已离职。"
data["members"][name]["status"] = "offboarded"
action_text = f"去除成员 {format_name(name)}"
save_data(data, action_text)
return f"✅ 已去除人员:{name} (已标记为离职,不参与排名)"
def record_entry(name, content, biz_date_manual=None, biz_time=None):
"""录入查获记录
Args:
name: 人员姓名
content: 查获内容
biz_date_manual: 补录的业务日期 (MM-DD格式)None表示普通录入
biz_time: 补录的指定时间 (HH:MM格式)可选默认为班次下班前2分钟
"""
data = load_data()
if name not in data["members"] or data["members"][name]["status"] == "offboarded":
return f"错误:未找到活跃人员 '{name}' 或该人员已离职。"
now = get_now()
is_manual = False
if biz_date_manual:
# 补录模式
year = now.year
dt = datetime.strptime(f"{year}{biz_date_manual.replace('-','')}", "%Y%m%d").replace(tzinfo=BEIJING_TZ)
info = {"biz_date": dt.strftime("%Y-%m-%d"), "biz_month": dt.strftime("%Y-%m"), "is_manual": True}
is_manual = True
# 计算 actual_time
if biz_time:
# 使用指定时间
hour, minute = map(int, biz_time.split(':'))
actual_dt = dt.replace(hour=hour, minute=minute, second=0, microsecond=0)
else:
# 使用班次下班前2分钟
shift = data["members"][name].get("shift", "")
# 班次统一为 18:00-02:00(次日)默认下班前2分钟
actual_dt = dt.replace(hour=1, minute=58, second=0, microsecond=0)
actual_time = actual_dt.isoformat()
else:
# 普通录入模式
biz_dt = now - timedelta(days=1) if 0 <= now.hour <= 5 else now
info = {"biz_date": biz_dt.strftime("%Y-%m-%d"), "biz_month": biz_dt.strftime("%Y-%m"), "is_manual": False}
actual_time = now.isoformat()
data["last_id"] += 1
new_id = data["last_id"]
log = {"id": new_id, "name": name, "content": content, "actual_time": actual_time, "biz_date": info["biz_date"], "biz_month": info["biz_month"]}
data["logs"].append(log)
tag = "[补录] " if is_manual else ""
action_text = f"{tag}新增 {format_name(name)} {content}"
save_data(data, action_text)
daily_count = sum(1 for l in data["logs"] if l["name"] == name and l["biz_date"] == info["biz_date"] and not l.get('is_deleted', False))
return f"[已录入] {name} - {content} (当日第{daily_count}票, 索引 #{new_id})"
def delete_entry(log_id):
data = load_data()
found = False
for log in data["logs"]:
if log['id'] == log_id:
log['is_deleted'] = True
found = True
break
if found:
action_text = f"删除记录 # {log_id}"
save_data(data, action_text)
return f"✅ 已删除记录:#{log_id}"
return f"❌ 未找到索引为 #{log_id} 的记录。"
def query_logs(mode, value):
data = load_data()
results = [l for l in data["logs"] if (l["name"] == value if mode=="name" else l["biz_date"].endswith(value)) and not l.get('is_deleted', False)]
if not results: return "🔍 暂无记录。"
res = f"🔍 查询结果 ({value})\n----------------\n"
for l in results:
time_str = datetime.fromisoformat(l['actual_time']).astimezone(BEIJING_TZ).strftime("%m-%d %H:%M")
res += f"#{l['id']} - {l['content']} (录入时间: {time_str})\n"
return res
def get_ranking(target_month=None):
data = load_data()
now = get_now()
if not target_month:
target_month = (now.replace(day=1)-timedelta(days=1)).strftime("%Y-%m") if 0<=now.hour<6 else now.strftime("%Y-%m")
elif len(target_month) <= 3:
target_month = f"{now.year}-{target_month.replace('','').zfill(2)}"
stats = {}
for log in data.get("logs", []):
if log.get("biz_month") == target_month and not log.get('is_deleted', False):
n = log["name"]
if n in data["members"] and data["members"][n]["status"] == "active":
stats[n] = stats.get(n, 0) + 1
sorted_ranking = sorted(stats.items(), key=lambda x: x[1], reverse=True)
res = f"🏆 安检排行榜 ({target_month}) [v{VERSION}]\n----------------\n"
if not sorted_ranking: return res + "暂无记录。"
for i, (name, count) in enumerate(sorted_ranking):
res += f"{i+1}. {format_name(name)}: {count} 票 ({data['members'][name]['shift']})\n"
return res
if __name__ == "__main__":
args = sys.argv[1:]
if not args: print(f"安检系统 {VERSION}"); sys.exit(0)
# 优先检查去除成员命令
if len(args) >= 2 and args[0] == "去除":
print(remove_member(args[1]))
# 优先检查新增成员命令
elif len(args) >= 2 and args[0] == "新增":
print(add_member(" ".join(args[1:])))
# 优先检查删除命令
elif len(args) >= 2 and args[0] == "删除" and args[1].startswith("#"):
try:
log_id = int(args[1][1:].strip())
print(delete_entry(log_id))
except ValueError: print("错误:删除指令格式为 '删除 #索引号'")
# 补录命令:补录 日期 姓名 内容 [时间 HH:MM]
elif len(args) >= 4 and args[0] == "补录":
biz_date = args[1]
# 验证日期格式
if not re.match(r'^\d{2}-?\d{2}$', biz_date):
print(f"错误:日期格式不正确,应为 MM-DD 或 MMDD如 02-06")
sys.exit(1)
name = args[2]
# 检查是否有时间参数(格式 HH:MM在倒数第二个位置
if len(args) >= 5 and re.match(r'^\d{2}:\d{2}$', args[-1]):
content = " ".join(args[3:-1])
biz_time = args[-1]
else:
content = " ".join(args[3:])
biz_time = None # 默认使用班次下班前2分钟
print(record_entry(name, content, biz_date, biz_time))
# 检查排行榜命令
elif "排行榜" in " ".join(args):
m = re.search(r"排行榜\s*(.*)", " ".join(args))
print(get_ranking(m.group(1).strip() if m.group(1) else None))
# 检查违禁品查询命令
elif len(args) >= 2 and args[0].startswith("违禁品"):
val = " ".join(args[1:]).strip()
print(query_logs("date", val) if re.match(r"^\d{2}-?\d{2}$", val) else query_logs("name", val))
# 默认处理记录录入
elif len(args) >= 2:
print(record_entry(args[0], " ".join(args[1:])))
else:
print("错误:格式不识别")