第二章从事件驱动到信号
开始
上一章,我们已经讨论了事件驱动架构的基本概念,并从观察者模式逐步过渡到了事件驱动架构。那么,今天我们将开始编写一个基于信号(Signal)的事件驱动框架——我们称之为“轮子项目”。就像你看到的,架构的核心目标就是——接收并转发事件到相应的监听者(观察者)。说得再简单点,就是做一个 事件调度员,把事件交给那些已经“报名”的响应者来处理。
对于轮子底层的实现,我们首先要确定实现的功能有哪些,要实现怎么样的机制。
那么我们首先来捋一捋,所谓的事件驱动架构,其实就是做一个中间层,用来接收事件并且将事件转发给对应绑定的监听者(观察者)。(果然没有什么是套一层无法解决的,如果不行,就再套一层)
那么首先,我们先确定如何这一套中间层如何进行存储和转发,根据大家上一节看到的事件驱动架构的简单实现,其实是可以使用基于集合框架(上一节使用ConCurrentHashMap)的内存存储,当然,显然远远不止这点存储方案,其实包括
- 文件存储
- 数据库存储(也是磁盘文件存储其实)
- 同步到Redis
- 同步到MQ
但因为我们这个项目是“轻量级”试验,最初我们还是先使用内存存储,利用集合框架来保存事件和监听器(观察者)的关联关系。
那么存储问题解决了,接下来我们要讨论“事件的传递”机制。我们将引入 信号与槽机制,这将让我们的框架更灵活和松耦合——听起来是不是很酷?
信号与槽(Signal & Slot)
在事件驱动架构中,“信号”代表事件的发生,而“槽”则是对事件的响应。当信号被触发时,系统会调用相应的槽函数来处理这个事件。这种模式非常适合实现松耦合的系统组件。
-
信号(Signal):表示一个事件的发生。它不关心是否有任何响应者,信号只会在某些条件下发出。
-
槽(Slot):是一个回调函数,当信号被触发时,槽会被调用并执行相关逻辑。槽通常是与特定信号绑定的处理函数。
这种机制类似于我们生活中的“红包群”:
-
信号 就是某个人发了一个红包。
-
槽 就是你作为群成员,抢到了红包并且领了它。
信号与槽的机制非常强大,广泛应用于如 Qt 等框架中,通过这种机制可以轻松地实现事件和响应之间的联系,而不需要显式地直接调用
换句话说,信号就是“事件”,槽就是“事件处理”。通过这种方式,你可以在事件发生时,轻松地让多个响应者进行处理,而这些响应者和发起者之间并不直接耦合。
理论有点抽象,我们上代码来理解一下吧:
class Signal<T> {private final List<Consumer<T>> slots = new ArrayList<>();// 连接槽函数public void connect(Consumer<T> slot) {slots.add(slot);}// 触发信号public void emit(T value) {for (Consumer<T> slot : slots) {slot.accept(value);}}
}// 示例
public class SignalSlotDemo {public static void main(String[] args) {// 创建一个字符串信号Signal<String> messageSignal = new Signal<>();// 连接不同的槽messageSignal.connect(msg -> System.out.println("好友A 收到消息: " + msg));messageSignal.connect(msg -> System.out.println("好友B 收到消息: " + msg));messageSignal.connect(msg -> {if (msg.contains("失恋")) {System.out.println("好友C:兄弟,别难过,我们出去喝酒!");}});// 发出信号(相当于触发事件)messageSignal.emit("我失恋了...");messageSignal.emit("今天升职加薪了!");}
}
根据上面的代码,我们将会得到结果:
好友A 收到消息: 我失恋了...
好友B 收到消息: 我失恋了...
好友C:兄弟,别难过,我们出去喝酒!
好友A 收到消息: 今天升职加薪了!
好友B 收到消息: 今天升职加薪了!
在这个例子中,Signal 就是我们的“信号”,而messageSignal.connect() 方法则是将多个“槽”连接到信号上。每当信号触发(通过 emit() 方法),所有连接的槽函数都会执行。这就完成了一个事件驱动的机制,松耦合并且易于扩展。
那么这就是信号槽机制,通过发送信号,然后调用槽函数的方式实现事件驱动的方案,所以其实大家可以理解到:
- Signal(信号):表示某个事件的发生,它不关心有多少处理者;
- Slot(槽):就是事件触发后的响应逻辑(回调函数)。
那么我们项目的整体步骤就可以得到了
-
- 定义信号槽回调函数接口(这是信号与槽机制的核心)
-
- 实现信号的注册和发送方法
-
- 运行整体信号触发与接收的流程
我们首先开始创建一个新的Java项目(使用Maven或者Gradle作为包管理器都是可以的,不过我其实还是推荐用Maven),然后创建一个基础项目,暂时不引入别的依赖
[外链图片转存中…(img-lOFVJYuv-1756290996784)]
定义信号槽回调函数接口
现在我们开始第一步,定义信号槽回调函数接口
这里我将会给大家介绍一个注解(之前没有了解过的可以看一下):
@FunctionalInterface
@FunctionalInterface 是 Java 8 引入的一个注解,用于标识一个接口是函数式接口。以下是它的主要作用:
- 标识函数式接口
- 表明该接口只能有一个抽象方法
- 使接口可以被 Lambda 表达式实现
- 编译时检查
- 如果接口不符合函数式接口的要求,编译器会报错
- 确保接口只包含一个抽象方法(不包括来自 Object 类的方法)
- 提高代码可读性
- 明确表达设计意图
- 让其他开发者知道这个接口是专门用于函数式编程的
接下来,我们在目前的包下创建一个core核心包,在其中创建一个接口SignalHandler
@FunctionalInterface
public interface SignalHandler {void handle(Object sender, Object... params);}
这个接口就定义好了,这是一个信号处理的回调函数接口,我们来试验一下这个接口的方法,在test/java包下创建SignalHandlerTest.java的测试文件
[外链图片转存中…(img-vhN30BSb-1756290996784)]
既然涉及到测试相关的,这里我们需要引入我们的第一个依赖Junit
<dependencies><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter</artifactId><version>5.8.2</version><scope>test</scope></dependency>
</dependencies>
那么接下来我们来测试一下:
public class SignalHandlerTest {private SignalHandler signalHandler;@BeforeEach // 测试用例执行之前执行public void setUp() {signalHandler = new SignalHandler() {@Overridepublic void handle(Object sender, Object... params) {// 测试实现System.out.println("Signal handled from: " + sender);if (params != null) {for (Object param : params) {System.out.println("Param: " + param);}}}};}@Test // 测试用例void testSignalHandlerCreation() {assertNotNull(signalHandler, "SignalHandler should be created");}@Test // 测试用例void testSignalHandlerHandle() {// 测试处理方法是否能正常执行assertDoesNotThrow(() -> {signalHandler.handle(this, "param1", "param2");}, "Handle method should not throw exception");}@Test // 测试用例void testSignalHandlerHandleWithNoParams() {// 测试无参数处理assertDoesNotThrow(() -> {signalHandler.handle(this);}, "Handle method should work with no params");}
}
当然,其实还有更简单的使用方案,使用Lambda表达式
public static void main(String[] args) {SignalHandler sing = (sender, params) ->{System.out.println("Signal handled from: " + sender);};sing.handle("test", "param1", "param2");
}
(这里给不熟悉Lambda或者对Lambda表达式一知半解的伙伴讲一下: Lambda表达式是Java 8引入的一种简洁的匿名函数表示方式,是一种语法糖,它允许你以内联的方式实现函数式接口。)
SignalHandler signalHandler = new SignalHandler() {@Overridepublic void handle(Object sender, Object... params) {// 实现代码}
};============ 变成了下面这样 ============ SignalHandler signalHandler = (sender, params) -> {System.out.println("Signal handled from: " + sender);// 可以添加更多实现
};
目前有个这个接口,我们初步的第一步接口算是实现完成了(当然,目前接口的设计灵活度和优雅程度都不太够,不过谁又能一次就设计出最优方案呢,且看我们后面慢慢优化)
接下来,我们在 Signals.java 中定义我们的信号系统。这个类包含了“连接信号”和“发射信号”的方法。它允许我们将处理函数与特定的事件进行绑定,并且在事件触发时执行相应的处理逻辑,现在请在目录下创建Signals.java文件
public class Signals {/*** 监听器集合*/private final Map<String, List<SignalHandler>> sigHandlers = new ConcurrentHashMap<>();/*** 连接信号处理器到指定主题** @param topic 信号主题* @param handler 信号处理器*/public void connect(String topic, SignalHandler handler) {// 获取或创建该主题的处理器列表List<SignalHandler> handlers = sigHandlers.computeIfAbsent(topic, k -> new ArrayList<>());// 添加处理器handlers.add(handler);}/*** 发射信号到指定主题** @param topic 信号主题* @param args 传递给处理器的参数*/public void emit(String topic, Object... args) {// 获取该主题的所有处理器List<SignalHandler> handlers = sigHandlers.get(topic);if (handlers != null && !handlers.isEmpty()) {// 调用每个处理器for (SignalHandler handler : handlers) {try {handler.handle(this, args);} catch (Exception e) {// 处理异常,避免一个处理器异常影响其他处理器System.err.println("Error in signal handler for topic '" + topic + "': " + e.getMessage());e.printStackTrace();}}}}
}
这里我们使用ConcurrentHashMap存储消息和消息对应的处理器(这里的处理器也就是监听者,是一个List,可以存储多个消息处理器)
然后我们创建connect和emit进行注册和发送信号消息
下面我将会给出一个具体的示例去演示如何使用上面的功能
public static void main(String[] args) {// 创建信号系统实例Signals signals = new Signals();// 连接信号处理器signals.connect("user.login", (sender, params) -> {System.out.println("User logged in: " + params[0]);});signals.connect("user.logout", (sender, params) -> {System.out.println("User logged out: " + params[0]);});// 发射信号signals.emit("user.login", "Alice");signals.emit("user.logout", "Bob");
}
运行这段代码我们将会得到结果
User logged in: Alice
User logged out: Bob
那么,到了这一步,我们的信号驱动机制也就有了一些基础功能了,接下来让我们继续升级,目前的代码过于简陋了。
问题: 目前使用事件名字(String) 对应 处理器列表(List)
目前的SignalHandler中,使用void handle(Object sender, Object... params);
这个抽象方法,其参数还是过于简陋了,常见的问题包括:
- 类型不安全
- 语义不清
- 拓展困难
那么,我们开始在其上进行封装吧,这里我建议是创建一个信封类(Envelope)去封装接收
/*** Signal Envelope*/
public class Envelope<S, T> {/*** Event Type*/private String eventType;/*** Sender*/private S sender;/*** Payload*/private T payload;/*** Signal Context*/private SignalContext context;/*** Builder for Envelope*/public static class Builder<S, T> {private String eventType;private S sender;private T payload;private SignalContext context;private Builder() {}public static <S, T> Builder<S, T> create() {return new Builder<>();}public Builder<S, T> eventType(String eventType) {this.eventType = eventType;return this;}public Builder<S, T> sender(S sender) {this.sender = sender;return this;}public Builder<S, T> payload(T payload) {this.payload = payload;return this;}public Builder<S, T> context(SignalContext context) {this.context = context;return this;}public Envelope<S, T> build() {Envelope<S, T> envelope = new Envelope<>();envelope.eventType = this.eventType;envelope.sender = this.sender;envelope.payload = this.payload;envelope.context = this.context;return envelope;}}// Getter And Setter ...
}
这里我要补充几个比较基础的类:
1. 雪花算法的ID生成器
package io.github.signal.utils;/*** A simple implementation of the Snowflake ID generator.* <p>* This class generates 64-bit unique IDs based on the current timestamp,* data center ID, machine ID, and a sequence number.* <p>* Structure of the ID (from high to low bits):* <pre>* 0 - 41 bits timestamp - 5 bits data center ID - 5 bits machine ID - 12 bits sequence* </pre>* Inspired by Twitter's Snowflake algorithm.*/
public class SnowflakeIdGenerator {/** Custom epoch timestamp (e.g., project start time) */private static final long START_TIMESTAMP = 1625155600000L;/** Number of bits allocated for machine ID */private static final long MACHINE_ID_BITS = 5L;/** Number of bits allocated for data center ID */private static final long DATA_CENTER_ID_BITS = 5L;/** Number of bits allocated for sequence number */private static final long SEQUENCE_BITS = 12L;/** Maximum values for machine and data center IDs */private static final long MAX_MACHINE_ID = ~(-1L << MACHINE_ID_BITS);private static final long MAX_DATA_CENTER_ID = ~(-1L << DATA_CENTER_ID_BITS);private static final long SEQUENCE_MASK = ~(-1L << SEQUENCE_BITS);/** Bit shifts */private static final long TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + MACHINE_ID_BITS + DATA_CENTER_ID_BITS;private static final long MACHINE_ID_LEFT_SHIFT = SEQUENCE_BITS + DATA_CENTER_ID_BITS;private static final long DATA_CENTER_ID_LEFT_SHIFT = SEQUENCE_BITS;/** Sequence and timestamp trackers */private static long lastTimestamp = -1L;private static long sequence = 0L;/** Machine and data center identifiers (could be loaded from config) */private static final long MACHINE_ID = 1L;private static final long DATA_CENTER_ID = 1L;static {if (MACHINE_ID > MAX_MACHINE_ID || DATA_CENTER_ID > MAX_DATA_CENTER_ID) {throw new IllegalArgumentException("Machine ID or Data Center ID is out of valid range.");}}/*** Generates the next unique ID.* This method is synchronized to ensure thread safety.** @return a globally unique 64-bit ID*/public static synchronized long nextId() {long timestamp = System.currentTimeMillis();if (timestamp < lastTimestamp) {throw new RuntimeException("Clock moved backwards. Refusing to generate ID for " + (lastTimestamp - timestamp) + "ms");}if (timestamp == lastTimestamp) {// Within the same millisecond, increment sequencesequence = (sequence + 1) & SEQUENCE_MASK;if (sequence == 0) {// Sequence overflow in same millisecond, wait for next millisecondtimestamp = waitUntilNextMillis(lastTimestamp);}} else {sequence = 0L; // Reset sequence for a new millisecond}lastTimestamp = timestamp;return ((timestamp - START_TIMESTAMP) << TIMESTAMP_LEFT_SHIFT)| (DATA_CENTER_ID << DATA_CENTER_ID_LEFT_SHIFT)| (MACHINE_ID << MACHINE_ID_LEFT_SHIFT)| sequence;}/*** Waits until the system clock moves to the next millisecond.** @param lastTs the last recorded timestamp* @return the next millisecond timestamp*/private static long waitUntilNextMillis(long lastTs) {long timestamp = System.currentTimeMillis();while (timestamp <= lastTs) {timestamp = System.currentTimeMillis();}return timestamp;}
}
2. 信号上下文容器
/*** Context information for a signal as it flows through the system.* Purpose:* - Stores user-defined attributes and intermediate values during signal processing.* - Supports tracing (spans) to visualize the flow of signal handling.*/
public class SignalContext {// User-defined attributes (e.g., business parameters)private final Map<String, Object> attributes;// Intermediate values (used internally by the framework)private final Map<String, Object> intermediateValues;// Unique identifiers for tracing and correlationprivate String traceId;// Unique identifier for the signal eventprivate String eventId;// List of spans to track the execution pathprivate final List<Span> spans = new CopyOnWriteArrayList<>();// ID of the current parent span for nested tracingprivate String parentSpanId;/*** Constructs a new, empty SignalContext.*/public SignalContext() {this.attributes = new ConcurrentHashMap<>();this.intermediateValues = new ConcurrentHashMap<>();}// ---------- Attribute handling ----------/*** Sets a user-defined attribute.** @param key attribute key* @param value attribute value*/public void setAttribute(String key, Object value) {attributes.put(key, value);}/*** Retrieves a user-defined attribute.** @param key attribute key* @return attribute value*/public Object getAttribute(String key) {return attributes.get(key);}/*** Removes a user-defined attribute.** @param key attribute key*/public void removeAttribute(String key) {attributes.remove(key);}/*** Gets a copy of all attributes.** @return attributes map*/public Map<String, Object> getAttributes() {return new ConcurrentHashMap<>(attributes);}/*** Sets the entire attribute map, replacing any existing values.** @param newAttributes new attributes to set*/public void setAttributes(Map<String, Object> newAttributes) {this.attributes.clear();if (newAttributes != null) {this.attributes.putAll(newAttributes);}}// ---------- Intermediate value handling ----------/*** Adds an intermediate value (for internal framework use).** @param key intermediate value key* @param value intermediate value*/public void addIntermediateValue(String key, Object value) {intermediateValues.put(key, value);}/*** Retrieves an intermediate value.** @param key intermediate value key* @return intermediate value*/public Object getIntermediateValue(String key) {return intermediateValues.get(key);}/*** Gets a copy of all intermediate values.** @return intermediate values map*/public Map<String, Object> getIntermediateValues() {return new ConcurrentHashMap<>(intermediateValues);}/*** Sets the entire intermediate values map, replacing any existing values.** @param values new intermediate values to set*/public void setIntermediateValues(Map<String, Object> values) {this.intermediateValues.clear();if (values != null) {this.intermediateValues.putAll(values);}}// ---------- Tracing / Spans ----------/*** Initializes a trace ID and event ID for the signal context.** @param eventName the name of the signal event*/public void initTrace(String eventName) {if (eventName == null) {eventName = "unknown";}this.traceId = UUID.randomUUID().toString();this.eventId = eventName + "_" + SnowflakeIdGenerator.nextId();}public String getTraceId() {return traceId;}public String getEventId() {return eventId;}public void setTraceId(String traceId) {this.traceId = traceId;}public void setEventId(String eventId) {this.eventId = eventId;}public String getParentSpanId() {return parentSpanId;}public void setParentSpanId(String parentSpanId) {this.parentSpanId = parentSpanId;}/*** Adds a span to the tracing list.** @param span the span to add*/public void addSpan(Span span) {spans.add(span);}/*** Gets a list of recorded spans (copied for safety).** @return list of spans*/public List<Span> getSpans() {return new ArrayList<>(spans);}@Overridepublic String toString() {return "SignalContext{" +"attributes=" + attributes +", intermediateValues=" + intermediateValues +'}';}/*** Represents a tracing span, capturing the operation details.*/public static class Span {private String spanId;private String parentSpanId;private String operation;private long startTime;private long endTime;private Map<String, Object> metadata = new HashMap<>();public String getSpanId() {return spanId;}public void setSpanId(String spanId) {this.spanId = spanId;}public String getParentSpanId() {return parentSpanId;}public void setParentSpanId(String parentSpanId) {this.parentSpanId = parentSpanId;}public String getOperation() {return operation;}public void setOperation(String operation) {this.operation = operation;}public long getStartTime() {return startTime;}public void setStartTime(long startTime) {this.startTime = startTime;}public long getEndTime() {return endTime;}public void setEndTime(long endTime) {this.endTime = endTime;}public Map<String, Object> getMetadata() {return metadata;}public void setMetadata(Map<String, Object> metadata) {this.metadata = metadata;}}
}
3. 信号优先级枚举
/*** Enumeration for signal processing priority levels.* <p>* Priorities are used to control the execution order or urgency* of signal handlers in the system.*/
public enum SignalPriority {/*** High priority (most urgent)*/HIGH(0),/*** Medium priority (default)*/MEDIUM(1),/*** Low priority (least urgent)*/LOW(2);private final int value;SignalPriority(int value) {this.value = value;}/*** Returns the integer value of the priority level.** @return numerical representation of the priority*/public int getValue() {return value;}
}
这里定义了三种事件的优先级(暂定三种)
这里我们使用泛型来优化
让我们修改SignalHandler接口
@FunctionalInterface
public interface SignalHandler<S, T> {void handle(Envelope<S, T> envelope);}
那么目前我们的使用,在类型这一块就好了很多了,让我们看看Signals呢:
public class Signals {/*** 监听器集合*/private final Map<String, List<SignalHandler<String, Object>>> sigHandlers = new ConcurrentHashMap<>();/*** 连接信号处理器到指定主题** @param topic 信号主题* @param handler 信号处理器*/public void connect(String topic, SignalHandler<String, Object> handler) {// 获取或创建该主题的处理器列表List<SignalHandler<String, Object>> handlers = sigHandlers.computeIfAbsent(topic, k -> new ArrayList<>());// 添加处理器handlers.add(handler);}/*** 发射信号到指定主题** @param topic 信号主题* @param args 传递给处理器的参数*/public void emit(String topic, Object... args) {// 获取该主题的所有处理器List<SignalHandler<String, Object>> handlers = sigHandlers.get(topic);if (handlers != null && !handlers.isEmpty()) {// 调用每个处理器for (SignalHandler<String, Object> handler : handlers) {try {handler.handle(Envelope.Builder.<String, Object>create().eventType("TEST_EVENT").sender("test-sender").payload(123).context(new SignalContext()).build());} catch (Exception e) {// 处理异常,避免一个处理器异常影响其他处理器System.err.println("Error in signal handler for topic '" + topic + "': " + e.getMessage());e.printStackTrace();}}}}public static void main(String[] args) {// 创建信号系统实例Signals signals = new Signals();// 连接信号处理器signals.connect("user.login", (envelope) -> {System.out.println("User logged in: " + envelope.getPayload());});signals.connect("user.logout", (envelope) -> {System.out.println("User logged out: " + envelope.getPayload());});// 发射信号signals.emit("user.login", "Alice");signals.emit("user.logout", "Bob");}
}
那么目前就是一个比较好一些的基于信号的事件驱动框架了(很简陋版),不过其实对于一些及其简单的场景其实还是能用。
🚀 深入思考:事件驱动架构的演进
通过上述示例,你应该能清楚地理解信号与槽机制是如何实现的。那么接下来,让我们探讨事件驱动架构的实际应用和技术演进。
1. 事件驱动架构在现代开发中的应用
事件驱动架构(EDA)不仅仅是一个编程范式,它已经深入到了现代软件系统中:
微服务架构:在分布式系统中,服务之间的通信通常通过事件驱动来实现。比如,使用 Kafka 或 RabbitMQ 来传递事件。
前端开发:浏览器和网页中,几乎所有的交互都是事件驱动的。比如,点击按钮、滑动页面、滚动列表,都是触发事件的方式。
IoT(物联网):传感器通过事件触发数据采集和动作执行,例如当温度超过某个阈值时触发报警。
2. 异步与同步:性能与解耦的权衡
在事件驱动架构中,事件处理的异步化是提高性能的关键。例如,当你发一个动态到朋友圈时,你不希望系统去等每个人都回复后才继续。事件驱动架构支持异步处理,保证不会阻塞主流程。
然而,异步处理的代价是事件的顺序和幂等性问题(事件可能被多次处理)。这时你就需要考虑事件的幂等性和消息顺序问题。
总结:从“信号”到“事件驱动架构”
通过这一章,我们从 观察者模式 的基本概念,到 信号与槽 的实现,逐步构建了一个简单的事件驱动框架。这只是一个起点,未来可以将其扩展到分布式系统,加入更多复杂的功能,比如事件的持久化、消息的异步处理等。
相信你现在已经对事件驱动架构有了更深刻的理解,也可以开始尝试在你的项目中使用这种机制了。如果你有任何问题或者想进一步了解更深入的知识,记得随时向我提问哦!
💡 思考题
-
在你的项目中,哪些功能可以通过事件驱动架构来优化?
-
除了 Kafka 和 RabbitMQ,你知道哪些技术可以用来实现事件驱动系统?
-
如何确保在事件驱动架构中,事件的顺序不被打乱?
更多资料,请关注Code百分百