mirror of
				https://git.kernel.org/pub/scm/network/wireless/iwd.git
				synced 2025-10-31 13:17:25 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			437 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			437 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/python3
 | |
| 
 | |
| import subprocess
 | |
| import sys
 | |
| import os
 | |
| import argparse
 | |
| from xml.etree import ElementTree
 | |
| 
 | |
| class Network:
 | |
|         def __init__(self):
 | |
|                 self.eap_types = []
 | |
|                 self.outer_id = None
 | |
|                 self.inner_eap = None
 | |
|                 self.nai_realms = []
 | |
|                 self.cert_path = None
 | |
|                 self.is_hotspot = False
 | |
|                 self.username = None
 | |
|                 self.password = None
 | |
|                 self.ssid = None
 | |
|                 self.display_name = None
 | |
|                 self.tls_trusted_servers = []
 | |
| 
 | |
| def process_eap_config(eap_conf, network):
 | |
|         for m in range(len(eap_conf)):
 | |
|                 if eap_conf[m].text == "AcceptEAPTypes":
 | |
|                         tarray = eap_conf[m + 1]
 | |
|                         for i in range(len(tarray)):
 | |
|                                 network.eap_types.append(tarray[i].text)
 | |
| 
 | |
|                 elif eap_conf[m].text == "TTLSInnerAuthentication":
 | |
|                         network.inner_eap = eap_conf[m + 1].text
 | |
|                 elif eap_conf[m].text == "OuterIdentity":
 | |
|                         network.outer_id = eap_conf[m + 1].text
 | |
|                 elif eap_conf[m].text == "UserName" and args.user:
 | |
|                         network.username =  eap_conf[m + 1].text
 | |
|                 elif eap_conf[m].text == "UserPassword" and args.passwd:
 | |
|                         network.password =  eap_conf[m + 1].text
 | |
|                 elif eap_conf[m].text == "TLSTrustedServerNames":
 | |
|                         tarray = eap_conf[m + 1]
 | |
|                         for i in range(len(tarray)):
 | |
|                                 network.tls_trusted_servers.append(tarray[i].text)
 | |
| 
 | |
| def process_payload_array(parray):
 | |
|         network = Network()
 | |
| 
 | |
|         for l in range(len(parray)):
 | |
|                 if parray[l].text == "NAIRealmNames":
 | |
|                         nai_array = parray[l + 1]
 | |
|                         for i in range(len(nai_array)):
 | |
|                                 network.nai_realms.append(nai_array[i].text)
 | |
|                         continue
 | |
|                 elif parray[l].text == "IsHotspot":
 | |
|                         if parray[l + 1].tag.lower() == "true":
 | |
|                                 network.is_hotspot = True
 | |
|                         else:
 | |
|                                 network.is_hotspot = False
 | |
|                 elif parray[l].text == "SSID_STR":
 | |
|                         network.ssid = parray[l + 1].text
 | |
|                 elif parray[l].text == "DisplayedOperatorName":
 | |
|                         network.display_name = parray[l + 1].text
 | |
|                 elif parray[l].text == "EAPClientConfiguration":
 | |
|                         process_eap_config(parray[l + 1], network)
 | |
| 
 | |
|         return network
 | |
| 
 | |
| def process_payload(payload):
 | |
|         networks = []
 | |
|         for k in range(len(payload)):
 | |
|                 if payload[k].tag != "dict":
 | |
|                         continue
 | |
| 
 | |
|                 n = process_payload_array(payload[k])
 | |
|                 if n:
 | |
|                         networks.append(n)
 | |
| 
 | |
|         return networks
 | |
| 
 | |
| 
 | |
| def write_network(network, root_ca_path):
 | |
|         global cert_path
 | |
|         output = ""
 | |
|         eap = None
 | |
| 
 | |
|         # TODO: Handle multiple EAP types?
 | |
|         if len(network.eap_types) < 1:
 | |
|                 print("Not configuring open network %s" % network.ssid)
 | |
|                 return
 | |
| 
 | |
|         if network.eap_types[0] == '21':
 | |
|                 eap = 'TTLS'
 | |
|         elif network.eap_types[0] == '25':
 | |
|                 eap = 'PEAP'
 | |
| 
 | |
|         if not eap:
 | |
|                 print("TTLS or PEAP config was not found in XML")
 | |
|                 return
 | |
| 
 | |
|         if not network.inner_eap:
 | |
|                 print("No inner EAP method found in XML")
 | |
|                 return
 | |
| 
 | |
|         if network.is_hotspot and len(network.nai_realms) == 0:
 | |
|                 print("No NAI realms found in XML")
 | |
|                 return
 | |
| 
 | |
|         output = "[Security]\n"
 | |
|         output += "EAP-Method=%s\n" % eap
 | |
| 
 | |
|         # Use OuterIdentity if specified. But if not use "anonymous". Some AP's
 | |
|         # do not like an empty identity packet and will timeout.
 | |
|         if network.outer_id:
 | |
|                 output += "EAP-Identity=%s\n" % network.outer_id
 | |
|         else:
 | |
|                 output += "EAP-Identity=anonymous\n"
 | |
| 
 | |
|         if root_ca_path:
 | |
|                 output += "EAP-%s-CACert=embed:root_ca\n" % eap
 | |
| 
 | |
|         output += "EAP-%s-Phase2-Method=Tunneled-%s\n" % \
 | |
|                                                         (eap, network.inner_eap)
 | |
| 
 | |
|         if network.username:
 | |
|                 output += "EAP-%s-Phase2-Identity=%s\n" % \
 | |
|                                                         (eap, network.username)
 | |
| 
 | |
|         if network.password:
 | |
|                 output += "EAP-%s-Phase2-Password=%s\n" % \
 | |
|                                                         (eap, network.password)
 | |
| 
 | |
|         if len(network.tls_trusted_servers) > 0:
 | |
|                 output += "EAP-%s-ServerDomainMask=" % eap
 | |
|                 output += ';'.join(network.tls_trusted_servers)
 | |
|                 output += '\n'
 | |
| 
 | |
|         if network.display_name:
 | |
|                 name = network.display_name
 | |
|         elif network.ssid:
 | |
|                 name = network.ssid
 | |
|         else:
 | |
|                 name = "NameUnknown"
 | |
| 
 | |
|         if network.is_hotspot:
 | |
|                 conf_file = iwd_dir + '/hotspot/' + \
 | |
|                                 name + '.conf'
 | |
|                 output += "[Hotspot]\n"
 | |
|                 output += "NAIRealmNames="
 | |
|                 output += ','.join(network.nai_realms)
 | |
|                 output += "\n"
 | |
|                 output += "Name=%s\n" % name
 | |
| 
 | |
|         else:
 | |
|                 conf_file = iwd_dir + '/' + name + '.8021x'
 | |
| 
 | |
|         # Some AP's require older protocol versions. There should be no harm in
 | |
|         # setting this all the time.
 | |
|         output += "[EAPoL]\n"
 | |
|         output += "ProtocolVersion=1\n"
 | |
| 
 | |
|         output += "\n"
 | |
| 
 | |
|         if root_ca_path:
 | |
|                 output += "[@pem@root_ca]\n"
 | |
|                 with open(root_ca_path) as f:
 | |
|                         output += f.read()
 | |
| 
 | |
|         print("Provisioning network %s\n" % conf_file)
 | |
| 
 | |
|         if args.verbose:
 | |
|                 print(output)
 | |
| 
 | |
|         with open(conf_file, 'w+') as f:
 | |
|                 f.write(output)
 | |
| 
 | |
| def find_root_ca(chain):
 | |
|         def cleanup(certs):
 | |
|                 for c in certs:
 | |
|                         os.remove(c)
 | |
| 
 | |
|         def parse_cert_chain(file):
 | |
|                 certs = []
 | |
|                 in_cert = False
 | |
|                 current = ""
 | |
|                 f = open(file, 'r')
 | |
|                 for l in f:
 | |
|                         if '-----BEGIN CERTIFICATE-----' in l:
 | |
|                                 if in_cert:
 | |
|                                         print("invalid BEGIN")
 | |
|                                         exit()
 | |
|                                 in_cert = True
 | |
|                         elif '-----END CERTIFICATE-----' in l:
 | |
|                                 if not in_cert:
 | |
|                                         print("invalid END")
 | |
|                                         exit()
 | |
|                                 in_cert = False
 | |
|                                 current += l
 | |
|                                 certs.append(current)
 | |
|                                 current = ""
 | |
|                                 continue
 | |
| 
 | |
|                         current += l
 | |
| 
 | |
|                 return certs
 | |
| 
 | |
|         def find_root_ca_path(hash):
 | |
|                 path = '/etc/ssl/certs/%s.0' % hash
 | |
|                 if os.path.exists(path):
 | |
|                         return os.path.realpath(path)
 | |
| 
 | |
|                 return None
 | |
| 
 | |
|         #
 | |
|         # Parse each cert out of the chain.
 | |
|         #
 | |
|         certs = parse_cert_chain(chain)
 | |
|         files = []
 | |
|         subjects = []
 | |
|         self_signed = None
 | |
| 
 | |
|         if len(certs) < 1:
 | |
|                 print("No certs found")
 | |
|                 exit()
 | |
| 
 | |
|         #
 | |
|         # Write each cert into an intermediate#.pem file, get subject and see if
 | |
|         # any are self-signed (Root CA). If one is self signed, save this file
 | |
|         # to be used later when verifying the chain.
 | |
|         #
 | |
|         for index, c in enumerate(certs):
 | |
|                 with open("/tmp/intermediate" + str(index) + ".pem", 'w+') as f:
 | |
|                         f.write(c)
 | |
| 
 | |
|                         files.append("/tmp/intermediate%d.pem" % index)
 | |
| 
 | |
|                 with open(os.devnull, 'w') as devnull:
 | |
|                         proc = subprocess.Popen(['openssl', 'x509', '-subject',
 | |
|                                         '-hash', '-issuer_hash', '-noout', '-in',
 | |
|                                         '/tmp/intermediate%d.pem' % index],
 | |
|                                         stdout=subprocess.PIPE, stderr=devnull)
 | |
| 
 | |
|                 result, err = proc.communicate()
 | |
|                 results = result.decode("utf-8").split('\n')
 | |
| 
 | |
|                 sub = results[0]
 | |
|                 own_hash = results[1]
 | |
|                 issuer_hash = results[2]
 | |
| 
 | |
|                 #
 | |
|                 # Get rid of "depth=#" openssl output
 | |
|                 #
 | |
|                 sub = sub[sub.index('C ='):].strip()
 | |
|                 subjects.append(sub)
 | |
| 
 | |
|                 if own_hash == issuer_hash:
 | |
|                         self_signed = '/tmp/intermediate%d.pem' % index
 | |
| 
 | |
|         #
 | |
|         # Let openssl verify and print the subject of the cert chain. If there
 | |
|         # is a self signed cert we want openssl to bypass looking in the cert
 | |
|         # store (-no-CApath), and provide this CA (-CAfile). Otherwise, let
 | |
|         # openssl verify the chain as usual.
 | |
|         #
 | |
|         if not self_signed:
 | |
|                 with open(os.devnull, 'w') as devnull:
 | |
|                         proc = subprocess.Popen(['openssl', 'verify',
 | |
|                                         '-show_chain', '-untrusted', chain,
 | |
|                                         '/tmp/intermediate0.pem'],
 | |
|                                         stdout=subprocess.PIPE, stderr=devnull)
 | |
|         else:
 | |
|                 with open(os.devnull, 'w') as devnull:
 | |
|                         proc = subprocess.Popen(['openssl', 'verify',
 | |
|                                         '-show_chain', '-no-CApath', '-CAfile',
 | |
|                                         self_signed, '-untrusted', chain,
 | |
|                                         '/tmp/intermediate0.pem'],
 | |
|                                         stdout=subprocess.PIPE, stderr=devnull)
 | |
| 
 | |
|         results, err = proc.communicate()
 | |
|         results = results.decode("utf-8").strip().split('\n')
 | |
| 
 | |
|         #
 | |
|         # We only want lines starting with depth=
 | |
|         #
 | |
|         results = [e for e in results if e.startswith('depth=')]
 | |
| 
 | |
|         print("Found %d certs in chain" % len(results))
 | |
| 
 | |
|         #
 | |
|         # Get rid of prepended "depth=#"
 | |
|         #
 | |
|         ca_sub = results[-1]
 | |
|         ca_sub = ca_sub[ca_sub.index('C ='):].strip()
 | |
| 
 | |
|         #
 | |
|         # The last cert in the chain will either be a Root CA, or issued by a
 | |
|         # Root CA. If we find a matching subject in our previous -show_chain
 | |
|         # command we know there is a Root CA in the chain, if not we need to
 | |
|         # find the Root CA on the system.
 | |
|         #
 | |
|         ca_cert_index = len(results) - 2
 | |
| 
 | |
|         for i, sub in enumerate(subjects):
 | |
|                 if ca_sub == sub:
 | |
|                         print("Root CA found in chain, index=%d" % i)
 | |
|                         ca_cert_index = i
 | |
| 
 | |
|         #
 | |
|         # Now that we have this last cert, check if its a Root CA or not by
 | |
|         # checking if its hash matches the issuer hash.
 | |
|         #
 | |
|         with open(os.devnull, 'w') as devnull:
 | |
|                 proc = subprocess.Popen(['openssl', 'x509', '-hash',
 | |
|                                 '-issuer_hash', '-noout', '-in',
 | |
|                                 '/tmp/intermediate%d.pem' % ca_cert_index],
 | |
|                                 stdout=subprocess.PIPE, stderr=devnull)
 | |
|         hashes, err = proc.communicate()
 | |
|         hashes = hashes.decode('utf-8').strip().split('\n')
 | |
| 
 | |
|         own = hashes[0]
 | |
|         issuer = hashes[1]
 | |
| 
 | |
|         if own == issuer:
 | |
|                 #
 | |
|                 # Since this is a Root CA, we should already have a copy on the
 | |
|                 # system. Verify the hash links to a system cert.
 | |
|                 #
 | |
|                 path = find_root_ca_path(issuer)
 | |
|                 if path is not None:
 | |
|                         print("Verified Root CA exists: %s" % path)
 | |
|                 else:
 | |
|                         print("Root CA in chain could not be found on system")
 | |
|                         return None
 | |
| 
 | |
|                 cleanup(files)
 | |
|                 return path
 | |
| 
 | |
|         print("Root CA not found in cert chain, looking on system")
 | |
| 
 | |
|         #
 | |
|         # The final cert in the chain was not a Root CA, check if we have a Root
 | |
|         # CA on the system matching the last certs issuer hash.
 | |
|         #
 | |
|         path = find_root_ca_path(issuer)
 | |
|         if path is None:
 | |
|                 print("Could not find issuer %s" % issuer)
 | |
|                 return None
 | |
| 
 | |
|         with open(os.devnull, 'w') as devnull:
 | |
|                 proc = subprocess.Popen(['openssl', 'x509', '-hash',
 | |
|                                         '-issuer_hash', '-noout', '-in', path],
 | |
|                                         stdout=subprocess.PIPE, stderr=devnull)
 | |
| 
 | |
|         hashes, err = proc.communicate()
 | |
|         hashes = hashes.decode('utf-8').strip().split('\n')
 | |
| 
 | |
|         own = hashes[0]
 | |
|         issuer = hashes[1]
 | |
| 
 | |
|         if own == issuer:
 | |
|                 path = find_root_ca_path(issuer)
 | |
|                 if path is not None:
 | |
|                         print("Verified Root CA exists: %s" % path)
 | |
|                 else:
 | |
|                         print("Root CA could not be found on system")
 | |
|                         return None
 | |
| 
 | |
|         cleanup(files)
 | |
| 
 | |
|         return path
 | |
| 
 | |
| iwd_dir='/var/lib/iwd'
 | |
| cert_path = None
 | |
| 
 | |
| description = '''
 | |
| Convert iOS mobileconfig file to IWD format. Currently only TTLS and PEAP are
 | |
| supported. Inner methods supported are PAP, CHAP, MSCHAP, MSCHAPv2.
 | |
| '''
 | |
| 
 | |
| parser = argparse.ArgumentParser(description=description)
 | |
| group = parser.add_mutually_exclusive_group(required=True)
 | |
| group.add_argument('-i', '--input', nargs='?', metavar='mobileconfig',
 | |
|                 help='iOS mobileconfig file')
 | |
| parser.add_argument('-o', '--iwd-out', nargs='?', metavar='dir',
 | |
|                 help='IWD configuration directory (default /var/lib/iwd)')
 | |
| parser.add_argument('-u', '--user', action='store_true',
 | |
|                 help='Store username in provisioning file')
 | |
| parser.add_argument('-p', '--passwd', action='store_true',
 | |
|                 help='Store password (plaintext) in provisioning file')
 | |
| parser.add_argument('-v', '--verbose', action='store_true',
 | |
|                 help='Enable verbose output')
 | |
| group.add_argument('-x', '--xml', nargs='?',
 | |
|                 help='Directly pass XML')
 | |
| 
 | |
| args = parser.parse_args()
 | |
| 
 | |
| if args.iwd_out:
 | |
|         iwd_dir = args.iwd_out
 | |
| 
 | |
| if args.input:
 | |
|         with open(os.devnull, 'w') as devnull:
 | |
|                 proc = subprocess.Popen(['openssl', 'cms', '-in', args.input, '-inform',
 | |
|                                         'der', '-verify', '-noverify'],
 | |
|                                         stdout=subprocess.PIPE, stderr=devnull)
 | |
| 
 | |
|         xml, err = proc.communicate()
 | |
| 
 | |
|         xml = xml.decode('utf-8')
 | |
| else:
 | |
|         with open(args.xml) as f:
 | |
|                 xml = f.read()
 | |
| 
 | |
| if args.verbose:
 | |
|         print(xml)
 | |
| 
 | |
| if args.input:
 | |
|         subprocess.call(['openssl', 'cms', '-in', args.input, '-inform', 'der',
 | |
|                         '-outform', 'pem', '-noout', '-cmsout', '-certsout',
 | |
|                         '/tmp/certchain.crt'])
 | |
| 
 | |
| root = ElementTree.fromstring(xml)
 | |
| 
 | |
| for i in range(len(root)):
 | |
|         for j in range(len(root[i])):
 | |
|                 if root[i][j].text != "PayloadContent":
 | |
|                         continue
 | |
|                 if (root[i][j + 1].tag != "array"):
 | |
|                         continue
 | |
| 
 | |
|                 payload = root[i][j + 1]
 | |
|                 nets = process_payload(payload)
 | |
| 
 | |
| if args.input:
 | |
|         root_ca_path = find_root_ca('/tmp/certchain.crt')
 | |
| else:
 | |
|         root_ca_path = None
 | |
| 
 | |
| for n in nets:
 | |
|         write_network(n, root_ca_path)
 | 
