Toxicantidote Network scanner with DNS, UniFi query and switch port/PoE info
Advertisement
Advertisement

Network scanner with DNS, UniFi query and switch port/PoE info

Originally posted December 2017
Updated February 2023:UniFi function wouldn't get wireless clients if a USG or UniFi switch was also present. Fixed this.

Introduction

At home I currently run a Nortel 48 port switch with PoE, as well as a UniFi controller controlling two UniFi AP-AC-LITE access points. For monitoring, I wanted to be able to track which devices are connected to the network, how they are connected, and (if applicable) how much power they are using, and then have HTML output that I can embed in another page.

Originally this code had been developed as a basic network scanner with HTML output, but has grown from there.

Operation

This script uses arping to check all hosts on a network, which is by no means 'foolproof', but is good enough for this purpose. Switch port and PoE information is requested via SNMP from the switch. This was tested on a Nortel BayStack 5520 48T-PWR switch - your switch may or may not support the OIDs used. A UniFi controller is needed to provide an API for getting wireless information, and also requires an account on the UniFi controller.

Very little error checking is included in this code, however it has proven to be reasonably reliable for home use.

Source


#!/usr/bin/python3
##
## Network device scanner with HTML output, DNS resolution, UniFi info and 
## PoE/port info.
##
## Required commands: arping, snmpwalk, host
## Required Python3 libraries: requests, Python Imaging Library
## Required infrastructure and services: UniFi controller, PoE switch with SNMP,
## dnsmasq (DHCP and DNS)
## Also requires ieee-data package installed.
##
## Finds devices on the local network using arping, then does DNS resolution.
## After this, the network switch is queried via SNMP to determine which port
## the device is connected to, and also whether it is using PoE, and, if so, how
## much power. The specified UniFi controller is also queried to determine if
## the device is connected via wireless, and, if so, via what AP and at what 
## signal strength.
##
## All of this information is then collated and processed to produce a HTML
## table that is written to a file. It is intended to be embedded in other pages
## using SSI (Server-Side Includes). Re-runs every five minutes.
##
## Tested with:
##  - Debian Buster
##  - UniFi controller controlling two UniFi AP-AC-LITE
##  - Nortel BayStack 5520 48T-PWR

## Network interface to bind to
bindInterface = 'eth0'

## IP address of the bound interface
bindAddress = '192.168.0.1'

## Network mask of the bound interface
bindMask = '255.255.255.0'

## Number of scan threads to run concurrently
scanThreads = 100

## Output file
htmlOutFile = 'networkDevices.html'

## OUI MAC vendor database location
ouiPath = '/usr/share/ieee-data/oui.txt'

## dnsmasq leases file path
leasePath = '/var/lib/misc/dnsmasq.leases'

## DNS server ip
dnsServer = '127.0.0.1'

## UniFi controller 
unifiController = '127.0.0.1'

## UniFi controller username/password
unifiUsername = 'user'
unifiPassword = 'password'

## Names for different UniFi AP MACs
unifiNames = {'f0:9f:c2:aa:bb:cc': 'Inside wireless', 'f0:9f:c2:11:22:33': 'Outside wireless'}

## Switch address
switchAddress = 'switch'

## Switch SNMP MAC OID prefix
switch_oid_mac = 'iso.3.6.1.2.1.17.4.3.1.1'

## Switch SNMP port OID prefix
switch_oid_port = 'iso.3.6.1.2.1.17.4.3.1.2'

## Switch SNMP speed OID prefix
switch_oid_speed = 'iso.3.6.1.2.1.2.2.1.5'

## Switch SNMP PoE class OID prefix
switch_oid_poe_state = 'iso.3.6.1.2.1.105.1.1.1.6.1'

## Switch SNMP PoE class OID prefix
switch_oid_poe_class = 'iso.3.6.1.2.1.105.1.1.1.10.1'

## Switch SNMP PoE usage (combined) counters prefix
switch_oid_poe_usage = 'iso.3.6.1.2.1.105.1.3.1.1'

###
from multiprocessing.pool import ThreadPool
import time
import subprocess
import netaddr
import random
import re
import datetime
import socket
import requests

import urllib3
urllib3.disable_warnings()

class wirelessClient():
    def __init__(self, mac, radio_proto, rx_rate, tx_rate, signal, ap_mac):
        self.mac = mac
        self.radio_proto = radio_proto
        self.rx_rate = rx_rate
        self.tx_rate = tx_rate
        self.signal = signal
        self.ap_mac = ap_mac

def getPortPower(switchAddress, oid_poe_state, oid_poe_class):
    poeState = dict()
    poeClass = dict()
    output1 = subprocess.check_output(['snmpwalk', '-c', 'public', '-v', '2c', switchAddress, oid_poe_state])
    for line in output1.decode('utf-8').split('\n'):
            regexp_state = re.search(oid_poe_state + r'\.([\d\.]+)\s=\sINTEGER:\s(\d+)', line)
            if regexp_state:
                port = regexp_state.group(1)
                state = int(regexp_state.group(2))
                poeState[str(port)] = state

    output2 = subprocess.check_output(['snmpwalk', '-c', 'public', '-v', '2c', switchAddress, oid_poe_class])
    for line in output2.decode('utf-8').split('\n'):
            regexp_class = re.search(oid_poe_class + r'\.([\d\.]+)\s=\sINTEGER:\s(\d+)', line)
            if regexp_class:
                port = regexp_class.group(1)
                pclass = int(regexp_class.group(2))
                ## only add class if delivering power. otherwise set None
                if poeState[str(port)] == 3:
                    poeClass[str(port)] = pclass - 1
                else:
                    poeClass[str(port)] = None

    return poeClass

def getPortSpeeds(switchAddress, oid_speed):
    speedPort = dict()
    output = subprocess.check_output(['snmpwalk', '-c', 'public', '-v', '2c', switchAddress, oid_speed])
    for line in output.decode('utf-8').split('\n'):
            regexp_speed = re.search(oid_speed + r'\.(\d+)\s=\sGauge32:\s(\d+)', line)
            if regexp_speed:
                port = regexp_speed.group(1)
                speed = int(regexp_speed.group(2))
                speedPort[str(port)] = speed
                
    return speedPort                    
        
def getPortMacs(switchAddress, oid_mac, oid_port):
    mapping = dict()

    output = subprocess.check_output(['snmpwalk', '-c', 'public', '-v', '2c', switchAddress, '.1.3.6.1.2.1.17.4.3'])
    for line in output.decode('utf-8').split('\n'):
            regexp_mac = re.search(oid_mac + r'\.([\d\.]+)\s=\sHex-STRING:\s(((([A-F0-9]{2})\s){5})([A-F0-9]{2}))', line)
            regexp_port = re.search(oid_port + r'\.([\d\.]+)\s=\sINTEGER:\s(\d+)', line)
            if regexp_mac:
                    id = regexp_mac.group(1)
                    mac = ':'.join(regexp_mac.group(2).split(' '))
                    mapping[id] = [mac]
            elif regexp_port:
                    id = regexp_port.group(1)
                    port = regexp_port.group(2)
                    mapping[id].append(port)

    macPort = []
    for id in mapping.keys():
        macPort.append([mapping[id][0], mapping[id][1]])
                    
    return macPort

def makePowerGraph(switchAddress, oid_poe_usage):
    power_max = 0
    power_cur = 0

    output = subprocess.check_output(['snmpwalk', '-c', 'public', '-v', '2c', switchAddress, oid_poe_usage])
    for line in output.decode('utf-8').split('\n'):
        regexp_max = re.search(oid_poe_usage + r'\.2.1\s=\sGauge32:\s(\d+)', line)
        regexp_cur = re.search(oid_poe_usage + r'\.4.1\s=\sGauge32:\s(\d+)', line)

        if regexp_max:
            power_max = int(regexp_max.group(1))
        elif regexp_cur:
            power_cur = int(regexp_cur.group(1))

    divider = power_max / 100
    percent = int(power_cur / divider)
    remain = 100 - percent

    htmlOut = '<div id="ndl-poeUsage-title">Switch PoE usage - ' + str(percent) + '%</div><table id="ndl-poeUsage"><tr>'

    for i in range(0, percent):
        htmlOut += '<td class="ndl-poeUsage-on"> </td>'

    for i in range(0, remain):
        htmlOut += '<td class="ndl-poeUsage-off"> </td>'

    htmlOut += '</tr></table>'
    return htmlOut


def findPortForMac(macPort, smac):
    for mac, port in macPort:
        if smac.upper() == mac.upper():
            return port
            
    return None
        
def getWirelessClients(unifiController, unifiUsername, unifiPassword):
    print('Getting UniFi wireless clients from ' + unifiController)
    global wirelessClient
    clientList = []
    try:
        payload = {'username': unifiUsername, 'password': unifiPassword}
        login = requests.post('https://' + unifiController + ':8443/api/login', json = payload, verify = False)
        cookies = login.cookies
        sta = requests.get('https://' + unifiController + ':8443/api/s/default/stat/sta', cookies = cookies, verify = False)
        for client in sta.json()['data']:
            if 'radio_proto' not in client.keys(): continue ## ignore wired clients
            clientList.append(wirelessClient(client['mac'], client['radio_proto'].upper(), client['rx_rate'], client['tx_rate'], client['signal'], client['ap_mac']))

    except:
        print('Failed to get wireless clients from UniFi!')
        
    return clientList

def getClientInfo(clientList, mac):
    for client in clientList:
        if client.mac.upper() == mac.upper():
            return client
    
    return None

def getHost(ip):
    global dnsServer
    hostname = 'Unknown'
    try:
        for line in str(subprocess.check_output(['host', '-W', '1', '-r', ip, dnsServer])).split('\n'):
            regexp_host = re.search(r'domain name pointer ([^\.]+)\.', str(line))
            if regexp_host:
                hostname = regexp_host.group(1)
                break
    except:
        pass

    return hostname

def getLeaseExpiry(ip):
    fSock = open(leasePath, 'r')
    expiry = 'Static assignment'
    for line in fSock:
        if line.split(' ')[2] == ip:
            try:
                leaseExpiry = int(line.split(' ')[0])
                expiry = datetime.datetime.fromtimestamp(leaseExpiry).strftime('%a, %b %d %Y %H:%M:%S')
            except:
                pass
            break

    fSock.close()

    return expiry       

def checkHost(ip):
    global ipLength, doneCount, foundDevices, bindInterface
    doneCount += 1
    print('\r Checking ' + str(doneCount) + '/' + str(ipLength) + ' (' + str(ip) + '). Found ' + str(len(foundDevices)) + ' device(s)...', end = '', flush = True)
    try:
        arpOutput = subprocess.check_output(['arping', '-r', '-c', '3', '-C', '1', '-i', bindInterface, ip])
    except:
        pass
    else:
        mac = arpOutput.decode('utf-8').replace('\n', '').upper()[0:17]
        vendor = getVendor(mac)
        foundDevices.append([ip, mac, vendor])


def getTargetIPs():
    targetIPs = []
    targetIPs += netaddr.IPNetwork(bindAddress + '/' + bindMask)

    cleanList = []
    for target in targetIPs:
        cleanList.append(str(target))

    return cleanList

def getVendor(mac):
    global ouiData
    mac = str(mac).upper()
    
    searchMAC = re.sub(r'(([0-9A-F]{2}\:?){3})\:(([0-9A-F]{2}\:?){3})', r'\1', mac)

    foundVendor = False
    for line in ouiData.split('\n'):
        regexp_info = re.search('(([0-9A-F]{2}\-){2}([0-9A-F]{2}))\s+\(hex\)\s+(.+)', line)
        if regexp_info:
            mac = re.sub(r'\-', ':', regexp_info.group(1))
            if mac == searchMAC:
                foundVendor = True
                vendor = regexp_info.group(4)
                return(vendor)

    return('Unknown')

print('Initialising..')
pool = ThreadPool(processes=scanThreads)

print('Reading OUI database..')
ouiDB = open(ouiPath, encoding = 'utf-8')
ouiData = ouiDB.read()
ouiDB.close()

print('Getting IP list..')
## get the IP list in random order
ipList = getTargetIPs()
random.shuffle(ipList)
ipLength = len(ipList)

## run indefinitely
childThreads = []
while True:
    foundDevices = []
    doneCount = 0
    print('Starting scan threads..')
    for ipAddress in ipList:
        childThreads.append(pool.apply_async(checkHost, (ipAddress, )))

    wifiClients = getWirelessClients(unifiController, unifiUsername, unifiPassword)
    portMacs = getPortMacs(switchAddress, switch_oid_mac, switch_oid_port)
    portSpeeds = getPortSpeeds(switchAddress, switch_oid_speed)
    portPower = getPortPower(switchAddress, switch_oid_poe_state, switch_oid_poe_class)
        
    print('Waiting for scan to complete..')
    for ct in childThreads:
        try:
            ct.get(30)
        except:
            pass

    time.sleep(2)
    print('Scan complete')

    ## Sort the list by IP. Uses socket.inet_aton to convert the IPs to a easily sortable string representation
    foundDevices.sort(key=lambda k:(socket.inet_aton(k[0])))

#   print('Devices found:')
#   for ip, mac, vendor in foundDevices:
#       print('IP ' + str(ip) + ' MAC ' + str(mac) + ' VENDOR ' + str(vendor))

    ## do html output
    print('Writing HTML output..')
    htmlOut = open(htmlOutFile, mode='w')
    htmlOut.write('<div id="networkDevices">')
    htmlOut.write('Last updated ' + str(datetime.datetime.fromtimestamp(time.time()).strftime('%a, %b %d %Y at %H:%M:%S')))
    htmlOut.write('<table id="networkDeviceList"><tr><th>Hostname</th><th>MAC address</th><th>Speed (TX/RX)</th><th>Connection method</th><th>DHCP lease expires</th></tr>')
    for ip, mac, vendor in foundDevices:
        wifiInfo = getClientInfo(wifiClients, mac)        
        htmlOut.write('<tr><td>' + getHost(ip) + '<div class="ndl-smalltext2">' + str(ip) + '</div></td><td>' + str(mac) + '<div class="ndl-smalltext1">' + str(vendor) + '</td>')
        if wifiInfo != None:
            if wifiInfo.radio_proto == 'NA':
                wifiInfo.radio_proto = 'N'
            elif wifiInfo.radio_proto == 'NG':
                wifiInfo.radio_proto = 'G'

            if wifiInfo.signal < -90:
                signalQuality = 'Unusable'
            elif wifiInfo.signal < -80:
                signalQuality = 'Poor'
            elif wifiInfo.signal < -70:
                signalQuality = 'Average'
            elif wifiInfo.signal < -67:
                signalQuality = 'Good'
            elif wifiInfo.signal < -30:
                signalQuality = 'Excellent'
            else:
                signalQuality = 'Unknown'

            try:
                apInfo = '<div class="ndl-smalltext2">Connected to ' + unifiNames[wifiInfo.ap_mac] + '</div>'
            except:
                apInfo = ''

            htmlOut.write('<td>' + str(int(wifiInfo.tx_rate/1000)) + 'mbps/' + str(int(wifiInfo.rx_rate/1000)) + 'mbps</td><td>Wireless-' + wifiInfo.radio_proto + '<div class="ndl-smalltext2">' + str(wifiInfo.signal) + 'dBm (' + signalQuality + ')</div>' +  apInfo + '</td>')
        else:
            port = str(findPortForMac(portMacs, mac))
            poeText = ''
            if port == 'None':
                port = 'Unknown'
                speed = 'Unknown'
            elif port == '0':
                port = 'Infrastructure'
                speed = 'N/A'
            else:
                speed = str(int(portSpeeds[port]/1000000)) + 'mbps/' + str(int(portSpeeds[port]/1000000)) + 'mbps'
                if portPower[port] != None:
                    poeText = '<div class="ndl-smalltext2">PoE device - class ' + str(portPower[port]) + '</div>'

                port = 'Switch port ' + port

            htmlOut.write('<td>' + speed + '</td><td>' + str(port) + poeText + '</td>')
        htmlOut.write('<td>' + getLeaseExpiry(ip) + '</td></tr>')

    htmlOut.write('</table>')
    htmlOut.write(makePowerGraph(switchAddress, switch_oid_poe_usage))
    htmlOut.write('</div>')
    htmlOut.close()
    print('Waiting five minutes before scanning again')
    time.sleep(300)


print('Exiting!')

Also available on Github