数字化生产管理系统设计

发布于:2025-08-04 ⋅ 阅读:(15) ⋅ 点赞:(0)

下面提供一个基于 Python 的数字化生产管理系统框架,采用现代技术栈实现。该系统包含设备监控、故障管理、装配配套表生成、生产计划管理等功能。

系统架构

数字化生产管理系统
├── 设备监控模块
├── 故障管理模块
├── 产品装配管理模块
├── 生产计划管理模块
├── 数据集成接口
└── 报表导出模块

 

技术栈

  • 后端: Python + FastAPI (RESTful API)

  • 前端: Vue.js (可选) 或直接提供API供上层系统调用

  • 数据库: PostgreSQL (关系型) + Redis (缓存)

  • 任务队列: Celery (异步任务处理)

  • 文件处理: openpyxl (Excel操作)

  • 监控: Prometheus + Grafana (可选)

核心代码实现

1. 主应用框架 (main.py)

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .routers import devices, faults, products, production

# Application object; title and version are passed straight to FastAPI.
app = FastAPI(title="数字化生产管理系统", version="1.0.0")

# Allow cross-origin requests from any origin, with any method and header.
# NOTE(review): FastAPI's CORS documentation says wildcard origins cannot be
# combined with allow_credentials=True — list the real front-end origins
# explicitly before deploying this.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount the router of every feature module, in the original order.
for feature_module in (devices, faults, products, production):
    app.include_router(feature_module.router)

@app.get("/")
async def root():
    """Return a static banner that identifies this API."""
    banner = {"message": "数字化生产管理系统 API"}
    return banner

2. 设备监控模块 (devices.py)

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List
from datetime import datetime
import psycopg2
import redis

import os

router = APIRouter(prefix="/devices", tags=["设备管理"])

# Redis client used to cache device status lookups.
# Connection settings can be overridden through environment variables; the
# defaults are the values that used to be hard-coded here.
r = redis.Redis(
    host=os.environ.get("REDIS_HOST", "localhost"),
    port=int(os.environ.get("REDIS_PORT", "6379")),
    db=int(os.environ.get("REDIS_DB", "0")),
)

# Keyword arguments for psycopg2.connect().
# Credentials should not live in source control: set DB_HOST / DB_NAME /
# DB_USER / DB_PASSWORD in the environment. The fallbacks keep the previous
# behaviour for local development.
DB_CONFIG = {
    "host": os.environ.get("DB_HOST", "localhost"),
    "database": os.environ.get("DB_NAME", "production_db"),
    "user": os.environ.get("DB_USER", "postgres"),
    "password": os.environ.get("DB_PASSWORD", "password"),
}

class Device(BaseModel):
    """Response model for a device, built from a row of the ``devices`` table."""
    id: str
    name: str
    # The devices table restricts status to 'online', 'offline', 'fault' or 'maintenance'.
    status: str
    # NOTE(review): last_heartbeat and ip_address are nullable columns in the
    # schema but required here — confirm NULL values cannot reach this model.
    last_heartbeat: datetime
    ip_address: str

@router.get("/", response_model=List[Device])
async def get_online_devices():
    """Return every device whose status is 'online'.

    Returns:
        A list of ``Device`` models, one per matching row.
    """
    # NOTE(review): psycopg2 calls are blocking and run on the event loop
    # inside an ``async def`` handler — consider a thread pool or async driver.
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            # Name the columns explicitly so the mapping below does not depend
            # on the physical column order of the table.
            cursor.execute(
                "SELECT id, name, status, last_heartbeat, ip_address "
                "FROM devices WHERE status = 'online'"
            )
            rows = cursor.fetchall()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()

    return [
        Device(
            id=device_id,
            name=name,
            status=status,
            last_heartbeat=last_heartbeat,
            ip_address=ip_address,
        )
        for device_id, name, status, last_heartbeat, ip_address in rows
    ]

@router.get("/{device_id}/status")
async def get_device_status(device_id: str):
    """Return the status of one device, using Redis as a read-through cache.

    Args:
        device_id: Primary key of the device.

    Returns:
        A dict with ``device_id`` and ``status``.

    Raises:
        HTTPException: 404 when no device with this id exists.
    """
    cache_key = f"device:{device_id}:status"

    # Serve from the cache when possible; Redis returns bytes.
    cached = r.get(cache_key)
    if cached:
        return {"device_id": device_id, "status": cached.decode()}

    # Cache miss: fall back to the database.
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT status FROM devices WHERE id = %s", (device_id,))
            result = cursor.fetchone()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()

    if not result:
        raise HTTPException(status_code=404, detail="Device not found")

    # Cache the value for 30 seconds.
    r.setex(cache_key, 30, result[0])
    return {"device_id": device_id, "status": result[0]}

3. 故障管理模块 (faults.py)

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List
from datetime import datetime
import psycopg2

# The handlers in this module call psycopg2.connect(**DB_CONFIG), but the
# name was never defined here. It is declared in the sibling devices module
# (main.py imports both modules from the same package).
from .devices import DB_CONFIG

router = APIRouter(prefix="/faults", tags=["故障管理"])

class Fault(BaseModel):
    """Response model for a fault record, mirroring the ``faults`` table columns."""
    id: int
    device_id: str
    fault_code: str
    description: str
    timestamp: datetime
    resolved: bool

class FaultCreate(BaseModel):
    """Request body accepted by ``report_fault``; id, timestamp and resolved are set by the handler."""
    device_id: str
    fault_code: str
    description: str

@router.post("/", response_model=Fault)
async def report_fault(fault: FaultCreate):
    """Record a device fault and mark the device as faulty.

    The fault insert and the device status update are committed together, so
    a failure in either step leaves the database unchanged.

    Args:
        fault: Device id, fault code and description of the fault.

    Returns:
        The stored fault, including its generated id and timestamp.
    """
    # Take the timestamp once so the stored and the returned value agree.
    reported_at = datetime.now()

    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "INSERT INTO faults (device_id, fault_code, description, timestamp, resolved) "
                "VALUES (%s, %s, %s, %s, %s) RETURNING id",
                (fault.device_id, fault.fault_code, fault.description, reported_at, False)
            )
            fault_id = cursor.fetchone()[0]

            # Flag the device itself as faulty.
            cursor.execute(
                "UPDATE devices SET status = 'fault' WHERE id = %s",
                (fault.device_id,)
            )
        # Single commit: both statements succeed or neither is persisted.
        conn.commit()
    finally:
        # Closing without a commit discards pending changes, so an error above
        # rolls back implicitly and the connection is never leaked.
        conn.close()

    return {
        "id": fault_id,
        **fault.dict(),
        "timestamp": reported_at,
        "resolved": False
    }

@router.get("/unresolved", response_model=List[Fault])
async def get_unresolved_faults():
    """Return all faults that have not been resolved yet.

    Returns:
        A list of ``Fault`` models, one per row with ``resolved = FALSE``.
    """
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            # Name the columns explicitly so the mapping below does not depend
            # on the physical column order of the table.
            cursor.execute(
                "SELECT id, device_id, fault_code, description, timestamp, resolved "
                "FROM faults WHERE resolved = FALSE"
            )
            rows = cursor.fetchall()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()

    return [
        Fault(
            id=fault_id,
            device_id=device_id,
            fault_code=fault_code,
            description=description,
            timestamp=timestamp,
            resolved=resolved,
        )
        for fault_id, device_id, fault_code, description, timestamp, resolved in rows
    ]

4. 产品装配管理模块 (products.py)

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List
import psycopg2
from openpyxl import Workbook
from io import BytesIO
from fastapi.responses import StreamingResponse

# The handlers in this module call psycopg2.connect(**DB_CONFIG), but the
# name was never defined here. It is declared in the sibling devices module
# (main.py imports both modules from the same package).
from .devices import DB_CONFIG

router = APIRouter(prefix="/products", tags=["产品装配管理"])

class Component(BaseModel):
    """One line of an assembly list: a component and the quantity required."""
    code: str
    name: str
    quantity: int

class ProductAssembly(BaseModel):
    """Assembly list of a product: its id, its name and the components it is built from."""
    product_id: str
    product_name: str
    components: List[Component]

@router.get("/{product_id}/assembly", response_model=ProductAssembly)
async def get_product_assembly(product_id: str):
    """Return the assembly list (components and quantities) of a product.

    Args:
        product_id: Primary key of the product.

    Returns:
        A ``ProductAssembly`` with the product name and its components.

    Raises:
        HTTPException: 404 when no product with this id exists.
    """
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            # Look up the product itself.
            cursor.execute("SELECT name FROM products WHERE id = %s", (product_id,))
            product = cursor.fetchone()
            if not product:
                raise HTTPException(status_code=404, detail="Product not found")

            # Fetch the components that belong to it.
            cursor.execute(
                "SELECT c.code, c.name, pc.quantity "
                "FROM product_components pc "
                "JOIN components c ON pc.component_id = c.id "
                "WHERE pc.product_id = %s", (product_id,)
            )
            components = cursor.fetchall()
    finally:
        # Runs on the 404 path as well, which previously leaked the connection.
        conn.close()

    return ProductAssembly(
        product_id=product_id,
        product_name=product[0],
        components=[
            Component(code=code, name=name, quantity=quantity)
            for code, name, quantity in components
        ]
    )

@router.get("/{product_id}/assembly/excel")
async def export_product_assembly_excel(product_id: str):
    """Export the assembly list of a product as an .xlsx download.

    Args:
        product_id: Primary key of the product.

    Returns:
        A ``StreamingResponse`` carrying the workbook as an attachment.

    Raises:
        HTTPException: 404 (raised by ``get_product_assembly``) when the
            product does not exist.
    """
    # Imported locally because this module's import block does not provide it.
    from urllib.parse import quote

    assembly = await get_product_assembly(product_id)

    # Build the workbook.
    wb = Workbook()
    ws = wb.active
    ws.title = "装配配套表"

    # Header section: product info, a spacer row, then the column titles.
    ws.append(["产品编号", assembly.product_id])
    ws.append(["产品名称", assembly.product_name])
    ws.append([])
    ws.append(["零部件代号", "零部件名称", "数量"])

    # One row per component.
    for comp in assembly.components:
        ws.append([comp.code, comp.name, comp.quantity])

    # Serialise into memory and rewind so the response streams from the start.
    excel_file = BytesIO()
    wb.save(excel_file)
    excel_file.seek(0)

    # Percent-encode the file name: a raw product id containing non-ASCII
    # characters, spaces, quotes or ';' would otherwise produce an invalid or
    # unencodable header. IDs made only of letters, digits and "_.-~" yield
    # the same file name as before. ``filename*`` is the RFC 6266 form that
    # lets clients decode the UTF-8 name.
    encoded_name = quote(f"assembly_{product_id}.xlsx", safe="")

    return StreamingResponse(
        excel_file,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers={
            "Content-Disposition": (
                f"attachment; filename={encoded_name}; "
                f"filename*=UTF-8''{encoded_name}"
            )
        }
    )

5. 生产计划管理模块 (production.py)

import csv
import io
import uuid
import zipfile
from datetime import date
from typing import List, Optional

import openpyxl
import psycopg2
from fastapi import APIRouter, HTTPException, UploadFile, File
from pydantic import BaseModel

# The handlers in this module call psycopg2.connect(**DB_CONFIG), but the
# name was never defined here. It is declared in the sibling devices module
# (main.py imports both modules from the same package).
from .devices import DB_CONFIG

router = APIRouter(prefix="/production", tags=["生产计划管理"])

class MasterPlan(BaseModel):
    """Master production plan; fields match the ``master_production_plans`` table columns."""
    id: str
    product_id: str
    quantity: int
    start_date: date
    end_date: date
    priority: int

class WorkshopPlan(BaseModel):
    """Workshop-level production plan together with its execution state.

    Fields match the columns of the ``workshop_plans`` table.
    """
    id: str
    master_plan_id: str
    workshop_id: str
    product_id: str
    quantity: int
    planned_start: date
    planned_end: date
    # actual_start / actual_end are NULL in the database until work begins or
    # finishes, so they are declared Optional: a plain ``date = None``
    # annotation rejects an explicit None under pydantic v2.
    actual_start: Optional[date] = None
    actual_end: Optional[date] = None
    # The table's CHECK constraint allows: pending, in_progress, completed, delayed.
    status: str = "pending"

@router.post("/generate-workshop-plans")
async def generate_workshop_plans(master_plan_id: str):
    """Create one workshop plan per workshop from a master production plan.

    Args:
        master_plan_id: Primary key of the master production plan.

    Returns:
        A dict with a summary ``message`` and the ``plan_ids`` that were created.

    Raises:
        HTTPException: 404 when the master plan does not exist.
    """
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            # Name the columns explicitly instead of relying on the physical
            # column order of ``SELECT *``.
            cursor.execute(
                "SELECT product_id, quantity, start_date, end_date "
                "FROM master_production_plans WHERE id = %s",
                (master_plan_id,)
            )
            master_plan = cursor.fetchone()
            if not master_plan:
                raise HTTPException(status_code=404, detail="Master plan not found")
            product_id, quantity, start_date, end_date = master_plan

            # Simplification kept from the original: every workshop receives an
            # identical copy of the master plan. Real allocation logic should
            # replace this.
            cursor.execute("SELECT id FROM workshops")
            workshop_ids = [row[0] for row in cursor.fetchall()]

            plans = []
            for workshop_id in workshop_ids:
                # workshop_plans.id is a VARCHAR primary key without a default,
                # so the id must be supplied by the application.
                plan_id = str(uuid.uuid4())
                cursor.execute(
                    "INSERT INTO workshop_plans "
                    "(id, master_plan_id, workshop_id, product_id, quantity, "
                    "planned_start, planned_end, status) "
                    "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
                    (
                        plan_id,
                        master_plan_id,
                        workshop_id,
                        product_id,
                        quantity,
                        start_date,
                        end_date,
                        "pending"
                    )
                )
                plans.append(plan_id)
        conn.commit()
    finally:
        # Runs on the 404 path and on errors too; closing without a commit
        # discards any partially inserted plans.
        conn.close()

    return {"message": f"Generated {len(plans)} workshop plans", "plan_ids": plans}

@router.post("/import-workshop-plans/excel")
async def import_workshop_plans_excel(file: UploadFile = File(...)):
    """Bulk-import workshop plans from an uploaded Excel workbook.

    The active sheet is read from row 2 onwards (row 1 is assumed to be a
    header). Expected columns, in order: master_plan_id, workshop_id,
    product_id, quantity, planned_start, planned_end. Rows whose first cell is
    empty are skipped. Nothing is written unless every row is accepted.

    Args:
        file: The uploaded .xlsx file.

    Returns:
        A dict with a ``message`` stating how many plans were imported.

    Raises:
        HTTPException: 400 when the upload is not a readable workbook or a
            data row lacks a required column.
    """
    contents = await file.read()
    try:
        wb = openpyxl.load_workbook(io.BytesIO(contents))
    except zipfile.BadZipFile as exc:
        # .xlsx files are zip archives; anything else is a client error.
        raise HTTPException(
            status_code=400,
            detail="Uploaded file is not a valid .xlsx workbook"
        ) from exc
    ws = wb.active

    records = []
    for row_number, row in enumerate(
        ws.iter_rows(min_row=2, values_only=True), start=2
    ):
        if not row or not row[0]:  # skip blank rows
            continue
        # quantity, planned_start and planned_end are NOT NULL in the table;
        # reject the file up front instead of failing inside the database.
        if len(row) < 6 or any(cell is None for cell in row[3:6]):
            raise HTTPException(
                status_code=400,
                detail=f"Row {row_number} is missing required columns"
            )
        # workshop_plans.id is a VARCHAR primary key without a default, so the
        # id must be supplied by the application.
        records.append((str(uuid.uuid4()), *row[:6], "pending"))

    # Persist all rows in one transaction.
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            cursor.executemany(
                "INSERT INTO workshop_plans "
                "(id, master_plan_id, workshop_id, product_id, quantity, "
                "planned_start, planned_end, status) "
                "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
                records
            )
        conn.commit()
    finally:
        # Closing without a commit discards a partially applied import.
        conn.close()

    return {"message": f"Imported {len(records)} workshop plans successfully"}

@router.get("/workshop-plans/{workshop_id}", response_model=List[WorkshopPlan])
async def get_workshop_plans(workshop_id: str):
    """Return the production plans of a workshop, including execution data.

    Args:
        workshop_id: Primary key of the workshop.

    Returns:
        A list of ``WorkshopPlan`` models; empty when the workshop has no plans.
    """
    # Column order here is also the keyword order used to build the models.
    columns = (
        "id", "master_plan_id", "workshop_id", "product_id", "quantity",
        "planned_start", "planned_end", "actual_start", "actual_end", "status",
    )

    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT id, master_plan_id, workshop_id, product_id, quantity, "
                "planned_start, planned_end, actual_start, actual_end, status "
                "FROM workshop_plans WHERE workshop_id = %s",
                (workshop_id,)
            )
            rows = cursor.fetchall()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()

    return [WorkshopPlan(**dict(zip(columns, row))) for row in rows]

6. 数据库模型 (SQL)

-- Devices: one row per monitored device.
CREATE TABLE devices (
    id VARCHAR(50) PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    status VARCHAR(20) NOT NULL CHECK (status IN ('online', 'offline', 'fault', 'maintenance')),
    last_heartbeat TIMESTAMP,
    ip_address VARCHAR(50)
);

-- Faults: fault reports raised against a device.
CREATE TABLE faults (
    id SERIAL PRIMARY KEY,
    device_id VARCHAR(50) REFERENCES devices(id),
    fault_code VARCHAR(50) NOT NULL,
    description TEXT,
    timestamp TIMESTAMP NOT NULL,
    resolved BOOLEAN NOT NULL DEFAULT FALSE
);

-- Products.
CREATE TABLE products (
    id VARCHAR(50) PRIMARY KEY,
    name VARCHAR(100) NOT NULL
);

-- Components (parts).
CREATE TABLE components (
    id VARCHAR(50) PRIMARY KEY,
    code VARCHAR(50) NOT NULL,
    name VARCHAR(100) NOT NULL
);

-- Product-to-component association with the quantity needed per product.
CREATE TABLE product_components (
    product_id VARCHAR(50) REFERENCES products(id),
    component_id VARCHAR(50) REFERENCES components(id),
    quantity INTEGER NOT NULL,
    PRIMARY KEY (product_id, component_id)
);

-- Master production plans.
CREATE TABLE master_production_plans (
    id VARCHAR(50) PRIMARY KEY,
    product_id VARCHAR(50) REFERENCES products(id),
    quantity INTEGER NOT NULL,
    start_date DATE NOT NULL,
    end_date DATE NOT NULL,
    priority INTEGER NOT NULL
);

-- Workshops.
CREATE TABLE workshops (
    id VARCHAR(50) PRIMARY KEY,
    name VARCHAR(100) NOT NULL
);

-- Workshop-level production plans.
-- Note: id has no DEFAULT, so every INSERT must supply it explicitly.
CREATE TABLE workshop_plans (
    id VARCHAR(50) PRIMARY KEY,
    master_plan_id VARCHAR(50) REFERENCES master_production_plans(id),
    workshop_id VARCHAR(50) REFERENCES workshops(id),
    product_id VARCHAR(50) REFERENCES products(id),
    quantity INTEGER NOT NULL,
    planned_start DATE NOT NULL,
    planned_end DATE NOT NULL,
    actual_start DATE,
    actual_end DATE,
    status VARCHAR(20) NOT NULL CHECK (status IN ('pending', 'in_progress', 'completed', 'delayed'))
);


网站公告

今日签到

点亮在社区的每一天
去签到