Плагин Apache Camel

Описание

Плагин позволяет использовать возможности Apache Camel без необходимости явного взаимодействия с ним, а так же реализует поддержку динамических классов и пересборку при их рекомпиляции и изменениях в конфигурации плагина.

Найстройка

В момент начала своей работы плагин выгружает и регистрирует из конфигурации все валидно-объявленные Camel-компоненты, после чего уже производится обработка зарегистрированных обработчиков. Все изменения в конфигурации и используемых динамических классах приводят к пересборке плагина в рантайме.

Общее

  • camel.enabled - Активация / деактивация

  • camel.configId - ID конфига плагина, пересборка будет происходить только при его изменении

Компоненты

Компоненты, в контексте Camel, являются некими точками входа и выхода данных, реализуя взаимодействие с тем или иным протоколом. (см. Components).

Регистрация компонентов

Регистрация компонентов производится путем их объявления в конфигурации:

# Обязательные параметры
camel.component.1.id=amq
camel.component.1.type=activemq

# Параметры специфичные для каждого из типов компонента
camel.component.1.url=tcp://amq.core.ufanet.ru:61616
camel.component.1.login=bill
camel.component.1.password=OssbHrZHw9N99CJ

Обработчики

Обработчик представляет из себя обыкновенный класс, единственным обязательным условием к которому является наличие пустого публичного конструктора. Он содержит в себе определенным образом аннотированные методы, на основе которых в момент его регистрации формируются Camel Routes.

Пример регистрации обработчиков
camel.handler.1.class=ru.bgcrm.plugin.camel.TestHandler
camel.handler.2.class=ru.bgcrm.dyn.camel.DynamicTestHandler

Применение

Примеры

Получение и отправка

В этом примере мы видим метод - processor, который получает на вход сообщения из очереди one. На данный момент поддерживаются только методы-процессоры, поэтому @From является единственной обязательной аннотацией.

@From(endpoint = "amq:queue:one")
public void onMessage(String message) {
    System.out.println(message);
}
аннотация @From поддерживает контекстные переменные в параметрах запроса
@From(endpoint = "amq:topic:one?clientId={{server:instanceId}}&durableSubscriptionName={{server:instanceId}}")

Указать endpoint возможно не только строкой, но и отдельными компонентами, например:

@From(
        type = ComponentType.ACTIVEMQ,
        path = {
                @PathParameter(value = "topic"),
                @PathParameter(value = "durable-subscription-test")
        },
        query = {
                @QueryParameter(name = "clientId", value = "{{server:instanceId}}"),
                @QueryParameter(name = "durableSubscriptionName", value = "{{server:instanceId}}")
       }
)
public void onMessage(String message) {
    System.out.println(message);
}

А здесь метод не только принимает данные, но и возвращает, в данном случае это будет строка, которая будет отправлена в топик lowercase

@From(endpoint = "amq:queue:uppercase")
@To(endpoint = "amq:topic:lowercase")
public String onMessage(String message) {
    return message.toLowerCase();
}

Сериализация и десериализация

Не обязательно заниматься этим самому, метод может принимать и / или отдавать объекты. В этом случае полученное сообщение будет десериализовано и сериализовано 'под капотом'.

@From(endpoint = "amq:queue:request")
@To(endpoint = "amq:queue:response")
public Response onRequest(Request request) {
   // do some stuff
}

Есть возможность указать свой transformer сообщений, для этого предусмотрена отдельная аннотация:

@Transform(transformer = DefaultGsonTransformer.class)

Перечень аннотаций

Нюансы работы в зависимости от типа компонентов могут отличаться.

Точки входа и выхода

@From(endpoint = "amq:queue:request")
@To(endpoint = "amq:queue:response")
См. Routes

Троттлинг

@Throttle(messages = 100, period = 1, timeUnit = TimeUnit.SECONDS)

Ограничение количества получаемых сообщений за указанный промежуток времени.

См. Throttler

Повторная доставка

@Redelivery(attempts = 3, deadLetterEndpoint = "amq:queue:errors")

Указание количества попыток обработки сообщений и endopoint в который исходное сообщение будет отправлено в случае если по истечении всех попыток его не удастся обработать. К инициации новой попытки приводит исключение брошенное в методе-обработчике. По истечении всех неудачных попыток, перед отправкой в DLQ к сообщению добавляется заголовок под ключем error, содержащий в себе описание ошибки, например:

TransformationException: Deserialization failed: class java.lang.String → class ru.bgcrm.dyn.camel.ufanetru.transfer.TransferRequest

Дополнительные параметры:

  • delayMillis - задержка между попытками

  • backoffMultiplier - задержка с использованием коэффициента

  • delayPattern - описание задержки с помощью паттерна, например 1:2000;3:5000

Кастомный 'transformer' сообщений

@Transform(transformer = GsonTransformer.class)

Отвечает за десериализацию / сериализацию сообщений. По умолчанию используется GsonTransformer.

Поддерживаемые функции

  • DurableSubscriber (Долгосрочная подписка) — это функция, которая гарантирует, что сообщения не будут потеряны в случае сбоев. Когда consumer подписывается на тему в качестве постоянного подписчика, брокер будет сохранять сообщения, до доступности подписчика. При повторном подключении, consumer, получит новые сообщения, поступившие за время его не доступности.

JMX

Благодаря Camel через JMX можно выдергивать подробную информацию о зарегистрированных роутах и их составляющих.

См. Camel JMX