261 lines
8.9 KiB
Python
261 lines
8.9 KiB
Python
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import List, Optional
|
|
from datetime import datetime
|
|
from enum import Enum as PyEnum
|
|
from sqlalchemy import (
|
|
String,
|
|
Integer,
|
|
ForeignKey,
|
|
Table,
|
|
Column,
|
|
UniqueConstraint,
|
|
MetaData,
|
|
select,
|
|
DateTime,
|
|
Boolean
|
|
)
|
|
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, Session
|
|
from sqlalchemy.types import TypeDecorator
|
|
from .objects import Roms
|
|
from .functions import extract_year_from_title
|
|
|
|
|
|
# ---- Base (with naming convention; nice for Alembic) -------------------------
|
|
# Constraint/index naming templates handed to MetaData below.  Keys are the
# SQLAlchemy constraint-type codes (index, unique, check, foreign key,
# primary key); values are %-style templates that SQLAlchemy expands.
convention = dict(
    ix="ix_%(column_0_label)s",
    uq="uq_%(table_name)s_%(column_0_name)s",
    ck="ck_%(table_name)s_%(constraint_name)s",
    fk="fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    pk="pk_%(table_name)s",
)
|
|
|
|
class Base(DeclarativeBase):
    """Declarative base shared by every ORM model in this module."""

    # A single MetaData carrying the naming convention, so constraints and
    # indexes on all mapped classes and Table objects get predictable names.
    metadata = MetaData(naming_convention=convention)
|
|
|
|
|
|
# ---- PathType to store pathlib.Path as TEXT ----------------------------------
|
|
class PathType(TypeDecorator):
    """Column type that persists ``pathlib.Path`` values as strings.

    Values are converted with ``str()`` on the way into the database and
    wrapped in ``Path`` on the way out; ``None`` passes through unchanged in
    both directions.
    """

    impl = String
    # The type carries no per-instance state, so it is safe for SQLAlchemy's
    # statement cache.
    cache_ok = True

    def process_bind_param(self, value, dialect):
        """Convert an outgoing Python value to its stored string form."""
        if value is None:
            return None
        return str(value)

    def process_result_value(self, value, dialect):
        """Convert a stored string back into a ``Path``."""
        if value is None:
            return None
        return Path(value)
|
|
|
|
|
|
# ---- Association tables (use Column, not mapped_column) ----------------------
|
|
# Many-to-many link between ``metadata`` rows and ``tags`` rows.
# Both columns form the composite primary key, and both foreign keys use
# ON DELETE CASCADE so link rows disappear with either parent.
# NOTE(review): the UniqueConstraint repeats what the composite primary key
# already enforces; it is kept so the emitted schema stays unchanged.
metadata_tags = Table(
    "metadata_tags",
    Base.metadata,
    Column(
        "metadata_id",
        ForeignKey("metadata.id", ondelete="CASCADE"),
        primary_key=True,
    ),
    Column(
        "tag_id",
        ForeignKey("tags.id", ondelete="CASCADE"),
        primary_key=True,
    ),
    UniqueConstraint("metadata_id", "tag_id"),
)
|
|
|
|
# Many-to-many link between ``metadata`` rows and ``genre`` rows.
# Both columns form the composite primary key, and both foreign keys use
# ON DELETE CASCADE so link rows disappear with either parent.
# NOTE(review): the UniqueConstraint repeats what the composite primary key
# already enforces; it is kept so the emitted schema stays unchanged.
metadata_genres = Table(
    "metadata_genres",
    Base.metadata,
    Column(
        "metadata_id",
        ForeignKey("metadata.id", ondelete="CASCADE"),
        primary_key=True,
    ),
    Column(
        "genre_id",
        ForeignKey("genre.id", ondelete="CASCADE"),
        primary_key=True,
    ),
    UniqueConstraint("metadata_id", "genre_id"),
)
|
|
|
|
# Many-to-many link between ``users`` rows and the ``game`` rows they have
# marked as favorites.  Both columns form the composite primary key, and both
# foreign keys use ON DELETE CASCADE.
# NOTE(review): the UniqueConstraint repeats what the composite primary key
# already enforces; it is kept so the emitted schema stays unchanged.
user_favorites = Table(
    "user_favorites",
    Base.metadata,
    Column(
        "user_id",
        ForeignKey("users.id", ondelete="CASCADE"),
        primary_key=True,
    ),
    Column(
        "game_id",
        ForeignKey("game.id", ondelete="CASCADE"),
        primary_key=True,
    ),
    UniqueConstraint("user_id", "game_id"),
)
|
|
|
|
|
|
class UserRole(PyEnum):
    """Closed set of role names a user account can carry.

    The ``.value`` of each member is the lowercase string form of the role.
    """

    DEMO = "demo"
    NORMAL = "normal"  # used as the default for ``User_table.role``
    SUPER = "super"
|
|
|
|
|
|
def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Replaces ``datetime.utcnow()``, which is deprecated since Python 3.12.
    The tzinfo is stripped so the produced values remain naive UTC, exactly
    what ``utcnow()`` returned.
    """
    from datetime import timezone  # local import: only this helper needs it

    return datetime.now(timezone.utc).replace(tzinfo=None)


class User_table(Base):
    """ORM model for the ``users`` table: one row per account."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    email: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    password_hash: Mapped[str] = mapped_column(String(255))
    # Stored as the plain string value of a UserRole member.
    role: Mapped[str] = mapped_column(String(20), default=UserRole.NORMAL.value)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    # Python-side default evaluated at INSERT time; naive UTC.
    created_at: Mapped[datetime] = mapped_column(DateTime, default=_utcnow)
    last_login: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)

    # Games this user has marked as favorites, via the user_favorites link
    # table; mirrored by Game_table.favorited_by.
    favorites: Mapped[List["Game_table"]] = relationship(
        secondary=user_favorites,
        back_populates="favorited_by",
        lazy="selectin",
    )

    def __repr__(self) -> str:
        return f"User(id={self.id}, username={self.username!r}, role={self.role})"
|
|
|
|
|
|
class Tags_table(Base):
    """ORM model for the ``tags`` table: one row per unique tag name."""

    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30), unique=True, index=True)

    # Metadata rows carrying this tag, via the metadata_tags link table;
    # mirrored by Metadata_table.tags.
    games: Mapped[List["Metadata_table"]] = relationship(
        secondary=metadata_tags,
        back_populates="tags",
        lazy="selectin",
    )
|
|
|
|
|
|
class Genre_table(Base):
    """ORM model for the ``genre`` table: one row per unique genre name."""

    __tablename__ = "genre"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30), unique=True, index=True)

    # Metadata rows in this genre, via the metadata_genres link table;
    # mirrored by Metadata_table.genre.
    games: Mapped[List["Metadata_table"]] = relationship(
        secondary=metadata_genres,
        back_populates="genre",
        lazy="selectin",
    )
|
|
|
|
|
|
class Game_table(Base):
    """ORM model for the ``game`` table: one row per ROM, keyed by its path."""

    __tablename__ = "game"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(66), index=True)
    # Stored as text through PathType; unique, so a path identifies a game.
    path: Mapped[Path] = mapped_column(PathType(), unique=True, nullable=False)

    # At most one metadata row per game (uselist=False).  The metadata row is
    # deleted with the game; passive_deletes leaves that to the database's
    # ON DELETE CASCADE on metadata.game_id.
    metadata_obj: Mapped[Optional["Metadata_table"]] = relationship(
        back_populates="game",
        uselist=False,
        cascade="all, delete-orphan",
        passive_deletes=True,
    )

    # Users who marked this game as a favorite, via the user_favorites link
    # table; mirrored by User_table.favorites.
    favorited_by: Mapped[List["User_table"]] = relationship(
        secondary=user_favorites,
        back_populates="favorites",
        lazy="selectin",
    )

    def __repr__(self) -> str:
        path_text = str(self.path)
        return f"Game(id={self.id}, title={self.title!r}, path={path_text!r})"
|
|
|
|
|
|
class Metadata_table(Base):
    """ORM model for the ``metadata`` table: descriptive data for one game."""

    __tablename__ = "metadata"

    id: Mapped[int] = mapped_column(primary_key=True)
    # unique=True makes this a one-to-one link to ``game``; the row is removed
    # by the database when its game is deleted (ON DELETE CASCADE).
    game_id: Mapped[int] = mapped_column(
        ForeignKey("game.id", ondelete="CASCADE"),
        unique=True,
        nullable=False,
    )

    title: Mapped[str] = mapped_column(String(66))
    description: Mapped[Optional[str]] = mapped_column(String, nullable=True)
    year: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
    developer: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
    publisher: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
    players: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
    cover_image: Mapped[Optional[str]] = mapped_column(String, nullable=True)  # Remote URL
    screenshot: Mapped[Optional[str]] = mapped_column(String, nullable=True)  # Remote URL
    cover_image_path: Mapped[Optional[Path]] = mapped_column(PathType(), nullable=True)  # Local file path
    screenshot_path: Mapped[Optional[Path]] = mapped_column(PathType(), nullable=True)  # Local file path

    # Many-to-many collections; mirrored by Genre_table.games and
    # Tags_table.games respectively.
    genre: Mapped[List[Genre_table]] = relationship(
        secondary=metadata_genres,
        back_populates="games",
        lazy="selectin",
    )
    tags: Mapped[List[Tags_table]] = relationship(
        secondary=metadata_tags,
        back_populates="games",
        lazy="selectin",
    )

    # Owning game; mirrored by Game_table.metadata_obj.
    game: Mapped["Game_table"] = relationship(back_populates="metadata_obj")

    def __repr__(self) -> str:
        return f"Metadata(id={self.id}, game_id={self.game_id}, title={self.title!r}, year={self.year})"
|
|
|
|
def _get_or_create_by_name(session: Session, model, name: str):
    """Return the ``model`` row whose ``name`` equals *name*, creating it if absent.

    A newly created instance is added to *session* but this function neither
    flushes nor commits; that is left to the caller.

    Args:
        session: Session used for the lookup and for registering a new row.
        model: Mapped class with a ``name`` attribute that also accepts
            ``name=`` in its constructor.
        name: Value to look up.

    Returns:
        The existing instance, or the freshly added one.
    """
    lookup = select(model).where(model.name == name)
    existing = session.scalar(lookup)
    if existing is not None:
        return existing

    created = model(name=name)
    session.add(created)
    return created
|
|
|
|
def get_existing_rom_paths(session: Session) -> set[Path]:
    """Return the resolved path of every game stored in the database.

    Only the ``path`` column is selected, so no ``Game_table`` entities are
    built and their ``selectin`` relationships are not loaded.  The column's
    ``PathType`` still converts each value to a ``Path``.

    Args:
        session: Session used to run the query.

    Returns:
        Set of ``Path.resolve()`` results, one per stored game.
    """
    stored_paths = session.scalars(select(Game_table.path))
    return {stored.resolve() for stored in stored_paths}
|
|
|
|
def _clean_names(values) -> list[str]:
    """Return the unique, stripped, non-empty strings from *values*, sorted.

    ``None`` or an empty iterable gives ``[]``.  A value that is not iterable,
    or that holds non-string items, also gives ``[]`` so one malformed
    metadata field does not abort ingestion of its game.
    """
    try:
        return sorted({s.strip() for s in (values or []) if s and s.strip()})
    except (AttributeError, TypeError):
        return []


def ingest_roms(roms: Roms, session: Session, *, batch: int = 200) -> int:
    """Insert or update every game in ``roms.list`` together with its metadata.

    Games are matched on ``path``: an existing row has its title refreshed, a
    missing one is created.  The metadata row is created when absent and all
    of its fields are overwritten from the item's ``metadata`` object; the
    year falls back to ``extract_year_from_title`` when not provided.  Genres
    and tags are replaced with get-or-create lookups by name.

    Work is committed every *batch* successfully processed games and once
    more at the end.  When a game fails, the error is logged and the session
    is rolled back, which also discards games processed since the last
    commit; those games are not counted in the return value.

    Args:
        roms: Container whose ``list`` yields items exposing ``title``,
            ``path`` and ``metadata``.
        session: Session used for all queries and commits.
        batch: Number of processed games between commits; values below 1 are
            treated as 1.

    Returns:
        Number of games whose changes were committed.
    """
    import logging

    # A zero batch size used to raise ZeroDivisionError for every game.
    batch = max(1, batch)
    committed = 0  # games covered by a successful commit
    pending = 0  # games processed since the last successful commit

    for g in roms.list:
        try:
            game = session.scalar(select(Game_table).where(Game_table.path == g.path))
            if game is None:
                game = Game_table(title=g.title, path=g.path)
                session.add(game)
                logging.info(f"Adding new game: {g.title}")
            else:
                game.title = g.title
                logging.info(f"Updating existing game: {g.title}")

            mdto = g.metadata
            md = game.metadata_obj
            if md is None:
                md = Metadata_table(game=game, title=mdto.title or g.title)
                session.add(md)

            md.title = mdto.title or g.title
            md.description = mdto.description
            md.year = mdto.year if mdto.year is not None else extract_year_from_title(md.title)
            md.developer = mdto.developer
            md.publisher = mdto.publisher
            md.players = mdto.players
            md.cover_image = mdto.cover_image
            md.screenshot = mdto.screenshot
            md.cover_image_path = mdto.cover_image_path
            md.screenshot_path = mdto.screenshot_path

            # getattr keeps the old tolerance for metadata objects that lack
            # the attribute altogether.
            genres = _clean_names(getattr(mdto, "genre", None))
            tags = _clean_names(getattr(mdto, "tags", None))

            md.genre = [_get_or_create_by_name(session, Genre_table, name) for name in genres]
            md.tags = [_get_or_create_by_name(session, Tags_table, name) for name in tags]

            pending += 1

            # Commit in batches to keep each transaction (and its locks) short.
            if pending >= batch:
                session.commit()
                committed += pending
                pending = 0
                logging.info(f"Committed batch of {batch} games to database ({committed} total)")
        except Exception as e:
            logging.error(f"Failed to ingest game {g.title}: {e}")
            # The rollback undoes everything since the last commit, not only
            # this game, so none of the pending games may be counted.
            session.rollback()
            if pending:
                logging.warning(f"Rollback discarded {pending} uncommitted game(s)")
            pending = 0
            continue

    # Final commit for whatever is left after the last full batch.
    try:
        session.commit()
        committed += pending
        logging.info(f"Successfully ingested {committed} games to database")
    except Exception as e:
        logging.error(f"Failed final commit during ROM ingestion: {e}")
        session.rollback()

    return committed
|
|
|