EMQX Webhook 调用本地 Supabase Edge Functions

发布于:2025-07-29 ⋅ 阅读:(21) ⋅ 点赞:(0)

1. 环境准备

1.1 确认服务状态

# 检查 Supabase 服务状态
cd /home/xx/dockerData/supabase/supabase-project
docker-compose ps
​
# 确认 Edge Functions 服务运行正常
docker-compose logs supabase-edge-functions

1.2 服务地址

  • Supabase Dashboard: http://172.16.9.14:18000

  • Edge Functions 端点: http://172.16.9.14:18000/functions/v1/

2. 数据库表结构

2.1 创建 MQTT 消息表

在 Supabase Dashboard 的 SQL Editor 中执行:

-- 创建 MQTT 消息表
CREATE TABLE mqtt_messages (
  id BIGSERIAL PRIMARY KEY,
  message_id TEXT UNIQUE NOT NULL,
  client_id TEXT NOT NULL,
  username TEXT,
  topic TEXT,
  payload JSONB,
  qos INTEGER,
  retain BOOLEAN,
  timestamp BIGINT,
  publish_received_at BIGINT,
  node TEXT,
  peerhost TEXT,
  created_at TIMESTAMPTZ DEFAULT NOW()
);
​
-- 创建索引提高查询性能
CREATE INDEX idx_mqtt_client_id ON mqtt_messages(client_id);
CREATE INDEX idx_mqtt_topic ON mqtt_messages(topic);
CREATE INDEX idx_mqtt_timestamp ON mqtt_messages(timestamp);

在这里插入图片描述

3. Edge Function 配置

3.1 函数文件位置

/home/xx/dockerData/supabase/supabase-project/volumes/functions/mqtt-webhook/index.ts

3.1 完整的函数代码

// functions/emqx-webhook/index.ts
import { serve } from 'https://cdn.jsdelivr.net/gh/denoland/deno_std@0.131.0/http/server.ts'
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2'

// 处理 CORS 预检请求
const corsHeaders = {
  'Access-Control-Allow-Origin': '*',
  'Access-Control-Allow-Headers': 'authorization, x-client-info, x-api-key, content-type',
}

serve(async (req) => {
  // 处理 CORS 预检请求
  if (req.method === 'OPTIONS') {
    return new Response('ok', { headers: corsHeaders })
  }
  
  // 添加调试日志

  // 验证固定的 API Key
  const apiKey = req.headers.get('x-api-key')
  const expectedApiKey = Deno.env.get('EMQX_WEBHOOK_API_KEY') || 'AAGGHfBBGBWOOBOJOPJHPJWPJPHOIVOJPJP'

  if (!apiKey || apiKey !== expectedApiKey) {
    return new Response(
      JSON.stringify({ error: 'Unauthorized' }), 
      { status: 401, headers: corsHeaders }
    )
  }

  try {
    // 只接受 POST 请求
    if (req.method !== 'POST') {
      return new Response(
        JSON.stringify({ error: 'Method not allowed' }), 
        { 
          status: 405, 
          headers: { ...corsHeaders, 'Content-Type': 'application/json' } 
        }
      )
    }

    // 解析请求体
    const webhookData = await req.json()
    
    // 验证是否为消息发布事件
    if (webhookData?.event !== 'message.publish') {
      return new Response(
        JSON.stringify({ 
          status: 'ignored', 
          message: 'Not a message.publish event' 
        }),
        { 
          status: 200, 
          headers: { ...corsHeaders, 'Content-Type': 'application/json' } 
        }
      )
    }

    // 创建 Supabase 客户端
    const supabaseUrl = Deno.env.get('SUPABASE_URL')!
    const supabaseKey = Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
    const supabase = createClient(supabaseUrl, supabaseKey)

    // 解析 payload
    let parsedPayload
    try {
      if (typeof webhookData.payload === 'string') {
        parsedPayload = JSON.parse(webhookData.payload)
      } else {
        parsedPayload = webhookData.payload
      }
    } catch {
      // 如果解析失败,保存为原始文本
      parsedPayload = { raw_text: webhookData.payload }
    }

    // 插入数据到数据库
    const { data, error } = await supabase
      .from('mqtt_messages')
      .insert({
        message_id: webhookData.id,
        client_id: webhookData.clientid,
        username: webhookData.username === 'undefined' ? null : webhookData.username,
        topic: webhookData.topic,
        payload: parsedPayload,
        qos: webhookData.qos || 0,
        retain: webhookData.flags?.retain || false,
        timestamp: webhookData.timestamp,
        publish_received_at: webhookData.publish_received_at,
        node: webhookData.node,
        peerhost: webhookData.peerhost
      })
      .select()

    if (error) {
      console.error('Database error:', error)
      return new Response(
        JSON.stringify({ 
          status: 'error', 
          message: error.message 
        }),
        { 
          status: 500, 
          headers: { ...corsHeaders, 'Content-Type': 'application/json' } 
        }
      )
    }

    // 返回成功响应
    return new Response(
      JSON.stringify({ 
        status: 'success', 
        message: 'Message stored successfully',
        message_id: webhookData.id
      }),
      { 
        status: 200, 
        headers: { ...corsHeaders, 'Content-Type': 'application/json' } 
      }
    )

  } catch (error) {
    console.error('Function error:', error)
    return new Response(
      JSON.stringify({ 
        status: 'error', 
        message: error.message 
      }),
      { 
        status: 500, 
        headers: { ...corsHeaders, 'Content-Type': 'application/json' } 
      }
    )
  }
})

5. 服务重启和部署

5.1 重启服务

cd /home/xx/dockerData/supabase/supabase-project
​
# 重启 Edge Functions 服务
docker-compose restart supabase-edge-functions
​
# 或重启所有服务
docker-compose down
docker-compose up -d

5.2 查看日志

# 实时查看 Edge Functions 日志
docker-compose logs -f supabase-edge-functions
​
# 查看所有服务日志
docker-compose logs -f

6. 测试验证

6.1 手动测试

curl -X POST "http://172.16.9.14:18000/functions/v1/mqtt-webhook" \
  -H "Content-Type: application/json" \
  -H "x-api-key: AAGGHfBBGBWOOBOJOPJHPJWPJPHOIVOJPJP" \
  -d '{
    "id": "test-edge-003",
    "clientid": "test_client",
    "username": "test_user",
    "topic": "test/edge",
    "payload": "{\"temperature\": 25.5}",
    "qos": 1,
    "timestamp": 1640995200000,
    "publish_received_at": 1640995200100,
    "node": "emqx@127.0.0.1",
    "peerhost": "192.168.1.100",
    "event": "message.publish",
    "flags": {"retain": false, "dup": false}
  }'

6.2 预期响应

{
  "status": "success",
  "message": "Message stored successfully",
  "message_id": "test-edge-003"
}

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到