Added a new ROM import system that uses SQLite WAL mode to avoid locking the database and freezing the frontend.
Also added a new logging setup intended to stream the scrape process.
This commit is contained in:
@@ -99,15 +99,19 @@ async def inject_metadata(roms: Roms) -> Roms:
|
||||
try:
|
||||
await asyncio.sleep(0.25) # keep your throttle
|
||||
md = await scrape_metadata(game.title, session)
|
||||
except ValueError:
|
||||
scrape_errors.append(game.title)
|
||||
results[i] = md
|
||||
logging.info(f"Successfully scraped: {game.title} # {i+1}/{len(roms.list)}")
|
||||
except Exception as e:
|
||||
# Handle all exceptions, not just ValueError
|
||||
scrape_errors.append(f"{game.title}: {str(e)}")
|
||||
md = Metadata(title=game.title, year=extract_year_from_title(game.title))
|
||||
# log each item as its done
|
||||
results[i] = md
|
||||
logging.info(f"Scraped: {game.title} # {i+1}/{len(roms.list)}")
|
||||
# Log recent errors
|
||||
for err in scrape_errors[-5:]:
|
||||
logging.warning(f"Scraping error: {err}")
|
||||
results[i] = md
|
||||
logging.info(f"Used fallback metadata for: {game.title} # {i+1}/{len(roms.list)}")
|
||||
|
||||
# Log error details every 5 errors to avoid spam but provide visibility
|
||||
if len(scrape_errors) % 5 == 0:
|
||||
logging.warning(f"Scraping error for {game.title}: {str(e)}")
|
||||
logging.info(f"Total scraping errors so far: {len(scrape_errors)}")
|
||||
|
||||
tasks = [asyncio.create_task(_job(i, game)) for i, game in enumerate(roms.list)]
|
||||
await asyncio.gather(*tasks)
|
||||
@@ -133,27 +137,55 @@ async def filter_new_roms(romlist: Roms, session: Session) -> Roms:
|
||||
|
||||
async def main():
|
||||
url = f"sqlite+pysqlite:///{config.database_path}"
|
||||
engine = create_engine(url, future=True)
|
||||
# Use a connection with shorter timeout and WAL mode for better concurrency
|
||||
engine = create_engine(
|
||||
url,
|
||||
future=True,
|
||||
connect_args={
|
||||
"timeout": 10, # 10 second timeout instead of default 30
|
||||
"check_same_thread": False
|
||||
},
|
||||
pool_pre_ping=True
|
||||
)
|
||||
|
||||
# Database tables are now managed by migrations
|
||||
# Base.metadata.create_all(engine)
|
||||
|
||||
with Session(engine) as s:
|
||||
romlist = await make_romlist()
|
||||
new_romlist = await filter_new_roms(romlist, s)
|
||||
|
||||
if new_romlist.list:
|
||||
new_romlist = await inject_metadata(new_romlist)
|
||||
ingest_roms(new_romlist, s)
|
||||
else:
|
||||
logging.info("No new ROMs to scrape!")
|
||||
try:
|
||||
with Session(engine) as s:
|
||||
# Enable WAL mode for better concurrency
|
||||
try:
|
||||
s.execute("PRAGMA journal_mode=WAL")
|
||||
s.execute("PRAGMA busy_timeout=5000") # 5 second busy timeout
|
||||
s.commit()
|
||||
logging.info("Enabled WAL mode for better database concurrency")
|
||||
except Exception as e:
|
||||
logging.warning(f"Could not enable WAL mode: {e}")
|
||||
|
||||
romlist = await make_romlist()
|
||||
new_romlist = await filter_new_roms(romlist, s)
|
||||
|
||||
if new_romlist.list:
|
||||
logging.info(f"Starting metadata scraping for {len(new_romlist.list)} new games")
|
||||
new_romlist = await inject_metadata(new_romlist)
|
||||
|
||||
logging.info("Starting database ingestion with smaller batches")
|
||||
# Use smaller batches to reduce database lock time
|
||||
ingest_roms(new_romlist, s, batch=50)
|
||||
else:
|
||||
logging.info("No new ROMs to scrape!")
|
||||
|
||||
logging.info("ROM scanning completed")
|
||||
if scrape_errors:
|
||||
logging.warning(f"Total scraping errors: {len(scrape_errors)}")
|
||||
for err in scrape_errors:
|
||||
logging.warning(f"Failed to scrape: {err}")
|
||||
else:
|
||||
logging.info("ROM scanning completed with no errors")
|
||||
logging.info("ROM scanning completed successfully")
|
||||
if scrape_errors:
|
||||
logging.warning(f"Total scraping errors: {len(scrape_errors)}")
|
||||
for err in scrape_errors[-10:]: # Show last 10 errors only
|
||||
logging.warning(f"Failed to scrape: {err}")
|
||||
else:
|
||||
logging.info("ROM scanning completed with no metadata errors")
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"ROM scanning failed with error: {e}")
|
||||
raise
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Initialize logging
|
||||
|
||||
@@ -65,6 +65,8 @@ class Config:
|
||||
return {
|
||||
"rom_path": str(self.rom_path),
|
||||
"metadata_path": str(self.metadata_path),
|
||||
"database_path": str(self.database_path),
|
||||
"images_path": str(self.images_path),
|
||||
"host": self.host,
|
||||
"port": self.port,
|
||||
"websocket_port": self.websocket_port,
|
||||
@@ -73,21 +75,21 @@ class Config:
|
||||
}
|
||||
|
||||
def save(self):
|
||||
# Ensure config directory exists
|
||||
if not self.path.parent.exists():
|
||||
self.path.parent.mkdir(parents=True, exist_ok=True)
|
||||
rom_path = input(f"Enter the path to your ROMs [{self.rom_path}] enter for default: ").strip()
|
||||
metadata_path = input(f"Enter the path to your metadata [{self.metadata_path}] enter for default: ").strip()
|
||||
self.rom_path = Path(rom_path) if rom_path else self.rom_path
|
||||
self.metadata_path = Path(metadata_path) if metadata_path else self.metadata_path
|
||||
|
||||
# Create directories if they don't exist
|
||||
if not self.rom_path.exists():
|
||||
self.rom_path.mkdir(parents=True, exist_ok=True)
|
||||
if not self.metadata_path.exists():
|
||||
self.metadata_path.mkdir(parents=True, exist_ok=True)
|
||||
if not self.images_path.exists():
|
||||
self.images_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Write configuration to file
|
||||
with open(self.path, 'w') as f:
|
||||
json.dump(self.to_dict(), f, indent=4)
|
||||
f.close()
|
||||
|
||||
def load(self) -> "Config":
|
||||
if self.path.exists():
|
||||
@@ -95,19 +97,26 @@ class Config:
|
||||
data = json.load(f)
|
||||
self.rom_path = Path(data.get("rom_path", str(self.rom_path)))
|
||||
self.metadata_path = Path(data.get("metadata_path", str(self.metadata_path)))
|
||||
self.database_path = Path(data.get("database_path", str(self.database_path)))
|
||||
self.images_path = Path(data.get("images_path", str(self.images_path)))
|
||||
self.host = data.get("host", self.host)
|
||||
self.port = data.get("port", self.port)
|
||||
self.websocket_port = data.get("websocket_port", self.websocket_port)
|
||||
self.igdb_api_key = data.get("igdb_api_key", self.igdb_api_key)
|
||||
self.igdb_client_id = data.get("igdb_client_id", self.igdb_client_id)
|
||||
|
||||
# Load environment secrets if API keys are still empty
|
||||
if self.igdb_api_key == "" or self.igdb_client_id == "":
|
||||
secrets = self.load_env_secrets()
|
||||
if secrets:
|
||||
self.igdb_api_key = secrets.get("IGDB_SECRET_KEY", "")
|
||||
self.igdb_client_id = secrets.get("IGDB_CLIENT_ID", "")
|
||||
f.close()
|
||||
self.save()
|
||||
return self
|
||||
f.close()
|
||||
self.igdb_api_key = secrets.get("IGDB_SECRET_KEY", self.igdb_api_key)
|
||||
self.igdb_client_id = secrets.get("IGDB_CLIENT_ID", self.igdb_client_id)
|
||||
else:
|
||||
# Config file doesn't exist, create it with defaults
|
||||
# Load environment secrets for initial setup
|
||||
secrets = self.load_env_secrets()
|
||||
if secrets:
|
||||
self.igdb_api_key = secrets.get("IGDB_SECRET_KEY", self.igdb_api_key)
|
||||
self.igdb_client_id = secrets.get("IGDB_CLIENT_ID", self.igdb_client_id)
|
||||
self.save()
|
||||
self.load()
|
||||
return self
|
||||
|
||||
@@ -199,43 +199,62 @@ def get_existing_rom_paths(session: Session) -> set[Path]:
|
||||
return {game.path.resolve() for game in session.scalars(select(Game_table)).all()}
|
||||
|
||||
def ingest_roms(roms: Roms, session: Session, *, batch: int = 200) -> int:
|
||||
import logging
|
||||
n = 0
|
||||
for g in roms.list:
|
||||
game = session.scalar(select(Game_table).where(Game_table.path == g.path))
|
||||
if game is None:
|
||||
game = Game_table(title=g.title, path=g.path)
|
||||
session.add(game)
|
||||
else:
|
||||
game.title = g.title
|
||||
mdto = g.metadata
|
||||
md = game.metadata_obj
|
||||
if md is None:
|
||||
md = Metadata_table(game=game, title=mdto.title or g.title)
|
||||
session.add(md)
|
||||
try:
|
||||
game = session.scalar(select(Game_table).where(Game_table.path == g.path))
|
||||
if game is None:
|
||||
game = Game_table(title=g.title, path=g.path)
|
||||
session.add(game)
|
||||
logging.info(f"Adding new game: {g.title}")
|
||||
else:
|
||||
game.title = g.title
|
||||
logging.info(f"Updating existing game: {g.title}")
|
||||
|
||||
mdto = g.metadata
|
||||
md = game.metadata_obj
|
||||
if md is None:
|
||||
md = Metadata_table(game=game, title=mdto.title or g.title)
|
||||
session.add(md)
|
||||
|
||||
md.title = mdto.title or g.title
|
||||
md.description = mdto.description
|
||||
md.year = mdto.year if mdto.year is not None else extract_year_from_title(md.title)
|
||||
md.developer = mdto.developer
|
||||
md.publisher = mdto.publisher
|
||||
md.players = mdto.players
|
||||
md.cover_image = mdto.cover_image
|
||||
md.screenshot = mdto.screenshot
|
||||
md.cover_image_path = mdto.cover_image_path
|
||||
md.screenshot_path = mdto.screenshot_path
|
||||
md.title = mdto.title or g.title
|
||||
md.description = mdto.description
|
||||
md.year = mdto.year if mdto.year is not None else extract_year_from_title(md.title)
|
||||
md.developer = mdto.developer
|
||||
md.publisher = mdto.publisher
|
||||
md.players = mdto.players
|
||||
md.cover_image = mdto.cover_image
|
||||
md.screenshot = mdto.screenshot
|
||||
md.cover_image_path = mdto.cover_image_path
|
||||
md.screenshot_path = mdto.screenshot_path
|
||||
|
||||
try: genres = sorted({s.strip() for s in (mdto.genre or []) if s and s.strip()})
|
||||
except: genres = []
|
||||
try: tags = sorted({s.strip() for s in (mdto.tags or []) if s and s.strip()})
|
||||
except: tags = []
|
||||
try: genres = sorted({s.strip() for s in (mdto.genre or []) if s and s.strip()})
|
||||
except: genres = []
|
||||
try: tags = sorted({s.strip() for s in (mdto.tags or []) if s and s.strip()})
|
||||
except: tags = []
|
||||
|
||||
md.genre = [_get_or_create_by_name(session, Genre_table, name) for name in genres]
|
||||
md.tags = [_get_or_create_by_name(session, Tags_table, name) for name in tags]
|
||||
md.genre = [_get_or_create_by_name(session, Genre_table, name) for name in genres]
|
||||
md.tags = [_get_or_create_by_name(session, Tags_table, name) for name in tags]
|
||||
|
||||
n += 1
|
||||
if n % batch == 0:
|
||||
session.flush()
|
||||
n += 1
|
||||
|
||||
# Use more frequent flushes and commits to reduce lock time
|
||||
if n % batch == 0:
|
||||
session.commit() # Commit more frequently to reduce lock duration
|
||||
logging.info(f"Committed batch of {batch} games to database ({n} total)")
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to ingest game {g.title}: {e}")
|
||||
session.rollback() # Rollback on error to prevent corruption
|
||||
continue
|
||||
|
||||
session.commit()
|
||||
# Final commit for remaining items
|
||||
try:
|
||||
session.commit()
|
||||
logging.info(f"Successfully ingested {n} games to database")
|
||||
except Exception as e:
|
||||
logging.error(f"Failed final commit during ROM ingestion: {e}")
|
||||
session.rollback()
|
||||
|
||||
return n
|
||||
|
||||
|
||||
@@ -5,6 +5,8 @@ from typing import Optional, Annotated
|
||||
from datetime import timedelta, datetime, timezone
|
||||
import re
|
||||
import asyncio
|
||||
import logging
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import FastAPI, Depends, HTTPException, status, Request, Form, Query, BackgroundTasks
|
||||
@@ -15,18 +17,16 @@ from fastapi.staticfiles import StaticFiles
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.exception_handlers import http_exception_handler
|
||||
from fastapi.exceptions import HTTPException as StarletteHTTPException
|
||||
from starlette.exceptions import HTTPException as StarletteBaseHTTPException
|
||||
from sqlalchemy import create_engine, select, func
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
from sqlalchemy.exc import OperationalError
|
||||
from alembic.config import Config as AlembicConfig
|
||||
from alembic import command
|
||||
import subprocess
|
||||
|
||||
try:
|
||||
# Try relative imports first (when run as module)
|
||||
from .libs.config import Config
|
||||
from .libs.database import Base, Game_table, Metadata_table, User_table, UserRole, user_favorites, Tags_table, Genre_table
|
||||
from .libs.database import Game_table, Metadata_table, User_table, UserRole, user_favorites, Tags_table, Genre_table
|
||||
from .libs.auth import AuthManager, ACCESS_TOKEN_EXPIRE_MINUTES
|
||||
from .libs.logging import get_log_manager
|
||||
except ImportError:
|
||||
@@ -36,13 +36,31 @@ except ImportError:
|
||||
from libs.auth import AuthManager, ACCESS_TOKEN_EXPIRE_MINUTES
|
||||
from libs.logging import get_log_manager
|
||||
|
||||
# Initialize logging system first
|
||||
get_log_manager()
|
||||
|
||||
config = Config()
|
||||
engine = create_engine(f"sqlite+pysqlite:///{config.database_path}", echo=False)
|
||||
engine = create_engine(
|
||||
f"sqlite+pysqlite:///{config.database_path}",
|
||||
echo=False,
|
||||
connect_args={
|
||||
"timeout": 10, # 10 second timeout
|
||||
"check_same_thread": False
|
||||
},
|
||||
pool_pre_ping=True
|
||||
)
|
||||
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
||||
|
||||
# Initialize logging system
|
||||
import logging
|
||||
get_log_manager()
|
||||
# Enable WAL mode for better concurrency during startup
|
||||
try:
|
||||
with engine.connect() as conn:
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute("PRAGMA busy_timeout=5000") # 5 second busy timeout
|
||||
conn.commit()
|
||||
logging.info("Enabled WAL mode for web application database")
|
||||
except Exception as e:
|
||||
logging.warning(f"Could not enable WAL mode for web application: {e}")
|
||||
|
||||
logging.info("DosVault web application starting up")
|
||||
|
||||
app = FastAPI(title="DOS Frontend", description="ROM Management System")
|
||||
@@ -119,7 +137,7 @@ def ensure_super_user():
|
||||
# Create default super user
|
||||
logging.info("No super user found, creating default admin user...")
|
||||
try:
|
||||
default_admin = AuthManager.create_user(
|
||||
AuthManager.create_user(
|
||||
session=db,
|
||||
username="admin",
|
||||
email="admin@dosvault.local",
|
||||
@@ -807,7 +825,7 @@ async def create_user(
|
||||
if existing_user:
|
||||
raise HTTPException(status_code=400, detail="Username or email already exists")
|
||||
|
||||
new_user = AuthManager.create_user(db, username, email, password, role)
|
||||
AuthManager.create_user(db, username, email, password, role)
|
||||
return RedirectResponse(url="/admin/users", status_code=303)
|
||||
|
||||
|
||||
@@ -1017,6 +1035,21 @@ async def admin_rom_scan(
|
||||
|
||||
return {"status": "started", "message": "ROM scan started"}
|
||||
|
||||
@app.post("/api/admin/game-scan")
|
||||
async def admin_game_scan(
|
||||
background_tasks: BackgroundTasks,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: User_table = Depends(require_super_user)
|
||||
):
|
||||
"""Trigger game scan in the background (alias for ROM scan)"""
|
||||
if "game_scan" in running_tasks and not running_tasks["game_scan"].done():
|
||||
return {"status": "already_running", "message": "Game scan is already in progress"}
|
||||
|
||||
task = asyncio.create_task(run_rom_scan())
|
||||
running_tasks["game_scan"] = task
|
||||
|
||||
return {"status": "started", "message": "Game scan started"}
|
||||
|
||||
@app.post("/api/admin/metadata-refresh")
|
||||
async def admin_metadata_refresh(
|
||||
background_tasks: BackgroundTasks,
|
||||
@@ -1134,7 +1167,7 @@ async def admin_system_stats(
|
||||
"disk_usage": disk_usage,
|
||||
"running_tasks": {
|
||||
task_name: not running_tasks[task_name].done() if task_name in running_tasks else False
|
||||
for task_name in ["rom_scan", "metadata_refresh", "image_sync"]
|
||||
for task_name in ["rom_scan", "game_scan", "metadata_refresh", "image_sync"]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1142,31 +1175,43 @@ async def run_rom_scan():
|
||||
"""Run the ROM scanner subprocess"""
|
||||
try:
|
||||
logging.info("Starting ROM scan subprocess")
|
||||
# Use the same approach as devenv - run the script directly with proper PYTHONPATH
|
||||
script_path = Path(__file__).parent / "__main__.py" # Point to src/__main__.py
|
||||
|
||||
# Create subprocess with real-time output capture
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
"python", "-m", "src",
|
||||
"python", str(script_path),
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
stderr=asyncio.subprocess.STDOUT, # Merge stderr into stdout for unified logging
|
||||
cwd=Path(__file__).parent.parent # Set working directory to project root
|
||||
)
|
||||
stdout, stderr = await process.communicate()
|
||||
|
||||
# Log the output for visibility
|
||||
if stdout:
|
||||
for line in stdout.decode().strip().split('\n'):
|
||||
if line.strip():
|
||||
logging.info(f"ROM Scanner: {line.strip()}")
|
||||
# Capture output in real-time and log it
|
||||
output_lines = []
|
||||
try:
|
||||
while True:
|
||||
line = await process.stdout.readline()
|
||||
if not line:
|
||||
break
|
||||
|
||||
decoded_line = line.decode().rstrip()
|
||||
if decoded_line:
|
||||
output_lines.append(decoded_line)
|
||||
# Log to main application log immediately for real-time visibility
|
||||
logging.info(f"ROM Scanner: {decoded_line}")
|
||||
except Exception as e:
|
||||
logging.error(f"Error reading ROM scanner output: {e}")
|
||||
|
||||
if stderr:
|
||||
for line in stderr.decode().strip().split('\n'):
|
||||
if line.strip():
|
||||
logging.error(f"ROM Scanner Error: {line.strip()}")
|
||||
# Wait for process to complete
|
||||
await process.wait()
|
||||
|
||||
success = process.returncode == 0
|
||||
logging.info(f"ROM scan subprocess completed with exit code: {process.returncode}")
|
||||
|
||||
return {
|
||||
"success": success,
|
||||
"output": stdout.decode(),
|
||||
"error": stderr.decode(),
|
||||
"output": '\n'.join(output_lines),
|
||||
"error": "",
|
||||
"returncode": process.returncode
|
||||
}
|
||||
except Exception as e:
|
||||
|
||||
Reference in New Issue
Block a user