386 lines
9.7 KiB
Python
Executable File
386 lines
9.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import jsonschema
|
|
from jsonschema import validate
|
|
from rich import box
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
from rich.traceback import install
|
|
|
|
# Install rich's traceback handler, with local variables shown in each frame.
install(show_locals=True)

VERSION = "0.1.7"

# Today's date as YYYY-MM-DD; used to name the evlog file when none is given.
default_date = dt.date.today().strftime("%Y-%m-%d")

# Directory holding the evlog files: $EVLOG_DIR when set, otherwise ~/evlogs.
EVLOG_DIR = os.environ.get("EVLOG_DIR")
evlog_dir = Path("~/evlogs").expanduser() if EVLOG_DIR is None else Path(EVLOG_DIR)
|
|
|
|
|
|
def evlog_init(filename):
    """Initialize evlog file and directory, if necessary.

    evlog_dir is taken from the current shell's EVLOG_DIR environment
    variable, or else it defaults to ~/evlogs. The file is created inside that
    directory with its suffix set to ``.json`` and is written as an empty JSON
    array; an existing file of the same name is overwritten.

    Args:
        filename: Name of the evlog file to create inside the evlog directory.
    """
    evlog_file = evlog_dir.joinpath(filename).with_suffix(".json")

    # parents=True so that a nested EVLOG_DIR whose ancestors do not exist yet
    # can still be created.
    evlog_dir.mkdir(parents=True, exist_ok=True)

    # Opening in "w" mode creates the file, so no separate touch() is needed.
    with open(evlog_file, "w") as ef:
        json.dump([], ef)
|
|
|
|
|
|
def evlog_list(args):
    """List evlog entries.

    Prints a table (index, timestamp, message) of the entries of one evlog
    file whose timestamp lies within the requested range, bounds included.

    Args:
        args: Parsed command-line namespace providing:
            file: Name of the evlog file inside the evlog directory. When
                empty, today's evlog file is used.
            start: Lower bound, "YYYY-MM-DD HH:MM:SS". When empty, 00:00:00 on
                the date given by the first ten characters of the file name.
            end: Upper bound in the same format. When empty, the current time.

    Raises:
        SystemExit: If the selected evlog file does not exist.
        ValueError: If start or end does not match the timestamp format.
    """
    timestamp_format = "%Y-%m-%d %H:%M:%S"

    if args.file:
        selected_evlog_file = evlog_dir.joinpath(args.file)
    else:
        selected_evlog_file = evlog_dir.joinpath(default_date + "_evlog").with_suffix(
            ".json"
        )

    if not selected_evlog_file.exists():
        sys.exit("evlog file %s not found. Are you sure it exists?" % selected_evlog_file)

    # Bounds are compared with the stored timestamps as strings, so user input
    # is re-formatted through datetime: strptime accepts non-padded fields
    # ("2022-9-28 ...") that would otherwise compare incorrectly.
    if args.start:
        ts_from = dt.datetime.strptime(args.start, timestamp_format).strftime(
            timestamp_format
        )
    else:
        ts_from = selected_evlog_file.stem[:10] + " 00:00:00"

    if args.end:
        ts_to = dt.datetime.strptime(args.end, timestamp_format).strftime(
            timestamp_format
        )
    else:
        ts_to = dt.datetime.now().strftime(timestamp_format)

    with open(selected_evlog_file, "r") as ef:
        json_data = json.load(ef)

    table = Table(style="#5f00ff", header_style="bold", box=box.ROUNDED)
    table.add_column("Index", justify="right", style="white")
    table.add_column("Timestamp", justify="left", style="#ff87d7")
    table.add_column("Message", justify="left")

    for index, entry in enumerate(json_data):
        # Inclusive bounds: an entry stamped exactly at the start of the day
        # (the default lower bound) or exactly at the upper bound is listed.
        if ts_from <= entry["timestamp"] <= ts_to:
            table.add_row(str(index), entry["timestamp"], entry["message"])

    console = Console(color_system="256")
    console.print(table)
|
|
|
|
|
|
def evlog_list_files(args):
    """Print the regular files found in the evlog directory, in sorted order.

    Args:
        args: Parsed command-line namespace. When ``args.absolute`` is true
            each file is printed as its path under the evlog directory;
            otherwise only the file name is printed.
    """
    for entry in sorted(evlog_dir.iterdir()):
        # Skip anything that is not a regular file (e.g. sub-directories).
        if not entry.is_file():
            continue
        print(entry if args.absolute else entry.name)
|
|
|
|
|
|
def evlog_search(args):
    """Search for a string.

    Searches the messages of all ``.json`` files in the evlog directory for
    ``args.word`` (case-sensitive substring match) and prints each matching
    entry as timestamp followed by message. When nothing matches, a notice
    naming the searched word is printed instead.

    Args:
        args: Parsed command-line namespace providing ``word``, the text to
            look for.
    """
    console = Console()

    # Only regular *.json files are searched: iterdir() also yields
    # sub-directories and unrelated files, which open()/json.load() would fail
    # on. Sorting gives a stable output order, since iterdir() order is
    # arbitrary.
    evlog_files = sorted(
        path
        for path in evlog_dir.iterdir()
        if path.is_file() and path.suffix == ".json"
    )

    found_entries = []
    for path in evlog_files:
        with open(path, "r") as ef:
            json_data = json.load(ef)
        found_entries.extend(
            entry for entry in json_data if args.word in entry["message"]
        )

    if not found_entries:
        console.print(
            "[bold yellow]{0}[/bold yellow] was not found in any of the evlog files".format(
                args.word
            )
        )
        return

    for entry in found_entries:
        console.print(
            "[bold green]{0}[/bold green] {1}".format(
                entry["timestamp"], entry["message"]
            )
        )
|
|
|
|
|
|
def evlog_sort(file):
    """Rewrite an evlog file with its entries in chronological order.

    Each entry's "timestamp" ("YYYY-MM-DD HH:MM:SS") is converted to seconds
    since the epoch and used as the sort key. The sorted list is written back
    to the same file with an indent of 4.

    Args:
        file: Path of the evlog JSON file to sort in place.
    """

    def _epoch_seconds(entry):
        # Sort key: the entry's timestamp parsed as local time.
        parsed = time.strptime(entry["timestamp"], "%Y-%m-%d %H:%M:%S")
        return time.mktime(parsed)

    with open(file, "r") as source:
        entries = json.load(source)

    entries.sort(key=_epoch_seconds)

    with open(file, "w") as target:
        json.dump(entries, target, indent=4)
|
|
|
|
|
|
def validate_json(file):
    """Validate JSON data.

    Loads `file` and calls jsonschema.validate on its content. The expected
    shape is an array of objects, each carrying a string "timestamp" and a
    string "message". A validation failure is reported on standard output; it
    is not raised.

    Args:
        file: Path of the evlog JSON file to check.
    """
    evlog_schema = {
        "type": "array",
        # The per-entry rules must sit under "items": "properties" only
        # applies to object instances, so placed directly on the array it
        # would be ignored and no entry would ever be checked.
        "items": {
            "type": "object",
            "properties": {
                "timestamp": {"type": "string"},
                "message": {"type": "string"},
            },
            "required": ["timestamp", "message"],
        },
    }

    with open(file, "r") as ef:
        json_data = json.load(ef)

    try:
        validate(instance=json_data, schema=evlog_schema)
    except jsonschema.ValidationError as err:
        print("Invalid JSON detected on %s" % file)
        print(err)
|
|
|
|
|
|
def evlog_append(args):
    """Append a new evlog entry to the evlog file.

    Use evlog file indicated by provided timestamp, or else use the evlog file
    for current day. The file is created first when it does not exist, and is
    sorted and validated after the entry has been written.

    Args:
        args: Parsed command-line namespace providing:
            timestamp: Optional "YYYY-MM-DD HH:MM:SS" timestamp of the entry;
                the current time is used when empty.
            message: Text of the entry.

    Raises:
        ValueError: If the provided timestamp does not match the format.
    """
    timestamp_format = "%Y-%m-%d %H:%M:%S"

    if args.timestamp:
        # Round-trip through datetime so that input strptime accepts without
        # zero padding (e.g. "2022-1-2 3:04:05") is stored zero-padded: the
        # first ten characters name the evlog file, and evlog_list compares
        # timestamps as strings.
        ts = dt.datetime.strptime(args.timestamp, timestamp_format).strftime(
            timestamp_format
        )
    else:
        ts = dt.datetime.now().strftime(timestamp_format)

    evlog_filename = ts[:10] + "_evlog"
    evlog_file = evlog_dir.joinpath(evlog_filename).with_suffix(".json")

    if not evlog_file.exists():
        # evlog_init joins its argument onto evlog_dir itself, so it gets the
        # bare name; passing the full path would repeat the directory whenever
        # evlog_dir is a relative path.
        evlog_init(evlog_filename)

    entry = {"timestamp": ts, "message": args.message}

    with open(evlog_file, "r") as ef:
        json_data = json.load(ef)

    json_data.append(entry)

    # Written in "w" mode so the file is truncated before the new content.
    with open(evlog_file, "w") as ef:
        json.dump(json_data, ef, indent=4)

    evlog_sort(evlog_file)
    validate_json(evlog_file)
|
|
|
|
|
|
def evlog_edit(args):
    """Edit evlog entry at provided index argument.

    Replaces the message of one entry, rewrites the evlog file and validates
    it.

    Args:
        args: Parsed command-line namespace providing:
            index: Position of the entry in the evlog file.
            message: New message for that entry.
            file: Name of the evlog file inside the evlog directory; today's
                evlog file is used when empty.

    Raises:
        SystemExit: If the evlog file does not exist or has no entry at the
            given index.
    """
    if args.file:
        evlog_file = evlog_dir.joinpath(args.file)
    else:
        evlog_file = evlog_dir.joinpath(default_date + "_evlog").with_suffix(".json")

    if not evlog_file.exists():
        sys.exit(
            "evlog file not found. Please run 'evlog add' to start a new evlog file."
        )

    with open(evlog_file, "r") as ef:
        json_data = json.load(ef)

    try:
        json_data[args.index]["message"] = args.message
    except IndexError:
        sys.exit("No evlog entry at index %d in %s" % (args.index, evlog_file))

    # Rewritten in "w" mode so the file is truncated first. Overwriting in
    # place from offset 0 would leave the tail of the old content behind
    # whenever the new message is shorter, producing invalid JSON.
    with open(evlog_file, "w") as ef:
        json.dump(json_data, ef, indent=4)

    validate_json(evlog_file)
|
|
|
|
|
|
def evlog_remove(args):
    """Remove an evlog entry at provided index argument.

    Deletes one entry, rewrites the evlog file and validates it.

    Args:
        args: Parsed command-line namespace providing:
            index: Position of the entry in the evlog file.
            file: Name of the evlog file inside the evlog directory; today's
                evlog file is used when empty.

    Raises:
        SystemExit: If the evlog file does not exist or has no entry at the
            given index.
    """
    if args.file:
        evlog_file = evlog_dir.joinpath(args.file)
    else:
        evlog_file = evlog_dir.joinpath(default_date + "_evlog").with_suffix(".json")

    if not evlog_file.exists():
        sys.exit(
            "evlog file not found. Please run 'evlog add' to start a new evlog file."
        )

    with open(evlog_file, "r") as ef:
        json_data = json.load(ef)

    try:
        json_data.pop(args.index)
    except IndexError:
        # Report a mistyped index as a plain message instead of a traceback;
        # the file is left untouched.
        sys.exit("No evlog entry at index %d in %s" % (args.index, evlog_file))

    with open(evlog_file, "w") as ef:
        json.dump(json_data, ef, indent=4)

    validate_json(evlog_file)
|
|
|
|
|
|
# Command-line interface. Every sub-command stores its handler under ``func``
# through set_defaults(). Options are optional strings unless stated
# otherwise, which is argparse's default behaviour.
parser = argparse.ArgumentParser(prog="evlog")
parser.add_argument(
    "-v",
    "--version",
    action="version",
    version="%(prog)s {}".format(VERSION),
    help="Print version information",
)
subparsers = parser.add_subparsers()

# evlog add
add_parser = subparsers.add_parser("add", description="Add an evlog entry")
add_parser.add_argument("-t", "--timestamp", help="Timestamp for evlog entry: str")
add_parser.add_argument(
    "-m", "--message", required=True, help="Message for evlog entry: str"
)
add_parser.set_defaults(func=evlog_append)

# evlog edit
edit_parser = subparsers.add_parser("edit", description="Edit an evlog entry")
edit_parser.add_argument(
    "-i", "--index", required=True, type=int, help="Index of evlog entry: int"
)
edit_parser.add_argument(
    "-m", "--message", required=True, help="New message for evlog entry: str"
)
edit_parser.add_argument(
    "-f", "--file", help="evlog file to edit. Ex: 2022-10-02_evlog.json"
)
edit_parser.set_defaults(func=evlog_edit)

# evlog rm
rm_parser = subparsers.add_parser("rm", description="Remove an evlog entry")
rm_parser.add_argument(
    "-i", "--index", required=True, type=int, help="Index of evlog entry: int"
)
rm_parser.add_argument(
    "-f", "--file", help="evlog file to remove from. Ex: 2022-10-02_evlog.json"
)
rm_parser.set_defaults(func=evlog_remove)

# evlog ls
ls_parser = subparsers.add_parser("ls", description="List evlog entries")
ls_parser.add_argument(
    "-s",
    "--start",
    metavar="TIMESTAMP",
    help="From timestamp: str. Default is today at 00:00:00. Ex. 2022-09-28 13:45:00",
)
ls_parser.add_argument(
    "-e",
    "--end",
    metavar="TIMESTAMP",
    help="To timestamp: str. Default is today at now. Ex. 2022-09-28 21:00:00",
)
ls_parser.add_argument(
    "-f", "--file", help="evlog file to view. Ex: 2022-10-02_evlog.json"
)
ls_parser.set_defaults(func=evlog_list)

# evlog lsfiles
ls_files_parser = subparsers.add_parser("lsfiles", description="List all evlog files")
ls_files_parser.add_argument(
    "-a",
    "--absolute",
    action="store_true",
    help="List the absolute paths of the evlog files",
)
ls_files_parser.set_defaults(func=evlog_list_files)

# evlog search
search_parser = subparsers.add_parser(
    "search", description="Search for keywords in evlog files"
)
search_parser.add_argument("-w", "--word", required=True, help="Word to search for")
search_parser.set_defaults(func=evlog_search)
|
|
|
|
|
|
def main():
    """Run the sub-command requested on the command line.

    When no command-line argument is given, the usage message is printed and
    nothing else happens. Otherwise the arguments are parsed and the handler
    stored as ``func`` by the selected sub-command is called with them.
    """
    if len(sys.argv) < 2:
        parser.print_usage()
        return

    parsed = parser.parse_args()
    parsed.func(parsed)


if __name__ == "__main__":
    main()
|