#!/usr/bin/env python3 """ This script is intended to be used as a SSH command wrapper. It allows users to propose a command that should be run. The command will only be executed after a threshold of other users has approved the command. """ import os import shlex import sys import json from pathlib import Path import uuid from dataclasses import dataclass import subprocess # Approval threshold, including the approval # of the proposer. APPROVAL_THRESHOLD = 2 cmdpath = Path.home() / "cmd.json" def write_cmd(d): with open(cmdpath, "w") as f: f.write(json.dumps(d)) def read_cmd(): try: with open(cmdpath, "r") as f: return json.load(f) except FileNotFoundError: return None def propose(cmd): request_id = uuid.uuid4().hex.lower()[0:6] for x in cmd: if not x.isascii(): print("requested command not ascii") sys.exit(4) print(f"requested command: {cmd}") write_cmd({"cmd": cmd, "request_id": request_id}) print(f"assigned id: {request_id}") def approve(my_user, request_id): print(f"approving command {request_id} as {my_user}") d = read_cmd() if d is None: print("no command proposed") sys.exit(1) if d["request_id"] != request_id: print("request ID does not match") sys.exit(1) approved_by = d.get("approved_by", []) if my_user not in approved_by: approved_by.append(my_user) d["approved_by"] = approved_by write_cmd(d) def run(request_id): print(f"running command with ID {request_id}") d = read_cmd() if d is None: print("no command proposed") sys.exit(1) if d["request_id"] != request_id: print("request ID does not match") sys.exit(1) approved_by = d.get("approved_by", []) num_approvals = len(approved_by) if num_approvals < APPROVAL_THRESHOLD: print(f"not enough approvals, got {num_approvals} but need {APPROVAL_THRESHOLD}") sys.exit(1) if d.get("executed", False): print("command has already been executed once, please request again") sys.exit(1) cmd = d["cmd"] d["executed"] = True # Mark as executed, can only execute once! write_cmd(d) print("running command", cmd) res = subprocess.run(cmd, capture_output=True, encoding="utf-8") print(f"==stdout==\n{res.stdout}====") print(f"==stderr==\n{res.stderr}====") print(f"exit code: {res.returncode}") # FIXME: Write log to disk? def usage(): print("Commands:") print(" whoami: Check authentication.") print(" propose CMD...: Propose a new command.") print(" get: Get the currently proposed command.") print(" approve CMDID: Approve a command.") print(" run CMDID: Run a sufficiently approved command.") print(" discard: Discard the currently proposed command.") sys.exit(1) def die(msg): print(msg) sys.exit(2) def main(): if len(sys.argv) != 2: die("unexpected usage") user = sys.argv[1] os_user = os.environ["USER"] print(f"authenticated as: {user}") inner_cmd = os.environ.get("SSH_ORIGINAL_COMMAND") if inner_cmd is None: print("no command provided, try help") sys.exit(3) inner_args = shlex.split(inner_cmd) if len(inner_args) < 1: usage() subcommand = inner_args[0] if subcommand == "discard": cmdpath.unlink() elif subcommand == "whoami": print(f"you are {user} on {os_user}") elif subcommand == "propose": propose(inner_args[1:]) elif subcommand == "get": print(read_cmd()) elif subcommand == "help": usage() elif subcommand == "run": if len(inner_args) != 2: usage() run(inner_args[1]) elif subcommand == "approve": if len(inner_args) != 2: usage() approve(user, inner_args[1]) else: print(f"unknown subcommand {subcommand}") usage() if __name__ == '__main__': main()