import glob import json import re import sshpubkeys from flask import Flask, redirect, url_for, render_template, request # lyadmin # scripts and web form for a tilde / PAUS instance # # gashapwn # Nov 2020 # # https://git.lain.church/gashapwn/lyadmin # gashapwn@protonmail.com # or # gasahwpn on irc.lainchan.org app=Flask(__name__) # Paths for conf file, # user list, # directory containing # account request files... ACCOUNT_PATH = "./req/"; CONF_FN = "lyadmin.conf.json" CONF_PATH = "./" + str(CONF_FN) # validation stuff MAX_PUB_KEY_LEN = 5000 EMAIL_REGEX = "^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,10}$" KEY_REGEX = "^[ -~]+$" # Account requests are given ID numbers # the first request will have the below # id number INIT_REQ_ID = "00000" # The main home page @app.route("/") def home(): app.route('/') # Load the list of tilde users # to generate links for u_list = []; with open("user_list.txt") as u_file: for line in u_file.readlines(): u_list.append(line.strip()); return render_template("index.html", u_list=u_list, page_name="home") # Generates the page with rule. No logic needed. def rules(): return render_template("rules.html") # Generate HTML for a form widget def widg_fun(widg): if(widg.w_type == "input"): return "input id=id_%s name=%s type=text> 1): is_email_user = True else: email = "NO_EMAIL" # Validate shell if(not shell in conf_obj["shell"]): print("failed shell validation") return handle_invalid_data(req) # Validate email if( is_email_user and not re.search(EMAIL_REGEX, email)): print("failed email validation") return handle_invalid_data(req) # Validate the SSH pub key # Most software only handles up to 4096 bit keys if(len(pub_key) > MAX_PUB_KEY_LEN): print("key failed len check") return handle_invalid_data(req) # Only printable ascii characters in # a valid key # if(not re.search("^[ -~]+$", pub_key)): if(not re.search(KEY_REGEX, pub_key)): print("key failed regex") return handle_invalid_data(req) # Check the key against a library key = sshpubkeys.SSHKey( pub_key, strict_mode=False, skip_option_parsing=True ) try: key.parse() except Exception as e: print("key failed lib validation") return handle_invalid_data(request) if(len(xff_header) < 1): xff_header = "NO_XFF" # All users requests have a sequential ID # The below picks the next ID based on # how many requests we already have saved # to disk if(len(glob.glob(ACCOUNT_PATH + str("[0-9]*ident*"))) == 0): new_id = int(INIT_REQ_ID) new_id_str = INIT_REQ_ID else: max_id = max( list(map( lambda path : path.split("/")[-1].split(".")[0], glob.glob(str(ACCOUNT_PATH) + "[0-9]*ident*"))) ) zpad = len(max_id) new_id = int(max_id)+1 new_id_str = str(new_id).zfill(zpad) # write the request to disk # fn1 = str(FULL_PATH) + str(new_id_str) + ".ident" fn1 = str(ACCOUNT_PATH) + str(new_id_str) + ".ident" with open(fn1, "w") as ident_file: ident_file.write(str(username) + "\n") ident_file.write(str(email) + "\n") ident_file.write(str(shell) + "\n") ident_file.write(str(pub_key) + "\n") ident_file.write(str(xff_header) + "\n") return render_template("signup.html", is_email_user = is_email_user) @app.context_processor def get_site_name(): return {"site_name": conf_obj["site_name"]} @app.context_processor def get_admin_email(): return {"admin_email": conf_obj["admin_email"]} def check_pwd_for_conf(): pwd_file_list = list(map( lambda path : path.split("/")[-1], glob.glob("*") )) if(not CONF_FN in pwd_file_list): print("could not find " + str(CONF_PATH)) print("please run in the installation directory") return False return True # MAIN STARTS HERE if(__name__=="__main__" and check_pwd_for_conf()): # Slurp the conf file with open(CONF_PATH) as c: conf_json_str = c.read() conf_obj = json.loads(conf_json_str) # A global list of all the shell enums conf_obj["shell_tup_list"] = list(map( lambda k : ( k, conf_obj["shell"][k] ), list(conf_obj["shell"].keys()) )) # Setup URL rules app.add_url_rule('/rules', 'rules', rules) app.add_url_rule('/req', 'req', req, methods = ['POST', 'GET']) app.add_url_rule('/req/signup', 'signup', signup, methods = ['POST']) # Run that app! app.run(host=conf_obj["listen_ip"],debug=True)