system/scripts/python/mxme.py
Georg 89524ab066
mxme.py: Checking/Enabling secondary MX.
Signed-off-by: Georg <georg@lysergic.dev>
2021-09-15 14:18:48 +02:00

480 lines
14 KiB
Python
Executable File

#!/usr/bin/python3
"""
The flagship email-domain enabler and repair tool for Mailcow/PowerDNS infrastructures.
The following checks will be performed:
DNS:
- DMARC record
- DKIM record
- MX1 + MX2 records
Mail:
- MX1 domain entry
- MX2 domain entry
- MX1 DKIM keypair
If any check fails, the missing domain entry or DKIM keypair is created on the respective mail server, and the SPF, DMARC, DKIM and MX records in DNS are overwritten with fresh ones, replacing any existing records of those names and types.
Created and Last modified: 15/09/2021 by Georg Pfuetzenreuter <georg@lysergic.dev>
"""
import requests
import sys
import os
from dotenv import load_dotenv
import time
# The domain to scan is the first command line argument; without one there
# is nothing to work on.
if len(sys.argv) <= 1:
    print("Specify the domain name")
    sys.exit(1)
domain = sys.argv[1]
# Load settings from a .env file into the process environment.
load_dotenv()

# POWERDNS SETTINGS
ENDPOINT_PDNS = os.environ.get('ENDPOINT_PDNS')
APIKEY_PDNS = os.environ.get('APIKEY_PDNS')

# MAILCOW SETTINGS (primary server, then secondary server)
ENDPOINT_MAILCOW = os.environ.get('ENDPOINT_MAILCOW')
APIKEY_MAILCOW = os.environ.get('APIKEY_MAILCOW')
ENDPOINT_MAILCOW_2 = os.environ.get('ENDPOINT_MAILCOW_2')
APIKEY_MAILCOW_2 = os.environ.get('APIKEY_MAILCOW_2')

# Every setting is mandatory. A missing one is a configuration error, so the
# script has to stop with a non-zero exit status instead of reporting success.
_missing_settings = [
    name
    for name, value in (
        ('ENDPOINT_PDNS', ENDPOINT_PDNS),
        ('APIKEY_PDNS', APIKEY_PDNS),
        ('ENDPOINT_MAILCOW', ENDPOINT_MAILCOW),
        ('APIKEY_MAILCOW', APIKEY_MAILCOW),
        ('ENDPOINT_MAILCOW_2', ENDPOINT_MAILCOW_2),
        ('APIKEY_MAILCOW_2', APIKEY_MAILCOW_2),
    )
    if value is None
]
if _missing_settings:
    print("Could not load environment variables. Please check your .env file.")
    print("Missing: " + ", ".join(_missing_settings))
    sys.exit(1)
print("Scanning " + domain)

# QUERY POWERDNS
# Export the zone once up front: every later check and repair step works on
# this zone, so the script stops here if it cannot be fetched.
print("DNS: Querying zone ...")
URL = ENDPOINT_PDNS + '/api/v1/servers/localhost/zones/' + domain + './export'
try:
    response = requests.get(
        URL,
        headers={'accept': 'text/plain', 'X-API-Key': APIKEY_PDNS},
    )
    # Turn 4xx/5xx answers into an HTTPError.
    response.raise_for_status()
except requests.exceptions.HTTPError as err:
    # Prints the HTTP error to stderr and exits with status 1.
    raise SystemExit(err)
except requests.exceptions.ConnectionError:
    print("Connection failed.")
    sys.exit(1)

# raise_for_status() only rejects 4xx/5xx; anything other than a plain 200
# is still treated as a failed lookup.
status = response.status_code
dnsok = status == 200
if not dnsok:
    print("DNS: No zone found, or faulty lookup. Aborting.")
    sys.exit(1)
print("DNS: Zone found.")
# MAILCOW
print("Mail [MX1]: Querying domain ...")
server = ENDPOINT_MAILCOW
api_key = APIKEY_MAILCOW
# API path prefixes; `get` and `add` are reused by the later Mailcow requests.
api = '/api/v1'
get = api + '/get'
add = api + '/add'
URL = server + get + '/domain/' + domain

# Default until a DKIM lookup proves otherwise, so the flag is defined on
# every path that reaches the consistency check.
dkimok = False

try:
    response = requests.get(
        URL,
        headers={'accept': 'application/json', 'X-API-Key': api_key},
    )
except requests.exceptions.ConnectionError:
    print("Connection failed.")
    sys.exit(1)
except requests.exceptions.RequestException as err:
    print(err)
    sys.exit(1)

try:
    data = response.json()
except ValueError:
    # The body was not JSON at all; the answer cannot be interpreted.
    print("Mail [MX1]: Received a response that is not valid JSON.")
    print(response.text)
    sys.exit(1)

# The script takes the presence of this key as the sign that the domain
# exists on the server.
mailok = isinstance(data, dict) and 'max_new_mailbox_quota' in data
# A missing domain is scheduled for creation during the repair phase.
initprimary = not mailok
if mailok:
    print("Mail: Domain found.")
else:
    print("Mail: Domain NOT found.")
print("Mail [MX2]: Querying domain ...")
server2 = ENDPOINT_MAILCOW_2
api_key2 = APIKEY_MAILCOW_2
URL = server2 + get + '/domain/' + domain

try:
    response = requests.get(
        URL,
        headers={'accept': 'application/json', 'X-API-Key': api_key2},
    )
except requests.exceptions.ConnectionError:
    print("Connection failed.")
    sys.exit(1)
except requests.exceptions.RequestException as err:
    print(err)
    sys.exit(1)

try:
    data = response.json()
except ValueError:
    # The body was not JSON at all; the answer cannot be interpreted.
    print("Mail [MX2]: Received a response that is not valid JSON.")
    print(response.text)
    sys.exit(1)

# The script takes the presence of this key as the sign that the domain
# exists on the secondary server.
mail2ok = isinstance(data, dict) and 'max_new_mailbox_quota' in data
# A missing domain is scheduled for creation during the repair phase.
initsecondary = not mail2ok
if mail2ok:
    print("Mail: Domain found.")
else:
    print("Mail: Domain NOT found.")
print("Mail: Querying DKIM ...")
URL = server + get + '/dkim/' + domain

# Assume there is no usable key until the lookup proves otherwise; this keeps
# the flag defined even when the lookup turns out to be faulty.
dkimok = False

try:
    response = requests.get(
        URL,
        headers={'accept': 'application/json', 'X-API-Key': api_key},
    )
except requests.exceptions.ConnectionError:
    print("Connection failed.")
    sys.exit(1)
except requests.exceptions.RequestException as err:
    print(err)
    sys.exit(1)

try:
    data = response.json()
    if 'dkim_selector' in data:
        # Tuple assignment: both names are set together or not at all.
        selector, txtshould = data['dkim_selector'], data['dkim_txt']
        dkimok = True
        print("Mail: DKIM keypair found.")
    else:
        print("Mail: No DKIM keypair found.")
except (KeyError, TypeError, ValueError):
    # Non-JSON body, unexpected JSON type, or a key record without its TXT
    # value. Not fatal: the repair phase will try to create a keypair.
    print("Mail: No or faulty DKIM lookup.")
def _zone_has_line(zone_text, needle, quoted=False):
    """Return True if any line of the exported zone contains *needle*.

    With ``quoted=True`` the line must also contain a double quote, i.e.
    carry a quoted value (presumably the TXT content). Lines without one
    are skipped instead of raising an IndexError.
    """
    return any(
        needle in line and (not quoted or '"' in line)
        for line in zone_text.split('\n')
    )


print("DNS: Querying records ...")
URL = ENDPOINT_PDNS + '/api/v1/servers/localhost/zones/' + domain + './export'
try:
    response = requests.get(
        URL,
        headers={'accept': 'text/plain', 'X-API-Key': APIKEY_PDNS},
    )
    response.raise_for_status()
except requests.exceptions.ConnectionError:
    print("Connection failed.")
    sys.exit(1)
except requests.exceptions.RequestException as err:
    print(err)
    sys.exit(1)
data = response.text

# `selector` is only assigned when a DKIM key was found on the mail server.
# Without it the DKIM record cannot be identified, but the remaining records
# can still be checked on their own.
try:
    dkim_label = selector + '._'
except NameError:
    dkim_label = None

# Each check is evaluated independently, so a hit for one record type can
# never be mistaken for a hit of another.
dnsdkimok = dkim_label is not None and _zone_has_line(data, dkim_label, quoted=True)
if dnsdkimok:
    print("DNS: Found DKIM TXT record.")
else:
    print("DNS: No DKIM TXT record found.")

dnsdmarcok = _zone_has_line(data, '_dmarc', quoted=True)
if dnsdmarcok:
    print("DNS: Found DMARC TXT record.")
else:
    print("DNS: No DMARC TXT record found.")

dnsmx1ok = _zone_has_line(data, '10 3gy.de.')
if dnsmx1ok:
    print("DNS: Found primary MX record.")
else:
    print("DNS: Did NOT find primary MX record.")

dnsmx2ok = _zone_has_line(data, '20 3gy.pl.')
if dnsmx2ok:
    print("DNS: Found secondary MX record.")
else:
    print("DNS: Did NOT find secondary MX record.")
# The setup is only consistent if every single check passed. The MX2 domain
# entry is part of that: leaving it out would end the run with "All good"
# while the secondary mail server still lacks the domain.
checks = (
    dnsok,
    mailok,
    mail2ok,
    dkimok,
    dnsdkimok,
    dnsdmarcok,
    dnsmx1ok,
    dnsmx2ok,
)
if all(checks):
    print("All good. No changes seem to be needed. Aborting.")
    sys.exit(0)

print("Found inconsistencies:")
print(f"DNS OK: {dnsok} - Mail OK: {mailok} - Mail [MX2] OK: {mail2ok} - Mail DKIM OK: {dkimok} - DNS DKIM OK: {dnsdkimok} - DNS DMARC OK: {dnsdmarcok} - DNS MX1 OK: {dnsmx1ok} - DNS MX2 OK: {dnsmx2ok}")

# Grace period: Ctrl+C during the countdown cancels the repair.
try:
    print("Will attempt a full repair if not cancelled within 5 seconds ...")
    time.sleep(5)
except KeyboardInterrupt:
    print(" Cancelled!")
    sys.exit(1)
def _create_mail_domain(label, base_url, key, payload):
    """Create the domain on one Mailcow server and report the outcome.

    label    -- server tag used in the messages ("MX1" or "MX2")
    base_url -- endpoint of the Mailcow server
    key      -- API key for that server
    payload  -- JSON body for the add/domain call

    Exits with status 1 if the server cannot be reached or the answer does
    not have the expected shape. A reported 'danger' status is printed, but
    the script carries on.
    """
    print(f"Mail [{label}]: Initializing domain ...")
    try:
        response = requests.post(
            base_url + add + '/domain',
            headers={'accept': 'application/json', 'X-API-Key': key, 'Content-Type': 'application/json'},
            json=payload,
        )
    except requests.exceptions.ConnectionError:
        print("Connection failed.")
        sys.exit(1)
    except requests.exceptions.RequestException as err:
        print(err)
        sys.exit(1)

    # The status is read from the 'type' field of the first list element;
    # anything else (non-JSON body, dict, empty list) counts as an error.
    try:
        data = response.json()
        status = data[0]['type']
    except (ValueError, KeyError, IndexError, TypeError):
        print(f"Mail [{label}] Error:")
        print(response.text)
        sys.exit(1)

    if status == 'success':
        print(f"Mail [{label}]: Created domain.")
    elif status == 'danger':
        print(f"Mail [{label}]: Failed to create domain.")
        print(data)


# Settings shared by the primary and the failover domain entry.
_domain_defaults = {
    "active": "1",
    "aliases": "20",
    "defquota": "1024",
    "domain": domain,
    "mailboxes": "10",
    "maxquota": "2048",
    "quota": "5120",
    "rl_frame": "s",
    "rl_value": "10",
}

if initprimary:
    _create_mail_domain("MX1", server, api_key, {
        **_domain_defaults,
        "backupmx": "0",
        "description": domain,
        "relay_all_recipients": "0",
        "restart_sogo": "10",
    })

if initsecondary:
    # The secondary server holds the domain as a backup MX relaying all
    # recipients.
    _create_mail_domain("MX2", server2, api_key2, {
        **_domain_defaults,
        "backupmx": "1",
        "description": domain + ' - Failover',
        "relay_all_recipients": "1",
        "restart_sogo": "0",
    })
if not dkimok:
    print("Mail: Initializing DKIM ...")
    URL = server + add + '/dkim'
    payload = {
        "dkim_selector": "primary",
        "domains": domain,
        "key_size": "2048"
    }
    try:
        response = requests.post(
            URL,
            headers={'accept': 'application/json', 'X-API-Key': api_key, 'Content-Type': 'application/json'},
            json=payload,
        )
    except requests.exceptions.ConnectionError:
        print("Connection failed.")
        sys.exit(1)
    except requests.exceptions.RequestException as err:
        print(err)
        sys.exit(1)

    # The status is read from the 'type' field of the first list element;
    # the lookup itself has to sit inside the try for the guard to work.
    try:
        data = response.json()
        status = data[0]['type']
    except (ValueError, KeyError, IndexError, TypeError):
        print("Mail Error:")
        print(response.text)
        sys.exit(1)

    if status == 'success':
        print("Mail: Created DKIM keypair.")
    elif status == 'danger':
        print("Mail: Failed to create DKIM keypair.")
        print(data)
        sys.exit(1)

    # Read the key back: the DNS patching below needs its selector and TXT
    # value.
    URL = server + get + '/dkim/' + domain
    print("Mail: Querying DKIM ...")
    try:
        response = requests.get(
            URL,
            headers={'accept': 'application/json', 'X-API-Key': api_key},
        )
    except requests.exceptions.ConnectionError:
        print("Connection failed.")
        sys.exit(1)
    except requests.exceptions.RequestException as err:
        print(err)
        sys.exit(1)

    try:
        data = response.json()
        key_present = 'dkim_selector' in data
        if key_present:
            # Tuple assignment: both names are set together or not at all.
            selector, txtshould = data['dkim_selector'], data['dkim_txt']
    except (KeyError, TypeError, ValueError):
        print("Mail: No or faulty DKIM lookup. Unable to continue. ABORTING.")
        sys.exit(1)

    if not key_present:
        print("Mail: No DKIM keypair found. Unable to continue. ABORTING.")
        sys.exit(1)

    print("Mail: DKIM keypair found.")
    dkimok = True
# PATCH
def _replace_rrset(label, name, rtype, contents):
    """Replace one record set in the domain's PowerDNS zone.

    label    -- short tag used in the progress messages ("SPF", "MX", ...)
    name     -- fully qualified record name, with trailing dot
    rtype    -- record type ("TXT" or "MX")
    contents -- list of record contents; TXT values must already be quoted

    Uses changetype REPLACE, so existing records of that name and type are
    overwritten. Exits with status 1 on any answer other than HTTP 204.
    """
    print(f"DNS: Patching {label} ...")
    payload = {
        "rrsets": [{
            "name": name,
            "type": rtype,
            "ttl": "3600",
            "changetype": "REPLACE",
            "records": [
                {"content": content, "disabled": False, "name": name}
                for content in contents
            ],
        }]
    }
    try:
        response = requests.patch(
            ENDPOINT_PDNS + '/api/v1/servers/localhost/zones/' + domain + ".",
            headers={'accept': 'application/json', 'X-API-Key': APIKEY_PDNS, 'Content-Type': 'application/json'},
            json=payload,
        )
    except requests.exceptions.ConnectionError:
        print("Connection failed.")
        sys.exit(1)
    except requests.exceptions.RequestException as err:
        print(err)
        sys.exit(1)

    status = response.status_code
    if status == 204:
        print(f"{label}: OK!")
        return

    if status == 422:
        print(f"{label}: Failed:")
    else:
        print("Unhandled error.")
        print(status)
    # Error bodies are not guaranteed to be JSON; fall back to the raw text
    # so the real error is shown instead of a decoding traceback.
    try:
        print(response.json())
    except ValueError:
        print(response.text)
    sys.exit(1)


# All four record sets are rewritten unconditionally once a repair was
# decided on, regardless of which individual check failed.
_replace_rrset("SPF", domain + ".", "TXT", ["\"v=spf1 mx a -all\""])
_replace_rrset(
    "DMARC",
    "_dmarc." + domain + ".",
    "TXT",
    ["\"v=DMARC1; p=reject; rua=mailto:system@lysergic.dev\""],
)
_replace_rrset(
    "DKIM",
    selector + "._domainkey." + domain + ".",
    "TXT",
    ["\"" + txtshould + "\""],
)
_replace_rrset("MX", domain + ".", "MX", ["10 3gy.de.", "20 3gy.pl."])

print("Done.")
sys.exit(0)