狼视 ·

简化软件集成:一个Apache Camel教程

前言

本周收到的是一篇关于使用Apache Camel整合企业中各种软件的教程,涉及到从基础到Kubernetes集成。推荐直接看原文:

原文Streamline Software Integration: An Apache Camel Tutorial

作者Anton Goncharov, Russia

文章正文

软件很少(如果有的话)存在于信息真空中。至少,这是我们软件工程师可以为我们开发的大多数应用程序做出的假设。

在任何规模上,每种软件都以某种方式与其他软件进行通信,出于各种原因:从某处获取参考数据,发送监控信号,与其他服务保持联系,同时作为分布式的一部分系统等等。

简化软件集成:一个Apache Camel教程

在本教程中,您将了解集成大型软件的一些最大挑战,以及Apache Camel如何轻松解决这些难题。

问题:系统集成的体系结构设计

在您的软件工程中,您可能至少做了一次以下操作:

  • 确定应启动数据发送的业务逻辑片段。
  • 在相同的应用程序层,根据收件人的期望写入数据转换。
  • 将数据封装在适合通过网络传输和路由的结构中。
  • 使用适当的驱动程序或客户端SDK打开到目标应用程序的连接。
  • 发送数据并处理响应。

为什么这是一个不好的行为?

虽然你只有这种几个连接,它仍然是可管理的。随着系统之间关系的增加,应用程序的业务逻辑与集成逻辑混合在一起,即集成数据,补偿两个系统之间的技术差异,并通过SOAP,REST或更多异常请求将数据传输到外部系统。

如果您要集成多个应用程序,那么在这样的代码中追溯依赖关系的整个画面是非常困难的:数据产生在哪里以及哪些服务使用它?您将有许多地方集成逻辑重复,以引导。

有了这样的方法,虽然这个任务在技术上已经完成,但是我们在集成的可维护性和可伸缩性方面遇到了很大的问题。这个系统中数据流的快速重组几乎是不可能的,更不用说更深层次的问题,比如缺少监视,断路,数据恢复等等。

当将软件集成到一个相当大的企业的范围时,这一点尤为重要。要处理企业集成,就意味着要与一组应用程序一起工作,这些应用程序运行在广泛的平台上,并且存在于不同的位置。在这样一个软件环境中,数据交换是相当苛刻的。它必须符合行业的高安全标准,并提供可靠的数据传输方式。在企业环境中,系统集成需要一个独立的、全面的架构设计。

本文将向您介绍软件集成面临的独特困难,并为集成任务提供一些经验驱动的解决方案。我们将熟悉Apache Camel,这是一个有用的框架,可以减轻集成开发人员头痛的最坏情况。我们将以骆驼如何帮助建立由Kubernetes提供支持的微服务集群中的通信为例。

整合困难

解决该问题的一个广泛使用的方法是在应用程序中分离一个集成层。它可以存在于同一个应用程序中,也可以作为一个独立运行的专用软件 - 在后一种情况下称为中间件

在开发和支持中间件时,您通常会遇到什么问题?一般来说,你有以下关键点:

  • 所有数据通道在一定程度上都不可靠。数据强度低到中等时,可能不会出现由此不可靠性引起的问题。从应用程序内存到下面的缓存和设备的每个存储级别都可能出现故障。只有大量的数据才会出现一些罕见的错误。即使成熟的生产就绪供应商产品也有未解决的与数据丢失有关的错误跟踪器问题。一个中间件系统应该能够通知你这些数据的伤亡,并及时提供消息重新传递。
  • 应用程序使用不同的协议和数据格式。这意味着集成系统是数据转换和适配器到其他参与者的帷幕,并利用了各种技术。这些方法可以包括简单的REST API调用,但也可以访问队列代理,通过FTP发送CSV命令,或者将数据批量拖到数据库表中。这是一张长长的单子,它不会变短的。
  • 数据格式和路由规则的变化是不可避免的。应用程序开发过程中的每个步骤都会改变数据结构,这通常会导致集成数据格式和转换的变化。有时候,重组企业数据流的基础设施变化是必要的。例如,引入一个验证参考数据的单点时,可能会发生这些更改,这些参考数据必须处理整个公司的所有主数据条目。有了N系统,我们最终可能N^2在它们之间有最大的连接,所以必须应用更改的地方的数量增长得相当快。这将像雪崩一样。为了保持可维护性,中间件层必须通过多种路由和数据转换提供清晰的依赖关系图。

在设计集成和选择最合适的中间件解决方案时,应该牢记这些想法。处理这个问题的可能方法之一是利用企业服务总线(ESB)。但是主要供应商提供的ESB通常过于沉重,而且往往比他们的价值更麻烦:ESB几乎不可能快速启动,它的学习曲线相当陡峭,而且它的灵活性被牺牲于一长串的功能和内置工具。在我看来,轻量级的开源集成解决方案要优越得多 - 它们更具弹性,易于部署到云中,并且易于扩展。

软件集成并不容易。今天,当我们构建微服务架构并处理大量的小型服务时,我们对于它们应该如何有效沟通也抱有很高的期望。

企业集成模式

正如所料,像一般的软件开发一样,数据路由和转换的发展涉及重复的操作。经过一段时间的处理整合问题的专业人员对这方面的经验进行了总结和系统化。在结果中,有一组称为企业集成模式的提取模板,用于设计数据流。这些整合方法在Gregor Hophe和Bobby Wolfe的同名书中有描述,这很像“四人帮”的书,但是在胶合软件方面。

举一个例子,规范化模式引入了一个组件,它将具有不同数据格式的语义相同的消息映射到单个规范模型,或者聚合器是一个将一系列消息合并为一个的EIP

由于它们是用于解决架构问题的技术无关的抽象,所以EIP有助于编写一个架构设计,它不会深入到代码级别,而是足够详细地描述数据流。这种描述整合路线的符号不仅使设计简洁,而且在解决与各业务领域的团队成员的整合任务的背景下,设置了一个通用的术语和通用的语言,这是非常重要的。

介绍Apache Camel

  • 集成路由被写成由块组成的管道。它创建了一个完全透明的图像来帮助追踪数据流。
  • 骆驼有许多流行的API适配器。例如,从Apache Kafka获取数据,监控AWS EC2实例,与Salesforce集成 - 所有这些任务都可以使用现成的组件来解决。

几年前,我正在一个大型食品杂货零售网络中建立一个企业集成体系,商店分布广泛。我从一个专有的ESB解决方案开始,后来证明这个方案过于繁琐。然后,我们的团队遇到了Apache Camel,在做了一些“概念验证”工作之后,我们很快地将所有的数据流改写成了Camel路由。

Apache Camel可以被描述为一个“中介路由器”,它是一个面向消息的中间件框架,实现我熟悉的EIP列表。它利用这些模式,支持所有常见的传输协议,并且包含了大量有用的适配器。骆驼能够处理大量的集成例程,而无需编写自己的代码。

除此之外,我会选出下面的Apache Camel特性:

  • 集成路由被写成由块组成的管道。它创建了一个完全透明的图像来帮助追踪数据流。
  • Camel有许多流行的API适配器。例如,从Apache Kafka获取数据,监控AWS EC2实例,与Salesforce集成 - 所有这些任务都可以使用现成的组件来解决。

Apache Camel路由可以用Java或Scala DSL编写。(XML配置也可用,但过于冗长,调试功能更差)。它不会对通信服务的技术堆栈施加限制,但是如果您使用Java或Scala编写,则可以将Camel嵌入到应用程序中独立运行。

Camel使用的路由符号可以用下面的简单伪代码来描述:

from(Source)
   .transform(Transformer)
   .to(Destination)

SourceTransformer以及Destination是指由其uri指向实现组件的端点。

是什么让Camel解决了我之前描述的整合问题?我们来看一下。首先,路由和转换逻辑现在只能用于专门的Apache Camel配置。其次,通过简洁自然的DSL结合EIP的使用,出现了系统之间的依赖关系图。它由易理解的抽象构成,路由逻辑易于调整。最后,我们不必编写转换代码的堆,因为适当的适配器可能已经包含在内。

简化软件集成:一个Apache Camel教程

我应该补充一点,Apache Camel是一个成熟的框架,并定期更新。它有一个伟大的社区和相当庞大的知识库。

它确实有它自己的缺点。骆驼不应该被视为一个复杂的整合套件。这是一个没有高级功能(如业务流程管理工具或活动监视器)的工具箱,但可用于创建此类软件。

替代系统可能是,例如Spring Integration或Mule ESB。对于Spring Integration来说,尽管它被认为是轻量级的,但根据我的经验,把它放在一起并编写大量的XML配置文件可能会变得异常复杂,并且不是一个简单的出路。Mule ESB是一个功能强大且功能强大的工具集,但顾名思义,它是一种企业服务总线,因此它属于不同的权重类别。Mule可以与Fuse ESB进行比较,Fuse ESB是一款基于Apache Camel的类似产品,具有丰富的功能。对我来说,使用Apache Camel来粘贴服务是一件不容易的事情。它很容易使用,并产生一个干净的描述,在什么地方,同时,它的功能足够建设复杂的集成。

编写一个示例路线

我们开始编写代码。我们将从一个同步数据流开始,这个数据流将消息从单一来源路由到收件人列表。路由规则将用Java DSL编写。

我们将使用Maven构建项目。首先将以下依赖项添加到pom.xml

<dependencies>
  ...
  <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-core</artifactId>
    <version>2.20.0</version>
  </dependency>
</dependencies>

或者,应用程序可以建立在camel-archetype-java原型之上。

Camel路径定义在RouteBuilder.configure方法中声明。

public void configure() {

    errorHandler(defaultErrorHandler().maximumRedeliveries(0));

    from("file:orders?noop=true").routeId("main")
        .log("Incoming File: ${file:onlyname}")
        .unmarshal().json(JsonLibrary.Jackson, Order.class)     // unmarshal JSON to Order class containing List<OrderItem>
        .split().simple("body.items")   // split list to process one by one
        .to("log:inputOrderItem")
    .choice()
        .when().simple("${body.type} == 'Drink'")
            .to("direct:bar")
        .when().simple("${body.type} == 'Dessert'")
            .to("direct:dessertStation")
        .when().simple("${body.type} == 'Hot Meal'")
            .to("direct:hotMealStation")
        .when().simple("${body.type} == 'Cold Meal'")
            .to("direct:coldMealStation")
        .otherwise()
            .to("direct:others");

    from("direct:bar").routeId("bar").log("Handling Drink");
    from("direct:dessertStation").routeId("dessertStation").log("Handling Dessert");
    from("direct:hotMealStation").routeId("hotMealStation").log("Handling Hot Meal");
    from("direct:coldMealStation").routeId("coldMealStation").log("Handling Cold Meal");
    from("direct:others").routeId("others").log("Handling Something Other");
}

在这个定义中,我们创建了一个从JSON文件中获取记录的路径,将它们拆分成条目,并根据消息内容路由到一组处理程序。

让我们在准备好的测试数据上运行它。我们将得到输出:

INFO | Total 6 routes, of which 6 are started
INFO | Apache Camel 2.20.0 (CamelContext: camel-1) started in 10.716 seconds
INFO | Incoming File: order1.json
INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{id='1', type='Drink', name='Americano', qty='1'}]
INFO | Handling Drink
INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{id='2', type='Hot Meal', name='French Omelette', qty='1'}]
INFO | Handling Hot Meal
INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{id='3', type='Hot Meal', name='Lasagna', qty='1'}]
INFO | Handling Hot Meal
INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{id='4', type='Hot Meal', name='Rice Balls', qty='1'}]
INFO | Handling Hot Meal
INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{id='5', type='Dessert', name='Blueberry Pie', qty='1'}]
INFO | Handling Dessert

正如所料,Camel路由消息到目的地。

数据传输选择

在上面的示例中,组件之间的交互是同步的,并通过应用程序内存执行。但是,当我们处理不共享内存的单独应用程序时,还有更多的通信方式:

  • 文件交换。一个应用程序产生共享数据文件供另一个使用。这是老派精神的生存之地。这种沟通方式带来了诸多后果:缺乏交易和一致性,性能较差,系统之间的孤立协调。许多开发人员最终编写了自制的集成解决方案,使这个过程或多或少地可以管理。
  • 通用数据库。让应用程序将他们希望共享的数据存储在单个数据库的通用模式中。设计统一模式和处理并发访问表是这种方法最突出的挑战。与文件交换一样,这很容易成为永久的瓶颈。
  • 远程API调用。提供一个接口,允许应用程序与另一个正在运行的应用程序进行交互,如典型的方法调用。应用程序通过API调用共享功能,但是它在过程中紧密耦合它们。
  • 消息。让每个应用程序连接到一个通用的消息传递系统,并使用消息异步交换数据和调用行为。发送者和接收者都不必同时启动并运行消息。

有更多的交互方式,但是我们应该记住,从广义上讲,有两种类型的交互:同步和异步。第一个就像在你的代码中调用一个函数 - 执行流程将一直等待,直到它执行并返回一个值。使用异步方法,相同的数据通过中间消息队列或订阅主题发送。异步远程函数调用可以作为请求 - 回复EIP来实现。

异步消息传递不是万能的,它涉及到一定的限制。您很少在网络上看到消息API; 同步REST服务更受欢迎。但是消息中间件被广泛用于企业内部网或分布式系统后端基础设施。

使用消息队列

让我们的示例异步。管理队列和订阅主题的软件系统称为消息代理。这就像一个表和列的RDBMS。队列用作点对点集成,而主题用于与许多接收者的发布 - 订阅通信。我们将使用Apache ActiveMQ作为JMS消息代理,因为它是可靠且可嵌入的。

添加以下依赖项。有时activemq-all,向项目中添加包含所有ActiveMQ jar 的过度,但我们会保持我们的应用程序的依赖关系不复杂。

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.2</version>
</dependency>

然后以编程方式启动代理。在Spring Boot中,通过插入spring-boot-starter-activemqMaven依赖关系,我们得到了一个自动配置。

使用以下命令运行新的消息代理,只指定连接器的端点:

BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61616"); 
broker.start();

并将以下配置片段添加到configure方法体:

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory));

现在我们可以使用消息队列来更新前面的例子。队列将自动创建消息传递。

public void configure() {

    errorHandler(defaultErrorHandler().maximumRedeliveries(0));
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory));

    from("file:orders?noop=true").routeId("main")
            .log("Incoming File: ${file:onlyname}")
            .unmarshal().json(JsonLibrary.Jackson, Order.class)     // unmarshal JSON to Order class containing List<OrderItem>
            .split().simple("body.items")   // split list to process one by one
            .to("log:inputOrderItem")
            .choice()
        .when().simple("${body.type} == 'Drink'")
            .to("activemq:queue:bar")
        .when().simple("${body.type} == 'Dessert'")
            .to("activemq:queue:dessertStation")
        .when().simple("${body.type} == 'Hot Meal'")
            .to("activemq:queue:hotMealStation")
        .when().simple("${body.type} == 'Cold Meal'")
            .to("activemq:queue:coldMealStation")
        .otherwise()
            .to("activemq:queue:others");

    from("activemq:queue:bar").routeId("barAsync").log("Drinks");
    from("activemq:queue:dessertStation").routeId("dessertAsync").log("Dessert");
    from("activemq:queue:hotMealStation").routeId("hotMealAsync").log("Hot Meals");
    from("activemq:queue:coldMealStation").routeId("coldMealAsync").log("Cold Meals");
    from("activemq:queue:others").routeId("othersAsync").log("Others");
}

好了,现在交互已经变得异步了。这些数据的潜在消费者在准备好时可以访问它。这是一个松耦合的例子,我们试图在一个被动的架构中实现。其中一项服务不可用将不会阻止其他服务。而且,消费者可以并行地从队列中缩放和读取。队列本身可以扩展和分区。持久队列可以将数据存储在磁盘上,等待处理,即使所有参与者都关闭了。因此,这个系统更容错。

一个惊人的事实是,CERN使用Apache Camel和ActiveMQ来监视大型强子对撞机(LHC)的系统。还有一个有趣的硕士论文解释了为这个任务选择合适的中间件解决方案。所以,正如他们在主题演讲中所说:“没有JMS-没有粒子物理学!”

监控

在前面的例子中,我们创建了两个服务之间的数据通道。这是架构中一个额外的潜在失败点,所以我们必须照顾它。我们来看看Apache Camel提供的监视功能。基本上,它通过JMX提供有关其路由的统计信息。ActiveMQ以相同的方式公开队列统计信息。

我们打开应用程序中的JMX服务器,使其能够使用命令行选项运行:

-Dorg.apache.camel.jmx.createRmiConnector=true
-Dorg.apache.camel.jmx.mbeanObjectDomainName=org.apache.camel
-Dorg.apache.camel.jmx.rmiConnector.registryPort=1099
-Dorg.apache.camel.jmx.serviceUrlPath=camel

现在运行该应用程序,以便该路线已完成其工作。打开标准jconsole工具并连接到应用程序进程。连接到网址service:jmx:rmi:///jndi/rmi://localhost:1099/camel。转到MBeans树中的org.apache.camel域。

简化软件集成:一个Apache Camel教程

我们可以看到,关于路由的一切都在控制之中。我们有正在进行的消息的数量,错误计数和队列中的消息计数。这些信息可以通过流水线连接到一些监视工具集,如Graphana或Kibana。你可以通过实现知名的ELK栈来做到这一点。

还有一个可插拔和可扩展的Web控制台提供了一个用户界面,用于管理骆驼的ActiveMQ,和更多的人,叫hawt.io

简化软件集成:一个Apache Camel教程

测试路线

Apache Camel具有相当广泛的功能,可以用模拟组件编写测试路由。这是一个强大的工具,但是为了测试而编写单独的路由是一个耗时的过程。在生产线上运行测试而不修改管线会更有效率。骆驼有这个功能,可以使用AdviceWith组件来实现。

让我们在我们的示例中启用测试逻辑并运行示例测试。

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-test</artifactId>
    <version>2.20.0</version>
    <scope>test</scope>
</dependency>

测试类是:

public class AsyncRouteTest extends CamelTestSupport {

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new AsyncRouteBuilder();
    }

    @Before
    public void mockEndpoints() throws Exception {
        context.getRouteDefinition("main").adviceWith(context, new AdviceWithRouteBuilder() {
            @Override
            public void configure() throws Exception {
                // we substitute all actual queues with mock endpoints
                mockEndpointsAndSkip("activemq:queue:bar");
                mockEndpointsAndSkip("activemq:queue:dessertStation");
                mockEndpointsAndSkip("activemq:queue:hotMealStation");
                mockEndpointsAndSkip("activemq:queue:coldMealStation");
                mockEndpointsAndSkip("activemq:queue:others");
                // and replace the route's source with test endpoint
                replaceFromWith("file://testInbox");
            }
        });
    }

    @Test
    public void testSyncInteraction() throws InterruptedException {
        String testJson = "{\"id\": 1, \"order\": [{\"id\": 1, \"name\": \"Americano\", \"type\": \"Drink\", \"qty\": \"1\"}, {\"id\": 2, \"name\": \"French Omelette\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 3, \"name\": \"Lasagna\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 4, \"name\": \"Rice Balls\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 5, \"name\": \"Blueberry Pie\", \"type\": \"Dessert\", \"qty\": \"1\"}]}";
        // get mocked endpoint and set an expectation
        MockEndpoint mockEndpoint = getMockEndpoint("mock:activemq:queue:hotMealStation");
        mockEndpoint.expectedMessageCount(3);
        // simulate putting file in the inbox folder
        template.sendBodyAndHeader("file://testInbox", testJson, Exchange.FILE_NAME, "test.json");
        //checks that expectations were met
        assertMockEndpointsSatisfied();
    }

}

现在运行与应用程序的测试mvn test。我们可以看到,我们的路线已经成功地通过了测试建议。没有消息通过实际的队列传递,测试已经通过。

INFO | Route: main started and consuming from: file://testInbox
<...>
INFO | Incoming File: test.json
<...>
 INFO | Asserting: mock://activemq:queue:hotMealStation is satisfied

使用Apache Camel和Kubernetes集群

今天的一个集成问题是应用程序不再是静态的。在云基础架构中,我们同时处理在多个节点上运行的虚拟服务。它使得微服务架构能够与小型,轻量级服务网络相互作用。这些服务的寿命是不可靠的,我们必须动态地发现它们。

将云服务合并在一起是Apache Camel可以解决的任务。特别有趣的是,由于EIP的风格和骆驼有足够的适配器和支持多种协议的事实。最近的2.18版本添加了ServiceCall组件,该组件引入了调用API并通过集群发现机制解析其地址的功能。目前,它支持Consul,Kubernetes,Ribbon等。可以很容易地找到代理的一些例子,其中ServiceCall用Consul配置。我们将在这里使用Kubernetes,因为这是我最喜欢的集群解决方案。

整合架构如下:

简化软件集成:一个Apache Camel教程

Order服务和Inventory服务将是一个简单的Spring Boot应用程序返回静态数据。我们不是绑定在这里的一个特定的技术堆栈。这些服务正在产生我们想要处理的数据。

订购服务控制器:

@RestController
public class OrderController {

    private final OrderStorage orderStorage;

    @Autowired
    public OrderController(OrderStorage orderStorage) {
        this.orderStorage = orderStorage;
    }

    @RequestMapping("/info")
    public String info() {
        return "Order Service UUID = " + OrderApplication.serviceID;
    }

    @RequestMapping("/orders")
    public List<Order> getAll() {
        return orderStorage.getAll();
    }

    @RequestMapping("/orders/{id}")
    public Order getOne(@PathVariable Integer id) {
        return orderStorage.getOne(id);
    }
}

它以如下格式产生数据:

[{"id":1,"items":[2,3,4]},{"id":2,"items":[5,3]}]

Inventory服务控制器是完全类似Order服务的:

@RestController
public class InventoryController {

    private final InventoryStorage inventoryStorage;

    @Autowired
    public InventoryController(InventoryStorage inventoryStorage) {
        this.inventoryStorage = inventoryStorage;
    }

    @RequestMapping("/info")
    public String info() {
        return "Inventory Service UUID = " + InventoryApplication.serviceID;
    }

    @RequestMapping("/items")
    public List<InventoryItem> getAll() {
        return inventoryStorage.getAll();
    }

    @RequestMapping("/items/{id}")
    public InventoryItem getOne(@PathVariable Integer id) {
        return inventoryStorage.getOne(id);
    }

}

InventoryStorage是保存数据的通用存储库。在这个例子中,它返回静态预定义的对象,这些对象被封送到下面的格式。

[{"id":1,"name":"Laptop","description":"Up to 12-hours battery life","price":499.9},{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200.0},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9},{"id":4,"name":"Mouse","description":"Designed for comfort and portability","price":19.0},{"id":5,"name":"Keyboard","description":"Layout: US","price":10.5}]

让我们编写一个连接它们的网关路由,但在这个步骤中没有ServiceCall:

rest("/orders")
        .get("/").description("Get all orders with details").outType(TestResponse.class)
        .route()
        .setHeader("Content-Type", constant("application/json"))
        .setHeader("Accept", constant("application/json"))
        .setHeader(Exchange.HTTP_METHOD, constant("GET"))
        .removeHeaders("CamelHttp*")
        .to("http4://localhost:8082/orders?bridgeEndpoint=true")
        .unmarshal(formatOrder)
        .enrich("direct:enrichFromInventory", new OrderAggregationStrategy())
        .to("log:result")
        .endRest();

from("direct:enrichFromInventory")
        .transform().simple("${null}")
        .setHeader("Content-Type", constant("application/json"))
        .setHeader("Accept", constant("application/json"))
        .setHeader(Exchange.HTTP_METHOD, constant("GET"))
        .removeHeaders("CamelHttp*")
        .to("http4://localhost:8081/items?bridgeEndpoint=true")
        .unmarshal(formatInventory);

现在想象一下,每个服务不再是一个特定的实例,而是一个运行一个实例的云。我们将使用Minikube在本地尝试Kubernetes集群。

配置网络路由以在本地查看Kubernetes节点(给出的示例适用于Mac / Linux环境):

# remove existing routes
sudo route -n delete 10/24 > /dev/null 2>&1
# add routes
sudo route -n add 10.0.0.0/24 $(minikube ip)  
# 172.17.0.0/16 ip range is used by docker in minikube                                                                                                        
sudo route -n add 172.17.0.0/16 $(minikube ip)                                                                                                            
ifconfig 'bridge100' | grep member | awk '{print $2}’ 
# use interface name from the output of the previous command 
# needed for xhyve driver, which I'm using for testing
sudo ifconfig bridge100 -hostfilter en5

使用Dockerfile配置将服务包装在Docker容器中,如下所示:

FROM openjdk:8-jdk-alpine
VOLUME /tmp
ADD target/order-srv-1.0-SNAPSHOT.jar app.jar
ADD target/lib lib
ENV JAVA_OPTS=""
ENTRYPOINT exec java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar /app.jar

构建并将服务映像推送到Docker注册表。现在运行本地Kubernetes集群中的节点。

Kubernetes.yaml部署配置:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: inventory
spec:
  replicas: 3
  selector:
    matchLabels:
      app: inventory
  template:
    metadata:
      labels:
        app: inventory
    spec:
      containers:
      - name: inventory
        image: inventory-srv:latest
        imagePullPolicy: Never
        ports:
        - containerPort: 8081

将这些部署作为集群中的服务公开:

kubectl expose deployment order-srv --type=NodePort
kubectl expose deployment inventory-srv --type=NodePort

现在我们可以检查请求是否由集群中随机选择的节点提供服务。curl -X http://192.168.99.100:30517/info依次运行几次以访问minikube NodePort以获得公开的服务(使用您的主机和端口)。在输出中,我们看到我们已经实现了请求平衡。

Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5
Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda
Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda
Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5
Inventory Service UUID = 50323ddb-3ace-4424-820a-6b4e85775af4

添加camel-kubernetescamel-netty4-http依赖项目的pom.xml。然后将ServiceCall组件配置为使用共享路径定义中的所有服务调用的Kubernetes主节点发现:

KubernetesConfiguration kubernetesConfiguration = new KubernetesConfiguration();
kubernetesConfiguration.setMasterUrl("https://192.168.64.2:8443");
kubernetesConfiguration.setClientCertFile("/Users/antongoncharov/.minikube/client.crt");
kubernetesConfiguration.setClientKeyFile("/Users/antongoncharov/.minikube/client.key");
kubernetesConfiguration.setNamespace("default”);

ServiceCallConfigurationDefinition config = new ServiceCallConfigurationDefinition();
config.setServiceDiscovery(new KubernetesClientServiceDiscovery(kubernetesConfiguration));
context.setServiceCallConfiguration(config);

ServiceCall EIP完善了Spring Boot。大多数选项可以直接在application.properties文件中配置。

使用ServiceCall组件授权Camel 路由:

rest("/orders")
        .get("/").description("Get all orders with details").outType(TestResponse.class)
        .route()
        .hystrix()
        .setHeader("Content-Type", constant("application/json"))
        .setHeader("Accept", constant("application/json"))
        .setHeader(Exchange.HTTP_METHOD, constant("GET"))
        .removeHeaders("CamelHttp*")
        .serviceCall("customer-srv","http4:customer-deployment?bridgeEndpoint=true")
        .unmarshal(formatOrder)
        .enrich("direct:enrichFromInventory", new OrderAggregationStrategy())
        .to("log:result")
        .endRest();

from("direct:enrichFromInventory")
        .transform().simple("${null}")
        .setHeader("Content-Type", constant("application/json"))
        .setHeader("Accept", constant("application/json"))
        .setHeader(Exchange.HTTP_METHOD, constant("GET"))
        .removeHeaders("CamelHttp*")
        .serviceCall("order-srv","http4:order-srv?bridgeEndpoint=true")
        .unmarshal(formatInventory);

我们还启动了路线中的断路器。这是一个集成挂钩,允许在发送错误或收件人不可用的情况下暂停远程系统调用。这旨在避免级联系统故障。Hystrix组件通过实现断路器模式来帮助实现这一点。

让我们运行它并发送测试请求; 我们会得到这两个服务聚合的响应。

[{"id":1,"items":[{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200.0},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9},{"id":4,"name":"Mouse","description":"Designed for comfort and portability","price":19.0}]},{"id":2,"items":[{"id":5,"name":"Keyboard","description":"Layout: US","price":10.5},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9}]}]

结果如预期。

其他用例

我展示了Apache Camel如何在一个集群中集成微服务。这个框架的其他用途是什么?一般来说,在基于规则的路由可能是解决方案的任何地方都是有用的。例如,Apache Camel可以成为Eclipse Kura适配器的物联网中间件。它可以处理来自各种组件和服务的日志信号的监视,就像在CERN系统中一样。它也可以是企业级SOA的集成框架,也可以是批量数据处理的管道,虽然它在这方面与Apache Spark没有很好的竞争。

结论

你可以看到系统集成不是一个简单的过程。我们很幸运,因为收集了很多经验。正确应用它来构建灵活和容错的解决方案非常重要。

为了确保正确的应用,我建议有一个重要的集成方面的清单。必须具备的项目包括:

  • 是否有单独的集成层?
  • 是否有集成测试?
  • 我们知道预期的峰值数据强度吗?
  • 我们是否知道预期的数据交付时间?
  • 消息相关性是否重要?如果序列中断?
  • 我们应该以同步还是异步的方式来做?
  • 格式和路由规则更频繁地变化在哪里?
  • 我们有办法监督这个过程吗?

在本文中,我们尝试了Apache Camel,这是一个轻量级集成框架,可帮助您在解决集成问题时节省时间和精力。正如我们所展示的,它可以作为一个工具,支持相关的微服务体系结构,全面负责微服务之间的数据交换。

如果您有兴趣了解有关Apache Camel的更多信息,我强烈建议框架创建者Claus Ibsen撰写“Camel in Action”一书。官方文档可以在camel.apache.org上找到

了解基础知识

什么是EIP?

EIP是企业集成模式的缩写,是用于设计不同企业软件之间数据流的软件模式。

什么是Apache Camel?

关于作者

Anton是一个熟练的全栈软件开发人员和热情的学习者。他在设计强大且可扩展的应用程序方面拥有丰富的专业知识 - 他的工作经验涉及创建和支持多个大型分布式系统。他精通Java / Spring,熟悉JavaScript开发。

参与评论