自定义Camel组件

使用Bean 或者 Processor

一般来说,首先建议考虑使用Bean或者Processor的方式来完成自定义Camel组件的目的。实际上所有的组件都是Processor。所以如果能够使用一个简单的Class来封装逻辑,那是最好的方式。 Bean和Processor简单易用。

java DSL


from(uri).bean(MyBean.class);
1
2

Spring XML DSL


<route> 
 <from uri="direct:hello">
 <to uri="bean:bye"/>
</route>
1
2
3
4
5

A bean: endpoint cannot be defined as the input to the route; i.e. you cannot consume from it, you can only route from some inbound message Endpoint to the bean endpoint as output. So consider using a direct: or queue: endpoint as the input.

不能把一个bean:端点作为路由的输入,即不能从bean端点消费。你只能从入站消息路由到bean端点作为输出,因此,请考虑使用一个direct:或者queue: 端点作为输入

Bean绑定

Bean作为输出端点的时候,可以不指名具体的method的名字。camel会使用一个bean bind的机制来决定使用哪一个方法。 Camel消息绑定到bean的方法的调用会以不同的方式出现,以下是重要性顺序:

  • if the message contains the header CamelBeanMethodName then that method is invoked, converting the body to the type of the method's argument. 如果消息头中带有CamelBeanMethodName的header名称,那么CamelBeanMethodName指定的方法会被调用,消息的body会被转成方法的参数类型。
    • From Camel 2.8 onwards you can qualify parameter types to select exactly which method to use among overloads with the same name (see below for more details). 从Camel 2.8开始,您可以限定参数类型,以准确选择在具有相同名称的重载中使用哪种方法(有关详细信息,请参见下文)。
    • From Camel 2.9 onwards you can specify parameter values directly in the method option (see below for more details). 从Camel 2.9开始,您可以直接在方法选项中指定参数值(有关详细信息,请参阅下文)。
  • you can explicitly specify the method name in the DSL or when using POJO Consuming or POJO Producing 您可以在DSL中或使用POJO Consuming或POJO Producing时明确指定方法名称
  • if the bean has a method marked with the @Handler annotation, then that method is selected 如果bean有一个用@Handler注释标记的方法,则选择该方法
  • if the bean can be converted to a Processor using the Type Converter mechanism, then this is used to process the message. The ActiveMQ component uses this mechanism to allow any JMS MessageListener to be invoked directly by Camel without having to write any integration glue code. You can use the same mechanism to integrate Camel into any other messaging/remoting frameworks. 如果bean可以使用Type Converter机制转换为Processor,那么这将用于处理消息。 ActiveMQ组件使用此机制允许Camel直接调用任何JMS MessageListener,而无需编写任何集成代码。 您可以使用相同的机制将Camel集成到任何其他消息传递/远程处理框架中。i
  • if the body of the message can be converted to a BeanInvocation (the default payload used by the ProxyHelper) component - then that is used to invoke the method and pass its arguments otherwise the type of the body is used to find a matching method; an error is thrown if a single method cannot be chosen unambiguously. 如果消息的body可以转换为BeanInvocation(ProxyHelper使用的默认载体)组件 - 那么它用于调用方法并传递其参数否则使用body的类型用于找到匹配的方法; 如果无法明确选择一个具体的方法,则会引发错误。
  • you can also use Exchange as the parameter itself, but then the return type must be void. 您也可以使用Exchange作为参数本身,但返回类型必须为void。
  • if the bean class is private (or package-private), interface methods will be preferred (from Camel 2.9 onwards) since Camel can't invoke class methods on such beans 如果bean类是私有的(或者是包私有的),那么接口方法将是首选的(从Camel 2.9开始),因为Camel不能在这样的bean上调用类方法

详见:http://camel.apache.org/bean-binding.html

使用Processor...

from(uri).process(new MyProcessor());

public class MyProcessor implements Processor {
    public void process(Exchange exchange) throws Exception {
        //do something... 
    }
}

1
2
3
4
5
6
7
8
<route>
	<from uri="">
	<process ref="refbean">
	<to uri="mock:result">
</route>

1
2
3
4
5
6

使用Camel Component扩展

我们拿一个实际的例子,钉钉消息机器人来说明。 我们将要创建一个组件,这个组件把Camel消息发送到钉钉机器人的调用地址。组件可以支持不同的发送地址。

使用maven archetype创建camel component项目

mvn archetype:generate
    -DarchetypeGroupId=org.apache.camel.archetypes
    -DarchetypeArtifactId=camel-archetype-component
    -DarchetypeVersion=2.23.0
    -DgroupId=com.manystar
    -DartifactId=camel-dingbot
	-Dname=DingBot 
    -Dscheme=dingbot
1
2
3
4
5
6
7
8

这会创建出一个component项目,项目结构如图

images

  • META-INF/services/org/apache/camel/component/dingbot - 允许组件自动发现的配置文件,dingbot是自定义组件的uri 格式,文件内容定义了组件类。
  • DingBotComponent.java - 创建 “dingbot:”端点的组件类
  • DingBotEndpoin Endpoint的实现类,通过其URI(“dingbot:” 在本例中)在DSL中引用。端点负责创建与之关联的Producer和Consumer实例。
  • DingBotProducer.java - 用于将消息交换发送到端点的Producer实现。
  • DingBotConsumer.java - 用于从端点消费消息的Consumer实现。

修改端点类

添加代码

    public String getDingBotUri() {
        return dingBotUri;
    }

    public void setDingBotUri(String dingBotUri) {
        this.dingBotUri = dingBotUri;
    }

    @UriPath(description = "Ding Bot uri",defaultValue = "DingBot") @Metadata(required = "true")
    private String dingBotUri;

1
2
3
4
5
6
7
8
9
10
11

在端点类中添加dingBotUri的参数,这将允许我们采用

	from("file://src/data").convertBodyTo(String.class)
			.to("log:com.manystar.Dingbot")
	  .to("dingbot:bar?dingBotUri=https://oapi.dingtalk.com/robot/send?access_token=xxx")


1
2
3
4
5

的方式调用组件。

修改Producer

这段代码负责把camel消息发送到钉钉机器人地址

    public void process(Exchange exchange) throws Exception {
        HttpClient httpClient = HttpClients.createDefault();
        HttpPost httpPost = new HttpPost(endpoint.getDingBotUri());
        httpPost.addHeader("Content-Type","application/json");
        String textMsg = "{\"msgtype\":\"text\",\"text\":{\"content\":\""+exchange.getIn().getBody()+"\"}}";
        StringEntity se = new StringEntity(textMsg,"utf-8");
        httpPost.setEntity(se);
        HttpResponse response = httpClient.execute(httpPost);
        if(response.getStatusLine().getStatusCode() == HttpStatus.SC_OK){
            String result = EntityUtils.toString(response.getEntity(),"utf-8");
            System.out.print(result);
        }

        System.out.println(exchange.getIn().getBody());    
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

至此基本工作都完成了,剩下的就是测试打包发布jar到maven库了。