Initial set of code

"Set" of code .. pun intended.

Signed-off-by: Georg Pfuetzenreuter <mail@georg-pfuetzenreuter.net>
This commit is contained in:
Georg Pfuetzenreuter 2024-09-26 00:22:16 +02:00
parent a2edce446f
commit 218c9122d1
Signed by: Georg
GPG Key ID: 1ED2F138E7E6FF57
9 changed files with 292 additions and 0 deletions

5
nftables-api.py Normal file
View File

@ -0,0 +1,5 @@
from waitress import serve
from nftables_api.app import app
if __name__ == '__main__':
serve(app, host='*', port=9090)

0
nftables_api/__init__.py Normal file
View File

8
nftables_api/app.py Normal file
View File

@ -0,0 +1,8 @@
from falcon import App
from .resources import nft_set
app = App()
rSet = nft_set.SetResource()
app.add_route('/set/{xfamily}/{xtable}/{xset}', rSet)

View File

@ -0,0 +1,111 @@
import falcon
import json
from nftables import Nftables
from nftables_api.utils.parse import parse_nft_response
from nftables_api.utils.output import output_post
nft = Nftables()
nft.set_json_output(True)
class SetResource:
def on_get(self, request, response, xfamily, xtable, xset):
raw = request.get_param_as_bool('raw', default=False)
rc, out, err = nft.cmd(f'list set {xfamily} {xtable} {xset}')
out_parsed, status, err_parsed = parse_nft_response(rc, out, err, raw)
if raw:
response.text = json.dumps({'rc': rc, **out_parsed, 'err': err_parsed, 'status': status})
return
out_parsed = out_parsed.get('nftables', [])
elements = []
if len(out_parsed) > 1 and isinstance(out_parsed[1], dict):
elements_low = out_parsed[1].get('set', {}).get('elem', [])
for element in elements_low:
if isinstance(element, dict):
prefix = element.get('prefix', {})
address = prefix.get('addr')
length = prefix.get('len')
elements.append(f'{address}/{length}')
elif isinstance(element, str):
elements.append(element)
else:
status = False
if status is True:
response.text = json.dumps(elements)
else:
response.text = json.dumps({'status': status, 'error': err_parsed})
def on_post(self, request, response, xfamily, xtable, xset):
raw = request.get_param_as_bool('raw', default=False)
data = request.get_media()
if not isinstance(data, dict) or not ( 'address' in data or 'addresses' in data ):
response.text = output_post(status=False, message='Invalid body.')
return
addresses = []
for sp in ['address', 'addresses']:
if sp in data:
if isinstance(data[sp], str):
addresses.append(data[sp])
elif isinstance(data[sp], list):
addresses.extend(data[sp])
elements = []
for address in addresses:
if '/' in address:
addrsplit = address.split('/')
elements.append({
'prefix': {
'addr': addrsplit[0],
'len': int(addrsplit[1]),
}
})
else:
elements.append(address)
nft_payload = {
'nftables': [
{
'add': {
'element': {
'elem': elements,
'family': xfamily,
'name': xset,
'table': xtable,
}
}
}
]
}
if not nft.json_validate(nft_payload):
response.status = falcon.HTTP_BAD_REQUEST
response.text = output_post(False, 'Payload did not validate.')
return
rc, out, err = nft.json_cmd(nft_payload)
out_parsed, status, err_parsed = parse_nft_response(rc, out, err, raw)
if status is True:
response.status = falcon.HTTP_CREATED
else:
response.status = falcon.HTTP_BAD_REQUEST
if raw:
response.text = json.dumps({'rc': rc, 'err': err_parsed, 'status': status})
else:
response.text = output_post(status=status, message=err_parsed)

View File

@ -0,0 +1,15 @@
import json
def output_post(status, message=None):
output = {
'status': status
}
if message:
output.update(
{
'message': message
}
)
return json.dumps(output)

View File

@ -0,0 +1,37 @@
import json
def parse_nft_error(err, raw):
if isinstance(err, str):
if '\n' in err:
err = err.split('\n')
elif err == '':
err = []
if raw:
return err
if isinstance(err, list) and len(err) > 0 and 'Error: ' in err[0]:
return err[0].replace('Error: ', '')
return err
def parse_nft_output(rc, out):
if rc == 0 and out != '':
status = True
out_parsed = json.loads(out)
elif rc == 0 and out == '':
status = True
out_parsed = None
else:
status = False
out_parsed = {'nftables': []}
return out_parsed, status
def parse_nft_response(rc, out, err, raw):
return *parse_nft_output(rc, out), parse_nft_error(err, raw)

12
scripts/test.sh Executable file
View File

@ -0,0 +1,12 @@
#!/bin/sh -ex
wd='/work'
podman run \
--cap-add CAP_NET_ADMIN \
--pull=always \
--rm \
-it \
-v .:"$wd" \
registry.opensuse.org/home/crameleon/containers/containers/crameleon/pytest-nftables:latest \
env PYTHONPATH="$wd" pytest --pdb --pdbcls=IPython.terminal.debugger:Pdb -rA -s -v -x "$wd"/tests

36
tests/conftest.py Normal file
View File

@ -0,0 +1,36 @@
from falcon import testing
from pytest import exit, fixture
from nftables import Nftables
from nftables_api.app import app
def run_nft(nft, cmd):
rc, out, err = nft.cmd(cmd)
if rc != 0:
print(out, err)
exit()
@fixture
def client():
return testing.TestClient(app)
@fixture
def nft():
nft = Nftables()
run_nft(nft, 'add table inet filter')
yield nft
nft.cmd('flush ruleset')
@fixture
def nft_ruleset_empty_sets(nft):
for i in [4, 6]:
run_nft(nft, f'add set inet filter testset{i} {{ type ipv{i}_addr ; flags interval ; }}')
return nft
@fixture
def nft_ruleset_populated_sets(nft_ruleset_empty_sets):
nft = nft_ruleset_empty_sets
run_nft(nft, 'add element inet filter testset4 { 192.168.0.0/24, 127.0.0.1 }')
run_nft(nft, 'add element inet filter testset6 { fd80::/64, fe80::1 }')
return nft

68
tests/test_api_set.py Normal file
View File

@ -0,0 +1,68 @@
from pytest import mark
from falcon import HTTP_CREATED, HTTP_OK
from json import dumps, loads
vs = [4, 6]
@mark.parametrize('v', vs)
def test_get_set(client, nft_ruleset_populated_sets, v):
want_out = {
4: ["192.168.0.0/24", "127.0.0.1"],
6: ["fd80::/64", "fe80::1"],
}
response = client.simulate_get(f'/set/inet/filter/testset{v}')
have_out = loads(response.content)
assert sorted(have_out) == sorted(want_out[v])
assert response.status == HTTP_OK
@mark.parametrize('v', vs)
@mark.parametrize('plvariant', ['address', 'network'])
@mark.parametrize('plformat', ['string', 'list'])
def test_append_to_set(client, nft_ruleset_populated_sets, v, plvariant, plformat):
nft = nft_ruleset_populated_sets
# all the matrixes could be moved to parameters
want_out = {
4: ["192.168.0.0/24", "127.0.0.1", "192.168.5.0/26"],
6: ["fd80::/64", "fe80::1", "fd10:f00::/128"],
}
if plformat == 'string':
if plvariant == 'address':
to_add = {
4: '192.168.5.1',
6: 'fd10:f00::',
}
elif plvariant == 'network':
to_add = {
4: '192.168.5.0/26',
6: 'fd10:f00::/48',
}
added = to_add[v]
elif plformat == 'list':
if plvariant == 'address':
to_add = {
4: ['192.168.5.1'],
6: ['fd10:f00::'],
}
elif plvariant == 'network':
to_add = {
4: ['192.168.5.0/26'],
6: ['fd10:f00::/48'],
}
added = to_add[v][0]
response = client.simulate_post(
f'/set/inet/filter/testset{v}',
body=dumps({
'addresses': to_add[v],
}),
headers={
'content-type': 'application/json',
},
)
have_out = loads(response.content)
assert have_out == {'status': True}
assert response.status == HTTP_CREATED
assert added in nft.cmd(f'list set inet filter testset{v}')[1]