mirror of
https://codeberg.org/hyperreal/admin-scripts
synced 2025-01-18 15:53:45 +01:00
120 lines
3.4 KiB
Python
Executable File
120 lines
3.4 KiB
Python
Executable File
# /// script
# dependencies = [
#     "resend",
# ]
# ///
|
|
|
|
import os
|
|
import socket
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
import resend
|
|
|
|
|
|
def send_email(program: str, log: str, **kwargs: BaseException):
    """Send a status notification e-mail through the Resend API.

    The API key is read from ``/usr/local/etc/resend_api_key.txt`` and the
    response returned by ``resend.Emails.send`` is printed to stdout.

    Args:
        program: Name of the program being reported on; it appears in both
            the subject and the body.
        log: Outcome to report, either ``"ok"`` or ``"err"``.
        **kwargs: Optional ``err_msg`` -- the exception to quote in the body
            of an ``"err"`` report.

    Raises:
        ValueError: If ``log`` is neither ``"ok"`` nor ``"err"``.
    """
    # Validate before any I/O so a bad call can never send a blank e-mail.
    if log not in ("ok", "err"):
        raise ValueError(f"log must be 'ok' or 'err', got {log!r}")

    err_msg = kwargs.get("err_msg")

    # Resolve the host name once instead of once per formatted string.
    host = socket.getfqdn()

    if log == "ok":
        subj = f"[{host}] {program} OK ✅"
        msg = f"{program} on {host} ran successfully!"
    else:
        # Some exceptions (e.g. a bare KeyboardInterrupt) stringify to "",
        # which would leave the message without any detail; use repr then.
        detail = str(err_msg) or repr(err_msg)
        subj = f"[{host}] {program} Error ❌"
        msg = f"There was an error running {program} on {host}: {detail}.\n\nPlease investigate."

    # strip() rather than strip("\n"): also drops a trailing "\r" or spaces
    # that would otherwise end up inside the API key.
    resend.api_key = Path("/usr/local/etc/resend_api_key.txt").read_text().strip()

    params: resend.Emails.SendParams = {
        "from": "Admin <admin@hyperreal.coffee>",
        "to": ["hyperreal@moonshadow.dev"],
        "subject": subj,
        "text": msg,
    }

    email = resend.Emails.send(params)
    print(email)
|
|
|
|
|
|
def run_kbackup():
    """Run kbackup and report any failure by e-mail.

    Invokes ``kbackup --verbose --autobg /home/jas/documents/default.kbp``.
    On failure an error e-mail is sent via ``send_email`` and the process
    exits: status 1 if kbackup returns a non-zero status or cannot be
    started, status 130 if the run is interrupted with Ctrl-C.
    """
    kbackup_cmd = [
        "kbackup",
        "--verbose",
        "--autobg",
        "/home/jas/documents/default.kbp",
    ]

    try:
        subprocess.run(kbackup_cmd, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable kbackup binary, which
        # would otherwise crash the script without sending the alert.
        print(f"Error running kbackup: {e}")
        send_email("sync_to_remotes", "err", err_msg=e)
        # SystemExit instead of exit(): the latter is injected by the site
        # module and is not guaranteed to exist outside interactive use.
        raise SystemExit(1)
    except KeyboardInterrupt as e:
        print(f"Run of kbackup interrupted: {e}")
        send_email("sync_to_remotes", "err", err_msg=e)
        raise SystemExit(130)
    else:
        print("kbackup ran successfully")
|
|
|
|
|
|
def encrypt_tar():
    """Encrypt every ``.tar`` archive in the staging directory with age.

    Each regular file in ``/home/jas/sync_to_remotes`` whose suffix is
    ``.tar`` is encrypted to ``<path>.age`` using the identity file
    ``/home/jas/documents/age-key.txt``; the plaintext archive is deleted
    once age has succeeded.

    On failure an error e-mail is sent via ``send_email`` and the process
    exits: status 1 if age returns a non-zero status or cannot be started,
    status 130 if the run is interrupted with Ctrl-C.
    """
    tar_dir = Path("/home/jas/sync_to_remotes")

    # Snapshot the listing first: files are created and deleted in this
    # directory inside the loop, and iterdir() leaves it unspecified whether
    # such changes are seen by a live iterator.
    for tar_path in sorted(tar_dir.iterdir()):
        # Match on the real suffix; endswith("tar") would also accept names
        # such as "footar" or "x.star".
        if tar_path.suffix != ".tar" or not tar_path.is_file():
            continue

        age_cmd = [
            "age",
            "-e",
            "-i",
            "/home/jas/documents/age-key.txt",
            "-o",
            f"{tar_path}.age",
            str(tar_path),
        ]

        print(f"Encrypting {tar_path}...")
        try:
            subprocess.run(age_cmd, check=True)
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing or non-executable age binary, which
            # would otherwise crash the script without sending the alert.
            print(f"Error during archive encryption of {tar_path}: {e}")
            send_email("sync_to_remotes", "err", err_msg=e)
            # SystemExit instead of exit(): the latter is injected by the
            # site module and is not guaranteed to exist in every run mode.
            raise SystemExit(1)
        except KeyboardInterrupt as e:
            print(f"Archive encryption of {tar_path} interrupted: {e}")
            send_email("sync_to_remotes", "err", err_msg=e)
            raise SystemExit(130)
        else:
            print(f"Successfully encrypted {tar_path}")
            # Only drop the plaintext once the encrypted copy exists.
            tar_path.unlink()
|
|
|
|
|
|
def sync_to_remotes(src: str, dest: str):
    """Sync ``src`` to ``dest`` with ``rclone sync --transfers 8``.

    Args:
        src: Source path handed to rclone.
        dest: Destination handed to rclone (e.g. ``remote:/bucket/``).

    On failure an error e-mail is sent via ``send_email`` and the process
    exits: status 1 if rclone returns a non-zero status or cannot be
    started, status 130 if the run is interrupted with Ctrl-C.
    """
    rclone_cmd = ["rclone", "sync", "--transfers", "8", src, dest]

    try:
        subprocess.run(rclone_cmd, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable rclone binary, which
        # would otherwise crash the script without sending the alert.
        print(f"Error during sync to {dest}: {e}")
        send_email("sync_to_remotes", "err", err_msg=e)
        # SystemExit instead of exit(): the latter is injected by the site
        # module and is not guaranteed to exist outside interactive use.
        raise SystemExit(1)
    except KeyboardInterrupt as e:
        print(f"Sync to {dest} interrupted: {e}")
        send_email("sync_to_remotes", "err", err_msg=e)
        raise SystemExit(130)
    else:
        print(f"Successful sync to {dest}")
|
|
|
|
|
|
if __name__ == "__main__":
    # (source, destination) pairs passed to sync_to_remotes, in order.
    remotes = [
        ("/home/jas/sync_to_remotes/", "wasabi:/desktop-backups-moonshadow/"),
    ]

    # Pipeline: create the backup, encrypt the archives, then upload them.
    # Each step exits the process itself on failure, so reaching the final
    # line means every step completed.
    run_kbackup()
    encrypt_tar()

    for src, dest in remotes:
        sync_to_remotes(src, dest)

    send_email("sync_to_remotes", "ok")
|