问题: database.py 中 get_statistics 使用 app.config['TIMEZONE'],
但 app 对象未导入,导致 AttributeError
解决:
1. 导入 current_app: from flask import current_app
2. 使用 current_app.config['TIMEZONE'] 替代 app.config['TIMEZONE']
3. current_app 是 Flask 上下文代理,自动指向当前应用实例
符合 Flask 官方最佳实践
374 lines
12 KiB
Python
374 lines
12 KiB
Python
"""
|
|
数据库模型
|
|
"""
|
|
|
|
import sqlite3
|
|
import json
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import List, Optional, Dict, Any
|
|
import os
|
|
from flask import current_app
|
|
|
|
|
|
class Database:
    """SQLite-backed storage for received SMS messages and receive logs.

    Each public method opens its own short-lived connection and always closes
    it, even when a statement fails. ``created_at`` values are written as UTC
    strings in ``YYYY-MM-DD HH:MM:SS`` form and converted to local time (using
    ``timezone_offset``) only when rows are handed back to the caller.
    """

    # Text format used for every ``created_at`` value read from or written to SQLite.
    _DT_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self, db_path: str, timezone_offset: int = 8):
        """Remember the connection settings and create the schema if needed.

        Args:
            db_path: Path of the SQLite database file.
            timezone_offset: Local offset from UTC in hours (default UTC+8).
        """
        self.db_path = db_path
        self.timezone_offset = timezone_offset
        self.init_db()

    def _utc_now(self) -> str:
        """Return the current UTC time formatted for the ``created_at`` columns."""
        return datetime.now(timezone.utc).strftime(self._DT_FORMAT)

    def convert_to_local(self, dt_str: Optional[str]) -> Optional[str]:
        """Convert a UTC time string to a local time string.

        Args:
            dt_str: UTC time such as ``2024-01-01 00:00:00``.

        Returns:
            The time shifted by ``timezone_offset`` hours, ``None`` for an
            empty or missing input, or the input unchanged when it cannot be
            parsed or shifted.
        """
        if not dt_str:
            return None

        try:
            utc_dt = datetime.strptime(dt_str, self._DT_FORMAT)
            local_dt = utc_dt + timedelta(hours=self.timezone_offset)
        except (TypeError, ValueError, OverflowError):
            # Best effort: hand back what was stored rather than failing the
            # whole listing because of one odd value.
            return dt_str
        return local_dt.strftime(self._DT_FORMAT)

    def get_connection(self):
        """Open a new connection whose rows can be accessed by column name."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def init_db(self):
        """Create the tables and indexes if they do not exist yet."""
        statements = (
            # SMS message table
            '''
            CREATE TABLE IF NOT EXISTS sms_messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                from_number TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp INTEGER NOT NULL,
                device_info TEXT,
                sim_info TEXT,
                sign_verified INTEGER DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                ip_address TEXT
            )
            ''',
            # Receive log table
            '''
            CREATE TABLE IF NOT EXISTS receive_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                from_number TEXT,
                content TEXT,
                timestamp INTEGER,
                sign TEXT,
                sign_valid INTEGER,
                ip_address TEXT,
                status TEXT NOT NULL,
                error_message TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            ''',
            # Indexes
            'CREATE INDEX IF NOT EXISTS idx_from_number ON sms_messages(from_number)',
            'CREATE INDEX IF NOT EXISTS idx_timestamp ON sms_messages(timestamp)',
            'CREATE INDEX IF NOT EXISTS idx_created_at ON sms_messages(created_at)',
        )

        conn = self.get_connection()
        try:
            with conn:  # commit on success, roll back on error
                for statement in statements:
                    conn.execute(statement)
        finally:
            conn.close()

    def add_message(self, from_number: str, content: str, timestamp: int,
                    device_info: Optional[str] = None, sim_info: Optional[str] = None,
                    sign_verified: bool = False, ip_address: Optional[str] = None) -> int:
        """Store one SMS message and return its row id.

        Args:
            from_number: Sender number.
            content: Message text.
            timestamp: Message time as reported by the sender; it is read back
                as epoch milliseconds by ``get_messages``.
            device_info: Optional device description.
            sim_info: Optional SIM description.
            sign_verified: Whether the request signature was verified.
            ip_address: Optional address the request came from.

        Returns:
            The id of the inserted row.
        """
        conn = self.get_connection()
        try:
            with conn:  # commit on success, roll back on error
                cursor = conn.execute('''
                    INSERT INTO sms_messages
                    (from_number, content, timestamp, device_info, sim_info, sign_verified, ip_address, created_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ''', (from_number, content, timestamp, device_info, sim_info,
                      sign_verified, ip_address, self._utc_now()))
                return cursor.lastrowid
        finally:
            conn.close()

    def add_log(self, from_number: Optional[str], content: Optional[str],
                timestamp: Optional[int], sign: Optional[str],
                sign_valid: Optional[bool], ip_address: Optional[str],
                status: str, error_message: Optional[str] = None) -> int:
        """Store one receive-log entry and return its row id.

        Args:
            from_number: Sender number, if present in the request.
            content: Message text, if present in the request.
            timestamp: Timestamp from the request, if present.
            sign: Signature from the request, if present.
            sign_valid: Result of the signature check; ``None`` is stored as
                NULL, any other value as 1 (truthy) or 0 (falsy).
            ip_address: Address the request came from.
            status: Processing status.
            error_message: Optional error detail.

        Returns:
            The id of the inserted row.
        """
        # Three states: NULL (None), 1 (truthy), 0 (falsy).
        sign_valid_int = None if sign_valid is None else int(bool(sign_valid))

        conn = self.get_connection()
        try:
            with conn:  # commit on success, roll back on error
                cursor = conn.execute('''
                    INSERT INTO receive_logs
                    (from_number, content, timestamp, sign, sign_valid, ip_address, status, error_message, created_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (from_number, content, timestamp, sign, sign_valid_int,
                      ip_address, status, error_message, self._utc_now()))
                return cursor.lastrowid
        finally:
            conn.close()

    @staticmethod
    def _build_message_filter(from_number: Optional[str], search: Optional[str]) -> tuple:
        """Build the WHERE clause and bound parameters shared by list and count queries."""
        clauses: List[str] = []
        params: List[Any] = []

        if from_number:
            clauses.append("from_number = ?")
            params.append(from_number)

        if search:
            pattern = f"%{search}%"
            clauses.append("(content LIKE ? OR from_number LIKE ?)")
            params.extend([pattern, pattern])

        # "1=1" keeps the surrounding SQL valid when no filter is active.
        return (" AND ".join(clauses) or "1=1"), params

    def _localize_message(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a message row's times to local time, in place, and return it."""
        if msg.get('created_at'):
            msg['created_at'] = self.convert_to_local(msg['created_at'])

        raw_timestamp = msg.get('timestamp')
        if raw_timestamp:
            local_tz = timezone(timedelta(hours=self.timezone_offset))
            try:
                # The stored value is divided by 1000, i.e. treated as milliseconds.
                local_dt = datetime.fromtimestamp(raw_timestamp / 1000, local_tz)
            except (TypeError, ValueError, OverflowError, OSError):
                # The value comes from the sending device; an unusable one must
                # not break the whole listing, so the row simply gets no
                # 'local_timestamp'.
                pass
            else:
                msg['local_timestamp'] = local_dt.strftime(self._DT_FORMAT)
        return msg

    def get_messages(self, page: int = 1, limit: int = 50,
                     from_number: Optional[str] = None,
                     search: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return one page of messages, newest ``timestamp`` first.

        Args:
            page: 1-based page number.
            limit: Number of rows per page.
            from_number: Only return messages from exactly this number.
            search: Only return messages whose content or number contains this text.

        Returns:
            Rows as dicts with ``created_at`` converted to local time and, when
            the stored timestamp is usable, a ``local_timestamp`` string added.
        """
        where_sql, params = self._build_message_filter(from_number, search)
        params.extend([limit, (page - 1) * limit])

        conn = self.get_connection()
        try:
            rows = conn.execute(f'''
                SELECT * FROM sms_messages
                WHERE {where_sql}
                ORDER BY timestamp DESC
                LIMIT ? OFFSET ?
            ''', params).fetchall()
        finally:
            conn.close()

        return [self._localize_message(dict(row)) for row in rows]

    def get_message_count(self, from_number: Optional[str] = None,
                          search: Optional[str] = None) -> int:
        """Return how many messages match the same filters as ``get_messages``."""
        where_sql, params = self._build_message_filter(from_number, search)

        conn = self.get_connection()
        try:
            row = conn.execute(
                f'SELECT COUNT(*) as count FROM sms_messages WHERE {where_sql}', params
            ).fetchone()
        finally:
            conn.close()

        return row['count']

    def get_message_by_id(self, message_id: int) -> Optional[Dict[str, Any]]:
        """Return the message with the given id, or ``None`` if there is none.

        The returned dict gets the same time conversion as rows from
        ``get_messages``.
        """
        conn = self.get_connection()
        try:
            row = conn.execute(
                'SELECT * FROM sms_messages WHERE id = ?', (message_id,)
            ).fetchone()
        finally:
            conn.close()

        if row is None:
            return None
        return self._localize_message(dict(row))

    def get_recent_messages(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return the first page of ``limit`` messages from ``get_messages``."""
        return self.get_messages(page=1, limit=limit)

    def get_from_numbers(self) -> List[Dict[str, Any]]:
        """Return every sender number with its message count and latest timestamp.

        Returns:
            Dicts with ``from_number``, ``count`` and ``last_time``, ordered by
            ``count`` descending.
        """
        conn = self.get_connection()
        try:
            rows = conn.execute('''
                SELECT from_number, COUNT(*) as count, MAX(timestamp) as last_time
                FROM sms_messages
                GROUP BY from_number
                ORDER BY count DESC
            ''').fetchall()
        finally:
            conn.close()

        return [dict(row) for row in rows]

    def get_logs(self, page: int = 1, limit: int = 50) -> List[Dict[str, Any]]:
        """Return one page of receive logs, newest ``created_at`` first.

        Args:
            page: 1-based page number.
            limit: Number of rows per page.

        Returns:
            Rows as dicts with ``created_at`` converted to local time.
        """
        conn = self.get_connection()
        try:
            rows = conn.execute('''
                SELECT * FROM receive_logs
                ORDER BY created_at DESC
                LIMIT ? OFFSET ?
            ''', (limit, (page - 1) * limit)).fetchall()
        finally:
            conn.close()

        logs = [dict(row) for row in rows]
        for log in logs:
            if log.get('created_at'):
                log['created_at'] = self.convert_to_local(log['created_at'])
        return logs

    def cleanup_old_messages(self, days: int = 30, max_messages: int = 10000):
        """Delete messages that are too old, then trim the table to a maximum size.

        Args:
            days: Messages whose ``created_at`` is older than this many days are deleted.
            max_messages: After the age-based pass, only this many of the most
                recently created messages are kept.

        Returns:
            Total number of rows deleted by both passes.
        """
        # created_at is stored in UTC, so the cutoff must be computed in UTC as
        # well; a naive local "now" would shift it by the host's UTC offset.
        cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).strftime(self._DT_FORMAT)

        conn = self.get_connection()
        try:
            with conn:  # both deletes commit together or not at all
                cursor = conn.execute('''
                    DELETE FROM sms_messages
                    WHERE created_at < ?
                ''', (cutoff,))
                deleted_count = cursor.rowcount

                # created_at has one-second resolution; id breaks ties so the
                # surviving rows are always the most recently inserted ones.
                cursor = conn.execute('''
                    DELETE FROM sms_messages
                    WHERE id NOT IN (
                        SELECT id FROM sms_messages
                        ORDER BY created_at DESC, id DESC
                        LIMIT ?
                    )
                ''', (max_messages,))
                deleted_count += cursor.rowcount
        finally:
            conn.close()

        return deleted_count

    def get_statistics(self) -> Dict[str, Any]:
        """Return message counts for the dashboard.

        "Today" and "this week" (starting Monday 00:00) are evaluated in the
        timezone named by ``current_app.config['TIMEZONE']`` and translated to
        UTC bounds, because ``created_at`` is stored in UTC. Reading
        ``current_app`` means this must be called while a Flask application
        context is active.

        Returns:
            Dict with ``total``, ``today``, ``week``, ``verified`` and ``unverified``.
        """
        import pytz

        local_tz = pytz.timezone(current_app.config['TIMEZONE'])
        local_now = datetime.now(local_tz)

        # Do the calendar arithmetic on naive local time and attach the zone
        # with localize(): with pytz, replace()/timedelta on an aware datetime
        # keep the UTC offset of "now", which is wrong across a DST change.
        local_midnight = local_now.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=None)

        def to_utc_string(naive_local: datetime) -> str:
            return local_tz.localize(naive_local).astimezone(pytz.UTC).strftime(self._DT_FORMAT)

        today_start = to_utc_string(local_midnight)
        today_end = to_utc_string(local_midnight + timedelta(days=1))
        # weekday() is 0 on Monday, so this steps back to Monday 00:00 local time.
        week_start = to_utc_string(local_midnight - timedelta(days=local_now.weekday()))

        conn = self.get_connection()
        try:
            total = conn.execute(
                'SELECT COUNT(*) as total FROM sms_messages'
            ).fetchone()['total']

            today = conn.execute('''
                SELECT COUNT(*) as today FROM sms_messages
                WHERE created_at >= ? AND created_at < ?
            ''', (today_start, today_end)).fetchone()['today']

            week = conn.execute('''
                SELECT COUNT(*) as week FROM sms_messages
                WHERE created_at >= ?
            ''', (week_start,)).fetchone()['week']

            # Split of signature-verified versus unverified messages.
            row = conn.execute('''
                SELECT
                    SUM(CASE WHEN sign_verified = 1 THEN 1 ELSE 0 END) as verified,
                    SUM(CASE WHEN sign_verified = 0 THEN 1 ELSE 0 END) as unverified
                FROM sms_messages
            ''').fetchone()
        finally:
            conn.close()

        return {
            'total': total,
            'today': today,
            'week': week,
            # SUM() yields NULL on an empty table; report 0 instead.
            'verified': row['verified'] or 0,
            'unverified': row['unverified'] or 0
        }
|