Iniital release of DosVault.

This commit is contained in:
2025-09-06 13:53:44 -04:00
commit b3e71456c8
41 changed files with 7391 additions and 0 deletions

0
src/libs/__init__.py Normal file
View File

101
src/libs/apis.py Normal file
View File

@@ -0,0 +1,101 @@
import requests
from dataclasses import dataclass
from enum import Enum
from .config import Config
from typing import Dict
class URLS(Enum):
IGDB_URL = "https://api.igdb.com/v4"
TWITCH_AUTH_URL = "https://id.twitch.tv/oauth2/token"
IGDB_GAMES_ENDPOINT = IGDB_URL + "/games"
IGDB_COVERS_ENDPOINT = IGDB_URL + "/covers"
@dataclass
class Credentials:
client_id: str
client_secret: str
access_token: str|None = None
expiry: int|None = None
token_type: str|None = None
def get_credentials(self) -> Dict:
auth_url = URLS.TWITCH_AUTH_URL.value+f"?client_id={self.client_id}&client_secret={self.client_secret}&grant_type=client_credentials"
resp = requests.post(auth_url)
if not resp.status_code == 200:
raise ValueError("Failed to obtain access token from Twitch")
else:
return resp.json()
def authenticate(self) -> 'Credentials':
credentials: Dict = self.get_credentials()
self.access_token = credentials['access_token']
self.expiry = credentials['expires_in']
self.token_type = credentials['token_type']
if not self.access_token:
raise ValueError("Failed to obtain access token")
return self
def __init__(self, config: Config):
self.client_id = config.igdb_client_id
self.client_secret = config.igdb_api_key
class IGDB:
def __init__(self, credentials: Credentials):
self.client_id = credentials.client_id
self.access_token = credentials.access_token
self.token_type = credentials.token_type
if not self.access_token:
raise ValueError("Access token is not set. Please authenticate first.")
def headers(self) -> Dict:
if not self.access_token:
raise ValueError("Access token is not set. Please authenticate first.")
return {
"Client-ID": self.client_id,
"Authorization": f"Bearer {self.access_token}",
}
def search_game_by_title(self, query: str) -> Dict:
if not self.access_token:
raise ValueError("Access token is not set. Please authenticate first.")
search_url = URLS.IGDB_GAMES_ENDPOINT.value
headers = self.headers()
# Request full cover and artwork data with expanded fields
data = f"""search "{query}"; fields name,summary,first_release_date,rating,platforms.name,genres.name,involved_companies.company.name,cover.image_id,artworks.image_id,themes.name,player_perspectives,id; where platforms = (13); limit 10;"""
resp = requests.post(search_url, headers=headers, data=data)
if resp.status_code != 200:
raise ValueError(f"Failed to search games: {resp.status_code} - {resp.text}")
return resp.json()
def get_cover_details(self, cover_id: int) -> Dict:
"""Get cover details from IGDB by cover ID"""
if not self.access_token:
raise ValueError("Access token is not set. Please authenticate first.")
covers_url = URLS.IGDB_COVERS_ENDPOINT.value
headers = self.headers()
data = f"""fields image_id,url,height,width,game; where id = {cover_id};"""
resp = requests.post(covers_url, headers=headers, data=data)
if resp.status_code != 200:
raise ValueError(f"Failed to get cover details: {resp.status_code} - {resp.text}")
return resp.json()
def get_covers_by_game_id(self, game_id: int) -> Dict:
"""Get all covers for a specific game ID"""
if not self.access_token:
raise ValueError("Access token is not set. Please authenticate first.")
covers_url = URLS.IGDB_COVERS_ENDPOINT.value
headers = self.headers()
data = f"""fields image_id,url,height,width; where game = {game_id};"""
resp = requests.post(covers_url, headers=headers, data=data)
if resp.status_code != 200:
raise ValueError(f"Failed to get covers for game: {resp.status_code} - {resp.text}")
return resp.json()
@staticmethod
def build_cover_url(image_id: str, size: str = "cover_big") -> str:
"""Build IGDB cover URL from image_id
Size options: thumb, cover_small, screenshot_med, cover_big, logo_med, screenshot_big, screenshot_huge, thumb, micro, 720p, 1080p
"""
return f"https://images.igdb.com/igdb/image/upload/t_{size}/{image_id}.jpg"

74
src/libs/auth.py Normal file
View File

@@ -0,0 +1,74 @@
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Optional
from passlib.context import CryptContext
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from sqlalchemy import select
from .database import User_table, UserRole
SECRET_KEY = "your-secret-key-change-this-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class AuthManager:
@staticmethod
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
@staticmethod
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
@staticmethod
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@staticmethod
def verify_token(token: str) -> Optional[str]:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
return None
return username
except JWTError:
return None
@staticmethod
def authenticate_user(session: Session, username: str, password: str) -> Optional[User_table]:
user = session.scalar(select(User_table).where(User_table.username == username))
if not user:
return None
if not AuthManager.verify_password(password, user.password_hash):
return None
return user
@staticmethod
def get_user_by_username(session: Session, username: str) -> Optional[User_table]:
return session.scalar(select(User_table).where(User_table.username == username))
@staticmethod
def create_user(session: Session, username: str, email: str, password: str, role: str = UserRole.NORMAL.value) -> User_table:
hashed_password = AuthManager.get_password_hash(password)
user = User_table(
username=username,
email=email,
password_hash=hashed_password,
role=role
)
session.add(user)
session.commit()
session.refresh(user)
return user

113
src/libs/config.py Normal file
View File

@@ -0,0 +1,113 @@
from pathlib import Path
from dataclasses import dataclass
from typing import Optional, Dict
import json
import os
# Check for environment variable override (used in Docker)
if os.getenv("DOSFRONTEND_CONFIG_DIR"):
DOSFRONTEND_CONFIG_DIR: Path = Path(os.getenv("DOSFRONTEND_CONFIG_DIR"))
else:
# Default to XDG config directory for regular installations
XDG_CONFIG_HOME: Path = Path(Path.home()).joinpath(".config")
DOSFRONTEND_CONFIG_DIR: Path = XDG_CONFIG_HOME.joinpath("dosfrontend")
DOSFRONTEND_CONFIG_FILE: Path = DOSFRONTEND_CONFIG_DIR.joinpath("config.json")
@dataclass
class Config:
path: Path = DOSFRONTEND_CONFIG_FILE
rom_path: Path = DOSFRONTEND_CONFIG_DIR.joinpath("roms")
metadata_path: Path = DOSFRONTEND_CONFIG_DIR.joinpath("metadata")
database_path: Path = DOSFRONTEND_CONFIG_DIR.joinpath("roms.db")
images_path: Path = DOSFRONTEND_CONFIG_DIR.joinpath("images")
host: str = "localhost"
port: int = 8080
websocket_port: int = 8081
igdb_api_key: str = ""
igdb_client_id: str = ""
def __init__(self, path: Optional[Path] = None):
if path:
self.path = path
self.load()
def load_env_secrets(self) -> Dict[str, str] | None:
secrets: Dict[str, str] = {}
igdb_api_key = os.getenv("IGDB_SECRET_KEY")
igdb_client_id = os.getenv("IGDB_CLIENT_ID")
if not igdb_api_key or not igdb_client_id:
file_path: Path = Path(__file__)
env_path: Path = file_path.parent.parent.parent.joinpath(".env")
if not env_path.exists():
return
else:
with env_path.open('r') as f:
for line in f:
if line.startswith("#") or "=" not in line:
continue
key, value = line.strip().split("=", 1)
key, value = key.strip(), value.strip('"').strip("'")
secrets[key] = value
f.close()
if secrets.get("IGDB_SECRET_KEY") and secrets.get("IGDB_CLIENT_ID"):
return secrets
else: return None
else:
secrets = {
"IGDB_SECRET_KEY": igdb_api_key,
"IGDB_CLIENT_ID": igdb_client_id,
}
return secrets
def to_dict(self) -> dict:
return {
"rom_path": str(self.rom_path),
"metadata_path": str(self.metadata_path),
"host": self.host,
"port": self.port,
"websocket_port": self.websocket_port,
"igdb_api_key": self.igdb_api_key,
"igdb_client_id": self.igdb_client_id,
}
def save(self):
if not self.path.parent.exists():
self.path.parent.mkdir(parents=True, exist_ok=True)
rom_path = input(f"Enter the path to your ROMs [{self.rom_path}] enter for default: ").strip()
metadata_path = input(f"Enter the path to your metadata [{self.metadata_path}] enter for default: ").strip()
self.rom_path = Path(rom_path) if rom_path else self.rom_path
self.metadata_path = Path(metadata_path) if metadata_path else self.metadata_path
if not self.rom_path.exists():
self.rom_path.mkdir(parents=True, exist_ok=True)
if not self.metadata_path.exists():
self.metadata_path.mkdir(parents=True, exist_ok=True)
if not self.images_path.exists():
self.images_path.mkdir(parents=True, exist_ok=True)
with open(self.path, 'w') as f:
json.dump(self.to_dict(), f, indent=4)
f.close()
def load(self) -> "Config":
if self.path.exists():
with open(self.path, 'r') as f:
data = json.load(f)
self.rom_path = Path(data.get("rom_path", str(self.rom_path)))
self.metadata_path = Path(data.get("metadata_path", str(self.metadata_path)))
self.host = data.get("host", self.host)
self.port = data.get("port", self.port)
self.websocket_port = data.get("websocket_port", self.websocket_port)
if self.igdb_api_key == "" or self.igdb_client_id == "":
secrets = self.load_env_secrets()
if secrets:
self.igdb_api_key = secrets.get("IGDB_SECRET_KEY", "")
self.igdb_client_id = secrets.get("IGDB_CLIENT_ID", "")
f.close()
self.save()
return self
f.close()
else:
self.save()
self.load()
return self

241
src/libs/database.py Normal file
View File

@@ -0,0 +1,241 @@
from __future__ import annotations
from pathlib import Path
from typing import List, Optional
from datetime import datetime
from enum import Enum as PyEnum
from sqlalchemy import (
String,
Integer,
ForeignKey,
Table,
Column,
UniqueConstraint,
MetaData,
select,
DateTime,
Boolean
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, Session
from sqlalchemy.types import TypeDecorator
from .objects import Roms
from .functions import extract_year_from_title
# ---- Base (with naming convention; nice for Alembic) -------------------------
convention = {
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s",
}
class Base(DeclarativeBase):
metadata = MetaData(naming_convention=convention)
# ---- PathType to store pathlib.Path as TEXT ----------------------------------
class PathType(TypeDecorator):
impl = String
cache_ok = True
def process_bind_param(self, value, dialect):
return None if value is None else str(value)
def process_result_value(self, value, dialect):
return None if value is None else Path(value)
# ---- Association tables (use Column, not mapped_column) ----------------------
metadata_tags = Table(
"metadata_tags",
Base.metadata,
Column("metadata_id", ForeignKey("metadata.id", ondelete="CASCADE"), primary_key=True),
Column("tag_id", ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True),
UniqueConstraint("metadata_id", "tag_id"),
)
metadata_genres = Table(
"metadata_genres",
Base.metadata,
Column("metadata_id", ForeignKey("metadata.id", ondelete="CASCADE"), primary_key=True),
Column("genre_id", ForeignKey("genre.id", ondelete="CASCADE"), primary_key=True),
UniqueConstraint("metadata_id", "genre_id"),
)
user_favorites = Table(
"user_favorites",
Base.metadata,
Column("user_id", ForeignKey("users.id", ondelete="CASCADE"), primary_key=True),
Column("game_id", ForeignKey("game.id", ondelete="CASCADE"), primary_key=True),
UniqueConstraint("user_id", "game_id"),
)
class UserRole(PyEnum):
DEMO = "demo"
NORMAL = "normal"
SUPER = "super"
class User_table(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
email: Mapped[str] = mapped_column(String(100), unique=True, index=True)
password_hash: Mapped[str] = mapped_column(String(255))
role: Mapped[str] = mapped_column(String(20), default=UserRole.NORMAL.value)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
last_login: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
favorites: Mapped[List["Game_table"]] = relationship(
secondary=user_favorites,
back_populates="favorited_by",
lazy="selectin",
)
def __repr__(self) -> str:
return f"User(id={self.id}, username={self.username!r}, role={self.role})"
class Tags_table(Base):
__tablename__ = "tags"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(30), unique=True, index=True)
games: Mapped[List["Metadata_table"]] = relationship(
secondary=metadata_tags,
back_populates="tags",
lazy="selectin",
)
class Genre_table(Base):
__tablename__ = "genre"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(30), unique=True, index=True)
games: Mapped[List["Metadata_table"]] = relationship(
secondary=metadata_genres,
back_populates="genre",
lazy="selectin",
)
class Game_table(Base):
__tablename__ = "game"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(66), index=True)
path: Mapped[Path] = mapped_column(PathType(), unique=True, nullable=False)
metadata_obj: Mapped[Optional["Metadata_table"]] = relationship(
back_populates="game",
uselist=False,
cascade="all, delete-orphan",
passive_deletes=True,
)
favorited_by: Mapped[List["User_table"]] = relationship(
secondary=user_favorites,
back_populates="favorites",
lazy="selectin",
)
def __repr__(self) -> str:
return f"Game(id={self.id}, title={self.title!r}, path={str(self.path)!r})"
class Metadata_table(Base):
__tablename__ = "metadata"
id: Mapped[int] = mapped_column(primary_key=True)
game_id: Mapped[int] = mapped_column(
ForeignKey("game.id", ondelete="CASCADE"),
unique=True,
nullable=False,
)
title: Mapped[str] = mapped_column(String(66))
description: Mapped[Optional[str]] = mapped_column(String, nullable=True)
year: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
developer: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
publisher: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
players: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
cover_image: Mapped[Optional[str]] = mapped_column(String, nullable=True) # Remote URL
screenshot: Mapped[Optional[str]] = mapped_column(String, nullable=True) # Remote URL
cover_image_path: Mapped[Optional[Path]] = mapped_column(PathType(), nullable=True) # Local file path
screenshot_path: Mapped[Optional[Path]] = mapped_column(PathType(), nullable=True) # Local file path
genre: Mapped[List[Genre_table]] = relationship(
secondary=metadata_genres,
back_populates="games",
lazy="selectin",
)
tags: Mapped[List[Tags_table]] = relationship(
secondary=metadata_tags,
back_populates="games",
lazy="selectin",
)
game: Mapped["Game_table"] = relationship(back_populates="metadata_obj")
def __repr__(self) -> str:
return f"Metadata(id={self.id}, game_id={self.game_id}, title={self.title!r}, year={self.year})"
def _get_or_create_by_name(session: Session, model, name: str):
obj = session.scalar(select(model).where(model.name == name))
if obj is None:
obj = model(name=name)
session.add(obj)
return obj
def get_existing_rom_paths(session: Session) -> set[Path]:
return {game.path.resolve() for game in session.scalars(select(Game_table)).all()}
def ingest_roms(roms: Roms, session: Session, *, batch: int = 200) -> int:
n = 0
for g in roms.list:
game = session.scalar(select(Game_table).where(Game_table.path == g.path))
if game is None:
game = Game_table(title=g.title, path=g.path)
session.add(game)
else:
game.title = g.title
mdto = g.metadata
md = game.metadata_obj
if md is None:
md = Metadata_table(game=game, title=mdto.title or g.title)
session.add(md)
md.title = mdto.title or g.title
md.description = mdto.description
md.year = mdto.year if mdto.year is not None else extract_year_from_title(md.title)
md.developer = mdto.developer
md.publisher = mdto.publisher
md.players = mdto.players
md.cover_image = mdto.cover_image
md.screenshot = mdto.screenshot
md.cover_image_path = mdto.cover_image_path
md.screenshot_path = mdto.screenshot_path
try: genres = sorted({s.strip() for s in (mdto.genre or []) if s and s.strip()})
except: genres = []
try: tags = sorted({s.strip() for s in (mdto.tags or []) if s and s.strip()})
except: tags = []
md.genre = [_get_or_create_by_name(session, Genre_table, name) for name in genres]
md.tags = [_get_or_create_by_name(session, Tags_table, name) for name in tags]
n += 1
if n % batch == 0:
session.flush()
session.commit()
return n

78
src/libs/functions.py Normal file
View File

@@ -0,0 +1,78 @@
from typing import Optional
import re
import asyncio
import aiohttp
from pathlib import Path
import hashlib
YEAR_RE = re.compile(r"\((\d{4})\)")
PARENS_RE = re.compile(r"\([^)]*\)")
def extract_year_from_title(title: Optional[str]) -> Optional[int]:
if not title:
return None
m = YEAR_RE.search(title)
return int(m.group(1)) if m else None
def clean_title(title: str) -> str:
# remove anything in (...) from the title
cleaned = PARENS_RE.sub("", title)
return " ".join(cleaned.split()).strip()
async def download_image(url: str, save_path: Path, session: aiohttp.ClientSession) -> bool:
"""
Download an image from URL and save it locally.
Args:
url: The image URL to download
save_path: Local path where to save the image
session: aiohttp client session
Returns:
bool: True if download was successful, False otherwise
"""
try:
# Create directory if it doesn't exist
save_path.parent.mkdir(parents=True, exist_ok=True)
async with session.get(url) as response:
if response.status == 200:
content = await response.read()
with open(save_path, 'wb') as f:
f.write(content)
return True
else:
print(f"Failed to download {url}: HTTP {response.status}")
return False
except Exception as e:
print(f"Error downloading {url}: {e}")
return False
def get_image_filename(url: str, game_title: str, image_type: str) -> str:
"""
Generate a unique filename for an image based on game title and URL.
Args:
url: The image URL
game_title: The game title
image_type: 'cover' or 'screenshot'
Returns:
str: Generated filename
"""
# Create a hash of the URL to ensure uniqueness
url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
# Clean game title for filename
clean_name = re.sub(r'[^\w\-_\. ]', '', game_title)
clean_name = re.sub(r'\s+', '_', clean_name).strip('_')
# Get file extension from URL
try:
ext = Path(url.split('?')[0]).suffix
if not ext:
ext = '.jpg' # Default extension
except:
ext = '.jpg'
return f"{clean_name}_{image_type}_{url_hash}{ext}"

220
src/libs/logging.py Normal file
View File

@@ -0,0 +1,220 @@
#!/usr/bin/env python
"""Logging configuration for DosVault application."""
from __future__ import annotations
import logging
import logging.handlers
import json
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
try:
from .config import Config
except ImportError:
from config import Config
class JSONFormatter(logging.Formatter):
"""Custom JSON formatter for structured logging."""
def format(self, record: logging.LogRecord) -> str:
log_entry = {
'timestamp': datetime.fromtimestamp(record.created).isoformat(),
'level': record.levelname,
'module': record.name,
'message': record.getMessage(),
'filename': record.filename,
'line_number': record.lineno,
}
if record.exc_info:
log_entry['traceback'] = self.formatException(record.exc_info)
return json.dumps(log_entry)
class LogManager:
"""Manages logging configuration and log file access."""
def __init__(self, config: Optional[Config] = None):
self.config = config or Config()
# Use the existing config directory structure
self.log_dir = self.config.path.parent / "logs"
self.log_dir.mkdir(exist_ok=True)
self.log_file = self.log_dir / "application.log"
self.error_log_file = self.log_dir / "error.log"
self._setup_logging()
def _setup_logging(self):
"""Configure logging handlers and formatters."""
# Create root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# Clear existing handlers
root_logger.handlers.clear()
# Console handler with simple format
console_handler = logging.StreamHandler()
console_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(console_formatter)
console_handler.setLevel(logging.INFO)
root_logger.addHandler(console_handler)
# File handler with JSON format
file_handler = logging.handlers.RotatingFileHandler(
self.log_file,
maxBytes=10*1024*1024, # 10MB
backupCount=5
)
file_handler.setFormatter(JSONFormatter())
file_handler.setLevel(logging.DEBUG)
root_logger.addHandler(file_handler)
# Error file handler
error_handler = logging.handlers.RotatingFileHandler(
self.error_log_file,
maxBytes=5*1024*1024, # 5MB
backupCount=3
)
error_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s\n%(pathname)s:%(lineno)d\n'
)
error_handler.setFormatter(error_formatter)
error_handler.setLevel(logging.ERROR)
root_logger.addHandler(error_handler)
# Log startup
logging.info("DosVault logging system initialized")
def get_recent_logs(self, limit: int = 1000, level_filter: Optional[str] = None, since: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get recent log entries from the log file."""
logs = []
if not self.log_file.exists():
return logs
try:
# Parse the since timestamp if provided
since_datetime = None
if since:
try:
since_datetime = datetime.fromisoformat(since.replace('Z', '+00:00'))
except ValueError:
logging.warning(f"Invalid since timestamp format: {since}")
with open(self.log_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
# Get the last 'limit*2' lines to ensure we have enough after filtering
recent_lines = lines[-(limit*2):] if len(lines) > limit*2 else lines
for line in recent_lines:
line = line.strip()
if not line:
continue
try:
log_entry = json.loads(line)
# Apply time filter if specified
if since_datetime:
try:
log_datetime = datetime.fromisoformat(log_entry['timestamp'])
# Handle timezone-aware/naive comparison
if log_datetime.tzinfo is None and since_datetime.tzinfo is not None:
# Make log_datetime timezone-aware (assume UTC)
log_datetime = log_datetime.replace(tzinfo=timezone.utc)
elif log_datetime.tzinfo is not None and since_datetime.tzinfo is None:
# Make since_datetime timezone-aware (assume UTC)
since_datetime = since_datetime.replace(tzinfo=timezone.utc)
if log_datetime <= since_datetime:
continue
except (ValueError, KeyError):
pass # Skip time filtering for invalid timestamps
# Apply level filter if specified
if level_filter and log_entry.get('level') != level_filter:
continue
logs.append(log_entry)
except json.JSONDecodeError:
# Handle non-JSON log lines
logs.append({
'timestamp': datetime.now().isoformat(),
'level': 'INFO',
'module': 'system',
'message': line
})
# Sort by timestamp and limit results
logs.sort(key=lambda x: x.get('timestamp', ''))
logs = logs[-limit:] if len(logs) > limit else logs
except Exception as e:
logging.error(f"Error reading log file: {e}")
return logs
def get_log_files(self) -> List[Dict[str, Any]]:
"""Get information about available log files."""
files = []
for log_file in self.log_dir.glob("*.log*"):
try:
stat = log_file.stat()
files.append({
'name': log_file.name,
'path': str(log_file),
'size': stat.st_size,
'modified': datetime.fromtimestamp(stat.st_mtime).isoformat()
})
except Exception as e:
logging.error(f"Error getting file info for {log_file}: {e}")
return sorted(files, key=lambda x: x['modified'], reverse=True)
def clear_old_logs(self, keep_days: int = 7) -> int:
"""Clear log files older than specified days."""
cleared_count = 0
cutoff_time = datetime.now().timestamp() - (keep_days * 24 * 3600)
for log_file in self.log_dir.glob("*.log.*"): # Rotated logs only
try:
if log_file.stat().st_mtime < cutoff_time:
log_file.unlink()
cleared_count += 1
logging.info(f"Cleared old log file: {log_file.name}")
except Exception as e:
logging.error(f"Error clearing log file {log_file}: {e}")
return cleared_count
def get_log_file_content(self, file_type: str = "application") -> Optional[Path]:
"""Get the path to a specific log file for download."""
if file_type == "application":
return self.log_file if self.log_file.exists() else None
elif file_type == "error":
return self.error_log_file if self.error_log_file.exists() else None
else:
# Look for specific log file
log_file = self.log_dir / f"{file_type}.log"
return log_file if log_file.exists() else None
# Global log manager instance - initialized lazily
log_manager = None
def get_log_manager() -> LogManager:
"""Get or create the global log manager instance."""
global log_manager
if log_manager is None:
log_manager = LogManager()
return log_manager

29
src/libs/objects.py Normal file
View File

@@ -0,0 +1,29 @@
from dataclasses import dataclass, field
from typing import List, Optional
from pathlib import Path
@dataclass
class Metadata:
title: str = None
description: Optional[str] = None
year: Optional[int] = None
developer: Optional[str] = None
publisher: Optional[str] = None
genre: Optional[List[str]] = field(default_factory=list)
players: Optional[int] = None
cover_image: Optional[str] = None # Remote URL
screenshot: Optional[str] = None # Remote URL
cover_image_path: Optional[Path] = None # Local file path
screenshot_path: Optional[Path] = None # Local file path
tags: Optional[List[str]] = field(default_factory=list)
@dataclass
class Game:
title: str
path: Path
metadata: Metadata|None = None
@dataclass
class Roms:
list: List[Game] = field(default_factory=list)