taler-deployment

Deployment scripts and configuration files
Log | Files | Refs | README

splitops (4019B)


      1 #!/usr/bin/env python3
      2 
      3 """
      4 This script is intended to be used as an SSH command wrapper.
      5 
      6 It allows users to propose a command that should be run.
      7 The command will only be executed after a threshold of
      8 other users has approved the command.
      9 """
     10 
     11 import os
     12 import shlex
     13 import sys
     14 import json
     15 from pathlib import Path
     16 import uuid
     17 from dataclasses import dataclass
     18 import subprocess
     19 
     20 # Approval threshold, including the approval
     21 # of the proposer.
     22 APPROVAL_THRESHOLD = 2
     23 
     24 cmdpath = Path.home() / "cmd.json"
     25 
     26 def write_cmd(d):
     27     with open(cmdpath, "w") as f:
     28         f.write(json.dumps(d))
     29 
     30 def read_cmd():
     31     try:
     32         with open(cmdpath, "r") as f:
     33             return json.load(f)
     34     except FileNotFoundError:
     35         return None
     36 
     37 def propose(cmd):
     38     request_id = uuid.uuid4().hex.lower()[0:6]
     39     for x in cmd:
     40         if not x.isascii():
     41             print("requested command not ascii")
     42             sys.exit(4)
     43     print(f"requested command: {cmd}")
     44     write_cmd({"cmd": cmd, "request_id": request_id})
     45     print(f"assigned id: {request_id}")
     46 
     47 def approve(my_user, request_id):
     48     print(f"approving command {request_id} as {my_user}")
     49     d = read_cmd()
     50     if d is None:
     51         print("no command proposed")
     52         sys.exit(1)
     53     if d["request_id"] != request_id:
     54         print("request ID does not match")
     55         sys.exit(1)
     56     approved_by = d.get("approved_by", [])
     57     if my_user not in approved_by:
     58         approved_by.append(my_user)
     59     d["approved_by"] = approved_by
     60     write_cmd(d)
     61 
     62 def run(request_id):
     63     print(f"running command with ID {request_id}")
     64     d = read_cmd()
     65     if d is None:
     66         print("no command proposed")
     67         sys.exit(1)
     68     if d["request_id"] != request_id:
     69         print("request ID does not match")
     70         sys.exit(1)
     71     approved_by = d.get("approved_by", [])
     72     num_approvals = len(approved_by)
     73     if num_approvals < APPROVAL_THRESHOLD:
     74         print(f"not enough approvals, got {num_approvals} but need {APPROVAL_THRESHOLD}")
     75         sys.exit(1)
     76     if d.get("executed", False):
     77         print("command has already been executed once, please request again")
     78         sys.exit(1)
     79     cmd = d["cmd"]
     80     d["executed"] = True
     81     # Mark as executed, can only execute once!
     82     write_cmd(d)
     83     print("running command", cmd)
     84     res = subprocess.run(cmd, capture_output=True, encoding="utf-8")
     85     print(f"==stdout==\n{res.stdout}====")
     86     print(f"==stderr==\n{res.stderr}====")
     87     print(f"exit code: {res.returncode}")
     88     # FIXME: Write log to disk?
     89 
     90 
     91 def usage():
     92     print("Commands:")
     93     print(" whoami: Check authentication.")
     94     print(" propose CMD...: Propose a new command.")
     95     print(" get: Get the currently proposed command.")
     96     print(" approve CMDID: Approve a command.")
     97     print(" run CMDID: Run a sufficiently approved command.")
     98     print(" discard: Discard the currently proposed command.")
     99     sys.exit(1)
    100 
    101 def die(msg):
    102     print(msg)
    103     sys.exit(2)
    104 
    105 def main():
    106     if len(sys.argv) != 2:
    107         die("unexpected usage")
    108     user = sys.argv[1]
    109     os_user = os.environ["USER"]
    110     print(f"authenticated as: {user}")
    111     inner_cmd = os.environ.get("SSH_ORIGINAL_COMMAND")
    112     if inner_cmd is None:
    113         print("no command provided, try help")
    114         sys.exit(3)
    115     inner_args = shlex.split(inner_cmd)
    116     if len(inner_args) < 1:
    117         usage()
    118     subcommand = inner_args[0]
    119     if subcommand == "discard":
    120         cmdpath.unlink()
    121     elif subcommand == "whoami":
    122         print(f"you are {user} on {os_user}")
    123     elif subcommand == "propose":
    124         propose(inner_args[1:])
    125     elif subcommand == "get":
    126         print(read_cmd())
    127     elif subcommand == "help":
    128         usage()
    129     elif subcommand == "run":
    130         if len(inner_args) != 2:
    131             usage()
    132         run(inner_args[1])
    133     elif subcommand == "approve":
    134         if len(inner_args) != 2:
    135             usage()
    136         approve(user, inner_args[1])
    137     else:
    138         print(f"unknown subcommand {subcommand}")
    139         usage()
    140 
    141 if __name__ == '__main__':
    142     main()
    143