#!/usr/bin/env python3 """OCI Registry Helper Usage: ocirh [...] Subcommands: repos Lists repositories in the registry. Repos correspond to images pushed to the registry. tags Lists tags of the given repository. manifests Lists manifests of the given repository for the given tag. rmi Removes a tag from an image. If given tag is the only tag, removes the image. gc Runs garbage collection on the registry. Requires SSH public key access to registry server. rmr Removes given repository from the registry. Requires SSH public key access to registry server. Examples: Suppose we have an image called 'fedora-toolbox' tagged with 'latest'. ocirh repos ocirh tags fedora-toolbox ocirh manifests fedora-toolbox latest ocirh rmi fedora-toolbox latest ocirh gc ocirh rmr fedora-toolbox """ import http.client import json import logging import math import subprocess from docopt import docopt from rich import print from rich.console import Group from rich.logging import RichHandler from rich.panel import Panel from rich.table import Table from rich.text import Text from rich.traceback import install from rich.tree import Tree install(show_locals=True) # Rich logging handler FORMAT = "%(message)s" logging.basicConfig( level="NOTSET", format=FORMAT, datefmt="[%X]", handlers=[RichHandler(rich_tracebacks=True)], ) log = logging.getLogger("rich") # Taken from https://stackoverflow.com/a/14822210 # # How this function works: # If size_bytes == 0, returns 0 B. # size_name is a tuple containing binary prefixes for bytes. # # math.log takes the logarithm of size_bytes to base 1024. # math.floor rounds down the result of math.log to the nearest integer. # int ensures the result of math.floor is of type int, and stores it in i. # The value of i is used to determine which binary prefix to use from # size_name. # # math.pow returns the value of 1024 raised to the power of i, stores it in p. # # round takes the value of size_bytes, divides it by p, and stores the result # in s at precision of 2 decimal places. # # A formatted string with size s and binary prefix size_name[i] is returned. def convert_size(size_bytes: int) -> str: """ Converts a decimal integer of bytes to its respective binary-prefixed size. Parameters: size_bytes (int): A decimal integer. Returns: (str): Binary-prefixed size of size_bytes formatted as a string. """ if size_bytes == 0: return "0 B" size_name = ("B", "KiB", "MiB", "GiB") i = int(math.floor(math.log(size_bytes, 1024))) p = math.pow(1024, i) s = round(size_bytes / p, 2) return "%s %s" % (s, size_name[i]) REGISTRY_URL = "registry.hyperreal.coffee" def get_auth() -> str: """ Get the base64 encoded password for registry autentication. Returns: auth (str): A string containing the base64 encoded password. """ try: with open("/run/user/1000/containers/auth.json", "r") as authfile: json_data = json.loads(authfile.read()) except Exception as ex: log.exception(ex) auth = json_data["auths"][REGISTRY_URL]["auth"] return auth def get_headers() -> dict: """ Returns headers for HTTP request authentication to the registry server. Returns: headers (dict): A dict of HTTP headers """ return { "Accept": "application/vnd.oci.image.manifest.v1+json", "Authorization": "Basic " + get_auth(), } def get_json_response(request: str, url: str) -> dict: """ Connects to registry and returns response data as JSON. Parameters: request (str): A string like "GET" or "DELETE" url (str) : A string containing the URL of the requested data Returns: json_data (dict): JSON data as a dict object """ conn = http.client.HTTPSConnection(REGISTRY_URL) headers = get_headers() try: conn.request(request, url, "", headers) res = conn.getresponse() data = res.read() json_data = json.loads(data.decode("utf-8")) except Exception as ex: log.exception(ex) return json_data def get_repositories(): """ Prints a Rich Tree that lists the repositories of the registry. """ json_data = get_json_response("GET", "/v2/_catalog") repo_tree = Tree("[green]Repositories") for repo in json_data["repositories"]: repo_tree.add("[blue]%s" % repo) print(repo_tree) def get_tags(repo: str): """ Prints a Rich Tree that lists the tags for the given repository. Parameters: repo (str): A string containing the name of the repo """ json_data = get_json_response("GET", "/v2/" + repo + "/tags/list") tags_tree = Tree("[green]%s tags" % repo) for tag in json_data["tags"]: tags_tree.add("[cyan]:%s" % tag) print(tags_tree) def get_manifests(repo: str, tag: str): """ Prints a Rich grid table that displays the manifests and metadata of the image repository. Parameters: repo (str): A string containing the name of the repo tag (str) : A string containing the tag of the desired image """ json_data = get_json_response("GET", "/v2/" + repo + "/manifests/" + tag) grid_meta = Table.grid(expand=True) grid_meta.add_column() grid_meta.add_column() meta_schema_version_key = Text("Schema version") meta_schema_version_key.stylize("bold green", 0) meta_schema_version_value = Text(str(json_data["schemaVersion"])) meta_media_type_key = Text("Media type") meta_media_type_key.stylize("bold green", 0) meta_media_type_value = Text(json_data["mediaType"]) grid_meta.add_row(meta_schema_version_key, meta_schema_version_value) grid_meta.add_row(meta_media_type_key, meta_media_type_value) grid_config = Table.grid(expand=True) grid_config.add_column() grid_config.add_column() config_media_type_key = Text("Media type") config_media_type_key.stylize("bold green", 0) config_media_type_value = Text(json_data["config"]["mediaType"]) config_digest_key = Text("Digest") config_digest_key.stylize("bold green", 0) config_digest_value = Text(json_data["config"]["digest"]) config_size_key = Text("Size") config_size_key.stylize("bold green", 0) config_size_value = Text(convert_size(json_data["config"]["size"])) grid_config.add_row(config_media_type_key, config_media_type_value) grid_config.add_row(config_digest_key, config_digest_value) grid_config.add_row(config_size_key, config_size_value) grid_annotations = Table.grid(expand=True) grid_annotations.add_column() grid_annotations.add_column() for item in json_data["annotations"].items(): annotations_item_key = Text(item[0]) annotations_item_key.stylize("bold green", 0) annotations_item_value = Text(item[1]) grid_annotations.add_row(annotations_item_key, annotations_item_value) total_size = sum(layer.get("size") for layer in json_data["layers"]) table_layers = Table(box=None, show_footer=True) table_layers.add_column( "Digest", justify="right", style="yellow", no_wrap=True, footer="Total size:" ) table_layers.add_column( "Size", justify="left", style="cyan", no_wrap=True, footer=convert_size(total_size), ) for layer in json_data["layers"]: table_layers.add_row(layer.get("digest"), convert_size(layer.get("size"))) panel_group = Group( Panel(grid_meta, title="[bold blue]Metadata"), Panel(grid_config, title="[bold blue]Config"), Panel(grid_annotations, title="Annotations"), Panel( table_layers, title="[bold blue]Layers: %s" % json_data["layers"][0].get("mediaType"), ), ) print(Panel(panel_group, title="[bold blue]%s:%s" % (repo, tag))) def delete_image(repo: str, tag: str): """ Removes the given tag from the image. If the given tag is the only tag, removes the image. Parameters: repo (str): A string containing the name of the repo tag (str) : A string containing the tag to be removed """ try: conn = http.client.HTTPSConnection(REGISTRY_URL) headers = get_headers() conn.request("GET", "/v2/" + repo + "/manifests/" + tag, "", headers) res = conn.getresponse() docker_content_digest = res.getheader("Docker-Content-Digest") except Exception as ex: log.exception(ex) try: conn.request( "DELETE", "/v2/" + repo + "/manifests/" + docker_content_digest, "", headers ) except Exception as ex: log.exception(ex) print("Untagged %s:%s successfully" % (repo, tag)) def garbage_collection(): """ Runs garbage collection command on the remote registry server. Requires SSH public key access. """ command = "/usr/local/bin/registry-gc" try: ssh = subprocess.Popen( ["ssh", "%s" % REGISTRY_URL, command], shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) result = ssh.stdout.readlines() if result == []: log.error(ssh.stderr.readlines()) else: print(result) except Exception as ex: log.exception(ex) def remove_repo(repo: str): """ Runs command on remote registry server to remove the given repo. Parameters: repo (str): A string containing the name of the repo. """ command = "/usr/local/bin/registry-rm-repo " + repo try: ssh = subprocess.Popen( ["ssh", "%s" % REGISTRY_URL, command], shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) result = ssh.stdout.readlines() if result == []: log.error(ssh.stderr.readlines()) else: print(result) except Exception as ex: log.exception(ex) if __name__ == "__main__": args = docopt(__doc__, options_first=True) match args[""]: case "repos": get_repositories() case "tags": get_tags(args[""][0]) case "manifests": get_manifests(args[""][0], args[""][1]) case "rmi": delete_image(args[""][0], args[""][1]) case "gc": garbage_collection() case "rmr": remove_repo(args[""]) case _: if args[""] in ["help", None]: exit(subprocess.call(["python3", "ocirh", "--help"])) else: exit( "%r is not a ocirh subcommand. See 'ocirh --help." % args[""] )