summaryrefslogtreecommitdiff
path: root/splitops/splitops
blob: 5972887d0bf2d570022296c087090419b07a0d91 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/env python3

"""
This script is intended to be used as a SSH command wrapper.

It allows users to propose a command that should be run.
The command will only be executed after a threshold of
other users has approved the command.
"""

import os
import shlex
import sys
import json
from pathlib import Path
import uuid
from dataclasses import dataclass
import subprocess

# Approval threshold, including the approval
# of the proposer.
APPROVAL_THRESHOLD = 2

cmdpath = Path.home() / "cmd.json"

def write_cmd(d):
    with open(cmdpath, "w") as f:
        f.write(json.dumps(d))

def read_cmd():
    try:
        with open(cmdpath, "r") as f:
            return json.load(f)
    except FileNotFoundError:
        return None

def propose(cmd):
    request_id = uuid.uuid4().hex.lower()[0:6]
    for x in cmd:
        if not x.isascii():
            print("requested command not ascii")
            sys.exit(4)
    print(f"requested command: {cmd}")
    write_cmd({"cmd": cmd, "request_id": request_id})
    print(f"assigned id: {request_id}")

def approve(my_user, request_id):
    print(f"approving command {request_id} as {my_user}")
    d = read_cmd()
    if d is None:
        print("no command proposed")
        sys.exit(1)
    if d["request_id"] != request_id:
        print("request ID does not match")
        sys.exit(1)
    approved_by = d.get("approved_by", [])
    if my_user not in approved_by:
        approved_by.append(my_user)
    d["approved_by"] = approved_by
    write_cmd(d)

def run(request_id):
    print(f"running command with ID {request_id}")
    d = read_cmd()
    if d is None:
        print("no command proposed")
        sys.exit(1)
    if d["request_id"] != request_id:
        print("request ID does not match")
        sys.exit(1)
    approved_by = d.get("approved_by", [])
    num_approvals = len(approved_by)
    if num_approvals < APPROVAL_THRESHOLD:
        print(f"not enough approvals, got {num_approvals} but need {APPROVAL_THRESHOLD}")
        sys.exit(1)
    if d.get("executed", False):
        print("command has already been executed once, please request again")
        sys.exit(1)
    cmd = d["cmd"]
    d["executed"] = True
    # Mark as executed, can only execute once!
    write_cmd(d)
    print("running command", cmd)
    res = subprocess.run(cmd, capture_output=True, encoding="utf-8")
    print(f"==stdout==\n{res.stdout}====")
    print(f"==stderr==\n{res.stderr}====")
    print(f"exit code: {res.returncode}")
    # FIXME: Write log to disk?


def usage():
    print("Commands:")
    print(" whoami: Check authentication.")
    print(" propose CMD...: Propose a new command.")
    print(" get: Get the currently proposed command.")
    print(" approve CMDID: Approve a command.")
    print(" run CMDID: Run a sufficiently approved command.")
    print(" discard: Discard the currently proposed command.")
    sys.exit(1)

def die(msg):
    print(msg)
    sys.exit(2)

def main():
    if len(sys.argv) != 2:
        die("unexpected usage")
    user = sys.argv[1]
    os_user = os.environ["USER"]
    print(f"authenticated as: {user}")
    inner_cmd = os.environ.get("SSH_ORIGINAL_COMMAND")
    if inner_cmd is None:
        print("no command provided, try help")
        sys.exit(3)
    inner_args = shlex.split(inner_cmd)
    if len(inner_args) < 1:
        usage()
    subcommand = inner_args[0]
    if subcommand == "discard":
        cmdpath.unlink()
    elif subcommand == "whoami":
        print(f"you are {user} on {os_user}")
    elif subcommand == "propose":
        propose(inner_args[1:])
    elif subcommand == "get":
        print(read_cmd())
    elif subcommand == "help":
        usage()
    elif subcommand == "run":
        if len(inner_args) != 2:
            usage()
        run(inner_args[1])
    elif subcommand == "approve":
        if len(inner_args) != 2:
            usage()
        approve(user, inner_args[1])
    else:
        print(f"unknown subcommand {subcommand}")
        usage()

if __name__ == '__main__':
    main()