Added custom error pages, user password change, initial setup workflow with default admin user.

This commit is contained in:
2025-09-07 11:40:45 -04:00
parent ee1715ff4f
commit c94c0554df
9 changed files with 1444 additions and 89 deletions

View File

@@ -13,8 +13,15 @@ from fastapi.responses import HTMLResponse, FileResponse, RedirectResponse, JSON
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from fastapi.exception_handlers import http_exception_handler
from fastapi.exceptions import HTTPException as StarletteHTTPException
from starlette.exceptions import HTTPException as StarletteBaseHTTPException
from sqlalchemy import create_engine, select, func
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.exc import OperationalError
from alembic.config import Config as AlembicConfig
from alembic import command
import subprocess
try:
# Try relative imports first (when run as module)
@@ -54,13 +61,219 @@ app.add_middleware(
security = HTTPBearer(auto_error=False)
# Database tables are now managed by migrations
# Base.metadata.create_all(bind=engine)
# Database initialization functions using Alembic
def check_database_exists():
"""Check if the database has been initialized with basic tables"""
try:
with SessionLocal() as db:
# Try to query a basic table to see if database is initialized
db.scalar(select(func.count(User_table.id)))
return True
except OperationalError:
return False
except Exception:
return False
def run_alembic_migrations():
"""Run Alembic migrations to initialize/update database schema"""
try:
# Use the migrate command via subprocess to ensure proper environment
result = subprocess.run(
["python", "-m", "devenv", "db-upgrade"],
cwd=".",
capture_output=True,
text=True
)
if result.returncode == 0:
logging.info("Database migrations completed successfully")
return True
else:
logging.error(f"Migration failed: {result.stderr}")
# Fall back to direct Alembic command
try:
# Create alembic config
alembic_cfg = AlembicConfig("alembic.ini")
command.upgrade(alembic_cfg, "head")
logging.info("Database migrations completed via direct Alembic")
return True
except Exception as e:
logging.error(f"Direct Alembic migration also failed: {e}")
return False
except Exception as e:
logging.error(f"Failed to run migrations: {e}")
return False
def ensure_super_user():
"""Ensure at least one super user exists, create default if none"""
try:
with SessionLocal() as db:
# Check if any super user exists
super_user = db.scalar(select(User_table).where(User_table.role == UserRole.SUPER.value))
if super_user:
logging.info(f"Super user already exists: {super_user.username}")
return True
# Create default super user
logging.info("No super user found, creating default admin user...")
try:
default_admin = AuthManager.create_user(
session=db,
username="admin",
email="admin@dosvault.local",
password="admin123",
role=UserRole.SUPER.value
)
logging.warning("Default super user created: admin / admin123")
logging.warning("IMPORTANT: Please change the default admin password immediately!")
return True
except Exception as e:
logging.error(f"Failed to create default super user: {e}")
return False
except Exception as e:
logging.error(f"Failed to ensure super user: {e}")
return False
def initialize_database_and_users():
"""Initialize database and ensure super user exists"""
success = True
# Check if database exists
if not check_database_exists():
logging.info("Database not initialized, running migrations...")
if not run_alembic_migrations():
logging.error("Failed to initialize database")
success = False
# Ensure super user exists
if success and not ensure_super_user():
logging.error("Failed to ensure super user exists")
success = False
return success
# Global flags to track initialization status
db_initialization_attempted = False
db_initialization_successful = False
@app.on_event("startup")
async def startup_event():
"""Handle application startup - initialize database in background"""
global db_initialization_attempted, db_initialization_successful
logging.info("Starting database initialization...")
db_initialization_attempted = True
try:
# Run initialization in background
success = await asyncio.get_event_loop().run_in_executor(
None, initialize_database_and_users
)
db_initialization_successful = success
if success:
logging.info("Database initialization completed successfully")
else:
logging.error("Database initialization failed")
except Exception as e:
logging.error(f"Error during database initialization: {e}")
db_initialization_successful = False
templates = Jinja2Templates(directory="templates")
templates.env.globals['max'] = max
templates.env.globals['min'] = min
# Error handlers
@app.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler(request: Request, exc: StarletteHTTPException):
"""Handle HTTP exceptions with custom error pages"""
try:
# Get current user for error page context
current_user = None
try:
if "auth_token" in request.cookies:
token = request.cookies["auth_token"]
with SessionLocal() as db:
username = AuthManager.verify_token(token)
if username:
current_user = AuthManager.get_user_by_username(db, username)
if current_user and not current_user.is_active:
current_user = None
except Exception:
pass
# Handle authentication errors - redirect to login page
if exc.status_code == 401:
# For AJAX/API requests, return JSON error
if request.headers.get("accept", "").startswith("application/json") or "/api/" in str(request.url):
return JSONResponse(
status_code=401,
content={"detail": "Authentication required"}
)
# For page requests, redirect to login with error message
redirect_url = f"/login?error={exc.detail}&redirect={request.url.path}"
return RedirectResponse(url=redirect_url, status_code=303)
# Handle 404 and other errors with custom error page
error_details = {
"request": request,
"current_user": current_user,
"error_code": exc.status_code,
"error_title": {
404: "Page Not Found",
403: "Access Forbidden",
500: "Internal Server Error",
}.get(exc.status_code, "Error"),
"error_description": {
404: "The page you're looking for doesn't exist or has been moved.",
403: "You don't have permission to access this resource.",
500: "An internal server error occurred. Please try again later.",
}.get(exc.status_code, str(exc.detail)),
"error_details": str(exc.detail) if exc.status_code >= 500 else None
}
return templates.TemplateResponse(
"error.html",
error_details,
status_code=exc.status_code
)
except Exception as e:
# Fallback to default error handling if our custom handler fails
logging.error(f"Error in custom exception handler: {e}")
return await http_exception_handler(request, exc)
@app.get("/login", response_class=HTMLResponse)
async def login_page(
request: Request,
error: Optional[str] = Query(None),
redirect: Optional[str] = Query(None)
):
"""Display login page"""
# If user is already logged in, redirect to home or requested page
try:
if "auth_token" in request.cookies:
token = request.cookies["auth_token"]
with SessionLocal() as db:
username = AuthManager.verify_token(token)
if username:
current_user = AuthManager.get_user_by_username(db, username)
if current_user and current_user.is_active:
return RedirectResponse(url=redirect or "/", status_code=303)
except Exception:
pass
return templates.TemplateResponse("login.html", {
"request": request,
"error": error,
"redirect": redirect or "/"
})
# Removed proxy image URL function
@@ -125,70 +338,137 @@ def require_super_user(current_user: User_table = Depends(require_auth)):
@app.get("/", response_class=HTMLResponse)
async def index(request: Request, db: Session = Depends(get_db), current_user: Optional[User_table] = Depends(get_current_user)):
page = int(request.query_params.get("page", 1))
per_page = int(request.query_params.get("per_page", 20))
search = request.query_params.get("search", "").strip()
view = request.query_params.get("view", "grid") # grid or list
async def index(request: Request):
global db_initialization_attempted, db_initialization_successful
# Limit per_page to reasonable values
per_page = max(10, min(per_page, 100))
offset = (page - 1) * per_page
# Check if database is ready
if not db_initialization_attempted:
# Database initialization hasn't started yet
return templates.TemplateResponse("setup.html", {
"request": request,
"status": "initializing",
"message": "Database initialization starting..."
})
# Base query
games_query = select(Game_table)
count_query = select(func.count(Game_table.id))
if not db_initialization_successful:
# Database initialization failed
return templates.TemplateResponse("setup.html", {
"request": request,
"status": "error",
"message": "Database initialization failed. Please check the logs and try running the setup manually.",
"setup_commands": [
"db-init",
"create-admin"
]
})
# Add search filtering
if search:
# Fuzzy-ish search - split search terms and match any of them
search_terms = search.split()
search_conditions = []
for term in search_terms:
term_pattern = f"%{term}%"
term_condition = (
Game_table.title.ilike(term_pattern) |
(Game_table.metadata_obj.has(Metadata_table.title.ilike(term_pattern))) |
(Game_table.metadata_obj.has(Metadata_table.description.ilike(term_pattern))) |
(Game_table.metadata_obj.has(Metadata_table.developer.ilike(term_pattern))) |
(Game_table.metadata_obj.has(Metadata_table.publisher.ilike(term_pattern))) |
(Game_table.metadata_obj.has(Metadata_table.genre.any(Genre_table.name.ilike(term_pattern)))) |
(Game_table.metadata_obj.has(Metadata_table.tags.any(Tags_table.name.ilike(term_pattern))))
)
search_conditions.append(term_condition)
# Match all terms (AND logic) for better relevance
if search_conditions:
from sqlalchemy import and_
combined_filter = and_(*search_conditions)
games_query = games_query.where(combined_filter)
count_query = count_query.where(combined_filter)
total_games = db.scalar(count_query)
# Add alphabetical sorting by default
games = db.scalars(games_query.order_by(Game_table.title).offset(offset).limit(per_page)).all()
# Get user's favorite game IDs if logged in
user_favorites = set()
if current_user and current_user.role != UserRole.DEMO.value:
user_favorites = {game.id for game in current_user.favorites}
total_pages = (total_games + per_page - 1) // per_page
return templates.TemplateResponse("index.html", {
"request": request,
"games": games,
"current_page": page,
"total_pages": total_pages,
"per_page": per_page,
"search": search,
"view": view,
"total_games": total_games,
"current_user": current_user,
"is_demo": current_user is None or current_user.role == UserRole.DEMO.value,
"user_favorites": user_favorites
})
# Database is ready, proceed with normal operation
try:
# Create a database session for this request
with SessionLocal() as db:
# Try to get current user (this will handle auth gracefully)
current_user = None
try:
token = None
# Check for auth token in cookie
if "auth_token" in request.cookies:
token = request.cookies["auth_token"]
if token:
username = AuthManager.verify_token(token)
if username:
current_user = AuthManager.get_user_by_username(db, username)
if current_user and not current_user.is_active:
current_user = None
except Exception as e:
logging.debug(f"Error getting current user: {e}")
current_user = None
page = int(request.query_params.get("page", 1))
per_page = int(request.query_params.get("per_page", 20))
search = request.query_params.get("search", "").strip()
view = request.query_params.get("view", "grid") # grid or list
# Limit per_page to reasonable values
per_page = max(10, min(per_page, 100))
offset = (page - 1) * per_page
# Base query
games_query = select(Game_table)
count_query = select(func.count(Game_table.id))
# Add search filtering
if search:
# Fuzzy-ish search - split search terms and match any of them
search_terms = search.split()
search_conditions = []
for term in search_terms:
term_pattern = f"%{term}%"
term_condition = (
Game_table.title.ilike(term_pattern) |
(Game_table.metadata_obj.has(Metadata_table.title.ilike(term_pattern))) |
(Game_table.metadata_obj.has(Metadata_table.description.ilike(term_pattern))) |
(Game_table.metadata_obj.has(Metadata_table.developer.ilike(term_pattern))) |
(Game_table.metadata_obj.has(Metadata_table.publisher.ilike(term_pattern))) |
(Game_table.metadata_obj.has(Metadata_table.genre.any(Genre_table.name.ilike(term_pattern)))) |
(Game_table.metadata_obj.has(Metadata_table.tags.any(Tags_table.name.ilike(term_pattern))))
)
search_conditions.append(term_condition)
# Match all terms (AND logic) for better relevance
if search_conditions:
from sqlalchemy import and_
combined_filter = and_(*search_conditions)
games_query = games_query.where(combined_filter)
count_query = count_query.where(combined_filter)
total_games = db.scalar(count_query)
# Add alphabetical sorting by default
games = db.scalars(games_query.order_by(Game_table.title).offset(offset).limit(per_page)).all()
# Get user's favorite game IDs if logged in
user_favorites = set()
if current_user and current_user.role != UserRole.DEMO.value:
user_favorites = {game.id for game in current_user.favorites}
total_pages = (total_games + per_page - 1) // per_page
return templates.TemplateResponse("index.html", {
"request": request,
"games": games,
"current_page": page,
"total_pages": total_pages,
"per_page": per_page,
"search": search,
"view": view,
"total_games": total_games,
"current_user": current_user,
"is_demo": current_user is None or current_user.role == UserRole.DEMO.value,
"user_favorites": user_favorites
})
except OperationalError as e:
# Database tables don't exist or other database issue
logging.error(f"Database error in index route: {e}")
return templates.TemplateResponse("setup.html", {
"request": request,
"status": "error",
"message": "Database not properly initialized. Please run setup commands.",
"setup_commands": [
"db-init",
"create-admin"
]
})
except Exception as e:
# Other unexpected errors
logging.error(f"Unexpected error in index route: {e}")
return templates.TemplateResponse("setup.html", {
"request": request,
"status": "error",
"message": f"An unexpected error occurred: {str(e)}"
})
@app.post("/login")
@@ -1169,6 +1449,106 @@ async def health_check():
"""Health check endpoint for Docker/monitoring"""
return {"status": "healthy", "service": "DosVault"}
@app.get("/profile")
async def user_profile(
request: Request,
db: Session = Depends(get_db),
current_user: User_table = Depends(require_auth)
):
"""Display user profile page"""
return templates.TemplateResponse("user_profile.html", {
"request": request,
"current_user": current_user
})
@app.post("/profile/change-password")
async def change_password(
request: Request,
db: Session = Depends(get_db),
current_user: User_table = Depends(require_auth)
):
"""Change user password"""
try:
# Get JSON data from request
data = await request.json()
current_password = data.get("current_password")
new_password = data.get("new_password")
if not current_password or not new_password:
raise HTTPException(status_code=400, detail="Current password and new password are required")
if len(new_password) < 6:
raise HTTPException(status_code=400, detail="New password must be at least 6 characters long")
# Verify current password
if not AuthManager.verify_password(current_password, current_user.password_hash):
raise HTTPException(status_code=400, detail="Current password is incorrect")
# Update password
current_user.password_hash = AuthManager.get_password_hash(new_password)
db.commit()
logging.info(f"Password changed for user: {current_user.username}")
return {"message": "Password updated successfully"}
except HTTPException:
raise
except Exception as e:
logging.error(f"Error changing password for user {current_user.username}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@app.post("/profile/update-info")
async def update_profile_info(
request: Request,
db: Session = Depends(get_db),
current_user: User_table = Depends(require_super_user)
):
"""Update user profile information (super users only)"""
try:
# Get JSON data from request
data = await request.json()
username = data.get("username", "").strip()
email = data.get("email", "").strip()
if not username or not email:
raise HTTPException(status_code=400, detail="Username and email are required")
if len(username) < 3:
raise HTTPException(status_code=400, detail="Username must be at least 3 characters long")
# Check if username already exists (excluding current user)
existing_user = db.query(User_table).filter(
User_table.username == username,
User_table.id != current_user.id
).first()
if existing_user:
raise HTTPException(status_code=400, detail="Username already exists")
# Check if email already exists (excluding current user)
existing_email = db.query(User_table).filter(
User_table.email == email,
User_table.id != current_user.id
).first()
if existing_email:
raise HTTPException(status_code=400, detail="Email already exists")
# Update user information
old_username = current_user.username
current_user.username = username
current_user.email = email
db.commit()
logging.info(f"Profile information updated for user: {old_username} -> {username} ({email})")
return {"message": "Profile updated successfully"}
except HTTPException:
raise
except Exception as e:
logging.error(f"Error updating profile for user {current_user.username}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/api/auth/token")
async def get_auth_token(
request: Request,
@@ -1179,6 +1559,39 @@ async def get_auth_token(
token = request.cookies.get("auth_token", "")
return {"token": token, "username": current_user.username}
# Catch-all route for 404 errors - this must be the last route
@app.get("/{path:path}", response_class=HTMLResponse)
async def catch_all_404(request: Request, path: str):
"""Catch-all route for 404 errors"""
# Get current user for error page context
current_user = None
try:
if "auth_token" in request.cookies:
token = request.cookies["auth_token"]
with SessionLocal() as db:
username = AuthManager.verify_token(token)
if username:
current_user = AuthManager.get_user_by_username(db, username)
if current_user and not current_user.is_active:
current_user = None
except Exception:
pass
error_details = {
"request": request,
"current_user": current_user,
"error_code": 404,
"error_title": "Page Not Found",
"error_description": f"The page '/{path}' doesn't exist or has been moved.",
"error_details": None
}
return templates.TemplateResponse(
"error.html",
error_details,
status_code=404
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host=config.host, port=config.port)