splitops (4019B)
1 #!/usr/bin/env python3 2 3 """ 4 This script is intended to be used as a SSH command wrapper. 5 6 It allows users to propose a command that should be run. 7 The command will only be executed after a threshold of 8 other users has approved the command. 9 """ 10 11 import os 12 import shlex 13 import sys 14 import json 15 from pathlib import Path 16 import uuid 17 from dataclasses import dataclass 18 import subprocess 19 20 # Approval threshold, including the approval 21 # of the proposer. 22 APPROVAL_THRESHOLD = 2 23 24 cmdpath = Path.home() / "cmd.json" 25 26 def write_cmd(d): 27 with open(cmdpath, "w") as f: 28 f.write(json.dumps(d)) 29 30 def read_cmd(): 31 try: 32 with open(cmdpath, "r") as f: 33 return json.load(f) 34 except FileNotFoundError: 35 return None 36 37 def propose(cmd): 38 request_id = uuid.uuid4().hex.lower()[0:6] 39 for x in cmd: 40 if not x.isascii(): 41 print("requested command not ascii") 42 sys.exit(4) 43 print(f"requested command: {cmd}") 44 write_cmd({"cmd": cmd, "request_id": request_id}) 45 print(f"assigned id: {request_id}") 46 47 def approve(my_user, request_id): 48 print(f"approving command {request_id} as {my_user}") 49 d = read_cmd() 50 if d is None: 51 print("no command proposed") 52 sys.exit(1) 53 if d["request_id"] != request_id: 54 print("request ID does not match") 55 sys.exit(1) 56 approved_by = d.get("approved_by", []) 57 if my_user not in approved_by: 58 approved_by.append(my_user) 59 d["approved_by"] = approved_by 60 write_cmd(d) 61 62 def run(request_id): 63 print(f"running command with ID {request_id}") 64 d = read_cmd() 65 if d is None: 66 print("no command proposed") 67 sys.exit(1) 68 if d["request_id"] != request_id: 69 print("request ID does not match") 70 sys.exit(1) 71 approved_by = d.get("approved_by", []) 72 num_approvals = len(approved_by) 73 if num_approvals < APPROVAL_THRESHOLD: 74 print(f"not enough approvals, got {num_approvals} but need {APPROVAL_THRESHOLD}") 75 sys.exit(1) 76 if d.get("executed", False): 77 print("command has already been executed once, please request again") 78 sys.exit(1) 79 cmd = d["cmd"] 80 d["executed"] = True 81 # Mark as executed, can only execute once! 82 write_cmd(d) 83 print("running command", cmd) 84 res = subprocess.run(cmd, capture_output=True, encoding="utf-8") 85 print(f"==stdout==\n{res.stdout}====") 86 print(f"==stderr==\n{res.stderr}====") 87 print(f"exit code: {res.returncode}") 88 # FIXME: Write log to disk? 89 90 91 def usage(): 92 print("Commands:") 93 print(" whoami: Check authentication.") 94 print(" propose CMD...: Propose a new command.") 95 print(" get: Get the currently proposed command.") 96 print(" approve CMDID: Approve a command.") 97 print(" run CMDID: Run a sufficiently approved command.") 98 print(" discard: Discard the currently proposed command.") 99 sys.exit(1) 100 101 def die(msg): 102 print(msg) 103 sys.exit(2) 104 105 def main(): 106 if len(sys.argv) != 2: 107 die("unexpected usage") 108 user = sys.argv[1] 109 os_user = os.environ["USER"] 110 print(f"authenticated as: {user}") 111 inner_cmd = os.environ.get("SSH_ORIGINAL_COMMAND") 112 if inner_cmd is None: 113 print("no command provided, try help") 114 sys.exit(3) 115 inner_args = shlex.split(inner_cmd) 116 if len(inner_args) < 1: 117 usage() 118 subcommand = inner_args[0] 119 if subcommand == "discard": 120 cmdpath.unlink() 121 elif subcommand == "whoami": 122 print(f"you are {user} on {os_user}") 123 elif subcommand == "propose": 124 propose(inner_args[1:]) 125 elif subcommand == "get": 126 print(read_cmd()) 127 elif subcommand == "help": 128 usage() 129 elif subcommand == "run": 130 if len(inner_args) != 2: 131 usage() 132 run(inner_args[1]) 133 elif subcommand == "approve": 134 if len(inner_args) != 2: 135 usage() 136 approve(user, inner_args[1]) 137 else: 138 print(f"unknown subcommand {subcommand}") 139 usage() 140 141 if __name__ == '__main__': 142 main() 143