Files
smsweb/database.py
OpenClaw Agent a473ee7397 修复: 使用 current_app 替代 app 访问配置
问题: database.py 中 get_statistics 使用 app.config['TIMEZONE'],
      但 app 对象未导入,导致 NameError

解决:
1. 导入 current_app: from flask import current_app
2. 使用 current_app.config['TIMEZONE'] 替代 app.config['TIMEZONE']
3. current_app 是 Flask 上下文代理,自动指向当前应用实例

符合 Flask 官方最佳实践
2026-02-08 05:21:02 +08:00

374 lines
12 KiB
Python

"""
数据库模型
"""
import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from flask import current_app
class Database:
    """SQLite-backed storage for received SMS messages and receive logs.

    ``created_at`` values are written as UTC strings in the form
    ``YYYY-MM-DD HH:MM:SS`` and are shifted by ``timezone_offset`` hours
    when rows are handed back to callers.
    """

    # Textual format used for every ``created_at`` value stored in SQLite.
    _DT_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self, db_path: str, timezone_offset: int = 8):
        """Remember the connection settings and create the schema.

        Args:
            db_path: Path of the SQLite database file.
            timezone_offset: Local offset from UTC in hours (default UTC+8).
        """
        self.db_path = db_path
        self.timezone_offset = timezone_offset
        self.init_db()

    def convert_to_local(self, dt_str: Optional[str]) -> Optional[str]:
        """Shift a UTC time string by the configured offset.

        Args:
            dt_str: UTC time string such as ``2024-01-01 00:00:00``.

        Returns:
            The local time string, ``None`` for empty input, or ``dt_str``
            unchanged when it cannot be parsed or shifted.
        """
        if not dt_str:
            return None
        try:
            utc_dt = datetime.strptime(dt_str, self._DT_FORMAT)
            local_dt = utc_dt + timedelta(hours=self.timezone_offset)
            return local_dt.strftime(self._DT_FORMAT)
        except (TypeError, ValueError, OverflowError):
            # Best effort: hand back the stored value rather than fail.
            return dt_str

    def get_connection(self):
        """Open a new SQLite connection whose rows support access by name."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    @staticmethod
    def _build_filter(from_number: Optional[str], search: Optional[str]):
        """Build the WHERE clause and bound parameters for message queries."""
        clauses = []
        params: List[Any] = []
        if from_number:
            clauses.append("from_number = ?")
            params.append(from_number)
        if search:
            clauses.append("(content LIKE ? OR from_number LIKE ?)")
            pattern = f"%{search}%"
            params.extend([pattern, pattern])
        # Only fixed fragments are joined here; user values stay in params.
        where_sql = " AND ".join(clauses) if clauses else "1=1"
        return where_sql, params

    def _localize_message(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a message row's time fields to local time, in place."""
        if msg.get('created_at'):
            msg['created_at'] = self.convert_to_local(msg['created_at'])
        raw_ts = msg.get('timestamp')
        if raw_ts:
            local_tz = timezone(timedelta(hours=self.timezone_offset))
            try:
                # ``timestamp`` is divided by 1000 before conversion
                # (treated as milliseconds since the epoch).
                local_dt = datetime.fromtimestamp(raw_ts / 1000, local_tz)
            except (OverflowError, OSError, ValueError, TypeError):
                # A malformed device timestamp must not break the listing;
                # the row is returned without ``local_timestamp``.
                pass
            else:
                msg['local_timestamp'] = local_dt.strftime(self._DT_FORMAT)
        return msg

    def init_db(self):
        """Create the tables and indexes if they do not exist yet."""
        with closing(self.get_connection()) as conn:
            cursor = conn.cursor()
            # SMS message table.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS sms_messages (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    from_number TEXT NOT NULL,
                    content TEXT NOT NULL,
                    timestamp INTEGER NOT NULL,
                    device_info TEXT,
                    sim_info TEXT,
                    sign_verified INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    ip_address TEXT
                )
            ''')
            # Receive-log table.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS receive_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    from_number TEXT,
                    content TEXT,
                    timestamp INTEGER,
                    sign TEXT,
                    sign_valid INTEGER,
                    ip_address TEXT,
                    status TEXT NOT NULL,
                    error_message TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            # Indexes on the message table.
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_from_number ON sms_messages(from_number)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON sms_messages(timestamp)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_created_at ON sms_messages(created_at)')
            conn.commit()

    def add_message(self, from_number: str, content: str, timestamp: int,
                    device_info: Optional[str] = None, sim_info: Optional[str] = None,
                    sign_verified: bool = False, ip_address: Optional[str] = None) -> int:
        """Insert one SMS message and return its row id.

        ``created_at`` is set explicitly to the current UTC time.
        """
        utc_now = datetime.now(timezone.utc).strftime(self._DT_FORMAT)
        with closing(self.get_connection()) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO sms_messages
                (from_number, content, timestamp, device_info, sim_info, sign_verified, ip_address, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', (from_number, content, timestamp, device_info, sim_info, sign_verified, ip_address, utc_now))
            message_id = cursor.lastrowid
            conn.commit()
        return message_id

    def add_log(self, from_number: Optional[str], content: Optional[str],
                timestamp: Optional[int], sign: Optional[str],
                sign_valid: Optional[bool], ip_address: Optional[str],
                status: str, error_message: Optional[str] = None) -> int:
        """Insert one receive-log entry and return its row id.

        ``sign_valid`` is stored as 1 or 0, or as NULL when it is ``None``.
        """
        if sign_valid is None:
            sign_valid_int = None
        else:
            sign_valid_int = 1 if sign_valid else 0
        utc_now = datetime.now(timezone.utc).strftime(self._DT_FORMAT)
        with closing(self.get_connection()) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO receive_logs
                (from_number, content, timestamp, sign, sign_valid, ip_address, status, error_message, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (from_number, content, timestamp, sign, sign_valid_int, ip_address, status, error_message, utc_now))
            log_id = cursor.lastrowid
            conn.commit()
        return log_id

    def get_messages(self, page: int = 1, limit: int = 50,
                     from_number: Optional[str] = None,
                     search: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return one page of messages, newest ``timestamp`` first.

        Args:
            page: 1-based page number.
            limit: Maximum number of rows per page.
            from_number: Exact sender number to filter on, if given.
            search: Substring matched against content and sender, if given.

        Returns:
            Message dicts with ``created_at`` converted to local time and a
            ``local_timestamp`` string added when ``timestamp`` is set.
        """
        offset = (page - 1) * limit
        where_sql, params = self._build_filter(from_number, search)
        query = f'''
            SELECT * FROM sms_messages
            WHERE {where_sql}
            ORDER BY timestamp DESC
            LIMIT ? OFFSET ?
        '''
        params.extend([limit, offset])
        with closing(self.get_connection()) as conn:
            rows = conn.execute(query, params).fetchall()
        return [self._localize_message(dict(row)) for row in rows]

    def get_message_count(self, from_number: Optional[str] = None,
                          search: Optional[str] = None) -> int:
        """Return how many messages match the same filters as get_messages."""
        where_sql, params = self._build_filter(from_number, search)
        with closing(self.get_connection()) as conn:
            row = conn.execute(
                f'SELECT COUNT(*) as count FROM sms_messages WHERE {where_sql}',
                params,
            ).fetchone()
        return row['count']

    def get_message_by_id(self, message_id: int) -> Optional[Dict[str, Any]]:
        """Return the message with the given id, or ``None`` if absent."""
        with closing(self.get_connection()) as conn:
            row = conn.execute(
                'SELECT * FROM sms_messages WHERE id = ?', (message_id,)
            ).fetchone()
        if row is None:
            return None
        return self._localize_message(dict(row))

    def get_recent_messages(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return the first page of messages, limited to ``limit`` rows."""
        return self.get_messages(page=1, limit=limit)

    def get_from_numbers(self) -> List[Dict[str, Any]]:
        """Return each sender with its message count and latest timestamp.

        Rows are ordered by message count, highest first.
        """
        with closing(self.get_connection()) as conn:
            rows = conn.execute('''
                SELECT from_number, COUNT(*) as count, MAX(timestamp) as last_time
                FROM sms_messages
                GROUP BY from_number
                ORDER BY count DESC
            ''').fetchall()
        return [dict(row) for row in rows]

    def get_logs(self, page: int = 1, limit: int = 50) -> List[Dict[str, Any]]:
        """Return one page of receive logs, newest first.

        ``created_at`` in each returned dict is converted to local time.
        """
        offset = (page - 1) * limit
        with closing(self.get_connection()) as conn:
            rows = conn.execute('''
                SELECT * FROM receive_logs
                ORDER BY created_at DESC
                LIMIT ? OFFSET ?
            ''', (limit, offset)).fetchall()
        logs = [dict(row) for row in rows]
        for log in logs:
            if log.get('created_at'):
                log['created_at'] = self.convert_to_local(log['created_at'])
        return logs

    def cleanup_old_messages(self, days: int = 30, max_messages: int = 10000) -> int:
        """Delete old messages by age, then trim to a maximum row count.

        Args:
            days: Messages created more than this many days ago are deleted.
            max_messages: After the age pass, only this many most recently
                created messages are kept.

        Returns:
            Total number of rows deleted by both passes.
        """
        # ``created_at`` is stored in UTC, so the cutoff must be computed in
        # UTC too; a naive local ``datetime.now()`` would shift the retention
        # window by the host's UTC offset.
        cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).strftime(self._DT_FORMAT)
        with closing(self.get_connection()) as conn:
            cursor = conn.cursor()
            # Pass 1: remove by age.
            cursor.execute('''
                DELETE FROM sms_messages
                WHERE created_at < ?
            ''', (cutoff,))
            deleted_count = cursor.rowcount
            # Pass 2: remove everything beyond the newest ``max_messages``.
            cursor.execute('''
                DELETE FROM sms_messages
                WHERE id NOT IN (
                    SELECT id FROM sms_messages
                    ORDER BY created_at DESC
                    LIMIT ?
                )
            ''', (max_messages,))
            deleted_count += cursor.rowcount
            conn.commit()
        return deleted_count

    def get_statistics(self) -> Dict[str, Any]:
        """Return message counts: total, today, this week, and by signature.

        "Today" and "this week" (starting Monday 00:00) are evaluated in the
        time zone named by ``current_app.config['TIMEZONE']``, so this must
        be called inside a Flask application context.

        Returns:
            Dict with keys ``total``, ``today``, ``week``, ``verified`` and
            ``unverified``.
        """
        import pytz

        fmt = self._DT_FORMAT
        local_tz = pytz.timezone(current_app.config['TIMEZONE'])
        local_now = datetime.now(local_tz)

        def to_utc_str(naive_local: datetime) -> str:
            # pytz zones must be attached with localize(); replace() or
            # timedelta arithmetic on an aware value keeps a stale UTC
            # offset across DST transitions.
            return local_tz.localize(naive_local).astimezone(pytz.UTC).strftime(fmt)

        # Work with naive local wall-clock midnights, then localize each one.
        today_start = local_now.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=None)
        today_end = today_start + timedelta(days=1)
        week_start = today_start - timedelta(days=local_now.weekday())

        with closing(self.get_connection()) as conn:
            cursor = conn.cursor()
            # Total number of messages.
            cursor.execute('SELECT COUNT(*) as total FROM sms_messages')
            total = cursor.fetchone()['total']
            # Messages created during the local "today".
            cursor.execute('''
                SELECT COUNT(*) as today FROM sms_messages
                WHERE created_at >= ? AND created_at < ?
            ''', (to_utc_str(today_start), to_utc_str(today_end)))
            today = cursor.fetchone()['today']
            # Messages created since local Monday 00:00.
            cursor.execute('''
                SELECT COUNT(*) as week FROM sms_messages
                WHERE created_at >= ?
            ''', (to_utc_str(week_start),))
            week = cursor.fetchone()['week']
            # Signature verification breakdown.
            cursor.execute('''
                SELECT
                    SUM(CASE WHEN sign_verified = 1 THEN 1 ELSE 0 END) as verified,
                    SUM(CASE WHEN sign_verified = 0 THEN 1 ELSE 0 END) as unverified
                FROM sms_messages
            ''')
            row = cursor.fetchone()

        return {
            'total': total,
            'today': today,
            'week': week,
            # SUM() yields NULL on an empty table.
            'verified': row['verified'] or 0,
            'unverified': row['unverified'] or 0
        }