244 lines
10 KiB
Python
244 lines
10 KiB
Python
import json
|
||
import os
|
||
import re
|
||
from datetime import datetime, timedelta, timezone
|
||
import requests
|
||
import sys
|
||
import subprocess
|
||
|
||
# ================= 配置管理 =================
|
||
|
||
def load_config():
|
||
"""加载配置文件"""
|
||
config_file = os.path.join(os.path.dirname(__file__), 'config.json')
|
||
try:
|
||
with open(config_file, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
except FileNotFoundError:
|
||
print(f"错误: 配置文件未找到: {config_file}")
|
||
sys.exit(1)
|
||
except json.JSONDecodeError as e:
|
||
print(f"错误: 配置文件解析失败: {e}")
|
||
sys.exit(1)
|
||
|
||
# 加载配置
|
||
CONFIG = load_config()
|
||
|
||
# 从配置文件中获取配置项
|
||
VERSION = CONFIG.get('version', '')
|
||
WEBHOOK_URL = CONFIG.get('webhook_url', '')
|
||
# 强制使用核心数据文件(脚本所在目录的绝对路径)
|
||
DATA_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'security_data_v2.json')
|
||
BEIJING_TZ = timezone(timedelta(hours=8)) # 时区配置也可以从配置文件读取,此处保持不变
|
||
WEBDAV_BASE = CONFIG.get('webdav', {}).get('base', '')
|
||
WEBDAV_AUTH = CONFIG.get('webdav', {}).get('auth', '')
|
||
|
||
# ================= 核心工具 =================
|
||
|
||
def get_now():
|
||
return datetime.now(BEIJING_TZ)
|
||
|
||
def upload_to_webdav(local_path, remote_sub_path):
|
||
remote_url = f"{WEBDAV_BASE}{remote_sub_path}"
|
||
if "/" in remote_sub_path:
|
||
parent_dir = "/".join(remote_sub_path.split("/")[:-1])
|
||
mkdir_cmd = f"curl -u '{WEBDAV_AUTH}' -X MKCOL '{WEBDAV_BASE}{parent_dir}/' >/dev/null 2>&1"
|
||
subprocess.run(mkdir_cmd, shell=True)
|
||
cmd = f"curl -u '{WEBDAV_AUTH}' -T '{local_path}' '{remote_url}'"
|
||
subprocess.Popen(cmd, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
||
|
||
def load_data():
|
||
data = {"meta": {"script_version": VERSION}, "members": {}, "logs": [], "last_id": -1}
|
||
if os.path.exists(DATA_FILE):
|
||
with open(DATA_FILE, 'r', encoding='utf-8') as f:
|
||
loaded = json.load(f)
|
||
if "meta" not in loaded: loaded["meta"] = data["meta"]
|
||
return loaded
|
||
return data
|
||
|
||
def save_data(data, action_text="系统自动备份"):
|
||
if "meta" not in data: data["meta"] = {}
|
||
data["meta"]["script_version"] = VERSION
|
||
data["meta"]["last_update"] = get_now().isoformat()
|
||
with open(DATA_FILE, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, indent=2, ensure_ascii=False)
|
||
# 异步备份
|
||
upload_to_webdav(DATA_FILE, "latest_security_v2.json")
|
||
upload_to_webdav(__file__, f"script_history/script_{VERSION}.py")
|
||
# 触发新版快报推送
|
||
send_notification(action_text, data)
|
||
|
||
def format_name(name):
|
||
"""两个字姓名中间加4个空格以对齐三字姓名"""
|
||
return f"{name[0]} {name[1]}" if len(name) == 2 else name
|
||
|
||
def send_notification(action_text, data):
|
||
"""发送企业微信通知 - v2.3.8 对齐增强与删除功能 + 新业务日期"""
|
||
if not WEBHOOK_URL: return
|
||
now = get_now().strftime("%Y-%m-%d %H:%M")
|
||
biz_month = get_now().strftime("%Y-%m")
|
||
|
||
# 1. 统计当月排名 (过滤0票)
|
||
stats = {}
|
||
for log in data.get("logs", []):
|
||
# 只有未删除的记录才参与排名
|
||
if log.get("biz_month") == biz_month and not log.get("is_deleted", False):
|
||
n = log["name"]
|
||
if n in data["members"] and data["members"][n]["status"] == "active":
|
||
stats[n] = stats.get(n, 0) + 1
|
||
|
||
ranking = sorted(stats.items(), key=lambda x: x[1], reverse=True)
|
||
# 此处调用 format_name 进行名字对齐
|
||
ranking_text = "\n".join([f"{i+1}. {format_name(m[0])}: {m[1]} 票({data['members'][m[0]].get('shift', '未知')})" for i, m in enumerate(ranking)])
|
||
|
||
# 2. 获取最近动态 (只显示未删除的)
|
||
recent_records = [l for l in data.get('logs', []) if not l.get('is_deleted', False)][-3:]
|
||
recent_records.reverse()
|
||
recent_text = ""
|
||
for r in recent_records:
|
||
r_time = datetime.fromisoformat(r['actual_time']).astimezone(BEIJING_TZ).strftime("%m-%d %H:%M")
|
||
# 此处也应用对齐逻辑
|
||
recent_text += f"- [{r_time}] {format_name(r['name'])}: {r['content']}\n"
|
||
|
||
content = f"""安检团队查获快报
|
||
更新时间:{now}
|
||
最新操作: {action_text}
|
||
|
||
排名明细:
|
||
{ranking_text if ranking_text else "暂无记录"}
|
||
(仅显示已查获成员,0 记录成员已隐藏)
|
||
|
||
最近动态:
|
||
{recent_text if recent_text else "暂无记录"}"""
|
||
|
||
payload = {"msgtype": "text", "text": {"content": content}}
|
||
try: requests.post(WEBHOOK_URL, json=payload, timeout=5)
|
||
except: pass
|
||
|
||
def add_member(name_with_shift):
|
||
m = re.match(r"^(.*?)\((.*?)\)$", name_with_shift)
|
||
if not m: return "错误:格式应为 '新增 姓名(班次)'"
|
||
name, shift = m.groups()
|
||
data = load_data()
|
||
if name in data["members"] and data["members"][name]["status"] == "active":
|
||
return f"ℹ️ 人员 {name} 已存在且为活跃状态。"
|
||
data["members"][name] = {"shift": shift, "status": "active"}
|
||
action_text = f"新增成员 {format_name(name)} ({shift})"
|
||
save_data(data, action_text)
|
||
return f"✅ 已录入人员:{name} ({shift})"
|
||
|
||
def remove_member(name):
|
||
data = load_data()
|
||
if name not in data["members"] or data["members"][name]["status"] == "offboarded":
|
||
return f"ℹ️ 未找到活跃人员 '{name}' 或已离职。"
|
||
data["members"][name]["status"] = "offboarded"
|
||
action_text = f"去除成员 {format_name(name)}"
|
||
save_data(data, action_text)
|
||
return f"✅ 已去除人员:{name} (已标记为离职,不参与排名)"
|
||
|
||
def record_entry(name, content, biz_date_manual=None):
|
||
data = load_data()
|
||
if name not in data["members"] or data["members"][name]["status"] == "offboarded":
|
||
return f"错误:未找到活跃人员 '{name}' 或该人员已离职。"
|
||
|
||
now = get_now()
|
||
if biz_date_manual:
|
||
year = now.year
|
||
dt = datetime.strptime(f"{year}{biz_date_manual.replace('-','')}", "%Y%m%d").replace(tzinfo=BEIJING_TZ)
|
||
info = {"biz_date": dt.strftime("%Y-%m-%d"), "biz_month": dt.strftime("%Y-%m"), "is_manual": True}
|
||
else:
|
||
# 修正业务日期判定逻辑:00:00-05:00 归属到前一个日历日
|
||
# 重点:判断当前小时是否在 0 到 5 之间(包含 0 和 5)
|
||
biz_dt = now - timedelta(days=1) if 0 <= now.hour <= 5 else now
|
||
info = {"biz_date": biz_dt.strftime("%Y-%m-%d"), "biz_month": biz_dt.strftime("%Y-%m"), "is_manual": False}
|
||
|
||
data["last_id"] += 1
|
||
new_id = data["last_id"]
|
||
log = {"id": new_id, "name": name, "content": content, "actual_time": now.isoformat(), "biz_date": info["biz_date"], "biz_month": info["biz_month"]}
|
||
data["logs"].append(log)
|
||
|
||
tag = "[补录] " if info.get("is_manual") else ""
|
||
action_text = f"{tag}新增 {format_name(name)} {content}"
|
||
save_data(data, action_text)
|
||
|
||
daily_count = sum(1 for l in data["logs"] if l["name"] == name and l["biz_date"] == info["biz_date"] and not l.get('is_deleted', False))
|
||
return f"[已录入] {name} - {content} (当日第{daily_count}票, 索引 #{new_id})"
|
||
|
||
def delete_entry(log_id):
|
||
data = load_data()
|
||
found = False
|
||
for log in data["logs"]:
|
||
if log['id'] == log_id:
|
||
log['is_deleted'] = True # 标记为删除,而不是真正移除
|
||
found = True
|
||
break
|
||
if found:
|
||
action_text = f"删除记录 # {log_id}"
|
||
save_data(data, action_text)
|
||
return f"✅ 已删除记录:#{log_id}"
|
||
return f"❌ 未找到索引为 #{log_id} 的记录。"
|
||
|
||
def query_logs(mode, value):
|
||
data = load_data()
|
||
results = [l for l in data["logs"] if (l["name"] == value if mode=="name" else l["biz_date"].endswith(value)) and not l.get('is_deleted', False)]
|
||
if not results: return "🔍 暂无记录。"
|
||
res = f"🔍 查询结果 ({value})\n----------------\n"
|
||
for l in results:
|
||
time_str = datetime.fromisoformat(l['actual_time']).astimezone(BEIJING_TZ).strftime("%m-%d %H:%M")
|
||
res += f"#{l['id']} - {l['content']} (录入时间: {time_str})\n"
|
||
return res
|
||
|
||
def get_ranking(target_month=None):
|
||
data = load_data()
|
||
now = get_now()
|
||
if not target_month:
|
||
target_month = (now.replace(day=1)-timedelta(days=1)).strftime("%Y-%m") if 0<=now.hour<6 else now.strftime("%Y-%m")
|
||
elif len(target_month) <= 3:
|
||
target_month = f"{now.year}-{target_month.replace('月','').zfill(2)}"
|
||
|
||
stats = {}
|
||
for log in data.get("logs", []):
|
||
if log.get("biz_month") == target_month and not log.get('is_deleted', False):
|
||
n = log["name"]
|
||
if n in data["members"] and data["members"][n]["status"] == "active":
|
||
stats[n] = stats.get(n, 0) + 1
|
||
|
||
sorted_ranking = sorted(stats.items(), key=lambda x: x[1], reverse=True)
|
||
res = f"🏆 安检排行榜 ({target_month}) [v{VERSION}]\n----------------\n"
|
||
if not sorted_ranking: return res + "暂无记录。"
|
||
for i, (name, count) in enumerate(sorted_ranking):
|
||
res += f"{i+1}. {format_name(name)}: {count} 票 ({data['members'][name]['shift']})\n"
|
||
return res
|
||
|
||
if __name__ == "__main__":
|
||
args = sys.argv[1:]
|
||
# print(f"DEBUG: Raw args received: {args}") # 再次调试打印
|
||
|
||
if not args: print(f"安检系统 {VERSION}"); sys.exit(0)
|
||
|
||
# 优先检查去除成员命令
|
||
if len(args) >= 2 and args[0] == "去除":
|
||
print(remove_member(args[1]))
|
||
# 优先检查新增成员命令
|
||
elif len(args) >= 2 and args[0] == "新增":
|
||
print(add_member(" ".join(args[1:])))
|
||
# 优先检查删除命令,直接从 args 列表解析
|
||
elif len(args) >= 2 and args[0] == "删除" and args[1].startswith("#"):
|
||
try:
|
||
log_id = int(args[1][1:].strip())
|
||
print(delete_entry(log_id))
|
||
except ValueError: print("错误:删除指令格式为 '删除 #索引号'")
|
||
# 检查排行榜命令
|
||
elif "排行榜" in " ".join(args):
|
||
m = re.search(r"排行榜\s*(.*)", " ".join(args))
|
||
print(get_ranking(m.group(1).strip() if m.group(1) else None))
|
||
# 检查违禁品查询命令
|
||
elif len(args) >= 2 and args[0].startswith("违禁品"):
|
||
val = " ".join(args[1:]).strip()
|
||
print(query_logs("date", val) if re.match(r"^\d{2}-?\d{2}$", val) else query_logs("name", val))
|
||
# 默认处理记录录入
|
||
elif len(args) >= 2:
|
||
print(record_entry(args[0], " ".join(args[1:])))
|
||
else:
|
||
print("错误:格式不识别")
|