当前位置: 首页 > news >正文

spring响应式编程系列:异步生产数据

目录

示例

大致流程

create

new MonoCreate

subscribe

new LambdaMonoSubscriber

monoCreate.subscribe

accept

success

onNext

时序图

类图

数据发布者

MonoCreate

数据订阅者

LambdaMonoSubscriber

订阅的消息体

DefaultMonoSink

        本篇文章我们来研究如何将现有异步 API(如回调式接口)适配到 Reactor 的响应式流中。

        默认情况下,Mono.create的代码块执行在订阅时的线程上,但如果在该代码块中启动其他线程或使用异步API,那么数据生产就会变成异步的。示例如下所示:

示例

Mono<String> mono = Mono.create(sink -> {
    // 模拟一个异步API操作
    new Thread(() -> {
        try {
            Thread.sleep(1000); // 模拟耗时操作
            log.info("success");
            sink.success("Hello, World!"); // 成功时发射数据
        } catch (InterruptedException e) {
            sink.error(e); // 发生错误时发射错误信号
        }
    }).start();
});
log.info("main start");
mono.subscribe(x -> log.info("main finish"));
Thread.sleep(5000);

        在这里,通过Mono.create模拟一个异步API操作,API操作成功后,调用sink.success("Hello, World!")进行数据发布者发送数据,从而触发数据的订阅。

        接下来,让我们一起看看程序的流程是怎么处理的。

        点击create()方法,如下所示:

大致流程

create

public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
    return onAssembly(new MonoCreate<>(callback));
}

        在这里,new一个MonoCreate对象并返回。

        点击MonoCreate,如下所示:

new MonoCreate

final class MonoCreate<T> extends Mono<T> implements SourceProducer<T> {
   static final Disposable TERMINATED = OperatorDisposables.DISPOSED;
   static final Disposable CANCELLED = Disposables.disposed();
   final Consumer<MonoSink<T>> callback;
   MonoCreate(Consumer<MonoSink<T>> callback) {
      this.callback = callback;
   }

        在这里,将create()方法的回调接口参数赋值给callback属性。因此,Mono.create的参数就作为数据发布者的一个属性信息了。

        点击示例里的mono.subscribe(),如下所示:

subscribe

public final Disposable subscribe(
      @Nullable Consumer<? super T> consumer,
      @Nullable Consumer<? super Throwable> errorConsumer,
      @Nullable Runnable completeConsumer,
      @Nullable Context initialContext) {
   return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
         completeConsumer, null, initialContext));
}

        在这里,new一个LambdaMonoSubscriber对象,如下所示:

new LambdaMonoSubscriber

LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer,
      @Nullable Consumer<? super Throwable> errorConsumer,
      @Nullable Runnable completeConsumer,
      @Nullable Consumer<? super Subscription> subscriptionConsumer,
      @Nullable Context initialContext) {
   this.consumer = consumer;
   this.errorConsumer = errorConsumer;
   this.completeConsumer = completeConsumer;
   this.subscriptionConsumer = subscriptionConsumer;
   this.initialContext = initialContext == null ? Context.empty() : initialContext;
}

        在这里,将subscribe的回调接口参数赋值给consumer 属性,因此,mono.subscribe的参数就作为数据消费者的属性了。

        点击上一步的subscribeWith()方法,如下所示:

monoCreate.subscribe

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
   DefaultMonoSink<T> emitter = new DefaultMonoSink<>(actual);
   actual.onSubscribe(emitter);
   try {
      callback.accept(emitter);
   }
   catch (Throwable ex) {
      emitter.error(Operators.onOperatorError(ex, actual.currentContext()));
   }
}

        在这里,首先调用了数据消费者的onSubscribe()方法,这个与《spring响应式编程系列:总体流程》一样。

        另外,调用了callback.accept()方法,也就是Mono.create()的回调接口参数。

accept

Mono<String> mono = Mono.create(sink -> {
    // 模拟一个异步操作
    new Thread(() -> {
        try {
            Thread.sleep(1000); // 模拟耗时操作
            log.info("success");
            sink.success("Hello, World!"); // 成功时发射数据
        } catch (InterruptedException e) {
            sink.error(e); // 发生错误时发射错误信号
        }
    }).start();
});

        在这里,模拟了耗时操作,然后调用sink.success()方法。

      通常,可以将sink对象保存在线程共享环境里,等其它的业务操作执行完成后,再调用sink.success()方法,即可发射数据发布者数据,从而触发消费者订阅。

        点击sink.success(),如下所示:

​​​​​​​success

public void success(@Nullable T value) {

... ...
     for (; ; ) {
      int s = state;
      if (s == HAS_REQUEST_HAS_VALUE || s == NO_REQUEST_HAS_VALUE) {
         Operators.onNextDropped(value, actual.currentContext());
         return;
      }
      if (s == HAS_REQUEST_NO_VALUE) {
         if (STATE.compareAndSet(this, s, HAS_REQUEST_HAS_VALUE)) {
            try {
               actual.onNext(value);
               actual.onComplete();
            }
            catch (Throwable t) {
               actual.onError(t);
            }
            finally {
               disposeResource(false);
            }
         } else {
            Operators.onNextDropped(value, actual.currentContext());
         }
         return;
      }
      ... ...
   }
}

        在这里,调用了数据订阅者的onNext()方法,如下所示:

​​​​​​​onNext

public final void onNext(T x) {
   Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
   if (s == Operators.cancelledSubscription()) {
      Operators.onNextDropped(x, this.initialContext);
      return;
   }
   if (consumer != null) {
      try {
         consumer.accept(x);
      }
      catch (Throwable t) {
         Exceptions.throwIfFatal(t);
         s.cancel();
         doError(t);
      }
   }
   if (completeConsumer != null) {
      try {
         completeConsumer.run();
      }
      catch (Throwable t) {
         Operators.onErrorDropped(t, this.initialContext);
      }
   }
}

时序图

  1. 类关系的设计,与《spring响应式编程系列:总体流程》类似,主要包括数据发布者对象、数据订阅者对象及订阅的消息体对象;
  2. Mono和MonoCreate是数据发布者,LambdaMonoSubscriber是数据订阅者,DefaultMonoSink是订阅的消息体;
  3. 不同点在于,DefaultMonoSink可以通过示例里的Mono.create暴露给业务侧,业务侧的相关业务执行完成之后,可以通过调用该对象success方法,来触发订阅者的回调函数。

​​​​​​​类图

数据发布者

MonoCreate

        MonoCreate与《spring响应式编程系列:总体流程》介绍的类似,都是继承于Mono类,并且实现了CorePublisher和Publisher接口。

        不同点在于,该数据发布者多了一个属性,如下所示:

        final Consumer<MonoSink<T>> callback;

        该属性是一个可以接收所订阅消息体(类型为MonoSink<T>)参数的回调函数,在这里可以将该消息体与对应的业务建立绑定关系,为后续业务执行结束后的回调做准备。

数据订阅者

LambdaMonoSubscriber

        LambdaMonoSubscriber与《spring响应式编程系列:总体流程》介绍的一样。

订阅的消息体

DefaultMonoSink

        DefaultMonoSink与《spring响应式编程系列:总体流程​​​​​​​》介绍的类似,都实现了Subscription接口。

        不同点在于,DefaultMonoSink实现了MonoSink接口,该接口提供了供业务侧调用 的接口方法,如下所示:

void success(@Nullable T value);

        业务侧的相关业务执行完成之后,可以通过调用该接口方法,来触发订阅者的回调函数。

相关文章:

  • 计算机网络的五层结构(物理层、数据链路层、网络层、传输层、应用层)到底是什么?
  • 如何保证线程安全(含典型手段与应用场景)
  • 什么是智能导诊知识库?
  • 平面连杆机构(上)
  • H.264/AVC标准主流开源编解码器编译说明
  • 在分类任务中,显著性分析
  • 【课题推荐】基于场景的改进IMM算法
  • 在线录屏工具(压箱底)-免费高清
  • 为什么vllm能够加快大模型推理速度?
  • SM30 权限检查
  • 实验四 进程调度实验
  • 英语中的介词(preposition)
  • OSPF中DR/BDR的选举
  • 黑马Java基础笔记-4
  • Linux渗透测试
  • 7.Geometric Intersection: Interval
  • 产销协同是什么?产销协同流程有哪些?
  • 一台服务器已经有个python3.11版本了,如何手动安装 Python 3.10,两个版本共存
  • Neo4j 常用查询语句
  • 数据库系统概论(四)关系操作,关系完整性与关系代数
  • 中南财经政法大学法学院党委副书记易育去世,终年45岁
  • 人民日报评论员:把造福人民作为根本价值取向
  • 17家A股城商行一季报扫描:青岛银行营收增速领跑,杭州银行净利增速领跑
  • 竞彩湃|德甲保级白热化,都灵主帅直面旧主
  • 科普|“小石头,大麻烦”,出现输尿管结石如何应对?
  • AI世界的年轻人|他用影像大模型解决看病难题,“要做的研究还有很多”