0 просмотров
Рейтинг статьи
1 звезда2 звезды3 звезды4 звезды5 звезд
Загрузка...

Java и Project Reactor. Эпизод 2

Java и Project Reactor. Эпизод 2

Java and Project Reactor. Episode 2

Hello! It’s amazing, but the first part of Article even someone liked it.

Special thanks for your feedback and comments. I have for you
bad
good news: we still have something to talk about! And more precisely, about some details of the work of Reactor.
Inside the Docker, this method can easily lie to you.

It should be noted that the boxed Schedulers.single () and Schedulers.parallel () throw an IllegalStateException when they try to start a blocking statement in them:
block (), blockLast (), toIterable (), toStream ()
. This innovation appeared in the release
.

If you still want to deal with such perversions — use
Shchedulers.newSingle ()
and
Schedulers.newParallel ()
. But the best practice for blocking operators is using Schedulers.elastic () or Schedulers.newElastic ().

Instances Scheduler can also be initialized from ExecutorService using
Schedulers.fromExecutorService ()
. Of Executor also possible, but not recommended.

Some operators from Flux and Mono are launched immediately on a specific Scheduler (but you can also transfer your own). For example, already familiar
Flux.interval ()
by default it runs on Schedulers.parallel ().

Flux.interval (Duration.ofMillis (300), Schedulers.newSingle («test»))

The context of the performance is

How can I change the execution context? We need to resort to one of the operators already familiar to us:

They both accept the Scheduler as an argument and allow you to change the execution context to the specified Scheduler.

But why them two and in what a difference?

In the case of
publishOn
This operator is used in the same way as any other, in the middle of a chain of calls. All subsequent Subscriber will be executed in the context of the specified Scheduler.

In the case of
subscribeOn
operator «global», is triggered immediately on the entire chain of the Subscriber. After calling subscribe (), the execution context will be the specified Scheduler. The context can then be changed using the publishOn operator. Subsequent calls to subscribeOn are ignored.

Flux.just («a», «b», «c») //this is where subscription triggers data production
//this is influenced by subscribeOn
.doOnNext (v -> System.out.println («before publishOn:» + Thread.currentThread (). getName ()))
.publishOn (Schedulers.elastic ())
//the rest is influenced by publishOn
.doOnNext (v -> System.out.println («after publishOn:» + Thread.currentThread (). getName ()))
.subscribeOn (Schedulers.parallel ())
.subscribe (v -> System.out.println («received» + v + «on» + Thread.currentThread (). getName ()));
Thread.sleep (5000);

will print the following result:

before publishOn: parallel-1
before publishOn: parallel-1
after publishOn: elastic-2
before publishOn: parallel-1
received a on elastic-2
after publishOn: elastic-2
received b on elastic-2
after publishOn: elastic-2
received c on elastic-2

Error handling

In Reactor, exceptions are treated as terminal event (terminal event).

If there is an exception somewhere, then something went wrong, our pipeline stops, and the error is thrown to the final Subscriber and its method onerror .

Why is that? Reactor does not know about the seriousness of the exception that arises and has no idea what to do with it. Such situations should somehow be handled at the application level. For this, Subscriber has a fine method onerror (). Reactor forces us to redefine it and somehow react to an exception, otherwise we will receive UnsupportedOperationException with errors.

To be honest, he throws out the heir to UnsupportedOperationException — ErrorCallbackNotImplemented . To understand that this is really him, there is an auxiliary static method Errors.errorCallbackNotImplemented (Throwable t) .

Philosophy of try /catch

What is usually done inside catch -block in Java? Well, not counting all your favorite empty catch -blocks.

  1. Static Fallback Value. Return some static value by default:

Fallback Method. Calling an alternative method in case of an error:

Dynamic Fallback Value. Return some dynamic value depending on the exception:

Catch and Rethrow . Wrap in some domain exception and throw an exception further:

Log or React on the Side . Ignore the error and throw an exception further:

Using Resources and the Finally Block. Freeing resources in a finally-block or using try-with-resources.

Pleasant news: All this is in Reactor in the form of equivalent operators.

Less pleasant news: in the event of an error, your beautiful sequence of data will still end (terminal event), despite the error handling operator.

Such operators are used more likely to create a new, fallback sequence to replace the completed one.

Let’s give an example:

You can compare this with a similar try /catch block:

Please note: for is interrupted!

Another example of completing a sequence in case of an error:

On the screen, we get:

Try /catch implementation

Static Fallback Value

Using the operator onerrorReturn :

You can add a predicate so that the statement is not executed for all exceptions:

Fallback Method

Using the operator onerrorResume ,

you can add a predicate so that the statement is not executed for all exceptions:

Dynamic Fallback Value

All the same onerrorResume:

Catch and Rethrow

Can be done in two ways. The first one is with the operator onerrorResume :

And more concisely — with the help of onerrorMap :

Log or React on the Side

Add some side effect (metrics, logging) using the doonerror operator

Using Resources and the Finally Block

So, how do you get a try-with-resources analogue or a finally block? The operator comes to the rescue. Flux.using () .

First, you need to familiarize yourself with the Disposable interface. It forces us to implement the method. dispose () . Calling this method must cancel or terminate a task or sequence. Method calls must be idempotent. The resources used must be released.

Repeat | Retrying

When you retry, you see a similar behavior, the original sequence terminates (terminate event), we re-subscribed (re-subscribing) to Flux.

Java для начинающих: часть 4 из 4

Данные статьи помогут легко и быстро разобраться в концепциях и программировании на Java. Даже при нулевых знаниях в Java трудностей в освоении этих материалов не возникнет. А опытные Java-разработчики смогут освежить свои знания.

Поддержка функционального программирования в Java 8

Функциональное программирование — это некая альтернатива объектно-ориентированному программированию, основанная на чистых функциях. Функциональные приложения не имеют общего состояния. По сравнению с объектно-ориентированным кодом такие приложения более лаконичные и предсказуемые. Используя термины из чистой математики, можно сказать, что чистые функции:

  • Без состояния. Переменные не хранят своего состояния или значения. Отсутствуют глобальные переменные.
  • Без памяти. Отсутствует память и побочные эффекты.
  • Выполняют параллельные вычисления. Не требуют синхронизации. Намного проще, чем многопоточность.

Функциональное программирование позволяет писать более чистый и читабельный объектно-ориентированный код с меньшими трудностями. Так как же применять функциональное программирование в Java, и что еще можно о нем добавить?

1. Хранение функции в объекте

В Java 8 появился интерфейс java.util.Function . Он может хранить функцию, которая принимает аргумент, а возвращать — объект. Generic T — это тип аргумента, а R — тип возвращаемого объекта. Пример:

2. Функциональный интерфейс

Это интерфейс с одним абстрактным методом. В нем доступна всего одна функция. В Java 8 для представления экземпляра функционального интерфейса используются лямбда-выражения. Примеры функциональных интерфейсов: Runnable, Comparator, Comparable.

Использование лямбда-выражения в функциональном интерфейсе:

Подробнее о таких выражениях поговорим чуть позже.

3. Стандартные методы в интерфейсе

В интерфейсах всегда присутствовали публичные статические конечные (final) поля. А в Java 8 к ним добавились и стандартные методы. Раньше приходилось создавать безымянные внутренние объекты класса или реализовывать эти интерфейсы. Суть стандартных методов сводится к тому, что это методы интерфейса со стандартной реализацией, а в наследуемом классе можно настроить свою реализацию.

Почему бы не продолжить работать с абстрактными классами?
Абстрактные классы так и остались одним из способов представления состояния или реализации главных методов объекта. Они идут в паре с состоянием. Стандартные методы в интерфейсе используются для чистого поведения.

Пример метода по умолчанию: nullLast в интерфейсе Comparator.

Затем появились лямбда-выражения и новая трудность — поддержка этих выражений в существующих интерфейсах коллекций. С этой целью потребовалось переписать все интерфейсы коллекций. Так и возникла концепция стандартных методов в интерфейсах.

Поэтому с поддержкой функционального программирования интерфейсы лишились состояния и обрели несколько статических стандартных методов.

4. Никаких методов с синхронизацией интерфейса

Синхронизация — это контроль доступа нескольких потоков к общим ресурсам. Синхронизированные статические методы ставят блокировку класса. Поэтому как только поток достигает синхронизированного статического метода, класс блокируется монитором потока, и ни один другой поток не может обратиться к статическим синхронизированным методам данного класса. В отличие от методов создания экземпляров, несколько потоков могут получать доступ к одинаковым синхронизированным методам создания экземпляров одновременно и для разных экземпляров.

Например, метод run из класса Runnable можно синхронизировать. Если это сделать, то перед запуском метода run блокировка объекта Runnable будет занята другим процессом.

Синхронизация сводится к блокировке. Блокировка — это контроль над общим доступом к изменяемому состоянию. Каждый объект имеет свои правила синхронизации, в которых определяется, какие «замки» применяются к тем или иным состояниям переменных. Многие объекты используют собственную политику синхронизации — Java Monitor Pattern, в которой состояние объекта защищено встроенной блокировкой.

Но интерфейсы не владеют состояниями объектов. Подклассы могут переопределять методы, объявленные синхронизированными в суперклассе. Тем самым удаляется синхронизация. Так и возникает ложная уверенность в том, что вы приняли какие-то меры для безопасности потока. А отсутствие сообщений об ошибке не позволяет оценить правильность критериев синхронизации.

5. Optional

Все мы ненавидим null и их проверку. Проверять каждый аргумент на «пустоту» — это сущий ад.

В Java 8 появился java.util.Optional , созданный для обработки объектов, которые могут и не существовать. Это объект-контейнер с включением еще одного объекта. Generic T — тип содержимого объекта.

В классе Optional нет публичного конструктора. Для создания класса, который ни при каких обстоятельствах не может оказаться пустым, используется Optional.of(object). Для null-объектов применяется Optional.ofNullable(object).

6. Лямбда-выражения

Являются читабельным и выразительным способом обработки списков и коллекций по средством кода. Это метод без объявления (напр., модификатор доступа); возвращает значение и имя. Экономит время на объявлении и написании отдельного метода для содержащего класса.

Довольно часто лямбда-выражения:

  • Передаются как аргументы функций высшего порядка.
  • Используются для построения результата возвращаемой функции для функции высшего порядка.
  • Передаются как аргументы (основное назначение).

Лямбда-выражения позволяют рассматривать функции как аргумент метода, а код — как данные.

7. Стримы

Это отличнейшая новинка для работы с коллекциями данных. Почти каждый метод стрима возвращает стрим, поэтому программисты могут продолжать свою работу. В процессе прохождения стрима предусмотрены возможности фильтра, а также map/reduce.

Кроме того, стримы неизменяемы и являются одноразовыми объектами. Как только объект пройден, повторение действия невозможно. При каждой манипуляции разработчики создают новый стрим. Поддержка функционального программирования: разработчики преобразовывают структуру данных в стрим и продолжают работу с ним, не изменяя оригинальных данных. Никакой нехватки памяти или побочных эффектов!

Несколько примеров использования стримов:

Массив в стриме

Конкатенация нескольких списков в стрим

Применение фильтра с условиями для стрима

Использование коллектора (Collector) для преобразования стрима в список

Пример с reduce в решении задачи

Сортировка данных в стриме

8. Дополнительная информация

Наряду с корректным использованием этих примеров для написания чистого и читабельного кода, в функциональном программировании есть ряд дополнительных нюансов: никаких глобальных переменных в функциях, все переменные — конечные, функции используются как параметры, написание функций зависит только от их параметров.

Поддержка реактивного программирования в Java 9

Реактивное программирование основано на потоках данных. Потоки данных исходят из одного компонента и движутся в другой. EventBus, события click, лента в Twitter — это те же асинхронные стримы событий или потоки данных. Класс Observable и интерфейс Observer — отличный пример парадигмы реактивности. Иначе говоря, реактивное программирование сводится к созданию следующей архитектуры: Event driven или Message driven (асинхронная), масштабируемой, отказоустойчивой и отзывчивой.

Java 9 представила Flow API и общие интерфейсы для пошаговой реализации реактивного программирования. Flow API берет на себя всю часть по взаимодействию (запрос, замедление, просмотр, блокировка и т.д.) и упрощает наш переход на реактивное программирование, позволяя работать без дополнительных библиотек (к примеру, RxJava и Project Reactor и т.д.). Во Flow предусмотрено 4 вложенных интерфейса:

  • Flow.Processor → для преобразования входящего сообщения и его передачи следующему Подписчику нужна реализация интерфейса Processor. Подходит для последовательной передачи элементов от Издателей к Подписчикам.
  • Flow.Publisher → создание/публикация элементов и обработка сигналов.
  • Flow.Subscriber → Для получения сообщений и сигналов необходимо реализовать интерфейс Subscriber.
  • Flow.Subscription → связывает Издателя и Подписчика.
  • java.util.concurrent.SubmissionPublisher → Во Flow API есть только один класс Издателя с реализацией Flow.Publisher для создания элементов, которые поддерживаются инициативой Reactive Streams.

Пример ниже наглядно показывает потоки, различные интерфейсы и методы их применения.

Опубликованные и потребленные сообщения

В простом реактивном потоке Издатель публикует сообщения, а обычный Подписчик их потребляет — по одному в процессе появления. Издатель публикует поток данных, на который асинхронно подписан Подписчик.

В примере ниже показано, как класс SubmissionPublisher реализует Publisher. В Publisher есть только один метод — subscribe(), он позволяет подписчикам получать созданные события. А метод отправки SubmissionPublisher создает элементы.

Теперь поговорим о методах интерфейса для Подписчика:

  • onSubscribe(subcription) → Издатель вызывает этот метод при появлении нового Подписчика. Обычно подписка либо сразу сохраняется, т.к. она нужна для отправки сигналов Издателю (запрос большего количества элементов, отмена подписки), либо непосредственно используется для запроса первого элемента (как и показано в примере).
  • onNext(item) → Этот метод вызывается при получении нового элемента. Как правило, там же происходит и обработка элемента, его логирование и запрос нового.
  • onError(throwable) → Вызывается Издателем, предупреждает Подписчика о каких-либо проблемах. Здесь же ведутся логи сообщений о сбоях при размещении элемента Издателем.
  • onComplete() → Вызывается при отсутствии элементов для отправки у Издателя. Указывает, что Подписка завершена.

Теперь посмотрим на реализацию Подписчика:

В чем их отличие от шаблона «Наблюдатель»?

Такой вопрос может возникнуть. В Java уже есть класс Observable и интерфейс Observer. Так что зачем это новшество, и в чем их отличия?

Одним из главных отличий является то, что в шаблоне «Издатель-Подписчик» оба участника не знакомы друг с другом, и коммуникация ведется посредством запросов и посредников. Эти участники слабо связаны. В шаблоне «Наблюдатель» сам наблюдатель знает всех наблюдаемых. Поэтому, в отличие от синхронного «Наблюдателя», шаблон «Издатель-Подписчик» представляет собой асинхронный метод использования элементов в нескольких приложениях или микросервисах.

Java и Project Reactor. Эпизод 2

76 просмотра

2 ответа

334 Репутация автора

У меня есть следующая задача, и я хочу решить ее с помощью Project Reactor (или RxJava)

Есть источники событий. Каждое событие состоит из serviceId и некоторой полезной нагрузки. Как только событие получено, нам нужно выполнить действие для указанного serviceId с полезной нагрузкой. Но мы должны убедиться, что промежуток времени между двумя запросами к одному и тому же идентификатору службы должен быть больше или равен одной секунде. Но запросы к разностным службам могут выполняться параллельно.

Также стоит отметить, что количество сервисов является динамическим.

Похоже на следующее изображение

В настоящее время у меня есть следующий код:

Ответы (2)

плюса

6718 Репутация автора

Если события идентифицируют сервис, в котором они инициируют вызовы, вы можете использовать groupBy() оператора для разделения потоков по сервисам. Чтобы ввести задержку после каждого запроса на обслуживание, используйте flatMap() параметр для однопотокового использования.

  1. Сгруппируйте события по сервису, который они будут использовать. Идентификатор будет использоваться в качестве ключа позже. Это будет генерировать новые элементы при обнаружении нового идентификатора службы.
  2. serviceObservable это GroupByObservable будет обработано ниже.
  3. Каждое излучение от этой наблюдаемой является событием, которое должно быть направлено на один сервис.
  4. serviceObservable.getKey() возвращает идентификатор службы для использования. Я изобрел метод, service() который отправляет событие в службу по идентификатору службы. Кроме того, этот параметр 1 сообщает flatMap() однопотоковой операции, поэтому одновременно может выполняться только один запрос на обслуживание.
  5. delay() (Или в зависимости от того оператора вы хотите) будет ждать второго перед освобождением операции.

(Отказ от ответственности: этот код не тестировался, но я делал подобные виды планирования в прошлых проектах, поэтому основная идея — надежная.)

плюса

329 Репутация автора

Flux.groupBy () поможет вам в этом случае. Оператор использует функцию отображения для создания ключей и группирует испускаемые элементы на основе ключа. Вы можете рассматривать serviceId в качестве ключа.

Вы также можете добавить различные задержки в зависимости от идентификатора сервиса. Проверьте приведенный ниже фрагмент в качестве примера — четные целые числа будут задерживаться на 2 секунды, а нечетные на 1 секунду.

Reactive Programming With Project Reactor

Want to learn more about reactive programming with Project Reactor? Check out this post to learn more with these example scenarios.

Join the DZone community and get the full member experience.

If you are building reactive microservices, you would probably have to merge data streams from different source APIs into a single result stream. It inspired me to create this article containing some of the most common scenarios of using reactive streams in microservice-based architecture during inter-service communication. I have already described some aspects related to reactive programming with Spring based on Spring WebFlux and Spring Data JDBC projects in the following articles:

Spring Framework supports reactive programming since version 5. That support is build on top of Project Reactor. Reactor is a fourth-generation Reactive library for building non-blocking applications on the JVM based on the Reactive Streams Specification. Working with this library can be difficult at first, especially if you don’t have any experience with reactive streams. Reactive Core gives us two data types that enable us to produce a stream of data: Mono and Flux . With Flux ,we can emit 0..nelements. While with Mono , we can create a stream of 0..1 elements. Both those types implement the Publisher interface. Both of these types are lazy, which means they won’t be executed until you consume it. Therefore, when building reactive APIs, it is important not to block the stream. Spring WebFlux doesn’t allow that.

Introduction

The sample project is available on GitHub in repository reactive-playground. It is written in Kotlin. In addition to some Kotlin libraries, the only single dependency that needs to be added in order to use Project Reactor is reactor-core .

I would not like to show you the features of Project Reactor based on simple String objects, like in many other articles. Therefore, I have created the following class hierarchy for our tests that allows us to simulate APIs built for three different domain objects.

Class Organization contains a list of Employee and Department . Each department contains a list of Employee assigned only to the given department inside organization. Class Employee has properties: organizationId that assigns it to the organization and departmentId that assigns it to the department.

Here’s the implementation of the Department class.

Here’s the implementation of the Organization class.

Scenario 1

We have two API methods that return data streams.The first of them return Flux , emitting employees assigned to the given organization. The second returns Mono with the current organization.

We would like to return the single stream emitting organization that contains a list of employees, as shown below.

Here’s the solution. We use the zipWhen method that waits for result from source Mono and then calls the second Mono . Because we can zip only the same stream types (in that case these are Mono ), we need to convert Flux returned by the getEmployeesByOrganization method into Mono > using collectList function. Thanks to zipWhen , we can then combine two Mono streams and create a new object inside the map function.

Scenario 2

Let’s consider another scenario. Now, we have two Flux streams that emit employees and departments. Every employee has a property departmentId responsible for assignment to the department.

The goal is to merge those two streams and return the single Flux stream, emitting departments that contain all employees assigned to the given department. Here’s the picture that illustrates the transformation described above.

We can do that in two ways. First, we call the flatMap function on stream with departments. Inside flatMap , we zip every single Department with stream of employees. That stream is then filtered by the departmentId and converted into the Mono type. Finally, we create the Mono type using the map function that emits the department containing the list of employees.

The second way groups Flux with employees by departmentId . Then, it invokes zipping and mapping functions similar to the previous approach.

Scenario 3

This scenario is simpler than the two previous scenarios. We have two API methods that emit Flux with the same object types. The first of them contains a list of employees — id , name , salary properties— while the second includes the id , organizationId , and departmentId properties.

We want to convert it into a single stream emitting employees with full set of properties. The following picture illustrates the described transformation.

In that case, the solution is pretty simple. We are zipping two Flux streams using the zipWith function, and then, we map the two zipped objects into a single, containing the full set of properties.

Scenario 4

In this scenario, we have two independent Flux streams that emit the same type of objects – Employee .

We would like to merge those two streams into a single stream ordered by id . The following picture shows that transformation.

Here’s the solution. We use the mergeOrderedWith function with comparator that compares id . Then, we can perform some transformations on every object, but it is only an option that shows the usage on map function.

Scenario 5

And, the last scenario in this article – we have a single input stream Mono that contains a list of departments. Each department inside that list contains the list of all employees assigned to the given department. Here’s our API method implementation.

The goal is to convert the stream to the same stream Flux , but containing the list of all employees in department. The following picture visualizes the described transformation.

Here’s the solution. We invoke the flatMapIterable function that converts Flux into Flux by returning List . Then, we convert it to Mono and add to newly created Organization object inside the map function.

Java и Project Reactor. Эпизод 2

76 просмотра

2 ответа

334 Репутация автора

У меня есть следующая задача, и я хочу решить ее с помощью Project Reactor (или RxJava)

Есть источники событий. Каждое событие состоит из serviceId и некоторой полезной нагрузки. Как только событие получено, нам нужно выполнить действие для указанного serviceId с полезной нагрузкой. Но мы должны убедиться, что промежуток времени между двумя запросами к одному и тому же идентификатору службы должен быть больше или равен одной секунде. Но запросы к разностным службам могут выполняться параллельно.

Также стоит отметить, что количество сервисов является динамическим.

Похоже на следующее изображение

В настоящее время у меня есть следующий код:

Ответы (2)

плюса

6718 Репутация автора

Если события идентифицируют сервис, в котором они инициируют вызовы, вы можете использовать groupBy() оператора для разделения потоков по сервисам. Чтобы ввести задержку после каждого запроса на обслуживание, используйте flatMap() параметр для однопотокового использования.

  1. Сгруппируйте события по сервису, который они будут использовать. Идентификатор будет использоваться в качестве ключа позже. Это будет генерировать новые элементы при обнаружении нового идентификатора службы.
  2. serviceObservable это GroupByObservable будет обработано ниже.
  3. Каждое излучение от этой наблюдаемой является событием, которое должно быть направлено на один сервис.
  4. serviceObservable.getKey() возвращает идентификатор службы для использования. Я изобрел метод, service() который отправляет событие в службу по идентификатору службы. Кроме того, этот параметр 1 сообщает flatMap() однопотоковой операции, поэтому одновременно может выполняться только один запрос на обслуживание.
  5. delay() (Или в зависимости от того оператора вы хотите) будет ждать второго перед освобождением операции.

(Отказ от ответственности: этот код не тестировался, но я делал подобные виды планирования в прошлых проектах, поэтому основная идея — надежная.)

плюса

329 Репутация автора

Flux.groupBy () поможет вам в этом случае. Оператор использует функцию отображения для создания ключей и группирует испускаемые элементы на основе ключа. Вы можете рассматривать serviceId в качестве ключа.

Вы также можете добавить различные задержки в зависимости от идентификатора сервиса. Проверьте приведенный ниже фрагмент в качестве примера — четные целые числа будут задерживаться на 2 секунды, а нечетные на 1 секунду.

Java и Project Reactor. Эпизод 2

Я нахожусь в процессе запуска нового проекта (java-based). Мне нужно построить его как модульную, распределенную и устойчивую архитектуру. Поэтому мне бы хотелось, чтобы бизнес-процессы общались.

Какой вид «EventBus» использовать в Spring? Встроенный, реактор, Akka?

Мы собираемся запустить новое приложение Spring 4 через несколько недель. И мы хотели бы использовать некоторую событийную архитектуру. В этом году я читал здесь и там о Reactor и, ища его в.

Как регистрировать тела запросов и ответов в Spring WebFlux

Я хочу иметь централизованное ведение журнала для запросов и ответов в моем REST API на Spring WebFlux с Kotlin. До сих пор я пробовал этот подход @Bean fun apiRouter() = router <.

Как проверить, пуст ли Mono?

Я разрабатываю приложение с Spring Boot 2.0 и Kotlin, используя фреймворк WebFlux. Я хочу проверить, выходит ли идентификатор пользователя перед сохранением транзакции. Я застрял в такой простой.

Mono против потока в реактивном потоке

Согласно документации: Поток-это поток, который может излучать 0..N элементов: Flux fl = Flux.just(a, b, c); Mono-это поток из 0..1 элементов: Mono mn = Mono.just(hello);.

Преимущества/Недостатки Реактивного Программирования

Я продолжаю изучать и пробовать реактивный стиль кодирования, используя Reactor и RxJava. Я понимаю, что реактивное кодирование позволяет лучше использовать CPU по сравнению с однопоточным.

Сравнение Java реактивных структур

Я вижу много фреймворков / библиотек, которые утверждают, что они могут помочь построить реактивные приложения в Java, такие как: Akka, Vert.x, RxJava, Reactor, QBit и т. д. Они, похоже, имеют.

Как конвертировать Mono
> в Flux

Я превращаю небольшой проект, написанный на RxJava 1.x, в реактор 3.x. Все хорошо, за исключением того, что я не смог выяснить, как заменить flatMap(Observable::from) на соответствующий аналог. У.

Преимущества HTTP конечные точки возврата флюса/Mono экземпляры, а не объекты переноса данных

Я смотрел Spring Tips: Functional Reactive Endpoints with Spring Framework 5.0 и немного читал о реакторе spring, но я не совсем понимаю его. Каковы преимущества того, что конечные точки возвращают.

Преобразовать поток в Mono

Как я могу преобразовать поток с 1 элементом в Mono? Flux.fromArray(arrayOf(1,2,1,1,1,2)) .distinct() .take(1) Как я могу сделать это a Mono(1)?

Spring WebFlux, как я могу отладить свой WebClient POST exchange?

У меня возникли проблемы с пониманием того, что я сделал неправильно при построении моего запроса WebClient. Я хотел бы понять, как выглядит фактический запрос HTTP. (например, сброс необработанного.

Как правильно прочитать Flux и преобразовать его в один входной поток

Я использую WebClient и пользовательский BodyExtractor класс для моего приложения spring-boot WebClient webLCient = WebClient.create(); webClient.get() .uri(url, params).

Как обработать ошибку при выполнении Flux.map()

Я пытаюсь понять, как обрабатывать ошибки при отображении элементов внутри потока. Например, я разбираю строку CSV в один из моих бизнес-процессов POJOs: myflux.map(stock ->.

Как выполнить блокирующие вызовы в веб-приложении Spring Webflux / Reactor Netty

В моем случае использования, когда у меня есть микросервис Spring Webflux с реактором Netty, у меня есть следующие зависимости: org.springframework.boot.spring-boot-starter-webflux (2.0.1.RELEASE).

Функционал WebFlux: как обнаружить пустой поток и вернуть 404?

У меня есть следующая упрощенная функция обработчика (Spring WebFlux и функционал API с использованием Kotlin). Однако мне нужна подсказка, как обнаружить пустой поток и затем использовать.

карте против flatMap в реактор

Я нашел много ответов относительно RxJava , но я хочу понять, как это работает в Reactor. Мое нынешнее понимание очень расплывчато, я склонен думать о map как о синхронном, а flatMap-как об.

Spring WebFlux: разрешено только одно подключение приемного абонента

Я пишу простое приложение с Spring 5 Webflux и Kotlin. Я пытаюсь реализовать конечную точку PUT следующим образом: PUT(/confs/, < val id = it.pathVariable(id).

Модель резьбы Spring WebFlux и Реактора

В настоящее время экспериментирует реактивное программирование с Spring 5.0.0.RC2 , реактором 3.1.0.M2 и Spring Boot 2.0.0.M2 . Интересно узнать о параллелизме и потоковой модели , используемой.

Spring 5 Web Reactive — как мы можем использовать WebClient для получения потоковых данных в потоке?

Текущая документация milestone (M4) показывает и пример о том, как получить Mono с помощью WebClient : WebClient webClient = WebClient.create(new ReactorClientHttpConnector());.

Как перебрать поток и смешать с Mono

У меня есть вариант использования, когда я должен отправить email пользователям. Сначала я создаю тело email. Mono emailBody = . cache(); А затем я выбираю пользователей и отправляю.

Читать еще:  Как выключить 3g на iphone 5s?
Ссылка на основную публикацию
Статьи c упоминанием слов:
Adblock
detector