喜讯!TCMS 官网正式上线!一站式提供企业级定制研发、App 小程序开发、AI 与区块链等全栈软件服务,助力多行业数智转型,欢迎致电:13888011868 QQ 932256355 洽谈合作!
本文围绕 Python 打造公司网络监控系统展开,先阐述 Python 在网络监控领域的优势,如低门槛、丰富库支撑、跨平台兼容等;接着明确公司网络监控的核心需求,涵盖设备状态、网络流量、连接与端口、故障告警四大维度;随后分模块详细讲解各功能的技术选型、代码实现与优化,包括用`ping3`和多线程实现设备检测、`psutil`统计流量、`socket`监控端口连接,以及企业微信机器人告警;最后介绍通过`APScheduler`整合定时任务,完成从脚本到可用系统的搭建,为企业提供了一套轻量化、高性价比的网络监控解决方案。

在选择网络监控工具时,企业常面临“开源工具功能冗余”“商业软件成本过高”“定制化难度大”等问题。而Python的特性恰好能解决这些痛点,成为中小微企业及大型企业轻量化监控场景的首选:
低门槛高效率 :Python语法接近自然语言,开发周期仅为Java、C++的1/3,即使是非专职运维开发的工程师,也能快速上手编写监控脚本。
丰富的专业库支撑 :无需重复造轮子—— ping3 实现设备连通性检测, psutil 抓取网络流量, socket 监控端口与连接, smtplib / wxpy 实现告警推送, matplotlib / pandas 做数据可视化,覆盖监控全流程。
跨平台兼容性 :一套代码可同时运行在Windows Server(办公区交换机监控)、Linux(机房服务器监控)、macOS(运维本地调试),无需针对不同系统单独适配。
轻量化资源占用 :相比Zabbix、Nagios等开源监控平台,Python脚本内存占用通常低于100MB,不会给被监控设备(如边缘路由器、小型交换机)增加额外负载。
在动手开发前,需先明确企业网络的核心监控目标——避免“为了监控而监控”。结合多数公司的运维场景,核心需求可归纳为三类:
| 监控维度 | 核心目标 | 典型场景 |
|---|---|---|
| 设备状态 | 确保核心网络设备(路由器、交换机、服务器)在线且正常运行 | 机房路由器离线导致全公司断网;核心服务器宕机引发业务系统不可用 |
| 网络流量 | 实时统计带宽使用情况,识别异常流量(如P2P下载、恶意攻击) | 员工大量下载占用带宽,导致客户访问公司官网卡顿;异常上传流量可能是数据泄露 |
| 连接与端口 | 监控关键端口(如80/443 web端口、3306数据库端口)状态,识别非法连接 | 数据库端口被外部IP频繁扫描;未经授权的设备接入内部局域网 |
| 故障告警 | 网络异常时第一时间通知运维人员,缩短故障排查时间 | 凌晨2点交换机故障,若等到上班后发现,已造成3小时业务中断 |
下面从“设备监测→流量统计→连接监控→故障告警”四个核心模块,结合实际代码与场景,拆解系统实现过程。
批量检测公司内网设备(如路由器192.168.1.1、服务器192.168.1.100、打印机192.168.1.200)的在线状态,若离线则标记故障并记录时间。
使用 ping3 库(比系统自带 ping 命令更易集成到Python脚本,支持超时设置、TTL配置),搭配多线程实现批量设备快速检测(避免单线程依次检测耗时过长)。
import ping3
import threading
from datetime import datetime
# 1. 定义待监控设备列表(可从配置文件/数据库读取,避免硬编码)
devices = [
{"name": "核心路由器", "ip": "192.168.1.1"},
{"name": "Web服务器", "ip": "192.168.1.100"},
{"name": "财务打印机", "ip": "192.168.1.200"}
]
# 2. 单设备检测函数
def check_device_status(device):
name = device["name"]
ip = device["ip"]
# 设置超时时间为2秒(避免等待过久)
response_time = ping3.ping(ip, timeout=2)
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if response_time is not None:
status = "在线"
print(f"[{current_time}] {name}({ip}) - {status},响应时间:{response_time:.2f}ms")
else:
status = "离线"
print(f"[{current_time}] {name}({ip}) - {status}!!!")
# 此处可直接调用告警函数(下文模块4),触发离线告警
send_alert(f"设备离线告警", f"{name}({ip})于{current_time}检测到离线,请及时排查")
# 3. 多线程批量检测(提高效率,支持同时检测10+设备)
def batch_check_devices():
threads = []
for device in devices:
t = threading.Thread(target=check_device_status, args=(device,))
threads.append(t)
t.start()
# 等待所有线程执行完毕
for t in threads:
t.join()
# 测试执行
if __name__ == "__main__":
batch_check_devices()避免“误报” :若单次Ping失败可能是网络波动,可增加“连续3次检测失败才判定离线”的逻辑;
配置化管理 :将设备列表写入 devices.json 文件,后续新增设备无需修改代码,示例:
[{"name": "核心路由器", "ip": "192.168.1.1"}, {"name": "Web服务器", "ip": "192.168.1.100"}]代码中通过 json.load(open("devices.json")) 读取即可。
实时统计指定网络接口(如服务器的 eth0 、办公区路由器的 wan 口)的上传/下载速度,当流量超过阈值(如下载速度持续5分钟>10MB/s)时触发告警。
使用 psutil 库(跨平台获取系统资源信息,支持Windows的“以太网”、Linux的“eth0”接口流量统计)。
import psutil
import time
from datetime import datetime
# 1. 获取指定接口的实时流量(单位:字节)
def get_interface_traffic(interface="eth0"):
net_io = psutil.net_io_counters(pernic=True).get(interface)
if not net_io:
raise ValueError(f"未找到网络接口:{interface},请检查接口名称")
# 返回上传字节数、下载字节数
return net_io.bytes_sent, net_io.bytes_recv
# 2. 计算流量速度(单位:MB/s)
def calculate_speed(interface, interval=1):
# 获取1秒前的流量
sent_before, recv_before = get_interface_traffic(interface)
time.sleep(interval)
# 获取当前流量
sent_after, recv_after = get_interface_traffic(interface)
# 计算速度(1MB = 1024*1024 字节)
upload_speed = (sent_after - sent_before) / (1024 * 1024)
download_speed = (recv_after - recv_before) / (1024 * 1024)
return round(upload_speed, 2), round(download_speed, 2)
# 3. 持续监控流量并判断阈值
def monitor_traffic(interface="eth0", max_download=10):
while True:
upload, download = calculate_speed(interface)
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{current_time}] {interface} - 上传:{upload}MB/s,下载:{download}MB/s")
# 若下载速度超过阈值,触发告警
if download > max_download:
alert_msg = f"流量超限告警\n接口:{interface}\n时间:{current_time}\n下载速度:{download}MB/s(阈值:{max_download}MB/s)"
send_alert("流量超限", alert_msg)
# 每5秒监控一次(可根据需求调整)
time.sleep(5)
# 测试执行(Linux示例,Windows需改为接口名称如"以太网")
if __name__ == "__main__":
monitor_traffic(interface="eth0", max_download=10)对办公区网络,可设置“上班时间下载速度>5MB/s告警”,避免员工用公司带宽下载视频;
对Web服务器,可监控“wan口上传速度>2MB/s告警”,排查是否有异常数据上传(如数据泄露)。
监控关键端口(如数据库3306、远程桌面3389)是否正常开放,若端口关闭则告警;
统计指定端口的连接数(如Web服务器80端口的并发连接),识别异常连接(如单IP频繁连接可能是攻击)。
socket 库:检测端口是否开放、建立TCP连接;
psutil.net_connections() :获取所有网络连接信息,筛选指定端口的连接。
import socket
import psutil
from datetime import datetime
# 1. 检测指定IP:端口是否开放
def check_port(ip, port, timeout=2):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
try:
sock.connect((ip, port))
sock.close()
return True # 端口开放
except:
return False # 端口关闭
# 2. 监控关键端口状态
def monitor_ports(port_list):
while True:
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
for ip, port in port_list:
is_open = check_port(ip, port)
status = "开放" if is_open else "关闭"
print(f"[{current_time}] {ip}:{port} - {status}")
if not is_open:
alert_msg = f"端口异常告警\nIP:端口:{ip}:{port}\n时间:{current_time}\n状态:{status}(预期:开放)"
send_alert("端口异常", alert_msg)
time.sleep(10) # 每10秒检测一次
# 3. 统计指定端口的连接数(如80端口)
def count_port_connections(port):
connections = psutil.net_connections(kind="tcp")
port_connections = [conn for conn in connections if conn.laddr.port == port and conn.status == "ESTABLISHED"]
# 返回连接数、连接的IP列表
ip_list = [conn.raddr.ip for conn in port_connections if conn.raddr]
return len(port_connections), ip_list
# 4. 监控端口连接数,识别异常IP
def monitor_port_connections(port=80, max_conn=100, max_ip_conn=20):
while True:
conn_count, ip_list = count_port_connections(port)
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{current_time}] 端口{port} - 并发连接数:{conn_count}")
# 若连接数超过阈值,告警
if conn_count > max_conn:
alert_msg = f"端口连接数超限告警\n端口:{port}\n时间:{current_time}\n连接数:{conn_count}(阈值:{max_conn})"
send_alert("端口连接超限", alert_msg)
# 统计单IP连接数,若超过阈值(如单IP连20次),可能是攻击
ip_conn_count = {}
for ip in ip_list:
ip_conn_count[ip] = ip_conn_count.get(ip, 0) + 1
for ip, count in ip_conn_count.items():
if count > max_ip_conn:
alert_msg = f"异常IP连接告警\n端口:{port}\nIP:{ip}\n时间:{current_time}\n连接数:{count}(阈值:{max_ip_conn})"
send_alert("异常IP连接", alert_msg)
time.sleep(5)
# 测试执行(监控数据库3306端口、Web服务器80端口)
if __name__ == "__main__":
# 监控关键端口状态
critical_ports = [("192.168.1.100", 3306), ("192.168.1.101", 80)]
monitor_ports(critical_ports)
# 监控80端口连接数(单独线程执行,避免阻塞)
# import threading
# threading.Thread(target=monitor_port_connections, args=(80, 100, 20)).start()监控系统的核心价值之一是“异常时及时通知”,若只是记录日志而不告警,等于“守着火灾报警器却不响”。常见的告警方式包括邮件、企业微信/钉钉、短信(需对接第三方API),下面以“企业微信告警”为例实现(门槛低、免费、适合团队协作)。
import requests
import json
# 企业微信机器人WebHook(需在企业微信创建机器人,获取链接)
WECHAT_WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的机器人key"
def send_alert(title, content):
"""发送企业微信告警"""
# 构造告警消息(支持Markdown格式,更易读)
msg = {
"msgtype": "markdown",
"markdown": {
"content": f"### {title}\n{content}\n\n**告警时间**:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
}
}
try:
response = requests.post(WECHAT_WEBHOOK, data=json.dumps(msg), headers={"Content-Type": "application/json"})
if response.json().get("errcode") == 0:
print(f"告警发送成功:{title}")
else:
print(f"告警发送失败:{response.text}")
except Exception as e:
print(f"告警发送异常:{str(e)}")分级告警 :重要故障(如核心服务器宕机)同时发邮件+企业微信+短信;一般故障(如打印机离线)只发企业微信;
避免“告警轰炸” :同一故障(如设备离线)设置“10分钟内只告警1次”,避免频繁通知打扰运维。
单个模块的脚本只能解决局部问题,需通过“定时任务+数据存储+可视化”整合为完整系统,并进行性能优化。
使用 APScheduler 库实现定时任务(比 time.sleep() 更灵活,支持 cron 表达式、间隔执行、指定时间执行)。
from apscheduler.schedulers.blocking import BlockingScheduler
# 1. 定义所有监控任务
def run_all_monitors():
# 任务1:每1分钟检测一次设备状态
scheduler.add_job(batch_check_devices, "interval", minutes=1, id="device_check")
# 任务2:每5秒监控一次流量
scheduler.add_job(monitor_traffic, "interval", seconds=5, id="traffic_monitor", args=("eth0", 10))
# 任务3:每10秒监控一次关键端口
critical_ports = [("192.168.1.100", 3306), ("192.168.1.101", 80)]
scheduler.add_job(monitor_ports, "interval", seconds=10, id="port_check", args=(critical_ports,))
# 任务4:每5秒监控80端口连接数
scheduler.add_job(monitor_port_connections, "interval", seconds=5, id="conn_monitor", args=(80, 100, 20))
# 2. 启动调度器
if __name__ == "__main__":
scheduler = BlockingScheduler()
run_all_monitors()
print("监控系统已启动,持续运行中...")
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit