Python教程:使用psutil和rich库实现一个终端资源监控小工具(附完整代码)

发布于:2024-04-30 ⋅ 阅读:(26) ⋅ 点赞:(0)

在本文中,我们将介绍如何使用 Python 的 psutil 和 rich 库来实现一个终端资源监控小工具。psutil 是一个跨平台的 Python 库,用于获取关于系统进程和系统利用率的信息,而 rich 是一个 Python 库,用于在终端中创建美观的输出。

1.psutil 库介绍

psutil(Python System and Process Utilities)是一个强大的跨平台库,用于获取关于系统进程和系统利用率的信息,例如 CPU、内存、磁盘、网络等。它提供了简单易用的接口,可以方便地监控和管理系统资源。

在本项目中,我们主要使用 psutil 来获取关于指定进程的详细信息,例如进程的用户名、创建时间、线程数、CPU 占比、内存占比等。

2.rich 库介绍

rich 是一个 Python 库,用于在终端中创建美观的输出。它提供了丰富的样式和布局选项,使得在终端中输出信息更加清晰和易读。

在本项目中,我们将使用 rich 库来创建一个美观的表格,显示获取到的进程信息。

3.设计思路

  1. 导入所需的库:我们首先需要导入 psutil 和 rich 库,以及一些其他必要的模块。

  2. 编写函数获取目标进程:编写一个函数,通过进程名称获取指定进程的详细信息。

  3. 编写函数显示信息:编写一个函数,使用 rich 库创建一个表格,并显示获取到的进程信息。

  4. 主程序逻辑:在主程序中,设置要监控的进程名称和更新时间间隔,并调用上述函数实现监控功能。

4.实现原理

  1. 使用 psutil 库的 psutil.Process() 方法获取指定进程的详细信息,例如用户名、创建时间、线程数、CPU 占比、内存占比等。

  2. 使用 rich 库的 Table 类创建一个表格,使用 add_column() 方法添加列,使用 add_row() 方法添加行,并使用 console.log() 方法在终端中输出表格。

  3. 在主程序中,设置一个循环,每隔一定时间获取一次指定进程的信息,并使用 rich 库在终端中显示。

5.工具整体功能介绍

  1. 进程信息获取功能:

    • 使用 psutil.pids() 获取当前系统所有进程的 PID 列表。
    • 遍历每个 PID,通过 psutil.Process(pid) 获取进程对象 p
    • 判断进程对象 p 的名称是否包含指定的进程名 procName,以确定目标进程。
  2. 进程信息展示功能:

    • 使用 rich 库中的 Table 类创建一个表格,用于展示进程的详细信息。
    • 将进程的关键信息(如用户名、创建时间、线程数、CPU 占比、内存占比等)添加到表格中的不同行中。
    • 使用 console.log() 方法将表格和其他相关信息以美观的格式输出到终端。
  3. 循环监控功能:

    • 在 showMsg 函数中使用 while True 循环,持续监控目标进程的状态。
    • 每次循环中,重新获取目标进程的信息并更新展示在终端上的表格。
    • 使用 time.sleep(timeInterval) 控制每次更新的时间间隔,避免过度频繁地查询和展示信息。
  4. 异常处理:

    • 使用 try 和 except 块捕获 psutil.Process(pid) 可能抛出的异常,确保程序在获取进程信息时不会意外中断。
  5. 主程序逻辑:

    • 在主程序中,通过读取配置文件(例如 Config.py)获取要监控的进程名称和更新时间间隔。
    • 初始化 rich 库中的 Console 对象,用于在终端上输出信息。
    • 调用 showMsg 函数,传入要监控的进程名称和时间间隔,实现进程信息的持续监控和展示。

6.完整代码及配置如下

monitor.py

# -*- coding: gbk -*-
'''
Created on 2022年11月24日

@author: DanMo
@status: done
'''
import time
import psutil
from rich.console import Console
from rich.table import Column, Table
from rich.progress import track
from Config import config

def getTargetProc(procName):
    pidList = psutil.pids()
    for pid in track(pidList, description="获取本机所有进程..."):
        try:
            p = psutil.Process(pid)
        except:
            continue
        if procName in p.name():
            return p

def showMsg(procName, timeInterval):
    while True:
        table = Table(show_header=True, header_style="bold magenta")
        table.add_column("项目", style='dim')
        table.add_column("详情", style='dim')
        p = getTargetProc(procName)
        table.add_row('用户', f'{p.username()}')
        table.add_row('进程创建时间', f'{p.create_time()}')
        table.add_row('线程数', f'{p.num_threads()}')
        table.add_row('父进程', f'{p.parent()}')
        table.add_row('当前进程状态', f'{p.status()}')
        table.add_row('被调用的命令行', f'{p.cmdline()}')
        table.add_row('exe', f'{p.exe()}')
        table.add_row('io', f'{p.io_counters()}')
        table.add_row('CPU占比', f'{p.cpu_percent()}')
        table.add_row('内存占比', f'{p.memory_percent()}')
        table.add_row('当前进程打开的句柄数', f'{p.num_handles()}')
        console.log('进程信息总览:\n', p.__dict__, style='dim')
        console.log('当前进程创建的所有线程:\n', p.threads(), style='dim')
        console.log('打开的文件:\n', p.open_files(), style='dim')
        console.log('子进程:\n', p.children(True), style='dim')
        console.log(
            '进程所有连接信息[IPv4、IPv6、TCP、TCP over IPv4、TCP over IPv6、UDP、UDP over IPv4、UDP over IPv6、UNIX socket (both UDP and TCP protocols)]:\n',
            p.connections('all'), style='dim')
        if config.get_conf('envPath'):
            console.log('环境变量:\n', p.environ(), style='dim')
        console.log('内存信息:\n', p.memory_full_info(), style='dim')
        if config.get_conf('memorymap'):
            console.log('内存映射:\n', p.memory_maps(), style='dim')
        console.log('其他关于进程的信息:\n', style='dim')
        console.log(table)
        time.sleep(timeInterval)


if __name__ == '__main__':
    console = Console()
    procName = config.get_conf('processname')
    timeInterval = config.get_conf('timeinterval')
    showMsg(procName, timeInterval)

Config.py

# -*- coding: gbk -*-
'''
Created on 2022年11月24日

@author: DanMo
@status: done
'''
import os
import json
import threading
import configparser

workpath = os.path.dirname(os.path.abspath(__file__))
configFile = os.path.join(workpath, 'config.txt')
_lock = threading.Lock()

class CConfig(object):
    '''
    实现对配置的操作
    本类中,配置文件为config.txt
    对外部提供的结构为:
        set_conf
        get_conf
        clean_conf
        cleanall_conf
    '''

    def __init__(self, filename='config.txt'):
        self.filename = os.path.abspath(filename)
        self.config = configparser.ConfigParser(allow_no_value=True)
        # TODO:xiehs config逻辑,可以采取预先读取所有的方式,进行性能提高

    def set_conf(self, option, value=None, section='Monitor'):
        try:
            _lock.acquire()
            if not os.path.exists(self.filename):
                tmpf = open(self.filename, 'w')
                tmpf.close()
            if not self._is_exist(self.filename):
                raise Exception('CONFIG_File_Not_File')
            self.config.read(self.filename)
            if not self.config.has_section(section):
                self.config.add_section(section)
            self.config.set(section, option, json.dumps(value))
            with open(self.filename, 'w') as configfile:
                self.config.write(configfile)
        finally:
            _lock.release()

    def get_conf(self, option, section='Monitor'):
        if not os.path.exists(self.filename):
            return None
        if not self._is_exist(self.filename):
            raise Exception('CONFIG_File_Not_File')
        self.config.read(self.filename)
        if not self.config.has_section(section) or not self.config.has_option(section, option):
            return None
        val = self.config.get(section, option)
        val = json.loads(val)
        return val

    def _is_exist(self, path):
        return os.path.isfile(path)


config = CConfig(configFile)

config.txt

[Monitor]
processname = "YourAppName"
timeinterval = 4
memorymap = false
envPath = false


网站公告

今日签到

点亮在社区的每一天
去签到