当前位置: 首页 > news >正文

EMQX Webhook 调用本地 Supabase Edge Functions

1. 环境准备

1.1 确认服务状态

# 检查 Supabase 服务状态
cd /home/xx/dockerData/supabase/supabase-project
docker-compose ps
​
# 确认 Edge Functions 服务运行正常
docker-compose logs supabase-edge-functions

1.2 服务地址

  • Supabase Dashboard: http://172.16.9.14:18000

  • Edge Functions 端点: http://172.16.9.14:18000/functions/v1/

2. 数据库表结构

2.1 创建 MQTT 消息表

在 Supabase Dashboard 的 SQL Editor 中执行:

-- 创建 MQTT 消息表
CREATE TABLE mqtt_messages (id BIGSERIAL PRIMARY KEY,message_id TEXT UNIQUE NOT NULL,client_id TEXT NOT NULL,username TEXT,topic TEXT,payload JSONB,qos INTEGER,retain BOOLEAN,timestamp BIGINT,publish_received_at BIGINT,node TEXT,peerhost TEXT,created_at TIMESTAMPTZ DEFAULT NOW()
);
​
-- 创建索引提高查询性能
CREATE INDEX idx_mqtt_client_id ON mqtt_messages(client_id);
CREATE INDEX idx_mqtt_topic ON mqtt_messages(topic);
CREATE INDEX idx_mqtt_timestamp ON mqtt_messages(timestamp);

在这里插入图片描述

3. Edge Function 配置

3.1 函数文件位置

/home/xx/dockerData/supabase/supabase-project/volumes/functions/mqtt-webhook/index.ts

3.1 完整的函数代码

// functions/emqx-webhook/index.ts
import { serve } from 'https://cdn.jsdelivr.net/gh/denoland/deno_std@0.131.0/http/server.ts'
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2'// 处理 CORS 预检请求
const corsHeaders = {'Access-Control-Allow-Origin': '*','Access-Control-Allow-Headers': 'authorization, x-client-info, x-api-key, content-type',
}serve(async (req) => {// 处理 CORS 预检请求if (req.method === 'OPTIONS') {return new Response('ok', { headers: corsHeaders })}// 添加调试日志// 验证固定的 API Keyconst apiKey = req.headers.get('x-api-key')const expectedApiKey = Deno.env.get('EMQX_WEBHOOK_API_KEY') || 'AAGGHfBBGBWOOBOJOPJHPJWPJPHOIVOJPJP'if (!apiKey || apiKey !== expectedApiKey) {return new Response(JSON.stringify({ error: 'Unauthorized' }), { status: 401, headers: corsHeaders })}try {// 只接受 POST 请求if (req.method !== 'POST') {return new Response(JSON.stringify({ error: 'Method not allowed' }), { status: 405, headers: { ...corsHeaders, 'Content-Type': 'application/json' } })}// 解析请求体const webhookData = await req.json()// 验证是否为消息发布事件if (webhookData?.event !== 'message.publish') {return new Response(JSON.stringify({ status: 'ignored', message: 'Not a message.publish event' }),{ status: 200, headers: { ...corsHeaders, 'Content-Type': 'application/json' } })}// 创建 Supabase 客户端const supabaseUrl = Deno.env.get('SUPABASE_URL')!const supabaseKey = Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!const supabase = createClient(supabaseUrl, supabaseKey)// 解析 payloadlet parsedPayloadtry {if (typeof webhookData.payload === 'string') {parsedPayload = JSON.parse(webhookData.payload)} else {parsedPayload = webhookData.payload}} catch {// 如果解析失败,保存为原始文本parsedPayload = { raw_text: webhookData.payload }}// 插入数据到数据库const { data, error } = await supabase.from('mqtt_messages').insert({message_id: webhookData.id,client_id: webhookData.clientid,username: webhookData.username === 'undefined' ? null : webhookData.username,topic: webhookData.topic,payload: parsedPayload,qos: webhookData.qos || 0,retain: webhookData.flags?.retain || false,timestamp: webhookData.timestamp,publish_received_at: webhookData.publish_received_at,node: webhookData.node,peerhost: webhookData.peerhost}).select()if (error) {console.error('Database error:', error)return new Response(JSON.stringify({ status: 'error', message: error.message }),{ status: 500, headers: { ...corsHeaders, 'Content-Type': 'application/json' } })}// 返回成功响应return new Response(JSON.stringify({ status: 'success', message: 'Message stored successfully',message_id: webhookData.id}),{ status: 200, headers: { ...corsHeaders, 'Content-Type': 'application/json' } })} catch (error) {console.error('Function error:', error)return new Response(JSON.stringify({ status: 'error', message: error.message }),{ status: 500, headers: { ...corsHeaders, 'Content-Type': 'application/json' } })}
})

5. 服务重启和部署

5.1 重启服务

cd /home/xx/dockerData/supabase/supabase-project
​
# 重启 Edge Functions 服务
docker-compose restart supabase-edge-functions
​
# 或重启所有服务
docker-compose down
docker-compose up -d

5.2 查看日志

# 实时查看 Edge Functions 日志
docker-compose logs -f supabase-edge-functions
​
# 查看所有服务日志
docker-compose logs -f

6. 测试验证

6.1 手动测试

curl -X POST "http://172.16.9.14:18000/functions/v1/mqtt-webhook" \-H "Content-Type: application/json" \-H "x-api-key: AAGGHfBBGBWOOBOJOPJHPJWPJPHOIVOJPJP" \-d '{"id": "test-edge-003","clientid": "test_client","username": "test_user","topic": "test/edge","payload": "{\"temperature\": 25.5}","qos": 1,"timestamp": 1640995200000,"publish_received_at": 1640995200100,"node": "emqx@127.0.0.1","peerhost": "192.168.1.100","event": "message.publish","flags": {"retain": false, "dup": false}}'

6.2 预期响应

{"status": "success","message": "Message stored successfully","message_id": "test-edge-003"
}

在这里插入图片描述

http://www.dtcms.com/a/302367.html

相关文章:

  • 3.DRF视图和路由
  • JAVA后端开发——“全量同步”和“增量同步”
  • [AI Coding] 一.腾讯CodeBuddy IDE内测、安装及基本用法(国产AI IDE启航)
  • 使用node-cron实现Node.js定时任务
  • Office-PowerPoint-MCP-Server – 基于MCP的开源PPT生成与编辑工具
  • 每日面试题16:什么是双亲委派模型
  • DBSyncer:开源免费的全能数据同步工具,多数据源无缝支持!
  • 代码随想录day48单调栈1
  • Python全栈项目--基于深度学习的视频内容分析系统
  • html转word下载
  • 【GitHub Workflows 基础(二)】深入理解 on、jobs、steps 的核心语法与执行逻辑
  • Dify快速搭建问答系统
  • 3、CC3200串口DMA
  • Binary Classifier Optimization for Large Language Model Alignment
  • 亚远景-“过度保守”还是“激进创新”?ISO/PAS 8800的99.9%安全阈值之争
  • Windows 11 系统 Docker详细安装教程并集成使用 Redis 官方详细教程
  • uniapp,uview icon加载太慢了,老是显示叉叉,将远程加载改到本地加载。
  • LangGraph实战:整合MCP(本地模式
  • 机器学习sklearn:不纯度与决策树构建
  • 数据中心入门学习(四):服务器概述与PCIe总线
  • 【学习笔记】AD7708/18(1)-理解官网的参考代码
  • python每日一题
  • 如何在 Apache Ignite 中创建和使用自定义 SQL 函数(Custom SQL Functions)
  • 生物信息学数据技能-学习系列001
  • 牛客网之华为机试题:坐标移动
  • 利用径向条形图探索华盛顿的徒步旅行
  • 数据分析干货| 衡石科技可视化创作之仪表盘控件如何设置
  • 开源智能体-JoyAgent集成ollama私有化模型
  • 【docker】DM8达梦数据库的docker-compose以及一些启动踩坑
  • 攻防世界-引导-Web_php_unserialize