From a6bbcf1dca8aee94535ca24adb0312fed49c294c Mon Sep 17 00:00:00 2001 From: Georg Date: Mon, 13 Sep 2021 05:45:50 +0200 Subject: [PATCH] DKIM Patcher Signed-off-by: Georg --- scripts/python/.env | 5 + .../python/powerdns_mailcow_dkim_patcher.py | 144 ++++++++++++++++++ 2 files changed, 149 insertions(+) create mode 100644 scripts/python/.env create mode 100755 scripts/python/powerdns_mailcow_dkim_patcher.py diff --git a/scripts/python/.env b/scripts/python/.env new file mode 100644 index 0000000..10eb8dc --- /dev/null +++ b/scripts/python/.env @@ -0,0 +1,5 @@ +ENDPOINT_PDNS=http://example.com +APIKEY_PDNS= +ENDPOINT_MAILCOW=https://example.com +APIKEY_MAILCOW= + diff --git a/scripts/python/powerdns_mailcow_dkim_patcher.py b/scripts/python/powerdns_mailcow_dkim_patcher.py new file mode 100755 index 0000000..95137e2 --- /dev/null +++ b/scripts/python/powerdns_mailcow_dkim_patcher.py @@ -0,0 +1,144 @@ +#!/usr/bin/python3 +""" +PowerDNS <-> Mailcow DKIM Public Key patching script. + +Created and Last modified: 13/09/2021 by Georg Pfuetzenreuter + +Example use cases: + - Administration failure caused individual zones to contain faulty DKIM records + - Automation failure caused individual zones to contain faulty DKIM records + - Security incident required replacing DKIM private keys + - Security audit required manipulating the DKIM records of selected zones + +The public key provided by Mailcow is considered as the source of truth. +""" +import requests +import sys +import os +from dotenv import load_dotenv + +if len(sys.argv) > 1: + domain = sys.argv[1] +else: + print("Specify the domain name") + sys.exit(1) + +load_dotenv() +# POWERDNS SETTINGS +ENDPOINT_PDNS = os.environ.get('ENDPOINT_PDNS') +APIKEY_PDNS = os.environ.get('APIKEY_PDNS') + +# MAILCOW SETTINGS +ENDPOINT_MAILCOW = os.environ.get('ENDPOINT_MAILCOW') +APIKEY_MAILCOW = os.environ.get('APIKEY_MAILCOW') + +if None in (ENDPOINT_PDNS, APIKEY_PDNS, ENDPOINT_MAILCOW, APIKEY_MAILCOW): + print("Could not load environment variables. Please check your .env file.") + sys.exit(0) + +print("Scanning " + domain) + +# QUERY POWERDNS +URL = ENDPOINT_PDNS + '/api/v1/servers/localhost/zones/' + domain + './export' +try: + response = requests.get( + URL, + headers = {'accept': 'text/plain', 'X-API-Key': APIKEY_PDNS}, + ) + data = response.text + #print(data) + for record in data.split('\n'): + if '._domainkey' in record: + #print(record) + txtis = record.split('"')[1] + try: + txtis + except NameError: + print("No DKIM TXT record found.") + #else: + #print(txtis) +except requests.exceptions.ConnectionError as err: + print("Connection failed.") + sys.exit(1) +except requests.exceptions.HTTPError as err: + print(err) + sys.exit(1) + +# QUERY MAILCOW (can I put cow emoji in comment?) +server = ENDPOINT_MAILCOW +api_key = APIKEY_MAILCOW +api = '/api/v1' +get = api + '/get' +URL = server + get + '/dkim/' + domain +try: + response = requests.get( + URL, + headers = {'accept': 'application/json', 'X-API-Key': api_key}, + ) + data = response.json() + selector = data['dkim_selector'] + txtshould = data['dkim_txt'] + length = data['length'] + pubkey = data['pubkey'] + #print(f"Domain: {domain}\nSelector: {selector}\nTXT: {txtshould}\nPublic Key: {pubkey}") + #print(txtshould) +except KeyError: + print("No or faulty DKIM lookup from Mailcow. ABORTING.") + sys.exit(1) +except requests.exceptions.ConnectionError as err: + print("Connection failed.") + sys.exit(1) +except requests.exceptions.HTTPError as err: + print(err) + sys.exit(1) +# COMPARE +if txtis == txtshould: + print(domain + ": All good!") +if txtis != txtshould: + print("Mismatch!") + print("Patching SPF and DMARC ...") + URL = ENDPOINT_PDNS + '/api/v1/servers/localhost/zones/' + domain + "." + payload = { + "rrsets": [{"name": domain + ".", "type": "TXT", "ttl": "3600", "changetype": "REPLACE", "records": [{"content": "\"v=spf1 mx a -all\"", "disabled": False, "name": domain + "."}, {"content": "\"v=DMARC1; p=reject; rua=mailto:system@lysergic.dev\"", "disabled": False, "name": domain + "."}]}] + } + response = requests.patch( + URL, + headers = {'accept': 'application/json', 'X-API-Key': APIKEY_PDNS, 'Content-Type': 'application/json'}, + json = payload, + ) + status = response.status_code + if status == 204: + print("SPF/DMARC: OK!") + elif status == 422: + print("SPF/DMARC: Failed:") + print(response.json()) + sys.exit(1) + else: + print("Unhandled error.") + print(status) + print(response.json()) + sys.exit(1) + print("Patching DKIM ...") + payload = { + "rrsets": [{"name": selector + "._domainkey." + domain + ".", "type": "TXT", "ttl": "3600", "changetype": "REPLACE", "records": [{"content": "\""+ txtshould + "\"", "disabled": False, "name": selector + "._domainkey." + domain + "."}]}] + } + response = requests.patch( + URL, + headers = {'accept': 'application/json', 'X-API-Key': APIKEY_PDNS, 'Content-Type': 'application/json'}, + json = payload, + ) + status = response.status_code + if status == 204: + print("DKIM: OK!") + elif status == 422: + print("DKIM: Failed:") + print(response.json()) + sys.exit(1) + else: + print("Unhandled error.") + print(status) + print(response.json()) + sys.exit(1) + print("Done.") + sys.exit(0) +