commit b3e71456c8d34dac8c39ef9067008b973d3e3616 Author: th3r00t Date: Sat Sep 6 13:53:44 2025 -0400 Initial release of DosVault. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4bcdd4e --- /dev/null +++ b/.gitignore @@ -0,0 +1,62 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Virtual environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo + +# OS +.DS_Store +Thumbs.db + +# Development +# devenv.lock +# devenv.nix +.direnv/ +.devenv/ + +# Testing +.pytest_cache/ +.coverage +htmlcov/ + +# Local data +src/roms.db* +roms/ +data/ +logs/ +images/ + +# Release builds +release/ diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..2b7a7b9 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,100 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Development Commands + +The project uses devenv.nix for development environment management. 
All commands should be run from the repository root: + +- `tests` - Run pytest with coverage +- `lint` - Check code with ruff and black +- `fix` - Auto-fix code issues with ruff and black +- `typecheck` - Run pyright type checking +- `run` - Execute the main ROM scraper application +- `serve` - Start the FastAPI web server +- `create-admin` - Create initial admin user for web interface +- `migrate` - Database migration management (see Migration Commands) +- `db-init` - Initialize database schema (first-time setup) +- `db-upgrade` - Apply pending database migrations +- `db-create` - Create new migration (requires message argument) +- `build` - Build the application using build.sh (creates zipapp in release/) + +## Architecture + +This is a Python ROM metadata scraper and web-based ROM management system for DOS games: + +### Core Components + +- **Main Application** (`src/__main__.py`): Async scraper that scans ROM directories, fetches metadata from IGDB API, and stores everything in SQLite +- **Web Application** (`src/webapp.py`): FastAPI server with user authentication, ROM browsing, downloads, and admin interface +- **Configuration** (`src/libs/config.py`): XDG-compliant config management with automatic setup prompts +- **Database Layer** (`src/libs/database.py`): SQLAlchemy models with many-to-many relationships for games, metadata, genres, tags, and users +- **Authentication** (`src/libs/auth.py`): JWT-based auth with bcrypt password hashing and role-based access control +- **Data Models** (`src/libs/objects.py`): Dataclasses for Game, Metadata, and Roms collections +- **API Integration** (`src/libs/apis.py`): IGDB API client with Twitch OAuth authentication +- **Utilities** (`src/libs/functions.py`): Title cleaning and year extraction from ROM filenames + +### Data Flow + +**ROM Scraping:** +1. Compares filesystem ROMs with database entries to avoid re-indexing +2. Authenticates with IGDB via Twitch OAuth using client credentials +3. 
Scrapes metadata for new games only with rate limiting (4 concurrent requests) +4. Stores normalized data in SQLite with proper foreign key relationships +5. Handles duplicate games and metadata updates gracefully + +**Web Interface:** +1. FastAPI serves modern responsive web interface with Tailwind CSS +2. JWT-based authentication with three user roles: demo, normal, super +3. Demo users can browse but not download; normal users get full access; super users can manage everything +4. Pagination, favorites system, and file downloads for authorized users +5. Admin interface for user management and metadata editing + +### Key Technical Details + +- Uses asyncio with semaphore-based rate limiting for API requests +- SQLAlchemy with declarative base and proper naming conventions +- FastAPI with Jinja2 templates, JWT authentication, and role-based access control +- Configuration supports both environment variables and .env files +- Custom PathType for storing pathlib.Path objects in database +- Batch processing for database operations with configurable batch sizes +- Modern responsive UI with Tailwind CSS and Alpine.js for interactivity + +## Database Migrations + +The project uses Alembic for database schema versioning and migrations: + +### First-Time Setup +```bash +db-init # Initialize database with current schema +migrate stamp # Mark database as up-to-date with migrations +``` + +### Migration Management +```bash +migrate create "description" # Create new migration file +migrate upgrade # Apply all pending migrations +migrate current # Show current database revision +migrate history # Show migration history +migrate check # Check database migration status +``` + +### Schema Changes +1. Modify models in `src/libs/database.py` +2. Create migration: `migrate create "description of changes"` +3. Review generated migration file in `migrations/versions/` +4. 
Apply migration: `migrate upgrade` + +### Migration Files +- Located in `migrations/versions/` +- Named with revision ID and description +- Contain `upgrade()` and `downgrade()` functions +- Support batch operations for SQLite compatibility + +## Environment Setup + +Requires IGDB API credentials: +- `IGDB_CLIENT_ID` - Twitch client ID +- `IGDB_SECRET_KEY` - Twitch client secret + +Can be provided via environment variables or `.env` file in project root. \ No newline at end of file diff --git a/DOCKER.md b/DOCKER.md new file mode 100644 index 0000000..90e12b6 --- /dev/null +++ b/DOCKER.md @@ -0,0 +1,109 @@ +# DosVault Docker Deployment + +## Quick Start + +1. **Copy the environment template:** + ```bash + cp .env.example .env + ``` + +2. **Edit `.env` with your configuration:** + - Set `IGDB_CLIENT_ID` and `IGDB_SECRET_KEY` (required) + - Set `ROMS_PATH` to your ROM collection directory + - Optionally customize host/port settings + +3. **Start the application:** + ```bash + docker-compose up -d + ``` + +4. **Create admin user:** + ```bash + docker-compose exec dosvault python src/create_admin.py + ``` + +5. 
**Access the application:** + - Web interface: http://localhost:8080 + - Admin panel: http://localhost:8080/admin + +## Configuration + +### Environment Variables + +| Variable | Required | Description | +|----------|----------|-------------| +| `IGDB_CLIENT_ID` | Yes | Twitch API Client ID | +| `IGDB_SECRET_KEY` | Yes | Twitch API Client Secret | +| `ROMS_PATH` | No | Path to ROM collection (default: ./roms) | +| `DOSFRONTEND_CONFIG_DIR` | No | Application data directory (default: /app/data) | + +### Configuration Persistence + +Configuration changes made through the web interface are automatically persisted to the mounted volume: + +- **In Docker**: Configuration is stored in `/app/data/config.json` (mounted volume) +- **Regular install**: Configuration is stored in `~/.config/dosfrontend/config.json` +- **File structure**: All application data uses the same base directory: + - `config.json` - Main configuration file + - `roms.db` - SQLite database + - `images/` - Downloaded game artwork + - `logs/` - Application logs + +### Volume Mounts + +- `dosvault_data:/app/data` - Application data (database, images, logs) +- `${ROMS_PATH}:/app/data/roms:ro` - ROM collection (read-only) + +## Database Management + +### Initialize Database +```bash +docker-compose exec dosvault python src/migrate.py db-init +``` + +### Run Migrations +```bash +docker-compose exec dosvault python src/migrate.py upgrade +``` + +### Scrape ROM Metadata +```bash +docker-compose exec dosvault python -m src +``` + +## Maintenance + +### View Logs +```bash +docker-compose logs -f dosvault +``` + +### Backup Database +```bash +docker-compose exec dosvault cp /app/data/roms.db /app/data/backup.db +docker cp $(docker-compose ps -q dosvault):/app/data/backup.db ./backup.db +``` + +### Update Application +```bash +docker-compose pull +docker-compose up -d +``` + +## Troubleshooting + +### Check Container Health +```bash +docker-compose ps +``` + +### Access Container Shell +```bash +docker-compose exec 
dosvault bash +``` + +### Reset Data +```bash +docker-compose down -v +docker-compose up -d +``` \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..28bd7a4 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,49 @@ +# Multi-stage Docker build for DosVault +FROM python:3.11-slim as base + +# Set working directory +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + curl \ + gcc \ + g++ \ + && rm -rf /var/lib/apt/lists/* + +# Copy Python dependencies +COPY requirements.txt* pyproject.toml* ./ + +# Install Python dependencies +RUN pip install --no-cache-dir -r requirements.txt || \ + pip install --no-cache-dir fastapi uvicorn sqlalchemy alembic \ + aiohttp bcrypt python-jose python-multipart jinja2 + +# Copy application code +COPY src/ ./src/ +COPY templates/ ./templates/ +COPY migrations/ ./migrations/ +COPY alembic.ini ./ +COPY CLAUDE.md README.md ./ + +# Create necessary directories +RUN mkdir -p /app/data/logs /app/data/images /app/data/roms /app/data/metadata + +# Set environment variables +ENV PYTHONPATH=/app +ENV DOSFRONTEND_CONFIG_DIR=/app/data + +# Expose ports +EXPOSE 8080 8081 + +# Create non-root user +RUN useradd -m -u 1000 dosvault && \ + chown -R dosvault:dosvault /app +USER dosvault + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD curl -f http://localhost:8080/health || exit 1 + +# Default command +CMD ["python", "-m", "uvicorn", "src.webapp:app", "--host", "0.0.0.0", "--port", "8080"] \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e4a23f5 --- /dev/null +++ b/README.md @@ -0,0 +1,197 @@ +# šŸŽ® DosVault + +**Your Personal DOS Game Collection Manager** + +DosVault is a modern, web-based collection manager for DOS games that combines powerful metadata scraping with an intuitive browsing experience. 
Built with Python and FastAPI, it helps you organize, discover, and manage your retro gaming library with style. + +## ✨ Features + +### šŸŽÆ Core Functionality +- **Automatic Metadata Scraping** - Pulls game information, cover art, and screenshots from IGDB API +- **Local Image Storage** - Downloads and caches all images locally for fast loading +- **Intelligent ROM Detection** - Scans directories and avoids re-indexing existing games +- **Advanced Search & Filtering** - Find games by title, genre, developer, or description +- **Genre & Tag Browsing** - Organized categorization with alphabetical sorting + +### 🌐 Modern Web Interface +- **Responsive Design** - Works beautifully on desktop, tablet, and mobile +- **Multiple View Modes** - Switch between grid and list views +- **Interactive Screenshots** - Click to view full-screen image galleries +- **Smart Pagination** - Navigate large collections with ease +- **Real-time Favorites** - Heart games to build your personal collection + +### šŸ” User Management +- **Role-Based Access Control** - Demo, Normal, and Super Admin roles +- **Secure Authentication** - JWT-based auth with bcrypt password hashing +- **Personal Favorites** - Each user maintains their own favorites list +- **Admin Dashboard** - User management and system overview + +### šŸ“± Mobile-First +- **Hamburger Navigation** - Clean mobile menu system +- **Touch-Optimized** - Large buttons and smooth interactions +- **Responsive Controls** - Pagination and filters work great on mobile + +## šŸš€ Quick Start + +### Prerequisites +- Python 3.11+ +- [Devenv](https://devenv.sh/) (recommended) or manual dependency management +- IGDB API credentials (free from Twitch Developer Console) + +### Installation + +1. **Clone the repository:** + ```bash + git clone + cd dosfrontend + ``` + +2. 
**Set up environment:** + ```bash + # With devenv (recommended) + devenv shell + + # Or manually install dependencies + pip install fastapi uvicorn sqlalchemy alembic bcrypt python-jose aiohttp + ``` + +3. **Configure IGDB API:** + Create a `.env` file with your IGDB credentials: + ```env + IGDB_CLIENT_ID=your_twitch_client_id + IGDB_SECRET_KEY=your_twitch_client_secret + ``` + +4. **Initialize database:** + ```bash + db-init + create-admin # Create your first admin user + ``` + +5. **Run the application:** + ```bash + serve # Starts web server + run # Runs ROM scraper (optional) + ``` + +6. **Access DosVault:** + Open http://localhost:8080 in your browser + +## šŸ“ Project Structure + +``` +dosfrontend/ +ā”œā”€ā”€ src/ +│ ā”œā”€ā”€ __main__.py # ROM scraper application +│ ā”œā”€ā”€ webapp.py # FastAPI web server +│ └── libs/ +│ ā”œā”€ā”€ config.py # XDG-compliant configuration +│ ā”œā”€ā”€ database.py # SQLAlchemy models +│ ā”œā”€ā”€ auth.py # JWT authentication +│ ā”œā”€ā”€ apis.py # IGDB API integration +│ └── functions.py # Utility functions +ā”œā”€ā”€ templates/ # Jinja2 HTML templates +ā”œā”€ā”€ migrations/ # Database schema versions +ā”œā”€ā”€ devenv.nix # Development environment +└── CLAUDE.md # Development guidance +``` + +## šŸŽ® Usage + +### Scraping ROMs +```bash +# Scan ROM directories and fetch metadata +run +``` + +### Web Interface +```bash +# Start the web server +serve +``` + +### Database Management +```bash +# Create migrations +migrate create "description of changes" + +# Apply migrations +migrate upgrade + +# Check migration status +migrate current +``` + +### Administration +```bash +# Create admin user +create-admin + +# Run tests +tests + +# Code quality +lint +typecheck +``` + +## āš™ļø Configuration + +DosVault uses XDG-compliant configuration stored in: +- **Linux/Mac:** `~/.config/dosfrontend/` +- **Windows:** `%APPDATA%/dosfrontend/` + +Key configuration options: +- ROM directories to scan +- Image storage location +- Database path 
+- Web server host/port +- IGDB API credentials + +## šŸ—ļø Architecture + +### Backend +- **FastAPI** - Modern Python web framework +- **SQLAlchemy** - Database ORM with proper relationships +- **Alembic** - Database migration management +- **AsyncIO** - Concurrent API requests with rate limiting +- **JWT + BCrypt** - Secure authentication + +### Frontend +- **Jinja2** - Server-side templating +- **Tailwind CSS** - Utility-first styling +- **Alpine.js** - Lightweight JavaScript framework +- **Responsive Design** - Mobile-first approach + +### Data Flow +1. **Scraper** scans ROM directories and compares with database +2. **IGDB API** provides metadata via Twitch OAuth +3. **Images** are downloaded and cached locally +4. **Web interface** serves games with fast local assets +5. **Users** browse, search, and manage favorites + +## šŸ¤ Contributing + +1. Fork the repository +2. Create a feature branch (`git checkout -b feature/amazing-feature`) +3. Make your changes +4. Run tests and linting (`tests`, `lint`) +5. Commit your changes (`git commit -m 'Add amazing feature'`) +6. Push to the branch (`git push origin feature/amazing-feature`) +7. Open a Pull Request + +## šŸ“ License + +This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. + +## šŸ™ Acknowledgments + +- **IGDB** for providing comprehensive game metadata +- **Twitch** for OAuth authentication to IGDB API +- **FastAPI** for the excellent modern Python web framework +- **Tailwind CSS** for making responsive design a breeze +- **DOSBox** community for keeping retro gaming alive + +--- + +**Built with ā¤ļø for retro gaming enthusiasts** \ No newline at end of file diff --git a/alembic.ini b/alembic.ini new file mode 100644 index 0000000..927da65 --- /dev/null +++ b/alembic.ini @@ -0,0 +1,94 @@ +# A generic, single database configuration. 
+ +[alembic] +# path to migration scripts +script_location = migrations + +# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s +# Uncomment the line below if you want the files to be prepended with date and time +# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. +prepend_sys_path = . + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the python-dateutil library that can be +# installed by adding `alembic[tz]` to the pip requirements +# string value is passed to dateutil.tz.gettz() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the +# "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version number format to use with the --rev-id parameter +# to specify a starting revision +# version_num_format = %04d + +# version_path_separator = : +# version_path_separator = os # Use os.pathsep. Default configuration used on new projects. + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +sqlalchemy.url = driver://user:pass@localhost/dbname + + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. 
See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S \ No newline at end of file diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..20edfe0 --- /dev/null +++ b/build.sh @@ -0,0 +1,2 @@ +#!/usr/bin/env sh +python -m zipapp src --compress --output=release/dfe --python="/usr/bin/env python" diff --git a/devenv.lock b/devenv.lock new file mode 100644 index 0000000..7dce35e --- /dev/null +++ b/devenv.lock @@ -0,0 +1,103 @@ +{ + "nodes": { + "devenv": { + "locked": { + "dir": "src/modules", + "lastModified": 1756415044, + "owner": "cachix", + "repo": "devenv", + "rev": "c570189b38b549141179647da3ddde249ac50fec", + "type": "github" + }, + "original": { + "dir": "src/modules", + "owner": "cachix", + "repo": "devenv", + "type": "github" + } + }, + "flake-compat": { + "flake": false, + "locked": { + "lastModified": 1747046372, + "owner": "edolstra", + "repo": "flake-compat", + "rev": "9100a0f413b0c601e0533d1d94ffd501ce2e7885", + "type": "github" + }, + "original": { + "owner": "edolstra", + "repo": "flake-compat", + "type": "github" + } + }, + "git-hooks": { + "inputs": { + "flake-compat": "flake-compat", + "gitignore": "gitignore", + "nixpkgs": [ + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1755960406, + 
"owner": "cachix", + "repo": "git-hooks.nix", + "rev": "e891a93b193fcaf2fc8012d890dc7f0befe86ec2", + "type": "github" + }, + "original": { + "owner": "cachix", + "repo": "git-hooks.nix", + "type": "github" + } + }, + "gitignore": { + "inputs": { + "nixpkgs": [ + "git-hooks", + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1709087332, + "owner": "hercules-ci", + "repo": "gitignore.nix", + "rev": "637db329424fd7e46cf4185293b9cc8c88c95394", + "type": "github" + }, + "original": { + "owner": "hercules-ci", + "repo": "gitignore.nix", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1755783167, + "owner": "cachix", + "repo": "devenv-nixpkgs", + "rev": "4a880fb247d24fbca57269af672e8f78935b0328", + "type": "github" + }, + "original": { + "owner": "cachix", + "ref": "rolling", + "repo": "devenv-nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "devenv": "devenv", + "git-hooks": "git-hooks", + "nixpkgs": "nixpkgs", + "pre-commit-hooks": [ + "git-hooks" + ] + } + } + }, + "root": "root", + "version": 7 +} diff --git a/devenv.nix b/devenv.nix new file mode 100644 index 0000000..cb46fe6 --- /dev/null +++ b/devenv.nix @@ -0,0 +1,97 @@ +{ pkgs, lib, config, inputs, ... 
}: + +{ + # https://devenv.sh/basics/ + + # https://devenv.sh/packages/ + packages = with pkgs; [ + git + curl + pkg-config + sqlite + pyright + pre-commit + ]; + languages.python = { + enable = true; + package = pkgs.python313; + libraries = with pkgs.python313Packages; [ ]; + venv = { + enable = true; + requirements = '' + pudb + ptpython + ipython + pytest + pytest-cov + flake8 + ptpython + ipython + isort + pynvim + ruff + black + sqlalchemy + requests + fastapi + uvicorn + jinja2 + python-multipart + bcrypt + python-jose + passlib + alembic + aiohttp + ''; + }; + # uv = { + # enable = false; + # sync.enable = true; + # }; + }; + env = { + PYTHONBREAKPOINT = "pudb.set_trace"; + }; + + # https://devenv.sh/variables/ + # variables = { + # GREET = "world"; + # }; + + # https://devenv.sh/scripts/ + scripts = { + "tests".exec = "cd $REPO_ROOT && python -m pytest --rootdir=$REPO_ROOT -c $REPO_ROOT/pytest.ini"; + "lint".exec = "cd $REPO_ROOT && ${pkgs.ruff}/bin/ruff check . && black --check ."; + "fix".exec = "cd $REPO_ROOT && ${pkgs.ruff}/bin/ruff check . 
--fix && black ."; + "typecheck".exec = "cd $REPO_ROOT && pyright"; + "run".exec = ''cd $REPO_ROOT && ./src/__main__.py "$@"''; + "serve".exec = "cd $REPO_ROOT && python src/webapp.py"; + "create-admin".exec = "cd $REPO_ROOT && python src/create_admin.py"; + "migrate".exec = "cd $REPO_ROOT && python src/migrate.py"; + "db-init".exec = "cd $REPO_ROOT && python src/migrate.py init"; + "db-upgrade".exec = "cd $REPO_ROOT && python src/migrate.py upgrade"; + "db-create".exec = "cd $REPO_ROOT && python src/migrate.py create"; + "build".exec = "cd $REPO_ROOT && ./build.sh"; + "backfill-images".exec = "cd $REPO_ROOT && python src/backfill_images.py"; + }; + enterShell = '' + export REPO_ROOT="$(git rev-parse --show-toplevel 2>/dev/null || pwd)" + ''; + + # https://devenv.sh/tasks/ + # tasks = { + # "myproj:setup".exec = "mytool build"; + # "devenv:enterShell".after = [ "myproj:setup" ]; + # }; + + # https://devenv.sh/tests/ + enterTest = '' + echo "Running tests" + pytest -q + ''; + + # https://devenv.sh/git-hooks/ + # git-hooks.hooks.shellcheck.enable = true; + + # See full reference at https://devenv.sh/reference/options/ +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..7b89800 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,30 @@ +version: '3.8' + +services: + dosvault: + build: . 
+ ports: + - "8080:8080" + - "8081:8081" + volumes: + # Mount data directory for persistence + - dosvault_data:/app/data + # Mount ROM directory (customize this path) + - "${ROMS_PATH:-./roms}:/app/data/roms:ro" + environment: + # IGDB API Configuration + - IGDB_CLIENT_ID=${IGDB_CLIENT_ID} + - IGDB_SECRET_KEY=${IGDB_SECRET_KEY} + # Application Configuration + - DOSFRONTEND_CONFIG_DIR=/app/data + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 40s + +volumes: + dosvault_data: + driver: local \ No newline at end of file diff --git a/migrations/env.py b/migrations/env.py new file mode 100644 index 0000000..9425db0 --- /dev/null +++ b/migrations/env.py @@ -0,0 +1,92 @@ +import sys +from pathlib import Path + +# Add src to Python path +src_path = Path(__file__).parent.parent / "src" +sys.path.insert(0, str(src_path)) + +from logging.config import fileConfig +from sqlalchemy import engine_from_config +from sqlalchemy import pool +from alembic import context + +# Import your models +from libs.database import Base +from libs.config import Config + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +# Set the SQLAlchemy URL from our config +app_config = Config() +config.set_main_option("sqlalchemy.url", f"sqlite:///{app_config.database_path}") + +# add your model's MetaData object here +# for 'autogenerate' support +target_metadata = Base.metadata + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode. 
+ + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + render_as_batch=True, # Enable batch mode for SQLite + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online() -> None: + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. + + """ + connectable = engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + context.configure( + connection=connection, + target_metadata=target_metadata, + render_as_batch=True, # Enable batch mode for SQLite + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() \ No newline at end of file diff --git a/migrations/script.py.mako b/migrations/script.py.mako new file mode 100644 index 0000000..37d0cac --- /dev/null +++ b/migrations/script.py.mako @@ -0,0 +1,24 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. 
+revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} +branch_labels = ${repr(branch_labels)} +depends_on = ${repr(depends_on)} + + +def upgrade() -> None: + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + ${downgrades if downgrades else "pass"} \ No newline at end of file diff --git a/migrations/versions/001_initial_schema.py b/migrations/versions/001_initial_schema.py new file mode 100644 index 0000000..3a315f0 --- /dev/null +++ b/migrations/versions/001_initial_schema.py @@ -0,0 +1,106 @@ +"""Initial database schema + +Revision ID: 001 +Revises: +Create Date: 2024-01-01 10:00:00.000000 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.types import TypeDecorator +from pathlib import Path + +# revision identifiers, used by Alembic. +revision = '001' +down_revision = None +branch_labels = None +depends_on = None + +# Define the PathType here since it's needed for the migration +class PathType(TypeDecorator): + impl = sa.String + cache_ok = True + + def process_bind_param(self, value, dialect): + return None if value is None else str(value) + + def process_result_value(self, value, dialect): + return None if value is None else Path(value) + +def upgrade() -> None: + # This represents the initial schema from the original system + # The tables (tags, genre, game, metadata, metadata_genres, metadata_tags) + # already exist in the database, so this migration is just for tracking + + # If running on a fresh database, these would create the tables: + + # Create tags table + op.create_table('tags', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('name', sa.String(length=30), nullable=False), + sa.PrimaryKeyConstraint('id', name=op.f('pk_tags')) + ) + op.create_index('ix_tags_name', 'tags', ['name'], unique=True) + + # Create genre table + op.create_table('genre', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('name', sa.String(length=30), nullable=False), + 
sa.PrimaryKeyConstraint('id', name=op.f('pk_genre')) + ) + op.create_index('ix_genre_name', 'genre', ['name'], unique=True) + + # Create game table + op.create_table('game', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('title', sa.String(length=66), nullable=False), + sa.Column('path', PathType(), nullable=False), + sa.PrimaryKeyConstraint('id', name=op.f('pk_game')), + sa.UniqueConstraint('path', name=op.f('uq_game_path')) + ) + op.create_index('ix_game_title', 'game', ['title']) + + # Create metadata table + op.create_table('metadata', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('game_id', sa.Integer(), nullable=False), + sa.Column('title', sa.String(length=66), nullable=False), + sa.Column('description', sa.String(), nullable=True), + sa.Column('year', sa.Integer(), nullable=True), + sa.Column('developer', sa.String(length=255), nullable=True), + sa.Column('publisher', sa.String(length=255), nullable=True), + sa.Column('players', sa.Integer(), nullable=True), + sa.Column('cover_image', sa.String(), nullable=True), + sa.Column('screenshot', sa.String(), nullable=True), + sa.ForeignKeyConstraint(['game_id'], ['game.id'], name=op.f('fk_metadata_game_id_game'), ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id', name=op.f('pk_metadata')), + sa.UniqueConstraint('game_id', name=op.f('uq_metadata_game_id')) + ) + + # Create association tables + op.create_table('metadata_genres', + sa.Column('metadata_id', sa.Integer(), nullable=False), + sa.Column('genre_id', sa.Integer(), nullable=False), + sa.ForeignKeyConstraint(['genre_id'], ['genre.id'], name=op.f('fk_metadata_genres_genre_id_genre'), ondelete='CASCADE'), + sa.ForeignKeyConstraint(['metadata_id'], ['metadata.id'], name=op.f('fk_metadata_genres_metadata_id_metadata'), ondelete='CASCADE'), + sa.PrimaryKeyConstraint('metadata_id', 'genre_id', name=op.f('pk_metadata_genres')), + sa.UniqueConstraint('metadata_id', 'genre_id', name=op.f('uq_metadata_genres_metadata_id')) + ) + + 
op.create_table('metadata_tags', + sa.Column('metadata_id', sa.Integer(), nullable=False), + sa.Column('tag_id', sa.Integer(), nullable=False), + sa.ForeignKeyConstraint(['metadata_id'], ['metadata.id'], name=op.f('fk_metadata_tags_metadata_id_metadata'), ondelete='CASCADE'), + sa.ForeignKeyConstraint(['tag_id'], ['tags.id'], name=op.f('fk_metadata_tags_tag_id_tags'), ondelete='CASCADE'), + sa.PrimaryKeyConstraint('metadata_id', 'tag_id', name=op.f('pk_metadata_tags')), + sa.UniqueConstraint('metadata_id', 'tag_id', name=op.f('uq_metadata_tags_metadata_id')) + ) + + +def downgrade() -> None: + op.drop_table('metadata_tags') + op.drop_table('metadata_genres') + op.drop_table('metadata') + op.drop_table('game') + op.drop_table('genre') + op.drop_table('tags') \ No newline at end of file diff --git a/migrations/versions/002_add_user_system.py b/migrations/versions/002_add_user_system.py new file mode 100644 index 0000000..b77f9c1 --- /dev/null +++ b/migrations/versions/002_add_user_system.py @@ -0,0 +1,47 @@ +"""Add user authentication system + +Revision ID: 002 +Revises: 001 +Create Date: 2024-01-01 11:00:00.000000 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. 
+revision = '002' +down_revision = '001' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Create users table + op.create_table('users', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('username', sa.String(length=50), nullable=False), + sa.Column('email', sa.String(length=100), nullable=False), + sa.Column('password_hash', sa.String(length=255), nullable=False), + sa.Column('role', sa.String(length=20), nullable=False), + sa.Column('is_active', sa.Boolean(), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('last_login', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id', name=op.f('pk_users')) + ) + op.create_index('ix_users_email', 'users', ['email'], unique=True) + op.create_index('ix_users_username', 'users', ['username'], unique=True) + + # Create user_favorites association table + op.create_table('user_favorites', + sa.Column('user_id', sa.Integer(), nullable=False), + sa.Column('game_id', sa.Integer(), nullable=False), + sa.ForeignKeyConstraint(['game_id'], ['game.id'], name=op.f('fk_user_favorites_game_id_game'), ondelete='CASCADE'), + sa.ForeignKeyConstraint(['user_id'], ['users.id'], name=op.f('fk_user_favorites_user_id_users'), ondelete='CASCADE'), + sa.PrimaryKeyConstraint('user_id', 'game_id', name=op.f('pk_user_favorites')), + sa.UniqueConstraint('user_id', 'game_id', name=op.f('uq_user_favorites_user_id')) + ) + + +def downgrade() -> None: + op.drop_table('user_favorites') + op.drop_table('users') \ No newline at end of file diff --git a/migrations/versions/002_example_migration.py.example b/migrations/versions/002_example_migration.py.example new file mode 100644 index 0000000..9e134b9 --- /dev/null +++ b/migrations/versions/002_example_migration.py.example @@ -0,0 +1,33 @@ +"""Example migration - add rating column to metadata + +This is an example of how to create a migration. +To use this: +1. Remove the .example extension +2. 
Update the revision ID and down_revision +3. Run: migrate upgrade + +Revision ID: 002 +Revises: 001 +Create Date: 2024-01-01 11:00:00.000000 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = '002' +down_revision = '001' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Add a rating column to the metadata table + with op.batch_alter_table('metadata', schema=None) as batch_op: + batch_op.add_column(sa.Column('rating', sa.Float(), nullable=True)) + + +def downgrade() -> None: + # Remove the rating column from the metadata table + with op.batch_alter_table('metadata', schema=None) as batch_op: + batch_op.drop_column('rating') \ No newline at end of file diff --git a/migrations/versions/3e8f92662c04_add_local_image_path_fields.py b/migrations/versions/3e8f92662c04_add_local_image_path_fields.py new file mode 100644 index 0000000..39563b1 --- /dev/null +++ b/migrations/versions/3e8f92662c04_add_local_image_path_fields.py @@ -0,0 +1,38 @@ +"""add local image path fields + +Revision ID: 3e8f92662c04 +Revises: 002 +Create Date: 2025-09-06 01:18:21.497321 + +""" +from alembic import op +import sqlalchemy as sa +import sys +from pathlib import Path +sys.path.append(str(Path(__file__).parent.parent.parent / 'src')) +from libs.database import PathType + + +# revision identifiers, used by Alembic. +revision = '3e8f92662c04' +down_revision = '002' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('metadata', schema=None) as batch_op: + batch_op.add_column(sa.Column('cover_image_path', PathType(), nullable=True)) + batch_op.add_column(sa.Column('screenshot_path', PathType(), nullable=True)) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! 
### + with op.batch_alter_table('metadata', schema=None) as batch_op: + batch_op.drop_column('screenshot_path') + batch_op.drop_column('cover_image_path') + + # ### end Alembic commands ### \ No newline at end of file diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..d90f291 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +addopts = --cov=src --cov-report=term-missing --ignore=src/__main__.py +testpaths = tests/ diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c306ca9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,21 @@ +# DosVault Python Dependencies + +# Web Framework +fastapi>=0.104.0 +uvicorn[standard]>=0.24.0 +jinja2>=3.1.2 +python-multipart>=0.0.6 + +# Database +sqlalchemy>=2.0.0 +alembic>=1.12.0 + +# Authentication & Security +python-jose[cryptography]>=3.3.0 +bcrypt>=4.0.1 + +# HTTP Client +aiohttp>=3.9.0 + +# Utilities +pathlib2>=2.3.7; python_version<"3.4" \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/__main__.py b/src/__main__.py new file mode 100755 index 0000000..b735f4b --- /dev/null +++ b/src/__main__.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python +from __future__ import annotations + +import asyncio +import aiohttp +import logging +from pathlib import Path +from typing import Optional, List +from sqlalchemy import create_engine +from sqlalchemy.orm import Session +from libs.config import Config +from libs.database import (Base, ingest_roms, get_existing_rom_paths) +from libs.objects import Metadata, Game, Roms +from libs.functions import extract_year_from_title, clean_title, download_image, get_image_filename +from libs.apis import Credentials, IGDB +from libs.logging import get_log_manager + +config = Config() +token = Credentials(config).authenticate() +scrape_errors: List[str] = [] + +async def scrape_metadata(title: str, session: aiohttp.ClientSession) -> Metadata: + igdb_response = 
by migrations + # Base.metadata.create_all(engine) + + with Session(engine) as s: + romlist = await make_romlist() + new_romlist = await filter_new_roms(romlist, s) + + if new_romlist.list: + new_romlist = await inject_metadata(new_romlist) + ingest_roms(new_romlist, s) + else: + print("No new ROMs to scrape!") + + print("Done\nError list:") + for err in scrape_errors: + print(f" - {err}") + +if __name__ == "__main__": + # Initialize logging + get_log_manager() + logging.info("Starting DosVault ROM scraper") + asyncio.run(main()) + diff --git a/src/backfill_images.py b/src/backfill_images.py new file mode 100644 index 0000000..484300d --- /dev/null +++ b/src/backfill_images.py @@ -0,0 +1,323 @@ +#!/usr/bin/env python +""" +Backfill script to download images for existing games in the database. +This script finds games that have remote image URLs but no local image files, +and downloads them with proper error handling and progress tracking. +""" +from __future__ import annotations + +import asyncio +import aiohttp +from pathlib import Path +from typing import List, Optional +from sqlalchemy import create_engine, select, func +from sqlalchemy.orm import Session, selectinload + +try: + from libs.config import Config + from libs.database import Game_table, Metadata_table + from libs.functions import download_image, get_image_filename + from libs.apis import IGDB +except ImportError: + import sys + sys.path.append(str(Path(__file__).parent)) + from libs.config import Config + from libs.database import Game_table, Metadata_table + from libs.functions import download_image, get_image_filename + from libs.apis import IGDB + +class ImageBackfillManager: + def __init__(self): + self.config = Config() + self.engine = create_engine(f"sqlite+pysqlite:///{self.config.database_path}", future=True) + self.failed_downloads: List[str] = [] + self.successful_downloads: int = 0 + + def get_image_url(self, image_data: str, image_type: str = 'cover_big') -> Optional[str]: + """Convert 
image ID or URL to full URL.""" + if not image_data: + return None + + # If it's already a full URL, return as-is + if image_data.startswith('http'): + return image_data + + # Skip old numeric-only image IDs (from old IGDB API) - they're no longer valid + if image_data.isdigit(): + print(f" āš ļø Skipping old numeric image ID: {image_data}") + return None + + # New IGDB image IDs are alphanumeric (e.g., 'co3ws0') + if len(image_data) > 0 and not image_data.isspace(): + return IGDB.build_cover_url(image_data, image_type) + + return None + + def get_games_needing_images(self, limit: Optional[int] = None) -> List[Game_table]: + """Get games that have remote image URLs but no local image files.""" + with Session(self.engine) as session: + stmt = ( + select(Game_table) + .join(Metadata_table) + .options(selectinload(Game_table.metadata_obj)) # Eager load relationships + .where( + ( + # Has cover image URL but no local cover path + (Metadata_table.cover_image.is_not(None)) & + (Metadata_table.cover_image_path.is_(None)) + ) | ( + # Has screenshot URL but no local screenshot path + (Metadata_table.screenshot.is_not(None)) & + (Metadata_table.screenshot_path.is_(None)) + ) + ) + .order_by(Game_table.title) + ) + + if limit: + stmt = stmt.limit(limit) + + # Load the objects with eager loading + games = session.scalars(stmt).all() + return games + + def get_stats(self): + """Get statistics about images in the database.""" + with Session(self.engine) as session: + total_games = session.scalar(select(func.count(Game_table.id))) + + games_with_cover_urls = session.scalar( + select(func.count(Metadata_table.id)) + .where(Metadata_table.cover_image.is_not(None)) + ) + + games_with_local_covers = session.scalar( + select(func.count(Metadata_table.id)) + .where(Metadata_table.cover_image_path.is_not(None)) + ) + + games_with_screenshot_urls = session.scalar( + select(func.count(Metadata_table.id)) + .where(Metadata_table.screenshot.is_not(None)) + ) + + 
games_with_local_screenshots = session.scalar( + select(func.count(Metadata_table.id)) + .where(Metadata_table.screenshot_path.is_not(None)) + ) + + return { + 'total_games': total_games, + 'games_with_cover_urls': games_with_cover_urls, + 'games_with_local_covers': games_with_local_covers, + 'games_with_screenshot_urls': games_with_screenshot_urls, + 'games_with_local_screenshots': games_with_local_screenshots, + } + + async def download_images_for_game(self, game: Game_table, session: aiohttp.ClientSession) -> dict: + """Download images for a single game.""" + result = { + 'game_title': game.title, + 'cover_success': False, + 'screenshot_success': False, + 'cover_path': None, + 'screenshot_path': None, + 'errors': [] + } + + metadata = game.metadata_obj + if not metadata: + result['errors'].append('No metadata found') + return result + + # Download cover image if URL exists but no local file + if metadata.cover_image and not metadata.cover_image_path: + try: + cover_url = self.get_image_url(metadata.cover_image, 'cover_big') + if cover_url: + cover_filename = get_image_filename(cover_url, game.title, 'cover') + cover_path = self.config.images_path / cover_filename + + if await download_image(cover_url, cover_path, session): + result['cover_success'] = True + result['cover_path'] = cover_path + else: + result['errors'].append(f'Failed to download cover: {cover_url}') + else: + result['errors'].append(f'Invalid cover image data: {metadata.cover_image}') + except Exception as e: + result['errors'].append(f'Cover download error: {str(e)}') + + # Download screenshot if URL exists but no local file + if metadata.screenshot and not metadata.screenshot_path: + try: + screenshot_url = self.get_image_url(metadata.screenshot, 'screenshot_med') + if screenshot_url: + screenshot_filename = get_image_filename(screenshot_url, game.title, 'screenshot') + screenshot_path = self.config.images_path / screenshot_filename + + if await download_image(screenshot_url, screenshot_path, 
session): + result['screenshot_success'] = True + result['screenshot_path'] = screenshot_path + else: + result['errors'].append(f'Failed to download screenshot: {screenshot_url}') + else: + result['errors'].append(f'Invalid screenshot image data: {metadata.screenshot}') + except Exception as e: + result['errors'].append(f'Screenshot download error: {str(e)}') + + return result + + async def process_batch(self, games: List[Game_table], batch_size: int = 50): + """Process a batch of games with concurrent downloads.""" + semaphore = asyncio.Semaphore(4) # Limit concurrent downloads + + async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session: + async def download_with_semaphore(game): + async with semaphore: + await asyncio.sleep(0.1) # Small delay to be respectful + return await self.download_images_for_game(game, session) + + # Process in batches to avoid overwhelming the database + for i in range(0, len(games), batch_size): + batch = games[i:i + batch_size] + print(f"\nProcessing batch {i//batch_size + 1} ({len(batch)} games)...") + + # Download images concurrently for this batch + tasks = [download_with_semaphore(game) for game in batch] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Update database with successful downloads + with Session(self.engine) as db_session: + for game, result in zip(batch, results): + if isinstance(result, Exception): + print(f" āœ— {game.title}: {str(result)}") + self.failed_downloads.append(f"{game.title}: {str(result)}") + continue + + # Update metadata with local paths + if result['cover_success']: + game.metadata_obj.cover_image_path = result['cover_path'] + self.successful_downloads += 1 + + if result['screenshot_success']: + game.metadata_obj.screenshot_path = result['screenshot_path'] + self.successful_downloads += 1 + + # Show progress + status = [] + if result['cover_success']: + status.append('cover āœ“') + if result['screenshot_success']: + status.append('screenshot āœ“') + 
if result['errors']: + status.extend([f"error: {err}" for err in result['errors']]) + + print(f" {game.title}: {', '.join(status) if status else 'no images needed'}") + + # Commit batch + db_session.commit() + print(f" Batch committed to database") + + async def run(self, limit: Optional[int] = None, dry_run: bool = False): + """Run the image backfill process.""" + print("šŸ–¼ļø ROM Image Backfill Tool") + print("=" * 50) + + # Show current statistics + stats = self.get_stats() + print(f"Database Statistics:") + print(f" Total games: {stats['total_games']}") + print(f" Games with cover URLs: {stats['games_with_cover_urls']}") + print(f" Games with local covers: {stats['games_with_local_covers']}") + print(f" Games with screenshot URLs: {stats['games_with_screenshot_urls']}") + print(f" Games with local screenshots: {stats['games_with_local_screenshots']}") + + # Use session context for all operations + with Session(self.engine) as session: + # Get games that need images within the session + stmt = ( + select(Game_table) + .join(Metadata_table) + .options(selectinload(Game_table.metadata_obj)) + .where( + ( + # Has cover image URL but no local cover path + (Metadata_table.cover_image.is_not(None)) & + (Metadata_table.cover_image_path.is_(None)) + ) | ( + # Has screenshot URL but no local screenshot path + (Metadata_table.screenshot.is_not(None)) & + (Metadata_table.screenshot_path.is_(None)) + ) + ) + .order_by(Game_table.title) + ) + + if limit: + stmt = stmt.limit(limit) + + games = session.scalars(stmt).all() + print(f"\nFound {len(games)} games needing image downloads") + + if not games: + print("āœ… All games already have local images!") + return + + if dry_run: + print("\nšŸ” DRY RUN - showing first 10 games that would be processed:") + for i, game in enumerate(games[:10]): + metadata = game.metadata_obj + print(f" {i+1}. 
{game.title}") + if metadata.cover_image and not metadata.cover_image_path: + cover_url = self.get_image_url(metadata.cover_image, 'cover_big') + print(f" Cover: {cover_url or metadata.cover_image}") + if metadata.screenshot and not metadata.screenshot_path: + screenshot_url = self.get_image_url(metadata.screenshot, 'screenshot_med') + print(f" Screenshot: {screenshot_url or metadata.screenshot}") + return + + # Confirm before proceeding + proceed = input(f"\nDownload images for {len(games)} games? [y/N]: ").strip().lower() + if proceed != 'y': + print("Cancelled.") + return + + # Process the games + await self.process_batch(games) + + # Show final results + print(f"\nāœ… Backfill Complete!") + print(f" Successfully downloaded: {self.successful_downloads} images") + print(f" Failed downloads: {len(self.failed_downloads)}") + + if self.failed_downloads: + print(f"\nFailed Downloads:") + for failure in self.failed_downloads[:10]: # Show first 10 + print(f" - {failure}") + if len(self.failed_downloads) > 10: + print(f" ... 
and {len(self.failed_downloads) - 10} more") + +async def main(): + import argparse + + parser = argparse.ArgumentParser(description="Download images for existing ROM entries") + parser.add_argument('--limit', type=int, help='Limit number of games to process') + parser.add_argument('--dry-run', action='store_true', help='Show what would be done without downloading') + parser.add_argument('--stats-only', action='store_true', help='Show statistics only') + + args = parser.parse_args() + + manager = ImageBackfillManager() + + if args.stats_only: + stats = manager.get_stats() + print("Database Statistics:") + for key, value in stats.items(): + print(f" {key}: {value}") + return + + await manager.run(limit=args.limit, dry_run=args.dry_run) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/src/create_admin.py b/src/create_admin.py new file mode 100755 index 0000000..60c2856 --- /dev/null +++ b/src/create_admin.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from libs.config import Config +from libs.database import Base, User_table, UserRole +from libs.auth import AuthManager +import sys + +def create_admin_user(): + config = Config() + engine = create_engine(f"sqlite+pysqlite:///{config.database_path}") + # Database tables are now managed by migrations + # Base.metadata.create_all(bind=engine) + + SessionLocal = sessionmaker(bind=engine) + db = SessionLocal() + + # Check if admin user exists + existing_admin = db.query(User_table).filter(User_table.role == UserRole.SUPER.value).first() + if existing_admin: + print(f"Admin user already exists: {existing_admin.username}") + return + + username = input("Enter admin username: ").strip() + email = input("Enter admin email: ").strip() + password = input("Enter admin password: ").strip() + + if not username or not email or not password: + print("All fields are required!") + sys.exit(1) + + # Check if username 
exists + existing_user = db.query(User_table).filter(User_table.username == username).first() + if existing_user: + print("Username already exists!") + sys.exit(1) + + admin_user = AuthManager.create_user(db, username, email, password, UserRole.SUPER.value) + print(f"Admin user created successfully: {admin_user.username}") + + db.close() + +if __name__ == "__main__": + create_admin_user() \ No newline at end of file diff --git a/src/libs/__init__.py b/src/libs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/libs/apis.py b/src/libs/apis.py new file mode 100644 index 0000000..914ee1d --- /dev/null +++ b/src/libs/apis.py @@ -0,0 +1,101 @@ +import requests +from dataclasses import dataclass +from enum import Enum +from .config import Config +from typing import Dict + + +class URLS(Enum): + IGDB_URL = "https://api.igdb.com/v4" + TWITCH_AUTH_URL = "https://id.twitch.tv/oauth2/token" + IGDB_GAMES_ENDPOINT = IGDB_URL + "/games" + IGDB_COVERS_ENDPOINT = IGDB_URL + "/covers" + +@dataclass +class Credentials: + client_id: str + client_secret: str + access_token: str|None = None + expiry: int|None = None + token_type: str|None = None + + def get_credentials(self) -> Dict: + auth_url = URLS.TWITCH_AUTH_URL.value+f"?client_id={self.client_id}&client_secret={self.client_secret}&grant_type=client_credentials" + resp = requests.post(auth_url) + if not resp.status_code == 200: + raise ValueError("Failed to obtain access token from Twitch") + else: + return resp.json() + + def authenticate(self) -> 'Credentials': + credentials: Dict = self.get_credentials() + self.access_token = credentials['access_token'] + self.expiry = credentials['expires_in'] + self.token_type = credentials['token_type'] + if not self.access_token: + raise ValueError("Failed to obtain access token") + return self + + def __init__(self, config: Config): + self.client_id = config.igdb_client_id + self.client_secret = config.igdb_api_key + + +class IGDB: + def __init__(self, credentials: 
Credentials): + self.client_id = credentials.client_id + self.access_token = credentials.access_token + self.token_type = credentials.token_type + if not self.access_token: + raise ValueError("Access token is not set. Please authenticate first.") + + def headers(self) -> Dict: + if not self.access_token: + raise ValueError("Access token is not set. Please authenticate first.") + return { + "Client-ID": self.client_id, + "Authorization": f"Bearer {self.access_token}", + } + + def search_game_by_title(self, query: str) -> Dict: + if not self.access_token: + raise ValueError("Access token is not set. Please authenticate first.") + search_url = URLS.IGDB_GAMES_ENDPOINT.value + headers = self.headers() + # Request full cover and artwork data with expanded fields + data = f"""search "{query}"; fields name,summary,first_release_date,rating,platforms.name,genres.name,involved_companies.company.name,cover.image_id,artworks.image_id,themes.name,player_perspectives,id; where platforms = (13); limit 10;""" + resp = requests.post(search_url, headers=headers, data=data) + if resp.status_code != 200: + raise ValueError(f"Failed to search games: {resp.status_code} - {resp.text}") + return resp.json() + + def get_cover_details(self, cover_id: int) -> Dict: + """Get cover details from IGDB by cover ID""" + if not self.access_token: + raise ValueError("Access token is not set. Please authenticate first.") + covers_url = URLS.IGDB_COVERS_ENDPOINT.value + headers = self.headers() + data = f"""fields image_id,url,height,width,game; where id = {cover_id};""" + resp = requests.post(covers_url, headers=headers, data=data) + if resp.status_code != 200: + raise ValueError(f"Failed to get cover details: {resp.status_code} - {resp.text}") + return resp.json() + + def get_covers_by_game_id(self, game_id: int) -> Dict: + """Get all covers for a specific game ID""" + if not self.access_token: + raise ValueError("Access token is not set. 
Please authenticate first.") + covers_url = URLS.IGDB_COVERS_ENDPOINT.value + headers = self.headers() + data = f"""fields image_id,url,height,width; where game = {game_id};""" + resp = requests.post(covers_url, headers=headers, data=data) + if resp.status_code != 200: + raise ValueError(f"Failed to get covers for game: {resp.status_code} - {resp.text}") + return resp.json() + + @staticmethod + def build_cover_url(image_id: str, size: str = "cover_big") -> str: + """Build IGDB cover URL from image_id + Size options: thumb, cover_small, screenshot_med, cover_big, logo_med, screenshot_big, screenshot_huge, thumb, micro, 720p, 1080p + """ + return f"https://images.igdb.com/igdb/image/upload/t_{size}/{image_id}.jpg" diff --git a/src/libs/auth.py b/src/libs/auth.py new file mode 100644 index 0000000..d1a7a17 --- /dev/null +++ b/src/libs/auth.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +from datetime import datetime, timedelta +from typing import Optional +from passlib.context import CryptContext +from jose import JWTError, jwt +from sqlalchemy.orm import Session +from sqlalchemy import select +from .database import User_table, UserRole + +SECRET_KEY = "your-secret-key-change-this-in-production" +ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 30 + +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") + + +class AuthManager: + @staticmethod + def verify_password(plain_password: str, hashed_password: str) -> bool: + return pwd_context.verify(plain_password, hashed_password) + + @staticmethod + def get_password_hash(password: str) -> str: + return pwd_context.hash(password) + + @staticmethod + def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): + to_encode = data.copy() + if expires_delta: + expire = datetime.utcnow() + expires_delta + else: + expire = datetime.utcnow() + timedelta(minutes=15) + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + return 
encoded_jwt + + @staticmethod + def verify_token(token: str) -> Optional[str]: + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + username: str = payload.get("sub") + if username is None: + return None + return username + except JWTError: + return None + + @staticmethod + def authenticate_user(session: Session, username: str, password: str) -> Optional[User_table]: + user = session.scalar(select(User_table).where(User_table.username == username)) + if not user: + return None + if not AuthManager.verify_password(password, user.password_hash): + return None + return user + + @staticmethod + def get_user_by_username(session: Session, username: str) -> Optional[User_table]: + return session.scalar(select(User_table).where(User_table.username == username)) + + @staticmethod + def create_user(session: Session, username: str, email: str, password: str, role: str = UserRole.NORMAL.value) -> User_table: + hashed_password = AuthManager.get_password_hash(password) + user = User_table( + username=username, + email=email, + password_hash=hashed_password, + role=role + ) + session.add(user) + session.commit() + session.refresh(user) + return user \ No newline at end of file diff --git a/src/libs/config.py b/src/libs/config.py new file mode 100644 index 0000000..abfda36 --- /dev/null +++ b/src/libs/config.py @@ -0,0 +1,113 @@ +from pathlib import Path +from dataclasses import dataclass +from typing import Optional, Dict +import json +import os + +# Check for environment variable override (used in Docker) +if os.getenv("DOSFRONTEND_CONFIG_DIR"): + DOSFRONTEND_CONFIG_DIR: Path = Path(os.getenv("DOSFRONTEND_CONFIG_DIR")) +else: + # Default to XDG config directory for regular installations + XDG_CONFIG_HOME: Path = Path(Path.home()).joinpath(".config") + DOSFRONTEND_CONFIG_DIR: Path = XDG_CONFIG_HOME.joinpath("dosfrontend") + +DOSFRONTEND_CONFIG_FILE: Path = DOSFRONTEND_CONFIG_DIR.joinpath("config.json") + + +@dataclass +class Config: + path: Path = 
DOSFRONTEND_CONFIG_FILE + rom_path: Path = DOSFRONTEND_CONFIG_DIR.joinpath("roms") + metadata_path: Path = DOSFRONTEND_CONFIG_DIR.joinpath("metadata") + database_path: Path = DOSFRONTEND_CONFIG_DIR.joinpath("roms.db") + images_path: Path = DOSFRONTEND_CONFIG_DIR.joinpath("images") + host: str = "localhost" + port: int = 8080 + websocket_port: int = 8081 + igdb_api_key: str = "" + igdb_client_id: str = "" + + def __init__(self, path: Optional[Path] = None): + if path: + self.path = path + self.load() + + def load_env_secrets(self) -> Dict[str, str] | None: + secrets: Dict[str, str] = {} + igdb_api_key = os.getenv("IGDB_SECRET_KEY") + igdb_client_id = os.getenv("IGDB_CLIENT_ID") + if not igdb_api_key or not igdb_client_id: + file_path: Path = Path(__file__) + env_path: Path = file_path.parent.parent.parent.joinpath(".env") + if not env_path.exists(): + return + else: + with env_path.open('r') as f: + for line in f: + if line.startswith("#") or "=" not in line: + continue + key, value = line.strip().split("=", 1) + key, value = key.strip(), value.strip('"').strip("'") + secrets[key] = value + f.close() + if secrets.get("IGDB_SECRET_KEY") and secrets.get("IGDB_CLIENT_ID"): + return secrets + else: return None + else: + secrets = { + "IGDB_SECRET_KEY": igdb_api_key, + "IGDB_CLIENT_ID": igdb_client_id, + } + return secrets + + def to_dict(self) -> dict: + return { + "rom_path": str(self.rom_path), + "metadata_path": str(self.metadata_path), + "host": self.host, + "port": self.port, + "websocket_port": self.websocket_port, + "igdb_api_key": self.igdb_api_key, + "igdb_client_id": self.igdb_client_id, + } + + def save(self): + if not self.path.parent.exists(): + self.path.parent.mkdir(parents=True, exist_ok=True) + rom_path = input(f"Enter the path to your ROMs [{self.rom_path}] enter for default: ").strip() + metadata_path = input(f"Enter the path to your metadata [{self.metadata_path}] enter for default: ").strip() + self.rom_path = Path(rom_path) if rom_path else 
self.rom_path + self.metadata_path = Path(metadata_path) if metadata_path else self.metadata_path + if not self.rom_path.exists(): + self.rom_path.mkdir(parents=True, exist_ok=True) + if not self.metadata_path.exists(): + self.metadata_path.mkdir(parents=True, exist_ok=True) + if not self.images_path.exists(): + self.images_path.mkdir(parents=True, exist_ok=True) + with open(self.path, 'w') as f: + json.dump(self.to_dict(), f, indent=4) + f.close() + + def load(self) -> "Config": + if self.path.exists(): + with open(self.path, 'r') as f: + data = json.load(f) + self.rom_path = Path(data.get("rom_path", str(self.rom_path))) + self.metadata_path = Path(data.get("metadata_path", str(self.metadata_path))) + self.host = data.get("host", self.host) + self.port = data.get("port", self.port) + self.websocket_port = data.get("websocket_port", self.websocket_port) + if self.igdb_api_key == "" or self.igdb_client_id == "": + secrets = self.load_env_secrets() + if secrets: + self.igdb_api_key = secrets.get("IGDB_SECRET_KEY", "") + self.igdb_client_id = secrets.get("IGDB_CLIENT_ID", "") + f.close() + self.save() + return self + f.close() + else: + self.save() + self.load() + return self diff --git a/src/libs/database.py b/src/libs/database.py new file mode 100644 index 0000000..ec38a90 --- /dev/null +++ b/src/libs/database.py @@ -0,0 +1,241 @@ +from __future__ import annotations + +from pathlib import Path +from typing import List, Optional +from datetime import datetime +from enum import Enum as PyEnum +from sqlalchemy import ( + String, + Integer, + ForeignKey, + Table, + Column, + UniqueConstraint, + MetaData, + select, + DateTime, + Boolean +) +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, Session +from sqlalchemy.types import TypeDecorator +from .objects import Roms +from .functions import extract_year_from_title + + +# ---- Base (with naming convention; nice for Alembic) ------------------------- +convention = { + "ix": 
"ix_%(column_0_label)s", + "uq": "uq_%(table_name)s_%(column_0_name)s", + "ck": "ck_%(table_name)s_%(constraint_name)s", + "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", + "pk": "pk_%(table_name)s", +} + +class Base(DeclarativeBase): + metadata = MetaData(naming_convention=convention) + + +# ---- PathType to store pathlib.Path as TEXT ---------------------------------- +class PathType(TypeDecorator): + impl = String + cache_ok = True + + def process_bind_param(self, value, dialect): + return None if value is None else str(value) + + def process_result_value(self, value, dialect): + return None if value is None else Path(value) + + +# ---- Association tables (use Column, not mapped_column) ---------------------- +metadata_tags = Table( + "metadata_tags", + Base.metadata, + Column("metadata_id", ForeignKey("metadata.id", ondelete="CASCADE"), primary_key=True), + Column("tag_id", ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True), + UniqueConstraint("metadata_id", "tag_id"), +) + +metadata_genres = Table( + "metadata_genres", + Base.metadata, + Column("metadata_id", ForeignKey("metadata.id", ondelete="CASCADE"), primary_key=True), + Column("genre_id", ForeignKey("genre.id", ondelete="CASCADE"), primary_key=True), + UniqueConstraint("metadata_id", "genre_id"), +) + +user_favorites = Table( + "user_favorites", + Base.metadata, + Column("user_id", ForeignKey("users.id", ondelete="CASCADE"), primary_key=True), + Column("game_id", ForeignKey("game.id", ondelete="CASCADE"), primary_key=True), + UniqueConstraint("user_id", "game_id"), +) + + +class UserRole(PyEnum): + DEMO = "demo" + NORMAL = "normal" + SUPER = "super" + + +class User_table(Base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + username: Mapped[str] = mapped_column(String(50), unique=True, index=True) + email: Mapped[str] = mapped_column(String(100), unique=True, index=True) + password_hash: Mapped[str] = mapped_column(String(255)) + role: 
Mapped[str] = mapped_column(String(20), default=UserRole.NORMAL.value) + is_active: Mapped[bool] = mapped_column(Boolean, default=True) + created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) + last_login: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) + + favorites: Mapped[List["Game_table"]] = relationship( + secondary=user_favorites, + back_populates="favorited_by", + lazy="selectin", + ) + + def __repr__(self) -> str: + return f"User(id={self.id}, username={self.username!r}, role={self.role})" + + +class Tags_table(Base): + __tablename__ = "tags" + + id: Mapped[int] = mapped_column(primary_key=True) + name: Mapped[str] = mapped_column(String(30), unique=True, index=True) + + games: Mapped[List["Metadata_table"]] = relationship( + secondary=metadata_tags, + back_populates="tags", + lazy="selectin", + ) + + +class Genre_table(Base): + __tablename__ = "genre" + + id: Mapped[int] = mapped_column(primary_key=True) + name: Mapped[str] = mapped_column(String(30), unique=True, index=True) + + games: Mapped[List["Metadata_table"]] = relationship( + secondary=metadata_genres, + back_populates="genre", + lazy="selectin", + ) + + +class Game_table(Base): + __tablename__ = "game" + + id: Mapped[int] = mapped_column(primary_key=True) + title: Mapped[str] = mapped_column(String(66), index=True) + path: Mapped[Path] = mapped_column(PathType(), unique=True, nullable=False) + + metadata_obj: Mapped[Optional["Metadata_table"]] = relationship( + back_populates="game", + uselist=False, + cascade="all, delete-orphan", + passive_deletes=True, + ) + + favorited_by: Mapped[List["User_table"]] = relationship( + secondary=user_favorites, + back_populates="favorites", + lazy="selectin", + ) + + def __repr__(self) -> str: + return f"Game(id={self.id}, title={self.title!r}, path={str(self.path)!r})" + + +class Metadata_table(Base): + __tablename__ = "metadata" + + id: Mapped[int] = mapped_column(primary_key=True) + game_id: Mapped[int] = 
mapped_column( + ForeignKey("game.id", ondelete="CASCADE"), + unique=True, + nullable=False, + ) + + title: Mapped[str] = mapped_column(String(66)) + description: Mapped[Optional[str]] = mapped_column(String, nullable=True) + year: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) + developer: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) + publisher: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) + players: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) + cover_image: Mapped[Optional[str]] = mapped_column(String, nullable=True) # Remote URL + screenshot: Mapped[Optional[str]] = mapped_column(String, nullable=True) # Remote URL + cover_image_path: Mapped[Optional[Path]] = mapped_column(PathType(), nullable=True) # Local file path + screenshot_path: Mapped[Optional[Path]] = mapped_column(PathType(), nullable=True) # Local file path + + genre: Mapped[List[Genre_table]] = relationship( + secondary=metadata_genres, + back_populates="games", + lazy="selectin", + ) + tags: Mapped[List[Tags_table]] = relationship( + secondary=metadata_tags, + back_populates="games", + lazy="selectin", + ) + + game: Mapped["Game_table"] = relationship(back_populates="metadata_obj") + + def __repr__(self) -> str: + return f"Metadata(id={self.id}, game_id={self.game_id}, title={self.title!r}, year={self.year})" + +def _get_or_create_by_name(session: Session, model, name: str): + obj = session.scalar(select(model).where(model.name == name)) + if obj is None: + obj = model(name=name) + session.add(obj) + return obj + +def get_existing_rom_paths(session: Session) -> set[Path]: + return {game.path.resolve() for game in session.scalars(select(Game_table)).all()} + +def ingest_roms(roms: Roms, session: Session, *, batch: int = 200) -> int: + n = 0 + for g in roms.list: + game = session.scalar(select(Game_table).where(Game_table.path == g.path)) + if game is None: + game = Game_table(title=g.title, path=g.path) + session.add(game) + 
else: + game.title = g.title + mdto = g.metadata + md = game.metadata_obj + if md is None: + md = Metadata_table(game=game, title=mdto.title or g.title) + session.add(md) + + md.title = mdto.title or g.title + md.description = mdto.description + md.year = mdto.year if mdto.year is not None else extract_year_from_title(md.title) + md.developer = mdto.developer + md.publisher = mdto.publisher + md.players = mdto.players + md.cover_image = mdto.cover_image + md.screenshot = mdto.screenshot + md.cover_image_path = mdto.cover_image_path + md.screenshot_path = mdto.screenshot_path + + try: genres = sorted({s.strip() for s in (mdto.genre or []) if s and s.strip()}) + except: genres = [] + try: tags = sorted({s.strip() for s in (mdto.tags or []) if s and s.strip()}) + except: tags = [] + + md.genre = [_get_or_create_by_name(session, Genre_table, name) for name in genres] + md.tags = [_get_or_create_by_name(session, Tags_table, name) for name in tags] + + n += 1 + if n % batch == 0: + session.flush() + + session.commit() + return n + diff --git a/src/libs/functions.py b/src/libs/functions.py new file mode 100644 index 0000000..71d039f --- /dev/null +++ b/src/libs/functions.py @@ -0,0 +1,78 @@ +from typing import Optional +import re +import asyncio +import aiohttp +from pathlib import Path +import hashlib + +YEAR_RE = re.compile(r"\((\d{4})\)") +PARENS_RE = re.compile(r"\([^)]*\)") + +def extract_year_from_title(title: Optional[str]) -> Optional[int]: + if not title: + return None + m = YEAR_RE.search(title) + return int(m.group(1)) if m else None + +def clean_title(title: str) -> str: + # remove anything in (...) from the title + cleaned = PARENS_RE.sub("", title) + return " ".join(cleaned.split()).strip() + +async def download_image(url: str, save_path: Path, session: aiohttp.ClientSession) -> bool: + """ + Download an image from URL and save it locally. 
+ + Args: + url: The image URL to download + save_path: Local path where to save the image + session: aiohttp client session + + Returns: + bool: True if download was successful, False otherwise + """ + try: + # Create directory if it doesn't exist + save_path.parent.mkdir(parents=True, exist_ok=True) + + async with session.get(url) as response: + if response.status == 200: + content = await response.read() + with open(save_path, 'wb') as f: + f.write(content) + return True + else: + print(f"Failed to download {url}: HTTP {response.status}") + return False + except Exception as e: + print(f"Error downloading {url}: {e}") + return False + +def get_image_filename(url: str, game_title: str, image_type: str) -> str: + """ + Generate a unique filename for an image based on game title and URL. + + Args: + url: The image URL + game_title: The game title + image_type: 'cover' or 'screenshot' + + Returns: + str: Generated filename + """ + # Create a hash of the URL to ensure uniqueness + url_hash = hashlib.md5(url.encode()).hexdigest()[:8] + + # Clean game title for filename + clean_name = re.sub(r'[^\w\-_\. 
]', '', game_title) + clean_name = re.sub(r'\s+', '_', clean_name).strip('_') + + # Get file extension from URL + try: + ext = Path(url.split('?')[0]).suffix + if not ext: + ext = '.jpg' # Default extension + except: + ext = '.jpg' + + return f"{clean_name}_{image_type}_{url_hash}{ext}" diff --git a/src/libs/logging.py b/src/libs/logging.py new file mode 100644 index 0000000..1865b97 --- /dev/null +++ b/src/libs/logging.py @@ -0,0 +1,220 @@ +#!/usr/bin/env python +"""Logging configuration for DosVault application.""" + +from __future__ import annotations + +import logging +import logging.handlers +import json +from pathlib import Path +from datetime import datetime, timezone +from typing import Dict, List, Optional, Any + +try: + from .config import Config +except ImportError: + from config import Config + + +class JSONFormatter(logging.Formatter): + """Custom JSON formatter for structured logging.""" + + def format(self, record: logging.LogRecord) -> str: + log_entry = { + 'timestamp': datetime.fromtimestamp(record.created).isoformat(), + 'level': record.levelname, + 'module': record.name, + 'message': record.getMessage(), + 'filename': record.filename, + 'line_number': record.lineno, + } + + if record.exc_info: + log_entry['traceback'] = self.formatException(record.exc_info) + + return json.dumps(log_entry) + + +class LogManager: + """Manages logging configuration and log file access.""" + + def __init__(self, config: Optional[Config] = None): + self.config = config or Config() + # Use the existing config directory structure + self.log_dir = self.config.path.parent / "logs" + self.log_dir.mkdir(exist_ok=True) + + self.log_file = self.log_dir / "application.log" + self.error_log_file = self.log_dir / "error.log" + + self._setup_logging() + + def _setup_logging(self): + """Configure logging handlers and formatters.""" + # Create root logger + root_logger = logging.getLogger() + root_logger.setLevel(logging.INFO) + + # Clear existing handlers + 
root_logger.handlers.clear() + + # Console handler with simple format + console_handler = logging.StreamHandler() + console_formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + console_handler.setFormatter(console_formatter) + console_handler.setLevel(logging.INFO) + root_logger.addHandler(console_handler) + + # File handler with JSON format + file_handler = logging.handlers.RotatingFileHandler( + self.log_file, + maxBytes=10*1024*1024, # 10MB + backupCount=5 + ) + file_handler.setFormatter(JSONFormatter()) + file_handler.setLevel(logging.DEBUG) + root_logger.addHandler(file_handler) + + # Error file handler + error_handler = logging.handlers.RotatingFileHandler( + self.error_log_file, + maxBytes=5*1024*1024, # 5MB + backupCount=3 + ) + error_formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s\n%(pathname)s:%(lineno)d\n' + ) + error_handler.setFormatter(error_formatter) + error_handler.setLevel(logging.ERROR) + root_logger.addHandler(error_handler) + + # Log startup + logging.info("DosVault logging system initialized") + + def get_recent_logs(self, limit: int = 1000, level_filter: Optional[str] = None, since: Optional[str] = None) -> List[Dict[str, Any]]: + """Get recent log entries from the log file.""" + logs = [] + + if not self.log_file.exists(): + return logs + + try: + # Parse the since timestamp if provided + since_datetime = None + if since: + try: + since_datetime = datetime.fromisoformat(since.replace('Z', '+00:00')) + except ValueError: + logging.warning(f"Invalid since timestamp format: {since}") + + with open(self.log_file, 'r', encoding='utf-8') as f: + lines = f.readlines() + # Get the last 'limit*2' lines to ensure we have enough after filtering + recent_lines = lines[-(limit*2):] if len(lines) > limit*2 else lines + + for line in recent_lines: + line = line.strip() + if not line: + continue + + try: + log_entry = json.loads(line) + + # Apply time filter if specified + if 
since_datetime: + try: + log_datetime = datetime.fromisoformat(log_entry['timestamp']) + + # Handle timezone-aware/naive comparison + if log_datetime.tzinfo is None and since_datetime.tzinfo is not None: + # Make log_datetime timezone-aware (assume UTC) + log_datetime = log_datetime.replace(tzinfo=timezone.utc) + elif log_datetime.tzinfo is not None and since_datetime.tzinfo is None: + # Make since_datetime timezone-aware (assume UTC) + since_datetime = since_datetime.replace(tzinfo=timezone.utc) + + if log_datetime <= since_datetime: + continue + except (ValueError, KeyError): + pass # Skip time filtering for invalid timestamps + + # Apply level filter if specified + if level_filter and log_entry.get('level') != level_filter: + continue + + logs.append(log_entry) + except json.JSONDecodeError: + # Handle non-JSON log lines + logs.append({ + 'timestamp': datetime.now().isoformat(), + 'level': 'INFO', + 'module': 'system', + 'message': line + }) + + # Sort by timestamp and limit results + logs.sort(key=lambda x: x.get('timestamp', '')) + logs = logs[-limit:] if len(logs) > limit else logs + + except Exception as e: + logging.error(f"Error reading log file: {e}") + + return logs + + def get_log_files(self) -> List[Dict[str, Any]]: + """Get information about available log files.""" + files = [] + + for log_file in self.log_dir.glob("*.log*"): + try: + stat = log_file.stat() + files.append({ + 'name': log_file.name, + 'path': str(log_file), + 'size': stat.st_size, + 'modified': datetime.fromtimestamp(stat.st_mtime).isoformat() + }) + except Exception as e: + logging.error(f"Error getting file info for {log_file}: {e}") + + return sorted(files, key=lambda x: x['modified'], reverse=True) + + def clear_old_logs(self, keep_days: int = 7) -> int: + """Clear log files older than specified days.""" + cleared_count = 0 + cutoff_time = datetime.now().timestamp() - (keep_days * 24 * 3600) + + for log_file in self.log_dir.glob("*.log.*"): # Rotated logs only + try: + if 
log_file.stat().st_mtime < cutoff_time: + log_file.unlink() + cleared_count += 1 + logging.info(f"Cleared old log file: {log_file.name}") + except Exception as e: + logging.error(f"Error clearing log file {log_file}: {e}") + + return cleared_count + + def get_log_file_content(self, file_type: str = "application") -> Optional[Path]: + """Get the path to a specific log file for download.""" + if file_type == "application": + return self.log_file if self.log_file.exists() else None + elif file_type == "error": + return self.error_log_file if self.error_log_file.exists() else None + else: + # Look for specific log file + log_file = self.log_dir / f"{file_type}.log" + return log_file if log_file.exists() else None + + +# Global log manager instance - initialized lazily +log_manager = None + +def get_log_manager() -> LogManager: + """Get or create the global log manager instance.""" + global log_manager + if log_manager is None: + log_manager = LogManager() + return log_manager \ No newline at end of file diff --git a/src/libs/objects.py b/src/libs/objects.py new file mode 100644 index 0000000..b6c176a --- /dev/null +++ b/src/libs/objects.py @@ -0,0 +1,29 @@ +from dataclasses import dataclass, field +from typing import List, Optional +from pathlib import Path + +@dataclass +class Metadata: + title: str = None + description: Optional[str] = None + year: Optional[int] = None + developer: Optional[str] = None + publisher: Optional[str] = None + genre: Optional[List[str]] = field(default_factory=list) + players: Optional[int] = None + cover_image: Optional[str] = None # Remote URL + screenshot: Optional[str] = None # Remote URL + cover_image_path: Optional[Path] = None # Local file path + screenshot_path: Optional[Path] = None # Local file path + tags: Optional[List[str]] = field(default_factory=list) + +@dataclass +class Game: + title: str + path: Path + metadata: Metadata|None = None + +@dataclass +class Roms: + list: List[Game] = field(default_factory=list) + diff --git 
a/src/migrate.py b/src/migrate.py new file mode 100755 index 0000000..c17a41d --- /dev/null +++ b/src/migrate.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python +""" +Database migration management script. +""" +import sys +import argparse +from pathlib import Path +from alembic.config import Config +from alembic import command +from alembic.script import ScriptDirectory +from alembic.runtime.environment import EnvironmentContext +from sqlalchemy import create_engine, inspect + +# Add current directory to path for imports +sys.path.insert(0, str(Path(__file__).parent)) + +from libs.config import Config as AppConfig +from libs.database import Base + +def get_alembic_config(): + """Get Alembic configuration object.""" + alembic_cfg = Config(str(Path(__file__).parent.parent / "alembic.ini")) + app_config = AppConfig() + alembic_cfg.set_main_option("sqlalchemy.url", f"sqlite:///{app_config.database_path}") + return alembic_cfg + +def init_database(): + """Initialize database tables without Alembic for first-time setup.""" + app_config = AppConfig() + engine = create_engine(f"sqlite:///{app_config.database_path}") + Base.metadata.create_all(engine) + print(f"Database initialized at {app_config.database_path}") + +def create_migration(message: str): + """Create a new migration file.""" + alembic_cfg = get_alembic_config() + command.revision(alembic_cfg, message=message, autogenerate=True) + print(f"Created migration: {message}") + +def upgrade_database(revision: str = "head"): + """Upgrade database to a specific revision.""" + alembic_cfg = get_alembic_config() + command.upgrade(alembic_cfg, revision) + print(f"Database upgraded to {revision}") + +def downgrade_database(revision: str): + """Downgrade database to a specific revision.""" + alembic_cfg = get_alembic_config() + command.downgrade(alembic_cfg, revision) + print(f"Database downgraded to {revision}") + +def show_history(): + """Show migration history.""" + alembic_cfg = get_alembic_config() + 
command.history(alembic_cfg) + +def show_current(): + """Show current database revision.""" + alembic_cfg = get_alembic_config() + command.current(alembic_cfg) + +def stamp_database(revision: str = "head"): + """Mark the database as being at a specific revision without running migrations.""" + alembic_cfg = get_alembic_config() + command.stamp(alembic_cfg, revision) + print(f"Database stamped at {revision}") + +def check_database_exists(): + """Check if database and migration table exist.""" + app_config = AppConfig() + db_path = Path(app_config.database_path) + + if not db_path.exists(): + print("Database does not exist.") + return False + + # Check if alembic_version table exists + engine = create_engine(f"sqlite:///{app_config.database_path}") + inspector = inspect(engine) + tables = inspector.get_table_names() + + if "alembic_version" not in tables: + print("Database exists but is not under Alembic control.") + return False + + print("Database exists and is under Alembic control.") + return True + +def main(): + parser = argparse.ArgumentParser(description="Database migration management") + subparsers = parser.add_subparsers(dest='command', help='Available commands') + + # Init command + subparsers.add_parser('init', help='Initialize database (for first-time setup)') + + # Stamp command + stamp_parser = subparsers.add_parser('stamp', help='Mark database as being at a specific revision') + stamp_parser.add_argument('revision', nargs='?', default='head', help='Revision to stamp (default: head)') + + # Create migration command + create_parser = subparsers.add_parser('create', help='Create a new migration') + create_parser.add_argument('message', help='Migration message') + + # Upgrade command + upgrade_parser = subparsers.add_parser('upgrade', help='Upgrade database') + upgrade_parser.add_argument('revision', nargs='?', default='head', help='Target revision (default: head)') + + # Downgrade command + downgrade_parser = subparsers.add_parser('downgrade', 
help='Downgrade database') + downgrade_parser.add_argument('revision', help='Target revision') + + # History command + subparsers.add_parser('history', help='Show migration history') + + # Current command + subparsers.add_parser('current', help='Show current database revision') + + # Check command + subparsers.add_parser('check', help='Check database status') + + args = parser.parse_args() + + if not args.command: + parser.print_help() + return + + try: + if args.command == 'init': + init_database() + elif args.command == 'stamp': + stamp_database(args.revision) + elif args.command == 'create': + create_migration(args.message) + elif args.command == 'upgrade': + upgrade_database(args.revision) + elif args.command == 'downgrade': + downgrade_database(args.revision) + elif args.command == 'history': + show_history() + elif args.command == 'current': + show_current() + elif args.command == 'check': + check_database_exists() + except Exception as e: + print(f"Error: {e}") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/refresh_covers.py b/src/refresh_covers.py new file mode 100755 index 0000000..df5b6bb --- /dev/null +++ b/src/refresh_covers.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python +""" +Script to refresh cover image metadata for games with old numeric image IDs. +This will re-query IGDB for fresh image data and download the images locally. 
+""" +from __future__ import annotations + +import asyncio +import aiohttp +from pathlib import Path +from typing import List, Optional +from sqlalchemy import create_engine, select, func +from sqlalchemy.orm import Session, selectinload + +try: + from libs.config import Config + from libs.database import Game_table, Metadata_table + from libs.functions import download_image, get_image_filename, clean_title + from libs.apis import Credentials, IGDB +except ImportError: + import sys + sys.path.append(str(Path(__file__).parent)) + from libs.config import Config + from libs.database import Game_table, Metadata_table + from libs.functions import download_image, get_image_filename, clean_title + from libs.apis import Credentials, IGDB + +class CoverRefreshManager: + def __init__(self): + self.config = Config() + self.engine = create_engine(f"sqlite+pysqlite:///{self.config.database_path}", future=True) + + # Initialize IGDB API + token = Credentials(self.config).authenticate() + self.igdb = IGDB(token) + + self.refreshed_count = 0 + self.download_success_count = 0 + self.failed_refreshes: List[str] = [] + + def get_games_with_old_image_ids(self, limit: Optional[int] = None) -> List[Game_table]: + """Get games that have old numeric image IDs.""" + with Session(self.engine) as session: + stmt = ( + select(Game_table) + .join(Metadata_table) + .options(selectinload(Game_table.metadata_obj)) + .where( + # Has old numeric image IDs (not alphanumeric) + Metadata_table.cover_image.op('REGEXP')('^[0-9]+$') + ) + .order_by(Game_table.title) + ) + + if limit: + stmt = stmt.limit(limit) + + return session.scalars(stmt).all() + + async def refresh_game_metadata(self, game: Game_table, session: aiohttp.ClientSession) -> dict: + """Refresh metadata for a single game and download images.""" + result = { + 'game_title': game.title, + 'api_success': False, + 'cover_updated': False, + 'cover_downloaded': False, + 'screenshot_updated': False, + 'screenshot_downloaded': False, + 'errors': 
[] + } + + try: + # Search for fresh game data + clean_title_text = clean_title(game.title) + igdb_response = self.igdb.search_game_by_title(clean_title_text) + + if not igdb_response or len(igdb_response) == 0: + result['errors'].append('No IGDB results found') + return result + + game_data = igdb_response[0] # Take the first result + result['api_success'] = True + + metadata = game.metadata_obj + if not metadata: + result['errors'].append('No metadata object found') + return result + + # Update cover image if found + cover_data = game_data.get('cover') + if cover_data and cover_data.get('image_id'): + new_cover_id = cover_data['image_id'] + new_cover_url = IGDB.build_cover_url(new_cover_id, 'cover_big') + + # Update database with new image ID/URL + metadata.cover_image = new_cover_id # Store the new ID + result['cover_updated'] = True + + # Download the image + cover_filename = get_image_filename(new_cover_url, game.title, 'cover') + cover_path = self.config.images_path / cover_filename + + if await download_image(new_cover_url, cover_path, session): + metadata.cover_image_path = cover_path + result['cover_downloaded'] = True + self.download_success_count += 1 + else: + result['errors'].append(f'Failed to download cover: {new_cover_url}') + + # Update screenshot if found + artworks = game_data.get('artworks', []) + if artworks and len(artworks) > 0 and artworks[0].get('image_id'): + new_screenshot_id = artworks[0]['image_id'] + new_screenshot_url = IGDB.build_cover_url(new_screenshot_id, 'screenshot_med') + + # Update database with new image ID/URL + metadata.screenshot = new_screenshot_id # Store the new ID + result['screenshot_updated'] = True + + # Download the image + screenshot_filename = get_image_filename(new_screenshot_url, game.title, 'screenshot') + screenshot_path = self.config.images_path / screenshot_filename + + if await download_image(new_screenshot_url, screenshot_path, session): + metadata.screenshot_path = screenshot_path + 
result['screenshot_downloaded'] = True + self.download_success_count += 1 + else: + result['errors'].append(f'Failed to download screenshot: {new_screenshot_url}') + + if result['cover_updated'] or result['screenshot_updated']: + self.refreshed_count += 1 + + except Exception as e: + result['errors'].append(f'API error: {str(e)}') + + return result + + async def process_batch(self, games: List[Game_table], batch_size: int = 20): + """Process games in batches with rate limiting.""" + semaphore = asyncio.Semaphore(2) # Lower concurrency for API calls + + async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session: + async def refresh_with_semaphore(game): + async with semaphore: + await asyncio.sleep(0.5) # Rate limiting - be respectful to IGDB API + return await self.refresh_game_metadata(game, session) + + # Process in smaller batches to avoid overwhelming the API + for i in range(0, len(games), batch_size): + batch = games[i:i + batch_size] + print(f"\nProcessing batch {i//batch_size + 1} ({len(batch)} games)...") + + # Process this batch + tasks = [refresh_with_semaphore(game) for game in batch] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Update database with results - need to reattach objects to new session + with Session(self.engine) as db_session: + for game, result in zip(batch, results): + if isinstance(result, Exception): + print(f" āœ— {game.title}: Exception - {str(result)}") + self.failed_refreshes.append(f"{game.title}: {str(result)}") + continue + + # Reattach the game object to this session + db_session.merge(game) + + # Update the game's metadata directly + if result.get('cover_path'): + game.metadata_obj.cover_image_path = result['cover_path'] + if result.get('screenshot_path'): + game.metadata_obj.screenshot_path = result['screenshot_path'] + + # Show progress + status = [] + if result['api_success']: + if result['cover_updated']: + status.append('cover updated' + (' + downloaded' if 
result['cover_downloaded'] else '')) + if result['screenshot_updated']: + status.append('screenshot updated' + (' + downloaded' if result['screenshot_downloaded'] else '')) + + if result['errors']: + error_summary = '; '.join(result['errors'][:2]) # Show first 2 errors + if len(result['errors']) > 2: + error_summary += f' (+{len(result["errors"])-2} more)' + status.append(f"errors: {error_summary}") + self.failed_refreshes.append(f"{game.title}: {error_summary}") + + print(f" {game.title}: {', '.join(status) if status else 'no changes'}") + + # Commit batch + db_session.commit() + print(f" Batch committed to database") + + async def run(self, limit: Optional[int] = None, dry_run: bool = False): + """Run the cover refresh process.""" + print("šŸ”„ ROM Cover Refresh Tool") + print("=" * 50) + + # Get games with old image IDs + with Session(self.engine) as session: + # Use raw SQL for SQLite REGEXP (since SQLite's REGEXP isn't standard) + stmt = ( + select(Game_table) + .join(Metadata_table) + .options(selectinload(Game_table.metadata_obj)) + .where( + # Check if cover_image is purely numeric (old format) + Metadata_table.cover_image.isnot(None) & + ~Metadata_table.cover_image.op('GLOB')('*[a-zA-Z]*') # No letters + ) + .order_by(Game_table.title) + ) + + if limit: + stmt = stmt.limit(limit) + + games = session.scalars(stmt).all() + print(f"Found {len(games)} games with old numeric image IDs") + + if not games: + print("āœ… No games need cover refresh!") + return + + if dry_run: + print("\nšŸ” DRY RUN - showing first 10 games that would be processed:") + for i, game in enumerate(games[:10]): + metadata = game.metadata_obj + print(f" {i+1}. 
{game.title}") + print(f" Current cover ID: {metadata.cover_image}") + if metadata.screenshot: + print(f" Current screenshot ID: {metadata.screenshot}") + return + + # Show warning about API usage + print(f"\nāš ļø This will make {len(games)} IGDB API calls.") + print(" Be mindful of rate limits and API quotas.") + + proceed = input(f"\nRefresh metadata for {len(games)} games? [y/N]: ").strip().lower() + if proceed != 'y': + print("Cancelled.") + return + + # Process the games + await self.process_batch(games) + + # Show final results + print(f"\nāœ… Refresh Complete!") + print(f" Games with updated metadata: {self.refreshed_count}") + print(f" Images successfully downloaded: {self.download_success_count}") + print(f" Failed refreshes: {len(self.failed_refreshes)}") + + if self.failed_refreshes: + print(f"\nFailed Refreshes (first 10):") + for failure in self.failed_refreshes[:10]: + print(f" - {failure}") + if len(self.failed_refreshes) > 10: + print(f" ... and {len(self.failed_refreshes) - 10} more") + +async def main(): + import argparse + + parser = argparse.ArgumentParser(description="Refresh cover metadata for games with old image IDs") + parser.add_argument('--limit', type=int, help='Limit number of games to process') + parser.add_argument('--dry-run', action='store_true', help='Show what would be done without processing') + + args = parser.parse_args() + + manager = CoverRefreshManager() + await manager.run(limit=args.limit, dry_run=args.dry_run) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/src/webapp.py b/src/webapp.py new file mode 100755 index 0000000..f56f1f7 --- /dev/null +++ b/src/webapp.py @@ -0,0 +1,1071 @@ +#!/usr/bin/env python +from __future__ import annotations + +from typing import Optional, Annotated +from datetime import timedelta, datetime, timezone +import re +import asyncio +import subprocess +from pathlib import Path + +from fastapi import FastAPI, Depends, HTTPException, status, Request, 
Form, Query, BackgroundTasks +from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials +from fastapi.responses import HTMLResponse, FileResponse, RedirectResponse, JSONResponse +from fastapi.templating import Jinja2Templates +from fastapi.staticfiles import StaticFiles +from fastapi.middleware.cors import CORSMiddleware +from sqlalchemy import create_engine, select, func +from sqlalchemy.orm import Session, sessionmaker + +try: + # Try relative imports first (when run as module) + from .libs.config import Config + from .libs.database import Base, Game_table, Metadata_table, User_table, UserRole, user_favorites, Tags_table, Genre_table + from .libs.auth import AuthManager, ACCESS_TOKEN_EXPIRE_MINUTES + from .libs.logging import get_log_manager +except ImportError: + # Fall back to absolute imports (when run directly) + from libs.config import Config + from libs.database import Base, Game_table, Metadata_table, User_table, UserRole, user_favorites, Tags_table, Genre_table + from libs.auth import AuthManager, ACCESS_TOKEN_EXPIRE_MINUTES + from libs.logging import get_log_manager + +config = Config() +engine = create_engine(f"sqlite+pysqlite:///{config.database_path}", echo=False) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + +# Initialize logging system +import logging +get_log_manager() +logging.info("DosVault web application starting up") + +app = FastAPI(title="DOS Frontend", description="ROM Management System") + +# Mount static files for images +app.mount("/images", StaticFiles(directory=str(config.images_path)), name="images") + +# Add CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], # Configure this properly for production + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +security = HTTPBearer(auto_error=False) + +# Database tables are now managed by migrations +# Base.metadata.create_all(bind=engine) + +templates = Jinja2Templates(directory="templates") 
templates.env.globals['max'] = max
templates.env.globals['min'] = min

# Removed proxy image URL function


def _query_int(request: Request, name: str, default: int) -> int:
    """Parse an integer query parameter, falling back to *default* on bad input.

    Prevents a 500 response when a client sends e.g. ``?page=abc``.
    """
    try:
        return int(request.query_params.get(name, default))
    except (TypeError, ValueError):
        return default


def get_db():
    """FastAPI dependency: yield a database session, always closing it afterwards."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


def get_current_user(
    request: Request,
    credentials: Annotated[Optional[HTTPAuthorizationCredentials], Depends(security)],
    db: Session = Depends(get_db)
) -> Optional[User_table]:
    """Resolve the authenticated user from a Bearer header or the auth cookie.

    Returns None (rather than raising) when unauthenticated so public pages
    can still render; use require_auth for protected endpoints.
    """
    token = None

    # Try to get token from Authorization header first (for API calls)
    if credentials:
        token = credentials.credentials

    # If no Authorization header, try to get token from cookie (for page requests)
    if not token and "auth_token" in request.cookies:
        token = request.cookies["auth_token"]

    if not token:
        logging.debug("No authentication token found in request")
        return None

    username = AuthManager.verify_token(token)
    if username is None:
        logging.debug("Token verification failed")
        return None

    user = AuthManager.get_user_by_username(db, username)
    if not user or not user.is_active:
        # Lazy %-formatting so the string is only built when DEBUG is enabled.
        logging.debug("User not found or inactive: %s", username)
        return None

    logging.debug("Authentication successful for user: %s", username)
    return user


def require_auth(current_user: Optional[User_table] = Depends(get_current_user)):
    """Dependency that rejects unauthenticated requests with 401."""
    if not current_user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return current_user


def require_super_user(current_user: User_table = Depends(require_auth)):
    """Dependency that additionally requires the SUPER role (403 otherwise)."""
    if current_user.role != UserRole.SUPER.value:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Insufficient permissions"
        )
    return current_user


@app.get("/", response_class=HTMLResponse)
async def index(request: Request, db: Session = Depends(get_db), current_user: Optional[User_table] = Depends(get_current_user)):
    """Render the paginated, searchable game index.

    Query params: page, per_page (clamped 10..100), search, view (grid|list).
    """
    # Guard against non-numeric values and page <= 0 (which would yield a
    # negative OFFSET in the SQL below).
    page = max(1, _query_int(request, "page", 1))
    per_page = _query_int(request, "per_page", 20)
    search = request.query_params.get("search", "").strip()
    view = request.query_params.get("view", "grid")  # grid or list

    # Limit per_page to reasonable values
    per_page = max(10, min(per_page, 100))
    offset = (page - 1) * per_page

    # Base query
    games_query = select(Game_table)
    count_query = select(func.count(Game_table.id))

    # Add search filtering
    if search:
        # Fuzzy-ish search - split search terms; each term may match any field,
        # but ALL terms must match (AND across terms) for better relevance.
        search_conditions = []
        for term in search.split():
            term_pattern = f"%{term}%"
            term_condition = (
                Game_table.title.ilike(term_pattern) |
                (Game_table.metadata_obj.has(Metadata_table.title.ilike(term_pattern))) |
                (Game_table.metadata_obj.has(Metadata_table.description.ilike(term_pattern))) |
                (Game_table.metadata_obj.has(Metadata_table.developer.ilike(term_pattern))) |
                (Game_table.metadata_obj.has(Metadata_table.publisher.ilike(term_pattern))) |
                (Game_table.metadata_obj.has(Metadata_table.genre.any(Genre_table.name.ilike(term_pattern)))) |
                (Game_table.metadata_obj.has(Metadata_table.tags.any(Tags_table.name.ilike(term_pattern))))
            )
            search_conditions.append(term_condition)

        if search_conditions:
            from sqlalchemy import and_
            combined_filter = and_(*search_conditions)
            games_query = games_query.where(combined_filter)
            count_query = count_query.where(combined_filter)

    total_games = db.scalar(count_query)
    # Add alphabetical sorting by default
    games = db.scalars(games_query.order_by(Game_table.title).offset(offset).limit(per_page)).all()

    # Renamed from `user_favorites` to avoid shadowing the imported
    # user_favorites association table (the template key is unchanged).
    favorite_ids = set()
    if current_user and current_user.role != UserRole.DEMO.value:
        favorite_ids = {game.id for game in current_user.favorites}

    total_pages = (total_games + per_page - 1) // per_page

    return templates.TemplateResponse("index.html", {
        "request": request,
        "games": games,
        "current_page": page,
        "total_pages": total_pages,
        "per_page": per_page,
        "search": search,
        "view": view,
        "total_games": total_games,
        "current_user": current_user,
        "is_demo": current_user is None or current_user.role == UserRole.DEMO.value,
        "user_favorites": favorite_ids
    })


@app.post("/login")
async def login(
    username: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db)
):
    """Authenticate a user, issuing a JWT both as JSON and as a cookie."""
    user = AuthManager.authenticate_user(db, username, password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password"
        )

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = AuthManager.create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )

    user.last_login = datetime.now(timezone.utc)
    db.commit()

    # Create response with both JSON data and cookie
    response = JSONResponse(content={"access_token": access_token, "token_type": "bearer"})
    response.set_cookie(
        key="auth_token",
        value=access_token,
        max_age=ACCESS_TOKEN_EXPIRE_MINUTES * 60,  # Convert minutes to seconds
        httponly=False,  # Allow JavaScript access for API calls
        secure=False,    # Set to True in production with HTTPS
        samesite="lax"   # CSRF protection
    )

    return response


@app.post("/logout")
async def logout():
    """Clear the auth cookie; the bearer token simply expires client-side."""
    response = JSONResponse(content={"message": "Logged out successfully"})
    response.delete_cookie(key="auth_token")
    return response


@app.get("/games/{game_id}")
async def get_game(
    game_id: int,
    request: Request,
    db: Session = Depends(get_db),
    current_user: Optional[User_table] = Depends(get_current_user)
):
    """Render the detail page for one game (public; demo users cannot download)."""
    game = db.get(Game_table, game_id)
    if not game:
        raise HTTPException(status_code=404, detail="Game not found")

    is_favorite = False
    if current_user and current_user.role != UserRole.DEMO.value:
        is_favorite = game in current_user.favorites

    return templates.TemplateResponse("game_detail.html", {
        "request": request,
        "game": game,
        "current_user": current_user,
        "is_favorite": is_favorite,
        "can_download": current_user and current_user.role != UserRole.DEMO.value,
        "is_demo": current_user is None or current_user.role == UserRole.DEMO.value
    })


@app.post("/games/{game_id}/favorite")
async def toggle_favorite(
    game_id: int,
    db: Session = Depends(get_db),
    current_user: User_table = Depends(require_auth)
):
    """Toggle the current user's favorite flag on a game."""
    if current_user.role == UserRole.DEMO.value:
        raise HTTPException(status_code=403, detail="Demo users cannot favorite games")

    game = db.get(Game_table, game_id)
    if not game:
        raise HTTPException(status_code=404, detail="Game not found")

    # Fixed message grammar: previously both branches said "... from favorites".
    if game in current_user.favorites:
        current_user.favorites.remove(game)
        action = "removed from"
    else:
        current_user.favorites.append(game)
        action = "added to"

    db.commit()
    return {"message": f"Game {action} favorites"}


@app.get("/download/{game_id}")
async def download_game(
    game_id: int,
    db: Session = Depends(get_db),
    current_user: User_table = Depends(require_auth)
):
    """Stream a ROM file to the client under a cleaned-up filename."""
    if current_user.role == UserRole.DEMO.value:
        raise HTTPException(status_code=403, detail="Demo users cannot download games")

    game = db.get(Game_table, game_id)
    if not game:
        raise HTTPException(status_code=404, detail="Game not found")

    if not game.path.exists():
        raise HTTPException(status_code=404, detail="Game file not found")

    # Create a clean filename using the game title (prefer scraped metadata).
    game_title = game.metadata_obj.title if game.metadata_obj and game.metadata_obj.title else game.title
    # Clean the title for use as filename
    clean_title = re.sub(r'[^\w\s-]', '', game_title).strip()
    clean_title = re.sub(r'[-\s]+', '-', clean_title)

    # Get the original file extension
    original_extension = game.path.suffix
    download_filename = f"{clean_title}{original_extension}"

    return FileResponse(
        path=str(game.path),
        filename=download_filename,
        media_type='application/octet-stream'
    )
@app.get("/admin/games/{game_id}/edit")
async def edit_game_form(
    game_id: int,
    request: Request,
    db: Session = Depends(get_db),
    current_user: User_table = Depends(require_super_user)
):
    """Render the metadata edit form for a game (super users only)."""
    game = db.get(Game_table, game_id)
    if not game:
        raise HTTPException(status_code=404, detail="Game not found")

    return templates.TemplateResponse("edit_game.html", {
        "request": request,
        "game": game,
        "current_user": current_user
    })


@app.post("/admin/games/{game_id}/edit")
async def update_game(
    game_id: int,
    title: str = Form(...),
    description: Optional[str] = Form(None),
    year: Optional[int] = Form(None),
    developer: Optional[str] = Form(None),
    publisher: Optional[str] = Form(None),
    db: Session = Depends(get_db),
    current_user: User_table = Depends(require_super_user)
):
    """Persist edited metadata, creating a Metadata row if the game has none."""
    game = db.get(Game_table, game_id)
    if not game:
        raise HTTPException(status_code=404, detail="Game not found")

    if game.metadata_obj:
        metadata = game.metadata_obj
    else:
        metadata = Metadata_table(game=game)
        db.add(metadata)

    metadata.title = title
    metadata.description = description
    metadata.year = year
    metadata.developer = developer
    metadata.publisher = publisher

    db.commit()

    # 303 so the browser re-GETs the detail page after the POST.
    return RedirectResponse(url=f"/games/{game_id}", status_code=303)


@app.get("/favorites")
async def favorites(
    request: Request,
    db: Session = Depends(get_db),
    current_user: User_table = Depends(require_auth)
):
    """Render the current user's paginated favorites list."""
    if current_user.role == UserRole.DEMO.value:
        raise HTTPException(status_code=403, detail="Demo users cannot have favorites")

    # Guard against non-numeric / non-positive page values (negative OFFSET).
    try:
        page = max(1, int(request.query_params.get("page", 1)))
    except (TypeError, ValueError):
        page = 1
    per_page = 20
    offset = (page - 1) * per_page

    favorites_query = select(Game_table).join(user_favorites).where(
        user_favorites.c.user_id == current_user.id
    ).offset(offset).limit(per_page)

    # Renamed local (was `favorites`) to stop shadowing this endpoint function.
    favorite_games = db.scalars(favorites_query).all()
    total_favorites = len(current_user.favorites)
    total_pages = (total_favorites + per_page - 1) // per_page

    return templates.TemplateResponse("favorites.html", {
        "request": request,
        "games": favorite_games,
        "current_page": page,
        "total_pages": total_pages,
        "current_user": current_user
    })


@app.get("/admin")
async def admin_dashboard(
    request: Request,
    db: Session = Depends(get_db),
    current_user: User_table = Depends(require_super_user)
):
    """Render the admin dashboard with totals and recent rows."""
    total_games = db.scalar(select(func.count(Game_table.id)))
    total_users = db.scalar(select(func.count(User_table.id)))

    # NOTE(review): "recent_games" has no ORDER BY, so the 10 rows returned are
    # in arbitrary database order — confirm whether ordering by id/created_at
    # was intended before changing it.
    recent_games = db.scalars(select(Game_table).limit(10)).all()
    recent_users = db.scalars(select(User_table).order_by(User_table.created_at.desc()).limit(10)).all()

    return templates.TemplateResponse("admin.html", {
        "request": request,
        "current_user": current_user,
        "total_games": total_games,
        "total_users": total_users,
        "recent_games": recent_games,
        "recent_users": recent_users
    })


@app.get("/admin/users")
async def manage_users(
    request: Request,
    db: Session = Depends(get_db),
    current_user: User_table = Depends(require_super_user)
):
    """Render the paginated user management page (super users only)."""
    # Guard against non-numeric / non-positive page values (negative OFFSET).
    try:
        page = max(1, int(request.query_params.get("page", 1)))
    except (TypeError, ValueError):
        page = 1
    per_page = 20
    offset = (page - 1) * per_page

    total_users = db.scalar(select(func.count(User_table.id)))
    users = db.scalars(select(User_table).offset(offset).limit(per_page)).all()
    total_pages = (total_users + per_page - 1) // per_page

    return templates.TemplateResponse("admin_users.html", {
        "request": request,
        "current_user": current_user,
        "users": users,
        "current_page": page,
        "total_pages": total_pages
    })


@app.post("/admin/users/{user_id}/toggle-active")
async def toggle_user_active(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User_table = Depends(require_super_user)
):
    """Flip a user's is_active flag; admins cannot deactivate themselves."""
    user = db.get(User_table, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    if user.id == current_user.id:
        raise HTTPException(status_code=400, detail="Cannot deactivate yourself")

    user.is_active = not user.is_active
    db.commit()

    return {"message": f"User {'activated' if user.is_active else 'deactivated'}"}


@app.post("/admin/users")
async def create_user(
    username: str = Form(...),
    email: str = Form(...),
    password: str = Form(...),
    role: str = Form(...),
    db: Session = Depends(get_db),
    current_user: User_table = Depends(require_super_user)
):
    """Create a new user account from the admin form."""
    if role not in [UserRole.DEMO.value, UserRole.NORMAL.value, UserRole.SUPER.value]:
        raise HTTPException(status_code=400, detail="Invalid role")

    # Check if username or email exists
    existing_user = db.scalar(select(User_table).where(
        (User_table.username == username) | (User_table.email == email)
    ))
    if existing_user:
        raise HTTPException(status_code=400, detail="Username or email already exists")

    # Return value intentionally discarded (was bound to an unused `new_user`).
    AuthManager.create_user(db, username, email, password, role)
    return RedirectResponse(url="/admin/users", status_code=303)


@app.get("/api/tags")
async def get_tags(db: Session = Depends(get_db)):
    """Get all tags with game counts"""
    tags = db.scalars(select(Tags_table)).all()
    return [{"name": tag.name, "count": len(tag.games)} for tag in tags]


@app.get("/api/genres")
async def get_genres(db: Session = Depends(get_db)):
    """Get all genres with game counts"""
    genres = db.scalars(select(Genre_table)).all()
    return [{"name": genre.name, "count": len(genre.games)} for genre in genres]


@app.get("/api/cover/{game_id}")
async def get_cover_url(game_id: int, db: Session = Depends(get_db)):
    """Get cover URL for a specific game"""
    try:
        # Try relative imports first (when run as module)
        from .libs.apis import Credentials, IGDB
        from .libs.config import Config
    except ImportError:
        # Fall back to absolute imports (when run directly)
        from libs.apis import Credentials, IGDB
        from libs.config import Config

    game = db.get(Game_table, game_id)
    if not game or not game.metadata_obj:
        raise HTTPException(status_code=404, detail="Game or metadata not found")

    # If we
already have a proper cover image URL, return it + if game.metadata_obj.cover_image and game.metadata_obj.cover_image.startswith('http'): + return {"cover_url": game.metadata_obj.cover_image} + + # If we have an image ID, convert it to a full URL + if game.metadata_obj.cover_image and not game.metadata_obj.cover_image.startswith('http'): + try: + # Try relative imports first (when run as module) + from .libs.apis import IGDB + except ImportError: + # Fall back to absolute imports (when run directly) + from libs.apis import IGDB + + try: + cover_url = IGDB.build_cover_url(game.metadata_obj.cover_image, 'cover_big') + game.metadata_obj.cover_image = cover_url + db.commit() + return {"cover_url": cover_url} + except Exception as e: + print(f"Error converting image ID to URL: {e}") + + # Try to fetch cover from IGDB + try: + config = Config() + token = Credentials(config).authenticate() + igdb_client = IGDB(token) + + # Search for the game to get IGDB data + search_results = igdb_client.search_game_by_title(game.metadata_obj.title or game.title) + + if search_results and len(search_results) > 0: + game_data = search_results[0] + if 'cover' in game_data and game_data['cover'].get('image_id'): + cover_url = IGDB.build_cover_url(game_data['cover']['image_id'], 'cover_big') + + # Update the database with the cover URL + game.metadata_obj.cover_image = cover_url + db.commit() + + return {"cover_url": cover_url} + + except Exception as e: + print(f"Error fetching cover for game {game_id}: {e}") + + return {"cover_url": None} + + +@app.get("/browse/tags/{tag_name}") +async def browse_by_tag( + tag_name: str, + request: Request, + db: Session = Depends(get_db), + current_user: Optional[User_table] = Depends(get_current_user) +): + """Browse games by tag""" + page = int(request.query_params.get("page", 1)) + per_page = int(request.query_params.get("per_page", 20)) + view = request.query_params.get("view", "grid") + + per_page = max(10, min(per_page, 100)) + offset = (page - 1) 
* per_page + + # Find tag + tag = db.scalar(select(Tags_table).where(Tags_table.name == tag_name)) + if not tag: + raise HTTPException(status_code=404, detail="Tag not found") + + # Get games with this tag + games_query = select(Game_table).join( + Game_table.metadata_obj + ).join( + Metadata_table.tags + ).where(Tags_table.id == tag.id) + + total_games = db.scalar(select(func.count(Game_table.id)).join( + Game_table.metadata_obj + ).join( + Metadata_table.tags + ).where(Tags_table.id == tag.id)) + + games = db.scalars(games_query.offset(offset).limit(per_page)).all() + + # Get user's favorite game IDs if logged in + user_favorites = set() + if current_user and current_user.role != UserRole.DEMO.value: + user_favorites = {game.id for game in current_user.favorites} + + total_pages = (total_games + per_page - 1) // per_page + + return templates.TemplateResponse("index.html", { + "request": request, + "games": games, + "current_page": page, + "total_pages": total_pages, + "per_page": per_page, + "search": f"tag:{tag_name}", + "view": view, + "total_games": total_games, + "current_user": current_user, + "is_demo": current_user is None or current_user.role == UserRole.DEMO.value, + "browse_type": "tag", + "browse_value": tag_name, + "user_favorites": user_favorites + }) + +@app.get("/browse/genres/{genre_name}") +async def browse_by_genre( + genre_name: str, + request: Request, + page: int = Query(1, ge=1), + per_page: int = Query(20, ge=1, le=100), + db: Session = Depends(get_db), + current_user: Optional[User_table] = Depends(get_current_user) +): + """Browse games by genre""" + + # Find genre + genre = db.scalar(select(Genre_table).where(Genre_table.name == genre_name)) + if not genre: + raise HTTPException(status_code=404, detail="Genre not found") + + # Get games with this genre - add alphabetical sorting + games_query = select(Game_table).join(Game_table.metadata_obj).join( + Metadata_table.genre + ).where(Genre_table.id == genre.id).order_by(Game_table.title) + 
+ total = db.scalar(select(func.count()).select_from( + select(Game_table).join(Game_table.metadata_obj).join( + Metadata_table.genre + ).where(Genre_table.id == genre.id))) + + games = db.scalars(games_query.offset((page - 1) * per_page).limit(per_page)).all() + + # Get user's favorite game IDs if logged in + user_favorites = set() + if current_user and current_user.role != UserRole.DEMO.value: + user_favorites = {game.id for game in current_user.favorites} + + total_pages = (total + per_page - 1) // per_page + + return templates.TemplateResponse("index.html", { + "request": request, + "current_user": current_user, + "games": games, + "page": page, + "per_page": per_page, + "total": total, + "total_pages": total_pages, + "search": f"genre:{genre_name}", + "show_pagination": True, + "current_url": f"/browse/genres/{genre_name}", + "browse_type": "genre", + "browse_value": genre_name, + "is_demo": current_user is None or current_user.role == UserRole.DEMO.value, + "view": "grid", # Default to grid view for genre browsing + "current_page": page, + "user_favorites": user_favorites + }) + + +# Global variable to track running admin tasks +running_tasks = {} + +@app.post("/api/admin/rom-scan") +async def admin_rom_scan( + background_tasks: BackgroundTasks, + db: Session = Depends(get_db), + current_user: User_table = Depends(require_super_user) +): + """Trigger ROM scan in the background""" + if "rom_scan" in running_tasks and not running_tasks["rom_scan"].done(): + return {"status": "already_running", "message": "ROM scan is already in progress"} + + task = asyncio.create_task(run_rom_scan()) + running_tasks["rom_scan"] = task + + return {"status": "started", "message": "ROM scan started"} + +@app.post("/api/admin/metadata-refresh") +async def admin_metadata_refresh( + background_tasks: BackgroundTasks, + db: Session = Depends(get_db), + current_user: User_table = Depends(require_super_user) +): + """Refresh metadata for all games""" + if "metadata_refresh" in 
running_tasks and not running_tasks["metadata_refresh"].done(): + return {"status": "already_running", "message": "Metadata refresh is already in progress"} + + task = asyncio.create_task(run_metadata_refresh()) + running_tasks["metadata_refresh"] = task + + return {"status": "started", "message": "Metadata refresh started"} + +@app.post("/api/admin/image-sync") +async def admin_image_sync( + background_tasks: BackgroundTasks, + db: Session = Depends(get_db), + current_user: User_table = Depends(require_super_user) +): + """Download missing images""" + if "image_sync" in running_tasks and not running_tasks["image_sync"].done(): + return {"status": "already_running", "message": "Image sync is already in progress"} + + task = asyncio.create_task(run_image_sync()) + running_tasks["image_sync"] = task + + return {"status": "started", "message": "Image sync started"} + +@app.post("/api/admin/database-cleanup") +async def admin_database_cleanup( + db: Session = Depends(get_db), + current_user: User_table = Depends(require_super_user) +): + """Clean up orphaned database records""" + try: + # Find games with missing files + games = db.scalars(select(Game_table)).all() + removed_count = 0 + + for game in games: + if not game.path.exists(): + db.delete(game) + removed_count += 1 + + db.commit() + return {"status": "completed", "message": f"Cleaned up {removed_count} orphaned records"} + except Exception as e: + return {"status": "error", "message": f"Database cleanup failed: {str(e)}"} + +@app.post("/api/admin/cache-clear") +async def admin_cache_clear( + db: Session = Depends(get_db), + current_user: User_table = Depends(require_super_user) +): + """Clear application caches""" + try: + # Clear temporary files and caches + cache_dirs = [ + config.images_path / "cache", + Path("/tmp/dosvault"), + ] + + cleared_files = 0 + for cache_dir in cache_dirs: + if cache_dir.exists(): + for file_path in cache_dir.rglob("*"): + if file_path.is_file(): + file_path.unlink() + 
cleared_files += 1 + + return {"status": "completed", "message": f"Cleared {cleared_files} cache files"} + except Exception as e: + return {"status": "error", "message": f"Cache clear failed: {str(e)}"} + +@app.get("/api/admin/system-stats") +async def admin_system_stats( + db: Session = Depends(get_db), + current_user: User_table = Depends(require_super_user) +): + """Get detailed system statistics""" + total_games = db.scalar(select(func.count(Game_table.id))) + total_users = db.scalar(select(func.count(User_table.id))) + total_metadata = db.scalar(select(func.count(Metadata_table.id))) + total_tags = db.scalar(select(func.count(Tags_table.id))) + total_genres = db.scalar(select(func.count(Genre_table.id))) + + # Get recent activity + recent_users = db.scalar(select(func.count(User_table.id)).where( + User_table.created_at >= datetime.utcnow() - timedelta(days=30) + )) or 0 + + # Check disk usage + disk_usage = {} + try: + import shutil + total, used, free = shutil.disk_usage(config.database_path.parent) + disk_usage = { + "total": total, + "used": used, + "free": free, + "percent_used": (used / total) * 100 + } + except Exception: + pass + + return { + "games": total_games, + "users": total_users, + "metadata": total_metadata, + "tags": total_tags, + "genres": total_genres, + "recent_users": recent_users, + "disk_usage": disk_usage, + "running_tasks": { + task_name: not task.done() if task_name in running_tasks else False + for task_name in ["rom_scan", "metadata_refresh", "image_sync"] + } + } + +async def run_rom_scan(): + """Run the ROM scanner subprocess""" + try: + process = await asyncio.create_subprocess_exec( + "python", "-m", "src", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + return {"success": process.returncode == 0, "output": stdout.decode(), "error": stderr.decode()} + except Exception as e: + return {"success": False, "error": str(e)} + +async def run_metadata_refresh(): + 
"""Refresh metadata for games without complete metadata""" + try: + # Run ROM scanner with metadata refresh flag + process = await asyncio.create_subprocess_exec( + "python", "-m", "src", "--refresh-metadata", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + return {"success": process.returncode == 0, "output": stdout.decode(), "error": stderr.decode()} + except Exception as e: + return {"success": False, "error": str(e)} + +async def run_image_sync(): + """Download missing cover images and screenshots""" + try: + # Run ROM scanner with image sync flag + process = await asyncio.create_subprocess_exec( + "python", "-m", "src", "--sync-images", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + return {"success": process.returncode == 0, "output": stdout.decode(), "error": stderr.decode()} + except Exception as e: + return {"success": False, "error": str(e)} + +@app.get("/api/admin/system-logs") +async def admin_system_logs( + request: Request, + limit: int = Query(1000, ge=1, le=10000), + level: Optional[str] = Query(None), + since: Optional[str] = Query(None, description="ISO timestamp to filter logs newer than this time"), + db: Session = Depends(get_db), + current_user: User_table = Depends(require_super_user) +): + """Get system logs with optional filtering""" + try: + logging.debug(f"System logs request from user: {current_user.username}, limit: {limit}, since: {since}") + logs = get_log_manager().get_recent_logs(limit=limit, level_filter=level, since=since) + return {"logs": logs, "total": len(logs)} + except Exception as e: + logging.error(f"Error in admin_system_logs: {str(e)}") + return {"error": str(e), "logs": []} + +@app.get("/api/admin/download-logs") +async def admin_download_logs( + request: Request, + log_type: str = Query("application"), + token: Optional[str] = Query(None), + db: Session = Depends(get_db) +): + 
"""Download log files""" + try: + # Check authentication - either from header, cookie, or token param + current_user = None + auth_token = None + + # Try token from query parameter first (for download links) + if token: + auth_token = token + # Try cookie + elif "auth_token" in request.cookies: + auth_token = request.cookies["auth_token"] + # Try Authorization header + elif "authorization" in request.headers: + auth_header = request.headers["authorization"] + if auth_header.startswith("Bearer "): + auth_token = auth_header.split(" ", 1)[1] + + if not auth_token: + raise HTTPException(status_code=401, detail="Not authenticated") + + username = AuthManager.verify_token(auth_token) + if username is None: + raise HTTPException(status_code=401, detail="Invalid token") + + current_user = AuthManager.get_user_by_username(db, username) + if not current_user or not current_user.is_active or current_user.role != UserRole.SUPER.value: + raise HTTPException(status_code=403, detail="Insufficient permissions") + + log_file_path = get_log_manager().get_log_file_content(log_type) + if not log_file_path or not log_file_path.exists(): + raise HTTPException(status_code=404, detail="Log file not found") + + return FileResponse( + path=str(log_file_path), + filename=f"dosvault_{log_type}_logs_{datetime.now().strftime('%Y%m%d')}.log", + media_type='text/plain' + ) + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error downloading logs: {str(e)}") + +@app.post("/api/admin/clear-logs") +async def admin_clear_logs( + keep_days: int = Query(7, ge=1, le=365), + db: Session = Depends(get_db), + current_user: User_table = Depends(require_super_user) +): + """Clear old log files""" + try: + cleared_count = get_log_manager().clear_old_logs(keep_days=keep_days) + return { + "status": "completed", + "message": f"Cleared {cleared_count} old log files (older than {keep_days} days)" + } + except Exception as e: + return {"status": "error", 
"message": f"Failed to clear logs: {str(e)}"} + +@app.get("/api/admin/log-files") +async def admin_log_files( + db: Session = Depends(get_db), + current_user: User_table = Depends(require_super_user) +): + """Get information about available log files""" + try: + files = get_log_manager().get_log_files() + return {"files": files} + except Exception as e: + return {"error": str(e), "files": []} + +@app.get("/api/admin/auth-test") +async def admin_auth_test( + request: Request, + db: Session = Depends(get_db), + current_user: Optional[User_table] = Depends(get_current_user) +): + """Test authentication status for debugging""" + auth_header = request.headers.get("authorization", "") + cookie_token = request.cookies.get("auth_token", "") + + if current_user: + return { + "authenticated": True, + "username": current_user.username, + "role": current_user.role, + "is_super": current_user.role == UserRole.SUPER.value, + "auth_header_present": bool(auth_header), + "cookie_present": bool(cookie_token), + "token_valid": True, + "token": cookie_token if cookie_token else None # Include token for JS access + } + else: + return { + "authenticated": False, + "username": None, + "role": None, + "is_super": False, + "auth_header_present": bool(auth_header), + "cookie_present": bool(cookie_token), + "token_valid": False, + "token": None + } + +@app.get("/api/admin/config") +async def get_config( + db: Session = Depends(get_db), + current_user: User_table = Depends(require_super_user) +): + """Get current configuration for editing""" + try: + # Load the current config + current_config = Config() + + # Return sanitized config data + config_data = { + "host": current_config.host, + "port": current_config.port, + "websocket_port": current_config.websocket_port, + "rom_path": str(current_config.rom_path), + "metadata_path": str(current_config.metadata_path), + "database_path": str(current_config.database_path), + "images_path": str(current_config.images_path), + "igdb_client_id": 
current_config.igdb_client_id, + "igdb_api_key": "***" if current_config.igdb_api_key else "" # Hide sensitive data + } + + return {"success": True, "config": config_data} + except Exception as e: + logging.error(f"Error loading configuration: {e}") + return {"success": False, "error": str(e)} + +@app.post("/api/admin/config") +async def update_config( + config_data: dict, + db: Session = Depends(get_db), + current_user: User_table = Depends(require_super_user) +): + """Update system configuration""" + try: + # Load current config + current_config = Config() + + # Update only allowed fields + if "host" in config_data: + current_config.host = config_data["host"] + if "port" in config_data: + current_config.port = int(config_data["port"]) + if "websocket_port" in config_data: + current_config.websocket_port = int(config_data["websocket_port"]) + if "rom_path" in config_data: + current_config.rom_path = Path(config_data["rom_path"]) + if "metadata_path" in config_data: + current_config.metadata_path = Path(config_data["metadata_path"]) + if "database_path" in config_data: + current_config.database_path = Path(config_data["database_path"]) + if "images_path" in config_data: + current_config.images_path = Path(config_data["images_path"]) + if "igdb_client_id" in config_data: + current_config.igdb_client_id = config_data["igdb_client_id"] + if "igdb_api_key" in config_data and config_data["igdb_api_key"] != "***": + current_config.igdb_api_key = config_data["igdb_api_key"] + + # Save the updated configuration + current_config.save() + + logging.info(f"Configuration updated by user {current_user.username}") + + return {"success": True, "message": "Configuration updated successfully"} + except Exception as e: + logging.error(f"Error updating configuration: {e}") + return {"success": False, "error": str(e)} + +@app.get("/health") +async def health_check(): + """Health check endpoint for Docker/monitoring""" + return {"status": "healthy", "service": "DosVault"} + 
+@app.get("/api/auth/token") +async def get_auth_token( + request: Request, + db: Session = Depends(get_db), + current_user: User_table = Depends(require_auth) +): + """Get the current auth token for JavaScript use (requires existing authentication)""" + token = request.cookies.get("auth_token", "") + return {"token": token, "username": current_user.username} + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host=config.host, port=config.port) \ No newline at end of file diff --git a/templates/admin.html b/templates/admin.html new file mode 100644 index 0000000..7ee78b8 --- /dev/null +++ b/templates/admin.html @@ -0,0 +1,1390 @@ +{% extends "base.html" %} + +{% block title %}Admin Dashboard - DosVault{% endblock %} + +{% block content %} +
+

Admin Dashboard

+

System overview and management

+
+ +
+
+
+
šŸŽ®
+
+

{{ total_games }}

+

Total Games

+
+
+
+ +
+
+
šŸ‘„
+
+

{{ total_users }}

+

Total Users

+
+
+
+ + + + +
+ + +
+

System Management

+ +
+ +
+
+ + + +

ROM Scanner

+
+

Scan directories for new ROM files and add them to the database

+ +
+
+ + +
+
+ + + +

Metadata Refresh

+
+

Update game metadata and refresh image information from IGDB

+ +
+
+ + +
+
+ + + +

Image Sync

+
+

Download missing cover art and screenshots locally

+ +
+
+ + +
+
+ + + +

Database Cleanup

+
+

Remove orphaned records and cleanup missing files

+ +
+
+ + +
+
+ + + +

System Stats

+
+

View detailed statistics and system health

+ +
+ + +
+
+ + + +

Configuration

+
+

Edit system configuration settings

+ +
+ + +
+
+ + + +

Cache Control

+
+

Clear application caches and temporary files

+ +
+
+ + +
+
+ + + +

System Logs

+
+

View application logs and system events

+ +
+ + +
+
+ + + +

Log Management

+
+

Download and manage log files

+
+ + +
+
+ + +
+
+ + + +

Live Log Stream

+
+

Monitor real-time application logs

+
+ + +
+
+
+
+ + + + + + + + + + + + + +
+
+

Recent Games

+ {% if recent_games %} +
+ {% for game in recent_games %} +
+
+

{{ game.metadata_obj.title or game.title }}

+

{{ game.path.name }}

+
+
+ View + Edit +
+
+ {% endfor %} +
+ {% else %} +

No games found

+ {% endif %} +
+ +
+

Recent Users

+ {% if recent_users %} +
+ {% for user in recent_users %} +
+
+

{{ user.username }}

+

{{ user.email }}

+
+
+ + {{ user.role.upper() }} + + {% if not user.is_active %} + INACTIVE + {% endif %} +
+
+ {% endfor %} +
+ {% else %} +

No users found

+ {% endif %} +
+
+ + +{% endblock %} \ No newline at end of file diff --git a/templates/admin_users.html b/templates/admin_users.html new file mode 100644 index 0000000..473b6e1 --- /dev/null +++ b/templates/admin_users.html @@ -0,0 +1,180 @@ +{% extends "base.html" %} + +{% block title %}Manage Users - DosVault{% endblock %} + +{% block content %} +
+
+
+

Manage Users

+

Create and manage user accounts

+
+ +
+
+ +
+
+ + + + + + + + + + + + {% for user in users %} + + + + + + + + {% endfor %} + +
UserRoleStatusCreatedActions
+
+

{{ user.username }}

+

{{ user.email }}

+
+
+ + {{ user.role.upper() }} + + + + {% if user.is_active %}ACTIVE{% else %}INACTIVE{% endif %} + + + {{ user.created_at.strftime('%Y-%m-%d') if user.created_at else 'N/A' }} + +
+ {% if user.id != current_user.id %} + + {% endif %} +
+
+
+
+ + +{% if total_pages > 1 %} +
+ +
+{% endif %} + + + + + +{% endblock %} \ No newline at end of file diff --git a/templates/base.html b/templates/base.html new file mode 100644 index 0000000..0d2465e --- /dev/null +++ b/templates/base.html @@ -0,0 +1,545 @@ + + + + + + {% block title %}DosVault{% endblock %} + + + + + + + + +
+
+
+

Browse by Genre

+ +
+
+

Loading genres...

+
+
+
+ + + + +
+ {% block content %}{% endblock %} +
+ + + + + + + \ No newline at end of file diff --git a/templates/edit_game.html b/templates/edit_game.html new file mode 100644 index 0000000..79d5463 --- /dev/null +++ b/templates/edit_game.html @@ -0,0 +1,76 @@ +{% extends "base.html" %} + +{% block title %}Edit Game - DosVault{% endblock %} + +{% block content %} +
+
+

Edit Game Metadata

+

Update the information for: {{ game.title }}

+
+ +
+
+
+ + +
+ +
+ + +
+ +
+
+ + +
+ +
+ + +
+
+ +
+
+ + +
+ +
+ + +
+
+ +
+ + Cancel + + +
+
+
+
+{% endblock %} \ No newline at end of file diff --git a/templates/favorites.html b/templates/favorites.html new file mode 100644 index 0000000..afc9b08 --- /dev/null +++ b/templates/favorites.html @@ -0,0 +1,144 @@ +{% extends "base.html" %} + +{% block title %}My Favorites - DosVault{% endblock %} + +{% block content %} +
+

My Favorites

+

Your personally selected ROM collection

+
+ +{% if games %} +
+ {% for game in games %} +
+
+

{{ game.metadata_obj.title or game.title }}

+ +
+ + {% if game.metadata_obj %} +
+ {% if game.metadata_obj.year %} +

Year: {{ game.metadata_obj.year }}

+ {% endif %} + {% if game.metadata_obj.developer %} +

Developer: {{ game.metadata_obj.developer }}

+ {% endif %} + {% if game.metadata_obj.description %} +

{{ game.metadata_obj.description[:100] }}{% if game.metadata_obj.description|length > 100 %}...{% endif %}

+ {% endif %} +
+ {% endif %} + +
+ + View Details + + +
+
+ {% endfor %} +
+ + +{% if total_pages > 1 %} +
+ +
+{% endif %} + +{% else %} +
+
šŸ’”
+

No favorites yet

+

Start browsing and add games to your favorites collection!

+ + Browse ROMs + +
+{% endif %} + + +{% endblock %} \ No newline at end of file diff --git a/templates/game_detail.html b/templates/game_detail.html new file mode 100644 index 0000000..7ca89a4 --- /dev/null +++ b/templates/game_detail.html @@ -0,0 +1,288 @@ +{% extends "base.html" %} + +{% block title %}{{ game.metadata_obj.title or game.title }} - DosVault{% endblock %} + +{% block content %} +
+
+ + +
+
+

{{ game.metadata_obj.title or game.title }}

+

{{ game.path.name }}

+
+ +
+ {% if not is_demo %} + + {% endif %} + + {% if current_user and current_user.role == "super" %} + + Edit Metadata + + {% endif %} + + {% if can_download %} + + {% else %} + + {% if current_user %}Demo Mode - No Downloads{% else %}Login to Download{% endif %} + + {% endif %} +
+
+
+ +
+
+ {% if game.metadata_obj and game.metadata_obj.description %} +
+

Description

+

{{ game.metadata_obj.description }}

+
+ {% endif %} + +
+

Game Information

+
+ {% if game.metadata_obj %} + {% if game.metadata_obj.year %} +
+

Release Year

+

{{ game.metadata_obj.year }}

+
+ {% endif %} + + {% if game.metadata_obj.developer %} +
+

Developer

+

{{ game.metadata_obj.developer }}

+
+ {% endif %} + + {% if game.metadata_obj.publisher %} +
+

Publisher

+

{{ game.metadata_obj.publisher }}

+
+ {% endif %} + + {% if game.metadata_obj.players %} +
+

Players

+

{{ game.metadata_obj.players }}

+
+ {% endif %} + + {% if game.metadata_obj.genre %} +
+

Genres

+
+ {% for genre in game.metadata_obj.genre %} + {{ genre.name }} + {% endfor %} +
+
+ {% endif %} + + {% if game.metadata_obj.tags %} +
+

Tags

+
+ {% for tag in game.metadata_obj.tags %} + {{ tag.name }} + {% endfor %} +
+
+ {% endif %} + {% endif %} + +
+

File Path

+

{{ game.path }}

+
+
+
+
+ +
+ {% if game.metadata_obj and (game.metadata_obj.cover_image_path or game.metadata_obj.cover_image) %} +
+

Cover Art

+ {{ game.metadata_obj.title or game.title }} cover +
+ {% endif %} + + {% if game.metadata_obj and (game.metadata_obj.screenshot_path or game.metadata_obj.screenshot) %} +
+

Screenshot

+
+ {{ game.metadata_obj.title or game.title }} screenshot +
+ + + +
+
+

Click to enlarge

+
+ {% endif %} + + {% if not game.metadata_obj or not game.metadata_obj.description %} +
+
šŸ“¦
+

No detailed metadata available for this game

+ {% if current_user and current_user.role == "super" %} + + Add Metadata + + {% endif %} +
+ {% endif %} +
+
+
+ + + + + +{% endblock %} \ No newline at end of file diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..ddca84a --- /dev/null +++ b/templates/index.html @@ -0,0 +1,610 @@ +{% extends "base.html" %} + +{% block title %}ROM Library - DOS Frontend{% endblock %} + +{% block content %} +
+
+

ROM Library

+
+ +
+ + + +
+ +
+ + + +
+
+ +
+
+
+ + +
+
+ {% if is_demo %} + Demo Mode: You can browse ROMs but cannot download or favorite them. + for full access. + {% else %} + Showing {{ games|length }} of {{ total_games }} ROMs + {% if search %} for "{{ search }}"{% endif %} + {% endif %} +
+ +
+ +
+ + +
+ + +
+ + +
+
+
+
+ + +{% if view == 'grid' %} +
+ {% for game in games %} +
+ + + + +
+ {% if game.metadata_obj and (game.metadata_obj.cover_image_path or (game.metadata_obj.cover_image and game.metadata_obj.cover_image.startswith('http'))) %} + {{ game.metadata_obj.title or game.title }} + + {% else %} +
+ +
+ + + + + + + + +
+ + +
+ +
+ + + + + + + + + + + + + + + + + + + + + + +
+ + +
+

+ {{ (game.metadata_obj.title or game.title)[:18] }}{% if (game.metadata_obj.title or game.title)|length > 18 %}...{% endif %} +

+
+
+ + +
+
CLASSIC DOS GAME
+
DOSVAULT
+
+ + +
+ + + +
+
+ + + +
+
+ + + +
+
+ + + +
+
+
+ {% endif %} + + + {% if not is_demo %} + + {% endif %} +
+ + +
+

+ {{ game.metadata_obj.title or game.title }} +

+ {% if game.metadata_obj and game.metadata_obj.year %} +

{{ game.metadata_obj.year }}

+ {% endif %} + +
+ Click to view details + {% if not is_demo %} + + {% else %} + + Login + + {% endif %} +
+
+
+ {% endfor %} +
+{% endif %} + + +{% if view == 'list' %} +
+ {% for game in games %} +
+ + + +
+ +
+ {% if game.metadata_obj and (game.metadata_obj.cover_image_path or (game.metadata_obj.cover_image and game.metadata_obj.cover_image.startswith('http'))) %} + {{ game.metadata_obj.title or game.title }} + + {% else %} +
+ +
+ + + + + + + + +
+ + +
+ + + + + + + + + + + + + + + + +
DOSVAULT
+
+
+ {% endif %} +
+ + +
+

+ {{ game.metadata_obj.title or game.title }} +

+
+ {% if game.metadata_obj and game.metadata_obj.year %} + {{ game.metadata_obj.year }} + {% endif %} + {% if game.metadata_obj and game.metadata_obj.developer %} + {{ game.metadata_obj.developer }} + {% endif %} +
+ {% if game.metadata_obj and game.metadata_obj.description %} +

+ {{ game.metadata_obj.description[:120] }}{% if game.metadata_obj.description|length > 120 %}...{% endif %} +

+ {% endif %} +
+ + +
+ {% if not is_demo %} + + + {% else %} + + Login to Download + + {% endif %} +
+
+
+ {% endfor %} +
+{% endif %} + + +{% if total_pages > 1 %} +
+
+ Page {{ current_page }} of {{ total_pages }} +
+ +
+{% endif %} + + +{% endblock %} \ No newline at end of file diff --git a/test_images.py b/test_images.py new file mode 100644 index 0000000..3bbf3af --- /dev/null +++ b/test_images.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +""" +Test script to download images for existing games that don't have local images yet. +""" +import asyncio +import aiohttp +from pathlib import Path +from sqlalchemy import create_engine, select +from sqlalchemy.orm import Session + +from src.libs.config import Config +from src.libs.database import Game_table, Metadata_table +from src.libs.functions import download_image, get_image_filename + +async def test_image_downloads(): + config = Config() + url = f"sqlite+pysqlite:///{config.database_path}" + engine = create_engine(url, future=True) + + with Session(engine) as session: + # Get first 3 games that have remote images but no local images + stmt = ( + select(Game_table) + .join(Metadata_table) + .where( + (Metadata_table.cover_image.is_not(None)) & + (Metadata_table.cover_image_path.is_(None)) + ) + .limit(3) + ) + games = session.scalars(stmt).all() + + print(f"Found {len(games)} games to test image downloads for") + + async with aiohttp.ClientSession() as http_session: + for game in games: + metadata = game.metadata_obj + print(f"\nTesting: {game.title}") + + # Download cover image + if metadata.cover_image: + cover_filename = get_image_filename(metadata.cover_image, game.title, 'cover') + cover_path = config.images_path / cover_filename + + print(f" Downloading cover: {metadata.cover_image}") + success = await download_image(metadata.cover_image, cover_path, http_session) + + if success: + print(f" āœ“ Cover saved to: {cover_path}") + # Update database with local path + metadata.cover_image_path = cover_path + else: + print(f" āœ— Failed to download cover") + + # Download screenshot + if metadata.screenshot: + screenshot_filename = get_image_filename(metadata.screenshot, game.title, 'screenshot') + screenshot_path = config.images_path 
/ screenshot_filename + + print(f" Downloading screenshot: {metadata.screenshot}") + success = await download_image(metadata.screenshot, screenshot_path, http_session) + + if success: + print(f" āœ“ Screenshot saved to: {screenshot_path}") + # Update database with local path + metadata.screenshot_path = screenshot_path + else: + print(f" āœ— Failed to download screenshot") + + # Commit the updates + session.commit() + print(f"\nāœ“ Database updated with local image paths") + +if __name__ == "__main__": + asyncio.run(test_image_downloads()) \ No newline at end of file