基于 Rust 和土木工程、设备故障诊断、混凝土养护、GPS追踪、供应链物流跟踪系统、地下水监测等领域的实例

发布于:2025-08-01 ⋅ 阅读:(21) ⋅ 点赞:(0)

基于 Rust 和 RabbitMQ 在土木工程领域的实例

以下是基于 Rust 和 RabbitMQ 在土木工程领域的实用案例,涵盖数据采集、监控、任务调度等场景。案例结合 lapin(Rust 的 RabbitMQ 客户端库)和实际工程需求设计。


传感器数据实时采集

使用 RabbitMQ 传输施工现场的传感器数据(如温度、湿度、振动)。Rust 消费者将数据写入时序数据库。

use lapin::{Connection, ConnectionProperties, options::*, types::FieldTable};

async fn consume_sensor_data() -> Result<(), Box<dyn std::error::Error>> {
    let conn = Connection::connect("amqp://localhost:5672", ConnectionProperties::default()).await?;
    let channel = conn.create_channel().await?;
    let _queue = channel.queue_declare("sensor_data", QueueDeclareOptions::default(), FieldTable::default()).await?;
    let consumer = channel.basic_consume("sensor_data", "rust_consumer", BasicConsumeOptions::default(), FieldTable::default()).await?;

    for delivery in consumer {
        if let Ok(delivery) = delivery {
            println!("Received: {:?}", String::from_utf8_lossy(&delivery.data));
            channel.basic_ack(delivery.delivery_tag, BasicAckOptions::default()).await?;
        }
    }
    Ok(())
}

结构健康监测告警

通过 RabbitMQ 发布桥梁或建筑物的异常振动数据,Rust 服务分析后触发告警。

// 发布告警消息示例
async fn publish_alert(message: &str) -> Result<(), lapin::Error> {
    let conn = Connection::connect("amqp://localhost:5672", ConnectionProperties::default()).await?;
    let channel = conn.create_channel().await?;
    channel.basic_publish("", "alerts", BasicPublishOptions::default(), message.as_bytes(), BasicProperties::default()).await?;
    Ok(())
}

分布式任务调度

协调多个施工机器人协同作业,RabbitMQ 分配任务(如混凝土浇筑区域),Rust 实现任务分配逻辑。

// 任务分配生产者
async fn assign_task(robot_id: &str, task: &str) -> Result<(), lapin::Error> {
    let queue_name = format!("robot_{}_tasks", robot_id);
    let conn = Connection::connect("amqp://localhost:5672", ConnectionProperties::default()).await?;
    let channel = conn.create_channel().await?;
    channel.queue_declare(queue_name.as_str(), QueueDeclareOptions::default(), FieldTable::default()).await?;
    channel.basic_publish("", queue_name.as_str(), BasicPublishOptions::default(), task.as_bytes(), BasicProperties::default()).await?;
    Ok(())
}

基于Rust Web 多个仓库通过 RabbitMQ 同步钢筋、水泥库存实例

实例 1:基础库存同步架构

仓库系统A发布库存变更消息到RabbitMQ的inventory_updates队列,消息格式为JSON:

// 发布端代码示例(使用lapin库)
let payload = json!({
    "material_type": "steel",
    "warehouse_id": "A",
    "quantity": -50,
    "timestamp": Utc::now().to_rfc3339()
}).to_string();
channel.basic_publish(
    "",
    "inventory_updates",
    BasicPublishOptions::default(),
    payload.as_bytes(),
    BasicProperties::default()
).await?;

仓库系统B通过消费者处理消息:

// 消费端代码示例
consumer.set_delegate(move |delivery| async move {
    if let Ok(delivery) = delivery {
        let data: Value = serde_json::from_slice(&delivery.data)?;
        update_local_inventory(
            data["material_type"].as_str(),
            data["quantity"].as_i64()
        );
        delivery.ack(BasicAckOptions::default()).await?;
    }
});

实例 2:多仓库分布式事务

使用RabbitMQ的事务机制确保跨仓库操作原子性:

// 事务处理示例
channel.tx_select().await?;
match (warehouse_A.lock_stock(), warehouse_B.reserve_stock()) {
    (Ok(_), Ok(_)) => {
        channel.tx_commit().await?;
        publish_inventory_sync();
    }
    _ => {
        channel.tx_rollback().await?;
        publish_compensation_message();
    }
}

消息头包含事务ID实现最终一致性:

BasicProperties::default()
    .with_headers(FieldTable::from([
        ("x-transaction-id".into(), AMQPValue::LongString(tx_id.into()))
    ]))

实例 3:物料分类路由

通过RabbitMQ的Direct Exchange实现分类同步:

// 钢筋和水泥使用不同路由键
channel.basic_publish(
    "inventory_exchange",
    match material_type {
        "steel" => "steel.update",
        "cement" => "cement.update",
        _ => "other.update"
    },
    BasicPublishOptions::default(),
    payload,
    BasicProperties::default()
).await?;

消费者按需绑定队列:

channel.queue_bind(
    "warehouse_B_queue",
    "inventory_exchange",
    "steel.update",
    FieldTable::default()
).await?;

实例 4:批量处理优化

使用消息批处理减少网络开销:

// 每10秒或累积100条消息时批量发送
let mut batch = Vec::new();
batch.push(InventoryUpdate { /* ... */ });

if batch.len() >= 100 || last_flush.elapsed() > Duration::from_secs(10) {
    channel.basic_publish(
        "",
        "batch_updates",
        BasicPublishOptions::default(),
        serde_json::to_vec(&batch)?,
        BasicProperties::default()
    ).await?;
    batch.clear();
}

消费者侧使用预取限制控制处理速度:

channel.basic_qos(100, BasicQosOptions::default()).await?;

实例 5:跨数据中心同步

通过Federation插件实现跨地域同步:

// 本地数据中心发布到联邦交换器
channel.basic_publish(
    "fed.inventory",
    "",
    BasicPublishOptions::default(),
    payload,
    BasicProperties::default()
    .with_headers(FieldTable::from([
        ("x-datacenter".into(), AMQPValue::LongString("east-1".into()))
    ]))
).await?;

消息包含位置标记避免循环同步:

if !delivery.properties.headers()?.contains_key("x-datacenter") {
    process_message();
    forward_to_other_datacenters();
}

每个实例都需配合错误处理、重试机制和监控系统实现生产级可靠性。建议使用Serde进行消息序列化,通过Prometheus监控队列积压情况,并在消费者实现幂等处理逻辑。

基于Rust实现的设备故障诊断与维修调度系统

以下是基于Rust实现的设备故障诊断与维修调度系统的实例代码片段,涵盖故障码解析、网络通信、数据库交互等关键环节。所有示例均遵循Rust生态的现代实践(如tokio异步、actix-web框架等),可直接用于工程机械领域的Web系统开发。


故障码解析与结构定义

// 示例1:定义故障码枚举
#[derive(Debug, Serialize, Deserialize)]
pub enum FaultCode {
    EngineOverheat(u32),
    HydraulicPressureLow(f32),
    SensorFailure { id: String, threshold: f64 },
}

// 示例2:带时间戳的故障数据结构
#[derive(Serialize, Deserialize)]
pub struct FaultReport {
    machine_id: String,
    code: FaultCode,
    timestamp: DateTime<Utc>,
    gps_coords: Option<(f64, f64)>
}

HTTP接口实现(Actix-Web)

// 示例3:接收故障码的POST接口
#[post("/api/faults")]
async fn report_fault(
    report: web::Json<FaultReport>,
    db: web::Data<DbPool>
) -> impl Responder {
    let _ = sqlx::query!("INSERT INTO faults...").execute(&db).await;
    HttpResponse::Ok().json(json!({"status": "received"}))
}

// 示例4:实时故障码SSE推送
#[get("/api/faults/stream")]
async fn fault_stream(
    broadcaster: web::Data<Broadcaster>
) -> impl Responder {
    broadcaster.new_client().await
}


数据库交互(SQLx)

// 示例5:故障记录存储
pub async fn log_fault(
    pool: &PgPool,
    report: &FaultReport
) -> Result<(), sqlx::Error> {
    sqlx::query!(
        r#"INSERT INTO faults (machine_id, code, location) 
        VALUES ($1, $2::jsonb, $3)"#,
        report.machine_id,
        serde_json::to_value(&report.code)?,
        report.gps_coords
    ).execute(pool).await?;
    Ok(())
}

// 示例6:按设备ID查询历史故障
pub async fn get_machine_history(
    pool: &PgPool,
    machine_id: &str
) -> Vec<FaultReport> {
    sqlx::query_as!(FaultReport,
        "SELECT * FROM faults WHERE machine_id = $1 ORDER BY timestamp DESC",
        machine_id
    ).fetch_all(pool).await.unwrap_or_default()
}


WebSocket实时通信

// 示例7:维修工单状态更新通道
#[derive(Message)]
#[rtype(result = "()")]
pub struct RepairUpdate {
    pub ticket_id: Uuid,
    pub status: RepairStatus,
}

// 示例8:WebSocket消息处理
async fn ws_repair_feed(
    ws: Websocket,
    addr: Addr<RepairBroadcaster>
) {
    let mut rx = addr.subscribe().unwrap();
    while let Ok(msg) = rx.recv().await {
        ws.send(Text(serde_json::to_string(&msg).unwrap())).await?;
    }
}


后台任务处理

// 示例9:故障优先级计算
pub fn calculate_priority(
    fault: &FaultCode,
    machine_type: MachineType
) -> u8 {
    match (fault, machine_type) {
        (FaultCode::EngineOverheat(_), _) => 10,
        (FaultCode::HydraulicPressureLow(p), MachineType::Crane) if *p < 50.0 => 8,
        _ => 3
    }
}

// 示例10:自动分配维修工单
pub async fn dispatch_repair(
    pool: &PgPool,
    report: FaultReport
) -> Result<Uuid, DispatchError> {
    let tech_id = find_available_technician(pool, report.gps_coords).await?;
    let ticket_id = Uuid

网站公告

今日签到

点亮在社区的每一天
去签到