喜讯!TCMS 官网正式上线!一站式提供企业级定制研发、App 小程序开发、AI 与区块链等全栈软件服务,助力多行业数智转型,欢迎致电:13888011868  QQ 932256355 洽谈合作!

Python打造公司网络“守护者”:高效监控系统全解析

2025-10-30 18分钟阅读时长

本文围绕 Python 打造公司网络监控系统展开,先阐述 Python 在网络监控领域的优势,如低门槛、丰富库支撑、跨平台兼容等;接着明确公司网络监控的核心需求,涵盖设备状态、网络流量、连接与端口、故障告警四大维度;随后分模块详细讲解各功能的技术选型、代码实现与优化,包括用`ping3`和多线程实现设备检测、`psutil`统计流量、`socket`监控端口连接,以及企业微信机器人告警;最后介绍通过`APScheduler`整合定时任务,完成从脚本到可用系统的搭建,为企业提供了一套轻量化、高性价比的网络监控解决方案。

Python打造公司网络“守护者”:高效监控系统全解析
 

在数字化时代,公司网络如同“血管”般支撑着业务运转——从员工日常办公沟通,到客户数据传输、核心业务系统运行,都依赖稳定的网络环境。一旦网络中断、设备故障或出现异常流量,轻则导致工作效率下降,重则引发业务停摆、数据泄露等重大风险。而Python凭借其语法简洁、库生态丰富、跨平台兼容的特性,成为打造轻量化、高性价比网络监控系统的理想工具。本文将从系统设计思路、核心功能实现、代码落地到优化迭代,全方位解析如何用Python构建贴合企业需求的网络“守护者”。

一、Python做网络监控:为何是“最优解”之一?

在选择网络监控工具时,企业常面临“开源工具功能冗余”“商业软件成本过高”“定制化难度大”等问题。而Python的特性恰好能解决这些痛点,成为中小微企业及大型企业轻量化监控场景的首选:

  1. 低门槛高效率 :Python语法接近自然语言,开发周期仅为Java、C++的1/3,即使是非专职运维开发的工程师,也能快速上手编写监控脚本。

  2. 丰富的专业库支撑 :无需重复造轮子—— ping3 实现设备连通性检测, psutil 抓取网络流量, socket 监控端口与连接, smtplib / wxpy 实现告警推送, matplotlib / pandas 做数据可视化,覆盖监控全流程。

  3. 跨平台兼容性 :一套代码可同时运行在Windows Server(办公区交换机监控)、Linux(机房服务器监控)、macOS(运维本地调试),无需针对不同系统单独适配。

  4. 轻量化资源占用 :相比Zabbix、Nagios等开源监控平台,Python脚本内存占用通常低于100MB,不会给被监控设备(如边缘路由器、小型交换机)增加额外负载。

二、公司网络监控的核心需求:到底要“防什么”?

在动手开发前,需先明确企业网络的核心监控目标——避免“为了监控而监控”。结合多数公司的运维场景,核心需求可归纳为三类:

监控维度核心目标典型场景
设备状态确保核心网络设备(路由器、交换机、服务器)在线且正常运行机房路由器离线导致全公司断网;核心服务器宕机引发业务系统不可用
网络流量实时统计带宽使用情况,识别异常流量(如P2P下载、恶意攻击)员工大量下载占用带宽,导致客户访问公司官网卡顿;异常上传流量可能是数据泄露
连接与端口监控关键端口(如80/443 web端口、3306数据库端口)状态,识别非法连接数据库端口被外部IP频繁扫描;未经授权的设备接入内部局域网
故障告警网络异常时第一时间通知运维人员,缩短故障排查时间凌晨2点交换机故障,若等到上班后发现,已造成3小时业务中断

三、分模块实现:Python监控系统的“五脏六腑”

下面从“设备监测→流量统计→连接监控→故障告警”四个核心模块,结合实际代码与场景,拆解系统实现过程。

模块1:设备状态监测——Ping通与否只是“基础操作”

核心需求

批量检测公司内网设备(如路由器192.168.1.1、服务器192.168.1.100、打印机192.168.1.200)的在线状态,若离线则标记故障并记录时间。

技术选型

使用 ping3 库(比系统自带 ping 命令更易集成到Python脚本,支持超时设置、TTL配置),搭配多线程实现批量设备快速检测(避免单线程依次检测耗时过长)。

代码实现与解析

import ping3
import threading
from datetime import datetime

# 1. 定义待监控设备列表(可从配置文件/数据库读取,避免硬编码)
devices = [
  {"name": "核心路由器", "ip": "192.168.1.1"},
  {"name": "Web服务器", "ip": "192.168.1.100"},
  {"name": "财务打印机", "ip": "192.168.1.200"}
]

# 2. 单设备检测函数
def check_device_status(device):
   name = device["name"]
   ip = device["ip"]
   # 设置超时时间为2秒(避免等待过久)
   response_time = ping3.ping(ip, timeout=2)
   current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
   
   if response_time is not None:
       status = "在线"
       print(f"[{current_time}] {name}({ip}) - {status},响应时间:{response_time:.2f}ms")
   else:
       status = "离线"
       print(f"[{current_time}] {name}({ip}) - {status}!!!")
       # 此处可直接调用告警函数(下文模块4),触发离线告警
       send_alert(f"设备离线告警", f"{name}({ip})于{current_time}检测到离线,请及时排查")

# 3. 多线程批量检测(提高效率,支持同时检测10+设备)
def batch_check_devices():
   threads = []
   for device in devices:
       t = threading.Thread(target=check_device_status, args=(device,))
       threads.append(t)
       t.start()
   # 等待所有线程执行完毕
   for t in threads:
       t.join()

# 测试执行
if __name__ == "__main__":
   batch_check_devices()

优化点

  • 避免“误报” :若单次Ping失败可能是网络波动,可增加“连续3次检测失败才判定离线”的逻辑;

  • 配置化管理 :将设备列表写入 devices.json 文件,后续新增设备无需修改代码,示例:

    [{"name": "核心路由器", "ip": "192.168.1.1"}, {"name": "Web服务器", "ip": "192.168.1.100"}]

    代码中通过 json.load(open("devices.json")) 读取即可。

模块2:流量统计——识别“带宽小偷”与异常流量

核心需求

实时统计指定网络接口(如服务器的 eth0 、办公区路由器的 wan 口)的上传/下载速度,当流量超过阈值(如下载速度持续5分钟>10MB/s)时触发告警。

技术选型

使用 psutil 库(跨平台获取系统资源信息,支持Windows的“以太网”、Linux的“eth0”接口流量统计)。

代码实现与解析

import psutil
import time
from datetime import datetime

# 1. 获取指定接口的实时流量(单位:字节)
def get_interface_traffic(interface="eth0"):
   net_io = psutil.net_io_counters(pernic=True).get(interface)
   if not net_io:
       raise ValueError(f"未找到网络接口:{interface},请检查接口名称")
   # 返回上传字节数、下载字节数
   return net_io.bytes_sent, net_io.bytes_recv

# 2. 计算流量速度(单位:MB/s)
def calculate_speed(interface, interval=1):
   # 获取1秒前的流量
   sent_before, recv_before = get_interface_traffic(interface)
   time.sleep(interval)
   # 获取当前流量
   sent_after, recv_after = get_interface_traffic(interface)
   # 计算速度(1MB = 1024*1024 字节)
   upload_speed = (sent_after - sent_before) / (1024 * 1024)
   download_speed = (recv_after - recv_before) / (1024 * 1024)
   return round(upload_speed, 2), round(download_speed, 2)

# 3. 持续监控流量并判断阈值
def monitor_traffic(interface="eth0", max_download=10):
   while True:
       upload, download = calculate_speed(interface)
       current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
       print(f"[{current_time}] {interface} - 上传:{upload}MB/s,下载:{download}MB/s")
       
       # 若下载速度超过阈值,触发告警
       if download > max_download:
           alert_msg = f"流量超限告警\n接口:{interface}\n时间:{current_time}\n下载速度:{download}MB/s(阈值:{max_download}MB/s)"
           send_alert("流量超限", alert_msg)
       
       # 每5秒监控一次(可根据需求调整)
       time.sleep(5)

# 测试执行(Linux示例,Windows需改为接口名称如"以太网")
if __name__ == "__main__":
   monitor_traffic(interface="eth0", max_download=10)

实用场景

  • 对办公区网络,可设置“上班时间下载速度>5MB/s告警”,避免员工用公司带宽下载视频;

  • 对Web服务器,可监控“wan口上传速度>2MB/s告警”,排查是否有异常数据上传(如数据泄露)。

模块3:连接与端口监控——堵住“网络后门”

核心需求

  • 监控关键端口(如数据库3306、远程桌面3389)是否正常开放,若端口关闭则告警;

  • 统计指定端口的连接数(如Web服务器80端口的并发连接),识别异常连接(如单IP频繁连接可能是攻击)。

技术选型

  • socket 库:检测端口是否开放、建立TCP连接;

  • psutil.net_connections() :获取所有网络连接信息,筛选指定端口的连接。

代码实现与解析

import socket
import psutil
from datetime import datetime

# 1. 检测指定IP:端口是否开放
def check_port(ip, port, timeout=2):
   sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   sock.settimeout(timeout)
   try:
       sock.connect((ip, port))
       sock.close()
       return True  # 端口开放
   except:
       return False  # 端口关闭

# 2. 监控关键端口状态
def monitor_ports(port_list):
   while True:
       current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
       for ip, port in port_list:
           is_open = check_port(ip, port)
           status = "开放" if is_open else "关闭"
           print(f"[{current_time}] {ip}:{port} - {status}")
           if not is_open:
               alert_msg = f"端口异常告警\nIP:端口:{ip}:{port}\n时间:{current_time}\n状态:{status}(预期:开放)"
               send_alert("端口异常", alert_msg)
       time.sleep(10)  # 每10秒检测一次

# 3. 统计指定端口的连接数(如80端口)
def count_port_connections(port):
   connections = psutil.net_connections(kind="tcp")
   port_connections = [conn for conn in connections if conn.laddr.port == port and conn.status == "ESTABLISHED"]
   # 返回连接数、连接的IP列表
   ip_list = [conn.raddr.ip for conn in port_connections if conn.raddr]
   return len(port_connections), ip_list

# 4. 监控端口连接数,识别异常IP
def monitor_port_connections(port=80, max_conn=100, max_ip_conn=20):
   while True:
       conn_count, ip_list = count_port_connections(port)
       current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
       print(f"[{current_time}] 端口{port} - 并发连接数:{conn_count}")
       
       # 若连接数超过阈值,告警
       if conn_count > max_conn:
           alert_msg = f"端口连接数超限告警\n端口:{port}\n时间:{current_time}\n连接数:{conn_count}(阈值:{max_conn})"
           send_alert("端口连接超限", alert_msg)
       
       # 统计单IP连接数,若超过阈值(如单IP连20次),可能是攻击
       ip_conn_count = {}
       for ip in ip_list:
           ip_conn_count[ip] = ip_conn_count.get(ip, 0) + 1
       for ip, count in ip_conn_count.items():
           if count > max_ip_conn:
               alert_msg = f"异常IP连接告警\n端口:{port}\nIP:{ip}\n时间:{current_time}\n连接数:{count}(阈值:{max_ip_conn})"
               send_alert("异常IP连接", alert_msg)
       
       time.sleep(5)

# 测试执行(监控数据库3306端口、Web服务器80端口)
if __name__ == "__main__":
   # 监控关键端口状态
   critical_ports = [("192.168.1.100", 3306), ("192.168.1.101", 80)]
   monitor_ports(critical_ports)
   
   # 监控80端口连接数(单独线程执行,避免阻塞)
   # import threading
   # threading.Thread(target=monitor_port_connections, args=(80, 100, 20)).start()

模块4:故障告警——让运维“第一时间知情”

监控系统的核心价值之一是“异常时及时通知”,若只是记录日志而不告警,等于“守着火灾报警器却不响”。常见的告警方式包括邮件、企业微信/钉钉、短信(需对接第三方API),下面以“企业微信告警”为例实现(门槛低、免费、适合团队协作)。

代码实现(企业微信机器人告警)

import requests
import json

# 企业微信机器人WebHook(需在企业微信创建机器人,获取链接)
WECHAT_WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的机器人key"

def send_alert(title, content):
   """发送企业微信告警"""
   # 构造告警消息(支持Markdown格式,更易读)
   msg = {
       "msgtype": "markdown",
       "markdown": {
           "content": f"### {title}\n{content}\n\n**告警时间**:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
      }
  }
   try:
       response = requests.post(WECHAT_WEBHOOK, data=json.dumps(msg), headers={"Content-Type": "application/json"})
       if response.json().get("errcode") == 0:
           print(f"告警发送成功:{title}")
       else:
           print(f"告警发送失败:{response.text}")
   except Exception as e:
       print(f"告警发送异常:{str(e)}")

告警优化建议

  • 分级告警 :重要故障(如核心服务器宕机)同时发邮件+企业微信+短信;一般故障(如打印机离线)只发企业微信;

  • 避免“告警轰炸” :同一故障(如设备离线)设置“10分钟内只告警1次”,避免频繁通知打扰运维。

四、系统整合与优化:从“脚本”到“可用系统”

单个模块的脚本只能解决局部问题,需通过“定时任务+数据存储+可视化”整合为完整系统,并进行性能优化。

1. 定时任务:让监控“自动化运行”

使用 APScheduler 库实现定时任务(比 time.sleep() 更灵活,支持 cron 表达式、间隔执行、指定时间执行)。

from apscheduler.schedulers.blocking import BlockingScheduler

# 1. 定义所有监控任务
def run_all_monitors():
   # 任务1:每1分钟检测一次设备状态
   scheduler.add_job(batch_check_devices, "interval", minutes=1, id="device_check")
   
   # 任务2:每5秒监控一次流量
   scheduler.add_job(monitor_traffic, "interval", seconds=5, id="traffic_monitor", args=("eth0", 10))
   
   # 任务3:每10秒监控一次关键端口
   critical_ports = [("192.168.1.100", 3306), ("192.168.1.101", 80)]
   scheduler.add_job(monitor_ports, "interval", seconds=10, id="port_check", args=(critical_ports,))
   
   # 任务4:每5秒监控80端口连接数
   scheduler.add_job(monitor_port_connections, "interval", seconds=5, id="conn_monitor", args=(80, 100, 20))

# 2. 启动调度器
if __name__ == "__main__":
   scheduler = BlockingScheduler()
   run_all_monitors()
   print("监控系统已启动,持续运行中...")
   try:
       scheduler.start()
   except (KeyboardInterrupt, SystemExit


 

新闻通讯图片
主图标
新闻通讯

订阅我们的新闻通讯

在下方输入邮箱地址后,点击订阅按钮即可完成订阅,同时代表您同意我们的条款与条件。

启用 Cookie,可让您在本网站获得更流畅的使用体验 Cookie政策