Files
smsweb/database.py
OpenClaw Agent a0fecb2755 修复统计数据时区问题: 使用本地时区(Asia/Shanghai)计算今日和本周统计
问题: 原代码使用SQLite的DATE('now')获取的是UTC时间,不是本地时间
解决: 在Python中使用pytz计算本地时区的今日/本周时间范围,转换为UTC后查询数据库
2026-02-07 20:28:49 +00:00

373 lines
12 KiB
Python

"""
数据库模型
"""
import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Dict, Any
class Database:
    """SQLite-backed store for received SMS messages and receive logs.

    ``created_at`` columns are written as UTC strings in the form
    ``YYYY-MM-DD HH:MM:SS``. Read methods shift them to local time using the
    fixed ``timezone_offset`` (in hours) given at construction.
    """

    # Text format used for every ``created_at`` value written or compared.
    _DT_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self, db_path: str, timezone_offset: int = 8):
        """Store the settings and make sure the schema exists.

        Args:
            db_path: Path of the SQLite database file.
            timezone_offset: Local offset from UTC in hours (default UTC+8).
        """
        self.db_path = db_path
        self.timezone_offset = timezone_offset
        self.init_db()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _local_tz(self) -> timezone:
        """Return the fixed-offset local time zone for this instance."""
        return timezone(timedelta(hours=self.timezone_offset))

    def _utc_now_str(self) -> str:
        """Return the current UTC time formatted for the ``created_at`` column."""
        return datetime.now(timezone.utc).strftime(self._DT_FORMAT)

    def _localize_message(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a message row's time fields to local time, in place.

        ``created_at`` is replaced by its local-time string. ``timestamp``
        (epoch milliseconds) is kept and a ``local_timestamp`` string is added.
        Falsy values (``None``, ``0``, empty string) are left alone.
        """
        if msg.get('created_at'):
            msg['created_at'] = self.convert_to_local(msg['created_at'])
        if msg.get('timestamp'):
            local_dt = datetime.fromtimestamp(msg['timestamp'] / 1000, self._local_tz())
            msg['local_timestamp'] = local_dt.strftime(self._DT_FORMAT)
        return msg

    @staticmethod
    def _build_filter(from_number: Optional[str], search: Optional[str]):
        """Build the WHERE clause and its parameters for message queries.

        Returns:
            A ``(where_sql, params)`` tuple. ``where_sql`` is assembled only
            from fixed fragments; user values travel through ``params``.
        """
        clauses: List[str] = []
        params: List[Any] = []
        if from_number:
            clauses.append("from_number = ?")
            params.append(from_number)
        if search:
            clauses.append("(content LIKE ? OR from_number LIKE ?)")
            pattern = f"%{search}%"
            params.extend([pattern, pattern])
        where_sql = " AND ".join(clauses) if clauses else "1=1"
        return where_sql, params

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def convert_to_local(self, dt_str: Optional[str]) -> Optional[str]:
        """Convert a UTC time string to a local time string.

        Args:
            dt_str: UTC time in the form ``2024-01-01 00:00:00``.

        Returns:
            The local time string, ``None`` for empty input, or the input
            unchanged when it cannot be parsed (best effort).
        """
        if not dt_str:
            return None
        try:
            utc_dt = datetime.strptime(dt_str, self._DT_FORMAT)
            local_dt = utc_dt + timedelta(hours=self.timezone_offset)
            return local_dt.strftime(self._DT_FORMAT)
        except (ValueError, TypeError, OverflowError):
            # Unparseable or out-of-range value: hand it back untouched.
            return dt_str

    def get_connection(self):
        """Open a new connection whose rows can be accessed by column name."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def init_db(self):
        """Create the tables and indexes if they do not exist yet."""
        # ``closing`` guarantees close(); the inner ``conn`` context commits on
        # success and rolls back if a statement raises.
        with closing(self.get_connection()) as conn, conn:
            # SMS message table
            conn.execute('''
                CREATE TABLE IF NOT EXISTS sms_messages (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    from_number TEXT NOT NULL,
                    content TEXT NOT NULL,
                    timestamp INTEGER NOT NULL,
                    device_info TEXT,
                    sim_info TEXT,
                    sign_verified INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    ip_address TEXT
                )
            ''')
            # Receive log table
            conn.execute('''
                CREATE TABLE IF NOT EXISTS receive_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    from_number TEXT,
                    content TEXT,
                    timestamp INTEGER,
                    sign TEXT,
                    sign_valid INTEGER,
                    ip_address TEXT,
                    status TEXT NOT NULL,
                    error_message TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            # Indexes
            conn.execute('CREATE INDEX IF NOT EXISTS idx_from_number ON sms_messages(from_number)')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON sms_messages(timestamp)')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_created_at ON sms_messages(created_at)')

    def add_message(self, from_number: str, content: str, timestamp: int,
                    device_info: Optional[str] = None, sim_info: Optional[str] = None,
                    sign_verified: bool = False, ip_address: Optional[str] = None) -> int:
        """Insert one SMS message and return its row id.

        ``created_at`` is set explicitly to the current UTC time.
        """
        with closing(self.get_connection()) as conn, conn:
            cursor = conn.execute('''
                INSERT INTO sms_messages
                (from_number, content, timestamp, device_info, sim_info, sign_verified, ip_address, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', (from_number, content, timestamp, device_info, sim_info,
                  sign_verified, ip_address, self._utc_now_str()))
            return cursor.lastrowid

    def add_log(self, from_number: Optional[str], content: Optional[str],
                timestamp: Optional[int], sign: Optional[str],
                sign_valid: Optional[bool], ip_address: Optional[str],
                status: str, error_message: Optional[str] = None) -> int:
        """Insert one receive-log entry and return its row id.

        ``sign_valid`` is stored as 1/0, or NULL when it is ``None``.
        """
        sign_valid_int = None if sign_valid is None else int(bool(sign_valid))
        with closing(self.get_connection()) as conn, conn:
            cursor = conn.execute('''
                INSERT INTO receive_logs
                (from_number, content, timestamp, sign, sign_valid, ip_address, status, error_message, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (from_number, content, timestamp, sign, sign_valid_int,
                  ip_address, status, error_message, self._utc_now_str()))
            return cursor.lastrowid

    def get_messages(self, page: int = 1, limit: int = 50,
                     from_number: Optional[str] = None,
                     search: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return one page of messages, newest ``timestamp`` first.

        Args:
            page: 1-based page number.
            limit: Page size.
            from_number: Exact sender filter (optional).
            search: Substring matched against content or sender (optional).

        Returns:
            A list of row dicts with time fields converted to local time.
        """
        where_sql, params = self._build_filter(from_number, search)
        offset = (page - 1) * limit
        # where_sql contains only fixed fragments, so the f-string is safe.
        query = f'''
            SELECT * FROM sms_messages
            WHERE {where_sql}
            ORDER BY timestamp DESC
            LIMIT ? OFFSET ?
        '''
        with closing(self.get_connection()) as conn:
            rows = conn.execute(query, params + [limit, offset]).fetchall()
        return [self._localize_message(dict(row)) for row in rows]

    def get_message_count(self, from_number: Optional[str] = None,
                          search: Optional[str] = None) -> int:
        """Return the number of messages matching the same filters as ``get_messages``."""
        where_sql, params = self._build_filter(from_number, search)
        with closing(self.get_connection()) as conn:
            row = conn.execute(
                f'SELECT COUNT(*) as count FROM sms_messages WHERE {where_sql}', params
            ).fetchone()
        return row['count']

    def get_message_by_id(self, message_id: int) -> Optional[Dict[str, Any]]:
        """Return the message with the given id (times localized), or ``None``."""
        with closing(self.get_connection()) as conn:
            row = conn.execute(
                'SELECT * FROM sms_messages WHERE id = ?', (message_id,)
            ).fetchone()
        if row is None:
            return None
        return self._localize_message(dict(row))

    def get_recent_messages(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return the ``limit`` most recent messages."""
        return self.get_messages(page=1, limit=limit)

    def get_from_numbers(self) -> List[Dict[str, Any]]:
        """Return every sender with its message count and latest timestamp.

        Rows are ordered by message count, highest first.
        """
        with closing(self.get_connection()) as conn:
            rows = conn.execute('''
                SELECT from_number, COUNT(*) as count, MAX(timestamp) as last_time
                FROM sms_messages
                GROUP BY from_number
                ORDER BY count DESC
            ''').fetchall()
        return [dict(row) for row in rows]

    def get_logs(self, page: int = 1, limit: int = 50) -> List[Dict[str, Any]]:
        """Return one page of receive logs, newest first, with local ``created_at``."""
        offset = (page - 1) * limit
        with closing(self.get_connection()) as conn:
            rows = conn.execute('''
                SELECT * FROM receive_logs
                ORDER BY created_at DESC
                LIMIT ? OFFSET ?
            ''', (limit, offset)).fetchall()
        logs = [dict(row) for row in rows]
        for log in logs:
            if log.get('created_at'):
                log['created_at'] = self.convert_to_local(log['created_at'])
        return logs

    def cleanup_old_messages(self, days: int = 30, max_messages: int = 10000) -> int:
        """Delete old messages by age, then by count.

        Args:
            days: Messages whose ``created_at`` is older than this are removed.
            max_messages: After the age pass, only this many newest messages
                (by ``created_at``) are kept.

        Returns:
            Total number of rows deleted by both passes.
        """
        # created_at is stored in UTC, so the cutoff must be computed in UTC
        # too; a naive local "now" would shift the window by the server offset.
        cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).strftime(self._DT_FORMAT)
        with closing(self.get_connection()) as conn, conn:
            # Age-based pass
            cursor = conn.execute('''
                DELETE FROM sms_messages
                WHERE created_at < ?
            ''', (cutoff,))
            deleted_count = cursor.rowcount
            # Count-based pass
            cursor = conn.execute('''
                DELETE FROM sms_messages
                WHERE id NOT IN (
                    SELECT id FROM sms_messages
                    ORDER BY created_at DESC
                    LIMIT ?
                )
            ''', (max_messages,))
            deleted_count += cursor.rowcount
        return deleted_count

    def get_statistics(self) -> Dict[str, Any]:
        """Return message counters for the dashboard.

        "Today" and "this week" (starting Monday 00:00) are evaluated in the
        local zone defined by ``timezone_offset`` and converted to UTC before
        being compared with the UTC ``created_at`` column.

        Returns:
            Dict with keys ``total``, ``today``, ``week``, ``verified`` and
            ``unverified``.
        """
        fmt = self._DT_FORMAT

        def to_utc_str(local_dt: datetime) -> str:
            return local_dt.astimezone(timezone.utc).strftime(fmt)

        # Local-day and local-week boundaries, built from the fixed offset.
        local_now = datetime.now(self._local_tz())
        today_start = local_now.replace(hour=0, minute=0, second=0, microsecond=0)
        today_end = today_start + timedelta(days=1)
        week_start = today_start - timedelta(days=local_now.weekday())

        with closing(self.get_connection()) as conn:
            # Total messages
            total = conn.execute(
                'SELECT COUNT(*) as total FROM sms_messages'
            ).fetchone()['total']
            # Messages received today (local day)
            today = conn.execute('''
                SELECT COUNT(*) as today FROM sms_messages
                WHERE created_at >= ? AND created_at < ?
            ''', (to_utc_str(today_start), to_utc_str(today_end))).fetchone()['today']
            # Messages received since Monday 00:00 local time
            week = conn.execute('''
                SELECT COUNT(*) as week FROM sms_messages
                WHERE created_at >= ?
            ''', (to_utc_str(week_start),)).fetchone()['week']
            # Signature verification breakdown
            row = conn.execute('''
                SELECT
                    SUM(CASE WHEN sign_verified = 1 THEN 1 ELSE 0 END) as verified,
                    SUM(CASE WHEN sign_verified = 0 THEN 1 ELSE 0 END) as unverified
                FROM sms_messages
            ''').fetchone()

        return {
            'total': total,
            'today': today,
            'week': week,
            # SUM() yields NULL on an empty table; report 0 instead.
            'verified': row['verified'] or 0,
            'unverified': row['unverified'] or 0
        }