
Project 4: Intelligent Development and Applications with Dify (A Retail Enterprise Builds a Member Intelligent Operations Platform on Dify)

Project Prototype

🛍️ Retail Intelligent Operations Platform (built on Dify)
==================================================

[Workflow Canvas - Member Segmentation & Marketing]
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  📊 Data Input  │───▶│ 🤖 RFM Analysis │───▶│ 🎯 Segmentation │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                      │                      │
         ▼                      ▼                      ▼
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│ 💰 Coupon Gen.  │◀──▶│ 📝 Segment Name │    │ 📱 SMS Outreach │
└─────────────────┘    └─────────────────┘    └─────────────────┘

[Real-Time Monitoring]
Workflow runs today: 24      Success rate: 95.8%      Avg. duration: 45.2 s

[Segmentation Results - Latest Run]
🎯 High-value members (15.2%): 8,542   | 💰 Avg. coupon: ¥85
📈 Growth members (28.7%): 16,123      | 💰 Avg. coupon: ¥50
⚠️ At-risk members (12.1%): 6,789      | 💰 Avg. coupon: ¥100
💤 Dormant members (44.0%): 24,678     | 💰 Avg. coupon: ¥30

[Marketing Performance]
📊 Coupon redemption rate: 34.5%    📈 Repurchase-rate lift: 18.2%
💬 Customer satisfaction: 4.5/5.0   📱 SMS open rate: 67.8%

[Control Panel]
1. 🚀 Run workflow now      2. ⚙️ Configure workflow      3. 📊 View detailed analysis
4. 📋 Export member list    5. 🔄 Refresh data source     6. ❓ Help
Enter a choice [1-6]:

Configuration Parameters

# config/dify_config.yaml
dify_platform:
  base_url: "https://api.dify.ai/v1"
  api_key: "dfy-xxxxxxxxxxxxxxxx"
  workspace_id: "retail_ai_workspace"

database:
  crm_connection:
    host: "crm-db.retail.com"
    port: 5432
    database: "customer_relationship"
    username: "ai_platform"
    password: "encrypted_password"
  analytics_connection:
    host: "analytics-db.retail.com"
    port: 5432
    database: "business_analytics"
    username: "ai_platform"
    password: "encrypted_password"

workflow_configs:
  member_segmentation:
    trigger: "schedule"
    schedule: "0 2 * * *"  # run daily at 02:00
    batch_size: 10000
    chunk_size: 1000
  churn_prediction:
    trigger: "realtime"
    check_interval: "1h"
    batch_size: 5000

notification:
  sms:
    provider: "aliyun"
    template_id: "SMS_001"
    signature: "XX零售"
  email:
    provider: "sendgrid"
    from_email: "member@retail.com"
    template_dir: "/templates/email/"

monitoring:
  log_level: "INFO"
  metrics_port: 9090
  alert_rules:
    - "workflow_failure_rate > 5%"
    - "execution_time > 300s"

Core Code Implementation

from dify_client import DifyClient
import pandas as pd
from datetime import datetime, timedelta
import json


class RetailAIPlatform:
    """Retail intelligent operations platform built on Dify."""

    def __init__(self, dify_api_key: str, base_url: str):
        self.client = DifyClient(api_key=dify_api_key, base_url=base_url)
        self.workflows = {}

    def create_member_segmentation_workflow(self):
        """Create the member segmentation & marketing workflow."""
        workflow_config = {
            "name": "会员智能分群与营销",
            "description": "基于会员行为数据进行自动分群和个性化营销",
            "nodes": [
                {
                    "id": "data_source",
                    "type": "data_input",
                    "data": {
                        "type": "database",
                        "connection": "crm_database",
                        "query": """
                            SELECT member_id, last_purchase_date, total_orders,
                                   total_spent, favorite_categories
                            FROM member_behavior
                            WHERE last_purchase_date >= DATE_SUB(NOW(), INTERVAL 90 DAY)
                        """
                    }
                },
                {
                    "id": "rfm_analysis",
                    "type": "python_code",
                    "data": {
                        "code": """
# RFM scoring
def calculate_rfm_score(row):
    # R: recency (days since last purchase)
    recency = (datetime.now() - row['last_purchase_date']).days
    if recency <= 30:
        r_score = 5
    elif recency <= 60:
        r_score = 4
    elif recency <= 90:
        r_score = 3
    else:
        r_score = 2

    # F: purchase frequency
    frequency = row['total_orders']
    if frequency >= 10:
        f_score = 5
    elif frequency >= 5:
        f_score = 4
    elif frequency >= 3:
        f_score = 3
    else:
        f_score = 2

    # M: monetary value
    monetary = row['total_spent']
    if monetary >= 5000:
        m_score = 5
    elif monetary >= 2000:
        m_score = 4
    elif monetary >= 1000:
        m_score = 3
    else:
        m_score = 2

    return r_score, f_score, m_score

# Apply RFM scoring
rfm_scores = df.apply(calculate_rfm_score, axis=1)
df['r_score'] = [score[0] for score in rfm_scores]
df['f_score'] = [score[1] for score in rfm_scores]
df['m_score'] = [score[2] for score in rfm_scores]
df['rfm_score'] = df['r_score'] + df['f_score'] + df['m_score']
return df
"""
                    }
                },
                {
                    "id": "member_segmentation",
                    "type": "ai_analysis",
                    "data": {
                        "model": "clustering",
                        "parameters": {
                            "algorithm": "kmeans",
                            "n_clusters": 5,
                            "features": ["r_score", "f_score", "m_score"]
                        }
                    }
                },
                {
                    "id": "segment_naming",
                    "type": "llm_processing",
                    "data": {
                        "prompt_template": """根据以下会员分群特征,为这个群体起一个合适的名称并制定营销策略:
群体特征:
- 平均最近购买天数: {{avg_recency}} 天
- 平均购买频次: {{avg_frequency}} 次
- 平均消费金额: {{avg_monetary}} 元
- 主要偏好品类: {{top_categories}}

请输出JSON格式:
{"segment_name": "分群名称", "description": "分群描述", "marketing_strategy": "营销策略", "recommended_actions": ["行动1", "行动2"]}""",
                        "variables": {
                            "avg_recency": "{{node.rfm_analysis.output.avg_recency}}",
                            "avg_frequency": "{{node.rfm_analysis.output.avg_frequency}}",
                            "avg_monetary": "{{node.rfm_analysis.output.avg_monetary}}",
                            "top_categories": "{{node.rfm_analysis.output.top_categories}}"
                        }
                    }
                },
                {
                    "id": "personalized_coupon",
                    "type": "business_rule",
                    "data": {
                        "rules": [
                            {
                                "condition": "{{node.member_segmentation.output.segment_id}} == 'high_value'",
                                "action": "generate_coupon",
                                "parameters": {"type": "percentage", "value": 15, "min_amount": 200, "valid_days": 30}
                            },
                            {
                                "condition": "{{node.member_segmentation.output.segment_id}} == 'at_risk'",
                                "action": "generate_coupon",
                                "parameters": {"type": "fixed", "value": 50, "min_amount": 300, "valid_days": 15}
                            }
                        ]
                    }
                },
                {
                    "id": "sms_notification",
                    "type": "notification",
                    "data": {
                        "channel": "sms",
                        "template": """亲爱的会员,感谢您一直以来的支持!
我们为您准备了专属优惠:{{coupon_code}}
有效期至:{{expiry_date}}
{{company_name}}""",
                        "variables": {
                            "coupon_code": "{{node.personalized_coupon.output.coupon_code}}",
                            "expiry_date": "{{node.personalized_coupon.output.expiry_date}}",
                            "company_name": "XX零售"
                        }
                    }
                }
            ],
            "edges": [
                {"source": "data_source", "target": "rfm_analysis"},
                {"source": "rfm_analysis", "target": "member_segmentation"},
                {"source": "member_segmentation", "target": "segment_naming"},
                {"source": "member_segmentation", "target": "personalized_coupon"},
                {"source": "personalized_coupon", "target": "sms_notification"}
            ]
        }

        # Create the workflow on the Dify platform
        response = self.client.workflows.create(workflow_config)
        self.workflows['member_segmentation'] = response['id']
        return response

    def execute_workflow(self, workflow_id: str, input_data: dict = None):
        """Execute a workflow and wait for the result."""
        execution_data = input_data or {}
        response = self.client.workflows.execute(
            workflow_id=workflow_id,
            inputs=execution_data,
            response_mode="blocking"  # blocking mode: wait until execution completes
        )
        return {
            "execution_id": response['id'],
            "status": response['status'],
            "outputs": response['data']['outputs'],
            "execution_time": response['data']['total_time'],
            "node_results": response['data']['node_execution_order']
        }

    def create_churn_prediction_workflow(self):
        """Create the member churn prediction & retention workflow."""
        workflow_config = {
            "name": "会员流失预警与干预",
            "description": "预测可能流失的会员并自动触发保留策略",
            "nodes": [
                {
                    "id": "churn_features",
                    "type": "data_input",
                    "data": {
                        "type": "database",
                        "connection": "crm_database",
                        "query": """
                            SELECT member_id,
                                   DATEDIFF(NOW(), last_purchase_date) AS days_since_last_purchase,
                                   total_orders_90d, total_spent_90d, customer_tenure_days,
                                   complaint_count_90d, has_used_coupon_30d
                            FROM member_churn_features
                            WHERE is_active = 1
                        """
                    }
                },
                {
                    "id": "churn_prediction",
                    "type": "ml_model",
                    "data": {
                        "model_id": "churn_prediction_v2",
                        "version": "1.0",
                        "features": [
                            "days_since_last_purchase", "total_orders_90d", "total_spent_90d",
                            "customer_tenure_days", "complaint_count_90d", "has_used_coupon_30d"
                        ]
                    }
                },
                {
                    "id": "risk_assessment",
                    "type": "business_rule",
                    "data": {
                        "rules": [
                            {
                                "condition": "{{node.churn_prediction.output.churn_probability}} >= 0.8",
                                "action": "mark_high_risk",
                                "parameters": {"priority": "high"}
                            },
                            {
                                "condition": "{{node.churn_prediction.output.churn_probability}} >= 0.6",
                                "action": "mark_medium_risk",
                                "parameters": {"priority": "medium"}
                            }
                        ]
                    }
                },
                {
                    "id": "retention_strategy",
                    "type": "llm_processing",
                    "data": {
                        "prompt_template": """针对以下有流失风险的会员,制定个性化保留策略:
会员信息:
- 会员ID: {{member_id}}
- 流失概率: {{churn_probability}}
- 最近购买: {{days_since_last_purchase}} 天前
- 历史订单数: {{total_orders}}
- 历史消费: {{total_spent}}
- 风险等级: {{risk_level}}

请输出JSON格式的保留策略:
{"personalized_message": "个性化消息内容", "recommended_offer": "推荐优惠类型", "contact_channel": "联系渠道", "escalation_level": "升级级别"}""",
                        "variables": {
                            "member_id": "{{node.churn_features.output.member_id}}",
                            "churn_probability": "{{node.churn_prediction.output.churn_probability}}",
                            "days_since_last_purchase": "{{node.churn_features.output.days_since_last_purchase}}",
                            "total_orders": "{{node.churn_features.output.total_orders_90d}}",
                            "total_spent": "{{node.churn_features.output.total_spent_90d}}",
                            "risk_level": "{{node.risk_assessment.output.priority}}"
                        }
                    }
                }
            ],
            "edges": [
                {"source": "churn_features", "target": "churn_prediction"},
                {"source": "churn_prediction", "target": "risk_assessment"},
                {"source": "risk_assessment", "target": "retention_strategy"}
            ]
        }

        response = self.client.workflows.create(workflow_config)
        self.workflows['churn_prediction'] = response['id']
        return response

    def get_workflow_analytics(self, workflow_id: str, days: int = 30):
        """Fetch execution analytics for a workflow."""
        analytics = self.client.workflows.analytics(
            workflow_id=workflow_id,
            start_date=(datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d'),
            end_date=datetime.now().strftime('%Y-%m-%d')
        )
        return {
            "total_executions": analytics['total_executions'],
            "success_rate": analytics['success_rate'],
            "average_execution_time": analytics['avg_execution_time'],
            "most_used_nodes": analytics['node_usage_stats'],
            "error_breakdown": analytics['error_analysis']
        }

===========================================

Java Development

Architecture and Dependencies

Dify Intelligent Development Platform
├── Front-end designer (Vue.js + Element Plus + GoJS)
├── RESTful API (Spring Boot)
├── Workflow engine layer (Camunda BPM)
├── Business logic layer
│   ├── Workflow design service
│   ├── Component management service
│   ├── Application deployment service
│   └── Execution engine service
├── Data access layer
├── AI component library
└── Deployment platform (Kubernetes)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.company</groupId><artifactId>dify-platform</artifactId><version>1.0.0</version><packaging>jar</packaging><name>Dify Intelligent Development Platform</name><description>Enterprise-level low-code AI application development platform</description><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.0</version><relativePath/></parent><properties><java.version>11</java.version><camunda.version>7.17.0</camunda.version><kubernetes-client.version>15.0.1</kubernetes-client.version></properties><dependencies><!-- Spring Boot Starters --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!-- Database --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><!-- Camunda BPM --><dependency><groupId>org.camunda.bpm.springboot</groupId><artifactId>camunda-bpm-spring-boot-starter</artifactId><version>${camunda.version}</version></dependency><dependency><groupId>org.camunda.bpm.springboot</groupId><artifactId>camunda-bpm-spring-boot-starter-rest</artifactId><version>${camunda.version}</version></dependency><dependency><groupId>org.camunda.bpm.springboot</groupId><artifactId>camunda-bpm-spring-boot-starter-webapp</artifactId><version>${camunda.version}</version></dependency><!-- Kubernetes Client --><dependency><groupId>io.kubernetes</groupId><artifactId>client-java</artifactId><version>${kubernetes-client.version}</version></dependency><!-- OpenAI Client --><dependency><groupId>com.theokanning.openai-gpt3-java</groupId><artifactId>service</artifactId><version>0.12.0</version></dependency><!-- JSON Processing --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.datatype</groupId><artifactId>jackson-datatype-jsr310</artifactId></dependency><!-- Utilities --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>31.1-jre</version></dependency><dependency><groupId>org.yaml</groupId><artifactId>snakeyaml</artifactId></dependency><!-- Template Engine --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-freemarker</artifactId></dependency><!-- Test 
--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.camunda.bpm.extension</groupId><artifactId>camunda-bpm-assert</artifactId><version>1.2</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>
server:
  port: 8083
  servlet:
    context-path: /dify

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/dify_platform?useSSL=false&serverTimezone=UTC
    username: root
    password: password
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    hibernate:
      ddl-auto: update
    show-sql: true
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQL8Dialect
        format_sql: true
  redis:
    host: localhost
    port: 6379
    password:
    database: 0
    timeout: 2000ms
    lettuce:
      pool:
        max-active: 8
        max-wait: -1ms
        max-idle: 8
        min-idle: 0
  websocket:
    allowed-origins: "*"
  freemarker:
    template-loader-path: classpath:/templates/
    suffix: .ftl
    cache: false

# Camunda Configuration
camunda:
  bpm:
    admin-user:
      id: admin
      password: admin
      firstName: Admin
    filter:
      create: All tasks
    auto-deployment-enabled: true
    job-execution:
      enabled: true
    database:
      schema-update: true
    history-level: full

# Dify Platform Configuration
dify:
  platform:
    # Workflow Configuration
    workflow:
      max-nodes: 50
      max-execution-time: 3600000
      enable-versioning: true
    # Component Configuration
    components:
      auto-registration: true
      scan-packages: com.company.dify.component
    # Deployment Configuration
    deployment:
      target-platform: kubernetes
      namespace: dify-apps
      default-replicas: 1
      image-prefix: dify-app-
    # Execution Configuration
    execution:
      max-concurrent: 100
      timeout: 300000
      enable-monitoring: true

# Kubernetes Configuration (if deployed on K8s)
kubernetes:
  master-url: ${KUBERNETES_MASTER:https://kubernetes.default.svc}
  namespace: ${KUBERNETES_NAMESPACE:default}
  ca-cert-file: ${KUBERNETES_CA_CERT_PATH:}
  client-cert-file: ${KUBERNETES_CLIENT_CERT_PATH:}
  client-key-file: ${KUBERNETES_CLIENT_KEY_PATH:}
  oauth-token: ${KUBERNETES_OAUTH_TOKEN:}

# OpenAI Configuration
openai:
  api-key: ${OPENAI_API_KEY:}
  organization: ${OPENAI_ORG:}

# Application Configuration
app:
  storage:
    workflow-definitions: ./workflows/
    deployed-apps: ./deployed/
  cache:
    workflow-cache-ttl: 3600
    component-cache-ttl: 86400

# Logging
logging:
  level:
    com.company.dify: DEBUG
    org.springframework.web: INFO
    org.hibernate: WARN
    org.camunda: WARN
  file:
    name: logs/dify-platform.log
  pattern:
    file: "%d{yyyy-MM-dd HH:mm:ss} - %logger{36} - %msg%n"

# Management Endpoints
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus
  endpoint:
    health:
      show-details: always
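
The article does not show an application entry point for the Spring Boot project defined above. The following is a minimal, hypothetical sketch (package and class name are assumptions); the Camunda starter declared in the POM auto-configures the process engine on startup, so no extra wiring is needed here.

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Hypothetical entry point for the Dify platform backend.
 * The Camunda Spring Boot starter boots the workflow engine automatically.
 */
@SpringBootApplication
public class DifyPlatformApplication {

    public static void main(String[] args) {
        SpringApplication.run(DifyPlatformApplication.class, args);
    }
}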

Entity Classes

Workflow Definition Entity

/*** 工作流定义实体 - 存储工作流模板和定义*/
@Entity
@Table(name = "workflow_definition", indexes = {@Index(name = "idx_workflow_name", columnList = "name"),@Index(name = "idx_workflow_category", columnList = "category"),@Index(name = "idx_workflow_status", columnList = "status")
})
@Data
@NoArgsConstructor
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class WorkflowDefinition {public enum WorkflowStatus {DRAFT, PUBLISHED, DEPRECATED, DELETED}public enum WorkflowType {SEQUENTIAL, PARALLEL, DECISION, EVENT_DRIVEN}@Id@GeneratedValue(strategy = GenerationType.IDENTITY)@EqualsAndHashCode.Includeprivate Long id;@NotBlank(message = "工作流名称不能为空")@Column(nullable = false, length = 200)private String name;@Column(length = 500)private String description;@Column(length = 100)private String category = "general";@Enumerated(EnumType.STRING)@Column(nullable = false, length = 20)private WorkflowStatus status = WorkflowStatus.DRAFT;@Enumerated(EnumType.STRING)@Column(nullable = false, length = 20)private WorkflowType type = WorkflowType.SEQUENTIAL;@Column(nullable = false)private String version = "1.0.0";@Column(nullable = false)private String createdBy;@Column(length = 100)private String updatedBy;@Type(type = "text")@Column(columnDefinition = "TEXT", nullable = false)private String workflowJson; // JSON格式的工作流定义@Type(type = "text")@Column(columnDefinition = "TEXT")private String bpmnXml; // BPMN XML定义@Column(nullable = false)private Integer nodeCount = 0;@Column(nullable = false)private Integer executionCount = 0;@Column(nullable = false)private Integer successCount = 0;@Column(precision = 5, scale = 2)private Double successRate = 0.0;@Column(length = 50)private String thumbnail; // 工作流缩略图路径@ElementCollection@CollectionTable(name = "workflow_tags", joinColumns = @JoinColumn(name = "workflow_id"))@Column(name = "tag")private List<String> tags = new ArrayList<>();@Column(length = 1000)private String metadata; // JSON格式的元数据@OneToMany(mappedBy = "workflowDefinition", cascade = CascadeType.ALL, fetch = FetchType.LAZY)private List<WorkflowVersion> versions = new ArrayList<>();@OneToMany(mappedBy = "workflowDefinition", cascade = CascadeType.ALL, fetch = FetchType.LAZY)private List<WorkflowExecution> executions = new ArrayList<>();@CreationTimestamp@Column(updatable = false)private LocalDateTime createdAt;@UpdateTimestampprivate LocalDateTime updatedAt;@Versionprivate Long versionNumber;public WorkflowDefinition(String name, String workflowJson, String createdBy) {this.name = name;this.workflowJson = workflowJson;this.createdBy = createdBy;}public void incrementExecutionCount() {this.executionCount = (this.executionCount == null) ? 1 : this.executionCount + 1;updateSuccessRate();}public void incrementSuccessCount() {this.successCount = (this.successCount == null) ? 1 : this.successCount + 1;updateSuccessRate();}private void updateSuccessRate() {if (executionCount != null && executionCount > 0) {this.successRate = (double) successCount / executionCount * 100;}}public boolean isPublished() {return WorkflowStatus.PUBLISHED.equals(this.status);}public boolean isDeployable() {return isPublished() && bpmnXml != null && !bpmnXml.trim().isEmpty();}
}

Workflow Version Entity

/*** 工作流版本实体 - 存储工作流的不同版本*/
@Entity
@Table(name = "workflow_version", indexes = {@Index(name = "idx_version_workflow", columnList = "workflowDefinition_id"),@Index(name = "idx_version_number", columnList = "versionNumber")
})
@Data
@NoArgsConstructor
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class WorkflowVersion {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)@EqualsAndHashCode.Includeprivate Long id;@ManyToOne(fetch = FetchType.LAZY)@JoinColumn(name = "workflowDefinition_id", nullable = false)private WorkflowDefinition workflowDefinition;@Column(nullable = false, length = 50)private String versionNumber;@Column(length = 500)private String changeDescription;@Type(type = "text")@Column(columnDefinition = "TEXT", nullable = false)private String workflowJson;@Type(type = "text")@Column(columnDefinition = "TEXT")private String bpmnXml;@Column(nullable = false)private Integer nodeCount;@Column(nullable = false)private Boolean isCurrent = false;@Column(length = 100)private String createdBy;@CreationTimestamp@Column(updatable = false)private LocalDateTime createdAt;@Versionprivate Long version;public WorkflowVersion(WorkflowDefinition workflowDefinition, String versionNumber, String workflowJson) {this.workflowDefinition = workflowDefinition;this.versionNumber = versionNumber;this.workflowJson = workflowJson;this.nodeCount = workflowDefinition.getNodeCount();this.createdBy = workflowDefinition.getCreatedBy();}
}

Workflow Execution Entity

/*** 工作流执行实体 - 存储工作流执行记录*/
@Entity
@Table(name = "workflow_execution", indexes = {@Index(name = "idx_execution_workflow", columnList = "workflowDefinition_id"),@Index(name = "idx_execution_status", columnList = "status"),@Index(name = "idx_execution_created", columnList = "createdAt")
})
@Data
@NoArgsConstructor
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class WorkflowExecution {public enum ExecutionStatus {PENDING, RUNNING, COMPLETED, FAILED, CANCELLED, TIMEOUT}@Id@GeneratedValue(strategy = GenerationType.IDENTITY)@EqualsAndHashCode.Includeprivate Long id;@ManyToOne(fetch = FetchType.LAZY)@JoinColumn(name = "workflowDefinition_id", nullable = false)private WorkflowDefinition workflowDefinition;@Column(nullable = false, length = 100)private String executionId; // Camunda流程实例ID@Column(nullable = false, length = 100)private String triggeredBy;@Enumerated(EnumType.STRING)@Column(nullable = false, length = 20)private ExecutionStatus status = ExecutionStatus.PENDING;@Type(type = "text")@Column(columnDefinition = "TEXT")private String inputData; // JSON格式的输入数据@Type(type = "text")@Column(columnDefinition = "TEXT")private String outputData; // JSON格式的输出数据@Type(type = "text")@Column(columnDefinition = "TEXT")private String executionLog; // 执行日志@Columnprivate Long executionTimeMs;@Columnprivate Integer taskCount;@Columnprivate Integer completedTaskCount;@Columnprivate Integer failedTaskCount;@Column(length = 1000)private String errorMessage;@Column(length = 100)private String camundaProcessDefinitionId;@ElementCollection@CollectionTable(name = "execution_variables", joinColumns = @JoinColumn(name = "execution_id"))@MapKeyColumn(name = "variable_name")@Column(name = "variable_value", columnDefinition = "TEXT")private Map<String, String> executionVariables = new HashMap<>();@CreationTimestamp@Column(updatable = false)private LocalDateTime createdAt;private LocalDateTime startedAt;private LocalDateTime completedAt;@Versionprivate Long version;public WorkflowExecution(WorkflowDefinition workflowDefinition, String executionId, String triggeredBy) {this.workflowDefinition = workflowDefinition;this.executionId = executionId;this.triggeredBy = triggeredBy;}public void markAsRunning() {this.status = ExecutionStatus.RUNNING;this.startedAt = LocalDateTime.now();}public void markAsCompleted(String outputData, Long executionTimeMs) {this.status = ExecutionStatus.COMPLETED;this.outputData = outputData;this.executionTimeMs = executionTimeMs;this.completedAt = LocalDateTime.now();this.completedTaskCount = taskCount;}public void markAsFailed(String errorMessage, Long executionTimeMs) {this.status = ExecutionStatus.FAILED;this.errorMessage = errorMessage;this.executionTimeMs = executionTimeMs;this.completedAt = LocalDateTime.now();this.failedTaskCount = (failedTaskCount == null) ? 1 : failedTaskCount + 1;}public void incrementCompletedTask() {this.completedTaskCount = (completedTaskCount == null) ? 1 : completedTaskCount + 1;}public void incrementFailedTask() {this.failedTaskCount = (failedTaskCount == null) ? 1 : failedTaskCount + 1;}public double getProgress() {if (taskCount == null || taskCount == 0) return 0.0;int completed = (completedTaskCount != null ? completedTaskCount : 0) + (failedTaskCount != null ? failedTaskCount : 0);return (double) completed / taskCount * 100;}
}

Application Deployment Entity

/*** 应用部署实体 - 存储部署的AI应用信息*/
@Entity
@Table(name = "app_deployment", indexes = {@Index(name = "idx_app_name", columnList = "name"),@Index(name = "idx_app_status", columnList = "status"),@Index(name = "idx_app_created", columnList = "createdAt")
})
@Data
@NoArgsConstructor
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class AppDeployment {public enum DeploymentStatus {DRAFT, DEPLOYING, RUNNING, STOPPED, FAILED, DELETED}public enum DeploymentPlatform {KUBERNETES, DOCKER, CLOUD_FUNCTION, EDGE_DEVICE}@Id@GeneratedValue(strategy = GenerationType.IDENTITY)@EqualsAndHashCode.Includeprivate Long id;@NotBlank(message = "应用名称不能为空")@Column(nullable = false, length = 200)private String name;@Column(length = 500)private String description;@ManyToOne(fetch = FetchType.LAZY)@JoinColumn(name = "workflowDefinition_id", nullable = false)private WorkflowDefinition workflowDefinition;@Column(nullable = false, length = 50)private String version = "1.0.0";@Enumerated(EnumType.STRING)@Column(nullable = false, length = 20)private DeploymentStatus status = DeploymentStatus.DRAFT;@Enumerated(EnumType.STRING)@Column(nullable = false, length = 20)private DeploymentPlatform platform = DeploymentPlatform.KUBERNETES;@Column(nullable = false)private String deployedBy;@Column(length = 500)private String endpoint; // 应用访问端点@Column(length = 100)private String namespace = "dify-apps";@Columnprivate Integer replicas = 1;@Type(type = "text")@Column(columnDefinition = "TEXT")private String deploymentConfig; // JSON格式的部署配置@Type(type = "text")@Column(columnDefinition = "TEXT")private String apiDefinition; // OpenAPI/Swagger定义@Column(nullable = false)private Integer requestCount = 0;@Column(nullable = false)private Integer successCount = 0;@Column(precision = 5, scale = 2)private Double successRate = 0.0;@Columnprivate Double averageResponseTime = 0.0;@Column(length = 1000)private String deploymentLog;@ElementCollection@CollectionTable(name = "app_environment_vars", joinColumns = @JoinColumn(name = "app_id"))@MapKeyColumn(name = "env_key")@Column(name = "env_value")private Map<String, String> environmentVariables = new HashMap<>();@OneToMany(mappedBy = "appDeployment", cascade = CascadeType.ALL, fetch = FetchType.LAZY)private List<AppAccessLog> accessLogs = new ArrayList<>();@CreationTimestamp@Column(updatable = false)private LocalDateTime createdAt;@UpdateTimestampprivate LocalDateTime updatedAt;private LocalDateTime deployedAt;@Versionprivate Long versionNumber;public AppDeployment(String name, WorkflowDefinition workflowDefinition, String deployedBy) {this.name = name;this.workflowDefinition = workflowDefinition;this.deployedBy = deployedBy;}public void incrementRequestCount() {this.requestCount = (requestCount == null) ? 1 : requestCount + 1;updateSuccessRate();}public void incrementSuccessCount() {this.successCount = (successCount == null) ? 1 : successCount + 1;updateSuccessRate();}private void updateSuccessRate() {if (requestCount != null && requestCount > 0) {this.successRate = (double) successCount / requestCount * 100;}}public boolean isRunning() {return DeploymentStatus.RUNNING.equals(this.status);}public boolean isAccessible() {return isRunning() && endpoint != null && !endpoint.trim().isEmpty();}
}
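
The service classes later in this article autowire a WorkflowDefinitionService (for example `workflowDefinitionService.findById(...)` and `.save(...)`), but its implementation is not included. The sketch below is a hypothetical Spring Data JPA repository/service pair; the repository interface and any query methods beyond `save`/`findById` are assumptions chosen to match how the entity is used elsewhere.

import java.util.List;
import java.util.Optional;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/** Hypothetical Spring Data repository backing WorkflowDefinition. */
interface WorkflowDefinitionRepository extends JpaRepository<WorkflowDefinition, Long> {
    List<WorkflowDefinition> findByStatus(WorkflowDefinition.WorkflowStatus status);
}

/** Hypothetical service wrapper; method names mirror the calls made by the services below. */
@Service
public class WorkflowDefinitionService {

    private final WorkflowDefinitionRepository repository;

    public WorkflowDefinitionService(WorkflowDefinitionRepository repository) {
        this.repository = repository;
    }

    @Transactional
    public WorkflowDefinition save(WorkflowDefinition definition) {
        return repository.save(definition);
    }

    @Transactional(readOnly = true)
    public Optional<WorkflowDefinition> findById(Long id) {
        return repository.findById(id);
    }

    @Transactional(readOnly = true)
    public List<WorkflowDefinition> findPublished() {
        return repository.findByStatus(WorkflowDefinition.WorkflowStatus.PUBLISHED);
    }
}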

Web Controller Layer

Workflow Design API Controller

/*** 工作流设计API控制器*/
@RestController
@RequestMapping("/api/v1/design")
@Validated
@Slf4j
public class WorkflowDesignController {@Autowiredprivate WorkflowDesignService workflowDesignService;@Autowiredprivate ComponentRegistryService componentRegistryService;/*** 创建工作流*/@PostMapping("/workflows")public ResponseEntity<WorkflowResponse> createWorkflow(@Valid @RequestBody WorkflowCreationRequest request) {log.info("Creating workflow: {}", request.getName());try {WorkflowDefinition definition = workflowDesignService.createWorkflow(request);WorkflowResponse response = new WorkflowResponse();response.setSuccess(true);response.setWorkflowId(definition.getId());response.setName(definition.getName());response.setMessage("工作流创建成功");return ResponseEntity.ok(response);} catch (Exception e) {log.error("Failed to create workflow", e);return ResponseEntity.badRequest().body(WorkflowResponse.error("创建工作流失败: " + e.getMessage()));}}/*** 更新工作流*/@PutMapping("/workflows/{workflowId}")public ResponseEntity<WorkflowResponse> updateWorkflow(@PathVariable Long workflowId,@Valid @RequestBody WorkflowUpdateRequest request) {log.info("Updating workflow: {}", workflowId);try {WorkflowDefinition definition = workflowDesignService.updateWorkflow(workflowId, request);WorkflowResponse response = new WorkflowResponse();response.setSuccess(true);response.setWorkflowId(definition.getId());response.setName(definition.getName());response.setMessage("工作流更新成功");return ResponseEntity.ok(response);} catch (Exception e) {log.error("Failed to update workflow: {}", workflowId, e);return ResponseEntity.badRequest().body(WorkflowResponse.error("更新工作流失败: " + e.getMessage()));}}/*** 验证工作流JSON*/@PostMapping("/workflows/validate")public ResponseEntity<ValidationResponse> validateWorkflow(@Valid @RequestBody WorkflowValidationRequest request) {log.info("Validating workflow JSON");try {workflowDesignService.validateWorkflowJson(request.getWorkflowJson());ValidationResponse response = new ValidationResponse();response.setSuccess(true);response.setValid(true);response.setMessage("工作流JSON验证通过");return ResponseEntity.ok(response);} catch (Exception e) {log.error("Workflow validation failed", e);return ResponseEntity.ok(ValidationResponse.error("工作流JSON验证失败: " + e.getMessage()));}}/*** 发布工作流*/@PostMapping("/workflows/{workflowId}/publish")public ResponseEntity<WorkflowResponse> publishWorkflow(@PathVariable Long workflowId) {log.info("Publishing workflow: {}", workflowId);try {// 这里应该实现发布逻辑WorkflowResponse response = new WorkflowResponse();response.setSuccess(true);response.setWorkflowId(workflowId);response.setMessage("工作流发布成功");return ResponseEntity.ok(response);} catch (Exception e) {log.error("Failed to publish workflow: {}", workflowId, e);return ResponseEntity.badRequest().body(WorkflowResponse.error("发布工作流失败: " + e.getMessage()));}}/*** 获取所有组件*/@GetMapping("/components")public ResponseEntity<ComponentsResponse> getAllComponents() {log.info("Getting all components");try {List<ComponentRegistryService.ComponentMetadata> components = componentRegistryService.getAllComponentMetadata();ComponentsResponse response = new ComponentsResponse();response.setSuccess(true);response.setComponents(components);response.setTotalCount(components.size());return ResponseEntity.ok(response);} catch (Exception e) {log.error("Failed to get components", e);return ResponseEntity.badRequest().body(ComponentsResponse.error("获取组件列表失败: " + e.getMessage()));}}/*** 根据类别获取组件*/@GetMapping("/components/category/{category}")public ResponseEntity<ComponentsResponse> getComponentsByCategory(@PathVariable String category) {log.info("Getting components by category: 
{}", category);try {List<ComponentRegistryService.ComponentMetadata> components = componentRegistryService.getComponentsByCategory(category);ComponentsResponse response = new ComponentsResponse();response.setSuccess(true);response.setComponents(components);response.setTotalCount(components.size());response.setCategory(category);return ResponseEntity.ok(response);} catch (Exception e) {log.error("Failed to get components by category: {}", category, e);return ResponseEntity.badRequest().body(ComponentsResponse.error("获取组件列表失败: " + e.getMessage()));}}/*** 搜索组件*/@GetMapping("/components/search")public ResponseEntity<ComponentsResponse> searchComponents(@RequestParam String keyword) {log.info("Searching components: {}", keyword);try {List<ComponentRegistryService.ComponentMetadata> components = componentRegistryService.searchComponents(keyword);ComponentsResponse response = new ComponentsResponse();response.setSuccess(true);response.setComponents(components);response.setTotalCount(components.size());response.setKeyword(keyword);return ResponseEntity.ok(response);} catch (Exception e) {log.error("Failed to search components: {}", keyword, e);return ResponseEntity.badRequest().body(ComponentsResponse.error("搜索组件失败: " + e.getMessage()));}}/*** 获取组件详情*/@GetMapping("/components/{componentType}")public ResponseEntity<ComponentDetailResponse> getComponentDetail(@PathVariable String componentType) {log.info("Getting component detail: {}", componentType);try {ComponentRegistryService.ComponentMetadata metadata = componentRegistryService.getComponentMetadata(componentType);ComponentDetailResponse response = new ComponentDetailResponse();response.setSuccess(true);response.setComponent(metadata);return ResponseEntity.ok(response);} catch (Exception e) {log.error("Failed to get component detail: {}", componentType, e);return ResponseEntity.badRequest().body(ComponentDetailResponse.error("获取组件详情失败: " + e.getMessage()));}}
}
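
The controller returns `WorkflowResponse`, `ValidationResponse`, and `ComponentsResponse` DTOs whose definitions are not shown in the article. As one example, `WorkflowResponse` can be reconstructed from the setters and the static `error(...)` factory used above; the sketch below follows those usages and is otherwise an assumption.

import lombok.Data;

/** Response DTO used by WorkflowDesignController; fields reconstructed from its usage above. */
@Data
public class WorkflowResponse {

    private boolean success;
    private Long workflowId;
    private String name;
    private String message;

    /** Convenience factory matching the WorkflowResponse.error(...) calls in the controller. */
    public static WorkflowResponse error(String message) {
        WorkflowResponse response = new WorkflowResponse();
        response.setSuccess(false);
        response.setMessage(message);
        return response;
    }
}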

Core Business Service Layer

Workflow Design Service

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.camunda.bpm.model.bpmn.Bpmn;
import org.camunda.bpm.model.bpmn.BpmnModelInstance;
import org.camunda.bpm.model.bpmn.instance.*;

/*** 工作流设计服务 - 负责工作流的设计、验证和转换*/
@Service
@Slf4j
public class WorkflowDesignService {@Autowiredprivate WorkFlowDefinitionService workflowDefinationService;@Autowiredprivate ComponentRegistryService componentRegistryService;@Autowiredprivate ObjectMapper objectMapper;private static final Pattern NODE_ID_PATTERN = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_]*$");private static final int MAX_NODES = 50;private static final int MAX_NAME_LENGTH = 100;/**创建工作流定义*/public WorkflowDefinition createWorkflow(WorkflowCreationRequest request){log.info("Creating workflow: {}", request.getName());try {// 验证工作流JSONvalidateWorkflowJson(request.getWorkflowJson());// 创建工作流定义WorkflowDefinition definition = new WorkflowDefinition(request.getName(), request.getWorkflowJson(),request.getCreatedBy());if (request.getDescription() != null) {definition.setDescription(request.getDescription());}if (request.getCategory() != null) {definition.setCategory(request.getCategory());}if (request.getTags() != null) {definition.setTags(request.getTags());}// 解析节点数量JsonNode workflowNode = objectMapper.readTree(request.getWorkflowJson());definition.setNodeCount(calculateNodeCount(workflowNode));// 生成BPMN定义String bpmnXml = generateBpmnFromJson(request.getWorkflowJson());definition.setBpmnXml(bpmnXml);// 保存工作流定义return workflowDefinitionService.save(definition);} catch (Exception e) {log.error("Failed to create workflow: {}", request.getName(), e);throw new RuntimeException("创建工作流失败: " + e.getMessage());}}/*** 更新工作流定义*/public WorkflowDefinition updateWorkflow(Long workflowId, WorkflowUpdateRequest request) {log.info("Updating workflow: {}", workflowId);WorkflowDefinition definition = workflowDefinitionService.findById(workflowId).orElseThrow(() -> new RuntimeException("工作流不存在: " + workflowId));try {// 验证工作流JSONvalidateWorkflowJson(request.getWorkflowJson());// 创建新版本WorkflowVersion version = new WorkflowVersion(definition, generateNextVersion(definition.getVersion()),definition.getWorkflowJson());version.setBpmnXml(definition.getBpmnXml());version.setChangeDescription(request.getChangeDescription());// 更新工作流定义definition.setName(request.getName());definition.setDescription(request.getDescription());definition.setWorkflowJson(request.getWorkflowJson());definition.setNodeCount(calculateNodeCount(objectMapper.readTree(request.getWorkflowJson())));// 生成新的BPMN定义String bpmnXml = generateBpmnFromJson(request.getWorkflowJson());definition.setBpmnXml(bpmnXml);definition.setUpdatedBy(request.getUpdatedBy());// 保存更新definition.getVersions().add(version);return workflowDefinitionService.save(definition);} catch (Exception e) {log.error("Failed to update workflow: {}", workflowId, e);throw new RuntimeException("更新工作流失败: " + e.getMessage());}}/**验证工作流JSON*/public void validateWorkflowJson(String workflowJson){try{JsonNode rootNode = objectMapper.readTree(workflowJson);// 验证基本结构if (!rootNode.has("nodes") || !rootNode.has("edges")) {throw new RuntimeException("工作流JSON必须包含nodes和edges字段");}JsonNode nodes = rootNode.get("nodes");JsonNode edges = rootNode.get("edges");// 验证节点数量if (!nodes.isArray() || nodes.size() == 0) {throw new RuntimeException("工作流必须包含至少一个节点");}if (nodes.size() > MAX_NODES) {throw new RuntimeException("工作流节点数量不能超过" + MAX_NODES);}// 验证每个节点Set<String> nodeIds = new HashSet<>();for (JsonNode node : nodes) {validateNode(node, nodeIds);}// 验证边for (JsonNode edge : edges) {validateEdge(edge, nodeIds);}// 验证工作流连通性validateWorkflowConnectivity(nodes, edges, nodeIds);log.debug("Workflow JSON validation passed");}catch(Exception e){throw new RuntimeException("工作流JSON验证失败: " + e.getMessage());}}/*** 
验证单个节点*/private void validateNode(JsonNode node, Set<String> nodeIds) {if (!node.has("id") || !node.has("type") || !node.has("data")) {throw new RuntimeException("节点必须包含id、type和data字段");}String nodeId = node.get("id").asText();String nodeType = node.get("type").asText();// 验证节点IDif (nodeId == null || nodeId.trim().isEmpty()) {throw new RuntimeException("节点ID不能为空");}if (!NODE_ID_PATTERN.matcher(nodeId).matches()) {throw new RuntimeException("节点ID格式无效: " + nodeId);}if (nodeIds.contains(nodeId)) {throw new RuntimeException("节点ID重复: " + nodeId);}nodeIds.add(nodeId);// 验证节点类型if (!componentRegistryService.isComponentRegistered(nodeType)) {throw new RuntimeException("未知的节点类型: " + nodeType);}// 验证节点数据JsonNode data = node.get("data");if (!data.isObject()) {throw new RuntimeException("节点data必须是对象");}// 验证组件特定配置componentRegistryService.validateComponentConfig(nodeType, data);// 验证节点位置信息(如果存在)if (node.has("position")) {JsonNode position = node.get("position");if (!position.has("x") || !position.has("y")) {throw new RuntimeException("节点位置信息不完整");}}}/*** 验证边*/private void validateEdge(JsonNode edge, Set<String> nodeIds) {if (!edge.has("id") || !edge.has("source") || !edge.has("target")) {throw new RuntimeException("边必须包含id、source和target字段");}String sourceId = edge.get("source").asText();String targetId = edge.get("target").asText();// 验证源节点和目标节点存在if (!nodeIds.contains(sourceId)) {throw new RuntimeException("边的源节点不存在: " + sourceId);}if (!nodeIds.contains(targetId)) {throw new RuntimeException("边的目标节点不存在: " + targetId);}// 验证不能连接到自身if (sourceId.equals(targetId)) {throw new RuntimeException("边不能连接节点到自身: " + sourceId);}}/*** 验证工作流连通性*/private void validateWorkflowConnectivity(JsonNode nodes, JsonNode edges, Set<String> nodeIds) {// 构建图结构Map<String, List<String>> graph = new HashMap<>();Map<String, Integer> inDegree = new HashMap<>();// 初始化for (String nodeId : nodeIds) {graph.put(nodeId, new ArrayList<>());inDegree.put(nodeId, 0);}// 构建边for (JsonNode edge : edges) {String source = edge.get("source").asText();String target = edge.get("target").asText();graph.get(source).add(target);inDegree.put(target, inDegree.get(target) + 1);}// 查找起点(入度为0的节点)List<String> startNodes = new ArrayList<>();for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {if (entry.getValue() == 0) {startNodes.add(entry.getKey());}}if (startNodes.isEmpty()) {throw new RuntimeException("工作流必须包含至少一个起点节点(没有入边的节点)");}// 检查是否存在环if (hasCycle(graph, nodeIds)) {throw new RuntimeException("工作流不能包含环");}// 检查是否存在孤立节点Set<String> connectedNodes = new HashSet<>();for (String startNode : startNodes) {traverseGraph(graph, startNode, connectedNodes);}if (connectedNodes.size() != nodeIds.size()) {Set<String> isolatedNodes = new HashSet<>(nodeIds);isolatedNodes.removeAll(connectedNodes);throw new RuntimeException("工作流包含孤立节点: " + isolatedNodes);}}/*** 检测图中是否存在环*/private boolean hasCycle(Map<String, List<String>> graph, Set<String> nodeIds) {Set<String> visited = new HashSet<>();Set<String> recursionStack = new HashSet<>();for (String nodeId : nodeIds) {if (hasCycleUtil(nodeId, visited, recursionStack, graph)) {return true;}}return false;}private boolean hasCycleUtil(String nodeId, Set<String> visited, Set<String> recursionStack, Map<String, List<String>> graph) {if (recursionStack.contains(nodeId)) {return true;}if (visited.contains(nodeId)) {return false;}visited.add(nodeId);recursionStack.add(nodeId);for (String neighbor : graph.get(nodeId)) {if (hasCycleUtil(neighbor, visited, recursionStack, graph)) {return 
true;}}recursionStack.remove(nodeId);return false;}/*** 遍历图*/private void traverseGraph(Map<String, List<String>> graph, String currentNode, Set<String> visited) {visited.add(currentNode);for (String neighbor : graph.get(currentNode)) {if (!visited.contains(neighbor)) {traverseGraph(graph, neighbor, visited);}}}/*** 从JSON生成BPMN XML*/public String generateBpmnFromJson(String workflowJson) {try {JsonNode rootNode = objectMapper.readTree(workflowJson);JsonNode nodes = rootNode.get("nodes");JsonNode edges = rootNode.get("edges");// 创建BPMN模型实例BpmnModelInstance modelInstance = Bpmn.createProcess().id(generateProcessId()).name("Dify Workflow").executable().done();// 获取流程定义Process process = modelInstance.getModelElementsByType(Process.class).iterator().next();// 创建开始事件StartEvent startEvent = modelInstance.newInstance(StartEvent.class);startEvent.setId("startEvent");process.addChildElement(startEvent);// 创建节点映射Map<String, FlowNode> nodeMap = new HashMap<>();nodeMap.put("startEvent", startEvent);// 创建任务节点for (JsonNode node : nodes) {String nodeId = node.get("id").asText();String nodeType = node.get("type").asText();FlowNode flowNode = createFlowNode(modelInstance, nodeType, nodeId, node.get("data"));process.addChildElement(flowNode);nodeMap.put(nodeId, flowNode);}// 创建结束事件EndEvent endEvent = modelInstance.newInstance(EndEvent.class);endEvent.setId("endEvent");process.addChildElement(endEvent);nodeMap.put("endEvent", endEvent);// 创建序列流(边)createSequenceFlows(modelInstance, process, edges, nodeMap, nodes);// 转换为XML字符串StringWriter writer = new StringWriter();Bpmn.writeModelToStream(writer, modelInstance);return writer.toString();} catch (Exception e) {log.error("Failed to generate BPMN from JSON", e);throw new RuntimeException("生成BPMN定义失败: " + e.getMessage());}}/*** 创建流程节点*/private FlowNode createFlowNode(BpmnModelInstance modelInstance, String nodeType, String nodeId, JsonNode data) {FlowNode flowNode;switch (nodeType) {case "llm":ServiceTask serviceTask = modelInstance.newInstance(ServiceTask.class);serviceTask.setId(nodeId);serviceTask.setName(data.has("name") ? data.get("name").asText() : "LLM Task");// 设置任务实现serviceTask.setCamundaClass("com.company.dify.workflow.LLMServiceTask");flowNode = serviceTask;break;case "dataTransform":ServiceTask transformTask = modelInstance.newInstance(ServiceTask.class);transformTask.setId(nodeId);transformTask.setName(data.has("name") ? data.get("name").asText() : "Data Transform");transformTask.setCamundaClass("com.company.dify.workflow.DataTransformServiceTask");flowNode = transformTask;break;case "condition":ExclusiveGateway gateway = modelInstance.newInstance(ExclusiveGateway.class);gateway.setId(nodeId);gateway.setName(data.has("name") ? data.get("name").asText() : "Condition");flowNode = gateway;break;case "apiCall":ServiceTask apiTask = modelInstance.newInstance(ServiceTask.class);apiTask.setId(nodeId);apiTask.setName(data.has("name") ? data.get("name").asText() : "API Call");apiTask.setCamundaClass("com.company.dify.workflow.APICallServiceTask");flowNode = apiTask;break;default:ServiceTask defaultTask = modelInstance.newInstance(ServiceTask.class);defaultTask.setId(nodeId);defaultTask.setName(data.has("name") ? 
data.get("name").asText() : "Task");defaultTask.setCamundaClass("com.company.dify.workflow.GenericServiceTask");flowNode = defaultTask;}return flowNode;}/*** 创建序列流*/private void createSequenceFlows(BpmnModelInstance modelInstance, Process process, JsonNode edges, Map<String, FlowNode> nodeMap, JsonNode nodes) {// 首先连接开始事件到第一个节点String firstNodeId = findFirstNode(nodes, edges);if (firstNodeId != null) {SequenceFlow startFlow = modelInstance.newInstance(SequenceFlow.class);startFlow.setId("flow_start_" + firstNodeId);startFlow.setSource(nodeMap.get("startEvent"));startFlow.setTarget(nodeMap.get(firstNodeId));process.addChildElement(startFlow);}// 创建其他序列流for (JsonNode edge : edges) {String sourceId = edge.get("source").asText();String targetId = edge.get("target").asText();SequenceFlow sequenceFlow = modelInstance.newInstance(SequenceFlow.class);sequenceFlow.setId("flow_" + sourceId + "_" + targetId);sequenceFlow.setSource(nodeMap.get(sourceId));sequenceFlow.setTarget(nodeMap.get(targetId));// 设置条件表达式(如果是条件边)if (edge.has("condition") && !edge.get("condition").isNull()) {String condition = edge.get("condition").asText();sequenceFlow.setConditionExpression(createConditionExpression(condition));}process.addChildElement(sequenceFlow);}// 连接最后节点到结束事件List<String> endNodeIds = findEndNodes(nodes, edges);for (String endNodeId : endNodeIds) {SequenceFlow endFlow = modelInstance.newInstance(SequenceFlow.class);endFlow.setId("flow_" + endNodeId + "_end");endFlow.setSource(nodeMap.get(endNodeId));endFlow.setTarget(nodeMap.get("endEvent"));process.addChildElement(endFlow);}}/*** 查找第一个节点*/private String findFirstNode(JsonNode nodes, JsonNode edges) {Set<String> targetNodes = new HashSet<>();for (JsonNode edge : edges) {targetNodes.add(edge.get("target").asText());}for (JsonNode node : nodes) {String nodeId = node.get("id").asText();if (!targetNodes.contains(nodeId)) {return nodeId;}}return null;}/*** 查找结束节点*/private List<String> findEndNodes(JsonNode nodes, JsonNode edges) {Set<String> sourceNodes = new HashSet<>();for (JsonNode edge : edges) {sourceNodes.add(edge.get("source").asText());}List<String> endNodes = new ArrayList<>();for (JsonNode node : nodes) {String nodeId = node.get("id").asText();if (!sourceNodes.contains(nodeId)) {endNodes.add(nodeId);}}return endNodes;}/*** 创建条件表达式*/private org.camunda.bpm.model.bpmn.instance.ConditionExpression createConditionExpression(String condition) {// 简化实现,实际应该解析条件表达式return null;}/*** 生成流程ID*/private String generateProcessId() {return "dify_workflow_" + System.currentTimeMillis();}/*** 计算节点数量*/private int calculateNodeCount(JsonNode workflowNode) {if (workflowNode.has("nodes") && workflowNode.get("nodes").isArray()) {return workflowNode.get("nodes").size();}return 0;}/*** 生成下一个版本号*/private String generateNextVersion(String currentVersion) {try {String[] parts = currentVersion.split("\\.");int major = Integer.parseInt(parts[0]);int minor = Integer.parseInt(parts[1]);int patch = Integer.parseInt(parts[2]);return major + "." + minor + "." + (patch + 1);} catch (Exception e) {return "1.0.1";}}/*** 工作流创建请求*/@Datapublic static class WorkflowCreationRequest {@NotBlank private String name;private String description;private String category;@NotBlank private String workflowJson;@NotBlank private String createdBy;private List<String> tags;}/*** 工作流更新请求*/@Datapublic static class WorkflowUpdateRequest {@NotBlank private String name;private String description;@NotBlank private String workflowJson;@NotBlank private String updatedBy;private String changeDescription;}
}
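
For reference, `validateWorkflowJson` accepts a JSON document with a `nodes` array (each node carrying `id`, `type`, and an object `data`) and an `edges` array (each edge carrying `id`, `source`, `target`), where every node type is registered, there are no cycles, and no node is isolated. The snippet below is a hypothetical caller; the node types "dataTransform" and "llm" are assumed to be registered components with no required parameters.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/** Hypothetical caller showing the minimal JSON shape that passes validation. */
@Component
public class WorkflowDesignExample {

    @Autowired
    private WorkflowDesignService workflowDesignService;

    public WorkflowDefinition createMinimalWorkflow() {
        // Two nodes connected by a single edge; ids must be unique and match the edge endpoints.
        String workflowJson =
                "{"
                + "\"nodes\": ["
                + "  {\"id\": \"load_members\", \"type\": \"dataTransform\", \"data\": {\"name\": \"Load members\"}},"
                + "  {\"id\": \"summarize\", \"type\": \"llm\", \"data\": {\"name\": \"Summarize segment\"}}"
                + "],"
                + "\"edges\": ["
                + "  {\"id\": \"e1\", \"source\": \"load_members\", \"target\": \"summarize\"}"
                + "]"
                + "}";

        WorkflowDesignService.WorkflowCreationRequest request = new WorkflowDesignService.WorkflowCreationRequest();
        request.setName("会员分群示例");
        request.setWorkflowJson(workflowJson);
        request.setCreatedBy("admin");

        // The service validates the JSON, counts nodes, generates the BPMN XML, and persists the definition.
        return workflowDesignService.createWorkflow(request);
    }
}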

Component Registry Service

/*** 组件注册表服务 - 管理所有可用的工作流组件*/
@Service
@Slf4j
public class ComponentRegistryService {@Autowiredprivate ApplicationContext applicationContext;private final Map<String, WorkflowComponent> componentRegistry = new ConcurrentHashMap<>();private final Map<String, ComponentMetadata> componentMetadata = new ConcurrentHashMap<>();/*** 初始化组件注册表*/@PostConstructpublic void init() {log.info("Initializing component registry...");// 自动注册所有实现了WorkflowComponent接口的BeanMap<String, WorkflowComponent> components = applicationContext.getBeansOfType(WorkflowComponent.class);for (Map.Entry<String, WorkflowComponent> entry : components.entrySet()) {WorkflowComponent component = entry.getValue();registerComponent(component);}log.info("Component registry initialized with {} components", componentRegistry.size());}/*** 注册组件*/public void registerComponent(WorkflowComponent component) {String componentType = component.getType();if (componentRegistry.containsKey(componentType)) {log.warn("Component type {} is already registered, skipping", componentType);return;}componentRegistry.put(componentType, component);componentMetadata.put(componentType, createComponentMetadata(component));log.info("Registered component: {} - {}", componentType, component.getName());}/*** 手动注册组件*/public void registerComponent(String type, WorkflowComponent component) {componentRegistry.put(type, component);componentMetadata.put(type, createComponentMetadata(component));log.info("Manually registered component: {} - {}", type, component.getName());}/*** 取消注册组件*/public void unregisterComponent(String componentType) {WorkflowComponent removed = componentRegistry.remove(componentType);componentMetadata.remove(componentType);if (removed != null) {log.info("Unregistered component: {}", componentType);}}/*** 检查组件是否已注册*/public boolean isComponentRegistered(String componentType) {return componentRegistry.containsKey(componentType);}/*** 获取组件实例*/public WorkflowComponent getComponent(String componentType) {WorkflowComponent component = componentRegistry.get(componentType);if (component == null) {throw new RuntimeException("组件未注册: " + componentType);}return component;}/*** 获取所有组件元数据*/public List<ComponentMetadata> getAllComponentMetadata() {return new ArrayList<>(componentMetadata.values());}/*** 获取组件元数据*/public ComponentMetadata getComponentMetadata(String componentType) {ComponentMetadata metadata = componentMetadata.get(componentType);if (metadata == null) {throw new RuntimeException("组件元数据不存在: " + componentType);}return metadata;}/*** 验证组件配置*/public void validateComponentConfig(String componentType, JsonNode config) {ComponentMetadata metadata = getComponentMetadata(componentType);List<ComponentParameter> parameters = metadata.getParameters();for (ComponentParameter param : parameters) {if (param.isRequired() && !config.has(param.getName())) {throw new RuntimeException("缺少必需参数: " + param.getName());}if (config.has(param.getName())) {JsonNode value = config.get(param.getName());validateParameterValue(param, value);}}}/*** 验证参数值*/private void validateParameterValue(ComponentParameter param, JsonNode value) {switch (param.getType()) {case "string":if (!value.isTextual()) {throw new RuntimeException("参数 " + param.getName() + " 必须是字符串类型");}if (param.getMinLength() != null && value.asText().length() < param.getMinLength()) {throw new RuntimeException("参数 " + param.getName() + " 长度不能小于 " + param.getMinLength());}if (param.getMaxLength() != null && value.asText().length() > param.getMaxLength()) {throw new RuntimeException("参数 " + param.getName() + " 长度不能大于 " + param.getMaxLength());}break;case "number":if 
(!value.isNumber()) {throw new RuntimeException("参数 " + param.getName() + " 必须是数字类型");}if (param.getMinValue() != null && value.asDouble() < param.getMinValue()) {throw new RuntimeException("参数 " + param.getName() + " 不能小于 " + param.getMinValue());}if (param.getMaxValue() != null && value.asDouble() > param.getMaxValue()) {throw new RuntimeException("参数 " + param.getName() + " 不能大于 " + param.getMaxValue());}break;case "boolean":if (!value.isBoolean()) {throw new RuntimeException("参数 " + param.getName() + " 必须是布尔类型");}break;case "array":if (!value.isArray()) {throw new RuntimeException("参数 " + param.getName() + " 必须是数组类型");}break;case "object":if (!value.isObject()) {throw new RuntimeException("参数 " + param.getName() + " 必须是对象类型");}break;}// 验证枚举值if (param.getEnumValues() != null && !param.getEnumValues().isEmpty()) {String stringValue = value.asText();if (!param.getEnumValues().contains(stringValue)) {throw new RuntimeException("参数 " + param.getName() + " 必须是以下值之一: " + param.getEnumValues());}}}/*** 创建组件元数据*/private ComponentMetadata createComponentMetadata(WorkflowComponent component) {ComponentMetadata metadata = new ComponentMetadata();metadata.setType(component.getType());metadata.setName(component.getName());metadata.setDescription(component.getDescription());metadata.setCategory(component.getCategory());metadata.setIcon(component.getIcon());metadata.setVersion(component.getVersion());// 设置输入输出模式metadata.setInputSchema(component.getInputSchema());metadata.setOutputSchema(component.getOutputSchema());// 设置参数定义metadata.setParameters(component.getParameters());return metadata;}/*** 根据类别获取组件*/public List<ComponentMetadata> getComponentsByCategory(String category) {List<ComponentMetadata> result = new ArrayList<>();for (ComponentMetadata metadata : componentMetadata.values()) {if (category.equals(metadata.getCategory())) {result.add(metadata);}}return result;}/*** 搜索组件*/public List<ComponentMetadata> searchComponents(String keyword) {List<ComponentMetadata> result = new ArrayList<>();String lowerKeyword = keyword.toLowerCase();for (ComponentMetadata metadata : componentMetadata.values()) {if (metadata.getName().toLowerCase().contains(lowerKeyword) ||metadata.getDescription().toLowerCase().contains(lowerKeyword) ||metadata.getType().toLowerCase().contains(lowerKeyword)) {result.add(metadata);}}return result;}/*** 组件元数据类*/@Datapublic static class ComponentMetadata {private String type;private String name;private String description;private String category;private String icon;private String version;private Map<String, Object> inputSchema;private Map<String, Object> outputSchema;private List<ComponentParameter> parameters = new ArrayList<>();private Map<String, Object> metadata = new HashMap<>();}/*** 组件参数类*/@Datapublic static class ComponentParameter {private String name;private String type; // string, number, boolean, array, objectprivate String description;private boolean required = false;private Object defaultValue;private Integer minLength;private Integer maxLength;private Double minValue;private Double maxValue;private List<String> enumValues;private Map<String, Object> validation;}
}
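
Because the registry auto-registers every Spring bean that implements the `WorkflowComponent` interface (shown in full later in this article), adding a new node type only requires declaring another bean. The sketch below is a hypothetical, deliberately simple component: the type name, category, and behavior are assumptions, and the setter names on the interface's `ComponentParameter` class are assumed from its field list.

import java.util.List;
import java.util.Map;

import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.JsonNode;

/**
 * Hypothetical example component; ComponentRegistryService picks it up at startup
 * because it is a Spring bean implementing WorkflowComponent.
 */
@Component
public class UppercaseTextComponent implements WorkflowComponent {

    @Override
    public String getType() { return "uppercaseText"; }

    @Override
    public String getName() { return "Uppercase Text"; }

    @Override
    public String getDescription() { return "Trims and upper-cases a text variable"; }

    @Override
    public String getCategory() { return "dataTransform"; }

    @Override
    public String getIcon() { return "text"; }

    @Override
    public String getVersion() { return "1.0.0"; }

    @Override
    public ComponentResult execute(ComponentInput input) throws Exception {
        // The source variable name comes from the node's config, e.g. {"sourceVariable": "memberName"}.
        JsonNode config = input.getConfig();
        String sourceVariable = config.get("sourceVariable").asText();
        String value = input.getVariable(sourceVariable);
        if (value == null) {
            return ComponentResult.error("变量不存在: " + sourceVariable);
        }
        return ComponentResult.success("text", value.trim().toUpperCase());
    }

    @Override
    public List<ComponentParameter> getParameters() {
        ComponentParameter param = new ComponentParameter();
        param.setName("sourceVariable");
        param.setType("string");
        param.setDescription("Name of the workflow variable to transform");
        param.setRequired(true);
        return List.of(param);
    }

    @Override
    public Map<String, Object> getInputSchema() {
        return Map.of("sourceVariable", "string");
    }

    @Override
    public Map<String, Object> getOutputSchema() {
        return Map.of("text", "string");
    }
}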

Workflow Execution Service

import org.camunda.bpm.engine.*;
import org.camunda.bpm.engine.runtime.ProcessInstance;
import org.camunda.bpm.engine.variable.Variables;

/*** 工作流执行服务 - 负责工作流的执行和管理*/
@Service
@Slf4j
public class WorkflowExecutionService {@Autowiredprivate RuntimeService runtimeService;@Autowiredprivate RepositoryService repositoryService;@Autowiredprivate HistoryService historyService;@Autowiredprivate WorkflowDefinitionService workflowDefinitionService;@Autowiredprivate WorkflowExecutionEntityService workflowExecutionEntityService;/*** 执行工作流*/public WorkflowExecution executeWorkflow(Long workflowId, Map<String, Object> inputVariables, String triggeredBy) {log.info("Executing workflow: {}, triggered by: {}", workflowId, triggeredBy);WorkflowDefinition definition = workflowDefinitionService.findById(workflowId).orElseThrow(() -> new RuntimeException("工作流不存在: " + workflowId));if (!definition.isDeployable()) {throw new RuntimeException("工作流未发布或BPMN定义为空");}try {// 部署工作流(如果尚未部署)String deploymentId = deployWorkflow(definition);// 创建工作流执行记录WorkflowExecution execution = new WorkflowExecution(definition, generateExecutionId(), triggeredBy);execution.setInputData(convertMapToJson(inputVariables));execution.setTaskCount(definition.getNodeCount());execution = workflowExecutionEntityService.save(execution);// 启动流程实例ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(getProcessDefinitionKey(definition),Variables.fromMap(inputVariables));// 更新执行记录execution.setCamundaProcessDefinitionId(processInstance.getProcessDefinitionId());execution.markAsRunning();execution.getExecutionVariables().put("processInstanceId", processInstance.getId());execution = workflowExecutionEntityService.save(execution);// 更新工作流统计definition.incrementExecutionCount();workflowDefinitionService.save(definition);log.info("Workflow execution started: {}", execution.getExecutionId());return execution;} catch (Exception e) {log.error("Failed to execute workflow: {}", workflowId, e);throw new RuntimeException("执行工作流失败: " + e.getMessage());}}/*** 部署工作流*/public String deployWorkflow(WorkflowDefinition definition) {try {// 检查是否已部署String processDefinitionKey = getProcessDefinitionKey(definition);long deploymentCount = repositoryService.createProcessDefinitionQuery().processDefinitionKey(processDefinitionKey).latestVersion().count();if (deploymentCount > 0) {log.debug("Workflow already deployed: {}", processDefinitionKey);return repositoryService.createProcessDefinitionQuery().processDefinitionKey(processDefinitionKey).latestVersion().singleResult().getDeploymentId();}// 部署工作流var deployment = repositoryService.createDeployment().addString(processDefinitionKey + ".bpmn", definition.getBpmnXml()).name(definition.getName()).key(processDefinitionKey).deploy();log.info("Workflow deployed successfully: {}, deploymentId: {}", definition.getName(), deployment.getId());return deployment.getId();} catch (Exception e) {log.error("Failed to deploy workflow: {}", definition.getName(), e);throw new RuntimeException("部署工作流失败: " + e.getMessage());}}/*** 获取工作流执行状态*/public WorkflowExecution getExecutionStatus(String executionId) {return workflowExecutionEntityService.findByExecutionId(executionId).orElseThrow(() -> new RuntimeException("执行记录不存在: " + executionId));}/*** 取消工作流执行*/public void cancelWorkflowExecution(String executionId) {WorkflowExecution execution = getExecutionStatus(executionId);if (execution.getStatus() != WorkflowExecution.ExecutionStatus.RUNNING) {throw new RuntimeException("只能取消运行中的工作流执行");}try {String processInstanceId = execution.getExecutionVariables().get("processInstanceId");if (processInstanceId != null) {runtimeService.deleteProcessInstance(processInstanceId, 
"用户取消执行");}execution.setStatus(WorkflowExecution.ExecutionStatus.CANCELLED);execution.setCompletedAt(java.time.LocalDateTime.now());workflowExecutionEntityService.save(execution);log.info("Workflow execution cancelled: {}", executionId);} catch (Exception e) {log.error("Failed to cancel workflow execution: {}", executionId, e);throw new RuntimeException("取消工作流执行失败: " + e.getMessage());}}/*** 获取工作流执行历史*/public List<WorkflowExecution> getExecutionHistory(Long workflowId, int page, int size) {return workflowExecutionEntityService.findByWorkflowDefinitionId(workflowId, page, size);}/*** 获取执行日志*/public String getExecutionLog(String executionId) {WorkflowExecution execution = getExecutionStatus(executionId);return execution.getExecutionLog();}/*** 重试失败的工作流执行*/public WorkflowExecution retryWorkflowExecution(String executionId, String triggeredBy) {WorkflowExecution originalExecution = getExecutionStatus(executionId);if (originalExecution.getStatus() != WorkflowExecution.ExecutionStatus.FAILED) {throw new RuntimeException("只能重试失败的工作流执行");}try {// 使用原始输入数据重新执行Map<String, Object> inputVariables = convertJsonToMap(originalExecution.getInputData());return executeWorkflow(originalExecution.getWorkflowDefinition().getId(), inputVariables, triggeredBy);} catch (Exception e) {log.error("Failed to retry workflow execution: {}", executionId, e);throw new RuntimeException("重试工作流执行失败: " + e.getMessage());}}/*** 获取执行统计信息*/public ExecutionStatistics getExecutionStatistics(Long workflowId) {WorkflowDefinition definition = workflowDefinitionService.findById(workflowId).orElseThrow(() -> new RuntimeException("工作流不存在: " + workflowId));ExecutionStatistics statistics = new ExecutionStatistics();statistics.setWorkflowId(workflowId);statistics.setWorkflowName(definition.getName());statistics.setTotalExecutions(definition.getExecutionCount());statistics.setSuccessCount(definition.getSuccessCount());statistics.setSuccessRate(definition.getSuccessRate());// 计算平均执行时间List<WorkflowExecution> recentExecutions = getExecutionHistory(workflowId, 0, 100);double avgTime = recentExecutions.stream().filter(e -> e.getExecutionTimeMs() != null).mapToLong(WorkflowExecution::getExecutionTimeMs).average().orElse(0.0);statistics.setAverageExecutionTime(avgTime);// 计算最近成功率long recentSuccessCount = recentExecutions.stream().filter(e -> e.getStatus() == WorkflowExecution.ExecutionStatus.COMPLETED).count();statistics.setRecentSuccessRate(recentExecutions.isEmpty() ? 
0.0 : (double) recentSuccessCount / recentExecutions.size() * 100);return statistics;}// 工具方法private String generateExecutionId() {return "exec_" + System.currentTimeMillis() + "_" + (int)(Math.random() * 1000);}private String getProcessDefinitionKey(WorkflowDefinition definition) {return "dify_workflow_" + definition.getId();}private String convertMapToJson(Map<String, Object> map) {try {com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();return mapper.writeValueAsString(map);} catch (Exception e) {log.error("Failed to convert map to JSON", e);return "{}";}}@SuppressWarnings("unchecked")private Map<String, Object> convertJsonToMap(String json) {try {com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();return mapper.readValue(json, Map.class);} catch (Exception e) {log.error("Failed to convert JSON to map", e);return new java.util.HashMap<>();}}/*** 执行统计信息类*/@Datapublic static class ExecutionStatistics {private Long workflowId;private String workflowName;private Integer totalExecutions;private Integer successCount;private Double successRate;private Double averageExecutionTime;private Double recentSuccessRate;private Map<String, Object> additionalMetrics = new HashMap<>();}
}
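
For orientation, the sketch below shows how this execution service could be wired to an HTTP endpoint matching the curl calls used later in this article. The controller class, request body shape, and status route are assumptions for illustration only; they are not part of the service code above.

import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

// Hypothetical REST controller exposing WorkflowExecutionService.
// Route prefix and request body shape mirror the curl examples below.
@RestController
@RequestMapping("/api/v1/execution")
public class WorkflowExecutionController {

    @Autowired
    private WorkflowExecutionService workflowExecutionService;

    /** Trigger a workflow execution with the given input variables. */
    @PostMapping("/workflows/{workflowId}/execute")
    public WorkflowExecution execute(@PathVariable Long workflowId,
                                     @RequestBody Map<String, Object> body) {
        @SuppressWarnings("unchecked")
        Map<String, Object> inputVariables =
                (Map<String, Object>) body.getOrDefault("inputVariables", Map.of());
        String triggeredBy = (String) body.getOrDefault("triggeredBy", "api");
        return workflowExecutionService.executeWorkflow(workflowId, inputVariables, triggeredBy);
    }

    /** Query the status of a previous execution by its execution id (assumed route). */
    @GetMapping("/executions/{executionId}")
    public WorkflowExecution status(@PathVariable String executionId) {
        return workflowExecutionService.getExecutionStatus(executionId);
    }
}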

Workflow Component Implementation

Base Component Interface

/*** 工作流组件接口 - 所有工作流组件必须实现此接口*/
public interface WorkflowComponent {/*** 获取组件类型(唯一标识)*/String getType();/*** 获取组件显示名称*/String getName();/*** 获取组件描述*/String getDescription();/*** 获取组件分类*/String getCategory();/*** 获取组件图标*/String getIcon();/*** 获取组件版本*/String getVersion();/*** 执行组件逻辑*/ComponentResult execute(ComponentInput input) throws Exception;/*** 获取输入参数定义*/List<ComponentParameter> getParameters();/*** 获取输入模式*/Map<String, Object> getInputSchema();/*** 获取输出模式*/Map<String, Object> getOutputSchema();/*** 验证组件配置*/default void validateConfig(JsonNode config) throws IllegalArgumentException {// 默认实现,子类可以重写}/*** 组件输入类*/class ComponentInput {private Map<String, Object> variables;private JsonNode config;private String nodeId;private Map<String, Object> context;// constructors, getters, setterspublic ComponentInput() {}public ComponentInput(Map<String, Object> variables, JsonNode config, String nodeId) {this.variables = variables;this.config = config;this.nodeId = nodeId;this.context = new java.util.HashMap<>();}public Map<String, Object> getVariables() { return variables; }public void setVariables(Map<String, Object> variables) { this.variables = variables; }public JsonNode getConfig() { return config; }public void setConfig(JsonNode config) { this.config = config; }public String getNodeId() { return nodeId; }public void setNodeId(String nodeId) { this.nodeId = nodeId; }public Map<String, Object> getContext() { return context; }public void setContext(Map<String, Object> context) { this.context = context; }@SuppressWarnings("unchecked")public <T> T getVariable(String name) {return (T) variables.get(name);}public void setVariable(String name, Object value) {variables.put(name, value);}}/*** 组件结果类*/class ComponentResult {private boolean success;private Map<String, Object> output;private String errorMessage;private Map<String, Object> metadata;private Long executionTimeMs;public ComponentResult() {this.output = new java.util.HashMap<>();this.metadata = new java.util.HashMap<>();}public static ComponentResult success(Map<String, Object> output) {ComponentResult result = new ComponentResult();result.success = true;result.output = output;return result;}public static ComponentResult success(String key, Object value) {ComponentResult result = new ComponentResult();result.success = true;result.output.put(key, value);return result;}public static ComponentResult error(String errorMessage) {ComponentResult result = new ComponentResult();result.success = false;result.errorMessage = errorMessage;return result;}// getters and setterspublic boolean isSuccess() { return success; }public void setSuccess(boolean success) { this.success = success; }public Map<String, Object> getOutput() { return output; }public void setOutput(Map<String, Object> output) { this.output = output; }public String getErrorMessage() { return errorMessage; }public void setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; }public Map<String, Object> getMetadata() { return metadata; }public void setMetadata(Map<String, Object> metadata) { this.metadata = metadata; }public Long getExecutionTimeMs() { return executionTimeMs; }public void setExecutionTimeMs(Long executionTimeMs) { this.executionTimeMs = executionTimeMs; }public ComponentResult withMetadata(String key, Object value) {this.metadata.put(key, value);return this;}}/*** 组件参数类*/class ComponentParameter {private String name;private String type;private String description;private boolean required;private Object defaultValue;private Map<String, Object> validation;// constructors, getters, setterspublic 
ComponentParameter() {}public ComponentParameter(String name, String type, String description, boolean required) {this.name = name;this.type = type;this.description = description;this.required = required;}public String getName() { return name; }public void setName(String name) { this.name = name; }public String getType() { return type; }public void setType(String type) { this.type = type; }public String getDescription() { return description; }public void setDescription(String description) { this.description = description; }public boolean isRequired() { return required; }public void setRequired(boolean required) { this.required = required; }public Object getDefaultValue() { return defaultValue; }public void setDefaultValue(Object defaultValue) { this.defaultValue = defaultValue; }public Map<String, Object> getValidation() { return validation; }public void setValidation(Map<String, Object> validation) { this.validation = validation; }}
}
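
To make the contract concrete, here is a minimal, hypothetical component that satisfies the interface. The component type ("upperCase") and its behavior are invented purely for illustration; the platform's built-in components follow the same pattern.

import java.util.List;
import java.util.Map;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.JsonNode;

/**
 * Hypothetical example component: upper-cases a text variable.
 * Shows the minimum needed to satisfy the WorkflowComponent contract.
 */
@Component
public class UpperCaseComponent implements WorkflowComponent {

    @Override public String getType() { return "upperCase"; }
    @Override public String getName() { return "UpperCase"; }
    @Override public String getDescription() { return "Converts an input text variable to upper case"; }
    @Override public String getCategory() { return "data"; }
    @Override public String getIcon() { return "🔠"; }
    @Override public String getVersion() { return "1.0.0"; }

    @Override
    public ComponentResult execute(ComponentInput input) {
        // Read the configured source variable name, defaulting to "text"
        JsonNode config = input.getConfig();
        String varName = config != null && config.has("variable")
                ? config.get("variable").asText() : "text";
        Object value = input.getVariable(varName);
        String text = value != null ? value.toString() : "";
        return ComponentResult.success("text", text.toUpperCase());
    }

    @Override
    public List<ComponentParameter> getParameters() {
        return List.of(new ComponentParameter("variable", "string", "name of the variable to transform", false));
    }

    @Override
    public Map<String, Object> getInputSchema() {
        return Map.of("type", "object");
    }

    @Override
    public Map<String, Object> getOutputSchema() {
        return Map.of("type", "object",
                      "properties", Map.of("text", Map.of("type", "string")));
    }
}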

LLM Component Implementation

/*** LLM组件 - 调用大语言模型生成文本*/
@Component
@Slf4j
public class LLMComponent implements WorkflowComponent {@Autowired(required = false)private OpenAiService openAiService;private static final String TYPE = "llm";private static final String NAME = "大语言模型";private static final String DESCRIPTION = "调用大语言模型生成文本内容";private static final String CATEGORY = "ai";private static final String ICON = "🤖";private static final String VERSION = "1.0.0";@Overridepublic String getType() {return TYPE;}@Overridepublic String getName() {return NAME;}@Overridepublic String getDescription() {return DESCRIPTION;}@Overridepublic String getCategory() {return CATEGORY;}@Overridepublic String getIcon() {return ICON;}@Overridepublic String getVersion() {return VERSION;}@Overridepublic ComponentResult execute(ComponentInput input) throws Exception {long startTime = System.currentTimeMillis();try {JsonNode config = input.getConfig();String prompt = getPrompt(config, input.getVariables());String model = getModel(config);double temperature = getTemperature(config);int maxTokens = getMaxTokens(config);log.info("Executing LLM component: model={}, promptLength={}", model, prompt.length());// 调用OpenAI APIString generatedText = callOpenAI(prompt, model, temperature, maxTokens);// 构建输出Map<String, Object> output = new HashMap<>();output.put("text", generatedText);output.put("model", model);output.put("prompt_length", prompt.length());output.put("response_length", generatedText.length());ComponentResult result = ComponentResult.success(output);result.setExecutionTimeMs(System.currentTimeMillis() - startTime);result.withMetadata("tokens_used", estimateTokens(prompt + generatedText));log.info("LLM component executed successfully: {} chars generated", generatedText.length());return result;} catch (Exception e) {log.error("LLM component execution failed", e);return ComponentResult.error("LLM组件执行失败: " + e.getMessage());}}@Overridepublic List<ComponentParameter> getParameters() {List<ComponentParameter> parameters = new ArrayList<>();parameters.add(new ComponentParameter("prompt", "string", "提示词", true));parameters.add(new ComponentParameter("model", "string", "模型名称", false));parameters.add(new ComponentParameter("temperature", "number", "温度参数", false));parameters.add(new ComponentParameter("max_tokens", "number", "最大token数", false));parameters.add(new ComponentParameter("system_prompt", "string", "系统提示词", false));return parameters;}@Overridepublic Map<String, Object> getInputSchema() {Map<String, Object> schema = new HashMap<>();schema.put("type", "object");schema.put("properties", Map.of("prompt", Map.of("type", "string"),"variables", Map.of("type", "object")));schema.put("required", List.of("prompt"));return schema;}@Overridepublic Map<String, Object> getOutputSchema() {Map<String, Object> schema = new HashMap<>();schema.put("type", "object");schema.put("properties", Map.of("text", Map.of("type", "string"),"model", Map.of("type", "string"),"prompt_length", Map.of("type", "number"),"response_length", Map.of("type", "number")));return schema;}@Overridepublic void validateConfig(JsonNode config) throws IllegalArgumentException {if (!config.has("prompt") || config.get("prompt").asText().trim().isEmpty()) {throw new IllegalArgumentException("提示词不能为空");}if (config.has("temperature")) {double temperature = config.get("temperature").asDouble();if (temperature < 0 || temperature > 2) {throw new IllegalArgumentException("温度参数必须在0到2之间");}}if (config.has("max_tokens")) {int maxTokens = config.get("max_tokens").asInt();if (maxTokens < 1 || maxTokens > 4000) {throw new 
IllegalArgumentException("最大token数必须在1到4000之间");}}}/*** 获取提示词(支持变量替换)*/private String getPrompt(JsonNode config, Map<String, Object> variables) {String promptTemplate = config.get("prompt").asText();return replaceVariables(promptTemplate, variables);}/*** 获取模型名称*/private String getModel(JsonNode config) {return config.has("model") ? config.get("model").asText() : "gpt-3.5-turbo";}/*** 获取温度参数*/private double getTemperature(JsonNode config) {return config.has("temperature") ? config.get("temperature").asDouble() : 0.7;}/*** 获取最大token数*/private int getMaxTokens(JsonNode config) {return config.has("max_tokens") ? config.get("max_tokens").asInt() : 1000;}/*** 调用OpenAI API*/private String callOpenAI(String prompt, String model, double temperature, int maxTokens) {if (openAiService == null) {// 降级方案:返回模拟响应log.warn("OpenAI service not available, using mock response");return generateMockResponse(prompt);}try {List<ChatMessage> messages = new ArrayList<>();// 添加系统提示词(如果有)// if (systemPrompt != null) {//     messages.add(new ChatMessage(ChatMessageRole.SYSTEM.value(), systemPrompt));// }// 添加用户提示词messages.add(new ChatMessage(com.theokanning.openai.completion.chat.ChatMessageRole.USER.value(), prompt));ChatCompletionRequest request = ChatCompletionRequest.builder().model(model).messages(messages).temperature(temperature).maxTokens(maxTokens).build();var completion = openAiService.createChatCompletion(request);return completion.getChoices().get(0).getMessage().getContent();} catch (Exception e) {log.error("OpenAI API call failed", e);throw new RuntimeException("OpenAI API调用失败: " + e.getMessage());}}/*** 生成模拟响应(降级方案)*/private String generateMockResponse(String prompt) {// 简单的基于规则的响应生成if (prompt.contains("你好") || prompt.contains("hello")) {return "你好!我是AI助手,很高兴为您服务。";} else if (prompt.contains("天气")) {return "今天天气晴朗,气温25度,适合外出活动。";} else if (prompt.contains("介绍")) {return "这是一个基于Dify平台的AI应用,可以帮助您完成各种任务。";} else {return "我已经收到您的请求:\"" + prompt + "\"。由于系统维护中,我暂时无法提供准确的回答。";}}/*** 替换变量占位符*/private String replaceVariables(String template, Map<String, Object> variables) {String result = template;for (Map.Entry<String, Object> entry : variables.entrySet()) {String placeholder = "{{" + entry.getKey() + "}}";String value = entry.getValue() != null ? entry.getValue().toString() : "";result = result.replace(placeholder, value);}return result;}/*** 估算token数量*/private int estimateTokens(String text) {// 简单估算:英文约4字符一个token,中文约2字符一个tokenint chineseChars = 0;int englishChars = 0;for (char c : text.toCharArray()) {if (Character.UnicodeScript.of(c) == Character.UnicodeScript.HAN) {chineseChars++;} else if (Character.isLetterOrDigit(c)) {englishChars++;}}return (chineseChars / 2) + (englishChars / 4);}
}
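
A quick way to exercise the component outside the workflow engine is to call execute() directly with a hand-built config node. The sketch below is a standalone demo (not part of the original code); it assumes Jackson's ObjectMapper for building the config, and because no OpenAiService bean is injected, the component falls back to its mock response path.

import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class LLMComponentDemo {
    public static void main(String[] args) throws Exception {
        // Build the node config: prompt template with an {{input}} placeholder
        ObjectMapper mapper = new ObjectMapper();
        JsonNode config = mapper.createObjectNode()
                .put("prompt", "Answer in one sentence: {{input}}")
                .put("model", "gpt-3.5-turbo")
                .put("temperature", 0.7)
                .put("max_tokens", 200);

        // Variables referenced by the prompt template
        Map<String, Object> variables = new HashMap<>();
        variables.put("input", "What is RFM analysis?");

        // Instantiated directly, so openAiService is null and the mock response is used
        LLMComponent component = new LLMComponent();
        WorkflowComponent.ComponentResult result =
                component.execute(new WorkflowComponent.ComponentInput(variables, config, "llm1"));

        System.out.println(result.isSuccess());
        System.out.println(result.getOutput().get("text"));
    }
}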

Data Transform Component

/*** 数据转换组件 - 处理和转换数据格式*/
@Component
@Slf4j
public class DataTransformComponent implements WorkflowComponent {private static final String TYPE = "dataTransform";private static final String NAME = "数据转换";private static final String DESCRIPTION = "处理和转换数据格式,支持JSONPath、模板等";private static final String CATEGORY = "data";private static final String ICON = "🔧";private static final String VERSION = "1.0.0";@Overridepublic String getType() {return TYPE;}@Overridepublic String getName() {return NAME;}@Overridepublic String getDescription() {return DESCRIPTION;}@Overridepublic String getCategory() {return CATEGORY;}@Overridepublic String getIcon() {return ICON;}@Overridepublic String getVersion() {return VERSION;}@Overridepublic ComponentResult execute(ComponentInput input) throws Exception {long startTime = System.currentTimeMillis();try {JsonNode config = input.getConfig();String operation = getOperation(config);Map<String, Object> variables = input.getVariables();log.info("Executing data transform component: operation={}", operation);Map<String, Object> output = new HashMap<>();switch (operation) {case "json_extract":output.put("result", executeJsonExtract(config, variables));break;case "template_render":output.put("result", executeTemplateRender(config, variables));break;case "format_convert":output.put("result", executeFormatConvert(config, variables));break;case "data_filter":output.put("result", executeDataFilter(config, variables));break;default:throw new IllegalArgumentException("未知的操作类型: " + operation);}ComponentResult result = ComponentResult.success(output);result.setExecutionTimeMs(System.currentTimeMillis() - startTime);log.info("Data transform component executed successfully");return result;} catch (Exception e) {log.error("Data transform component execution failed", e);return ComponentResult.error("数据转换组件执行失败: " + e.getMessage());}}@Overridepublic List<ComponentParameter> getParameters() {List<ComponentParameter> parameters = new ArrayList<>();parameters.add(new ComponentParameter("operation", "string", "操作类型", true));parameters.add(new ComponentParameter("json_path", "string", "JSONPath表达式", false));parameters.add(new ComponentParameter("template", "string", "模板字符串", false));parameters.add(new ComponentParameter("source_data", "string", "源数据", false));parameters.add(new ComponentParameter("filter_condition", "string", "过滤条件", false));return parameters;}@Overridepublic Map<String, Object> getInputSchema() {Map<String, Object> schema = new HashMap<>();schema.put("type", "object");schema.put("properties", Map.of("operation", Map.of("type", "string", "enum", List.of("json_extract", "template_render", "format_convert", "data_filter")),"input_data", Map.of("type", "object")));schema.put("required", List.of("operation"));return schema;}@Overridepublic Map<String, Object> getOutputSchema() {Map<String, Object> schema = new HashMap<>();schema.put("type", "object");schema.put("properties", Map.of("result", Map.of("type", "object")));return schema;}@Overridepublic void validateConfig(JsonNode config) throws IllegalArgumentException {if (!config.has("operation")) {throw new IllegalArgumentException("操作类型不能为空");}String operation = config.get("operation").asText();switch (operation) {case "json_extract":if (!config.has("json_path")) {throw new IllegalArgumentException("JSONPath表达式不能为空");}break;case "template_render":if (!config.has("template")) {throw new IllegalArgumentException("模板字符串不能为空");}break;case "data_filter":if (!config.has("filter_condition")) {throw new IllegalArgumentException("过滤条件不能为空");}break;}}/*** 获取操作类型*/private String 
getOperation(JsonNode config) {return config.get("operation").asText();}/*** 执行JSON提取操作*/private Object executeJsonExtract(JsonNode config, Map<String, Object> variables) {String jsonPath = config.get("json_path").asText();Object sourceData = config.has("source_data") ? convertJsonNodeToObject(config.get("source_data")) : variables;try {return JsonPath.read(sourceData, jsonPath);} catch (Exception e) {throw new RuntimeException("JSONPath提取失败: " + e.getMessage());}}/*** 执行模板渲染操作*/private String executeTemplateRender(JsonNode config, Map<String, Object> variables) {String template = config.get("template").asText();return renderTemplate(template, variables);}/*** 执行格式转换操作*/private Object executeFormatConvert(JsonNode config, Map<String, Object> variables) {String targetFormat = config.has("target_format") ? config.get("target_format").asText() : "json";Object sourceData = config.has("source_data") ? convertJsonNodeToObject(config.get("source_data")) : variables;switch (targetFormat) {case "json":return sourceData;case "xml":return convertToXml(sourceData);case "csv":return convertToCsv(sourceData);case "yaml":return convertToYaml(sourceData);default:throw new IllegalArgumentException("不支持的格式: " + targetFormat);}}/*** 执行数据过滤操作*/private Object executeDataFilter(JsonNode config, Map<String, Object> variables) {String condition = config.get("filter_condition").asText();Object sourceData = config.has("source_data") ? convertJsonNodeToObject(config.get("source_data")) : variables;return filterData(sourceData, condition);}/*** 渲染模板*/private String renderTemplate(String template, Map<String, Object> variables) {String result = template;for (Map.Entry<String, Object> entry : variables.entrySet()) {String placeholder = "{{" + entry.getKey() + "}}";String value = entry.getValue() != null ? entry.getValue().toString() : "";result = result.replace(placeholder, value);}return result;}/*** 转换为XML*/private String convertToXml(Object data) {// 简化实现if (data instanceof Map) {StringBuilder xml = new StringBuilder();xml.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<root>\n");@SuppressWarnings("unchecked")Map<String, Object> map = (Map<String, Object>) data;for (Map.Entry<String, Object> entry : map.entrySet()) {xml.append("  <").append(entry.getKey()).append(">").append(entry.getValue()).append("</").append(entry.getKey()).append(">\n");}xml.append("</root>");return xml.toString();}return "<root>" + data + "</root>";}/*** 转换为CSV*/private String convertToCsv(Object data) {// 简化实现if (data instanceof Map) {StringBuilder csv = new StringBuilder();@SuppressWarnings("unchecked")Map<String, Object> map = (Map<String, Object>) data;// 表头csv.append(String.join(",", map.keySet())).append("\n");// 数据行List<String> values = new ArrayList<>();for (Object value : map.values()) {values.add(value != null ? 
value.toString() : "");}csv.append(String.join(",", values));return csv.toString();}return data.toString();}/*** 转换为YAML*/private String convertToYaml(Object data) {// 简化实现org.yaml.snakeyaml.Yaml yaml = new org.yaml.snakeyaml.Yaml();return yaml.dump(data);}/*** 过滤数据*/@SuppressWarnings("unchecked")private Object filterData(Object data, String condition) {if (data instanceof Map) {Map<String, Object> result = new HashMap<>();Map<String, Object> map = (Map<String, Object>) data;for (Map.Entry<String, Object> entry : map.entrySet()) {if (matchesCondition(entry.getKey(), entry.getValue(), condition)) {result.put(entry.getKey(), entry.getValue());}}return result;} else if (data instanceof List) {List<Object> result = new ArrayList<>();List<Object> list = (List<Object>) data;for (Object item : list) {if (matchesCondition(null, item, condition)) {result.add(item);}}return result;}return data;}/*** 匹配条件*/private boolean matchesCondition(String key, Object value, String condition) {if (condition == null || condition.trim().isEmpty()) {return true;}// 简单实现:支持基本的字符串包含和正则匹配String stringValue = value != null ? value.toString() : "";if (condition.startsWith("/") && condition.endsWith("/")) {// 正则表达式String regex = condition.substring(1, condition.length() - 1);return Pattern.compile(regex).matcher(stringValue).find();} else {// 字符串包含return stringValue.contains(condition);}}/*** 将JsonNode转换为Java对象*/private Object convertJsonNodeToObject(JsonNode node) {if (node.isObject()) {Map<String, Object> map = new HashMap<>();node.fields().forEachRemaining(entry -> {map.put(entry.getKey(), convertJsonNodeToObject(entry.getValue()));});return map;} else if (node.isArray()) {List<Object> list = new ArrayList<>();node.forEach(item -> list.add(convertJsonNodeToObject(item)));return list;} else if (node.isTextual()) {return node.asText();} else if (node.isNumber()) {return node.asDouble();} else if (node.isBoolean()) {return node.asBoolean();} else {return null;}}
}
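
The same direct-invocation style works for the data transform component. The sketch below is illustrative only: it runs the template_render operation against two variables, and the coupon values are made up for the example.

import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DataTransformDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // Render a template against the incoming workflow variables
        JsonNode config = mapper.createObjectNode()
                .put("operation", "template_render")
                .put("template", "Dear member, your coupon code is {{coupon_code}}, valid until {{expiry_date}}.");

        Map<String, Object> variables = new HashMap<>();
        variables.put("coupon_code", "VIP15OFF");
        variables.put("expiry_date", "2024-12-31");

        DataTransformComponent component = new DataTransformComponent();
        WorkflowComponent.ComponentResult result =
                component.execute(new WorkflowComponent.ComponentInput(variables, config, "transform1"));

        // Expected output: the placeholders replaced with the variable values
        System.out.println(result.getOutput().get("result"));
    }
}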

Test Cases

Workflow Design Service Tests

/*** 工作流设计服务测试*/
@ExtendWith(SpringExtension.class)
@SpringBootTest
class WorkflowDesignServiceTest {@Autowiredprivate WorkflowDesignService workflowDesignService;@Autowiredprivate ComponentRegistryService componentRegistryService;@Testvoid testCreateWorkflow() {String workflowJson = """{"nodes": [{"id": "start","type": "start","position": { "x": 100, "y": 100 },"data": { "name": "开始节点" }},{"id": "llm1","type": "llm","position": { "x": 300, "y": 100 },"data": {"name": "LLM处理","config": {"prompt": "你好,请处理以下内容:{{input}}","model": "gpt-3.5-turbo"}}},{"id": "end","type": "end","position": { "x": 500, "y": 100 },"data": { "name": "结束节点" }}],"edges": [{ "id": "e1", "source": "start", "target": "llm1" },{ "id": "e2", "source": "llm1", "target": "end" }]}""";WorkflowDesignService.WorkflowCreationRequest request = new WorkflowDesignService.WorkflowCreationRequest();request.setName("测试工作流");request.setWorkflowJson(workflowJson);request.setCreatedBy("test-user");WorkflowDefinition definition = workflowDesignService.createWorkflow(request);assertNotNull(definition);assertNotNull(definition.getId());assertEquals("测试工作流", definition.getName());assertEquals(3, definition.getNodeCount());assertNotNull(definition.getBpmnXml());assertFalse(definition.getBpmnXml().isEmpty());}@Testvoid testValidateWorkflowJson() {String validWorkflowJson = """{"nodes": [{"id": "node1","type": "llm","data": { "name": "测试节点" }}],"edges": []}""";// 应该不抛出异常assertDoesNotThrow(() -> {workflowDesignService.validateWorkflowJson(validWorkflowJson);});}@Testvoid testValidateInvalidWorkflowJson() {String invalidWorkflowJson = """{"nodes": [{"id": "node1","type": "unknown_component","data": { "name": "测试节点" }}],"edges": []}""";// 应该抛出异常assertThrows(RuntimeException.class, () -> {workflowDesignService.validateWorkflowJson(invalidWorkflowJson);});}@Testvoid testGenerateBpmnFromJson() {String workflowJson = """{"nodes": [{"id": "start","type": "start","data": { "name": "开始" }},{"id": "llm1","type": "llm","data": { "name": "LLM处理" }}],"edges": [{ "id": "e1", "source": "start", "target": "llm1" }]}""";String bpmnXml = workflowDesignService.generateBpmnFromJson(workflowJson);assertNotNull(bpmnXml);assertFalse(bpmnXml.isEmpty());assertTrue(bpmnXml.contains("bpmn:definitions"));assertTrue(bpmnXml.contains("process"));}@Testvoid testComponentRegistration() {// 测试组件注册assertTrue(componentRegistryService.isComponentRegistered("llm"));assertTrue(componentRegistryService.isComponentRegistered("dataTransform"));// 测试获取组件元数据var llmMetadata = componentRegistryService.getComponentMetadata("llm");assertNotNull(llmMetadata);assertEquals("llm", llmMetadata.getType());assertEquals("大语言模型", llmMetadata.getName());// 测试获取所有组件var allComponents = componentRegistryService.getAllComponentMetadata();assertFalse(allComponents.isEmpty());// 测试按类别获取组件var aiComponents = componentRegistryService.getComponentsByCategory("ai");assertFalse(aiComponents.isEmpty());}
}

Workflow Execution Service Tests

/*** 工作流执行服务测试*/
@ExtendWith(SpringExtension.class)
@SpringBootTest
class WorkflowExecutionServiceTest {@Autowiredprivate WorkflowExecutionService workflowExecutionService;@Autowiredprivate WorkflowDefinitionService workflowDefinitionService;@Testvoid testExecuteWorkflow() {// 首先创建一个测试工作流WorkflowDefinition definition = createTestWorkflow();// 准备输入变量Map<String, Object> inputVariables = new HashMap<>();inputVariables.put("input", "测试输入内容");inputVariables.put("temperature", 0.7);// 执行工作流WorkflowExecution execution = workflowExecutionService.executeWorkflow(definition.getId(), inputVariables, "test-user");assertNotNull(execution);assertNotNull(execution.getExecutionId());assertEquals(WorkflowExecution.ExecutionStatus.RUNNING, execution.getStatus());assertNotNull(execution.getInputData());}@Testvoid testGetExecutionStatus() {// 创建并执行一个工作流WorkflowDefinition definition = createTestWorkflow();Map<String, Object> inputVariables = Map.of("input", "测试内容");WorkflowExecution execution = workflowExecutionService.executeWorkflow(definition.getId(), inputVariables, "test-user");// 获取执行状态WorkflowExecution status = workflowExecutionService.getExecutionStatus(execution.getExecutionId());assertNotNull(status);assertEquals(execution.getExecutionId(), status.getExecutionId());assertEquals(execution.getStatus(), status.getStatus());}@Testvoid testGetExecutionStatistics() {WorkflowDefinition definition = createTestWorkflow();var statistics = workflowExecutionService.getExecutionStatistics(definition.getId());assertNotNull(statistics);assertEquals(definition.getId(), statistics.getWorkflowId());assertEquals(definition.getName(), statistics.getWorkflowName());assertNotNull(statistics.getTotalExecutions());assertNotNull(statistics.getSuccessRate());}@Testvoid testDeployWorkflow() {WorkflowDefinition definition = createTestWorkflow();String deploymentId = workflowExecutionService.deployWorkflow(definition);assertNotNull(deploymentId);assertFalse(deploymentId.isEmpty());}private WorkflowDefinition createTestWorkflow() {String workflowJson = """{"nodes": [{"id": "start","type": "start","position": { "x": 100, "y": 100 },"data": { "name": "开始" }},{"id": "llm1","type": "llm","position": { "x": 300, "y": 100 },"data": {"name": "LLM处理","config": {"prompt": "处理输入:{{input}}","model": "gpt-3.5-turbo"}}},{"id": "transform1","type": "dataTransform","position": { "x": 500, "y": 100 },"data": {"name": "数据转换","config": {"operation": "template_render","template": "结果:{{text}}"}}},{"id": "end","type": "end","position": { "x": 700, "y": 100 },"data": { "name": "结束" }}],"edges": [{ "id": "e1", "source": "start", "target": "llm1" },{ "id": "e2", "source": "llm1", "target": "transform1" },{ "id": "e3", "source": "transform1", "target": "end" }]}""";WorkflowDefinition definition = new WorkflowDefinition("测试工作流", workflowJson, "test-user");definition.setBpmnXml(workflowExecutionService.generateBpmnFromJson(workflowJson));return workflowDefinitionService.save(definition);}private String generateBpmnFromJson(String workflowJson) {// 简化的BPMN生成,实际应该调用服务方法return """<?xml version="1.0" encoding="UTF-8"?><bpmn:definitions><bpmn:process id="test_process" isExecutable="true"><bpmn:startEvent id="startEvent" /><bpmn:endEvent id="endEvent" /></bpmn:process></bpmn:definitions>""";}
}

Deployment

Dockerfile

FROM openjdk:11-jre-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    curl \
    gnupg \
    && rm -rf /var/lib/apt/lists/*

# Create the application directory
WORKDIR /app

# Create a non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser

# Copy the JAR file
COPY target/dify-platform-1.0.0.jar app.jar

# Create storage directories
RUN mkdir -p /app/workflows /app/deployed /app/logs && \
    chown -R appuser:appuser /app

# Switch to the non-root user
USER appuser

# Expose the application port
EXPOSE 8083

# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8083/dify/actuator/health || exit 1

# Start the application
ENTRYPOINT ["java", "-jar", "app.jar"]

docker-compose.yml

version: '3.8'

services:
  dify-platform:
    build: .
    ports:
      - "8083:8083"
    environment:
      - SPRING_PROFILES_ACTIVE=prod
      - SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/dify_platform
      - SPRING_REDIS_HOST=redis
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - mysql
      - redis
    volumes:
      - ./workflows:/app/workflows
      - ./deployed:/app/deployed
      - ./logs:/app/logs
    networks:
      - dify-network

  mysql:
    image: mysql:8.0
    environment:
      - MYSQL_ROOT_PASSWORD=rootpassword
      - MYSQL_DATABASE=dify_platform
      - MYSQL_USER=dify_user
      - MYSQL_PASSWORD=dify_password
    volumes:
      - mysql_data:/var/lib/mysql
    networks:
      - dify-network

  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data
    networks:
      - dify-network

volumes:
  mysql_data:
  redis_data:

networks:
  dify-network:
    driver: bridge

Quick Start with Docker

# Clone the project
git clone <repository-url>
cd dify-platform

# Set the OpenAI API key
export OPENAI_API_KEY=your-api-key-here

# Build the project
mvn clean package

# Start all services
docker-compose up -d

# Tail the application logs
docker-compose logs -f dify-platform

Manual Startup

# Create the database
mysql -u root -p -e "CREATE DATABASE dify_platform;"

# Build the project
mvn clean package

# Start the application
java -jar target/dify-platform-1.0.0.jar

Frontend UI: http://localhost:8083/dify

API docs: http://localhost:8083/dify/swagger-ui.html

Health check: http://localhost:8083/dify/actuator/health

Camunda admin console: http://localhost:8083/dify/camunda/app/welcome/default/

Create a Sample Workflow

# Create a workflow via the API
curl -X POST "http://localhost:8083/dify/api/v1/design/workflows" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "智能客服工作流",
    "description": "基于LLM的智能客服对话流程",
    "workflowJson": "{\"nodes\":[{\"id\":\"start\",\"type\":\"start\",\"data\":{\"name\":\"开始\"}},{\"id\":\"llm1\",\"type\":\"llm\",\"data\":{\"name\":\"客服AI\",\"config\":{\"prompt\":\"你好,请问有什么可以帮助您的?\"}}},{\"id\":\"end\",\"type\":\"end\",\"data\":{\"name\":\"结束\"}}],\"edges\":[{\"id\":\"e1\",\"source\":\"start\",\"target\":\"llm1\"},{\"id\":\"e2\",\"source\":\"llm1\",\"target\":\"end\"}]}",
    "createdBy": "admin"
  }'

Test Workflow Execution

# Execute the workflow
curl -X POST "http://localhost:8083/dify/api/v1/execution/workflows/1/execute" \
  -H "Content-Type: application/json" \
  -d '{
    "inputVariables": {
      "input": "请问产品价格是多少?"
    },
    "triggeredBy": "test-user"
  }'
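
The same request can also be issued from Java code, for instance from an integration test or a scheduler. The sketch below uses the JDK's built-in java.net.http.HttpClient and assumes workflow id 1 already exists; it is not part of the original project code.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ExecuteWorkflowClient {
    public static void main(String[] args) throws Exception {
        // JSON body mirrors the curl example above
        String body = "{\"inputVariables\":{\"input\":\"What is the price of the product?\"},\"triggeredBy\":\"test-user\"}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/dify/api/v1/execution/workflows/1/execute"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        System.out.println(response.statusCode());
        System.out.println(response.body());  // execution id and status
    }
}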

Create a Workflow

curl -X POST "http://localhost:8083/dify/api/v1/design/workflows" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "数据处理流水线",
    "workflowJson": "{\"nodes\":[...],\"edges\":[...]}",
    "createdBy": "user123"
  }'

Execute a Workflow

curl -X POST "http://localhost:8083/dify/api/v1/execution/workflows/1/execute" \
  -H "Content-Type: application/json" \
  -d '{
    "inputVariables": {
      "data": "需要处理的数据"
    },
    "triggeredBy": "user123"
  }'

List Available Components

curl -X GET "http://localhost:8083/dify/api/v1/design/components"