#!/usr/bin/python3
|
||
|
|
||
|
"""
|
||
|
PowerDNS PTR record generator, reads specified forward zones and patches the matching reverse zones
|
||
|
Copyright 2023, Georg Pfuetzenreuter <mail+opensuse@georg-pfuetzenreuter.net>
|
||
|
|
||
|
Licensed under the EUPL, Version 1.2 or - as soon they will be approved by the European Commission - subsequent versions of the EUPL (the "Licence").
|
||
|
You may not use this work except in compliance with the Licence.
|
||
|
An English copy of the Licence is shipped in a file called LICENSE along with this applications source code.
|
||
|
You may obtain copies of the Licence in any of the official languages at https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12.
|
||
|
"""
|
||
|
|
||
|
from argparse import ArgumentParser
|
||
|
import ipaddress
|
||
|
import json
|
||
|
import logging
|
||
|
import os
|
||
|
import re
|
||
|
import requests
|
||
|
import sys
|
||
|
|
||
|
# Settings read from the environment; each one is None when the variable is unset
api_key = os.environ.get('GENREV_KEY')
api_url = os.environ.get('GENREV_URL')
user_zones = os.environ.get('GENREV_ZONES')

# Values derived from the settings and sent with every API request
headers = {'X-API-Key': api_key}
api_base = f'{api_url}/api/v1/servers/localhost'

# Containers shared across the module, initially empty and filled by main()
candidates = dict()
existing_ptr_records = dict()
do_zones_reverse = list()
|
||
|
|
||
|
def _fail(msg):
    """
    Report ``msg`` as an error and stop the program with exit status 1.
    """
    log.error(f'{msg}, bailing out')
    raise SystemExit(1)
|
||
|
|
||
|
def _get(path, timeout=30):
    """
    Fetch ``path`` below ``api_base`` and return the decoded JSON body.

    path: resource path relative to ``api_base``, for example ``zones``
    timeout: seconds to wait for the server before giving up

    Any answer other than HTTP 200, as well as a request which could not be
    completed at all, is logged and ends the program through ``_fail``.
    """
    try:
        # Without a timeout an unresponsive server would block the run forever
        result = requests.get(f'{api_base}/{path}', headers=headers, timeout=timeout)
    except requests.RequestException as error:
        # Connection problems take the same exit path as a bad status code
        log.error(error)
    else:
        if result.status_code == 200:
            return result.json()
        log.error(result.text)
    _fail('Query failed')
|
||
|
|
||
|
def _iterate_zone(zone, types, only_records=False):
    """
    Collect the record contents of all rrsets in ``zone`` whose type is listed in ``types``.

    zone: zone to query, inserted as-is into the ``zones/...`` request path
    types: container of record type strings to keep, for example ``['A', 'AAAA']``
    only_records: selects the shape of the return value

    Returns a dict mapping each rrset name to the list of its record contents,
    or - with ``only_records`` set - one flat list of all contents.
    """
    by_name = {}
    flat = []

    for rrset in _get(f'zones/{zone}')['rrsets']:
        if rrset['type'] not in types:
            continue
        name = rrset['name']
        contents = [rr['content'] for rr in rrset['records']]
        # A name shows up once per matching type, so extend instead of overwriting
        by_name.setdefault(name, []).extend(contents)
        flat.extend(contents)

    return flat if only_records else by_name
|
||
|
|
||
|
def notify_zone(zone, timeout=30):
    """
    Ask the server to send a NOTIFY for ``zone``.

    zone: zone name, inserted as-is into the request path
    timeout: seconds to wait for the server before giving up

    Returns True if the server answered with HTTP 200 and False otherwise,
    including the case where the request itself could not be completed.
    """
    log.info(f'Sending NOTIFY for zone {zone} ...')
    try:
        result = requests.put(f'{api_base}/zones/{zone}/notify', headers=headers, timeout=timeout)
    except requests.RequestException as error:
        # Report transport problems through the return value like any other failure
        log.error(f'Notify failed: {error}')
        return False
    if result.status_code == 200:
        log.debug('ok')
        return True
    log.error(f'Notify failed ({result.status_code}): {result.text}')
    return False
|
||
|
|
||
|
def _find_reverse_zone(entry, reverse_zones):
    """
    Return the zone from ``reverse_zones`` which encloses ``entry``, or None.

    Both arguments are fully qualified names with a trailing dot. If several
    zones enclose the entry, the longest (most specific) name is returned.
    """
    # The leading dot makes sure the match happens on a label boundary
    matches = [zone for zone in reverse_zones if entry == zone or entry.endswith(f'.{zone}')]
    return max(matches, key=len, default=None)


def _build_patch(records):
    """
    Return the request payload which sets one PTR rrset per entry in ``records``.

    records: dict mapping a PTR record name to the list of names it should point to
    """
    return {
        'rrsets': [
            {
                'name': name,
                'changetype': 'REPLACE',
                'type': 'PTR',
                'ttl': 3600,
                'records': [{'content': target} for target in targets],
            } for name, targets in records.items()
        ]
    }


def main():
    """
    Generate missing PTR records for the forward zones listed in GENREV_ZONES.

    Reads all PTR records from the reverse zones on the server, walks the A and
    AAAA records of every requested forward zone and queues a PTR record for
    each address which has none yet and is enclosed by a reverse zone on the
    server. The queued records are sent as one PATCH per reverse zone if the
    global ``wet`` is set, optionally followed by a NOTIFY if the global
    ``notify`` is set; otherwise it is only logged what would have been patched.

    Missing settings or an unknown forward zone end the program through ``_fail``.
    """
    if None in [api_key, api_url, user_zones]:
        _fail('Requires GENREV_KEY, GENREV_URL and GENREV_ZONES to be set')
    # Tolerate blanks around the commas, empty items and fully qualified names
    do_zones = [name.strip().rstrip('.') for name in user_zones.split(',') if name.strip()]
    if not wet:
        log.info('Running in dry-mode ...')

    zones = _get('zones')
    zone_names = [z['id'] for z in zones]

    for zone in zones:
        zn = zone['name']
        if zn.endswith(('ip6.arpa.', 'in-addr.arpa.')):
            do_zones_reverse.append(zn)
            existing_ptr_records.update(_iterate_zone(zn, ['PTR']))

    log.debug(f'Existing PTR records: {existing_ptr_records}')

    for do_zone in do_zones:
        log.info(f'Checking forward zone {do_zone} ...')
        if f'{do_zone}.' not in zone_names:
            _fail(f'Zone {do_zone} not found')

        for record, contents in _iterate_zone(do_zone, ['A', 'AAAA']).items():
            log.debug(f'Processing record {record} with contents {contents}')
            for content in contents:
                address = content.rstrip('.')
                entry = ipaddress.ip_address(address).reverse_pointer + '.'
                if entry in existing_ptr_records:
                    continue
                log.debug(f'Entry {entry} does not exist yet')
                # Look the zone up among the reverse zones the server really has.
                # Deriving the name by stripping leading '0' and '.' characters
                # from the network address is wrong whenever the network part
                # itself ends in zeros (10.0.0.1 would yield 10.in-addr.arpa.).
                reverse_zone = _find_reverse_zone(entry, do_zones_reverse)
                if reverse_zone is None:
                    log.debug(f'No reverse zone found for {entry}')
                    continue
                candidates.setdefault(reverse_zone, {}).setdefault(entry, []).append(record)

    if not candidates:
        log.info('Nothing to do!')
        return

    for zone, records in candidates.items():
        log.info(f'Preparing patch for records {records} ...')
        rrsets = _build_patch(records)
        log.debug(f'Payload: {rrsets}')
        if not wet:
            log.info(f'Would have patched {zone}')
            continue
        log.info(f'Patching zone {zone} ...')
        try:
            result = requests.patch(f'{api_base}/zones/{zone}', headers=headers, json=rrsets, timeout=30)
        except requests.RequestException as error:
            # Keep going with the remaining zones instead of dying with a traceback
            log.error(f'failed: {error}')
            continue
        if result.status_code == 204:
            log.debug('ok')
            if notify:
                notify_zone(zone)
        else:
            log.error(f'failed ({result.status_code}): {result.text}')
|
||
|
|
||
|
logging.basicConfig(format='%(module)s %(levelname)s: %(message)s', datefmt='%H:%M:%S')
log = logging.getLogger('pdns-genrev')

# Defaults for the switches main() reads as globals. Binding them here keeps
# main() callable when this file is imported rather than executed; the command
# line arguments below override them.
wet = False
notify = False

if __name__ == '__main__':
    argparser = ArgumentParser()
    argparser.add_argument('--debug', help='Print verbose output', action='store_const', dest='loglevel', const=logging.DEBUG, default=logging.INFO)
    argparser.add_argument('--wet', help='Run wet instead of dry', action='store_true', default=False)
    argparser.add_argument('--notify', help='Send NOTIFY after changes', action='store_true', default=False)
    args = argparser.parse_args()
    log.setLevel(args.loglevel)
    log.debug(args)
    wet = args.wet
    notify = args.notify
    main()
|