Files
contraband_manager/update_security_ranking_v2.py
2026-02-06 21:46:42 +00:00

243 lines
10 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import os
import re
from datetime import datetime, timedelta, timezone
import requests
import sys
import subprocess
# ================= 配置管理 =================
def load_config():
"""加载配置文件"""
config_file = os.path.join(os.path.dirname(__file__), 'config.json')
try:
with open(config_file, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
print(f"错误: 配置文件未找到: {config_file}")
sys.exit(1)
except json.JSONDecodeError as e:
print(f"错误: 配置文件解析失败: {e}")
sys.exit(1)
# 加载配置
CONFIG = load_config()
# 从配置文件中获取配置项
VERSION = CONFIG.get('version', '')
WEBHOOK_URL = CONFIG.get('webhook_url', '')
DATA_FILE = CONFIG.get('data_file', '')
BEIJING_TZ = timezone(timedelta(hours=8)) # 时区配置也可以从配置文件读取,此处保持不变
WEBDAV_BASE = CONFIG.get('webdav', {}).get('base', '')
WEBDAV_AUTH = CONFIG.get('webdav', {}).get('auth', '')
# ================= 核心工具 =================
def get_now():
return datetime.now(BEIJING_TZ)
def upload_to_webdav(local_path, remote_sub_path):
remote_url = f"{WEBDAV_BASE}{remote_sub_path}"
if "/" in remote_sub_path:
parent_dir = "/".join(remote_sub_path.split("/")[:-1])
mkdir_cmd = f"curl -u '{WEBDAV_AUTH}' -X MKCOL '{WEBDAV_BASE}{parent_dir}/' >/dev/null 2>&1"
subprocess.run(mkdir_cmd, shell=True)
cmd = f"curl -u '{WEBDAV_AUTH}' -T '{local_path}' '{remote_url}'"
subprocess.Popen(cmd, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def load_data():
data = {"meta": {"script_version": VERSION}, "members": {}, "logs": [], "last_id": -1}
if os.path.exists(DATA_FILE):
with open(DATA_FILE, 'r', encoding='utf-8') as f:
loaded = json.load(f)
if "meta" not in loaded: loaded["meta"] = data["meta"]
return loaded
return data
def save_data(data, action_text="系统自动备份"):
if "meta" not in data: data["meta"] = {}
data["meta"]["script_version"] = VERSION
data["meta"]["last_update"] = get_now().isoformat()
with open(DATA_FILE, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
# 异步备份
upload_to_webdav(DATA_FILE, "latest_security_v2.json")
upload_to_webdav(__file__, f"script_history/script_{VERSION}.py")
# 触发新版快报推送
send_notification(action_text, data)
def format_name(name):
"""两个字姓名中间加4个空格以对齐三字姓名"""
return f"{name[0]} {name[1]}" if len(name) == 2 else name
def send_notification(action_text, data):
"""发送企业微信通知 - v2.3.8 对齐增强与删除功能 + 新业务日期"""
if not WEBHOOK_URL: return
now = get_now().strftime("%Y-%m-%d %H:%M")
biz_month = get_now().strftime("%Y-%m")
# 1. 统计当月排名 (过滤0票)
stats = {}
for log in data.get("logs", []):
# 只有未删除的记录才参与排名
if log.get("biz_month") == biz_month and not log.get("is_deleted", False):
n = log["name"]
if n in data["members"] and data["members"][n]["status"] == "active":
stats[n] = stats.get(n, 0) + 1
ranking = sorted(stats.items(), key=lambda x: x[1], reverse=True)
# 此处调用 format_name 进行名字对齐
ranking_text = "\n".join([f"{i+1}. {format_name(m[0])}: {m[1]} 票({data['members'][m[0]].get('shift', '未知')})" for i, m in enumerate(ranking)])
# 2. 获取最近动态 (只显示未删除的)
recent_records = [l for l in data.get('logs', []) if not l.get('is_deleted', False)][-3:]
recent_records.reverse()
recent_text = ""
for r in recent_records:
r_time = datetime.fromisoformat(r['actual_time']).astimezone(BEIJING_TZ).strftime("%m-%d %H:%M")
# 此处也应用对齐逻辑
recent_text += f"- [{r_time}] {format_name(r['name'])}: {r['content']}\n"
content = f"""安检团队查获快报
更新时间:{now}
最新操作: {action_text}
排名明细:
{ranking_text if ranking_text else "暂无记录"}
(仅显示已查获成员0 记录成员已隐藏)
最近动态:
{recent_text if recent_text else "暂无记录"}"""
payload = {"msgtype": "text", "text": {"content": content}}
try: requests.post(WEBHOOK_URL, json=payload, timeout=5)
except: pass
def add_member(name_with_shift):
m = re.match(r"^(.*?)\((.*?)\)$", name_with_shift)
if not m: return "错误:格式应为 '新增 姓名(班次)'"
name, shift = m.groups()
data = load_data()
if name in data["members"] and data["members"][name]["status"] == "active":
return f" 人员 {name} 已存在且为活跃状态。"
data["members"][name] = {"shift": shift, "status": "active"}
action_text = f"新增成员 {format_name(name)} ({shift})"
save_data(data, action_text)
return f"✅ 已录入人员:{name} ({shift})"
def remove_member(name):
data = load_data()
if name not in data["members"] or data["members"][name]["status"] == "offboarded":
return f" 未找到活跃人员 '{name}' 或已离职。"
data["members"][name]["status"] = "offboarded"
action_text = f"去除成员 {format_name(name)}"
save_data(data, action_text)
return f"✅ 已去除人员:{name} (已标记为离职,不参与排名)"
def record_entry(name, content, biz_date_manual=None):
data = load_data()
if name not in data["members"] or data["members"][name]["status"] == "offboarded":
return f"错误:未找到活跃人员 '{name}' 或该人员已离职。"
now = get_now()
if biz_date_manual:
year = now.year
dt = datetime.strptime(f"{year}{biz_date_manual.replace('-','')}", "%Y%m%d").replace(tzinfo=BEIJING_TZ)
info = {"biz_date": dt.strftime("%Y-%m-%d"), "biz_month": dt.strftime("%Y-%m"), "is_manual": True}
else:
# 修正业务日期判定逻辑00:00-05:00 归属到前一个日历日
# 重点:判断当前小时是否在 0 到 5 之间(包含 0 和 5
biz_dt = now - timedelta(days=1) if 0 <= now.hour <= 5 else now
info = {"biz_date": biz_dt.strftime("%Y-%m-%d"), "biz_month": biz_dt.strftime("%Y-%m"), "is_manual": False}
data["last_id"] += 1
new_id = data["last_id"]
log = {"id": new_id, "name": name, "content": content, "actual_time": now.isoformat(), "biz_date": info["biz_date"], "biz_month": info["biz_month"]}
data["logs"].append(log)
tag = "[补录] " if info.get("is_manual") else ""
action_text = f"{tag}新增 {format_name(name)} {content}"
save_data(data, action_text)
daily_count = sum(1 for l in data["logs"] if l["name"] == name and l["biz_date"] == info["biz_date"] and not l.get('is_deleted', False))
return f"[已录入] {name} - {content} (当日第{daily_count}票, 索引 #{new_id})"
def delete_entry(log_id):
data = load_data()
found = False
for log in data["logs"]:
if log['id'] == log_id:
log['is_deleted'] = True # 标记为删除,而不是真正移除
found = True
break
if found:
action_text = f"删除记录 # {log_id}"
save_data(data, action_text)
return f"✅ 已删除记录:#{log_id}"
return f"❌ 未找到索引为 #{log_id} 的记录。"
def query_logs(mode, value):
data = load_data()
results = [l for l in data["logs"] if (l["name"] == value if mode=="name" else l["biz_date"].endswith(value)) and not l.get('is_deleted', False)]
if not results: return "🔍 暂无记录。"
res = f"🔍 查询结果 ({value})\n----------------\n"
for l in results:
time_str = datetime.fromisoformat(l['actual_time']).astimezone(BEIJING_TZ).strftime("%m-%d %H:%M")
res += f"#{l['id']} - {l['content']} (录入时间: {time_str})\n"
return res
def get_ranking(target_month=None):
data = load_data()
now = get_now()
if not target_month:
target_month = (now.replace(day=1)-timedelta(days=1)).strftime("%Y-%m") if 0<=now.hour<6 else now.strftime("%Y-%m")
elif len(target_month) <= 3:
target_month = f"{now.year}-{target_month.replace('','').zfill(2)}"
stats = {}
for log in data.get("logs", []):
if log.get("biz_month") == target_month and not log.get('is_deleted', False):
n = log["name"]
if n in data["members"] and data["members"][n]["status"] == "active":
stats[n] = stats.get(n, 0) + 1
sorted_ranking = sorted(stats.items(), key=lambda x: x[1], reverse=True)
res = f"🏆 安检排行榜 ({target_month}) [v{VERSION}]\n----------------\n"
if not sorted_ranking: return res + "暂无记录。"
for i, (name, count) in enumerate(sorted_ranking):
res += f"{i+1}. {format_name(name)}: {count} 票 ({data['members'][name]['shift']})\n"
return res
if __name__ == "__main__":
args = sys.argv[1:]
# print(f"DEBUG: Raw args received: {args}") # 再次调试打印
if not args: print(f"安检系统 {VERSION}"); sys.exit(0)
# 优先检查去除成员命令
if len(args) >= 2 and args[0] == "去除":
print(remove_member(args[1]))
# 优先检查新增成员命令
elif len(args) >= 2 and args[0] == "新增":
print(add_member(" ".join(args[1:])))
# 优先检查删除命令,直接从 args 列表解析
elif len(args) >= 2 and args[0] == "删除" and args[1].startswith("#"):
try:
log_id = int(args[1][1:].strip())
print(delete_entry(log_id))
except ValueError: print("错误:删除指令格式为 '删除 #索引号'")
# 检查排行榜命令
elif "排行榜" in " ".join(args):
m = re.search(r"排行榜\s*(.*)", " ".join(args))
print(get_ranking(m.group(1).strip() if m.group(1) else None))
# 检查违禁品查询命令
elif len(args) >= 2 and args[0].startswith("违禁品"):
val = " ".join(args[1:]).strip()
print(query_logs("date", val) if re.match(r"^\d{2}-?\d{2}$", val) else query_logs("name", val))
# 默认处理记录录入
elif len(args) >= 2:
print(record_entry(args[0], " ".join(args[1:])))
else:
print("错误:格式不识别")