1. 环境准备
1.1 确认服务状态
# Check the status of the Supabase services
cd /home/xx/dockerData/supabase/supabase-project
docker-compose ps
# Confirm the Edge Functions service is running normally
# NOTE(review): docker-compose expects a *service* name here; in the stock Supabase
# compose file the service is presumably `functions` and `supabase-edge-functions`
# is its container name — confirm against docker-compose.yml
docker-compose logs supabase-edge-functions
1.2 服务地址
Supabase Dashboard:
http://172.16.9.14:18000
Edge Functions 端点:
http://172.16.9.14:18000/functions/v1/
2. 数据库表结构
2.1 创建 MQTT 消息表
在 Supabase Dashboard 的 SQL Editor 中执行:
-- Table storing the MQTT messages that the EMQX webhook Edge Function inserts.
-- IF NOT EXISTS keeps the script safe to re-run from the SQL Editor.
-- NOTE(review): the Edge Function writes with the service-role key; if no other
-- client should read or write this table through the Supabase API, consider
-- enabling row level security on it — confirm against your access requirements.
CREATE TABLE IF NOT EXISTS mqtt_messages (
    id                  BIGSERIAL PRIMARY KEY,       -- surrogate key
    message_id          TEXT UNIQUE NOT NULL,        -- webhook `id`; UNIQUE rejects redelivered messages
    client_id           TEXT NOT NULL,               -- webhook `clientid`
    username            TEXT,                        -- NULL when the webhook sends the string 'undefined'
    topic               TEXT,
    payload             JSONB,                       -- parsed JSON, or {"raw_text": ...} when not valid JSON
    qos                 INTEGER,
    retain              BOOLEAN,
    timestamp           BIGINT,                      -- presumably epoch milliseconds — confirm with EMQX
    publish_received_at BIGINT,                      -- presumably epoch milliseconds — confirm with EMQX
    node                TEXT,
    peerhost            TEXT,
    created_at          TIMESTAMPTZ DEFAULT NOW()    -- row insertion time
);

-- Indexes to speed up lookups by client, topic and message time.
CREATE INDEX IF NOT EXISTS idx_mqtt_client_id ON mqtt_messages (client_id);
CREATE INDEX IF NOT EXISTS idx_mqtt_topic ON mqtt_messages (topic);
CREATE INDEX IF NOT EXISTS idx_mqtt_timestamp ON mqtt_messages (timestamp);
3. Edge Function 配置
3.1 函数文件位置
/home/xx/dockerData/supabase/supabase-project/volumes/functions/mqtt-webhook/index.ts
3.2 完整的函数代码
// functions/mqtt-webhook/index.ts
import { serve } from 'https://cdn.jsdelivr.net/gh/denoland/deno_std@0.131.0/http/server.ts'
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2'
// CORS headers attached to every response, including the preflight reply.
const corsHeaders = {
  'Access-Control-Allow-Origin': '*',
  'Access-Control-Allow-Headers': 'authorization, x-client-info, x-api-key, content-type',
}

// NOTE(review): this fallback is a credential committed to source. It is kept
// only so deployments without EMQX_WEBHOOK_API_KEY keep working; set the
// environment variable and remove this constant as soon as possible.
const FALLBACK_API_KEY = 'AAGGHfBBGBWOOBOJOPJHPJWPJPHOIVOJPJP'

// PostgreSQL SQLSTATE for unique_violation; raised when message_id already exists.
const UNIQUE_VIOLATION = '23505'

/** Build a JSON response that carries the shared CORS headers. */
function jsonResponse(body: unknown, status = 200): Response {
  return new Response(JSON.stringify(body), {
    status,
    headers: { ...corsHeaders, 'Content-Type': 'application/json' },
  })
}

/** Turn any thrown value into a printable message (catch variables are `unknown`). */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err)
}

/**
 * EMQX webhook entry point.
 *
 * Authenticates the caller via the `x-api-key` header, accepts only POSTed
 * `message.publish` events and stores each one as a row in `mqtt_messages`.
 *
 * Responses:
 *   200 success | ignored (other event) | duplicate (message_id already stored)
 *   400 malformed JSON body or missing `id` / `clientid`
 *   401 missing or wrong API key
 *   405 non-POST method
 *   500 database or unexpected error
 */
serve(async (req) => {
  // Answer the CORS preflight request.
  if (req.method === 'OPTIONS') {
    return new Response('ok', { headers: corsHeaders })
  }

  // Verify the fixed API key.
  const apiKey = req.headers.get('x-api-key')
  const configuredKey = Deno.env.get('EMQX_WEBHOOK_API_KEY')
  if (!configuredKey) {
    console.warn('EMQX_WEBHOOK_API_KEY is not set; falling back to the built-in key')
  }
  const expectedApiKey = configuredKey || FALLBACK_API_KEY
  if (!apiKey || apiKey !== expectedApiKey) {
    return jsonResponse({ error: 'Unauthorized' }, 401)
  }

  try {
    // Only POST is accepted.
    if (req.method !== 'POST') {
      return jsonResponse({ error: 'Method not allowed' }, 405)
    }

    // Parse the request body; a malformed body is a client error, not a server error.
    let webhookData
    try {
      webhookData = await req.json()
    } catch {
      return jsonResponse({ status: 'error', message: 'Invalid JSON body' }, 400)
    }

    // Anything other than a publish event is acknowledged but not stored.
    if (webhookData?.event !== 'message.publish') {
      return jsonResponse({
        status: 'ignored',
        message: 'Not a message.publish event',
      })
    }

    // message_id and client_id are NOT NULL in mqtt_messages; reject early
    // instead of surfacing a database constraint error.
    if (!webhookData.id || !webhookData.clientid) {
      return jsonResponse(
        { status: 'error', message: 'Missing required field: id or clientid' },
        400,
      )
    }

    // Create the Supabase client.
    const supabaseUrl = Deno.env.get('SUPABASE_URL')
    const supabaseKey = Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')
    if (!supabaseUrl || !supabaseKey) {
      console.error('SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY is not set')
      return jsonResponse(
        { status: 'error', message: 'Supabase credentials are not configured' },
        500,
      )
    }
    const supabase = createClient(supabaseUrl, supabaseKey)

    // A string payload is parsed as JSON; if that fails it is kept as raw text.
    let parsedPayload
    try {
      parsedPayload = typeof webhookData.payload === 'string'
        ? JSON.parse(webhookData.payload)
        : webhookData.payload
    } catch {
      parsedPayload = { raw_text: webhookData.payload }
    }

    // Insert the message. The inserted row is not needed, so no .select().
    const { error } = await supabase
      .from('mqtt_messages')
      .insert({
        message_id: webhookData.id,
        client_id: webhookData.clientid,
        // The webhook sends the literal string 'undefined' when there is no username.
        username: webhookData.username === 'undefined' ? null : webhookData.username,
        topic: webhookData.topic,
        payload: parsedPayload,
        qos: webhookData.qos || 0,
        retain: webhookData.flags?.retain || false,
        timestamp: webhookData.timestamp,
        publish_received_at: webhookData.publish_received_at,
        node: webhookData.node,
        peerhost: webhookData.peerhost,
      })

    if (error) {
      // A redelivered webhook hits the UNIQUE(message_id) constraint; the row is
      // already stored, so acknowledge it instead of reporting a server error.
      if (error.code === UNIQUE_VIOLATION) {
        return jsonResponse({
          status: 'duplicate',
          message: 'Message already stored',
          message_id: webhookData.id,
        })
      }
      console.error('Database error:', error)
      return jsonResponse({ status: 'error', message: error.message }, 500)
    }

    // Report success.
    return jsonResponse({
      status: 'success',
      message: 'Message stored successfully',
      message_id: webhookData.id,
    })
  } catch (error) {
    console.error('Function error:', error)
    return jsonResponse({ status: 'error', message: errorMessage(error) }, 500)
  }
})
5. 服务重启和部署
5.1 重启服务
# Run from the Supabase compose project directory
cd /home/xx/dockerData/supabase/supabase-project
# Restart the Edge Functions service
# NOTE(review): docker-compose expects a service name; `supabase-edge-functions`
# may be the container name rather than the service name — confirm against docker-compose.yml
docker-compose restart supabase-edge-functions
# Or restart all services
docker-compose down
docker-compose up -d
5.2 查看日志
# Follow the Edge Functions logs in real time
# NOTE(review): docker-compose expects a service name; `supabase-edge-functions`
# may be the container name rather than the service name — confirm against docker-compose.yml
docker-compose logs -f supabase-edge-functions
# Follow the logs of all services
docker-compose logs -f
6. 测试验证
6.1 手动测试
# Post a sample message.publish event to the mqtt-webhook function.
# The x-api-key header must equal the key the function expects
# (EMQX_WEBHOOK_API_KEY, or the function's built-in fallback when it is unset).
# "payload" is sent as a JSON-encoded string; the function parses it before storing.
curl -X POST "http://172.16.9.14:18000/functions/v1/mqtt-webhook" \
-H "Content-Type: application/json" \
-H "x-api-key: AAGGHfBBGBWOOBOJOPJHPJWPJPHOIVOJPJP" \
-d '{
"id": "test-edge-003",
"clientid": "test_client",
"username": "test_user",
"topic": "test/edge",
"payload": "{\"temperature\": 25.5}",
"qos": 1,
"timestamp": 1640995200000,
"publish_received_at": 1640995200100,
"node": "emqx@127.0.0.1",
"peerhost": "192.168.1.100",
"event": "message.publish",
"flags": {"retain": false, "dup": false}
}'
6.2 预期响应
{
"status": "success",
"message": "Message stored successfully",
"message_id": "test-edge-003"
}