喜讯!TCMS 官网正式上线!一站式提供企业级定制研发、App 小程序开发、AI 与区块链等全栈软件服务,助力多行业数智转型,欢迎致电:13888011868 QQ 932256355 洽谈合作!
基于Python的网络监控系统实战

import os
import psutil
import socket
import smtplib
from email.mime.text import MIMEText
from apscheduler.schedulers.blocking import BlockingScheduler
import platform
import time
# ================== 配置参数 ==================
MONITORED_IPS = ["192.168.1.1", "192.168.1.100"] # 待监控设备IP
ALERT_THRESHOLD = {
"upload": 1024 * 1024 * 5, # 上传流量阈值(5MB/分钟)
"download": 1024 * 1024 * 10 # 下载流量阈值(10MB/分钟)
}
EMAIL_CONFIG = {
"sender": "your_sender_email@example.com",
"receiver": "your_receiver_email@example.com",
"smtp_server": "smtp.example.com",
"smtp_port": 587,
"password": "your_email_password"
}
# ================== 设备状态监测 ==================
def ping_device(ip):
"""
跨平台Ping检测
"""
param = "-n 1" if platform.system().lower() == "windows" else "-c 1"
command = f"ping {param} {ip}"
return os.system(command) == 0
# ================== 流量统计(带增量计算) ==================
class TrafficMonitor:
"""
流量监控类,计算实时上传下载流量
"""
def __init__(self):
self.prev_upload = 0
self.prev_download = 0
def get_traffic(self):
net_io = psutil.net_io_counters()
current_upload = net_io.bytes_sent
current_download = net_io.bytes_recv
traffic = {
"upload": current_upload - self.prev_upload,
"download": current_download - self.prev_download
}
self.prev_upload, self.prev_download = current_upload, current_download
return traffic
# ================== 连接监控 ==================
def monitor_connections():
"""
监控TCP/UDP连接
"""
connections = []
for conn in socket.net_connections():
if conn.type in (socket.SOCK_STREAM, socket.SOCK_DGRAM):
local = f"{conn.laddr[0]}:{conn.laddr[1]}"
remote = f"{conn.raddr[0]}:{conn.raddr[1]}" if conn.raddr else "N/A"
connections.append({
"type": "TCP" if conn.type == socket.SOCK_STREAM else "UDP",
"local": local,
"remote": remote
})
return connections
# ================== 故障告警 ==================
def send_alert_email(subject, message):
"""
邮件告警
"""
msg = MIMEText(message)
msg["Subject"] = subject
msg["From"] = EMAIL_CONFIG["sender"]
msg["To"] = EMAIL_CONFIG["receiver"]
try:
with smtplib.SMTP(EMAIL_CONFIG["smtp_server"], EMAIL_CONFIG["smtp_port"]) as server:
server.starttls()
server.login(EMAIL_CONFIG["sender"], EMAIL_CONFIG["password"])
server.sendmail(EMAIL_CONFIG["sender"], [EMAIL_CONFIG["receiver"]], msg.as_string())
print("告警邮件发送成功")
except Exception as e:
print(f"邮件发送失败: {str(e)}")
# ================== 核心监控逻辑 ==================
traffic_monitor = TrafficMonitor()
def device_status_check():
"""
设备状态检查任务
"""
offline_devices = [ip for ip in MONITORED_IPS if not ping_device(ip)]
if offline_devices:
message = f"设备离线告警:{', '.join(offline_devices)} 无法连接"
send_alert_email("【设备离线】网络监控告警", message)
print(f"[警告] 设备离线: {offline_devices}")
def traffic_statistics():
"""
流量统计与阈值检查
"""
traffic = traffic_monitor.get_traffic()
upload_mb = traffic["upload"] / (1024 * 1024)
download_mb = traffic["download"] / (1024 * 1024)
alert = []
if upload_mb > ALERT_THRESHOLD["upload"]:
alert.append(f"上传流量异常: {upload_mb:.2f}MB (阈值: {ALERT_THRESHOLD['upload']/1024/1024:.2f}MB)")
if download_mb > ALERT_THRESHOLD["download"]:
alert.append(f"下载流量异常: {download_mb:.2f}MB (阈值: {ALERT_THRESHOLD['download']/1024/1024:.2f}MB)")
if alert:
send_alert_email("【流量异常】网络监控告警", "\n".join(alert))
print(f"[警告] 流量异常: {', '.join(alert)}")
else:
print(f"流量正常 - 上传: {upload_mb:.2f}MB, 下载: {download_mb:.2f}MB")
def connection_monitor():
"""
连接监控任务
"""
connections = monitor_connections()
if len(connections) > 100: # 示例:连接数超过100触发告警
message = f"异常连接数告警:当前连接数 {len(connections)} 超过阈值"
send_alert_email("【连接异常】网络监控告警", message)
print(f"[警告] 连接数异常: {len(connections)}")
else:
print(f"当前连接数: {len(connections)}")
# ================== 系统整合与定时任务 ==================
if __name__ == "__main__":
# 初始化流量监控初始值
traffic_monitor.get_traffic() # 第一次调用获取基准值
scheduler = BlockingScheduler()
scheduler.add_job(func=device_status_check, trigger="interval", minutes=1, name="设备状态检查")
scheduler.add_job(func=traffic_statistics, trigger="interval", minutes=5, name="流量统计")
scheduler.add_job(func=connection_monitor, trigger="interval", minutes=10, name="连接监控")
print("网络监控系统启动...")
try:
scheduler.start()
except KeyboardInterrupt:
print("监控系统停止")配置参数修改 :
MONITORED_IPS :添加需要监控的设备 IP 列表
ALERT_THRESHOLD :根据需求调整流量告警阈值(单位:字节)
EMAIL_CONFIG :替换为你的邮件服务器配置(推荐使用应用专用密码)
依赖安装 :
pip install psutil apscheduler功能扩展建议 :
数据存储 :添加 SQLite/MySQL 存储历史数据(参考 sqlite3 库)
import sqlite3
def save_to_db(data):
conn = sqlite3.connect('network_monitor.db')
cursor = conn.cursor()
cursor.execute('INSERT INTO traffic (time, upload, download) VALUES (?, ?, ?)', data)
conn.commit()
conn.close()多设备支持 :为每个设备单独设置监控参数和告警阈值
增强告警:
添加短信告警
支持钉钉 / 企业微信机器人通知
可视化界面 :使用 Flask/Django 开发 Web 监控面板
协议解析 :结合 Scapy 库实现更深入的数据包分析
运行方式 :
python network_monitor.py跨平台支持 :自动适配 Windows/Linux 系统 Ping 命令
多层告警 :设备离线、流量异常、连接数异常三级告警机制
可配置化 :通过参数配置灵活调整监控对象和阈值
定时任务 :使用 APScheduler 实现灵活的任务调度
邮件告警需要开启 SMTP 服务
流量统计精度取决于定时任务间隔,建议 5-10 分钟一次
大规模网络环境下建议使用多线程 / 异步处理提高性能
生产环境中建议添加日志记录功能(使用 logging 库)