Back to the main page

Subnet management on DHCP server

Intro

This tool add and delete DHCP subnet.

Design

Implementation

The module to create cgf file is lib/subnet_cfg.py, hence lib directory is place for modules. There is the empty file lib/__init__.py so Python knows to treat this directory as having modules (packages).

Usage

$ sudo subnet_mgmt-loc.py -h usage: subnet_mgmt-loc.py [-h] (-a | -d) --subnet SUBNET --netmask NETMASK --defaultrouter DEFAULTROUTER [--comment COMMENT] Subnet management for LOC optional arguments: -h, --help show this help message and exit -a, --add Add a subnet -d, --delete Delete a subnet --subnet SUBNET Subnet as x.x.x.x --netmask NETMASK Netmask as x.x.x.x --defaultrouter DEFAULTROUTER Default router as x.x.x.x --comment COMMENT One line comment about subnet, max 70 characters

Resulting subnet cfg file

Example.

# some comment, like request number subnet 10.x0.x2.0 netmask 255.255.254.0 { option subnet-mask 255.255.254.0; option routers 10.x0.x2.1 ; option domain-name-servers 20x.2xx.2x.1, 20x.2xx.2x.2, 10.20x.7x.19x ; option ntp-servers 10.x0.x2.1, 10.x0.25x.2x0, 10.x0.25x.2x1; }

Logging

Log example when subnet is added.

2019-10-24 17:05:44,152:DEBUG: START AT : Thu, 24 Oct 2019 23:05:44 2019-10-24 17:05:44,189:DEBUG: OK: defalt gateway 10.x0.190.126 is in 10.x0.190.64/26 2019-10-24 17:05:44,189:DEBUG: Comment is: bz xx migrated 2019-10-24 17:05:44,189:DEBUG: Subnet's object/ID: 10_x0_190_64 2019-10-24 17:05:44,189:DEBUG: Adding cfg for subnet 10.x0.190.64 with netmask 255.255.255.192 and default router 10.x0.190.126 2019-10-24 17:05:44,190:DEBUG: Checking the cfg file: Showing file /etc/dhcp/subnets/10_x0_190_64.26 #bz 16592 migrated subnet 10.x0.190.64 netmask 255.255.255.192 { option routers 10.x0.190.126 ; option domain-name-servers 206.x23.27.1, 206.x23.27.2, 10.x09.76.198 ; option ntp-servers 10.x0.190.126, 10.x0.255.250, 10.x0.255.251 ; } 2019-10-24 17:05:44,190:DEBUG: OK: Compiled new /etc/dhcp/subnets.include 2019-10-24 17:05:44,191:DEBUG: Check DHCP config for OL[78] 2019-10-24 17:05:44,219:DEBUG: OK: DHCP config looks good 2019-10-24 17:05:44,220:DEBUG: OL[78] DHCP service restart 2019-10-24 17:05:44,276:DEBUG: OK: DHCP service has been restarted 2019-10-24 17:05:44,277:DEBUG: FINISH AT : Thu, 24 Oct 2019 23:05:44

Files

Module subnet_cfg.py

" Module to manage DHCP subnets (python 3.5) " # The module name is filename.py # When import, use only filename (import filename) # Use sys.path.append("/path/to/module") if needed # ------------------------------------------------------ import os import socket import sys import netaddr from netaddr import IPAddress global SUBNETDIR SUBNETDIR = "/etc/dhcp/subnets/" # ========= LOC ============== class LOC_SUBNET(object): """ Blueprint for LOC DHCP subnet cfg file DNS: list them NTP: dg, 10.x0.255.250, 10.x0.255.251 Attributes: 1. subnet (string, user input) 2. netmask (string, user input) 3. default gateway (string, user input) 4. comment (string, user input, only one line): default is None object) """ def __init__(self, subnet, netmask, dg, comment=None): """ Initialize object whos attributes are collection of data used to create DHCP subnet config file """ self.subnet = subnet self.netmask = netmask self.dg = dg self.comment = comment # # Using netmask, finds CIDR (Classless inter-domain routing) # Custom idea is : "255.255.255.0".split('.') gives ['255', '255', '255', '0'] # int("255") gives 255 # bin(int("255")) gives '0b11111111' # bin(int("255")).count('1') gives 8 # CIDR = sum([bin(int(x)).count('1') for x in self.netmask.split('.')]) # No error check here. # global CIDR CIDR = IPAddress(self.netmask).netmask_bits() # netaddr does error checks global LOC_SUBNET_CFG, DEV_DNS, DEV_NTP LOC_SUBNET_CFG = SUBNETDIR + self.subnet.replace(".", "_") + "." + str(CIDR) LOC_DNS = "20x.22x.27.1, 20x.22x.27.2, 10.20x.7x.197" LOC_NTP = "10.x0.255.250, 10.x0.255.251" # ============================== # DEFINE METHODS FOR THE OBJECT # ============================== def create_cfg_file(self): """ Define method for creating subnet config file """ print ("Creating subnet configuraiton: " + DEV_SUBNET_CFG ) try: if self.comment != None: f = open(LOC_SUBNET_CFG, "w+") f.write("#" + self.comment + " \n") f.write("subnet " + self.subnet + " netmask " + self.netmask + " { \n") f.write("\toption routers\t\t\t" + self.dg + " ;\n") f.write("\toption domain-name-servers\t" + LOC_DNS + " ;\n") f.write("\toption ntp-servers\t\t" + self.dg + ", " + LOC_NTP + " ;\n") f.write(" } \n") f.close() else: f = open(LOC_SUBNET_CFG, "w+") f.write("subnet " + self.subnet + " netmask " + self.netmask + " { \n") f.write("\toption routers\t\t\t" + self.dg + " ;\n") f.write("\toption domain-name-servers\t" + LOC_DNS + " ;\n") f.write("\toption ntp-servers\t\t" + self.dg + ", " + LOC_NTP + " ;\n") f.write(" } \n") f.close() except IOError as err: print("IOError: {0}".format(err)) def remove_cfg_file(self): """ Define method for removing subnet config file """ try: os.remove(LOC_SUBNET_CFG) print ("File " + LOC_SUBNET_CFG + " is removed!") except IOError as err: print("IOError: {0}".format(err)) def show_cfg_file(self): """ Define method for showing/reading subnet config file """ print ("Showing file " + LOC_SUBNET_CFG ) try: f = open(LOC_SUBNET_CFG, "r") print(f.read()) f.close() except IOError as err: print("IOError: {0}".format(err)) # ========= LOC_A ============== class LOC_A_SUBNET(object): """ Blueprint for LOC_A DHCP subnet cfg file DNS: list them NTP: only dg Attributes: 1. subnet (string, user input) 2. netmask (string, user input) 3. default gateway (string, user input) 4. comment (string, user input, only one line): default is None object) """ def __init__(self, subnet, netmask, dg, comment=None): """ Initialize object whos attributes are collection of data used to create DHCP subnet config file """ self.subnet = subnet self.netmask = netmask self.dg = dg self.comment = comment global CIDR CIDR = IPAddress(self.netmask).netmask_bits() # netaddr does error checks global LOC_A_SUBNET_CFG, LOC_A_DNS, LOC_A_NTP LOC_A_SUBNET_CFG = SUBNETDIR + self.subnet.replace(".", "_") + "." + str(CIDR) LOC_A_DNS = "x06.x23.27.1, x06.x23.27.2, x0.x09.76.197" LOC_A_NTP = "" # only dg is used # ============================== # DEFINE METHODS FOR THE OBJECT # ============================== def create_cfg_file(self): """ Define method for creating subnet config file """ print ("Creating subnet configuraiton: " + LOC_A_SUBNET_CFG ) try: if self.comment != None: f = open(LOC_A_SUBNET_CFG, "w+") f.write("#" + self.comment + " \n") f.write("subnet " + self.subnet + " netmask " + self.netmask + " { \n") f.write("\toption routers\t\t\t" + self.dg + " ;\n") f.write("\toption domain-name-servers\t" + LOC_A_DNS + " ;\n") f.write("\toption ntp-servers\t\t" + self.dg + " ;\n") f.write(" } \n") f.close() else: f = open(LOC_A_SUBNET_CFG, "w+") f.write("subnet " + self.subnet + " netmask " + self.netmask + " { \n") f.write("\toption routers\t\t\t" + self.dg + " ;\n") f.write("\toption domain-name-servers\t" + LOC_A_DNS + " ;\n") f.write("\toption ntp-servers\t\t" + self.dg + " ;\n") f.write(" } \n") f.close() except IOError as err: print("IOError: {0}".format(err)) def remove_cfg_file(self): """ Define method for removing subnet config file """ try: os.remove(LOC_A_SUBNET_CFG) print ("File " + LOC_A_SUBNET_CFG + " is removed!") except IOError as err: print("IOError: {0}".format(err)) def show_cfg_file(self): """ Define method for showing/reading subnet config file """ print ("Showing file " + LOC_A_SUBNET_CFG ) try: f = open(LOC_A_SUBNET_CFG, "r") print(f.read()) f.close() except IOError as err: print("IOError: {0}".format(err)) # ---------- MAIN ---------- if __name__ == '__main__': print(LOC_SUBNET.__doc__) print("\n") subnet1 = LOC_SUBNET("10x.10.0.0", "255.255.240.0", "10x.10.10.254", "bz-xxx test comment") subnet1.create_cfg_file() subnet1.show_cfg_file() subnet1.remove_cfg_file() print("\n") subnet = LOC_SUBNET("10.x0.70.0", "255.255.240.0", "10.x0.70.2") subnet.create_cfg_file() subnet.show_cfg_file() subnet.remove_cfg_file() print("\n") print(LOC_A_SUBNET.__doc__) print("\n") subnet1 = LOC_A_SUBNET("10x.10.0.0", "255.255.240.0", "100.1x.10.254", "bz-xxx test comment") subnet1.create_cfg_file() subnet1.show_cfg_file() subnet1.remove_cfg_file() print("\n") subnet = LOC_A_SUBNET("10.x0.70.0", "255.255.240.0", "10.x0.70.2") subnet.create_cfg_file() subnet.show_cfg_file() subnet.remove_cfg_file()

Main script subnet_mgmt-loc.py (loc for location)

#!/bin/python3.5 # ------------ LOC ------------ # Subnet management # --------------------------- import os import argparse import logging, logging.handlers from time import gmtime, strftime import datetime from pathlib import Path import socket import netaddr from netaddr import IPAddress import re import sys import platform import getpass import subprocess import ipaddress sys.path.append(os.path.abspath(os.path.join(sys.path[0], os.pardir))) # append parent dir import lib.subnet_cfg as subnet_cfg # import LabOps module for subnet mgmt i_am=getpass.getuser() if i_am != "root": sys.exit(i_am + ", please run this via sudo") # logging destination PROGRAM = os.path.basename(sys.argv[0]) # name of this script LOG_PATH = ("/var/log/" + PROGRAM) # put together "/var/log/" if not os.path.exists(LOG_PATH): # create LOG_PATH if doesn't exists os.makedirs(LOG_PATH) os.chown(LOG_PATH,-1,10) # root(-1 means no change):staff DSEE group os.chmod(LOG_PATH,0o775) # drwxrwxr-x # --- Define logging LOG_FILE = (LOG_PATH + "/" + datetime.datetime.now().strftime("%m-%d-%Y_%Hh%Mm%Ss")) # create logger (interface that script uses for logging) logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) # create file handler (define log destination) handler = logging.handlers.TimedRotatingFileHandler(LOG_FILE, when='MIDNIGHT', backupCount=50, utc=False) # create formatter (define layout of logs) formatter = logging.Formatter('%(asctime)s:%(levelname)s: %(message)s') handler.setFormatter(formatter) # add handler to logger logger.addHandler(handler) # start logging logger.debug("") logger.debug("START AT : " + strftime("%a, %d %b %Y %H:%M:%S", gmtime())) def only_loc_dhcp(): """ Check that script is run only on ca-dhcp-locs.xxx.com """ if socket.gethostname() != "ca-dhcp-loc.xxx.com": logger.debug("ERROR: This can be run only on DHCP LOC") sys.exit("ERROR: This can be run only on DHCP LOC") # Only run in ca-dhcp-loc only_loc_dhcp() def ip_check(ipadd): """ Check validity of IPv4 """ try: if ipaddress.ip_address(ipadd): return ipadd except: logger.debug(ipadd + " is not valid IP input") sys.exit(ipadd + " is not valid IP input") parser = argparse.ArgumentParser( formatter_class=argparse.RawTextHelpFormatter, description="Subnet management for LOC", epilog='Brought to you by Zarko') mgmt = parser.add_mutually_exclusive_group(required=True) mgmt.add_argument("-a", "--add", help="Add a subnet", action="store_true") # store_true for flag, no value mgmt.add_argument("-d", "--delete", help="Delete a subnet", action="store_true") parser.add_argument("--subnet", help="Subnet as x.x.x.x", type=ip_check, required=True) parser.add_argument("--netmask", help="Netmask as x.x.x.x", type=ip_check, required=True) parser.add_argument("--defaultrouter", help="Default router as x.x.x.x", type=ip_check, required=True) parser.add_argument("--comment", help="One line comment about subnet, max 70 characters") args = parser.parse_args() addsubnet=args.add deletesubnet=args.delete subnet=args.subnet netmask=args.netmask defaultrouter=args.defaultrouter comment=args.comment def is_dg_in_subnet(): """ Check if dg is indeed in subnet """ CIDR = IPAddress(netmask).netmask_bits() # netaddr does error checks if netaddr.IPAddress(defaultrouter) in netaddr.IPNetwork(subnet + "/" + str(CIDR) ): logger.debug("OK: defalt gateway " + defaultrouter + " is in " + subnet + "/" + str(CIDR) ) else: logger.debug("ERROR: your default gateway " + defaultrouter + " is not in " + subnet + "/" + str(CIDR) ) sys.exit("ERROR: your default gateway " + defaultrouter + " is not in " + subnet + "/" + str(CIDR) ) def comment2oneline(): """ Make sure comment is only one line, and max 70 characters """ global commentoneline if comment: commentoneline = comment.replace("\n","\b") # replace new line with backspace logger.debug("Comment is: " + commentoneline) if len(commentoneline) > 70 : logger.debug("Comment can't be more then 70 characters") sys.exit("Comment can't be more then 70 characters") return commentoneline else: commentoneline = None def add_subnet(): """ Adding subnet Note: dash (-) & dot (.) can't be used in object name, hence create first subnet object """ subnetobject = subnet.replace(".", "_") logger.debug("Subnet's object/ID: " + subnetobject ) try: subnetobject = subnet_cfg.LOC_SUBNET(subnet, netmask, defaultrouter, commentoneline) logger.debug("Adding cfg for subnet " + subnetobject.subnet + " with netmask " + subnetobject.netmask + " and default router " + subnetobject.dg ) subnetobject.create_cfg_file() except: logger.debug("ERROR: can't add subnet " + subnet + " with netmask " + netmask + " and def router " + defaultrouter ) sys.exit("ERROR: can't add subnet " + subnet + " with netmask " + netmask + " and def router " + defaultrouter ) orig = sys.stdout # initial stdout sys.stdout = open(LOG_FILE, "a") # want to read cfg file into log logger.debug("Checking the cfg file:") subnetobject.show_cfg_file() sys.stdout.close() sys.stdout = orig # restore initial stdout def del_subnet(): """ Delete subnet, check if user gives correct router """ subnetobject = subnet.replace(".", "_") logger.debug("Subnet's object/ID: " + subnetobject ) subnetobject = subnet_cfg.LOC_SUBNET(subnet, netmask, defaultrouter, commentoneline) logger.debug("Deleting cfg for subnet " + subnetobject.subnet + " with netmask " + subnetobject.netmask + " and default router " + subnetobject.dg ) if not input("Are you sure? (y/n): ").lower().strip() == "y": logger.debug("Fair enough, you don't want to remove subnet " + subnet ) sys.exit("Fair enough, you don't want to remove subnet " + subnet ) if os.path.exists("/tmp/tmp-" + subnet): os.remove("/tmp/tmp-" + subnet) orig = sys.stdout # initial stdout sys.stdout = open("/tmp/tmp-" + subnet, "a") subnetobject.show_cfg_file() sys.stdout.close() sys.stdout = orig # restore initial stdout with open("/tmp/tmp-" + subnet, 'r') as f: #no need to close f here if re.search ("\s+option routers\s+" + defaultrouter + "\s+;\s+", f.read()): result = True else: result = False if (result == True): print("Deleting subnet " + subnet) logger.debug("Deleting subnet " + subnet) subnetobject.remove_cfg_file() logger.debug("Subnet " + subnet + " has been deleted.") print("Subnet " + subnet + " has been deleted.") else: logger.debug("Error: can't find the subnet with your input, it doesn't match with any active subnets.") sys.exit("Error: can't find the subnet with your input, it doesn't match with any active subnets.") if os.path.exists("/tmp/tmp-" + subnet): os.remove("/tmp/tmp-" + subnet) def compile_subnet_list(): """ Compile subnets.include list maybe, need to import os module again or get error: AttributeError: '_MutuallyExclusiveGroup' object has no attribute 'path' """ #import os subnetdir = "/etc/dhcp/subnets/" if not os.path.exists(subnetdir): logger.debug("ERROR: " + subnetdir + " does not exist.") sys.exit("ERROR: " + subnetdir + " does not exist.") f = open("/etc/dhcp/subnets.include", "w+") try: for root, dirs, files in os.walk(subnetdir): for subnetname in files: f.write("include \"" + subnetdir + subnetname + "\" ;\n") logger.debug("OK: Compiled new /etc/dhcp/subnets.include") except: logger.debug("ERROR: cannot compile /etc/dhcp/subnets.include") sys.exit("ERROR: cannot compile /etc/dhcp/subnets.include") f.close() def dhcp_cfg_check(): """ Check DHCP cfg syntax, support ol[678] """ if re.match("6.*", platform.dist()[1]): logger.debug("Check DHCP config for OL6") return True if subprocess.call(['service', 'dhcpd', 'configtest']) == 0 else False else: logger.debug("Check DHCP config for OL[78]") return True if subprocess.call(['dhcpd', '-t']) == 0 else False def dhcp_restart(): """ Restart DHCP service, support ol[678] """ if re.match("6.*", platform.dist()[1]): logger.debug("OL6 DHCP service restart") return True if subprocess.call(['service', 'dhcpd', 'restart']) == 0 else False else: logger.debug("OL[78] DHCP service restart") return True if subprocess.call(['systemctl', 'restart', 'dhcpd']) == 0 else False if __name__ == '__main__': if addsubnet: is_dg_in_subnet() comment2oneline() add_subnet() compile_subnet_list() if not dhcp_cfg_check(): logger.debug("ERROR: DHCP config check fails") sys.exit("ERROR: DHCP config check fails") else: logger.debug("OK: DHCP config looks good") print("OK: DHCP config looks good") if not dhcp_restart(): logger.debug("ERROR: DHCP service restart fails") sys.exit("ERROR: DHCP service restart fails") else: logger.debug("OK: DHCP service has been restarted") print("OK: DHCP service has been restarted") elif deletesubnet: is_dg_in_subnet() comment2oneline() del_subnet() compile_subnet_list() if not dhcp_cfg_check(): logger.debug("ERROR: DHCP config check fails") sys.exit("ERROR: DHCP config check fails") else: logger.debug("OK: DHCP config looks good") print("OK: DHCP config looks good") if not dhcp_restart(): logger.debug("ERROR: DHCP service restart fails") sys.exit("ERROR: DHCP service restart fails") else: logger.debug("OK: DHCP service has been restarted") print("OK: DHCP service has been restarted") logger.debug("FINISH AT : " + strftime("%a, %d %b %Y %H:%M:%S", gmtime())) sys.exit(0)

Back to the main page