喜讯!TCMS 官网正式上线!一站式提供企业级定制研发、App 小程序开发、AI 与区块链等全栈软件服务,助力多行业数智转型,欢迎致电:13888011868  QQ 932256355 洽谈合作!

基于Python的网络监控系统

2025-10-30 10分钟阅读时长

基于Python的网络监控系统实战

Python-Based-Network-Monitoring-System
 

以下是基于Python的网络监控系统完整实现,包含核心功能模块和定时任务机制:

import os
import psutil
import socket
import smtplib
from email.mime.text import MIMEText
from apscheduler.schedulers.blocking import BlockingScheduler
import platform
import time

# ================== 配置参数 ==================
MONITORED_IPS = ["192.168.1.1", "192.168.1.100"]  # 待监控设备IP
ALERT_THRESHOLD = {
   "upload": 1024 * 1024 * 5,  # 上传流量阈值(5MB/分钟)
   "download": 1024 * 1024 * 10  # 下载流量阈值(10MB/分钟)
}
EMAIL_CONFIG = {
   "sender": "your_sender_email@example.com",
   "receiver": "your_receiver_email@example.com",
   "smtp_server": "smtp.example.com",
   "smtp_port": 587,
   "password": "your_email_password"
}

# ================== 设备状态监测 ==================
def ping_device(ip):
   """
  跨平台Ping检测
  """
   param = "-n 1" if platform.system().lower() == "windows" else "-c 1"
   command = f"ping {param} {ip}"
   return os.system(command) == 0

# ================== 流量统计(带增量计算) ==================
class TrafficMonitor:
   """
  流量监控类,计算实时上传下载流量
  """
   def __init__(self):
       self.prev_upload = 0
       self.prev_download = 0

   def get_traffic(self):
       net_io = psutil.net_io_counters()
       current_upload = net_io.bytes_sent
       current_download = net_io.bytes_recv
       traffic = {
           "upload": current_upload - self.prev_upload,
           "download": current_download - self.prev_download
      }
       self.prev_upload, self.prev_download = current_upload, current_download
       return traffic

# ================== 连接监控 ==================
def monitor_connections():
   """
  监控TCP/UDP连接
  """
   connections = []
   for conn in socket.net_connections():
       if conn.type in (socket.SOCK_STREAM, socket.SOCK_DGRAM):
           local = f"{conn.laddr[0]}:{conn.laddr[1]}"
           remote = f"{conn.raddr[0]}:{conn.raddr[1]}" if conn.raddr else "N/A"
           connections.append({
               "type": "TCP" if conn.type == socket.SOCK_STREAM else "UDP",
               "local": local,
               "remote": remote
          })
   return connections

# ================== 故障告警 ==================
def send_alert_email(subject, message):
   """
  邮件告警
  """
   msg = MIMEText(message)
   msg["Subject"] = subject
   msg["From"] = EMAIL_CONFIG["sender"]
   msg["To"] = EMAIL_CONFIG["receiver"]

   try:
       with smtplib.SMTP(EMAIL_CONFIG["smtp_server"], EMAIL_CONFIG["smtp_port"]) as server:
           server.starttls()
           server.login(EMAIL_CONFIG["sender"], EMAIL_CONFIG["password"])
           server.sendmail(EMAIL_CONFIG["sender"], [EMAIL_CONFIG["receiver"]], msg.as_string())
       print("告警邮件发送成功")
   except Exception as e:
       print(f"邮件发送失败: {str(e)}")

# ================== 核心监控逻辑 ==================
traffic_monitor = TrafficMonitor()

def device_status_check():
   """
  设备状态检查任务
  """
   offline_devices = [ip for ip in MONITORED_IPS if not ping_device(ip)]
   if offline_devices:
       message = f"设备离线告警:{', '.join(offline_devices)} 无法连接"
       send_alert_email("【设备离线】网络监控告警", message)
       print(f"[警告] 设备离线: {offline_devices}")

def traffic_statistics():
   """
  流量统计与阈值检查
  """
   traffic = traffic_monitor.get_traffic()
   upload_mb = traffic["upload"] / (1024 * 1024)
   download_mb = traffic["download"] / (1024 * 1024)
   
   alert = []
   if upload_mb > ALERT_THRESHOLD["upload"]:
       alert.append(f"上传流量异常: {upload_mb:.2f}MB (阈值: {ALERT_THRESHOLD['upload']/1024/1024:.2f}MB)")
   if download_mb > ALERT_THRESHOLD["download"]:
       alert.append(f"下载流量异常: {download_mb:.2f}MB (阈值: {ALERT_THRESHOLD['download']/1024/1024:.2f}MB)")
   
   if alert:
       send_alert_email("【流量异常】网络监控告警", "\n".join(alert))
       print(f"[警告] 流量异常: {', '.join(alert)}")
   else:
       print(f"流量正常 - 上传: {upload_mb:.2f}MB, 下载: {download_mb:.2f}MB")

def connection_monitor():
   """
  连接监控任务
  """
   connections = monitor_connections()
   if len(connections) > 100:  # 示例:连接数超过100触发告警
       message = f"异常连接数告警:当前连接数 {len(connections)} 超过阈值"
       send_alert_email("【连接异常】网络监控告警", message)
       print(f"[警告] 连接数异常: {len(connections)}")
   else:
       print(f"当前连接数: {len(connections)}")

# ================== 系统整合与定时任务 ==================
if __name__ == "__main__":
   # 初始化流量监控初始值
   traffic_monitor.get_traffic()  # 第一次调用获取基准值
   
   scheduler = BlockingScheduler()
   scheduler.add_job(func=device_status_check, trigger="interval", minutes=1, name="设备状态检查")
   scheduler.add_job(func=traffic_statistics, trigger="interval", minutes=5, name="流量统计")
   scheduler.add_job(func=connection_monitor, trigger="interval", minutes=10, name="连接监控")
   
   print("网络监控系统启动...")
   try:
       scheduler.start()
   except KeyboardInterrupt:
       print("监控系统停止")

使用说明:

  1. 配置参数修改

    • MONITORED_IPS :添加需要监控的设备 IP 列表

    • ALERT_THRESHOLD :根据需求调整流量告警阈值(单位:字节)

    • EMAIL_CONFIG :替换为你的邮件服务器配置(推荐使用应用专用密码)

  2. 依赖安装

    pip install psutil apscheduler
  3. 功能扩展建议

    • 数据存储 :添加 SQLite/MySQL 存储历史数据(参考 sqlite3 库)

    import sqlite3
    def save_to_db(data):
       conn = sqlite3.connect('network_monitor.db')
       cursor = conn.cursor()
       cursor.execute('INSERT INTO traffic (time, upload, download) VALUES (?, ?, ?)', data)
       conn.commit()
       conn.close()
    • 多设备支持 :为每个设备单独设置监控参数和告警阈值

    • 增强告警:

      • 添加短信告警

      • 支持钉钉 / 企业微信机器人通知

    • 可视化界面 :使用 Flask/Django 开发 Web 监控面板

    • 协议解析 :结合 Scapy 库实现更深入的数据包分析

  4. 运行方式

    python network_monitor.py

系统特点:

  1. 跨平台支持 :自动适配 Windows/Linux 系统 Ping 命令

  2. 多层告警 :设备离线、流量异常、连接数异常三级告警机制

  3. 可配置化 :通过参数配置灵活调整监控对象和阈值

  4. 定时任务 :使用 APScheduler 实现灵活的任务调度

注意事项:

  1. 邮件告警需要开启 SMTP 服务

  2. 流量统计精度取决于定时任务间隔,建议 5-10 分钟一次

  3. 大规模网络环境下建议使用多线程 / 异步处理提高性能

  4. 生产环境中建议添加日志记录功能(使用 logging 库)

这个实现提供了基础网络监控功能,可根据实际需求进一步扩展和优化,例如增加更多监控指标、完善数据展示、增强告警可靠性等。

新闻通讯图片
主图标
新闻通讯

订阅我们的新闻通讯

在下方输入邮箱地址后,点击订阅按钮即可完成订阅,同时代表您同意我们的条款与条件。

启用 Cookie,可让您在本网站获得更流畅的使用体验 Cookie政策