Network scanner with DNS, UniFi query and switch port/PoE info
Originally posted December 2017
Updated February 2023: The UniFi function would not return wireless clients if a USG or UniFi switch was also present. This has been fixed.
Introduction
At home I currently run a Nortel 48-port switch with PoE, as well as a UniFi controller managing two UniFi AP-AC-LITE access points. For monitoring, I wanted to be able to track which devices are connected to the network, how they are connected, and (if applicable) how much power they are using, and then have HTML output that I can embed in another page.
This code was originally developed as a basic network scanner with HTML output, and has grown from there.
Operation
This script uses arping to check every host on the network. This is by no means foolproof, but it is good enough for this purpose. Switch port and PoE information is requested from the switch via SNMP. This was tested on a Nortel BayStack 5520 48T-PWR switch; your switch may or may not support the OIDs used. Wireless information comes from the UniFi controller's API, so a UniFi controller and an account on it are required.
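To illustrate the SNMP step, here is a minimal sketch of how bridge table output in the style printed by snmpwalk can be turned into a MAC-to-port mapping. The sample text, the function name parse_bridge_table and the two regular expressions are made up for this illustration and are not part of the script below; your switch may format its output differently.

```python
import re

# Hypothetical sample in the style snmpwalk prints for the bridge forwarding
# table (MAC address rows and port rows). The values are made up.
SAMPLE = (
    "iso.3.6.1.2.1.17.4.3.1.1.240.159.194.170.187.204 = Hex-STRING: F0 9F C2 AA BB CC\n"
    "iso.3.6.1.2.1.17.4.3.1.2.240.159.194.170.187.204 = INTEGER: 12\n"
)

# Both row types share the same index (the MAC in decimal), which links them.
MAC_LINE = re.compile(r'\.17\.4\.3\.1\.1\.([\d.]+) = Hex-STRING: ((?:[0-9A-F]{2} ){5}[0-9A-F]{2})')
PORT_LINE = re.compile(r'\.17\.4\.3\.1\.2\.([\d.]+) = INTEGER: (\d+)')

def parse_bridge_table(text):
    """Return a dict of MAC address -> switch port (both as strings)."""
    macs, ports = {}, {}
    for line in text.splitlines():
        mac_match = MAC_LINE.search(line)
        if mac_match:
            macs[mac_match.group(1)] = mac_match.group(2).replace(' ', ':')
            continue
        port_match = PORT_LINE.search(line)
        if port_match:
            ports[port_match.group(1)] = port_match.group(2)
    # Only keep entries that have both a MAC row and a port row.
    return {macs[index]: ports[index] for index in macs if index in ports}

print(parse_bridge_table(SAMPLE))
```

Collecting the two row types separately and joining them at the end means the order of the lines does not matter.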
Very little error checking is included in this code, but it has proven to be reasonably reliable for home use.
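If you want something a little more defensive, one option is to route the external commands (arping, snmpwalk, host) through a small wrapper. The following is only a sketch under that assumption; run_command is a hypothetical helper and is not used by the script below.

```python
import subprocess
import sys

def run_command(args, timeout=30):
    """Hypothetical helper: return stdout as text, or '' on any failure."""
    try:
        output = subprocess.check_output(
            args, timeout=timeout, stderr=subprocess.DEVNULL)
    except (subprocess.CalledProcessError,   # non-zero exit status
            subprocess.TimeoutExpired,       # command took too long
            OSError) as err:                 # e.g. command not installed
        print('Command failed: ' + ' '.join(args) + ' (' + type(err).__name__ + ')')
        return ''
    return output.decode('utf-8', errors='replace')

# Example using the Python interpreter itself as a stand-in command.
print(run_command([sys.executable, '-c', 'print("ok")']).strip())
```

Returning an empty string on failure means the calling code can keep looping over the output lines without a special case; it simply finds nothing to parse.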
Source
#!/usr/bin/python3
##
## Network device scanner with HTML output, DNS resolution, UniFi info and
## PoE/port info.
##
## Required commands: arping, snmpwalk, host
## Required Python3 libraries: requests, netaddr
## Required infrastructure and services: UniFi controller, PoE switch with SNMP,
## dnsmasq (DHCP and DNS)
## Also requires ieee-data package installed.
##
## Finds devices on the local network using arping, then does DNS resolution.
## After this, the network switch is queried via SNMP to determine which port
## the device is connected to, and also whether it is using PoE, and, if so, how
## much power. The specified UniFi controller is also queried to determine if
## the device is connected via wireless, and, if so, via what AP and at what
## signal strength.
##
## All of this information is then collated and processed to produce an HTML
## table that is written to a file. It is intended to be embedded in other pages
## using SSI (Server-Side Includes). Re-runs every five minutes.
##
## Tested with:
## - Debian Buster
## - UniFi controller controlling two UniFi AP-AC-LITE
## - Nortel BayStack 5520 48T-PWR
## Network interface to bind to
bindInterface = 'eth0'
## IP address of the bound interface
bindAddress = '192.168.0.1'
## Network mask of the bound interface
bindMask = '255.255.255.0'
## Number of scan threads to run concurrently
scanThreads = 100
## Output file
htmlOutFile = 'networkDevices.html'
## OUI MAC vendor database location
ouiPath = '/usr/share/ieee-data/oui.txt'
## dnsmasq leases file path
leasePath = '/var/lib/misc/dnsmasq.leases'
## DNS server ip
dnsServer = '127.0.0.1'
## UniFi controller
unifiController = '127.0.0.1'
## UniFi controller username/password
unifiUsername = 'user'
unifiPassword = 'password'
## Names for different UniFi AP MACs
unifiNames = {'f0:9f:c2:aa:bb:cc': 'Inside wireless', 'f0:9f:c2:11:22:33': 'Outside wireless'}
## Switch address
switchAddress = 'switch'
## Switch SNMP MAC OID prefix
switch_oid_mac = 'iso.3.6.1.2.1.17.4.3.1.1'
## Switch SNMP port OID prefix
switch_oid_port = 'iso.3.6.1.2.1.17.4.3.1.2'
## Switch SNMP speed OID prefix
switch_oid_speed = 'iso.3.6.1.2.1.2.2.1.5'
## Switch SNMP PoE state OID prefix
switch_oid_poe_state = 'iso.3.6.1.2.1.105.1.1.1.6.1'
## Switch SNMP PoE class OID prefix
switch_oid_poe_class = 'iso.3.6.1.2.1.105.1.1.1.10.1'
## Switch SNMP PoE usage (combined) counters prefix
switch_oid_poe_usage = 'iso.3.6.1.2.1.105.1.3.1.1'
###
from multiprocessing.pool import ThreadPool
import time
import subprocess
import netaddr
import random
import re
import datetime
import socket
import requests
import urllib3
urllib3.disable_warnings()
class wirelessClient():
    def __init__(self, mac, radio_proto, rx_rate, tx_rate, signal, ap_mac):
        self.mac = mac
        self.radio_proto = radio_proto
        self.rx_rate = rx_rate
        self.tx_rate = tx_rate
        self.signal = signal
        self.ap_mac = ap_mac
def getPortPower(switchAddress, oid_poe_state, oid_poe_class):
    poeState = dict()
    poeClass = dict()
    output1 = subprocess.check_output(['snmpwalk', '-c', 'public', '-v', '2c', switchAddress, oid_poe_state])
    for line in output1.decode('utf-8').split('\n'):
        regexp_state = re.search(oid_poe_state + r'\.([\d\.]+)\s=\sINTEGER:\s(\d+)', line)
        if regexp_state:
            port = regexp_state.group(1)
            state = int(regexp_state.group(2))
            poeState[str(port)] = state
    output2 = subprocess.check_output(['snmpwalk', '-c', 'public', '-v', '2c', switchAddress, oid_poe_class])
    for line in output2.decode('utf-8').split('\n'):
        regexp_class = re.search(oid_poe_class + r'\.([\d\.]+)\s=\sINTEGER:\s(\d+)', line)
        if regexp_class:
            port = regexp_class.group(1)
            pclass = int(regexp_class.group(2))
            ## only add class if delivering power (state 3). otherwise set None
            ## .get() avoids a KeyError if the port had no state line
            if poeState.get(str(port)) == 3:
                poeClass[str(port)] = pclass - 1
            else:
                poeClass[str(port)] = None
    return poeClass
def getPortSpeeds(switchAddress, oid_speed):
    speedPort = dict()
    output = subprocess.check_output(['snmpwalk', '-c', 'public', '-v', '2c', switchAddress, oid_speed])
    for line in output.decode('utf-8').split('\n'):
        regexp_speed = re.search(oid_speed + r'\.(\d+)\s=\sGauge32:\s(\d+)', line)
        if regexp_speed:
            port = regexp_speed.group(1)
            speed = int(regexp_speed.group(2))
            speedPort[str(port)] = speed
    return speedPort
def getPortMacs(switchAddress, oid_mac, oid_port):
    mapping = dict()
    output = subprocess.check_output(['snmpwalk', '-c', 'public', '-v', '2c', switchAddress, '.1.3.6.1.2.1.17.4.3'])
    for line in output.decode('utf-8').split('\n'):
        regexp_mac = re.search(oid_mac + r'\.([\d\.]+)\s=\sHex-STRING:\s(((([A-F0-9]{2})\s){5})([A-F0-9]{2}))', line)
        regexp_port = re.search(oid_port + r'\.([\d\.]+)\s=\sINTEGER:\s(\d+)', line)
        if regexp_mac:
            entryId = regexp_mac.group(1)
            mac = ':'.join(regexp_mac.group(2).split(' '))
            mapping[entryId] = [mac]
        elif regexp_port and regexp_port.group(1) in mapping:
            ## only record a port for an entry whose MAC line was seen
            mapping[regexp_port.group(1)].append(regexp_port.group(2))
    macPort = []
    for entry in mapping.values():
        ## skip MAC entries that never got a matching port line
        if len(entry) >= 2:
            macPort.append([entry[0], entry[1]])
    return macPort
def makePowerGraph(switchAddress, oid_poe_usage):
    power_max = 0
    power_cur = 0
    output = subprocess.check_output(['snmpwalk', '-c', 'public', '-v', '2c', switchAddress, oid_poe_usage])
    for line in output.decode('utf-8').split('\n'):
        regexp_max = re.search(oid_poe_usage + r'\.2\.1\s=\sGauge32:\s(\d+)', line)
        regexp_cur = re.search(oid_poe_usage + r'\.4\.1\s=\sGauge32:\s(\d+)', line)
        if regexp_max:
            power_max = int(regexp_max.group(1))
        elif regexp_cur:
            power_cur = int(regexp_cur.group(1))
    ## avoid dividing by zero if the switch did not report a power budget
    if power_max > 0:
        percent = min(100, int(power_cur * 100 / power_max))
    else:
        percent = 0
    remain = 100 - percent
    htmlOut = '<div id="ndl-poeUsage-title">Switch PoE usage - ' + str(percent) + '%</div><table id="ndl-poeUsage"><tr>'
    for i in range(0, percent):
        htmlOut += '<td class="ndl-poeUsage-on"> </td>'
    for i in range(0, remain):
        htmlOut += '<td class="ndl-poeUsage-off"> </td>'
    htmlOut += '</tr></table>'
    return htmlOut
def findPortForMac(macPort, smac):
    for mac, port in macPort:
        if smac.upper() == mac.upper():
            return port
    return None
def getWirelessClients(unifiController, unifiUsername, unifiPassword):
    print('Getting UniFi wireless clients from ' + unifiController)
    clientList = []
    try:
        payload = {'username': unifiUsername, 'password': unifiPassword}
        ## timeouts stop a hung controller from blocking the scan loop
        login = requests.post('https://' + unifiController + ':8443/api/login', json = payload, verify = False, timeout = 10)
        cookies = login.cookies
        sta = requests.get('https://' + unifiController + ':8443/api/s/default/stat/sta', cookies = cookies, verify = False, timeout = 10)
        for client in sta.json()['data']:
            if 'radio_proto' not in client.keys(): continue ## ignore wired clients
            clientList.append(wirelessClient(client['mac'], client['radio_proto'].upper(), client['rx_rate'], client['tx_rate'], client['signal'], client['ap_mac']))
    except Exception:
        print('Failed to get wireless clients from UniFi!')
    return clientList
def getClientInfo(clientList, mac):
    for client in clientList:
        if client.mac.upper() == mac.upper():
            return client
    return None
def getHost(ip):
    global dnsServer
    hostname = 'Unknown'
    try:
        ## decode the bytes output so it can be split into real lines
        output = subprocess.check_output(['host', '-W', '1', '-r', ip, dnsServer])
        for line in output.decode('utf-8').split('\n'):
            regexp_host = re.search(r'domain name pointer ([^\.]+)\.', line)
            if regexp_host:
                hostname = regexp_host.group(1)
                break
    except Exception:
        pass
    return hostname
def getLeaseExpiry(ip):
    expiry = 'Static assignment'
    with open(leasePath, 'r') as fSock:
        for line in fSock:
            fields = line.split(' ')
            ## the script expects the expiry time in field 0 and the IP in field 2
            if len(fields) > 2 and fields[2] == ip:
                try:
                    leaseExpiry = int(fields[0])
                    expiry = datetime.datetime.fromtimestamp(leaseExpiry).strftime('%a, %b %d %Y %H:%M:%S')
                except Exception:
                    pass
                break
    return expiry
def checkHost(ip):
    global ipLength, doneCount, foundDevices, bindInterface
    doneCount += 1
    print('\r Checking ' + str(doneCount) + '/' + str(ipLength) + ' (' + str(ip) + '). Found ' + str(len(foundDevices)) + ' device(s)...', end = '', flush = True)
    try:
        arpOutput = subprocess.check_output(['arping', '-r', '-c', '3', '-C', '1', '-i', bindInterface, ip])
    except:
        pass
    else:
        mac = arpOutput.decode('utf-8').replace('\n', '').upper()[0:17]
        vendor = getVendor(mac)
        foundDevices.append([ip, mac, vendor])
def getTargetIPs():
    ## iter_hosts() skips the network and broadcast addresses
    network = netaddr.IPNetwork(bindAddress + '/' + bindMask)
    return [str(target) for target in network.iter_hosts()]
def getVendor(mac):
    global ouiData
    ## the vendor prefix (OUI) is the first three octets, e.g. 'F0:9F:C2'
    searchMAC = str(mac).upper()[0:8]
    for line in ouiData.split('\n'):
        regexp_info = re.search(r'(([0-9A-F]{2}\-){2}([0-9A-F]{2}))\s+\(hex\)\s+(.+)', line)
        if regexp_info:
            prefix = regexp_info.group(1).replace('-', ':')
            if prefix == searchMAC:
                return regexp_info.group(4).strip()
    return 'Unknown'
print('Initialising..')
pool = ThreadPool(processes=scanThreads)
print('Reading OUI database..')
ouiDB = open(ouiPath, encoding = 'utf-8')
ouiData = ouiDB.read()
ouiDB.close()
print('Getting IP list..')
## get the IP list in random order
ipList = getTargetIPs()
random.shuffle(ipList)
ipLength = len(ipList)
## run indefinitely
while True:
    foundDevices = []
    doneCount = 0
    ## start each pass with a fresh list so results from earlier scans do not pile up
    childThreads = []
    print('Starting scan threads..')
    for ipAddress in ipList:
        childThreads.append(pool.apply_async(checkHost, (ipAddress, )))
    wifiClients = getWirelessClients(unifiController, unifiUsername, unifiPassword)
    portMacs = getPortMacs(switchAddress, switch_oid_mac, switch_oid_port)
    portSpeeds = getPortSpeeds(switchAddress, switch_oid_speed)
    portPower = getPortPower(switchAddress, switch_oid_poe_state, switch_oid_poe_class)
    print('Waiting for scan to complete..')
    for ct in childThreads:
        try:
            ct.get(30)
        except:
            pass
    time.sleep(2)
    print('Scan complete')
    ## Sort the list by IP. Uses socket.inet_aton to convert the IPs to an easily sortable string representation
    foundDevices.sort(key=lambda k:(socket.inet_aton(k[0])))
    # print('Devices found:')
    # for ip, mac, vendor in foundDevices:
    #     print('IP ' + str(ip) + ' MAC ' + str(mac) + ' VENDOR ' + str(vendor))
    ## do html output
    print('Writing HTML output..')
    htmlOut = open(htmlOutFile, mode='w')
    htmlOut.write('<div id="networkDevices">')
    htmlOut.write('Last updated ' + str(datetime.datetime.fromtimestamp(time.time()).strftime('%a, %b %d %Y at %H:%M:%S')))
    htmlOut.write('<table id="networkDeviceList"><tr><th>Hostname</th><th>MAC address</th><th>Speed (TX/RX)</th><th>Connection method</th><th>DHCP lease expires</th></tr>')
    for ip, mac, vendor in foundDevices:
        wifiInfo = getClientInfo(wifiClients, mac)
        ## note the closing </div> after the vendor name
        htmlOut.write('<tr><td>' + getHost(ip) + '<div class="ndl-smalltext2">' + str(ip) + '</div></td><td>' + str(mac) + '<div class="ndl-smalltext1">' + str(vendor) + '</div></td>')
        if wifiInfo is not None:
            if wifiInfo.radio_proto == 'NA':
                wifiInfo.radio_proto = 'N'
            elif wifiInfo.radio_proto == 'NG':
                wifiInfo.radio_proto = 'G'
            if wifiInfo.signal < -90:
                signalQuality = 'Unusable'
            elif wifiInfo.signal < -80:
                signalQuality = 'Poor'
            elif wifiInfo.signal < -70:
                signalQuality = 'Average'
            elif wifiInfo.signal < -67:
                signalQuality = 'Good'
            elif wifiInfo.signal < -30:
                signalQuality = 'Excellent'
            else:
                signalQuality = 'Unknown'
            try:
                apInfo = '<div class="ndl-smalltext2">Connected to ' + unifiNames[wifiInfo.ap_mac] + '</div>'
            except KeyError:
                apInfo = ''
            htmlOut.write('<td>' + str(int(wifiInfo.tx_rate/1000)) + 'mbps/' + str(int(wifiInfo.rx_rate/1000)) + 'mbps</td><td>Wireless-' + wifiInfo.radio_proto + '<div class="ndl-smalltext2">' + str(wifiInfo.signal) + 'dBm (' + signalQuality + ')</div>' + apInfo + '</td>')
        else:
            port = str(findPortForMac(portMacs, mac))
            poeText = ''
            if port == 'None':
                port = 'Unknown'
                speed = 'Unknown'
            elif port == '0':
                port = 'Infrastructure'
                speed = 'N/A'
            else:
                ## the switch may not report a speed or PoE entry for every port
                if port in portSpeeds:
                    mbps = str(int(portSpeeds[port]/1000000))
                    speed = mbps + 'mbps/' + mbps + 'mbps'
                else:
                    speed = 'Unknown'
                if portPower.get(port) is not None:
                    poeText = '<div class="ndl-smalltext2">PoE device - class ' + str(portPower[port]) + '</div>'
                port = 'Switch port ' + port
            htmlOut.write('<td>' + speed + '</td><td>' + str(port) + poeText + '</td>')
        htmlOut.write('<td>' + getLeaseExpiry(ip) + '</td></tr>')
    htmlOut.write('</table>')
    htmlOut.write(makePowerGraph(switchAddress, switch_oid_poe_usage))
    htmlOut.write('</div>')
    htmlOut.close()
    print('Waiting five minutes before scanning again')
    time.sleep(300)
print('Exiting!')
Also available on GitHub