system/scripts/python/powerdns_mailcow_dkim_dmarc_spf_patcher.py
Georg 28e4afe72d
Removed Patcher Early Exit
Signed-off-by: Georg <georg@lysergic.dev>
2021-09-13 10:21:33 +02:00

167 lines
4.7 KiB
Python
Executable File

#!/usr/bin/python3
"""
PowerDNS DKIM + DMARC + SPF patching script.
Pulls DKIM information from Mailcow.
Created and Last modified: 13/09/2021 by Georg Pfuetzenreuter <georg@lysergic.dev>
"""
import requests
import sys
import os
from dotenv import load_dotenv
# The domain to process is the first command-line argument; without it there
# is nothing to do.
if len(sys.argv) < 2:
    print("Specify the domain name")
    sys.exit(1)
domain = sys.argv[1]
# Settings come from the process environment; load_dotenv() is called first so
# that values kept in a .env file are available through os.environ as well.
load_dotenv()

# POWERDNS SETTINGS
ENDPOINT_PDNS = os.environ.get('ENDPOINT_PDNS')
APIKEY_PDNS = os.environ.get('APIKEY_PDNS')

# MAILCOW SETTINGS
ENDPOINT_MAILCOW = os.environ.get('ENDPOINT_MAILCOW')
APIKEY_MAILCOW = os.environ.get('APIKEY_MAILCOW')

# An empty value is as unusable as a missing one (it would only produce a
# broken URL or an empty API key later), so both are rejected here.
if not all((ENDPOINT_PDNS, APIKEY_PDNS, ENDPOINT_MAILCOW, APIKEY_MAILCOW)):
    print("Could not load environment variables. Please check your .env file.")
    # Non-zero status: nothing was patched, and callers must be able to tell.
    sys.exit(1)
print("Scanning " + domain)

# QUERY MAILCOW
# Ask Mailcow for the DKIM data of the domain. The selector and TXT value
# obtained here are what the DKIM record in PowerDNS is later set to.
URL = ENDPOINT_MAILCOW + '/api/v1/get/dkim/' + domain

# Transport problems are handled separately from response parsing so that each
# failure is reported with the message that actually describes it.
try:
    response = requests.get(
        URL,
        headers={'accept': 'application/json', 'X-API-Key': APIKEY_MAILCOW},
        # Without a timeout requests waits indefinitely for an unresponsive server.
        timeout=30,
    )
    # requests does not raise for 4xx/5xx on its own; without this call the
    # HTTPError handler below could never run.
    response.raise_for_status()
except requests.exceptions.ConnectionError:
    print("Connection failed.")
    sys.exit(1)
except requests.exceptions.Timeout:
    print("Connection timed out.")
    sys.exit(1)
except requests.exceptions.HTTPError as err:
    print(err)
    sys.exit(1)

try:
    data = response.json()
    selector = data['dkim_selector']
    txtshould = data['dkim_txt']
    # length and pubkey are not used further down; they are still read so that
    # an incomplete answer is rejected exactly as before.
    length = data['length']
    pubkey = data['pubkey']
except (KeyError, TypeError, ValueError):
    # KeyError: expected field missing. TypeError: the JSON is not an object.
    # ValueError: the body is not JSON at all.
    print("No or faulty DKIM lookup from Mailcow. ABORTING.")
    sys.exit(1)
# QUERY POWERDNS
# Export the zone as text and report which DKIM / DMARC TXT records it
# currently holds. This step is informational only: the records are patched
# afterwards regardless of what is found here.
URL = ENDPOINT_PDNS + '/api/v1/servers/localhost/zones/' + domain + './export'
try:
    response = requests.get(
        URL,
        headers={'accept': 'text/plain', 'X-API-Key': APIKEY_PDNS},
        # Without a timeout requests waits indefinitely for an unresponsive server.
        timeout=30,
    )
    # Needed for the HTTPError handler below to ever trigger (e.g. unknown zone).
    response.raise_for_status()
except requests.exceptions.ConnectionError:
    print("Connection failed.")
    sys.exit(1)
except requests.exceptions.Timeout:
    print("Connection timed out.")
    sys.exit(1)
except requests.exceptions.HTTPError as err:
    print(err)
    sys.exit(1)

data = response.text
records = data.split('\n')

# Current DKIM TXT content, or None when the zone has no such record.
# Only the first quoted chunk of the line is kept; if several lines match,
# the last one wins.
txtis = None
dkim_name = (selector + '._domainkey.' + domain + '.').lower()
for record in records:
    if dkim_name not in record.lower():
        continue
    quoted = record.split('"')
    if len(quoted) > 1:
        txtis = quoted[1]
if txtis is None:
    print("No DKIM TXT record found.")

# DMARC is tracked with its own flag, so the message below reflects the DMARC
# lookup and not the outcome of the DKIM lookup above.
# NOTE(review): assumes the export lists fully qualified owner names
# ("_dmarc.<domain>."); confirm against a real export.
dmarc_name = ('_dmarc.' + domain + '.').lower()
dmarc_found = False
for record in records:
    if dmarc_name in record.lower():
        print(record)
        dmarc_found = True
if not dmarc_found:
    print("No DMARC TXT record found.")
# PATCH
def patch_txt_record(label, name, content):
    """Replace the TXT rrset ``name`` in the zone with one record.

    Parameters:
        label: Short tag used in the progress messages ("SPF", "DMARC", "DKIM").
        name: Fully qualified record name, including the trailing dot.
        content: TXT value without the surrounding double quotes.

    Returns normally only when PowerDNS answers 204. Any other outcome is
    reported on stdout and ends the script with exit status 1.
    """
    print("Patching " + label + " ...")
    url = ENDPOINT_PDNS + '/api/v1/servers/localhost/zones/' + domain + "."
    # NOTE(review): changetype REPLACE is applied to the whole TXT rrset of
    # `name`, so other TXT records at that name are presumably dropped - confirm
    # this is intended, especially for the zone apex used by SPF.
    payload = {
        "rrsets": [{
            "name": name,
            "type": "TXT",
            # Sent as an integer, which is how the PowerDNS API schema declares it.
            "ttl": 3600,
            "changetype": "REPLACE",
            "records": [{
                # TXT content has to be wrapped in double quotes.
                "content": "\"" + content + "\"",
                "disabled": False,
                "name": name,
            }],
        }]
    }
    try:
        response = requests.patch(
            url,
            headers={
                'accept': 'application/json',
                'X-API-Key': APIKEY_PDNS,
                'Content-Type': 'application/json',
            },
            json=payload,
            # Without a timeout requests waits indefinitely for an unresponsive server.
            timeout=30,
        )
    except requests.exceptions.ConnectionError:
        print("Connection failed.")
        sys.exit(1)
    except requests.exceptions.Timeout:
        print("Connection timed out.")
        sys.exit(1)

    status = response.status_code
    if status == 204:
        print(label + ": OK!")
        return

    if status == 422:
        print(label + ": Failed:")
    else:
        print("Unhandled error.")
        print(status)
    # Error bodies are not guaranteed to be JSON; fall back to the raw text
    # instead of crashing while reporting the failure.
    try:
        print(response.json())
    except ValueError:
        print(response.text)
    sys.exit(1)


# The three records are patched in sequence; the first failure stops the script.
patch_txt_record("SPF", domain + ".", "v=spf1 mx a -all")
patch_txt_record(
    "DMARC",
    "_dmarc." + domain + ".",
    "v=DMARC1; p=reject; rua=mailto:system@lysergic.dev",
)
patch_txt_record("DKIM", selector + "._domainkey." + domain + ".", txtshould)

print("Done.")
sys.exit(0)