DKIM Patcher
Signed-off-by: Georg <georg@lysergic.dev>
This commit is contained in:
parent
ce0fdc5705
commit
a6bbcf1dca
5
scripts/python/.env
Normal file
5
scripts/python/.env
Normal file
@ -0,0 +1,5 @@
|
||||
ENDPOINT_PDNS=http://example.com
|
||||
APIKEY_PDNS=
|
||||
ENDPOINT_MAILCOW=https://example.com
|
||||
APIKEY_MAILCOW=
|
||||
|
144
scripts/python/powerdns_mailcow_dkim_patcher.py
Executable file
144
scripts/python/powerdns_mailcow_dkim_patcher.py
Executable file
@ -0,0 +1,144 @@
|
||||
#!/usr/bin/python3
|
||||
"""
|
||||
PowerDNS <-> Mailcow DKIM Public Key patching script.
|
||||
|
||||
Created and Last modified: 13/09/2021 by Georg Pfuetzenreuter <georg@lysergic.dev>
|
||||
|
||||
Example use cases:
|
||||
- Administration failure caused individual zones to contain faulty DKIM records
|
||||
- Automation failure caused individual zones to contain faulty DKIM records
|
||||
- Security incident required replacing DKIM private keys
|
||||
- Security audit required manipulating the DKIM records of selected zones
|
||||
|
||||
The public key provided by Mailcow is considered as the source of truth.
|
||||
"""
|
||||
import requests
|
||||
import sys
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
|
||||
if len(sys.argv) > 1:
|
||||
domain = sys.argv[1]
|
||||
else:
|
||||
print("Specify the domain name")
|
||||
sys.exit(1)
|
||||
|
||||
load_dotenv()
|
||||
# POWERDNS SETTINGS
|
||||
ENDPOINT_PDNS = os.environ.get('ENDPOINT_PDNS')
|
||||
APIKEY_PDNS = os.environ.get('APIKEY_PDNS')
|
||||
|
||||
# MAILCOW SETTINGS
|
||||
ENDPOINT_MAILCOW = os.environ.get('ENDPOINT_MAILCOW')
|
||||
APIKEY_MAILCOW = os.environ.get('APIKEY_MAILCOW')
|
||||
|
||||
if None in (ENDPOINT_PDNS, APIKEY_PDNS, ENDPOINT_MAILCOW, APIKEY_MAILCOW):
|
||||
print("Could not load environment variables. Please check your .env file.")
|
||||
sys.exit(0)
|
||||
|
||||
print("Scanning " + domain)
|
||||
|
||||
# QUERY POWERDNS
|
||||
URL = ENDPOINT_PDNS + '/api/v1/servers/localhost/zones/' + domain + './export'
|
||||
try:
|
||||
response = requests.get(
|
||||
URL,
|
||||
headers = {'accept': 'text/plain', 'X-API-Key': APIKEY_PDNS},
|
||||
)
|
||||
data = response.text
|
||||
#print(data)
|
||||
for record in data.split('\n'):
|
||||
if '._domainkey' in record:
|
||||
#print(record)
|
||||
txtis = record.split('"')[1]
|
||||
try:
|
||||
txtis
|
||||
except NameError:
|
||||
print("No DKIM TXT record found.")
|
||||
#else:
|
||||
#print(txtis)
|
||||
except requests.exceptions.ConnectionError as err:
|
||||
print("Connection failed.")
|
||||
sys.exit(1)
|
||||
except requests.exceptions.HTTPError as err:
|
||||
print(err)
|
||||
sys.exit(1)
|
||||
|
||||
# QUERY MAILCOW (can I put cow emoji in comment?)
|
||||
server = ENDPOINT_MAILCOW
|
||||
api_key = APIKEY_MAILCOW
|
||||
api = '/api/v1'
|
||||
get = api + '/get'
|
||||
URL = server + get + '/dkim/' + domain
|
||||
try:
|
||||
response = requests.get(
|
||||
URL,
|
||||
headers = {'accept': 'application/json', 'X-API-Key': api_key},
|
||||
)
|
||||
data = response.json()
|
||||
selector = data['dkim_selector']
|
||||
txtshould = data['dkim_txt']
|
||||
length = data['length']
|
||||
pubkey = data['pubkey']
|
||||
#print(f"Domain: {domain}\nSelector: {selector}\nTXT: {txtshould}\nPublic Key: {pubkey}")
|
||||
#print(txtshould)
|
||||
except KeyError:
|
||||
print("No or faulty DKIM lookup from Mailcow. ABORTING.")
|
||||
sys.exit(1)
|
||||
except requests.exceptions.ConnectionError as err:
|
||||
print("Connection failed.")
|
||||
sys.exit(1)
|
||||
except requests.exceptions.HTTPError as err:
|
||||
print(err)
|
||||
sys.exit(1)
|
||||
# COMPARE
|
||||
if txtis == txtshould:
|
||||
print(domain + ": All good!")
|
||||
if txtis != txtshould:
|
||||
print("Mismatch!")
|
||||
print("Patching SPF and DMARC ...")
|
||||
URL = ENDPOINT_PDNS + '/api/v1/servers/localhost/zones/' + domain + "."
|
||||
payload = {
|
||||
"rrsets": [{"name": domain + ".", "type": "TXT", "ttl": "3600", "changetype": "REPLACE", "records": [{"content": "\"v=spf1 mx a -all\"", "disabled": False, "name": domain + "."}, {"content": "\"v=DMARC1; p=reject; rua=mailto:system@lysergic.dev\"", "disabled": False, "name": domain + "."}]}]
|
||||
}
|
||||
response = requests.patch(
|
||||
URL,
|
||||
headers = {'accept': 'application/json', 'X-API-Key': APIKEY_PDNS, 'Content-Type': 'application/json'},
|
||||
json = payload,
|
||||
)
|
||||
status = response.status_code
|
||||
if status == 204:
|
||||
print("SPF/DMARC: OK!")
|
||||
elif status == 422:
|
||||
print("SPF/DMARC: Failed:")
|
||||
print(response.json())
|
||||
sys.exit(1)
|
||||
else:
|
||||
print("Unhandled error.")
|
||||
print(status)
|
||||
print(response.json())
|
||||
sys.exit(1)
|
||||
print("Patching DKIM ...")
|
||||
payload = {
|
||||
"rrsets": [{"name": selector + "._domainkey." + domain + ".", "type": "TXT", "ttl": "3600", "changetype": "REPLACE", "records": [{"content": "\""+ txtshould + "\"", "disabled": False, "name": selector + "._domainkey." + domain + "."}]}]
|
||||
}
|
||||
response = requests.patch(
|
||||
URL,
|
||||
headers = {'accept': 'application/json', 'X-API-Key': APIKEY_PDNS, 'Content-Type': 'application/json'},
|
||||
json = payload,
|
||||
)
|
||||
status = response.status_code
|
||||
if status == 204:
|
||||
print("DKIM: OK!")
|
||||
elif status == 422:
|
||||
print("DKIM: Failed:")
|
||||
print(response.json())
|
||||
sys.exit(1)
|
||||
else:
|
||||
print("Unhandled error.")
|
||||
print(status)
|
||||
print(response.json())
|
||||
sys.exit(1)
|
||||
print("Done.")
|
||||
sys.exit(0)
|
||||
|
Loading…
Reference in New Issue
Block a user