Salesforce 知识点: Streaming API - 封装好的CometD
Salesforce Streaming API 是 Salesforce 提供的实时事件推送接口,核心功能是通过建立持久连接,让外部系统(或内部组件)实时接收 Salesforce 中的数据变更事件(如记录创建/更新、用户登录等),无需频繁轮询,从而实现“数据变化即时响应”。它本质上是基于 CometD 协议(长轮询机制)构建的企业级实时通信解决方案,广泛用于数据同步、实时监控、自动化流程触发等场景。
一、Streaming API 的核心定位与价值
传统的 Salesforce API(如 REST API、SOAP API)采用“请求-响应”模式,外部系统需主动发起请求才能获取数据,无法实时感知 Salesforce 内部的变化(例如:当客户更新了联系方式,外部 CRM 系统无法立即知晓,需定时轮询)。
Streaming API 则解决了这一痛点:
- 主动推送:Salesforce 作为“服务器端”,当特定事件发生时(如
Account
记录被修改),主动将事件数据推送给已订阅的外部系统; - 持久连接:通过 CometD 协议的长轮询机制,建立“客户端- Salesforce”的持久通信通道,避免频繁无效请求,降低资源消耗;
- 事件驱动:基于“订阅-发布”(Pub/Sub)模式,外部系统只需订阅关注的“事件主题”,即可精准接收目标数据,无需处理无关信息。
二、Streaming API 支持的事件类型
Streaming API 可推送的事件覆盖 Salesforce 核心业务场景,主要分为以下几类:
事件类型 | 描述 | 适用场景举例 |
---|---|---|
平台事件(Platform Events) | 包括标准平台事件(如 RecordEvent 、LoginEvent )和自定义平台事件。 | 实时同步记录变更、监控用户登录行为 |
变更数据捕获(CDC Events) | 专门捕获标准/自定义对象的“数据变更”(创建/更新/删除/恢复),包含完整的字段前后值。 | 外部系统与 Salesforce 数据实时同步(如 ERP、BI 工具) |
推送主题(Push Topics) | 基于 SOQL 查询定义的“自定义事件主题”,仅推送符合查询条件的记录变更。 | 按业务规则筛选实时数据(如“仅推送金额>10万的 Opportunity 赢单事件”) |
Generic Events | 无需预先定义事件结构,可动态发送自定义格式的事件,灵活性高。 | 跨系统传递非结构化的实时通知(如告警信息) |
三、Streaming API 的典型应用场景
- 实时数据同步:外部系统(如企业 ERP、BI 工具)实时同步 Salesforce 数据变更(如订单创建、客户信息更新),确保数据一致性;
- 实时监控与告警:监控关键业务事件(如高价值 Opportunity 流失、用户异常登录),实时触发告警(邮件、钉钉通知);
- 前端实时交互:Salesforce Lightning 组件或外部 Web 应用(如客户门户)实时展示数据变更(如“新工单提醒”“实时销售数据看板”);
- 跨系统自动化:当 Salesforce 发生特定事件(如合同签署),实时触发外部系统流程(如财务系统自动生成发票)。
四、核心工作原理:基于 CometD 的持久连接
Streaming API 完全依赖 CometD 协议实现持久连接和事件推送,其完整流程可拆解为 5 步,本质是“连接建立→订阅主题→事件监听→推送数据→重连维护”的循环:
下面以 JavaScript 客户端(最常用场景)和 Java 客户端为例,结合完整代码,详细拆解 Salesforce Streaming API 基于 CometD 协议的5 步流程。
通用前置准备
在开始前,需完成 Salesforce 环境配置,这是后续流程的基础:
- 创建 Connected App:
- 进入 Salesforce Setup → 搜索“App Manager”→ 点击“新建连接应用程序”。
- 填写“应用程序名称”“API 名称”,在“API(启用 OAuth 设置)”中勾选“启用 OAuth 设置”,设置“回调 URL”(如
https://localhost
,测试用)。 - 在“选定的 OAuth 范围”中添加
full
(全权限,测试用)或api
(最小权限,生产用),保存后获取 Consumer Key(Client ID) 和 Consumer Secret。
- 获取 Salesforce 域名:
- 点击右上角用户头像 → “设置”→ 搜索“我的域名”,获取完整域名(如
https://your-domain-123.my.salesforce.com
)。
- 点击右上角用户头像 → “设置”→ 搜索“我的域名”,获取完整域名(如
- 准备安全令牌:
- 若使用“密码式 OAuth 流程”,需将 Salesforce 密码与“安全令牌”拼接(如密码
abc123
+ 安全令牌xyz789
→abc123xyz789
),安全令牌可通过“我的个人信息→重置我的安全令牌”获取。
- 若使用“密码式 OAuth 流程”,需将 Salesforce 密码与“安全令牌”拼接(如密码
一、JavaScript 客户端实现(基于 cometd.js
)
步骤 1:连接建立(CometD 握手 + 认证)
核心是通过 OAuth 获取 Access Token
,再携带令牌与 Salesforce Streaming API 端点建立 CometD 连接(握手),获取会话 Client ID
。
1.1 引入依赖
在 HTML 中引入 CometD 客户端库(官方推荐版本 7.0.0+
):
<!-- 引入 CometD 核心库 -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/cometd/7.0.0/cometd.js"></script>
<!-- 引入 CometD 长轮询传输层(默认已包含,显式引入确保兼容性) -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/cometd/7.0.0/cometd-long-polling.js"></script>
1.2 代码实现(连接建立)
// 1. 配置基础参数(替换为你的实际信息)
const CONFIG = {clientId: "3MVG9fe4YXyap4urFdN_7k2Q1bXx9z8hGtR3sT5v7u8iJ0kL1mO2p", // Connected App 的 Client IDclientSecret: "A1B2C3D4E5F6G7H8I9J0K1L2M3N4O5P6Q7R8S9T0U1V2W3", // Connected App 的 Client Secretusername: "user@company.com", // Salesforce 用户名password: "abc123xyz789", // 密码 + 安全令牌domain: "https://your-domain-123.my.salesforce.com", // Salesforce 域名apiVersion: "59.0" // Streaming API 版本(建议用最新稳定版)
};// 2. 获取 OAuth Access Token(密码式流程,适用于后端/测试场景)
async function getSalesforceAccessToken() {const tokenEndpoint = `${CONFIG.domain}/services/oauth2/token`;const params = new URLSearchParams({grant_type: "password",client_id: CONFIG.clientId,client_secret: CONFIG.clientSecret,username: CONFIG.username,password: CONFIG.password});try {const response = await fetch(tokenEndpoint, {method: "POST",headers: { "Content-Type": "application/x-www-form-urlencoded" },body: params});if (!response.ok) throw new Error(`获取令牌失败:${response.statusText}`);const data = await response.json();console.log("Access Token 获取成功:", data.access_token.substring(0, 20) + "..."); // 脱敏显示return data.access_token; // 返回核心令牌} catch (error) {console.error("OAuth 认证错误:", error.message);throw error; // 抛出错误,中断后续流程}
}// 3. 建立 CometD 连接(握手)
let cometdInstance; // 全局 CometD 实例,用于后续操作
async function establishCometDConnection() {try {// 3.1 获取 Access Tokenconst accessToken = await getSalesforceAccessToken();// 3.2 初始化 CometD 实例cometdInstance = new CometD();const streamingEndpoint = `${CONFIG.domain}/cometd/${CONFIG.apiVersion}/`; // Streaming API 端点// 3.3 配置连接(关键:携带认证头 + 长轮询参数)cometdInstance.configure({url: streamingEndpoint,requestHeaders: {"Authorization": `Bearer ${accessToken}`, // 认证核心:Bearer Token"Content-Type": "application/json"},logLevel: "info", // 日志级别:info/debug/error,调试时用 debugmaxConnectionAttempts: 5 // 最大重连次数(后续重连维护会用到)});// 3.4 握手(建立连接)return new Promise((resolve, reject) => {cometdInstance.handshake((handshakeReply) => {if (handshakeReply.successful) {console.log("✅ CometD 连接建立成功!");console.log("会话 Client ID:", cometdInstance.clientId); // 服务器返回的唯一会话 IDresolve(cometdInstance); // 连接成功,返回实例} else {const errorMsg = `❌ CometD 握手失败:${handshakeReply.error?.message || "未知错误"}`;console.error(errorMsg);reject(new Error(errorMsg));}});});} catch (error) {console.error("连接建立整体错误:", error.message);throw error;}
}// 执行连接建立(后续步骤依赖此结果)
establishCometDConnection().then(() => {// 连接成功后,继续执行“订阅主题”步骤(见步骤 2)subscribeToEventTopic();
});
步骤 2:订阅主题(指定要监听的事件)
连接建立后,需订阅具体的“事件主题”(Topic)——Salesforce 会仅向订阅了对应主题的客户端推送事件。常见主题示例:
- 标准对象 CDC 事件:
/data/AccountChangeEvent
(监听 Account 记录的创建/更新/删除); - 自定义平台事件:
/_event/OrderCreatedEvent__e
(假设自定义了“订单创建”平台事件); - 推送主题(Push Topic):
/topic/HighValueOpp
(基于 SOQL 筛选的 Opportunity 事件)。
代码实现(订阅主题)
// 订阅指定的事件主题(以 Account CDC 事件为例)
function subscribeToEventTopic() {if (!cometdInstance || !cometdInstance.isConnected()) {throw new Error("❌ 订阅失败:CometD 连接未建立或已断开");}// 1. 定义要订阅的主题(替换为你的目标主题)const targetTopic = "/data/AccountChangeEvent"; // Account 记录变更事件// 2. 执行订阅cometdInstance.subscribe(targetTopic, (message) => {// 此回调函数在“事件推送”时触发(见步骤 4)if (message.successful) {console.log(`\n📥 收到 ${targetTopic} 主题的事件:`, message.data);// 后续步骤 3:事件监听与处理(在此处或单独函数中实现)handleReceivedEvent(message.data);} else {console.error(`❌ 事件接收失败:${message.error?.message}`);}}, (subscribeReply) => {// 订阅结果回调(成功/失败)if (subscribeReply.successful) {console.log(`✅ 成功订阅主题:${targetTopic}`);} else {const errorMsg = `❌ 订阅主题 ${targetTopic} 失败:${subscribeReply.error?.message}`;console.error(errorMsg);throw new Error(errorMsg);}});
}
步骤 3:事件监听(处理接收到的事件数据)
当 Salesforce 发生目标事件(如创建 Account 记录),会通过 CometD 连接推送事件数据到客户端,需在步骤 2 的订阅回调中处理数据(如同步到外部系统、更新页面、触发告警)。
代码实现(事件处理)
// 处理接收到的 Salesforce 事件数据(以 Account CDC 事件为例)
function handleReceivedEvent(eventData) {try {// 1. 解析事件核心信息(CDC 事件格式示例)const { schema, payload } = eventData;const { ChangeEventHeader, Name, OldValue } = payload;// 2. 提取关键业务字段const eventType = ChangeEventHeader.changeType; // CREATE/UPDATE/DELETE/UNDELETEconst recordId = ChangeEventHeader.recordIds[0]; // 变更的 Account 记录 IDconst newAccountName = Name; // 更新后的 Account 名称const oldAccountName = OldValue?.Name; // 更新前的 Account 名称(仅 UPDATE 有)// 3. 业务逻辑处理(根据需求自定义)console.log(`\n🔍 事件详情:`);console.log(`- 事件类型:${eventType}`);console.log(`- 影响记录 ID:${recordId}`);console.log(`- 名称变更:${oldAccountName || "无"} → ${newAccountName}`);// 示例1:同步到外部 ERP 系统(模拟 API 调用)// syncToERP(recordId, newAccountName, eventType);// 示例2:前端页面更新(如刷新 Account 列表)// updateAccountList(recordId, newAccountName);// 示例3:触发告警(如删除高价值 Account 时通知管理员)if (eventType === "DELETE" && newAccountName.includes("重要客户")) {// sendAlertEmail("管理员邮箱", `重要客户 ${newAccountName} 被删除`);console.log(`⚠️ 告警:重要客户 ${newAccountName}(ID: ${recordId})被删除`);}} catch (error) {console.error("❌ 事件处理失败:", error.message);}
}
步骤 4:推送数据(Salesforce 主动触发事件)
此步骤无需客户端代码——当 Salesforce 中发生订阅主题对应的事件时,会自动通过 CometD 长轮询连接推送数据到客户端。
手动触发事件(测试用)
为验证流程,可手动在 Salesforce 中触发事件:
- 登录 Salesforce → 进入“Account”对象列表 → 新建/编辑/删除一条 Account 记录;
- 查看客户端控制台(如浏览器 F12 控制台),会看到步骤 3 中打印的事件详情(如“收到 /data/AccountChangeEvent 主题的事件”)。
事件数据格式示例(CDC 事件)
Salesforce 推送的事件数据为 JSON 格式,核心结构如下(以 Account 更新事件为例):
{"data": {"schema": "a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6", // 事件 schema ID"payload": {"ChangeEventHeader": {"entityName": "Account","changeType": "UPDATE","recordIds": ["0015i000004ABCDEFG"], // 变更的 Account 记录 ID"changeOrigin": "com/salesforce/api/soap/59.0;client=SfdcInternalAPI","sequenceNumber": 1,"transactionKey": "000000000000000ABC","commitTimestamp": 1728000000000 // 事件提交时间戳},"Name": "更新后的客户名称", // 更新后的字段值"OldValue": { "Name": "原客户名称" }, // 更新前的字段值"Industry": "Technology" // 未变更的字段(可选返回)}},"channel": "/data/AccountChangeEvent" // 订阅的主题
}
步骤 5:重连维护(网络中断/会话过期后的自动恢复)
网络波动、会话过期等会导致 CometD 连接断开,需配置自动重连机制,确保事件接收不中断。CometD 客户端已内置基础重连逻辑,只需补充配置和状态监听。
代码实现(重连维护)
// 配置 CometD 自动重连与状态监听(在“连接建立”后调用)
function setupReconnectionMechanism() {if (!cometdInstance) return;// 1. 监听连接断开事件cometdInstance.addListener("/meta/disconnect", (message) => {if (!message.successful) {console.warn("⚠️ CometD 连接意外断开,触发重连...");// 手动触发重连(也可依赖客户端内置重连)triggerReconnection();}});// 2. 监听连接状态变化cometdInstance.addListener("/meta/connect", (message) => {if (message.successful) {console.log("🔄 CometD 连接已恢复");// 重连后自动重新订阅主题(避免订阅丢失)subscribeToEventTopic();} else {const errorMsg = `⚠️ CometD 连接恢复失败:${message.error?.message}`;console.error(errorMsg);// 重试机制:3 秒后再次重连setTimeout(triggerReconnection, 3000);}});// 3. 手动触发重连的函数function triggerReconnection() {if (cometdInstance.isDisconnected()) {console.log("🔄 尝试重新建立 CometD 连接...");cometdInstance.handshake(); // 重新握手(复用之前的配置)}}// 4. 处理 Access Token 过期(可选:高级场景)cometdInstance.addListener("/meta/handshake", (message) => {if (!message.successful && message.error?.message.includes("invalid_grant")) {console.error("⚠️ Access Token 已过期,重新获取令牌并重连...");// 重新获取令牌后重连getSalesforceAccessToken().then((newToken) => {cometdInstance.configure({requestHeaders: { "Authorization": `Bearer ${newToken}` }});cometdInstance.handshake();});}});console.log("✅ 重连维护机制已启用");
}// 在“连接建立成功”后启用重连维护
establishCometDConnection().then(() => {setupReconnectionMechanism(); // 启用重连subscribeToEventTopic(); // 订阅主题
});
二、Java 客户端实现(基于 cometd-java-client
)
若需在后端服务(如 Spring Boot)中集成,可使用 cometd-java-client
,核心流程与 JavaScript 一致,以下是关键步骤代码:
步骤 1:添加 Maven 依赖
<dependencies><!-- CometD Java 客户端核心 --><dependency><groupId>org.cometd.java</groupId><artifactId>cometd-java-client</artifactId><version>7.0.0</version></dependency><!-- Jetty HTTP 客户端(CometD 依赖) --><dependency><groupId>org.eclipse.jetty</groupId><artifactId>jetty-client</artifactId><version>11.0.18</version></dependency><!-- JSON 处理(Jackson) --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.16.1</version></dependency><!-- HTTP 客户端(Apache HttpClient,用于获取 OAuth 令牌) --><dependency><groupId>org.apache.httpcomponents.client5</groupId><artifactId>httpclient5</artifactId><version>5.3.1</version></dependency>
</dependencies>
步骤 2:完整代码实现(5 步流程)
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hc.client5.http.fluent.Request;
import org.apache.hc.core5.http.ContentType;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.HttpClientTransport;
import org.eclipse.jetty.client.HttpClient;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;public class SalesforceStreamingJavaClient {// 1. 配置参数(替换为你的实际信息)private static final String CLIENT_ID = "3MVG9fe4YXyap4urFdN_7k2Q1bXx9z8hGtR3sT5v7u8iJ0kL1mO2p";private static final String CLIENT_SECRET = "A1B2C3D4E5F6G7H8I9J0K1L2M3N4O5P6Q7R8S9T0U1V2W3";private static final String USERNAME = "user@company.com";private static final String PASSWORD = "abc123xyz789"; // 密码 + 安全令牌private static final String DOMAIN = "https://your-domain-123.my.salesforce.com";private static final String API_VERSION = "59.0";private static final String TARGET_TOPIC = "/data/AccountChangeEvent"; // 订阅的主题private static BayeuxClient bayeuxClient; // 全局 CometD 客户端实例private static final ObjectMapper objectMapper = new ObjectMapper(); // JSON 解析器public static void main(String[] args) throws Exception {try {// 步骤 1:连接建立(获取令牌 + CometD 握手)establishCometDConnection();// 步骤 2:订阅主题subscribeToTopic();// 步骤 5:启用重连维护(监听连接状态)setupReconnection();// 保持程序运行(模拟后端服务持续监听)System.out.println("\n📡 客户端已启动,持续监听事件...(按 Ctrl+C 退出)");while (true) {TimeUnit.SECONDS.sleep(60);}} catch (Exception e) {System.err.println("❌ 客户端异常:" + e.getMessage());e.printStackTrace();} finally {// 关闭资源(程序退出时)if (bayeuxClient != null && bayeuxClient.isConnected()) {bayeuxClient.disconnect(1000, TimeUnit.MILLISECONDS);System.out.println("🔌 客户端已断开连接");}}}// 步骤 1.1:获取 Salesforce Access Tokenprivate static String getAccessToken() throws Exception {String tokenEndpoint = DOMAIN + "/services/oauth2/token";String params = String.format("grant_type=password&client_id=%s&client_secret=%s&username=%s&password=%s",CLIENT_ID, CLIENT_SECRET, USERNAME, PASSWORD);// 发送 OAuth 请求String response = Request.post(tokenEndpoint).bodyString(params, ContentType.APPLICATION_FORM_URLENCODED).execute().returnContent().asString();// 解析令牌JsonNode json = objectMapper.readTree(response);return json.get("access_token").asText();}// 步骤 1.2:建立 CometD 连接(握手)private static void establishCometDConnection() throws Exception {// 获取 Access TokenString accessToken = getAccessToken();System.out.println("✅ Access Token 获取成功:" + accessToken.substring(0, 20) + "...");// 初始化 Jetty HTTP 客户端HttpClient httpClient = new HttpClient();httpClient.start();// 配置 CometD 传输层(携带认证头)Map<String, Object> transportOptions = new HashMap<>();transportOptions.put(HttpClientTransport.MAX_NETWORK_DELAY_OPTION, 30000); // 30秒超时transportOptions.put("headers", Map.of("Authorization", "Bearer " + accessToken)); // 认证头ClientTransport transport = new HttpClientTransport(null, httpClient, transportOptions);// 初始化 BayeuxClient 并握手String streamingEndpoint = DOMAIN + "/cometd/" + API_VERSION + "/";bayeuxClient = new BayeuxClient(streamingEndpoint, transport);// 握手并等待结果(5秒超时)boolean handshakeSuccess = bayeuxClient.waitFor(5000, BayeuxClient.State.CONNECTED);if (handshakeSuccess) {System.out.println("✅ CometD 连接建立成功!Client ID:" + bayeuxClient.getId());} else {throw new Exception("❌ CometD 握手失败:" + bayeuxClient.getHandshakeResponse());}}// 步骤 2:订阅主题private static void subscribeToTopic() {if (bayeuxClient == null || !bayeuxClient.isConnected()) {throw new IllegalStateException("❌ 订阅失败:连接未建立");}// 订阅主题并注册事件回调(步骤 3:事件监听)bayeuxClient.subscribe(TARGET_TOPIC, (message) -> {if (message.isSuccessful()) {System.out.println("\n📥 收到事件:" + message.getChannel());// 步骤 3:处理事件数据handleEvent(message.getDataAsJson());} else {System.err.println("❌ 事件接收失败:" + message.getError());}}, (subscribeReply) -> {if (subscribeReply.isSuccessful()) {System.out.println("✅ 成功订阅主题:" + TARGET_TOPIC);} else {throw new IllegalStateException("❌ 订阅主题失败:" + subscribeReply.getError());}});}// 步骤 3:处理事件数据private static void handleEvent(JsonNode eventData) {try {// 解析 CDC 事件结构JsonNode payload = eventData.get("payload");JsonNode header = payload.get("ChangeEventHeader");String eventType = header.get("changeType").asText();String recordId = header.get("recordIds").get(0).asText();String newName = payload.get("Name").asText();String oldName = payload.has("OldValue") ? payload.get("OldValue").get("Name").asText() : "无";// 业务逻辑处理System.out.println("🔍 事件详情:");System.out.println("- 类型:" + eventType);System.out.println("- 记录 ID:" + recordId);System.out.println("- 名称变更:" + oldName + " → " + newName);// 示例:同步到 ERP(模拟)// syncToERP(recordId, newName, eventType);} catch (Exception e) {System.err.println("❌ 事件处理失败:" + e.getMessage());}}// 步骤 5:重连维护private static void setupReconnection() {// 监听断开事件bayeuxClient.addListener("/meta/disconnect", (message) -> {if (!message.isSuccessful()) {System.warn("⚠️ 连接意外断开,尝试重连...");reconnect();}});// 监听连接状态bayeuxClient.addListener("/meta/connect", (message) -> {if (message.isSuccessful()) {System.out.println("🔄 连接已恢复");subscribeToTopic(); // 重连后重新订阅} else {System.err.println("⚠️ 连接恢复失败,3秒后重试...");try {TimeUnit.SECONDS.sleep(3);reconnect();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}});// 监听令牌过期(握手失败时)bayeuxClient.addListener("/meta/handshake", (message) -> {if (!message.isSuccessful() && message.getError() != null && message.getError().contains("invalid_grant")) {System.err.println("⚠️ Access Token 过期,重新获取令牌...");try {String newToken = getAccessToken();// 更新认证头并重新握手Map<String, Object> headers = new HashMap<>();headers.put("Authorization", "Bearer " + newToken);bayeuxClient.getTransport().setOption("headers", headers);bayeuxClient.handshake();} catch (Exception e) {System.err.println("❌ 令牌刷新失败:" + e.getMessage());}}});System.out.println("✅ 重连维护机制已启用");}// 手动触发重连private static void reconnect() {if (bayeuxClient.isDisconnected()) {System.out.println("🔄 正在重新建立连接...");bayeuxClient.handshake();}}
}
三、关键注意事项与错误排查
-
认证失败(401 Unauthorized):
- 检查
Access Token
是否有效(可通过https://<domain>/services/oauth2/userinfo
验证); - 确认 Connected App 的 OAuth 范围包含
api
或full
; - 密码+安全令牌是否拼接正确(无空格,安全令牌未过期)。
- 检查
-
订阅失败(403 Forbidden):
- 确认用户有“读取目标对象”的权限(如 Account 对象的
Read
权限); - 若为 CDC 事件,需在 Salesforce Setup 中启用“变更数据捕获”(搜索“变更数据捕获”→ 勾选目标对象)。
- 确认用户有“读取目标对象”的权限(如 Account 对象的
-
连接断开后无法重连:
- 检查
maxConnectionAttempts
配置是否足够; - 确认网络是否允许长轮询(部分企业防火墙可能拦截长时间未响应的 HTTP 连接,需放行 Salesforce 域名)。
- 检查
-
事件重复推送:
- Streaming API 保证“至少一次推送”,需在
handleReceivedEvent
中实现幂等处理(如通过recordId
+commitTimestamp
去重)。
- Streaming API 保证“至少一次推送”,需在
通过以上步骤,可完整实现基于 CometD 协议的 Salesforce Streaming API 集成,实现“实时事件监听-处理-重连”的闭环,适用于前端页面、后端服务等多种场景。
五、使用 Streaming API 的关键注意事项
- 认证与权限:
- 客户端需通过 OAuth 2.0 获取
Access Token
,且令牌对应的用户需具备“API 访问权限”和“订阅目标事件的权限”(如View All Data
或特定对象的Read
权限);
- 客户端需通过 OAuth 2.0 获取
- 事件容量限制:
- Salesforce 对 Streaming API 的并发连接数、事件推送频率有配额限制(如企业版默认最大并发连接数为 200),需根据业务规模规划;
- 数据可靠性:
- Streaming API 保证“至少一次推送”(At-Least-Once),但可能存在重复推送,客户端需实现“幂等处理”(如通过
recordId
和eventId
去重);
- Streaming API 保证“至少一次推送”(At-Least-Once),但可能存在重复推送,客户端需实现“幂等处理”(如通过
- 选择合适的事件类型:
- 若需完整的字段前后值,优先选择 CDC Events;若需按 SOQL 筛选事件,选择 Push Topics;若需自定义事件结构,选择 自定义平台事件。
总结
Salesforce Streaming API 是基于 CometD 协议的“实时事件总线”,通过建立持久连接和“订阅-发布”模式,让外部系统能即时接收 Salesforce 中的关键事件,解决了传统 API 轮询效率低、实时性差的问题。它是构建“实时化 Salesforce 集成方案”的核心工具,适用于数据同步、实时监控、跨系统自动化等企业级场景。