mirror of
https://codeberg.org/hyperreal/bin
synced 2025-01-18 16:23:43 +01:00
325 lines
11 KiB
Plaintext
325 lines
11 KiB
Plaintext
|
#!/usr/bin/env python
|
||
|
|
||
|
# This program archives the content of fandom wikis.
|
||
|
#
|
||
|
# It's pretty much feature-complete. I still have to add detailed comments to
|
||
|
# describe what each significant piece of code is doing.
|
||
|
#
|
||
|
# This program doesn't scrape from the fandom.com wiki sites directly; rather,
|
||
|
# it uses my BreezeWiki instance to avoid downloading unneccessary ads, images,
|
||
|
# and other junk.
|
||
|
#
|
||
|
# Each resulting archive is self-contained, meaning one can extract the
|
||
|
# contents and browse the wiki snapshot locally (offline). The URLs for CSS,
|
||
|
# images, and links in each page are replaced by the file:/// URLs for their
|
||
|
# corresponding pages on the local filesystem.
|
||
|
#
|
||
|
# This file is formatted with `black -l 79' to comply with PEP8 standards.
|
||
|
|
||
|
import concurrent.futures
|
||
|
import shutil
|
||
|
import sys
|
||
|
import tarfile
|
||
|
from datetime import datetime
|
||
|
from pathlib import Path
|
||
|
from urllib.parse import urljoin
|
||
|
|
||
|
import requests
|
||
|
from bs4 import BeautifulSoup
|
||
|
from rich.console import Console
|
||
|
from rich.progress import Progress
|
||
|
from rich.tree import Tree
|
||
|
|
||
|
console = Console()
|
||
|
|
||
|
|
||
|
class FandomWiki:
|
||
|
def __init__(self, name: str):
|
||
|
self.name = name
|
||
|
self.canonical_url = "https://{}.fandom.com".format(name)
|
||
|
self.breezewiki_url = "https://wiki.hyperreal.coffee/{}".format(name)
|
||
|
self.site_dir = Path.cwd().joinpath("{}.fandom.com".format(name))
|
||
|
self.images_dir = self.site_dir.joinpath("images")
|
||
|
|
||
|
if not self.site_dir.exists():
|
||
|
self.site_dir.mkdir()
|
||
|
|
||
|
if not self.images_dir.exists():
|
||
|
self.images_dir.mkdir()
|
||
|
|
||
|
def get_hop0_urls(self) -> list:
|
||
|
starting_url = "{}/wiki/Local_Sitemap".format(self.canonical_url)
|
||
|
hop0_urls = list()
|
||
|
|
||
|
while True:
|
||
|
response = requests.get(starting_url)
|
||
|
response.raise_for_status()
|
||
|
soup = BeautifulSoup(response.text, "html.parser")
|
||
|
mw_allpages_nav = soup.find_all(
|
||
|
"div", {"class": "mw-allpages-nav"}
|
||
|
)[0]
|
||
|
|
||
|
if (
|
||
|
len(mw_allpages_nav.find_all("a")) < 2
|
||
|
and "Next page"
|
||
|
not in mw_allpages_nav.find_all("a")[0].get_text()
|
||
|
):
|
||
|
break
|
||
|
else:
|
||
|
if len(mw_allpages_nav.find_all("a")) < 2:
|
||
|
starting_url = "{}{}".format(
|
||
|
self.canonical_url,
|
||
|
mw_allpages_nav.find_all("a")[0].get("href"),
|
||
|
)
|
||
|
else:
|
||
|
starting_url = "{}{}".format(
|
||
|
self.canonical_url,
|
||
|
mw_allpages_nav.find_all("a")[1].get("href"),
|
||
|
)
|
||
|
|
||
|
hop0_urls.append(starting_url)
|
||
|
console.print("[[bold]HOP 0[/bold]] {}".format(starting_url))
|
||
|
|
||
|
return hop0_urls
|
||
|
|
||
|
def get_hop1_urls(self, hop0_urls: list):
|
||
|
hop1_urls = [self.breezewiki_url]
|
||
|
|
||
|
for url in hop0_urls:
|
||
|
response = requests.get(url)
|
||
|
response.raise_for_status()
|
||
|
soup = BeautifulSoup(response.text, "html.parser")
|
||
|
|
||
|
for item in soup.find_all("a"):
|
||
|
if item.get("href") and item.get("href").startswith("/wiki"):
|
||
|
if "Local_Sitemap" not in item.get(
|
||
|
"href"
|
||
|
) and "Special:" not in item.get("href"):
|
||
|
new_url = "{}{}".format(
|
||
|
self.breezewiki_url, item.get("href")
|
||
|
)
|
||
|
hop1_urls.append(new_url)
|
||
|
console.print(
|
||
|
"[[bold]HOP 1[/bold]] {}".format(new_url)
|
||
|
)
|
||
|
|
||
|
return hop1_urls
|
||
|
|
||
|
def save_css(self):
|
||
|
response = requests.get(self.breezewiki_url)
|
||
|
response.raise_for_status()
|
||
|
soup = BeautifulSoup(response.text, "html.parser")
|
||
|
css_pages = list()
|
||
|
|
||
|
for css in soup.find_all("link"):
|
||
|
if css.attrs.get("href"):
|
||
|
css_url = urljoin(self.breezewiki_url, css.attrs.get("href"))
|
||
|
css_pages.append(css_url)
|
||
|
|
||
|
for page in css_pages:
|
||
|
response = requests.get(page)
|
||
|
response.raise_for_status()
|
||
|
|
||
|
css_filename = self.site_dir.joinpath(
|
||
|
"proxy{}.css".format(css_pages.index(page))
|
||
|
)
|
||
|
with open(css_filename, "wb") as outfile:
|
||
|
outfile.write(response.content)
|
||
|
|
||
|
console.print(
|
||
|
"[[bold green]CSS[/bold green]] {}".format(css_filename)
|
||
|
)
|
||
|
|
||
|
def save_img(self, img_url: str):
|
||
|
filename = self.images_dir.joinpath(
|
||
|
Path(img_url.split("/revision")[0]).name
|
||
|
)
|
||
|
if not filename.exists():
|
||
|
response = requests.get(img_url, stream=True)
|
||
|
response.raise_for_status()
|
||
|
|
||
|
with open(filename, "wb") as outfile:
|
||
|
for chunk in response.iter_content(chunk_size=8192):
|
||
|
outfile.write(chunk)
|
||
|
|
||
|
console.print("[[bold green]IMG[/bold green]] {}".format(filename))
|
||
|
else:
|
||
|
console.print(
|
||
|
"[[bold yellow]IMG (EXISTS)[/bold yellow]] {}".format(filename)
|
||
|
)
|
||
|
|
||
|
def fetch_all_images(self, page_url: str):
|
||
|
response = requests.get(page_url)
|
||
|
response.raise_for_status()
|
||
|
soup = BeautifulSoup(response.content, "html.parser")
|
||
|
|
||
|
img_tags = soup.find_all("img")
|
||
|
img_urls = [img["src"] for img in img_tags if "src" in img.attrs]
|
||
|
clean_img_urls = [
|
||
|
x
|
||
|
for x in img_urls
|
||
|
if "breezewiki" not in x and "Wordmark" not in x
|
||
|
]
|
||
|
|
||
|
for img_url in clean_img_urls:
|
||
|
self.save_img(img_url)
|
||
|
|
||
|
def save_page(self, url: str):
|
||
|
filename = self.site_dir.joinpath("{}.html".format(url.split("/")[-1]))
|
||
|
if not filename.exists():
|
||
|
response = requests.get(url)
|
||
|
response.raise_for_status()
|
||
|
soup = BeautifulSoup(response.text, "html.parser")
|
||
|
|
||
|
stylesheet_count = 0
|
||
|
for link in soup.find_all("link", {"rel": "stylesheet"}):
|
||
|
stylesheet_count += 1
|
||
|
link.decompose()
|
||
|
|
||
|
for i in range(stylesheet_count):
|
||
|
if soup.head:
|
||
|
soup.head.append(
|
||
|
soup.new_tag(
|
||
|
"link",
|
||
|
rel="stylesheet",
|
||
|
type="text/css",
|
||
|
href="proxy{}.css".format(i),
|
||
|
)
|
||
|
)
|
||
|
|
||
|
self.fetch_all_images(url)
|
||
|
|
||
|
soup.find("div", {"class": "bw-top-banner"}).extract() # type: ignore
|
||
|
|
||
|
for link in soup.find_all("a"):
|
||
|
if link.get("href") and link.get("href").startswith(
|
||
|
"/{}/wiki".format(self.name)
|
||
|
):
|
||
|
link_basename = link.get("href").partition("/wiki/")[2]
|
||
|
link["href"] = "{}/{}.html".format(
|
||
|
self.site_dir, link_basename
|
||
|
)
|
||
|
|
||
|
with open(filename, "w") as outfile:
|
||
|
outfile.write(soup.prettify())
|
||
|
|
||
|
console.print(
|
||
|
"[[bold green]HTML[/bold green]] {}".format(filename)
|
||
|
)
|
||
|
else:
|
||
|
console.print(
|
||
|
"[[bold yellow]HTML (EXISTS)[/bold yellow]] {}".format(
|
||
|
filename
|
||
|
)
|
||
|
)
|
||
|
|
||
|
def fetch_all_pages(self, hop1_urls: list):
|
||
|
self.save_css()
|
||
|
|
||
|
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||
|
executor.map(self.save_page, hop1_urls)
|
||
|
|
||
|
def archive(self):
|
||
|
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||
|
|
||
|
img_files = [
|
||
|
f
|
||
|
for f in self.images_dir.iterdir()
|
||
|
if self.images_dir.joinpath(f).is_file()
|
||
|
]
|
||
|
|
||
|
img_archive_filename = "{}-{}.tar.xz".format(
|
||
|
self.images_dir, timestamp
|
||
|
)
|
||
|
|
||
|
with Progress() as progress:
|
||
|
task = progress.add_task(
|
||
|
"[cyan]Archiving images...", total=len(img_files)
|
||
|
)
|
||
|
|
||
|
with tarfile.open(img_archive_filename, "w:xz") as tar:
|
||
|
for img_file in img_files:
|
||
|
if progress.finished:
|
||
|
break
|
||
|
full_file_path = self.images_dir.joinpath(img_file)
|
||
|
tar.add(full_file_path, arcname=img_file)
|
||
|
progress.update(task, advance=1)
|
||
|
|
||
|
progress.stop()
|
||
|
|
||
|
shutil.rmtree(self.images_dir, ignore_errors=True)
|
||
|
|
||
|
web_files = [
|
||
|
f
|
||
|
for f in self.site_dir.iterdir()
|
||
|
if self.site_dir.joinpath(f).is_file
|
||
|
or self.site_dir.joinpath(f).is_dir()
|
||
|
]
|
||
|
|
||
|
web_archive_filename = "{}-{}.tar.gz".format(self.site_dir, timestamp)
|
||
|
|
||
|
with Progress() as progress:
|
||
|
task = progress.add_task(
|
||
|
"[cyan]Archiving web files...", total=len(web_files)
|
||
|
)
|
||
|
|
||
|
with tarfile.open(web_archive_filename, "w:gz") as tar:
|
||
|
for web_file in web_files:
|
||
|
if progress.finished:
|
||
|
break
|
||
|
full_file_path = self.site_dir.joinpath(web_file)
|
||
|
tar.add(full_file_path, arcname=web_file)
|
||
|
progress.update(task, advance=1)
|
||
|
|
||
|
progress.stop()
|
||
|
|
||
|
shutil.rmtree(self.site_dir, ignore_errors=True)
|
||
|
|
||
|
console.print("\nTotal web files scraped: {}".format(len(web_files)))
|
||
|
console.print("Total images scraped: {}".format(len(img_files)))
|
||
|
|
||
|
|
||
|
def archive_site(name: str):
|
||
|
site = FandomWiki(name)
|
||
|
site.fetch_all_pages(site.get_hop1_urls(site.get_hop0_urls()))
|
||
|
site.archive()
|
||
|
|
||
|
|
||
|
def usage_message():
|
||
|
supported_wikis = [
|
||
|
"cyberpunk",
|
||
|
"dishonored",
|
||
|
"dragonage",
|
||
|
"forgottenrealms",
|
||
|
"masseffect",
|
||
|
"residentevil",
|
||
|
]
|
||
|
wiki_tree = Tree("[green]Fandom Wikis")
|
||
|
for wiki in supported_wikis:
|
||
|
wiki_tree.add(wiki)
|
||
|
|
||
|
console.print("Usage:\n\tarchive-fandom-wiki [[italic]name[/italic]]\n")
|
||
|
console.print("Example:\n\tarchive-fandom-wiki dishonored\n")
|
||
|
console.print(wiki_tree)
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
if len(sys.argv) > 1:
|
||
|
match sys.argv[1]:
|
||
|
case "cyberpunk":
|
||
|
archive_site("cyberpunk")
|
||
|
case "dishonored":
|
||
|
archive_site("dishonored")
|
||
|
case "dragonage":
|
||
|
archive_site("dragonage")
|
||
|
case "forgottenrealms":
|
||
|
archive_site("forgottenrealms")
|
||
|
case "masseffect":
|
||
|
archive_site("masseffect")
|
||
|
case "residentevil":
|
||
|
archive_site("residentevil")
|
||
|
case _:
|
||
|
usage_message()
|
||
|
else:
|
||
|
usage_message()
|