本地区块链服务在物联网中的应用实例:分布式传感器数据共享平台
一、系统架构设计
二、硬件准备清单
设备 | 规格要求 | 数量 | 用途 |
---|---|---|---|
Raspberry Pi 4 | 4GB RAM | 3台 | 区块链节点 |
DHT22温湿度传感器 | 精度±0.5℃ | 5个 | 环境数据采集 |
ESP32开发板 | 双核240MHz | 5个 | 边缘计算节点 |
LoRa模块 | SX1276芯片 | 3个 | 低功耗通信 |
OLED显示屏 | 0.96英寸 | 1个 | 状态监控 |
三、软件栈配置
# 区块链核心
sudo apt install docker-compose
git clone https://github.com/hyperledger/fabric-samples
cd fabric-samples/test-network
# 物联网设备端
pip install micropython-lib
git clone https://github.com/adafruit/Adafruit_CircuitPython_DHT
# 网关服务
npm install -g @hyperledger/caliper-cli
npm install ethers mqttjs
四、实施步骤
1. 搭建本地区块链网络(Hyperledger Fabric)
# 启动测试网络
./network.sh up createChannel -c iotchannel
# 部署链码
./network.sh deployCC -ccn iotchaincode -ccp ../chaincode/javascript/ -ccl javascript
2. 编写物联网智能合约 (iotchaincode.js
)
const { Contract } = require('fabric-contract-api');
class IoTContract extends Contract {
async InitLedger(ctx) {
await ctx.stub.putState('deviceCount', Buffer.from('0'));
}
async RecordData(ctx, deviceId, temp, humidity) {
const timestamp = new Date().toISOString();
const data = { deviceId, temp, humidity, timestamp };
// 关键:哈希校验防止篡改
const hash = crypto.createHash('sha256').update(JSON.stringify(data)).digest('hex');
await ctx.stub.putState(`DATA_${timestamp}`, Buffer.from(JSON.stringify({...data, hash})));
// 超温告警逻辑
if (parseFloat(temp) > 30.0) {
await ctx.stub.setEvent('HIGH_TEMP', Buffer.from(JSON.stringify({
deviceId,
temp,
location: await this.GetDeviceLocation(ctx, deviceId)
})));
}
}
async GetDeviceHistory(ctx, deviceId) {
const query = { selector: { deviceId } };
const iterator = await ctx.stub.getQueryResult(JSON.stringify(query));
return await this._iteratorToArray(iterator);
}
}
3. ESP32传感器数据采集代码
#include <DHT.h>
#include <LoRa.h>
#include <ArduinoJson.h>
#define DHTPIN 4
#define DHTTYPE DHT22
DHT dht(DHTPIN, DHTTYPE);
void setup() {
Serial.begin(9600);
LoRa.begin(868E6); // EU LoRa频段
dht.begin();
}
void loop() {
float temp = dht.readTemperature();
float humidity = dht.readHumidity();
if (!isnan(temp) && !isnan(humidity)) {
StaticJsonDocument<200> doc;
doc["device"] = "sensor01";
doc["temp"] = temp;
doc["humidity"] = humidity;
doc["timestamp"] = millis();
String output;
serializeJson(doc, output);
LoRa.beginPacket();
LoRa.print(output);
LoRa.endPacket();
}
delay(10000); // 10秒间隔
}
4. 区块链网关服务(Node.js)
const { Gateway } = require('fabric-network');
const mqtt = require('mqtt');
const fs = require('fs');
// MQTT连接
const mqttClient = mqtt.connect('mqtt://localhost:1883');
mqttClient.subscribe('sensors/#');
// Hyperledger连接
const gateway = new Gateway();
await gateway.connect({
wallet,
identity: 'admin',
discovery: { enabled: true, asLocalhost: true }
});
const network = await gateway.getNetwork('iotchannel');
const contract = network.getContract('iotchaincode');
// 处理传感器数据
mqttClient.on('message', async (topic, message) => {
const data = JSON.parse(message.toString());
try {
// 提交到区块链
await contract.submitTransaction(
'RecordData',
data.device,
data.temp.toString(),
data.humidity.toString()
);
console.log(`✅ 数据上链: ${data.temp}℃`);
} catch (error) {
console.error(`❌ 上链失败: ${error}`);
}
});
5. 数据查询API(Express.js)
const express = require('express');
const app = express();
const { Gateway } = require('fabric-network');
app.get('/history/:deviceId', async (req, res) => {
const gateway = new Gateway();
await gateway.connect(/* 连接配置 */);
const contract = gateway.getContract('iotchaincode');
const history = await contract.evaluateTransaction(
'GetDeviceHistory',
req.params.deviceId
);
res.json(JSON.parse(history.toString()));
});
// 实时数据SSE流
app.get('/realtime', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
const listener = (event) => {
if (event.eventName === 'HIGH_TEMP') {
res.write(`data: ${event.payload.toString()}\n\n`);
}
};
contract.addContractListener(listener);
});
app.listen(3000);
五、系统工作流程
- 数据采集:ESP32每10秒采集温湿度数据
- 无线传输:通过LoRa发送到区块链网关
- 数据上链:网关验证后写入Fabric区块链
- 智能告警:温度>30℃时触发事件
- 数据查询:通过API获取历史数据或实时SSE流
六、安全增强措施
设备身份认证
// 设备注册智能合约 async RegisterDevice(ctx, deviceId, publicKey) { const exists = await this._deviceExists(ctx, deviceId); if (exists) throw new Error('设备已注册'); await ctx.stub.putState(`DEVICE_${deviceId}`, Buffer.from(publicKey)); }
数据签名验证
// ESP32端添加数字签名 #include <Crypto.h> #include <SHA256.h> SHA256 sha; String signature = sha.hmacHash(privateKey, dataString);
传输层加密
# 启用LoRaWAN加密 LoRa.enableCrypto(); LoRa.setKey("mySecretKey123456");
七、监控仪表盘实现(React)
function Dashboard() {
const [realtimeData, setRealtimeData] = useState([]);
useEffect(() => {
const eventSource = new EventSource('/realtime');
eventSource.onmessage = (e) => {
setRealtimeData(prev => [...prev, JSON.parse(e.data)]);
};
return () => eventSource.close();
}, []);
return (
<div>
<h3>实时监控 (温度>30℃告警)</h3>
<ul>
{realtimeData.map((alert, i) => (
<li key={i}>
{alert.deviceId} 温度过高: {alert.temp}℃
</li>
))}
</ul>
</div>
);
}
八、部署与测试
启动网络:
cd fabric-samples/test-network ./network.sh up
模拟传感器:
# 模拟数据脚本 import paho.mqtt.publish as publish import random, time while True: temp = random.uniform(20.0, 35.0) msg = f'{{"device":"sensor01", "temp":{temp:.1f}, "humidity":50}}' publish.single("sensors/temp", msg, hostname="localhost") time.sleep(5)
验证区块链数据:
docker exec cli peer chaincode query -C iotchannel -n iotchaincode \ -c '{"Args":["GetDeviceHistory","sensor01"]}'
九、应用场景扩展
- 冷链药品监控:实时追踪温度变化并永久记录
- 工业设备预测性维护:记录设备状态历史
- 智能电表数据共享:防篡改的能源消耗记录
- 农业环境监测:分布式农场数据采集
关键优势:
- 数据不可篡改:所有传感器数据永久记录在区块链上
- 去中心化验证:多个节点共同验证数据真实性
- 自动执行规则:通过智能合约实现实时告警
- 零第三方依赖:完全本地化部署,无云服务依赖
此方案可在树莓派集群上运行,适合工厂、实验室等需要高数据可信度的物联网场景,同时保障数据主权和隐私安全。