下面提供一个基于 Python 的数字化生产管理系统框架(示例骨架),后端采用 FastAPI + PostgreSQL + Redis。该系统包含设备监控、故障管理、装配配套表生成与导出、生产计划管理等功能。
系统架构
数字化生产管理系统
├── 设备监控模块
├── 故障管理模块
├── 产品装配管理模块
├── 生产计划管理模块
├── 数据集成接口
└── 报表导出模块
技术栈
后端: Python + FastAPI (RESTful API)
前端: Vue.js (可选) 或直接提供API供上层系统调用
数据库: PostgreSQL (关系型) + Redis (缓存)
任务队列: Celery (异步任务处理)
文件处理: openpyxl (Excel操作)
监控: Prometheus + Grafana (可选)
核心代码实现
1. 主应用框架 (main.py)
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .routers import devices, faults, products, production
import os

app = FastAPI(title="数字化生产管理系统", version="1.0.0")

# CORS configuration.
# A wildcard origin combined with allow_credentials=True lets any website make
# credentialed requests to this API, so credentials are only enabled when an
# explicit origin list is configured through CORS_ALLOW_ORIGINS
# (comma-separated, e.g. "https://mes.example.com,https://erp.example.com").
# Without that variable the API stays open to all origins, but without
# credentials.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "").split(",")
    if origin.strip()
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins or ["*"],
    allow_credentials=bool(_cors_origins),
    allow_methods=["*"],
    allow_headers=["*"],
)

# Register the router of each functional module.
app.include_router(devices.router)
app.include_router(faults.router)
app.include_router(products.router)
app.include_router(production.router)
@app.get("/")
async def root():
    """Return a short banner identifying this API."""
    banner = {"message": "数字化生产管理系统 API"}
    return banner
2. 设备监控模块 (devices.py)
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List
from datetime import datetime
import psycopg2
import redis
import os

router = APIRouter(prefix="/devices", tags=["设备管理"])

# Redis client used by get_device_status as a short-lived status cache.
# Connection settings can be overridden through the environment; the defaults
# match the previous hard-coded values.
r = redis.Redis(
    host=os.getenv("REDIS_HOST", "localhost"),
    port=int(os.getenv("REDIS_PORT", "6379")),
    db=int(os.getenv("REDIS_DB", "0")),
)

# Keyword arguments passed to psycopg2.connect(**DB_CONFIG).
# Credentials are read from the environment so that real passwords do not have
# to live in source control; the defaults are only suitable for local
# development.
DB_CONFIG = {
    "host": os.getenv("DB_HOST", "localhost"),
    "database": os.getenv("DB_NAME", "production_db"),
    "user": os.getenv("DB_USER", "postgres"),
    "password": os.getenv("DB_PASSWORD", "password"),
}
class Device(BaseModel):
    """API representation of one row of the devices table."""

    # Primary key of the device.
    id: str
    # Human-readable device name.
    name: str
    # Current state string as stored in the status column.
    status: str
    # NOTE(review): the devices schema allows NULL for last_heartbeat and
    # ip_address, while both are required here - confirm which is intended.
    last_heartbeat: datetime
    ip_address: str
@router.get("/", response_model=List[Device])
async def get_online_devices():
    """Return every device whose status is 'online'.

    Returns:
        A list of Device models, one per matching row.
    """
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            # Columns are listed explicitly so the positional mapping below
            # does not silently break if the table gains or reorders columns.
            cursor.execute(
                "SELECT id, name, status, last_heartbeat, ip_address "
                "FROM devices WHERE status = 'online'"
            )
            rows = cursor.fetchall()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()

    return [
        Device(
            id=row[0],
            name=row[1],
            status=row[2],
            last_heartbeat=row[3],
            ip_address=row[4],
        )
        for row in rows
    ]
@router.get("/{device_id}/status")
async def get_device_status(device_id: str):
    """Return the status of one device, consulting the Redis cache first.

    Args:
        device_id: Primary key of the device to look up.

    Returns:
        A dict with the keys "device_id" and "status".

    Raises:
        HTTPException: 404 when the cache misses and no device row matches.
    """
    cache_key = f"device:{device_id}:status"

    # Serve from the cache when possible; Redis returns bytes.
    cached = r.get(cache_key)
    if cached:
        return {"device_id": device_id, "status": cached.decode()}

    # Cache miss: fall back to the database.
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT status FROM devices WHERE id = %s", (device_id,))
            result = cursor.fetchone()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()

    if not result:
        raise HTTPException(status_code=404, detail="Device not found")

    # Refresh the cache with a 30-second expiry.
    r.setex(cache_key, 30, result[0])
    return {"device_id": device_id, "status": result[0]}
3. 故障管理模块 (faults.py)
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List
from datetime import datetime
import psycopg2
# Router for the fault-management endpoints, mounted under /faults.
router = APIRouter(
    prefix="/faults",
    tags=["故障管理"],
)
class Fault(BaseModel):
    """A recorded device fault as returned by the API."""

    # Database-generated identifier of the fault record.
    id: int
    # Identifier of the device the fault was reported for.
    device_id: str
    fault_code: str
    description: str
    # Time at which the fault was recorded.
    timestamp: datetime
    # True once the fault has been marked as resolved.
    resolved: bool
class FaultCreate(BaseModel):
    """Request body for reporting a new fault."""

    # Identifier of the device the fault is reported for.
    device_id: str
    fault_code: str
    description: str
@router.post("/", response_model=Fault)
async def report_fault(fault: FaultCreate):
    """Record a device fault and mark the device as faulty.

    The fault row and the device status change are written in a single
    transaction, so a failure in either step leaves the database unchanged.

    Args:
        fault: Device id, fault code and description of the fault.

    Returns:
        The stored fault, including its generated id and timestamp.
    """
    # Take the timestamp once so the stored and the returned value agree.
    reported_at = datetime.now()

    # NOTE(review): DB_CONFIG is not defined or imported in this module's
    # listing - confirm it is imported from the shared configuration.
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "INSERT INTO faults (device_id, fault_code, description, timestamp, resolved) "
                "VALUES (%s, %s, %s, %s, %s) RETURNING id",
                (fault.device_id, fault.fault_code, fault.description, reported_at, False),
            )
            fault_id = cursor.fetchone()[0]

            # Flag the device as faulty within the same transaction.
            cursor.execute(
                "UPDATE devices SET status = 'fault' WHERE id = %s",
                (fault.device_id,),
            )
        conn.commit()
    finally:
        # Closing without a commit discards any uncommitted changes.
        conn.close()

    return {
        "id": fault_id,
        "device_id": fault.device_id,
        "fault_code": fault.fault_code,
        "description": fault.description,
        "timestamp": reported_at,
        "resolved": False,
    }
@router.get("/unresolved", response_model=List[Fault])
async def get_unresolved_faults():
    """Return all faults that have not been resolved yet.

    Returns:
        A list of Fault models, one per row with resolved = FALSE.
    """
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            # Columns are listed explicitly so the positional mapping below
            # does not depend on the table's physical column order.
            cursor.execute(
                "SELECT id, device_id, fault_code, description, timestamp, resolved "
                "FROM faults WHERE resolved = FALSE"
            )
            rows = cursor.fetchall()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()

    return [
        Fault(
            id=row[0],
            device_id=row[1],
            fault_code=row[2],
            description=row[3],
            timestamp=row[4],
            resolved=row[5],
        )
        for row in rows
    ]
4. 产品装配管理模块 (products.py)
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List
import psycopg2
from openpyxl import Workbook
from io import BytesIO
from fastapi.responses import StreamingResponse
# Router for the product-assembly endpoints, mounted under /products.
router = APIRouter(
    prefix="/products",
    tags=["产品装配管理"],
)
class Component(BaseModel):
    """One line of an assembly kit list: a component and its quantity."""

    # Component code (drawing / part designation).
    code: str
    name: str
    # Quantity of this component used by the product.
    quantity: int
class ProductAssembly(BaseModel):
    """Assembly kit list of a product: the product plus its components."""

    product_id: str
    product_name: str
    # Components that make up the product, with their quantities.
    components: List[Component]
@router.get("/{product_id}/assembly", response_model=ProductAssembly)
async def get_product_assembly(product_id: str):
    """Return the assembly kit list (product and its components) of a product.

    Args:
        product_id: Primary key of the product.

    Returns:
        A ProductAssembly with the product name and its component lines.

    Raises:
        HTTPException: 404 when no product has the given id.
    """
    # NOTE(review): DB_CONFIG is not defined or imported in this module's
    # listing - confirm it is imported from the shared configuration.
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            # Look up the product itself.
            cursor.execute("SELECT name FROM products WHERE id = %s", (product_id,))
            product = cursor.fetchone()
            if not product:
                # Raised inside the try block so the connection is still closed.
                raise HTTPException(status_code=404, detail="Product not found")

            # Fetch the component lines of the product.
            cursor.execute(
                "SELECT c.code, c.name, pc.quantity "
                "FROM product_components pc "
                "JOIN components c ON pc.component_id = c.id "
                "WHERE pc.product_id = %s",
                (product_id,),
            )
            components = cursor.fetchall()
    finally:
        # Release the connection on every path, including the 404 above.
        conn.close()

    return ProductAssembly(
        product_id=product_id,
        product_name=product[0],
        components=[
            Component(code=row[0], name=row[1], quantity=row[2])
            for row in components
        ],
    )
@router.get("/{product_id}/assembly/excel")
async def export_product_assembly_excel(product_id: str):
    """Export the assembly kit list of a product as an .xlsx download.

    Args:
        product_id: Primary key of the product.

    Returns:
        A StreamingResponse carrying the generated workbook as an attachment.

    Raises:
        HTTPException: 404 (propagated from get_product_assembly) when the
            product does not exist.
    """
    from urllib.parse import quote

    assembly = await get_product_assembly(product_id)

    # Build the workbook in memory.
    wb = Workbook()
    ws = wb.active
    ws.title = "装配配套表"

    # Header section: product identification, a blank row, then column titles.
    ws.append(["产品编号", assembly.product_id])
    ws.append(["产品名称", assembly.product_name])
    ws.append([])
    ws.append(["零部件代号", "零部件名称", "数量"])

    # One row per component.
    for comp in assembly.components:
        ws.append([comp.code, comp.name, comp.quantity])

    excel_file = BytesIO()
    wb.save(excel_file)
    excel_file.seek(0)

    # HTTP header values must be latin-1 encodable and free of quotes and
    # control characters, but product ids may contain arbitrary text. Send a
    # sanitized ASCII fallback plus the exact name as an RFC 5987 "filename*"
    # parameter.
    filename = f"assembly_{product_id}.xlsx"
    ascii_fallback = "".join(
        ch if (ch.isascii() and ch.isalnum()) or ch in "._-" else "_"
        for ch in filename
    )
    content_disposition = (
        f'attachment; filename="{ascii_fallback}"; '
        f"filename*=UTF-8''{quote(filename, safe='')}"
    )

    return StreamingResponse(
        excel_file,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers={"Content-Disposition": content_disposition},
    )
5. 生产计划管理模块 (production.py)
import csv
import io
import uuid
from datetime import date
from typing import List
from typing import Optional

import openpyxl
import psycopg2
from fastapi import APIRouter, HTTPException, UploadFile, File
from pydantic import BaseModel
# Router for the production-planning endpoints, mounted under /production.
router = APIRouter(
    prefix="/production",
    tags=["生产计划管理"],
)
class MasterPlan(BaseModel):
    """A master production plan: what to build, how many, and when."""

    id: str
    # Product to be manufactured.
    product_id: str
    quantity: int
    # Planned production window.
    start_date: date
    end_date: date
    priority: int
class WorkshopPlan(BaseModel):
    """A workshop-level production plan derived from a master plan."""

    id: str
    # Master plan this workshop plan belongs to.
    master_plan_id: str
    workshop_id: str
    product_id: str
    quantity: int
    # Planned execution window.
    planned_start: date
    planned_end: date
    # Actual execution dates; declared Optional because get_workshop_plans
    # passes the database value through explicitly, and the actual_start /
    # actual_end columns are nullable.
    actual_start: Optional[date] = None
    actual_end: Optional[date] = None
    # The schema's CHECK constraint allows: pending, in_progress, completed,
    # delayed.
    status: str = "pending"
@router.post("/generate-workshop-plans")
async def generate_workshop_plans(master_plan_id: str):
    """Create one workshop plan per workshop from a master production plan.

    This is a simplified allocation: every workshop receives a plan with the
    master plan's full quantity and dates. All plans are inserted in a single
    transaction.

    Args:
        master_plan_id: Primary key of the master production plan.

    Returns:
        A dict with a summary "message" and the list of created "plan_ids".

    Raises:
        HTTPException: 404 when the master plan does not exist.
    """
    # NOTE(review): DB_CONFIG is not defined or imported in this module's
    # listing - confirm it is imported from the shared configuration.
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            # Select the needed columns by name instead of relying on the
            # physical column order of SELECT *.
            cursor.execute(
                "SELECT product_id, quantity, start_date, end_date "
                "FROM master_production_plans WHERE id = %s",
                (master_plan_id,),
            )
            master_plan = cursor.fetchone()
            if not master_plan:
                # Raised inside the try block so the connection is still closed.
                raise HTTPException(status_code=404, detail="Master plan not found")
            product_id, quantity, start_date, end_date = master_plan

            cursor.execute("SELECT id FROM workshops")
            workshops = cursor.fetchall()

            plans = []
            for (workshop_id,) in workshops:
                # workshop_plans.id is a VARCHAR primary key without a default,
                # so the id has to be supplied by the application.
                plan_id = uuid.uuid4().hex
                cursor.execute(
                    "INSERT INTO workshop_plans "
                    "(id, master_plan_id, workshop_id, product_id, quantity, "
                    "planned_start, planned_end, status) "
                    "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
                    (
                        plan_id,
                        master_plan_id,
                        workshop_id,
                        product_id,
                        quantity,
                        start_date,
                        end_date,
                        "pending",
                    ),
                )
                plans.append(plan_id)
        conn.commit()
    finally:
        # Closing without a commit discards any uncommitted changes.
        conn.close()

    return {"message": f"Generated {len(plans)} workshop plans", "plan_ids": plans}
@router.post("/import-workshop-plans/excel")
async def import_workshop_plans_excel(file: UploadFile = File(...)):
    """Bulk-import workshop production plans from an uploaded Excel file.

    The first row of the active sheet is treated as a header. Each following
    row must provide, in order: master_plan_id, workshop_id, product_id,
    quantity, planned_start, planned_end. Rows whose first cell is empty are
    skipped. All rows are inserted in a single transaction.

    Args:
        file: The uploaded .xlsx workbook.

    Returns:
        A dict with a "message" stating how many plans were imported.

    Raises:
        HTTPException: 400 when a data row has fewer than six columns or an
            empty required cell.
    """
    contents = await file.read()
    wb = openpyxl.load_workbook(io.BytesIO(contents))
    ws = wb.active

    records = []
    for row_number, row in enumerate(
        ws.iter_rows(min_row=2, values_only=True), start=2
    ):
        if not row or not row[0]:  # skip blank rows
            continue
        # Reject incomplete rows up front with a clear message instead of
        # failing later with an IndexError or a database constraint error.
        if len(row) < 6 or any(cell is None for cell in row[:6]):
            raise HTTPException(
                status_code=400,
                detail=f"Row {row_number}: six non-empty columns are required",
            )
        # workshop_plans.id is a VARCHAR primary key without a default, so the
        # id has to be supplied by the application.
        records.append((uuid.uuid4().hex, *row[:6], "pending"))

    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            cursor.executemany(
                "INSERT INTO workshop_plans "
                "(id, master_plan_id, workshop_id, product_id, quantity, "
                "planned_start, planned_end, status) "
                "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
                records,
            )
        conn.commit()
    finally:
        # Closing without a commit discards any uncommitted changes.
        conn.close()

    return {"message": f"Imported {len(records)} workshop plans successfully"}
@router.get("/workshop-plans/{workshop_id}", response_model=List[WorkshopPlan])
async def get_workshop_plans(workshop_id: str):
    """Return the production plans of one workshop with their execution state.

    Args:
        workshop_id: Primary key of the workshop.

    Returns:
        A list of WorkshopPlan models; empty when the workshop has no plans.
    """
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT id, master_plan_id, workshop_id, product_id, quantity, "
                "planned_start, planned_end, actual_start, actual_end, status "
                "FROM workshop_plans WHERE workshop_id = %s",
                (workshop_id,),
            )
            rows = cursor.fetchall()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()

    # Field order mirrors the SELECT column list above.
    return [
        WorkshopPlan(
            id=row[0],
            master_plan_id=row[1],
            workshop_id=row[2],
            product_id=row[3],
            quantity=row[4],
            planned_start=row[5],
            planned_end=row[6],
            actual_start=row[7],
            actual_end=row[8],
            status=row[9],
        )
        for row in rows
    ]
6. 数据库模型 (SQL)
-- Devices: one row per monitored device.
CREATE TABLE devices (
    id VARCHAR(50) PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    -- Restricted to the four known device states.
    status VARCHAR(20) NOT NULL CHECK (status IN ('online', 'offline', 'fault', 'maintenance')),
    last_heartbeat TIMESTAMP,
    ip_address VARCHAR(50)
);
-- Faults: fault records reported against devices.
CREATE TABLE faults (
    id SERIAL PRIMARY KEY,
    device_id VARCHAR(50) REFERENCES devices(id),
    fault_code VARCHAR(50) NOT NULL,
    description TEXT,
    timestamp TIMESTAMP NOT NULL,
    resolved BOOLEAN NOT NULL DEFAULT FALSE
);
-- Products.
CREATE TABLE products (
    id VARCHAR(50) PRIMARY KEY,
    name VARCHAR(100) NOT NULL
);
-- Components (parts) that products are assembled from.
CREATE TABLE components (
    id VARCHAR(50) PRIMARY KEY,
    code VARCHAR(50) NOT NULL,
    name VARCHAR(100) NOT NULL
);
-- Product/component association: which components a product uses, and how many.
CREATE TABLE product_components (
    product_id VARCHAR(50) REFERENCES products(id),
    component_id VARCHAR(50) REFERENCES components(id),
    quantity INTEGER NOT NULL,
    -- A component appears at most once per product.
    PRIMARY KEY (product_id, component_id)
);
-- Master production plans.
CREATE TABLE master_production_plans (
    id VARCHAR(50) PRIMARY KEY,
    product_id VARCHAR(50) REFERENCES products(id),
    quantity INTEGER NOT NULL,
    start_date DATE NOT NULL,
    end_date DATE NOT NULL,
    priority INTEGER NOT NULL
);
-- Workshops.
CREATE TABLE workshops (
    id VARCHAR(50) PRIMARY KEY,
    name VARCHAR(100) NOT NULL
);
-- Workshop-level production plans derived from master plans.
CREATE TABLE workshop_plans (
    -- No default is declared, so every INSERT must supply the id.
    id VARCHAR(50) PRIMARY KEY,
    master_plan_id VARCHAR(50) REFERENCES master_production_plans(id),
    workshop_id VARCHAR(50) REFERENCES workshops(id),
    product_id VARCHAR(50) REFERENCES products(id),
    quantity INTEGER NOT NULL,
    planned_start DATE NOT NULL,
    planned_end DATE NOT NULL,
    -- Actual dates stay NULL until they are recorded.
    actual_start DATE,
    actual_end DATE,
    status VARCHAR(20) NOT NULL CHECK (status IN ('pending', 'in_progress', 'completed', 'delayed'))
);