EMQX Webhook 调用本地 Supabase Edge Functions
1. 环境准备
1.1 确认服务状态
# Check the status of the Supabase services
cd /home/xx/dockerData/supabase/supabase-project
docker-compose ps
# Confirm the Edge Functions service is running normally
# NOTE(review): `docker-compose logs` expects a compose *service* name. In the stock Supabase
# docker-compose.yml the service is presumably `functions` (with container_name
# supabase-edge-functions) — confirm against the service names printed by `docker-compose ps`.
docker-compose logs supabase-edge-functions
1.2 服务地址
- Supabase Dashboard: http://172.16.9.14:18000
- Edge Functions 端点: http://172.16.9.14:18000/functions/v1/
2. 数据库表结构
2.1 创建 MQTT 消息表
在 Supabase Dashboard 的 SQL Editor 中执行:
-- Create the MQTT message table.
-- IF NOT EXISTS makes the script safe to run again from the SQL Editor.
CREATE TABLE IF NOT EXISTS mqtt_messages (
    id                  BIGSERIAL PRIMARY KEY,
    message_id          TEXT UNIQUE NOT NULL,       -- webhook `id` field; UNIQUE rejects duplicate deliveries
    client_id           TEXT NOT NULL,              -- webhook `clientid` field
    username            TEXT,
    topic               TEXT,
    payload             JSONB,                      -- parsed JSON payload, or {"raw_text": ...} when not valid JSON
    qos                 INTEGER,
    retain              BOOLEAN,
    timestamp           BIGINT,                     -- stored as sent by EMQX (presumably epoch milliseconds — confirm)
    publish_received_at BIGINT,
    node                TEXT,
    peerhost            TEXT,
    created_at          TIMESTAMPTZ DEFAULT NOW()   -- row insertion time
);
-- Indexes to speed up the common lookups by client, topic and time.
CREATE INDEX IF NOT EXISTS idx_mqtt_client_id ON mqtt_messages(client_id);
CREATE INDEX IF NOT EXISTS idx_mqtt_topic ON mqtt_messages(topic);
CREATE INDEX IF NOT EXISTS idx_mqtt_timestamp ON mqtt_messages(timestamp);
3. Edge Function 配置
3.1 函数文件位置
/home/xx/dockerData/supabase/supabase-project/volumes/functions/mqtt-webhook/index.ts
3.2 完整的函数代码
// functions/mqtt-webhook/index.ts
//
// EMQX webhook receiver. Authenticates the caller with a shared `x-api-key`
// header, accepts `message.publish` events as JSON via POST, and stores each
// message as one row in the `mqtt_messages` table.
//
// Responses (all JSON except the CORS preflight):
//   200 {status:'success'}  message stored
//   200 {status:'ignored'}  event is not `message.publish`
//   400 {status:'error'}    request body is not valid JSON
//   401 {error}             missing or wrong API key
//   405 {error}             method other than POST
//   500 {status:'error'}    database or unexpected failure
import { serve } from 'https://cdn.jsdelivr.net/gh/denoland/deno_std@0.131.0/http/server.ts'
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2'

// CORS headers attached to every response.
const corsHeaders = {
  'Access-Control-Allow-Origin': '*',
  'Access-Control-Allow-Headers': 'authorization, x-client-info, x-api-key, content-type',
}

// Headers for every JSON response body.
const jsonHeaders = { ...corsHeaders, 'Content-Type': 'application/json' }

// Builds a JSON response with the given body and HTTP status.
function jsonResponse(body: unknown, status: number): Response {
  return new Response(JSON.stringify(body), { status, headers: jsonHeaders })
}

serve(async (req) => {
  // Answer the CORS preflight request before any authentication.
  if (req.method === 'OPTIONS') {
    return new Response('ok', { headers: corsHeaders })
  }

  // Validate the fixed API key sent by EMQX.
  // SECURITY NOTE(review): the literal below is a fallback used when
  // EMQX_WEBHOOK_API_KEY is not set. It is published in this guide, so set the
  // environment variable to a private value and remove the fallback in production.
  const apiKey = req.headers.get('x-api-key')
  const expectedApiKey = Deno.env.get('EMQX_WEBHOOK_API_KEY') || 'AAGGHfBBGBWOOBOJOPJHPJWPJPHOIVOJPJP'
  if (!apiKey || apiKey !== expectedApiKey) {
    return jsonResponse({ error: 'Unauthorized' }, 401)
  }

  try {
    // Only POST requests carry webhook data.
    if (req.method !== 'POST') {
      return jsonResponse({ error: 'Method not allowed' }, 405)
    }

    // Parse the request body; a malformed body is the caller's fault (400),
    // not a server failure.
    let webhookData
    try {
      webhookData = await req.json()
    } catch {
      return jsonResponse({ status: 'error', message: 'Invalid JSON body' }, 400)
    }

    // Acknowledge other event types without storing them.
    if (webhookData?.event !== 'message.publish') {
      return jsonResponse({ status: 'ignored', message: 'Not a message.publish event' }, 200)
    }

    // Create the Supabase client from the environment.
    const supabaseUrl = Deno.env.get('SUPABASE_URL')!
    const supabaseKey = Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
    const supabase = createClient(supabaseUrl, supabaseKey)

    // The payload may arrive as a JSON-encoded string or as an already-parsed value.
    let parsedPayload: unknown
    try {
      parsedPayload = typeof webhookData.payload === 'string'
        ? JSON.parse(webhookData.payload)
        : webhookData.payload
    } catch {
      // Not valid JSON: keep the original text instead of dropping the message.
      parsedPayload = { raw_text: webhookData.payload }
    }

    // Insert the message. A repeated `id` violates the UNIQUE constraint on
    // mqtt_messages.message_id and is reported through the error branch below.
    const { error } = await supabase
      .from('mqtt_messages')
      .insert({
        message_id: webhookData.id,
        client_id: webhookData.clientid,
        // The literal string 'undefined' is stored as NULL.
        username: webhookData.username === 'undefined' ? null : webhookData.username,
        topic: webhookData.topic,
        payload: parsedPayload,
        qos: webhookData.qos || 0,
        retain: webhookData.flags?.retain || false,
        timestamp: webhookData.timestamp,
        publish_received_at: webhookData.publish_received_at,
        node: webhookData.node,
        peerhost: webhookData.peerhost,
      })
      .select()

    if (error) {
      console.error('Database error:', error)
      return jsonResponse({ status: 'error', message: error.message }, 500)
    }

    // Report success and echo the stored message id.
    return jsonResponse(
      { status: 'success', message: 'Message stored successfully', message_id: webhookData.id },
      200,
    )
  } catch (error) {
    console.error('Function error:', error)
    // `error` is `unknown` in TypeScript; only Error instances have `.message`.
    const message = error instanceof Error ? error.message : String(error)
    return jsonResponse({ status: 'error', message }, 500)
  }
})
5. 服务重启和部署
5.1 重启服务
cd /home/xx/dockerData/supabase/supabase-project
# Restart only the Edge Functions service
# NOTE(review): `docker-compose restart` expects a compose *service* name; the stock Supabase
# compose file presumably names it `functions` — confirm against `docker-compose ps`.
docker-compose restart supabase-edge-functions
# Or restart all services
docker-compose down
docker-compose up -d
5.2 查看日志
# Follow the Edge Functions logs in real time
# NOTE(review): `docker-compose logs` expects a compose *service* name; the stock Supabase
# compose file presumably names it `functions` — confirm against `docker-compose ps`.
docker-compose logs -f supabase-edge-functions
# Follow the logs of all services
docker-compose logs -f
6. 测试验证
6.1 手动测试
# POST a sample message.publish event to the edge function with the shared API key.
curl -X POST "http://172.16.9.14:18000/functions/v1/mqtt-webhook" \
  -H "Content-Type: application/json" \
  -H "x-api-key: AAGGHfBBGBWOOBOJOPJHPJWPJPHOIVOJPJP" \
  -d '{"id": "test-edge-003","clientid": "test_client","username": "test_user","topic": "test/edge","payload": "{\"temperature\": 25.5}","qos": 1,"timestamp": 1640995200000,"publish_received_at": 1640995200100,"node": "emqx@127.0.0.1","peerhost": "192.168.1.100","event": "message.publish","flags": {"retain": false, "dup": false}}'
6.2 预期响应
{
  "status": "success",
  "message": "Message stored successfully",
  "message_id": "test-edge-003"
}