Georg c27753da86
Init
Signed-off-by: Georg <georg@lysergic.dev>
2021-08-14 10:10:42 +02:00

275 lines
9.1 KiB
Python
Executable File

#!/usr/bin/env python3
# PYTHON_ARGCOMPLETE_OK
# Original by https://github.com/sleeepyjack/dockersh
# Modified by georg@lysergic.dev
# POC / IN DEVELOPMENT
# Do NOT use this in insecure environments
import os
os.environ['TERM'] = 'xterm' # removes warning on non-tty commands
import argparse
#import argcomplete
#from argcomplete.completers import ChoicesCompleter
from configparser import ConfigParser, ExtendedInterpolation
import docker
import random
import string
import sys
from pwd import getpwnam
import socket
import os
import getpass
prog = 'dockersh'
version = prog + " v1.0"
config_file = "/etc/dockersh.ini"
user = getpass.getuser()
hostname = socket.gethostname()
cli = docker.APIClient()
def containers(image_filter='', container_filter='', sort_by='Created', all=True):
cs = cli.containers(all=all, filters={'label': "user="+user})
cs.sort(key=lambda c: c[sort_by])
cs = [c for c in cs if str(c['Image']+':sh0').startswith(image_filter)]
cs = [c for c in cs if c['Names'][0][1:].startswith(container_filter)]
return cs
def random_string(length):
def random_char():
return random.choice(string.ascii_uppercase + string.digits)
return ''.join(random_char() for _ in range(length))
def strip(s, suffix=''):
for c in ['/', ':', '.', ' ']: #QUESTION does this suffice?
s = s.replace(c, '')
if s.endswith(suffix):
s = s[:len(s)-len(suffix)]
return s
def pull(image):
if not image in image_names:
s = image.split(':')
if len(s) > 1:
cli.pull(s[0], s[1])
else:
cli.pull(s[0])
def image_split(s):
sp = s.split(':')
if len(sp) == 1:
return sp[0], 'sh0'
else:
return sp[0], sp[1]
def selection_menu(choices):
if len(choices) == 1:
return 0
print("There are multiple matching containers running:")
for j, c in enumerate(choices):
print("[" + str(j+1) + "]\t" + c)
inp = input("select [1]: ")
if inp == "":
i = 0
else:
i = int(inp) - 1
assert(0 <= i < len(choices))
return i
#if __name__ == "__main__":
def parse_args():
parser = argparse.ArgumentParser(prog=prog)
parser.add_argument('--version',
action='version',
version=version)
parser.add_argument('-i', '--image',
dest='image',
help="base image to be used",
default="") #.completer = ChoicesCompleter(tuple(images))
parser.add_argument('-n', '--name',
dest='name',
help="container name",
default="") #.completer = ChoicesCompleter(tuple(containers))
parser.add_argument('-t', '--temporary',
dest='temp',
action='store_true',
help="execute in temporary container",
default=False)
parser.add_argument('-c', '--command',
dest='cmd',
help="pass command to bash in container",
default="")
parser.add_argument('--home',
dest='home',
help="user home directory",
default=ini['homedir'])
#argcomplete.autocomplete(parser) #TODO make autocompletion work
args = parser.parse_args()
args.suffix = ini['suffix']
args.greeting = ini['greeting']
args.ini = ini
return args
# load ini
cfg = ConfigParser({"USER": user, "HOSTNAME": hostname}, interpolation=ExtendedInterpolation())
cfg.read(config_file, encoding="utf-8")
if os.getenv("USER") and user != os.getenv('USER') and user in cfg["ADMIN"]["names"].splitlines():
user = os.getenv('USER')
# reread config
cfg = ConfigParser({"USER": user, "HOSTNAME": hostname}, interpolation=ExtendedInterpolation())
cfg.read(config_file, encoding="utf-8")
admin_cmd = "admin"
admin_shell = "/bin/bash"
if cfg.has_section("ADMIN") and "command" in cfg["ADMIN"]:
admin_cmd = cfg["ADMIN"]["command"]
if "admin_shell" in cfg["ADMIN"]:
admin_shell = cfg["ADMIN"]["admin_shell"]
ini = cfg[user] if cfg.has_section(user) else cfg['DEFAULT']
#if __name__ == "__main__":
args = parse_args()
if args.cmd == admin_cmd:
print("Trying to login into host: "+user)
if not sys.stdout.isatty():
print()
print("admin mode is only possible using pseudo tty-allocation.")
print("Try login using:")
print("ssh -t ...")
sys.exit(0)
os.system("sudo -u "+user+" sudo "+admin_shell)
sys.exit(0)
if cfg.has_section("ADMIN") and "maintenance" in cfg["ADMIN"] and cfg["ADMIN"]["maintenance"] == "on" and (not "maintenance_scp" in cfg["ADMIN"] or cfg["ADMIN"]["maintenance_scp"] != "on"):
if "maintenance_text" in cfg["ADMIN"]:
print(cfg["ADMIN"]["maintenance_text"])
else:
print("This Maschine is in Maintanence Mode.")
sys.exit(0)
is_scp_cmd = False
if args.cmd:
if os.path.basename(args.cmd).startswith(("scp","rsync --server","sftp-server","ls","*")):
is_scp_cmd = True
if args.cmd == "envir":
print(os.environ)
name_passed = (args.name != "")
image_passed = (args.image != "")
if not is_scp_cmd and cfg.has_section("ADMIN") and "maintenance" in cfg["ADMIN"] and cfg["ADMIN"]["maintenance"] == "on":
if "maintenance_text" in cfg["ADMIN"]:
print(cfg["ADMIN"]["maintenance_text"])
else:
print("This Maschine is in Maintanence Mode. However, you can copy files with scp, rsync, sftp or list files with ls without connecting to the maschine.")
sys.exit(0)
if args.temp:
if not image_passed:
args.image = args.ini['image']
args.image_base, args.image_tag = image_split(args.image)
args.image = args.image_base + ':' + args.image_tag
args.name = strip(args.image) + '_tmp' + random_string(4)
else:
if name_passed:
args.name = strip(args.name, args.suffix)
filtered_con = containers(image_filter=args.image, container_filter=args.name)
if len(filtered_con) > 0:
con_names = [c['Names'][0][1:] for c in filtered_con]
i = selection_menu(con_names)
args.name = strip(con_names[i], args.suffix)
else:
if not image_passed:
args.image = args.ini['image']
args.image_base, args.image_tag = image_split(args.image)
args.image = args.image_base + ':' + args.image_tag
if not name_passed:
args.name = strip(args.image)
if len(containers(container_filter=args.name)) != 0:
print("WARNING: container name already exists (ignoring --image)")
args.full_name = args.name + args.suffix
initing = False
if len(containers(container_filter=args.name)) == 0:
volumes = []
if "volumes" in args.ini:
volumes = volumes + args.ini["volumes"].split(",")
volumes = [v.split(":") for v in volumes]
binds = {v[0].strip():{"bind":v[1].strip(),"mode":v[2].strip()} for v in volumes}
volumes = [v[1] for v in volumes]
host_config = cli.create_host_config(
binds=binds,
restart_policy={'Name' : 'unless-stopped'})
#cli.pull(args.image)
userpwd = getpwnam(user)
cli.create_container(args.image,
stdin_open=True,
tty=True,
name=args.full_name,
hostname=args.name,
labels={'group': prog, 'user': user},
volumes=volumes,
working_dir=args.home,
environment={
"HOST_USER_ID": userpwd.pw_uid,
"HOST_USER_GID": userpwd.pw_gid,
"HOST_USER_NAME": user
},
host_config=host_config
)
initing=True
cli.start(args.full_name)
if initing:
print("")
print("Initializing...")
#os.popen('docker exec '+args.full_name + ' /bin/bash -c "if [ -e /init-user ]; then /init-user; else echo \"No Initialization skript found for container\"; fi; echo Initialization finished."').read().split(":")[-1]
init_cmd = 'docker exec '+args.full_name + ' /bin/bash -c "if [ -e /init-user ]; then /init-user; else echo \\\"Script...\\\"; fi; echo Initialization finished."'
print(os.popen(init_cmd).read())
#print("Please login again.")
#sys.exit(0)
if len(args.cmd) == 0:
try:
print(args.greeting.replace("```",""))
except UnicodeEncodeError:
print(hostname)
user_bash = os.popen('docker exec -u root '+args.full_name + ' getent passwd '+user+'').read().split(":")[-1]
if user_bash == "":
user_bash = "/bin/bash"
cmd = args.cmd if args.cmd else user_bash
#custom
#user_bash = "/bin/sh"
#cmd = user_bash
cmd = "/bin/bash -c \"" + cmd + "\""
#custom
#os.system('docker exec -u '+user+' '+ args.full_name + ' ' + 'useradd -s /bin/bash user')
# a tty needs -it, scp needs -i
docker_arg = "-i" if not sys.stdout.isatty() or is_scp_cmd else "-it"
os.system('docker exec -u '+user+" " + docker_arg +' '+ args.full_name + ' ' + cmd+"")
if args.temp:
cli.remove_container(args.full_name, v=True, force=True)
cli.close()