Split模式

split是EIP模式的一种,用于把把消息分解为若干个消息,分别进行处理。

images

Split Java Object

可以被split的消息

  • Collection
  • Iterator
  • Array
  • String

该方法定义在org.apache.camel.util.ObjectHelper类

   public static Iterable<Object> createIterable(Object value, String delimiter,
                                                  final boolean allowEmptyValues, final boolean pattern) {

        // if its a message than we want to iterate its body
        if (value instanceof Message) {
            value = ((Message) value).getBody();
        }

        if (value == null) {
            return Collections.emptyList();
        } else if (value instanceof Iterator) {
            final Iterator<Object> iterator = (Iterator<Object>)value;
            return new Iterable<Object>() {
                @Override
                public Iterator<Object> iterator() {
                    return iterator;
                }
            };
        } else if (value instanceof Iterable) {
            return (Iterable<Object>)value;
        } else if (value.getClass().isArray()) {
            if (isPrimitiveArrayType(value.getClass())) {
                final Object array = value;
                return new Iterable<Object>() {
                    @Override
                    public Iterator<Object> iterator() {
                        return new Iterator<Object>() {
                            private int idx;

                            public boolean hasNext() {
                                return idx < Array.getLength(array);
                            }

                            public Object next() {
                                if (!hasNext()) {
                                    throw new NoSuchElementException("no more element available for '" + array + "' at the index " + idx);
                                }

                                return Array.get(array, idx++);
                            }

                            public void remove() {
                                throw new UnsupportedOperationException();
                            }
                        };
                    }
                };
            } else {
                return Arrays.asList((Object[]) value);
            }
        } else if (value instanceof NodeList) {
            // lets iterate through DOM results after performing XPaths
            final NodeList nodeList = (NodeList) value;
            return new Iterable<Object>() {
                @Override
                public Iterator<Object> iterator() {
                    return new Iterator<Object>() {
                        private int idx;

                        public boolean hasNext() {
                            return idx < nodeList.getLength();
                        }

                        public Object next() {
                            if (!hasNext()) {
                                throw new NoSuchElementException("no more element available for '" + nodeList + "' at the index " + idx);
                            }

                            return nodeList.item(idx++);
                        }

                        public void remove() {
                            throw new UnsupportedOperationException();
                        }
                    };
                }
            };
        } else if (value instanceof String) {
            final String s = (String) value;

            // this code is optimized to only use a Scanner if needed, eg there is a delimiter

            if (delimiter != null && (pattern || s.contains(delimiter))) {
                // use a scanner if it contains the delimiter or is a pattern
                final Scanner scanner = new Scanner((String)value);

                if (DEFAULT_DELIMITER.equals(delimiter)) {
                    // we use the default delimiter which is a comma, then cater for bean expressions with OGNL
                    // which may have balanced parentheses pairs as well.
                    // if the value contains parentheses we need to balance those, to avoid iterating
                    // in the middle of parentheses pair, so use this regular expression (a bit hard to read)
                    // the regexp will split by comma, but honor parentheses pair that may include commas
                    // as well, eg if value = "bean=foo?method=killer(a,b),bean=bar?method=great(a,b)"
                    // then the regexp will split that into two:
                    // -> bean=foo?method=killer(a,b)
                    // -> bean=bar?method=great(a,b)
                    // http://stackoverflow.com/questions/1516090/splitting-a-title-into-separate-parts
                    delimiter = ",(?!(?:[^\\(,]|[^\\)],[^\\)])+\\))";
                }
                scanner.useDelimiter(delimiter);

                return new Iterable<Object>() {
                    @Override
                    public Iterator<Object> iterator() {
                        return CastUtils.cast(scanner);
                    }
                };
            } else {
                return new Iterable<Object>() {
                    @Override
                    public Iterator<Object> iterator() {
                        // use a plain iterator that returns the value as is as there are only a single value
                        return new Iterator<Object>() {
                            private int idx;

                            public boolean hasNext() {
                                return idx == 0 && (allowEmptyValues || ObjectHelper.isNotEmpty(s));
                            }

                            public Object next() {
                                if (!hasNext()) {
                                    throw new NoSuchElementException("no more element available for '" + s + "' at the index " + idx);
                                }

                                idx++;
                                return s;
                            }

                            public void remove() {
                                throw new UnsupportedOperationException();
                            }
                        };
                    }
                };
            }
        } else {
            return Collections.singletonList(value);
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139

Spring XML

<simple>标签是一个表达式标签,告诉split使用什么表达式来提取消息,进行拆分 表达式有多种语言表示,参考:http://camel.apache.org/languages.html

<split> 
<simple>${body}</simple> 
<to uri="mock:result"/> 
</split> 

<split> 
<simple>${header.foo}</simple> 
<to uri="mock:result"/> 
</split>  
1
2
3
4
5
6
7
8
9

stream流模式

对于较大的文件或者消息可以使用流模式处理,减少内存开销 例如有这样需要处理的xml消息

<orders> 
<order> <!-- order stuff here --> </order> 
<order> <!-- order stuff here --> </order> ... 
<order> <!-- order stuff here --> </order> 
</orders>
1
2
3
4
5

如果使用xpath处理会导致整个文件被加载入内存,可以使用Tokenizer方式处理

<route> 
<from uri="file:inbox"/> 
<split streaming="true"> 
<tokenize token="order" xml="true"/> 
<to uri="activemq:queue:order"/> 
</split> 
</route>

1
2
3
4
5
6
7
8

tokenizeXML方法会把使用order tag标签把之间的子节点进行拆分处理,拆分后的消息如下格式

<order> <!-- order stuff here --> </order>
1

拆分消息放入队列

<route> 
<from uri="file:inbox"/> 
<split streaming="true"> 
<tokenize token="order" xml="true"/> 
<to uri="activemq:queue:order"/> 
</split> 
</route>

1
2
3
4
5
6
7
8

更详细的内容参考:http://camel.apache.org/splitter.html

Expression表达式提取数据

提取数据

Object value = expression.evaluate(exchange, Object.class)
1

创建迭代器

if (isStreaming()) {
            answer = createProcessorExchangePairsIterable(exchange, value);
        } else {
            answer = createProcessorExchangePairsList(exchange, value);
        }
1
2
3
4
5

createProcessorExchangePairsIterable返回一个SplitterIterable对象,这个对象的构造器会调用上面提到的 ObjectHelper.createIterator



    @Override
    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
        Object value = expression.evaluate(exchange, Object.class);
        if (exchange.getException() != null) {
            // force any exceptions occurred during evaluation to be thrown
            throw exchange.getException();
        }

        Iterable<ProcessorExchangePair> answer;
        if (isStreaming()) {
            answer = createProcessorExchangePairsIterable(exchange, value);
        } else {
            answer = createProcessorExchangePairsList(exchange, value);
        }
        if (exchange.getException() != null) {
            // force any exceptions occurred during creation of exchange paris to be thrown
            // before returning the answer;
            throw exchange.getException();
        }

        return answer;
    }
    
    private final class SplitterIterable implements Iterable<ProcessorExchangePair>, Closeable {

       .....
        private SplitterIterable(Exchange exchange, Object value) {
            this.original = exchange;
            this.value = value;
            this.iterator = ObjectHelper.createIterator(value);
            this.copy = copyExchangeNoAttachments(exchange, true);
            this.routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
        }
       .....
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
    public static Expression simpleExpression(final String expression) {
        return new ExpressionAdapter() {
            public Object evaluate(Exchange exchange) {
                Language language = exchange.getContext().resolveLanguage("simple");
                return language.createExpression(expression).evaluate(exchange, Object.class);
            }

            public String toString() {
                return "simple(" + expression + ")";
            }
        };
    }
1
2
3
4
5
6
7
8
9
10
11
12

split实例

下例为读取xls生成list,然后把list的每一个元素拆分单独的exchange消息

        <route>
            <from uri="file:src/data"/>
            <log message="readfile"/>
            <to uri="file:src/processed"/>
            <log message="parsefile"/>
            <unmarshal>
                <custom ref="xlsBeansDataFormat" />
            </unmarshal>
            <log message="split"/>
            <split>
                <simple>${body.items}</simple>
                <log message="${body}"/>
                <to uri="mock:result"/>
            </split>
        </route>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

Split Json数据

JsonPath

sample json

{ "store": {
    "book": [ 
      { "category": "reference",
        "author": "Nigel Rees",
        "title": "Sayings of the Century",
        "price": 8.95
      },
      { "category": "fiction",
        "author": "Evelyn Waugh",
        "title": "Sword of Honour",
        "price": 12.99
      },
      { "category": "fiction",
        "author": "Herman Melville",
        "title": "Moby Dick",
        "isbn": "0-553-21311-3",
        "price": 8.99
      },
      { "category": "fiction",
        "author": "J. R. R. Tolkien",
        "title": "The Lord of the Rings",
        "isbn": "0-395-19395-8",
        "price": 22.99
      }
    ],
    "bicycle": {
      "color": "red",
      "price": 19.95
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

JsonPath 举例

XPath JSONPath Result
/store/book/author $.store.book[*].author the authors of all books in the store
//author $..author all authors
/store/* $.store.* all things in store, which are some books and a red bicycle.
/store//price $.store..price the price of everything in the store.
//book[3] $..book[2] the third book
//book[last()] $..book[(@.length-1)] $..book[-1:] the last book in order.
//book[position() < 3] $..book[0,1] $..book[:2] the first two books
//book[isbn] $..book[?(@.isbn)] filter all books with isbn number
//book[price<10] $..book[?(@.price<10)] filter all books cheapier than 10
//* $..* all Elements in XML document.

拆分举例

json数据格式

[
  {
    "name": "Ram",
    "email": "ram@gmail.com",
    "age": 23,
    "hobby":[
      "writing",
      "riding",
      "drawing"
    ]
  },
  {
    "name": "Shyam",
    "email": "shyam23@gmail.com",
    "age": 28
  }
]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

Spring XML DSL 配置

    <camel:camelContext id="jsonAggregate">
        <camel:route>
            <camel:from uri="file://src/data"/>
            <camel:convertBodyTo type="java.lang.String"/>
            <camel:log message="${body}"></camel:log>
            <camel:to uri="direct:start"/>
        </camel:route>
        <camel:route>
            <camel:from uri="direct:start" />
            <camel:split streaming="true">
                <camel:jsonpath>$.[*].email</camel:jsonpath>
                    <camel:log message="${body}"></camel:log>
                    <camel:to uri="activemq:result"></camel:to>
            </camel:split>
        </camel:route>
    </camel:camelContext>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

说明: 第一个route用于从文件中读取json数据,转换成string 第二个route使用jsonpath作为表达式,拆分数据