#!/usr/bin/env python
"""Scan a book folder, decode ebook metadata and import new books into storage."""
import os
import re
import zipfile

import PyPDF2
from bs4 import BeautifulSoup
from mobi import Mobi

from .api_hooks import DuckDuckGo
from .storage import Storage

# File extensions recognised as books; anchored so "x.pdf.bak" is not a book.
_BOOK_FILE_REGX = re.compile(r"\.(epub|mobi|pdf)$")
# Epub archive members worth remembering: the OPF manifest and cover files.
_EPUB_MEMBER_REGX = re.compile(r"\.opf|cover")
# Anything that looks like an HTML/XML tag.
_TAG_REGX = re.compile(r"<.*?>")


class Catalogue:
    """
    Decodes book metadata for storage

    Every ``extract_metadata_*`` method returns a ten item list laid out as
    ``[title, author, cover, path, description, identifier, publisher,
    date, rights, tags]``; that list is what ``import_books`` hands to
    ``Storage.insert_book``.
    """

    def __init__(self, config):
        """
        Remember the configuration and pre-compile the regexes in use.

        :param config: object providing ``root``, ``book_path``,
            ``catalogue_db`` and ``logger`` attributes
        """
        self.file_list = []
        self.opf_regx = re.compile(r"\.opf")
        self.cover_regx = re.compile(r"\.jpg|\.jpeg|\.png|\.bmp|\.gif")
        self.html_regx = re.compile(r"\.html")
        self.title_sanitization_regx = re.compile(r"^(Book )+[0-9]*")
        self.title_sanitization_lvl2_regx = re.compile(r"^(Book )+[0-9]*\W+(-)")
        self.title_sanitization_dirs_regx = re.compile(r"/")
        self.root_dir = config.root
        self.book_folder = config.book_path
        self.books = None
        self.db_pointer = config.catalogue_db
        self.config = config

    def scan_folder(self, _path=None):
        """
        Scan folder by _path, allows recursive scanning

        Absolute paths of all files found are appended to ``self.file_list``.

        :param _path: folder to scan; when None the configured book folder
            is used (relative to the root dir if that combination exists)
        :returns: None
        """
        if _path is not None:
            folder = _path
        else:
            candidate = str(self.root_dir) + "/" + self.book_folder
            folder = candidate if os.path.isdir(candidate) else self.book_folder

        try:
            entries = os.listdir(folder)
        except OSError as error:
            # Missing, unreadable or non-directory path: log and carry on.
            self.config.logger.error(error)
            return

        for name in entries:
            entry = os.path.abspath(os.path.join(folder, name))
            if os.path.isdir(entry):
                # The recursive call fills self.file_list itself; its return
                # value (None) must not be appended.
                self.scan_folder(entry)
            else:
                self.file_list.append(entry)

    def filter_books(self):
        """
        Calls scan_folder and filters out book files.

        The result is stored in ``self.books`` as a list of paths ending in
        ``.epub``, ``.mobi`` or ``.pdf``.

        :returns: None
        """
        self.scan_folder()  # Populate file list
        try:
            self.books = [
                path
                for path in self.file_list
                if path and _BOOK_FILE_REGX.search(path)
            ]
        except TypeError as error:
            self.config.logger.error(error)

    def process_by_filetype(self, book):
        """
        Determine books filetype and process.

        :param book: path to the book file
        :returns: metadata list, or None when the file type is unknown or
            the file could not be decoded
        """
        if book.endswith(".epub"):
            return self.extract_metadata_epub(self.process_epub(book))
        if book.endswith(".mobi"):
            return self.extract_metadata_mobi(book)
        if book.endswith(".pdf"):
            return self.extract_metadata_pdf(book)
        self.config.logger.error(f"Unknown Filetype {book}")
        return None

    @staticmethod
    def process_epub(book):
        """
        Return dictionary of epub file contents

        :param book: path to the epub file
        :returns: ``{"files": [...], "path": ...}`` where ``files`` lists the
            archive members whose name contains ``.opf`` or ``cover``
        """
        with zipfile.ZipFile(book, "r") as book_zip:
            return {
                "files": [
                    member.filename
                    for member in book_zip.infolist()
                    if _EPUB_MEMBER_REGX.search(member.filename)
                ],
                "path": book_zip.filename,
            }

    def extract_metadata_epub(self, book):
        """
        Return extracted metadata and cover picture

        book['path'] == Full path to ebook file
        book['files'] == list of files from self.process_epub(book)

        :raises IndexError: when the epub has no ``.opf`` member
        """
        with zipfile.ZipFile(book["path"], "r") as book_zip:
            # Close the OPF handle as soon as it has been parsed.
            with self.extract_content(book_zip, book) as content:
                soup = BeautifulSoup(content, "lxml")
            cover = self.extract_cover_image(book_zip, book)

        title = self._first_text(soup.find("dc:title"))
        if title is None:
            # No (or empty) dc:title, fall back on the file name.
            title = self._title_from_path(book["path"])
        title = self._sanitize_title(title)

        author = self._first_text(soup.find("dc:creator"))

        if not cover:
            # extract_cover_image reports failure by returning False, so the
            # web search fallback has to be triggered on the value itself.
            cover = self._search_cover(title) or cover

        subjects = soup.find_all("dc:subject")
        ftags = ",".join(tag.text for tag in subjects) if subjects else None

        return [
            title,
            author,
            cover,
            book["path"],
            self._dc_text(soup, "dc:description"),
            self._dc_text(soup, "dc:identifier"),
            self._dc_text(soup, "dc:publisher"),
            self._dc_text(soup, "dc:date"),
            self._dc_text(soup, "dc:rights"),
            ftags,
        ]

    def extract_metadata_pdf(self, book):
        """
        Return extracted metadata

        :NOTES:
        Retrieval of data has been problematic, some pdf's providing
        reliable titles that correspond with the actual, and others being
        nonsense.  The title is therefore taken from the file name.

        :param book: path to the pdf file
        :returns: metadata list, or None when the file cannot be read as pdf
        """
        try:
            # PyPDF2 >= 1.28 names the reader PdfReader and exposes
            # ``metadata``; the legacy names stop working in PyPDF2 3.x.
            reader_cls = getattr(PyPDF2, "PdfReader", None) or PyPDF2.PdfFileReader
            pdf = reader_cls(book)
            # Getting odd errors when attempting to access some pdfs
            # where they would report as encrypted, when not.
            if hasattr(pdf, "metadata"):
                info = pdf.metadata
            else:
                info = pdf.getDocumentInfo()
        except Exception:
            return None
        if info is None:  # check to ensure we actually have a pdf
            return None

        title = self._title_from_path(book).replace("_", " ")
        ddg = DuckDuckGo()
        try:
            cover_image = ddg.image_result(title)
        except Exception:
            cover_image = None
        try:
            description = ddg.description_result(title)
        except Exception:
            # Same best-effort policy as the cover lookup.
            description = None

        return [
            title,
            info.author,
            cover_image,
            str(book),
            description,
            None,  # identifier
            None,  # publisher
            None,  # date
            None,  # rights
            None,  # tags
        ]

    @staticmethod
    def stripTags(source):
        """Return source with everything that looks like a tag removed."""
        return _TAG_REGX.sub("", source)

    def extract_metadata_mobi(self, book):
        """
        Return extracted metadata of a mobi file

        :param book: path to the mobi file
        :returns: metadata list; optional fields are read from the EXTH
            records 101 (publisher), 103 (description), 104 (identifier)
            and 105 (tags) and are None when the record is absent
        """
        mobi_book = Mobi(book)
        mobi_book.parse()
        try:
            cover_image = mobi_book.readImageRecord(0)
        except KeyError:
            cover_image = None

        title = mobi_book.title().decode("utf-8")
        author = mobi_book.author().decode("utf-8")
        book_config = mobi_book.config

        description = self._exth_text(book_config, 103)
        if description is not None:
            description = self.stripTags(description)
        identifier = self._exth_text(book_config, 104)
        publisher = self._exth_text(book_config, 101)

        ftags = self._exth_text(book_config, 105)
        if ftags is not None:
            # Normalise on a single delimiter; only the first kind found is
            # replaced to avoid splitting on multiple delimiters.
            if ":" in ftags:
                ftags = ftags.replace(":", ",")
            elif ";" in ftags:
                ftags = ftags.replace(";", ",")

        return [
            title,
            author,
            cover_image,
            mobi_book.f.name,
            description,
            identifier,
            publisher,
            None,  # date
            None,  # rights
            ftags,
        ]

    def extract_content(self, book_zip, book):
        """
        Open the first archive member listed in book["files"] matching opf_regx

        :returns: open file object for the OPF member (caller closes it)
        :raises IndexError: when no member matches
        """
        members = [name for name in book["files"] if self.opf_regx.search(name)]
        return book_zip.open(members[0])

    def extract_cover_html(self, book_zip, book):
        """
        Open the first archive member listed in book["files"] matching html_regx

        :returns: open file object for the html member (caller closes it)
        :raises IndexError: when no member matches
        """
        members = [name for name in book["files"] if self.html_regx.search(name)]
        return book_zip.open(members[0])

    def extract_cover_image(self, book_zip, book):
        """
        Read the first archive member listed in book["files"] matching cover_regx

        :returns: image bytes, or False when no cover could be read
        """
        try:
            members = [
                name for name in book["files"] if self.cover_regx.search(name)
            ]
            # Read by name directly; no member handle is left open.
            return book_zip.read(members[0])
        except Exception:
            # Deliberately best effort: callers test the result for falsiness.
            return False

    def compare_shelf_current(self):
        """
        Calls storage system, gets list of books stored and compares
        against files on disk

        :returns: set of book paths on disk that are not in storage
        """
        stored = Storage(self.config).book_paths_list() or []
        if self.books is None:
            self.filter_books()
        # self.books stays None when filtering failed; treat that as empty.
        return set(self.books or []) - set(stored)

    def import_books(self, list=None, **kwargs):
        """
        Main entry point for import operations.

        Gets a list of new files via compare_shelf_current.
        Iterates over list and inserts new books into database.

        :param list: unused, kept for backwards compatibility
        :param socket: (keyword) path of a file that receives the title of
            the book being imported; defaults to the null device
        """
        fsocket = kwargs.get("socket", os.devnull)
        book_list = self.compare_shelf_current()
        db = Storage(self.config)
        for path in book_list:
            try:
                book = self.process_by_filetype(path)
            except Exception as error:
                # One unreadable file must not abort the whole import.
                self.config.logger.error(f"Failed to process {path}: {error}")
                continue
            if book is None:
                continue
            with open(fsocket, "w") as _socket:
                try:
                    _socket.write(book[0])
                except TypeError:
                    continue
            db.insert_book(book)

    @staticmethod
    def _title_from_path(path):
        """Return the file name of path without directory and extension."""
        return path.split("/")[-1].rsplit(".", 1)[0]

    @staticmethod
    def _first_text(tag):
        """Return the first child of tag as str, None if tag is missing/empty."""
        if tag is None or not tag.contents:
            return None
        return str(tag.contents[0])

    @staticmethod
    def _exth_text(book_config, record_id):
        """Return the decoded EXTH record record_id, None when it is absent."""
        try:
            return book_config["exth"]["records"][record_id].decode("utf-8")
        except KeyError:
            return None

    def _dc_text(self, soup, name):
        """Return the tag-stripped text of the first element name, or None."""
        node = soup.find(name)
        return None if node is None else self.stripTags(node.text)

    def _sanitize_title(self, title):
        """Strip a leading "Book <n>" / "Book <n> - " prefix from title."""
        if not self.title_sanitization_regx.match(title):
            return title
        if self.title_sanitization_lvl2_regx.match(title):
            parts = re.split(r"-+\W", title)
            if len(parts) > 1:
                return parts[1]
            # e.g. "Book 1 -Foo": the dash is not followed by a separator,
            # fall through to the plain prefix removal.
        # The pattern is anchored and has one group, so a match always
        # splits into ['', 'Book ', remainder].
        return self.title_sanitization_regx.split(title)[2]

    def _search_cover(self, title):
        """Return a cover image looked up on the web, None when that fails."""
        try:
            return DuckDuckGo().image_result(title)
        except Exception as error:
            self.config.logger.error(error)
            return None