""" 短信转发接收端主应用 """ import os import logging import logging.handlers from datetime import datetime, timedelta, timezone from functools import wraps import hashlib from flask import Flask, request, jsonify, render_template, redirect, url_for, flash, session, make_response from functools import update_wrapper from config import config, Config from database import Database from sign_verify import verify_from_app def no_cache(f): """禁用缓存的装饰器""" def new_func(*args, **kwargs): resp = make_response(f(*args, **kwargs)) resp.cache_control.no_cache = True resp.cache_control.no_store = True resp.cache_control.max_age = 0 return resp return update_wrapper(new_func, f) # 初始化应用 def create_app(config_name='default'): # 从配置文件加载 app_config = Config.load_from_json('config.json') app = Flask(__name__) app.secret_key = app_config.SECRET_KEY # 应用配置 app.config['HOST'] = app_config.HOST app.config['PORT'] = app_config.PORT app.config['DEBUG'] = app_config.DEBUG app.config['SECRET_KEY'] = app_config.SECRET_KEY app.config['SIGN_VERIFY'] = app_config.SIGN_VERIFY app.config['SIGN_MAX_AGE'] = app_config.SIGN_MAX_AGE app.config['DATABASE_PATH'] = app_config.DATABASE_PATH app.config['MAX_MESSAGES'] = app_config.MAX_MESSAGES app.config['AUTO_CLEANUP'] = app_config.AUTO_CLEANUP app.config['CLEANUP_DAYS'] = app_config.CLEANUP_DAYS app.config['PER_PAGE'] = app_config.PER_PAGE app.config['REFRESH_INTERVAL'] = app_config.REFRESH_INTERVAL app.config['LOG_LEVEL'] = app_config.LOG_LEVEL app.config['LOG_FILE'] = app_config.LOG_FILE app.config['TIMEZONE'] = app_config.TIMEZONE app.config['API_TOKENS'] = app_config.API_TOKENS app.config['LOGIN_ENABLED'] = app_config.LOGIN_ENABLED app.config['LOGIN_USERNAME'] = app_config.LOGIN_USERNAME app.config['LOGIN_PASSWORD'] = app_config.LOGIN_PASSWORD app.config['SESSION_LIFETIME'] = app_config.SESSION_LIFETIME # 初始化日志 setup_logging(app) # 解析时区 try: timezone_name = app_config.TIMEZONE import pytz app.timezone = pytz.timezone(timezone_name) app.timezone_offset = app.timezone.utcoffset(datetime.now()).total_seconds() / 3600 except ImportError: # 如果没有 pytz,使用简单的时区偏移 app.timezone_offset = 8 # 默认 UTC+8 app.timezone = None app.logger.warning('pytz not installed, using simple timezone offset') # 初始化数据库 db = Database(app.config['DATABASE_PATH'], timezone_offset=app.timezone_offset) # 注册路由 register_routes(app, db) return app def setup_logging(app): """配置日志""" log_level = getattr(logging, app.config['LOG_LEVEL']) formatter = logging.Formatter( '[%(asctime)s] %(levelname)s in %(module)s: %(message)s' ) # 文件日志 log_file_path = os.path.join( os.path.dirname(os.path.abspath(__file__)), app.config['LOG_FILE'] ) file_handler = logging.handlers.RotatingFileHandler( log_file_path, maxBytes=10*1024*1024, # 10MB backupCount=5 ) file_handler.setFormatter(formatter) file_handler.setLevel(log_level) # 控制台日志 console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) console_handler.setLevel(log_level) app.logger.addHandler(file_handler) app.logger.addHandler(console_handler) app.logger.setLevel(log_level) def login_required(f): """登录验证装饰器""" @wraps(f) def decorated_function(*args, **kwargs): if app.config.get('LOGIN_ENABLED', True): if 'logged_in' not in session or not session['logged_in']: return redirect(url_for('login', next=request.url)) # 检查会话是否过期 last_activity = session.get('last_activity') if last_activity: session_lifetime = app.config.get('SESSION_LIFETIME', 3600) if datetime.now().timestamp() - last_activity > session_lifetime: session.clear() flash('会话已过期,请重新登录', 'info') return redirect(url_for('login', next=request.url)) # 更新最后活动时间 session['last_activity'] = datetime.now().timestamp() return f(*args, **kwargs) return decorated_function def register_routes(app, db): """注册路由""" @app.route('/login', methods=['GET', 'POST']) @no_cache def login(): """登录页面""" # 如果禁用了登录,直接跳转到首页 if not app.config.get('LOGIN_ENABLED', True): return redirect(url_for('index')) if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') if username == app.config['LOGIN_USERNAME'] and \ password == app.config['LOGIN_PASSWORD']: session['logged_in'] = True session['username'] = username session['login_time'] = datetime.now().timestamp() session['last_activity'] = datetime.now().timestamp() next_url = request.args.get('next') if next_url: return redirect(next_url) return redirect(url_for('index')) else: flash('用户名或密码错误', 'error') return render_template('login.html') @app.route('/logout') @no_cache def logout(): """登出""" session.clear() flash('已退出登录', 'info') return redirect(url_for('login')) @app.route('/') @login_required def index(): """首页 - 短信列表""" page = request.args.get('page', 1, type=int) limit = request.args.get('limit', app.config['PER_PAGE'], type=int) from_number = request.args.get('from', None) search = request.args.get('search', None) messages = db.get_messages(page, limit, from_number, search) total = db.get_message_count(from_number, search) total_pages = (total + limit - 1) // limit stats = db.get_statistics() from_numbers = db.get_from_numbers()[:20] return render_template('index.html', messages=messages, page=page, total_pages=total_pages, total=total, from_number=from_number, search=search, limit=limit, stats=stats, from_numbers=from_numbers, refresh_interval=app.config['REFRESH_INTERVAL']) @app.route('/message/') @login_required def message_detail(message_id): """短信详情""" message = db.get_message_by_id(message_id) if not message: flash('短信不存在', 'error') return redirect(url_for('index')) return render_template('message_detail.html', message=message) @app.route('/logs') @login_required def logs(): """查看接收日志""" page = request.args.get('page', 1, type=int) limit = request.args.get('limit', app.config['PER_PAGE'], type=int) logs = db.get_logs(page, limit) return render_template('logs.html', logs=logs, page=page, limit=limit) @app.route('/statistics') @login_required def statistics(): """统计信息""" stats = db.get_statistics() recent = db.get_recent_messages(20) from_numbers = db.get_from_numbers() return render_template('statistics.html', stats=stats, recent=recent, from_numbers=from_numbers, cleanup_days=app.config['CLEANUP_DAYS'], max_messages=app.config['MAX_MESSAGES']) @app.route('/settings') @login_required def settings(): """设置页面""" return render_template('settings.html', api_tokens=app.config['API_TOKENS'], login_enabled=app.config['LOGIN_ENABLED'], sign_verify=app.config['SIGN_VERIFY']) @app.route('/api/receive', methods=['POST']) def receive_sms(): """接收短信接口""" try: # 获取参数 from_number = request.form.get('from') content = request.form.get('content') timestamp_str = request.form.get('timestamp') sign = request.form.get('sign', '') token_param = request.form.get('token', '') ip_address = request.remote_addr # 验证必填字段 if not from_number or not content: db.add_log(from_number, content, None, sign, None, ip_address, 'error', '缺少必填参数 (from/content)') return jsonify({'error': '缺少必填参数'}), 400 # 如果提供了 token,查找对应的 secret secret = None if token_param: for token_config in app.config['API_TOKENS']: if token_config.get('enabled') and token_config.get('token') == token_param: secret = token_config.get('secret') break # 解析时间戳 timestamp = None if timestamp_str: try: timestamp = int(timestamp_str) except ValueError: db.add_log(from_number, content, None, sign, None, ip_address, 'error', f'时间戳格式错误: {timestamp_str}') return jsonify({'error': '时间戳格式错误'}), 400 # 验证签名 sign_verified = False if sign and secret and app.config['SIGN_VERIFY']: is_valid, message = verify_from_app( from_number, content, timestamp, sign, secret, app.config['SIGN_MAX_AGE'] ) if not is_valid: db.add_log(from_number, content, timestamp, sign, False, ip_address, 'error', f'签名验证失败: {message}') app.logger.warning(f'签名验证失败: {message}') return jsonify({'error': message}), 403 else: sign_verified = True app.logger.info(f'短信已签名验证: {from_number}') # 保存短信 message_id = db.add_message( from_number=from_number, content=content, timestamp=timestamp or int(datetime.now().timestamp() * 1000), device_info=request.form.get('device'), sim_info=request.form.get('sim'), sign_verified=sign_verified, ip_address=ip_address ) # 记录成功日志 db.add_log(from_number, content, timestamp, sign, sign_verified, ip_address, 'success') app.logger.info(f'收到短信: {from_number} -> {content[:50]}... (ID: {message_id})') return jsonify({ 'success': True, 'message_id': message_id, 'message': '短信已接收' }), 200 except Exception as e: app.logger.error(f'处理短信失败: {e}', exc_info=True) return jsonify({'error': '服务器内部错误'}), 500 @app.route('/api/messages', methods=['GET']) @login_required def api_messages(): """短信列表 API""" page = request.args.get('page', 1, type=int) limit = request.args.get('limit', 20, type=int) from_number = request.args.get('from', None) search = request.args.get('search', None) messages = db.get_messages(page, limit, from_number, search) total = db.get_message_count(from_number, search) return jsonify({ 'success': True, 'data': messages, 'total': total, 'page': page, 'limit': limit }) @app.route('/api/statistics', methods=['GET']) @login_required def api_statistics(): """统计信息 API""" stats = db.get_statistics() return jsonify({ 'success': True, 'data': stats }) @app.route('/cleanup') @login_required def cleanup(): """清理老数据""" deleted = db.cleanup_old_messages( days=app.config['CLEANUP_DAYS'], max_messages=app.config['MAX_MESSAGES'] ) flash(f'已清理 {deleted} 条旧数据', 'success') return redirect(url_for('statistics')) @app.errorhandler(404) def not_found(e): return render_template('error.html', error='页面不存在'), 404 @app.errorhandler(500) def server_error(e): app.logger.error(f'服务器错误: {e}', exc_info=True) return render_template('error.html', error='服务器内部错误'), 500 if __name__ == '__main__': env = os.environ.get('FLASK_ENV', 'development') app = create_app(env) app.logger.info(f'启动短信接收服务 (环境: {env})') app.logger.info(f'数据库: {app.config["DATABASE_PATH"]}') app.logger.info(f'监听端口: {app.config["PORT"]}') app.logger.info(f'登录已启用: {app.config["LOGIN_ENABLED"]}') app.run( host=app.config['HOST'], port=app.config['PORT'], debug=app.config['DEBUG'] )