# /// script # dependencies = [ # "resend", # ] # /// import os import socket import subprocess from pathlib import Path import resend def send_email(program: str, log: str, **kwargs: BaseException): resend.api_key = Path("/usr/local/etc/resend_api_key.txt").read_text().strip("\n") err_msg = kwargs.get("err_msg", None) match log: case "ok": subj = f"[{socket.getfqdn()}] {program} OK ✅" msg = f"{program} on {socket.getfqdn()} ran successfully!" case "err": subj = f"[{socket.getfqdn()}] {program} Error ❌" msg = f"There was an error running {program} on {socket.getfqdn()}: {err_msg}.\n\nPlease investigate." case _: subj = "" msg = "" params: resend.Emails.SendParams = { "from": "Admin ", "to": ["hyperreal@moonshadow.dev"], "subject": subj, "text": msg, } email = resend.Emails.send(params) print(email) def run_kbackup(): kbackup_cmd = [ "kbackup", "--verbose", "--autobg", "/home/jas/documents/default.kbp", ] try: subprocess.run(kbackup_cmd, check=True, text=True) print(f"kbackup ran successfully") except subprocess.CalledProcessError as e: print(f"Error running kbackup: {e}") send_email("sync_to_remotes", "err", err_msg=e) exit(1) except KeyboardInterrupt as e: print(f"Run of kbackup interrupted: {e}") send_email("sync_to_remotes", "err", err_msg=e) exit(130) def encrypt_tar(): tar_dir = Path("/home/jas/sync_to_remotes") for tarfile in tar_dir.iterdir(): if tarfile.name.endswith("tar"): age_cmd = [ "age", "-e", "-i", "/home/jas/documents/age-key.txt", "-o", f"{tarfile}.age", tarfile, ] print(f"Encrypting {tarfile}...") try: subprocess.run(age_cmd, check=True) print(f"Successfully encrypted {tarfile}") os.remove(tarfile) except subprocess.CalledProcessError as e: print(f"Error during archive encryption of {tarfile}: {e}") send_email("sync_to_remotes", "err", err_msg=e) exit(1) except KeyboardInterrupt as e: print(f"Archive encryption of {tarfile} interrupted: {e}") send_email("sync_to_remotes", "err", err_msg=e) exit(130) def sync_to_remotes(src: str, dest: str): rclone_cmd = ["rclone", "sync", "--transfers", "8", src, dest] try: subprocess.run(rclone_cmd, check=True, text=True) print(f"Successful sync to {dest}") except subprocess.CalledProcessError as e: print(f"Error during sync to {dest}: {e}") send_email("sync_to_remotes", "err", err_msg=e) exit(1) except KeyboardInterrupt as e: print(f"Sync to {dest} interrupted: {e}") send_email("sync_to_remotes", "err", err_msg=e) exit(130) if __name__ == "__main__": remotes = [ ("/home/jas/sync_to_remotes/", "wasabi:/desktop-backups-moonshadow/"), ] run_kbackup() encrypt_tar() for remote in remotes: sync_to_remotes(remote[0], remote[1]) send_email("sync_to_remotes", "ok")