pdns-genrev/pdns-genrev.py

165 lines
5.6 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/python3
"""
PowerDNS PTR record generator, reads specified forward zones and patches the matching reverse zones
Copyright 2023, Georg Pfuetzenreuter <mail+opensuse@georg-pfuetzenreuter.net>
Licensed under the EUPL, Version 1.2 or - as soon they will be approved by the European Commission - subsequent versions of the EUPL (the "Licence").
You may not use this work except in compliance with the Licence.
An English copy of the Licence is shipped in a file called LICENSE along with this applications source code.
You may obtain copies of the Licence in any of the official languages at https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12.
"""
from argparse import ArgumentParser
import ipaddress
import json
import logging
import os
import re
import requests
import sys
# Environment variables
api_url = os.environ.get('GENREV_URL')
user_zones = os.environ.get('GENREV_ZONES')
api_key = os.environ.get('GENREV_KEY')
# System variables
api_base = f'{api_url}/api/v1/servers/localhost'
do_zones_reverse = []
existing_ptr_records = {}
candidates = {}
headers = {'X-API-Key': api_key}
def _fail(msg):
log.error(f'{msg}, bailing out')
sys.exit(1)
def _get(path):
result = requests.get(f'{api_base}/{path}', headers=headers)
if result.status_code == 200:
return result.json()
log.error(result.text)
_fail('Query failed')
def _iterate_zone(zone, types, only_records=False):
result = {}
records = []
escapedzone = requests.utils.quote(zone, safe='')
this_zone = _get(f'zones/{escapedzone}')
for rrset in this_zone['rrsets']:
if rrset['type'] in types:
rrname = rrset['name']
if not rrname in result:
result[rrname] = []
for rr in rrset['records']:
result[rrname].append(rr['content'])
records.append(rr['content'])
if only_records:
return records
return result
def notify_zone(zone):
log.info(f'Sending NOTIFY for zone {zone} ...')
result = requests.put(f'{api_base}/zones/{zone}/notify', headers=headers)
if result.status_code == 200:
log.debug('ok')
return True
log.error(f'Notify failed ({result.status_code}): {result.text}')
return False
def main():
if None in [api_key, api_url, user_zones]:
_fail('Requires GENREV_KEY, GENREV_URL and GENREV_ZONES to be set')
do_zones = user_zones.split(',')
if not wet:
log.info('Running in dry-mode ...')
zones = _get('zones')
zone_names = [z['id'] for z in zones]
for zone in zones:
zn = zone['name']
if zn.endswith(('ip6.arpa.', 'in-addr.arpa.')):
do_zones_reverse.append(zn)
existing_ptr_records.update(_iterate_zone(zn, ['PTR']))
log.debug(f'Existing PTR records: {existing_ptr_records}')
for do_zone in do_zones:
log.info(f'Checking forward zone {do_zone} ...')
if not f'{do_zone}.' in zone_names:
_fail(f'Zone {do_zone} not found')
for record, contents in _iterate_zone(do_zone, ['A', 'AAAA']).items():
log.debug(f'Processing record {record} with contents {contents}')
for content in contents:
address = content.rstrip('.')
if isinstance(ipaddress.ip_address(address), ipaddress.IPv6Address):
cidr = '64'
elif isinstance(ipaddress.ip_address(address), ipaddress.IPv4Address):
cidr = '24'
network = ipaddress.ip_network(f'{address}/{cidr}', False)
reverse_zone = ipaddress.ip_address(str(network).replace(f'/{cidr}', '')).reverse_pointer.lstrip('0.') + '.'
entry = ipaddress.ip_address(address).reverse_pointer + '.'
if not entry in existing_ptr_records.keys():
log.debug(f'Entry {entry} does not exist yet')
if reverse_zone in zone_names:
if not reverse_zone in candidates:
candidates[reverse_zone] = {}
if not entry in candidates[reverse_zone]:
candidates[reverse_zone][entry] = []
candidates[reverse_zone][entry].append(record)
if not candidates:
log.info('Nothing to do!')
for zone, records in candidates.items():
log.info(f'Preparing patch for records {records} ...')
rrsets = {
'rrsets': [
{
'name': record,
'changetype': 'REPLACE',
'type': 'PTR',
'ttl': 3600,
'records': [
{
'content': content
} for content in contents
]
} for record, contents in records.items()
]
}
log.debug(f'Payload: {rrsets}')
#log.info(f'Patching {", ".join([record["name"] for record in rrsets["rrsets"]])} ... ')
if wet:
log.info(f'Patching zone {zone} ...')
result = requests.patch(f'{api_base}/zones/{zone}', headers=headers, json=rrsets)
if result.status_code == 204:
log.debug('ok')
if notify:
notify_zone(zone)
else:
log.error(f'failed ({result.status_code}): {result.text}')
else:
log.info(f'Would have patched {zone}')
logging.basicConfig(format='%(module)s %(levelname)s: %(message)s', datefmt='%H:%M:%S')
log = logging.getLogger('pdns-genrev')
if __name__ == '__main__':
argparser = ArgumentParser()
argparser.add_argument('--debug', help='Print verbose output', action='store_const', dest='loglevel', const=logging.DEBUG, default=logging.INFO)
argparser.add_argument('--wet', help='Run wet instead of dry', action='store_true', default=False)
argparser.add_argument('--notify', help='Send NOTIFY after changes', action='store_true', default=False)
args = argparser.parse_args()
log.setLevel(args.loglevel)
log.debug(args)
wet = args.wet
notify = args.notify
main()