当前位置: 首页 > news >正文

访问日志查询功能

支持分页、时间范围过滤和多种条件过滤,方便前端显示和管理API访问记录

from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime, timedelta
from sqlalchemy import func, extract, and_, or_
import osapp = Flask(__name__)# 数据库配置
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL', 'sqlite:///api_monitor.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = Falsedb = SQLAlchemy(app)class APIAccess(db.Model):__tablename__ = 'api_access'id = db.Column(db.Integer, primary_key=True, index=True)endpoint = db.Column(db.String(200), nullable=False)client_ip = db.Column(db.String(45))user_id = db.Column(db.Integer, nullable=True)response_status = db.Column(db.Integer)created_at = db.Column(db.DateTime, default=datetime.utcnow)update_at = db.Column(db.String)def to_dict(self):return {'id': self.id,'endpoint': self.endpoint,'client_ip': self.client_ip,'user_id': self.user_id,'response_status': self.response_status,'created_at': self.created_at.isoformat() if self.created_at else None,'update_at': self.update_at}# 创建数据库表
def init_db():with app.app_context():db.create_all()# API 监控端点
@app.route('/api/monitor/stats', methods=['GET'])
def get_stats():"""获取总体统计信息"""# 时间范围参数days = int(request.args.get('days', 7))start_date = datetime.utcnow() - timedelta(days=days)# 总访问量total_requests = APIAccess.query.filter(APIAccess.created_at >= start_date).count()# 成功请求 (2xx)success_requests = APIAccess.query.filter(and_(APIAccess.created_at >= start_date, APIAccess.response_status >= 200, APIAccess.response_status < 300)).count()# 错误请求 (4xx, 5xx)error_requests = APIAccess.query.filter(and_(APIAccess.created_at >= start_date,or_(and_(APIAccess.response_status >= 400, APIAccess.response_status < 500),and_(APIAccess.response_status >= 500, APIAccess.response_status < 600)))).count()# 成功率success_rate = (success_requests / total_requests * 100) if total_requests > 0 else 0return jsonify({'total_requests': total_requests,'success_requests': success_requests,'error_requests': error_requests,'success_rate': round(success_rate, 2),'period_days': days})@app.route('/api/monitor/endpoints', methods=['GET'])
def get_endpoint_stats():"""按端点统计访问信息"""days = int(request.args.get('days', 7))start_date = datetime.utcnow() - timedelta(days=days)# 按端点分组统计endpoint_stats = db.session.query(APIAccess.endpoint,func.count(APIAccess.id).label('total_requests'),func.avg(APIAccess.response_status).label('avg_status'),func.min(APIAccess.created_at).label('first_access'),func.max(APIAccess.created_at).label('last_access')).filter(APIAccess.created_at >= start_date)\.group_by(APIAccess.endpoint)\.order_by(func.count(APIAccess.id).desc())\.all()result = []for stat in endpoint_stats:result.append({'endpoint': stat.endpoint,'total_requests': stat.total_requests,'avg_status': round(stat.avg_status, 2) if stat.avg_status else None,'first_access': stat.first_access.isoformat() if stat.first_access else None,'last_access': stat.last_access.isoformat() if stat.last_access else None})return jsonify(result)@app.route('/api/monitor/users', methods=['GET'])
def get_user_stats():"""按用户统计访问信息"""days = int(request.args.get('days', 7))start_date = datetime.utcnow() - timedelta(days=days)user_stats = db.session.query(APIAccess.user_id,func.count(APIAccess.id).label('total_requests'),func.count(func.distinct(APIAccess.endpoint)).label('unique_endpoints')).filter(and_(APIAccess.created_at >= start_date, APIAccess.user_id.isnot(None))).group_by(APIAccess.user_id)\.order_by(func.count(APIAccess.id).desc())\.limit(20)\.all()result = []for stat in user_stats:result.append({'user_id': stat.user_id,'total_requests': stat.total_requests,'unique_endpoints': stat.unique_endpoints})return jsonify(result)@app.route('/api/monitor/status-codes', methods=['GET'])
def get_status_code_stats():"""按状态码统计"""days = int(request.args.get('days', 7))start_date = datetime.utcnow() - timedelta(days=days)status_stats = db.session.query(APIAccess.response_status,func.count(APIAccess.id).label('count')).filter(APIAccess.created_at >= start_date)\.group_by(APIAccess.response_status)\.order_by(func.count(APIAccess.id).desc())\.all()result = []for stat in status_stats:result.append({'status_code': stat.response_status,'count': stat.count})return jsonify(result)@app.route('/api/monitor/hourly', methods=['GET'])
def get_hourly_stats():"""按小时统计访问量"""days = int(request.args.get('days', 1))start_date = datetime.utcnow() - timedelta(days=days)hourly_stats = db.session.query(extract('hour', APIAccess.created_at).label('hour'),func.count(APIAccess.id).label('requests')).filter(APIAccess.created_at >= start_date)\.group_by(extract('hour', APIAccess.created_at))\.order_by(extract('hour', APIAccess.created_at))\.all()result = []for stat in hourly_stats:result.append({'hour': int(stat.hour),'requests': stat.requests})return jsonify(result)@app.route('/api/monitor/logs', methods=['GET'])
def get_access_logs():"""获取访问日志"""# 分页参数page = int(request.args.get('page', 1))per_page = int(request.args.get('per_page', 50))# 过滤参数endpoint = request.args.get('endpoint')user_id = request.args.get('user_id')status_code = request.args.get('status_code')days = int(request.args.get('days', 7))start_date = datetime.utcnow() - timedelta(days=days)query = APIAccess.query.filter(APIAccess.created_at >= start_date)if endpoint:query = query.filter(APIAccess.endpoint.like(f'%{endpoint}%'))if user_id:query = query.filter(APIAccess.user_id == int(user_id))if status_code:query = query.filter(APIAccess.response_status == int(status_code))# 按创建时间倒序logs = query.order_by(APIAccess.created_at.desc())\.paginate(page=page, per_page=per_page, error_out=False)result = {'logs': [log.to_dict() for log in logs.items],'pagination': {'page': logs.page,'per_page': logs.per_page,'total': logs.total,'pages': logs.pages,'has_next': logs.has_next,'has_prev': logs.has_prev}}return jsonify(result)@app.route('/api/monitor/errors', methods=['GET'])
def get_error_analysis():"""错误分析"""days = int(request.args.get('days', 7))start_date = datetime.utcnow() - timedelta(days=days)# 按状态码分组的错误统计error_stats = db.session.query(APIAccess.response_status,APIAccess.endpoint,func.count(APIAccess.id).label('count')).filter(and_(APIAccess.created_at >= start_date,APIAccess.response_status >= 400)).group_by(APIAccess.response_status, APIAccess.endpoint)\.order_by(func.count(APIAccess.id).desc())\.all()result = []for stat in error_stats:result.append({'status_code': stat.response_status,'endpoint': stat.endpoint,'count': stat.count})return jsonify(result)if __name__ == '__main__':init_db()app.run(debug=True, host='0.0.0.0', port=5000)
@app.route('/api/stats/overview', methods=['GET'])
def get_overview_stats():"""获取API监控概览统计信息返回:- total_requests: 总请求数量- total_users: 总用户数量(去重)- today_requests: 今日请求数量- today_users: 今日用户数量(去重)"""try:# 获取今天的日期范围today = date.today()today_start = datetime.combine(today, datetime.min.time())today_end = datetime.combine(today, datetime.max.time())# 1. 总请求数量total_requests = APIAccess.query.count()# 2. 总用户数量(去重,排除NULL值)total_users_query = db.session.query(func.count(distinct(APIAccess.user_id))).filter(APIAccess.user_id.isnot(None))total_users = total_users_query.scalar() or 0# 3. 今日请求数量today_requests = APIAccess.query.filter(APIAccess.created_at >= today_start,APIAccess.created_at <= today_end).count()# 4. 今日用户数量(去重,排除NULL值)today_users_query = db.session.query(func.count(distinct(APIAccess.user_id))).filter(APIAccess.user_id.isnot(None),APIAccess.created_at >= today_start,APIAccess.created_at <= today_end)today_users = today_users_query.scalar() or 0# 返回统计结果result = {'total_requests': total_requests,'total_users': total_users,'today_requests': today_requests,'today_users': today_users,'date': today.isoformat(),'timestamp': datetime.utcnow().isoformat()}return jsonify(result)except Exception as e:return jsonify({'error': '获取统计信息失败','message': str(e)}), 500
@app.route('/api/stats/chart', methods=['GET'])
def get_chart_data():"""获取访问量折线图数据查询参数:- start_date: 开始日期 (格式: YYYY-MM-DD,可选)- end_date: 结束日期 (格式: YYYY-MM-DD,可选)- endpoint: 端点路径过滤(可选)- user_id: 用户ID过滤(可选)如果不提供日期,默认最近7天"""try:# 获取日期参数start_date_str = request.args.get('start_date')end_date_str = request.args.get('end_date')endpoint = request.args.get('endpoint')user_id = request.args.get('user_id')# 处理日期范围if start_date_str and end_date_str:try:start_date = datetime.strptime(start_date_str, '%Y-%m-%d')end_date = datetime.strptime(end_date_str + ' 23:59:59', '%Y-%m-%d %H:%M:%S')except ValueError:return jsonify({'error': '日期格式错误,请使用 YYYY-MM-DD 格式'}), 400else:# 默认最近7天end_date = datetime.utcnow()start_date = end_date - timedelta(days=7)# 构建查询条件conditions = [APIAccess.created_at >= start_date, APIAccess.created_at <= end_date]if endpoint:conditions.append(APIAccess.endpoint.like(f'%{endpoint}%'))if user_id:try:conditions.append(APIAccess.user_id == int(user_id))except ValueError:return jsonify({'error': '无效的用户ID'}), 400# 计算天数差,决定聚合方式days_diff = (end_date - start_date).daysif days_diff <= 1:# 1天内按小时显示chart_data = get_hourly_data(conditions, start_date, end_date)else:# 多天按日期显示chart_data = get_daily_data(conditions, start_date, end_date)# 计算总访问量total_requests = db.session.query(func.count(APIAccess.id)).filter(*conditions).scalar()result = {'labels': chart_data['labels'],'data': chart_data['data'],'period': chart_data['period'],'total': total_requests,'date_range': {'start': start_date.strftime('%Y-%m-%d'),'end': end_date.strftime('%Y-%m-%d')},'filters': {'endpoint': endpoint,'user_id': user_id}}return jsonify(result)except Exception as e:return jsonify({'error': '获取图表数据失败','message': str(e)}), 500def get_daily_data(conditions, start_date, end_date):"""按天聚合数据"""daily_stats = db.session.query(func.date(APIAccess.created_at).label('date'),func.count(APIAccess.id).label('count')).filter(*conditions)\.group_by(func.date(APIAccess.created_at))\.order_by(func.date(APIAccess.created_at))\.all()# 转换为字典stats_dict = {stat.date.isoformat(): stat.count for stat in daily_stats}# 生成完整的时间序列labels = []data = []current = start_date.date()end = end_date.date()while current <= end:date_str = current.isoformat()labels.append(current.strftime('%m-%d'))data.append(stats_dict.get(date_str, 0))current += timedelta(days=1)return {'labels': labels,'data': data,'period': 'daily'}def get_hourly_data(conditions, start_date, end_date):"""按小时聚合数据"""hourly_stats = db.session.query(extract('hour', APIAccess.created_at).label('hour'),func.count(APIAccess.id).label('count')).filter(*conditions)\.group_by(extract('hour', APIAccess.created_at))\.order_by(extract('hour', APIAccess.created_at))\.all()# 转换为字典stats_dict = {int(stat.hour): stat.count for stat in hourly_stats}# 生成24小时的数据labels = []data = []for hour in range(24):labels.append(f"{hour:02d}:00")data.append(stats_dict.get(hour, 0))return {'labels': labels,'data': data,'period': 'hourly'}
from flask import Flask, jsonify, request
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy import create_engine, func, extract, and_, or_
from datetime import datetime, timedelta
import os
from contextlib import contextmanager# 数据库配置
DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///api_monitor.db')# 创建引擎
engine = create_engine(DATABASE_URL, echo=False)# 创建 SessionLocal
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)# 导入模型(需要先定义APIAccess模型)
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()class APIAccess(Base):__tablename__ = 'api_access'id = Base.Column(Base.Integer, primary_key=True, index=True)endpoint = Base.Column(Base.String(200), nullable=False)client_ip = Base.Column(Base.String(45))user_id = Base.Column(Base.Integer, nullable=True)response_status = Base.Column(Base.Integer)created_at = Base.Column(Base.DateTime, default=datetime.utcnow)update_at = Base.Column(Base.DateTime, default=datetime.utcnow)app = Flask(__name__)@contextmanager
def get_db():"""数据库会话上下文管理器"""session = SessionLocal()try:yield sessionfinally:session.close()@app.route('/api/stats/overview', methods=['GET'])
def get_overview_stats():"""获取概览统计信息"""try:with get_db() as session:# 获取今天的日期范围today = datetime.utcnow().date()today_start = datetime.combine(today, datetime.min.time())today_end = datetime.combine(today, datetime.max.time())# 总请求数量total_requests = session.query(func.count(APIAccess.id)).scalar()# 总用户数量(去重)total_users = session.query(func.count(func.distinct(APIAccess.user_id)))\.filter(APIAccess.user_id.isnot(None)).scalar() or 0# 今日请求数量today_requests = session.query(func.count(APIAccess.id))\.filter(APIAccess.created_at >= today_start,APIAccess.created_at <= today_end).scalar()# 今日用户数量(去重)today_users = session.query(func.count(func.distinct(APIAccess.user_id)))\.filter(APIAccess.user_id.isnot(None),APIAccess.created_at >= today_start,APIAccess.created_at <= today_end).scalar() or 0result = {'total_requests': total_requests,'total_users': total_users,'today_requests': today_requests,'today_users': today_users,'date': today.isoformat(),'timestamp': datetime.utcnow().isoformat()}return jsonify(result)except Exception as e:return jsonify({'error': '获取统计信息失败','message': str(e)}), 500@app.route('/api/stats/chart', methods=['GET'])
def get_chart_data():"""获取访问量折线图数据"""try:# 获取参数start_date_str = request.args.get('start_date')end_date_str = request.args.get('end_date')endpoint = request.args.get('endpoint')user_id = request.args.get('user_id')# 处理日期范围if start_date_str and end_date_str:try:start_date = datetime.strptime(start_date_str, '%Y-%m-%d')end_date = datetime.strptime(end_date_str + ' 23:59:59', '%Y-%m-%d %H:%M:%S')except ValueError:return jsonify({'error': '日期格式错误,请使用 YYYY-MM-DD 格式'}), 400else:# 默认最近7天end_date = datetime.utcnow()start_date = end_date - timedelta(days=7)with get_db() as session:# 构建查询条件conditions = [APIAccess.created_at >= start_date, APIAccess.created_at <= end_date]if endpoint:conditions.append(APIAccess.endpoint.like(f'%{endpoint}%'))if user_id:try:conditions.append(APIAccess.user_id == int(user_id))except ValueError:return jsonify({'error': '无效的用户ID'}), 400# 计算天数差,决定聚合方式days_diff = (end_date - start_date).daysif days_diff <= 1:# 1天内按小时显示chart_data = get_hourly_data_session(session, conditions, start_date, end_date)else:# 多天按日期显示chart_data = get_daily_data_session(session, conditions, start_date, end_date)# 计算总访问量total_requests = session.query(func.count(APIAccess.id)).filter(*conditions).scalar()result = {'labels': chart_data['labels'],'data': chart_data['data'],'period': chart_data['period'],'total': total_requests,'date_range': {'start': start_date.strftime('%Y-%m-%d'),'end': end_date.strftime('%Y-%m-%d')},'filters': {'endpoint': endpoint,'user_id': user_id}}return jsonify(result)except Exception as e:return jsonify({'error': '获取图表数据失败','message': str(e)}), 500def get_daily_data_session(session, conditions, start_date, end_date):"""按天聚合数据(使用session参数)"""daily_stats = session.query(func.date(APIAccess.created_at).label('date'),func.count(APIAccess.id).label('count')).filter(*conditions)\.group_by(func.date(APIAccess.created_at))\.order_by(func.date(APIAccess.created_at))\.all()# 转换为字典stats_dict = {stat.date.isoformat(): stat.count for stat in daily_stats}# 生成完整的时间序列labels = []data = []current = start_date.date()end = end_date.date()while current <= end:date_str = current.isoformat()labels.append(current.strftime('%m-%d'))data.append(stats_dict.get(date_str, 0))current += timedelta(days=1)return {'labels': labels,'data': data,'period': 'daily'}def get_hourly_data_session(session, conditions, start_date, end_date):"""按小时聚合数据(使用session参数)"""hourly_stats = session.query(extract('hour', APIAccess.created_at).label('hour'),func.count(APIAccess.id).label('count')).filter(*conditions)\.group_by(extract('hour', APIAccess.created_at))\.order_by(extract('hour', APIAccess.created_at))\.all()# 转换为字典stats_dict = {int(stat.hour): stat.count for stat in hourly_stats}# 生成24小时的数据labels = []data = []for hour in range(24):labels.append(f"{hour:02d}:00")data.append(stats_dict.get(hour, 0))return {'labels': labels,'data': data,'period': 'hourly'}@app.route('/api/logs', methods=['GET'])
def get_access_logs():"""获取访问日志信息"""try:# 获取分页参数page = int(request.args.get('page', 1))per_page = int(request.args.get('per_page', 50))# 获取过滤参数start_date_str = request.args.get('start_date')end_date_str = request.args.get('end_date')endpoint = request.args.get('endpoint')user_id = request.args.get('user_id')status_code = request.args.get('status_code')with get_db() as session:# 构建查询query = session.query(APIAccess)# 时间范围过滤if start_date_str and end_date_str:try:start_date = datetime.strptime(start_date_str, '%Y-%m-%d')end_date = datetime.strptime(end_date_str + ' 23:59:59', '%Y-%m-%d %H:%M:%S')query = query.filter(APIAccess.created_at >= start_date, APIAccess.created_at <= end_date)except ValueError:return jsonify({'error': '日期格式错误,请使用 YYYY-MM-DD 格式'}), 400else:# 默认最近7天end_date = datetime.utcnow()start_date = end_date - timedelta(days=7)query = query.filter(APIAccess.created_at >= start_date, APIAccess.created_at <= end_date)# 其他过滤条件if endpoint:query = query.filter(APIAccess.endpoint.like(f'%{endpoint}%'))if user_id:try:query = query.filter(APIAccess.user_id == int(user_id))except ValueError:return jsonify({'error': '无效的用户ID'}), 400if status_code:try:query = query.filter(APIAccess.response_status == int(status_code))except ValueError:return jsonify({'error': '无效的状态码'}), 400# 获取总数total = query.count()# 计算分页offset = (page - 1) * per_page# 按时间倒序,分页查询logs = query.order_by(APIAccess.created_at.desc()).offset(offset).limit(per_page).all()# 格式化日志数据logs_data = []for log in logs:logs_data.append({'id': log.id,'time': log.created_at.isoformat() if log.created_at else None,'endpoint': log.endpoint,'user': log.user_id,'status_code': log.response_status,'client_ip': log.client_ip})# 计算总页数pages = (total + per_page - 1) // per_page# 返回结果result = {'logs': logs_data,'pagination': {'page': page,'per_page': per_page,'total': total,'pages': pages,'has_next': page < pages,'has_prev': page > 1},'filters': {'start_date': start_date_str,'end_date': end_date_str,'endpoint': endpoint,'user_id': user_id,'status_code': status_code}}return jsonify(result)except Exception as e:return jsonify({'error': '获取访问日志失败','message': str(e)}), 500if __name__ == '__main__':# 创建表Base.metadata.create_all(bind=engine)app.run(debug=True, host='0.0.0.0', port=5000)

http://www.dtcms.com/a/602195.html

相关文章:

  • vite创建vue2项目
  • 【MATLAB例程】二维平面的TOA定位,几何精度因子GDOP和克拉美罗下界CRLB计算与输出
  • 怎么创一个网站赚钱免费入驻的外贸平台
  • 云边云科技SD-WAN解决方案 — 构建安全、高效、智能的云网基石
  • 20251112给荣品RD-RK3588开发板跑Rockchip的原厂Android13系统时适配AP6275P模块的BT蓝牙部分【使用原厂的DTS】
  • MyBatis 专题深度细化解析
  • a做爰视频免费观费网站asp网站如何迁移
  • 网站推广平台wordpress怎么加属性
  • 文创做的好的网站推荐微信公众号属于网站建设
  • 1. Cockpit 管理服务器;2. Linux 软件包管理
  • 【剑斩OFFER】算法的暴力美学——山脉数组的蜂顶索引
  • 关键词挖掘工具有哪些兰州seo优化
  • LeetCode 热题 100——哈希——最长连续序列
  • c语言反编译软件|详细解析c语言反编译工具的使用及其重要性
  • 模板网站更改青海制作网站的公司
  • 牛客:栈的压入、弹出序列
  • 深入解析UDP服务器核心开发机制
  • 阜阳做网站的公司网站开发前端跟后端的区别
  • MongoDB知识点与技巧总结
  • 企业网站 设计国外免费建站网站不用下载
  • LeetCode算法学习之数组中的第K个最大元素
  • 应急调度系统让每一次救援都精准到位
  • RL机器人人库使用简介
  • 北京网络公司建站百度爱采购优化排名软件
  • 长沙微网站开发重庆网站建设及优化公司
  • Java集合框架深度剖析 — 从源码看ArrayList、HashMap的设计与优化
  • 网站关键词排名快速提升thinkphp建站网址
  • JavaScript中??、、||、?.运算的区别
  • 微信公众号网站怎么做上海网站开发报价
  • Python压缩音乐文件大小