"""Alerts panel for proactive issue detection."""
from __future__ import annotations

from dataclasses import asdict, dataclass
from typing import TYPE_CHECKING, Any, ClassVar

from debug_toolbar.core.panel import Panel

if TYPE_CHECKING:
    from debug_toolbar.core.context import RequestContext
@dataclass
class Alert:
    """A single alert raised by one of the panel's checks.

    Attributes:
        title: Short headline for the alert.
        message: Longer description of what was detected.
        severity: Severity label; the panel uses "critical", "warning" and "info".
        category: Category label; the panel uses "security", "performance",
            "database" and "configuration".
        suggestion: Advice on how to resolve the issue.
    """

    title: str
    message: str
    severity: str
    category: str
    suggestion: str
class AlertsPanel(Panel):
    """Panel displaying proactive alerts for potential issues.

    Detects and warns about:

    - Security issues (CSRF, insecure cookies, missing headers)
    - Performance problems (slow queries, large responses)
    - Database issues (N+1 queries)
    - Configuration problems (debug mode in production)

    Each ``_check_*`` helper inspects the request context and returns a
    (possibly empty) list of ``Alert`` objects; ``generate_stats`` runs all of
    them and summarises the result.
    """

    panel_id: ClassVar[str] = "AlertsPanel"
    title: ClassVar[str] = "Alerts"
    template: ClassVar[str] = "panels/alerts.html"
    has_content: ClassVar[bool] = True
    nav_title: ClassVar[str] = "Alerts"

    # Severity labels attached to alerts.
    SEVERITY_CRITICAL: ClassVar[str] = "critical"
    SEVERITY_WARNING: ClassVar[str] = "warning"
    SEVERITY_INFO: ClassVar[str] = "info"

    # Category labels attached to alerts.
    CATEGORY_SECURITY: ClassVar[str] = "security"
    CATEGORY_PERFORMANCE: ClassVar[str] = "performance"
    CATEGORY_DATABASE: ClassVar[str] = "database"
    CATEGORY_CONFIGURATION: ClassVar[str] = "configuration"

    # Content-Length thresholds in bytes (1 MiB warns, 5 MiB is critical).
    RESPONSE_SIZE_WARNING_BYTES: ClassVar[int] = 1024 * 1024
    RESPONSE_SIZE_CRITICAL_BYTES: ClassVar[int] = 5 * 1024 * 1024

    # Per-query duration thresholds in milliseconds.
    QUERY_TIME_WARNING_MS: ClassVar[float] = 100.0
    QUERY_TIME_CRITICAL_MS: ClassVar[float] = 500.0

    # Size of a similar-query group at which an N+1 alert is raised / escalated.
    N_PLUS_ONE_THRESHOLD: ClassVar[int] = 3
    N_PLUS_ONE_CRITICAL_THRESHOLD: ClassVar[int] = 10

    async def generate_stats(self, context: RequestContext) -> dict[str, Any]:
        """Run every check against ``context`` and summarise the alerts.

        Args:
            context: Request context supplying ``metadata`` and the data
                recorded by other panels.

        Returns:
            A dict with ``alerts`` (one dict per alert, keyed by the ``Alert``
            field names), ``total_alerts``, and the per-severity and
            per-category counts under ``by_severity`` and ``by_category``.
        """
        checks = (
            self._check_security_headers,
            self._check_csrf_protection,
            self._check_cookie_security,
            self._check_debug_mode,
            self._check_response_size,
            self._check_slow_queries,
            self._check_n_plus_one,
        )
        alerts: list[Alert] = []
        for check in checks:
            alerts.extend(check(context))

        # Pre-seed the known labels with zero so they appear in the result even
        # when no alert uses them; unknown labels are still counted via .get().
        by_severity: dict[str, int] = {
            self.SEVERITY_CRITICAL: 0,
            self.SEVERITY_WARNING: 0,
            self.SEVERITY_INFO: 0,
        }
        by_category: dict[str, int] = {
            self.CATEGORY_SECURITY: 0,
            self.CATEGORY_PERFORMANCE: 0,
            self.CATEGORY_DATABASE: 0,
            self.CATEGORY_CONFIGURATION: 0,
        }
        for alert in alerts:
            by_severity[alert.severity] = by_severity.get(alert.severity, 0) + 1
            by_category[alert.category] = by_category.get(alert.category, 0) + 1

        return {
            "alerts": [asdict(alert) for alert in alerts],
            "total_alerts": len(alerts),
            "by_severity": by_severity,
            "by_category": by_category,
        }

    def _check_security_headers(self, context: RequestContext) -> list[Alert]:
        """Turn missing security headers reported by ``HeadersPanel`` into alerts.

        Reads ``response_headers -> security_headers -> missing`` from the
        ``HeadersPanel`` data and emits one warning per entry. Returns an empty
        list when that panel recorded no data.
        """
        headers_data = context.get_panel_data("HeadersPanel")
        if not headers_data:
            return []

        response_headers = headers_data.get("response_headers", {})
        security_headers = response_headers.get("security_headers", {})

        alerts: list[Alert] = []
        for missing in security_headers.get("missing", []):
            header_name = missing.get("name", "")
            description = missing.get("description", "Security header")
            alerts.append(
                Alert(
                    title=f"Missing Security Header: {header_name}",
                    message=f"{description}. This header is recommended for production applications.",
                    severity=self.SEVERITY_WARNING,
                    category=self.CATEGORY_SECURITY,
                    suggestion=(
                        f"Add the '{header_name}' header to your response middleware or application configuration."
                    ),
                )
            )
        return alerts

    def _check_csrf_protection(self, context: RequestContext) -> list[Alert]:
        """Warn about state-changing requests that carry no CSRF token header.

        Only POST, PUT, PATCH and DELETE requests are considered. A request is
        not flagged when it has one of the recognised CSRF headers, or when its
        Content-Type contains ``application/json``.
        """
        metadata = context.metadata
        method = metadata.get("method", "GET").upper()
        if method not in {"POST", "PUT", "PATCH", "DELETE"}:
            return []

        headers = metadata.get("headers", {})
        csrf_headers = {
            "x-csrf-token",
            "x-csrftoken",
            "csrf-token",
        }
        # Header names are compared case-insensitively.
        if csrf_headers.intersection(name.lower() for name in headers):
            return []

        content_type = next((v for k, v in headers.items() if k.lower() == "content-type"), "")
        if "application/json" in content_type.lower():
            return []

        return [
            Alert(
                title="Potential Missing CSRF Protection",
                message=f"State-changing request ({method}) detected without CSRF token header. "
                "This may indicate missing CSRF protection.",
                severity=self.SEVERITY_WARNING,
                category=self.CATEGORY_SECURITY,
                suggestion=(
                    "Implement CSRF protection middleware or ensure your application "
                    "validates CSRF tokens for state-changing requests."
                ),
            )
        ]

    def _check_cookie_security(self, context: RequestContext) -> list[Alert]:
        """Check Set-Cookie response headers for missing security attributes.

        For each ``Set-Cookie`` value in ``metadata["response_headers"]``:

        - warns when the ``Secure`` flag is absent;
        - warns when ``HttpOnly`` is absent and the header text contains
          "session";
        - reports (info) when no ``SameSite`` attribute is present.
        """
        response_headers = context.metadata.get("response_headers", {})
        set_cookie_headers = [v for k, v in response_headers.items() if k.lower() == "set-cookie"]

        alerts: list[Alert] = []
        for cookie in set_cookie_headers:
            # Normalise every ';'-separated part once; attribute names are
            # matched case-insensitively.
            parts = [part.strip().lower() for part in cookie.split(";")]

            if "secure" not in parts:
                alerts.append(
                    Alert(
                        title="Insecure Cookie Detected",
                        message=(
                            "Cookie set without 'Secure' flag. This allows the cookie to be sent over "
                            "unencrypted HTTP connections."
                        ),
                        severity=self.SEVERITY_WARNING,
                        category=self.CATEGORY_SECURITY,
                        suggestion=(
                            "Add the 'Secure' flag to all cookies in production to ensure they are only "
                            "sent over HTTPS."
                        ),
                    )
                )

            # The "session" test is a substring match on the whole header value.
            if "httponly" not in parts and "session" in cookie.lower():
                alerts.append(
                    Alert(
                        title="Cookie Missing HttpOnly Flag",
                        message=(
                            "Session cookie set without 'HttpOnly' flag. This makes it accessible to "
                            "JavaScript, increasing XSS risk."
                        ),
                        severity=self.SEVERITY_WARNING,
                        category=self.CATEGORY_SECURITY,
                        suggestion="Add the 'HttpOnly' flag to session cookies to prevent JavaScript access.",
                    )
                )

            if not any(part.startswith("samesite") for part in parts):
                alerts.append(
                    Alert(
                        title="Cookie Missing SameSite Attribute",
                        message="Cookie set without 'SameSite' attribute. This may allow CSRF attacks.",
                        severity=self.SEVERITY_INFO,
                        category=self.CATEGORY_SECURITY,
                        suggestion="Add 'SameSite=Lax' or 'SameSite=Strict' to cookies to prevent CSRF attacks.",
                    )
                )
        return alerts

    def _check_debug_mode(self, context: RequestContext) -> list[Alert]:
        """Raise a critical alert when debug is on in a production environment.

        Uses the ``debug`` and ``environment`` values from the ``SettingsPanel``
        data; the environment counts as production when it equals "production"
        or "prod" (case-insensitive). Non-string environments are ignored.
        """
        settings_data = context.get_panel_data("SettingsPanel")
        if not settings_data:
            return []

        is_debug = settings_data.get("debug", False)
        env_value = settings_data.get("environment", "")
        environment = env_value.lower() if isinstance(env_value, str) else ""
        if not (is_debug and environment in {"production", "prod"}):
            return []

        return [
            Alert(
                title="Debug Mode Enabled in Production",
                message=(
                    "Debug mode is enabled in what appears to be a production environment. "
                    "This exposes sensitive information and reduces performance."
                ),
                severity=self.SEVERITY_CRITICAL,
                category=self.CATEGORY_CONFIGURATION,
                suggestion=(
                    "Set debug=False in production environments. Use environment variables or "
                    "configuration files to manage this setting."
                ),
            )
        ]

    def _check_response_size(self, context: RequestContext) -> list[Alert]:
        """Alert on large responses based on the Content-Length response header.

        A missing header is treated as size 0; a value that cannot be parsed as
        an integer produces no alert.
        """
        alerts: list[Alert] = []
        response_headers = context.metadata.get("response_headers", {})
        content_length = next((v for k, v in response_headers.items() if k.lower() == "content-length"), "0")
        try:
            size_bytes = int(content_length)
        except (ValueError, TypeError):
            return alerts

        size_mb = size_bytes / (1024 * 1024)
        if size_bytes >= self.RESPONSE_SIZE_CRITICAL_BYTES:
            alerts.append(
                Alert(
                    title="Very Large Response Body",
                    message=(
                        f"Response body is {size_mb:.2f} MB, which may cause performance issues and slow page loads."
                    ),
                    severity=self.SEVERITY_CRITICAL,
                    category=self.CATEGORY_PERFORMANCE,
                    suggestion=(
                        "Consider implementing pagination, lazy loading, or response compression. "
                        "For APIs, limit the number of records returned per request."
                    ),
                )
            )
        elif size_bytes >= self.RESPONSE_SIZE_WARNING_BYTES:
            alerts.append(
                Alert(
                    title="Large Response Body",
                    message=f"Response body is {size_mb:.2f} MB. This may impact performance.",
                    severity=self.SEVERITY_WARNING,
                    category=self.CATEGORY_PERFORMANCE,
                    suggestion="Consider implementing pagination or response compression to reduce response size.",
                )
            )
        return alerts

    def _check_slow_queries(self, context: RequestContext) -> list[Alert]:
        """Alert on slow queries recorded by ``SQLAlchemyPanel``.

        Emits at most one alert: a critical one if any query reaches
        ``QUERY_TIME_CRITICAL_MS``, otherwise a warning if any query reaches
        ``QUERY_TIME_WARNING_MS``. Returns an empty list when the panel recorded
        no data.
        """
        alerts: list[Alert] = []
        sql_data = context.get_panel_data("SQLAlchemyPanel")
        if not sql_data:
            return alerts

        # Read each duration once. A missing or None value counts as 0 ms so
        # the threshold comparisons below cannot raise TypeError.
        durations = [query.get("duration_ms") or 0 for query in sql_data.get("queries", [])]
        critical = [d for d in durations if d >= self.QUERY_TIME_CRITICAL_MS]
        warning = [d for d in durations if self.QUERY_TIME_WARNING_MS <= d < self.QUERY_TIME_CRITICAL_MS]

        if critical:
            duration = max(critical)
            alerts.append(
                Alert(
                    title=f"Critical Slow Query Detected ({len(critical)} total)",
                    message=(
                        f"Found {len(critical)} database queries exceeding "
                        f"{self.QUERY_TIME_CRITICAL_MS}ms. Slowest query took {duration:.2f}ms."
                    ),
                    severity=self.SEVERITY_CRITICAL,
                    category=self.CATEGORY_DATABASE,
                    suggestion=(
                        "Review the SQL panel for slow queries. Add database indexes, optimize query logic, "
                        "or use query result caching to improve performance."
                    ),
                )
            )
        elif warning:
            duration = max(warning)
            alerts.append(
                Alert(
                    title=f"Slow Query Warning ({len(warning)} total)",
                    message=f"Found {len(warning)} database queries exceeding {self.QUERY_TIME_WARNING_MS}ms. "
                    f"Slowest query took {duration:.2f}ms.",
                    severity=self.SEVERITY_WARNING,
                    category=self.CATEGORY_DATABASE,
                    suggestion="Review the SQL panel for queries that could benefit from optimization or indexing.",
                )
            )
        return alerts

    def _check_n_plus_one(self, context: RequestContext) -> list[Alert]:
        """Alert on N+1 query groups recorded by ``SQLAlchemyPanel``.

        Each entry of ``n_plus_one_groups`` whose ``count`` reaches
        ``N_PLUS_ONE_THRESHOLD`` yields a warning, escalated to critical at
        ``N_PLUS_ONE_CRITICAL_THRESHOLD``. Returns an empty list when the panel
        recorded no data.
        """
        alerts: list[Alert] = []
        sql_data = context.get_panel_data("SQLAlchemyPanel")
        if not sql_data:
            return alerts

        default_suggestion = (
            "Use SQLAlchemy's eager loading (joinedload/selectinload) to fetch "
            "related data in fewer queries."
        )
        for group in sql_data.get("n_plus_one_groups", []):
            count = group.get("count", 0)
            if count < self.N_PLUS_ONE_THRESHOLD:
                continue

            origin = group.get("origin_display", "unknown location")
            severity = (
                self.SEVERITY_CRITICAL if count >= self.N_PLUS_ONE_CRITICAL_THRESHOLD else self.SEVERITY_WARNING
            )
            alerts.append(
                Alert(
                    title=f"N+1 Query Pattern Detected ({count} queries)",
                    message=(
                        f"Detected {count} similar queries from {origin}. This is likely an N+1 query problem."
                    ),
                    severity=severity,
                    category=self.CATEGORY_DATABASE,
                    # Fall back to the generic advice when the group's own
                    # suggestion is absent, None or empty.
                    suggestion=group.get("suggestion") or default_suggestion,
                )
            )
        return alerts

    def get_nav_subtitle(self) -> str:
        """Return the navigation subtitle for this panel.

        This implementation always returns an empty string; no alert count or
        severity is included.
        """
        return ""