Files
DosVault/src/refresh_covers.py
2025-09-06 13:53:44 -04:00

274 lines
12 KiB
Python
Executable File

#!/usr/bin/env python
"""
Script to refresh cover image metadata for games with old numeric image IDs.
This will re-query IGDB for fresh image data and download the images locally.
"""
from __future__ import annotations
import asyncio
import aiohttp
from pathlib import Path
from typing import List, Optional
from sqlalchemy import create_engine, select, func
from sqlalchemy.orm import Session, selectinload
# Import the project-local `libs` package. The first attempt relies on the
# current import path; if that fails, this file's own directory is appended to
# sys.path and the same imports are retried.
try:
    from libs.config import Config
    from libs.database import Game_table, Metadata_table
    from libs.functions import download_image, get_image_filename, clean_title
    from libs.apis import Credentials, IGDB
except ImportError:
    import sys
    # Make the directory containing this script importable, then retry.
    sys.path.append(str(Path(__file__).parent))
    from libs.config import Config
    from libs.database import Game_table, Metadata_table
    from libs.functions import download_image, get_image_filename, clean_title
    from libs.apis import Credentials, IGDB
class CoverRefreshManager:
    """Re-query IGDB for games with old image IDs and download fresh images.

    The manager keeps running totals (``refreshed_count``,
    ``download_success_count``) and a list of human-readable failure messages
    (``failed_refreshes``) that ``run()`` prints as a final summary.
    """

    def __init__(self):
        """Load configuration, open the SQLite engine and authenticate to IGDB."""
        self.config = Config()
        self.engine = create_engine(f"sqlite+pysqlite:///{self.config.database_path}", future=True)
        # Initialize IGDB API
        token = Credentials(self.config).authenticate()
        self.igdb = IGDB(token)
        self.refreshed_count = 0
        self.download_success_count = 0
        self.failed_refreshes: List[str] = []

    def _fetch_games(self, condition, limit: Optional[int] = None) -> List[Game_table]:
        """Return games (with metadata eagerly loaded) matching *condition*.

        Games are joined to their metadata row and ordered by title. A falsy
        *limit* (``None`` or ``0``) means "no limit". The session is closed
        before returning, so the returned objects are detached.
        """
        stmt = (
            select(Game_table)
            .join(Metadata_table)
            .options(selectinload(Game_table.metadata_obj))
            .where(condition)
            .order_by(Game_table.title)
        )
        if limit:
            stmt = stmt.limit(limit)
        with Session(self.engine) as session:
            return session.scalars(stmt).all()

    def get_games_with_old_image_ids(self, limit: Optional[int] = None) -> List[Game_table]:
        """Get games that have old numeric image IDs.

        A cover ID counts as "old" when it matches ``^[0-9]+$`` via the SQL
        ``REGEXP`` operator.

        NOTE(review): ``run()`` selects games with a different, broader
        GLOB-based filter (non-NULL and containing no ASCII letters) - confirm
        which of the two definitions is the intended one.
        """
        return self._fetch_games(
            # Has old numeric image IDs (not alphanumeric)
            Metadata_table.cover_image.op('REGEXP')('^[0-9]+$'),
            limit,
        )

    async def _refresh_image(
        self,
        game: Game_table,
        metadata,
        result: dict,
        session: aiohttp.ClientSession,
        *,
        kind: str,
        image_id,
        size: str,
        id_attr: str,
        path_attr: str,
    ) -> None:
        """Store a new image ID on *metadata* and try to download the image.

        *kind* is ``'cover'`` or ``'screenshot'``; it selects the result keys
        (``'<kind>_updated'`` / ``'<kind>_downloaded'``), the filename category
        and the wording of the failure message. *id_attr* / *path_attr* name
        the metadata attributes that receive the image ID and the local path.
        """
        url = IGDB.build_cover_url(image_id, size)

        # Record the new ID even if the download below fails.
        setattr(metadata, id_attr, image_id)
        result[f'{kind}_updated'] = True

        filename = get_image_filename(url, game.title, kind)
        target = self.config.images_path / filename
        if await download_image(url, target, session):
            # NOTE(review): `target` is whatever `images_path / filename`
            # yields (presumably a pathlib.Path) - confirm the column type
            # accepts it.
            setattr(metadata, path_attr, target)
            result[f'{kind}_downloaded'] = True
            self.download_success_count += 1
        else:
            result['errors'].append(f'Failed to download {kind}: {url}')

    async def refresh_game_metadata(self, game: Game_table, session: aiohttp.ClientSession) -> dict:
        """Refresh metadata for a single game and download images.

        Searches IGDB by the cleaned game title, takes the first hit, and
        updates the game's (detached) metadata object in place with the new
        cover / first-artwork image IDs and local file paths.

        Returns a dict with the keys ``game_title``, ``api_success``,
        ``cover_updated``, ``cover_downloaded``, ``screenshot_updated``,
        ``screenshot_downloaded`` and ``errors`` (a list of messages). Any
        exception raised along the way is caught and reported in ``errors``
        rather than propagated.
        """
        result = {
            'game_title': game.title,
            'api_success': False,
            'cover_updated': False,
            'cover_downloaded': False,
            'screenshot_updated': False,
            'screenshot_downloaded': False,
            'errors': []
        }
        try:
            # Search for fresh game data.
            # NOTE(review): this call is not awaited, so it runs synchronously
            # inside the coroutine and blocks the event loop while it executes.
            igdb_response = self.igdb.search_game_by_title(clean_title(game.title))
            if not igdb_response:
                result['errors'].append('No IGDB results found')
                return result

            game_data = igdb_response[0]  # Take the first result
            result['api_success'] = True

            metadata = game.metadata_obj
            if not metadata:
                result['errors'].append('No metadata object found')
                return result

            # Update cover image if found
            cover_data = game_data.get('cover')
            if cover_data and cover_data.get('image_id'):
                await self._refresh_image(
                    game, metadata, result, session,
                    kind='cover',
                    image_id=cover_data['image_id'],
                    size='cover_big',
                    id_attr='cover_image',
                    path_attr='cover_image_path',
                )

            # Update screenshot if found (the first artwork is used)
            artworks = game_data.get('artworks', [])
            if artworks and artworks[0].get('image_id'):
                await self._refresh_image(
                    game, metadata, result, session,
                    kind='screenshot',
                    image_id=artworks[0]['image_id'],
                    size='screenshot_med',
                    id_attr='screenshot',
                    path_attr='screenshot_path',
                )

            if result['cover_updated'] or result['screenshot_updated']:
                self.refreshed_count += 1
        except Exception as e:
            # Per-game boundary: one failing game must not abort the batch.
            result['errors'].append(f'API error: {str(e)}')
        return result

    @staticmethod
    def _summarize_errors(errors: List[str]) -> str:
        """Join the first two error messages and note how many were omitted."""
        summary = '; '.join(errors[:2])  # Show first 2 errors
        if len(errors) > 2:
            summary += f' (+{len(errors) - 2} more)'
        return summary

    @staticmethod
    def _status_parts(result: dict) -> List[str]:
        """Describe which images were updated/downloaded for one result dict."""
        parts: List[str] = []
        if not result['api_success']:
            return parts
        for kind in ('cover', 'screenshot'):
            if result[f'{kind}_updated']:
                suffix = ' + downloaded' if result[f'{kind}_downloaded'] else ''
                parts.append(f'{kind} updated' + suffix)
        return parts

    async def process_batch(self, games: List[Game_table], batch_size: int = 20):
        """Process games in batches with rate limiting.

        At most two refreshes run concurrently and each one waits 0.5 s before
        starting. After every batch of *batch_size* games the updated objects
        are merged into a fresh database session and committed; failures are
        appended to ``self.failed_refreshes``.
        """
        semaphore = asyncio.Semaphore(2)  # Lower concurrency for API calls

        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:

            async def refresh_with_semaphore(game):
                async with semaphore:
                    await asyncio.sleep(0.5)  # Rate limiting - be respectful to IGDB API
                    return await self.refresh_game_metadata(game, session)

            # Process in smaller batches to avoid overwhelming the API
            for i in range(0, len(games), batch_size):
                batch = games[i:i + batch_size]
                print(f"\nProcessing batch {i//batch_size + 1} ({len(batch)} games)...")

                tasks = [refresh_with_semaphore(game) for game in batch]
                results = await asyncio.gather(*tasks, return_exceptions=True)

                with Session(self.engine) as db_session:
                    for game, result in zip(batch, results):
                        # gather(return_exceptions=True) may hand back any
                        # BaseException (e.g. CancelledError), not only Exception.
                        if isinstance(result, BaseException):
                            print(f"{game.title}: Exception - {str(result)}")
                            self.failed_refreshes.append(f"{game.title}: {str(result)}")
                            continue

                        # refresh_game_metadata() already wrote the new IDs and
                        # paths onto the detached objects; merge() copies that
                        # state onto instances owned by this session. The
                        # metadata row is merged explicitly so persistence does
                        # not depend on the relationship's cascade settings.
                        db_session.merge(game)
                        if game.metadata_obj is not None:
                            db_session.merge(game.metadata_obj)

                        # Show progress
                        status = self._status_parts(result)
                        if result['errors']:
                            error_summary = self._summarize_errors(result['errors'])
                            status.append(f"errors: {error_summary}")
                            self.failed_refreshes.append(f"{game.title}: {error_summary}")
                        print(f" {game.title}: {', '.join(status) if status else 'no changes'}")

                    # Commit batch
                    db_session.commit()
                    print(" Batch committed to database")

    async def run(self, limit: Optional[int] = None, dry_run: bool = False):
        """Run the cover refresh process.

        Selects the games to refresh, optionally limited to *limit* rows. With
        *dry_run* it only lists the first ten candidates. Otherwise it asks for
        confirmation on stdin, processes all games and prints a summary.
        """
        print("🔄 ROM Cover Refresh Tool")
        print("=" * 50)

        # Select rows whose cover_image is non-NULL and contains no ASCII
        # letters, using SQLite's GLOB operator.
        games = self._fetch_games(
            Metadata_table.cover_image.isnot(None)
            & ~Metadata_table.cover_image.op('GLOB')('*[a-zA-Z]*'),  # No letters
            limit,
        )
        print(f"Found {len(games)} games with old numeric image IDs")

        if not games:
            print("✅ No games need cover refresh!")
            return

        if dry_run:
            print("\n🔍 DRY RUN - showing first 10 games that would be processed:")
            for position, game in enumerate(games[:10], start=1):
                metadata = game.metadata_obj
                print(f" {position}. {game.title}")
                print(f" Current cover ID: {metadata.cover_image}")
                if metadata.screenshot:
                    print(f" Current screenshot ID: {metadata.screenshot}")
            return

        # Show warning about API usage
        print(f"\n⚠️ This will make {len(games)} IGDB API calls.")
        print(" Be mindful of rate limits and API quotas.")
        proceed = input(f"\nRefresh metadata for {len(games)} games? [y/N]: ").strip().lower()
        if proceed != 'y':
            print("Cancelled.")
            return

        # Process the games
        await self.process_batch(games)

        # Show final results
        print("\n✅ Refresh Complete!")
        print(f" Games with updated metadata: {self.refreshed_count}")
        print(f" Images successfully downloaded: {self.download_success_count}")
        print(f" Failed refreshes: {len(self.failed_refreshes)}")
        if self.failed_refreshes:
            print("\nFailed Refreshes (first 10):")
            for failure in self.failed_refreshes[:10]:
                print(f" - {failure}")
            if len(self.failed_refreshes) > 10:
                print(f" ... and {len(self.failed_refreshes) - 10} more")
async def main():
    """Parse the command-line options and run the cover refresh.

    Supports ``--limit N`` to cap the number of games and ``--dry-run`` to
    list candidates without processing them.
    """
    import argparse

    cli = argparse.ArgumentParser(description="Refresh cover metadata for games with old image IDs")
    cli.add_argument('--limit', type=int, help='Limit number of games to process')
    cli.add_argument('--dry-run', action='store_true', help='Show what would be done without processing')
    options = cli.parse_args()

    # The manager is built only after argument parsing succeeds, so `--help`
    # or a bad flag exits before the constructor runs.
    refresher = CoverRefreshManager()
    await refresher.run(limit=options.limit, dry_run=options.dry_run)


if __name__ == "__main__":
    asyncio.run(main())