Apache Camel指南-第十六章:消息路由的基本使用方法

发布于:2025-04-18 ⋅ 阅读:(82) ⋅ 点赞:(0)

摘要

消息路由模式描述了将消息通道链接在一起的各种方式。这包括可以应用于消息流的各种算法(无需修改消息主体)。

8.1。基于内容的路由器

总览

基于内容的路由器模式,使你将消息路由到基于消息内容的适当的目的地。

基于内容的路由器模式

Java DSL示例

以下示例显示了如何将请求从输入seda:a端点路由到seda:bqueue:cseda:d根据各种谓词表达式的计算:

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("seda:a").choice()
          .when(header("foo").isEqualTo("bar")).to("seda:b")
          .when(header("foo").isEqualTo("cheese")).to("seda:c")
          .otherwise().to("seda:d");
    }
};

XML配置示例

以下示例显示了如何在XML中配置相同的路由:

<camelContext id="buildSimpleRouteWithChoice" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <choice>
      <when>
        <xpath>$foo = 'bar'</xpath>
        <to uri="seda:b"/>
      </when>
      <when>
        <xpath>$foo = 'cheese'</xpath>
        <to uri="seda:c"/>
      </when>
      <otherwise>
        <to uri="seda:d"/>
      </otherwise>
    </choice>
  </route>
</camelContext>

8.2。消息过滤器

总览

邮件过滤器模式是一个处理器,其消除基于特定的标准不希望的消息。在Apache Camel中,消息过滤器模式由filter()Java DSL命令实现。该filter()命令采用单个谓词参数,该谓词参数控制过滤器。当谓词为时true,允许传入消息继续进行;当谓词为时false,阻止传入消息。

邮件过滤器模式]

Java DSL示例

以下示例说明如何创建从端点seda:a到端点的路由,该路由seda:b将阻止所有消息,但foo标头值为的消息除外bar

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("seda:a").filter(header("foo").isEqualTo("bar")).to("seda:b");
    }
};

要评估更复杂的过滤器谓词,可以调用一种受支持的脚本语言,例如XPath。以下示例定义了一条路由,该路由将阻止所有消息,但包含属性等于的person元素的消息除外: name``James

from("direct:start").
        filter().xpath("/person[@name='James']").
        to("mock:result");

XML配置示例

以下示例显示了如何使用XML中的XPath谓词配置路由:

<camelContext id="simpleFilterRoute" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <filter>
      <xpath>$foo = 'bar'</xpath>
      <to uri="seda:b"/>
    </filter>
  </route>
  </camelContext>

确保将要过滤的端点(例如<to uri="seda:b"/>)放在结束</filter>标记之前,否则将不应用过滤器(在2.8+版本中,如果省略,则会导致错误)。

用beans过滤

这是使用bean定义过滤器行为的示例:

from("direct:start")
     .filter().method(MyBean.class, "isGoldCustomer").to("mock:result").end()
     .to("mock:end");

public static class MyBean {
    public boolean isGoldCustomer(@Header("level") String level) {
        return level.equals("gold");
    }
}

使用stop()

自Camel 2.0起可用

停止是一种特殊的过滤器,可以过滤掉所有消息。当您需要停止其中一个谓词中的进一步处理时,使用Stop十分方便。

在下面的示例中,我们不希望Bye消息正文中带有单词的消息在路由中进一步传播。我们在when()谓词中使用.stop()

from("direct:start")
    .choice()
        .when(bodyAs(String.class).contains("Hello")).to("mock:hello")
        .when(bodyAs(String.class).contains("Bye")).to("mock:bye").stop()
        .otherwise().to("mock:other")
    .end()
    .to("mock:result");

知道Exchange是否被过滤

自Camel 2.5起可用

消息过滤器 EIP会,如果它过滤与否,其中列明了交易所添加属性。

该属性的键值Exchange.FILTER_MATCHED为String CamelFilterMatched。其值为一个布尔值,表示truefalse。如果值为,true则将Exchange路由到过滤器块中。

8.3。收件人清单

总览

收件人列表模式,是一种类型的路由器发送每个传入消息向多个不同的目的地。另外,收件人列表通常要求在运行时计算收件人列表。

收件人列表模式]

具有固定目的地的收件人列表

最简单的收件人列表类型是目的地列表是固定的且事先已知的,并且交换模式为InOnly。在这种情况下,您可以将目标列表硬连接到to()Java DSL命令中。

注意

对于固定目的地的收件人列表,此处给出的示例仅适用InOnly交换模式(类似于管道和过滤器模式)。如果要创建带有Out消息的交换模式的收件人列表,请改用多播模式。

Java DSL示例

以下示例显示了如何将InOnly交换从使用者端点路由queue:a到固定的目的地列表:

from("seda:a").to("seda:b", "seda:c", "seda:d");

XML配置示例

以下示例显示了如何在XML中配置相同的路由:

<camelContext id="buildStaticRecipientList" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <to uri="seda:b"/>
    <to uri="seda:c"/>
    <to uri="seda:d"/>
  </route>
</camelContext>

在运行时计算出的收件人列表

在大多数情况下,使用收件人列表模式时,应在运行时计算收件人列表。为此,请使用recipientList()处理器,该处理器将目的地列表作为唯一参数。由于Apache Camel将类型转换器应用于list参数,因此应该可以使用大多数标准Java列表类型(例如,集合,列表或数组)。

接收者收到相同交换实例的副本,Apache Camel顺序执行它们。

Java DSL示例

下面的示例演示如何从名为的消息头中提取目标列表recipientListHeader,其中头值是端点URI的逗号分隔列表:

from("direct:a").recipientList(header("recipientListHeader").tokenize(","));

在某些情况下,如果标头值为列表类型,则可以直接将其用作的参数recipientList()。例如:

from("seda:a").recipientList(header("recipientListHeader"));

但是,此示例完全取决于基础组件如何解析此特定标头。如果组件解析头作为一个简单的字符串,这个例子将工作。标头必须解析为某种类型的Java列表。

XML配置示例

以下示例显示了如何以XML配置前面的路由,其中标头值是端点URI的逗号分隔列表:

<camelContext id="buildDynamicRecipientList" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <recipientList delimiter=",">
      <header>recipientListHeader</header>
    </recipientList>
  </route>
</camelContext>

并行发送给多个收件人

自Camel 2.2起可用

收件人列表图案支撑件parallelProcessing,其类似于在相应的特征的分离器图案。使用并行处理功能可将交换并发发送到多个收件人,例如:

from("direct:a").recipientList(header("myHeader")).parallelProcessing();

在Spring XML中,并行处理功能被实现为recipientList标记上的属性-例如:

<route>
  <from uri="direct:a"/>
  <recipientList parallelProcessing="true">
    <header>myHeader</header>
  </recipientList>
</route>

停止异常

自Camel 2.2起可用

收件人列表支持stopOnException功能,您可以用它来停止发送任何进一步的收件人,如果任何收件人失败。

from("direct:a").recipientList(header("myHeader")).stopOnException();

在Spring XML中,它是收件人列表标签上的一个属性。

在Spring XML中,异常停止功能被实现为recipientList标记上的属性-例如:

<route>
  <from uri="direct:a"/>
  <recipientList stopOnException="true">
    <header>myHeader</header>
  </recipientList>
</route>

注意
您可以组合parallelProcessingstopOnException在同一路线。

忽略无效的端点

自Camel 2.3起可用

该收件人列表模式支持的ignoreInvalidEndpoints选项,这使收件人列表跳过无效端点(路由单模式也支持这个选项)。例如:

from("direct:a").recipientList(header("myHeader")).ignoreInvalidEndpoints();

在Spring XML中,您可以通过ignoreInvalidEndpointsrecipientList标记上设置属性来启用此选项,如下所示

<route>
  <from uri="direct:a"/>
  <recipientList ignoreInvalidEndpoints="true">
    <header>myHeader</header>
  </recipientList>
</route>

考虑myHeader包含两个端点的情况direct:foo,xxx:bar。第一个端点有效且有效。第二个无效,因此被忽略。INFO每当遇到无效的端点时,Apache Camel都会记录日志。

使用自定义AggregationStrategy

自Camel 2.2起可用

您可以AggregationStrategy对收件人列表模式使用自定义,这对于汇总列表中收件人的回复很有用。默认情况下,Apache Camel使用UseLatestAggregationStrategy聚合策略,该策略仅保留最后收到的答复。有关更复杂的聚合策略,您可以定义自己的AggregationStrategy接口实现— 有关详细信息。例如,要将自定义聚合策略应用于MyOwnAggregationStrategy回复消息,可以定义Java DSL路由,如下所示:

from("direct:a")
    .recipientList(header("myHeader")).aggregationStrategy(new MyOwnAggregationStrategy())
    .to("direct:b");

在Spring XML中,您可以将自定义聚合策略指定为recipientList标记的属性,如下所示:

<route>
  <from uri="direct:a"/>
  <recipientList strategyRef="myStrategy">
    <header>myHeader</header>
  </recipientList>
  <to uri="direct:b"/>
</route>

<bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>

使用自定义线程池

自Camel 2.2起可用

仅在使用时才需要parallelProcessing。默认情况下,Camel使用具有10个线程的线程池。请注意,当我们稍后检修线程池管理和配置时(可能在Camel 2.2中),这可能会有所更改。

您可以像使用自定义聚合策略一样配置它。

使用方法调用作为收件人列表

您可以使用Bean集成来提供收件人,例如:

from("activemq:queue:test").recipientList().method(MessageRouter.class, "routeTo");

其中,MessageRouterbean定义如下:

public class MessageRouter {

    public String routeTo() {
        String queueName = "activemq:queue:test2";
        return queueName;
    }
}

Bean作为收件人列表

通过将@RecipientList批注添加到返回收件人列表的方法中,可以使bean充当收件人列表。例如:

public class MessageRouter {

    @RecipientList
    public String routeTo() {
        String queueList = "activemq:queue:test1,activemq:queue:test2";
        return queueList;
    }
}

在这种情况下,也没有包括recipientList在路由DSL命令。如下定义路线:

from("activemq:queue:test").bean(MessageRouter.class, "routeTo");

使用超时

自Camel 2.5起可用

如果使用parallelProcessing,则可以配置总计timeout值(以毫秒为单位)。骆驼然后将并行处理消息,直到超时为止。如果一条消息很慢,这使您可以继续处理。

在下面的示例中,recipientlist标头具有值direct:a,direct:b,direct:c,以便将邮件发送给三个收件人。我们有250毫秒的超时时间,这意味着只能在该时间范围内完成最后两条消息。因此,聚合产生字符串结果BC

from("direct:start")
    .recipientList(header("recipients"), ",")
    .aggregationStrategy(new AggregationStrategy() {
            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                if (oldExchange == null) {
                    return newExchange;
                }

                String body = oldExchange.getIn().getBody(String.class);
                oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                return oldExchange;
            }
        })
        .parallelProcessing().timeout(250)
    // use end to indicate end of recipientList clause
    .end()
    .to("mock:result");

from("direct:a").delay(500).to("mock:A").setBody(constant("A"));

from("direct:b").to("mock:B").setBody(constant("B"));

from("direct:c").to("mock:C").setBody(constant("C"));

注意

这个timeout功能也被支持splitter,并都multicastrecipientList

默认情况下,如果发生超时,AggregationStrategy则不会调用。但是,您可以实现专用版本

// Java
public interface TimeoutAwareAggregationStrategy extends AggregationStrategy {

    /**
     * A timeout occurred
     *
     * @param oldExchange  the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange)
     * @param index        the index
     * @param total        the total
     * @param timeout      the timeout value in millis
     */
    void timeout(Exchange oldExchange, int index, int total, long timeout);

AggregationStrategy如果您确实需要, 这可以让您处理超时。

超时总计

超时是总计,这意味着在X时间之后,骆驼将聚合在该时间范围内完成的消息。其余的将被取消。对于导致超时的第一个索引,Camel也只会一次调用一次该timeout方法TimeoutAwareAggregationStrategy

对传出邮件应用自定义处理

recipientList将消息发送到收件人端点之一之前,它会创建消息副本,该副本是原始消息的浅表副本。在浅表副本中,原始消息的标题和有效负载仅通过引用进行复制。每个新副本不包含这些元素的自己的实例。因此,将链接消息的浅表副本,并且在将消息路由到其他端点时无法应用自定义处理。

如果要在将每个消息副本发送到端点之前对每个消息副本执行一些自定义处理,则可以onPreparerecipientList子句中调用DSL命令。该onPrepare命令只插入一个自定义的处理器后,该消息已被浅拷贝,只是之前的消息被分派到其端点。例如,在以下路由中,在每个接收方端点CustomProc的消息副本上调用处理器:

from("direct:start")
  .recipientList().onPrepare(new CustomProc());

onPrepareDSL命令的 一个常见用例是对消息的某些或所有元素执行深度复制。这允许每个消息副本独立于其他消息副本进行修改。例如,以下CustomProc处理器类执行消息主体的深层复制,其中消息主体被假定为Type *BodyType*,并且该深层复制通过方法进行*BodyType*.deepCopy()

// Java
import org.apache.camel.*;
...
public class CustomProc implements Processor {

    public void process(Exchange exchange) throws Exception {
        BodyType body = exchange.getIn().getBody(BodyType.class);

        // Make a _deep_ copy of of the body object
        BodyType clone =  BodyType.deepCopy();
        exchange.getIn().setBody(clone);

        // Headers and attachments have already been
        // shallow-copied. If you need deep copies,
        // add some more code here.
    }
}

配置选项

recipientListDSL命令支持下列选项:

名称 默认值 描述
delimiter , 如果表达式返回多个端点,则使用定界符。
strategyRef 引用一个AggregationStrategy,用于将来自收件人的答复组合成来自“收件人列表”的单个外发消息。默认情况下,骆驼将使用最后的答复作为外发消息。
strategyMethodName 当使用POJO作为时,此选项可用于显式指定要使用的方法名称AggregationStrategy
strategyMethodAllowNull false 当将POJO用作时,可以使用此选项AggregationStrategy。如果为,则在没有数据要充实时不使用falseaggregate方法。如果没有true,则将null值用于,而oldExchange没有要丰富的数据。
parallelProcessing false Camel 2.2:如果启用,则同时向收件人发送消息。注意,调用者线程将继续等待,直到所有消息都已被完全处理。它只是发送和处理同时发生的来自收件人的答复。
parallelAggregate false 如果启用,则可以同时调用aggregateon方法AggregationStrategy。请注意,这要求实施AggregationStrategy必须是线程安全的。默认情况下,此选项为false,这意味着Camel自动将对aggregate方法的调用同步。但是,在某些用例中,可以通过实现AggregationStrategy为线程安全并将此选项设置为来提高性能true
executorServiceRef Camel 2.2:指用于并行处理的自定义线程池。请注意,如果您设置了此选项,则将自动隐含并行处理,并且您也不必启用该选项。
stopOnException false Camel 2.2:发生异常时是否停止立即继续处理。如果禁用,则Camel会将邮件发送给所有收件人,无论其中一个收件人是否失败。您可以在 AggregationStrategy类中处理异常,您可以在其中完全控制如何处理异常。
ignoreInvalidEndpoints false Camel 2.3:如果端点uri无法解析,则应将其忽略。否则,骆驼会抛出异常,指出端点uri无效。
streaming false Camel 2.5:如果启用,则Camel将无序处理回复,例如按返回顺序。如果禁用,Camel将按照与指定表达式相同的顺序处理答复。
timeout Camel 2.5:设置以毫秒为单位的总超时时间。如果“收件人列表”无法在给定的时间范围内发送和处理所有答复,则将触发超时,并且“收件人列表”将中断并继续。请注意,如果您提供 TimeoutAwareAggregationStrategy则timeout在中断之前将调用该方法。
onPrepareRef Camel 2.8:指的是自定义处理器,以准备每个收件人将收到的Exchange副本。这使您可以执行任何自定义逻辑,例如在需要时深度克隆消息有效负载等。
shareUnitOfWork false Camel 2.8:是否应该共享工作单元。
cacheSize 0 Camel 2.13.1 / 2.12.4:允许配置 ProducerCache的缓存大小,该缓存将生产者缓存以在路由清单中重复使用。默认情况下,将使用默认的缓存大小0。将值设置为-1可以同时关闭所有缓存。

在收件人列表中使用交换模式

默认情况下,收件人列表使用当前的交换模式。但是,在少数情况下,您可以使用其他交换方式将邮件发送给收件人。

例如,您可能有一条以路由形式启动的InOnly路由。现在,如果要将InOut交换模式与收件人列表一起使用,则需要直接在收件人端点中配置交换模式。

下面的示例说明了新文件将以InOnly开头然后路由到收件人列表的路由。如果要将InOut与ActiveMQ(JMS)端点一起使用,则需要使用等于InOut选项的exchangePattern进行指定。但是,来自JMS请求或答复的响应将继续被路由,因此响应将作为文件存储在发件箱目录中。

from("file:inbox")
  // the exchange pattern is InOnly initially when using a file route
  .recipientList().constant("activemq:queue:inbox?exchangePattern=InOut")
  .to("file:outbox");

注意
InOut交换模式必须得到超时期间的响应。但是,如果未收到响应,则失败。


网站公告

今日签到

点亮在社区的每一天
去签到