Camel Aggregation Strategies

One of the many (many, many, many) extension points inside Apache Camel is the AggregationStrategy. These are used in everything from Content Enrichers to Splitters to Aggregators and more. Since their use is so prevalent, I figured that I’d dedicate a whole blog post just to them. So here goes…

So what are AggregationStrategies anyway? Simple… they’re implementations of the AggregationStrategy interface that allow you to specify exactly how two exchanges will be merged. This specification can be as simple or as complex as you require for your use case. Maybe you just want to take the first response and ignore all others. Maybe you want to combine the XML bodies into a list and then merge a select few headers. The limit really is your imagination. But what do I mean by “merging exchanges”? Let’s take a look at a few concrete examples.

Out of the Box

For starters, there are several implementations that are included out of the box. You can use them “as-is” without writing any custom code at all. Let’s talk through a few of them with some potential use cases.

The first is the UseLatestAggregationStrategy implementation. It’s the default strategy for most Camel EIPs that accept aggregation strategies. So if you don’t specify any strategy, this is likely the one you’re using. Basically, it takes the last exchange it receives and just uses that (ignoring any others that may have been aggregated prior). One example use case for this would be when doing an Aggregator. Perhaps you’re receiving many messages as input, but you want to buffer them (giving the user time to send in corrections/updates), and then only send the latest message to the backend after some period of inactivity. That might look like this:

<bean id="useLatest" class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy"/>
<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:acceptUpdateableRequest"/>
    <aggregate strategyRef="useLatest" completionTimeout="5000">
      <correlationExpression>
        <header>UniqueRequestID</header>
      </correlationExpression>
      <to uri="direct:bufferedSendToBackend"/>
    </aggregate>
  </route>
</camelContext>
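To make the "keep only the latest" rule concrete, here is a minimal Camel-free sketch. It is not the Camel API: the `UseLatestModel` and `Msg` names are hypothetical, and the completion timeout is not modelled. It only shows how folding messages per correlation key with a "return the new one" merge leaves the last message of each group.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Camel-free model (hypothetical names): only the merge rule is illustrated.
class UseLatestModel {

    // Stand-in for a message: a correlation key plus a body.
    static final class Msg {
        final String requestId;
        final String body;

        Msg(String requestId, String body) {
            this.requestId = requestId;
            this.body = body;
        }
    }

    // Mirrors aggregate(oldExchange, newExchange) returning the new exchange.
    static Msg useLatest(Msg oldMsg, Msg newMsg) {
        return newMsg;
    }

    // Folds the incoming messages per correlation key, the way an aggregator
    // would before a group completes.
    static Map<String, Msg> aggregate(List<Msg> incoming) {
        Map<String, Msg> groups = new LinkedHashMap<>();
        for (Msg msg : incoming) {
            // merge() stores the first message as-is and applies useLatest afterwards
            groups.merge(msg.requestId, msg, UseLatestModel::useLatest);
        }
        return groups;
    }
}
```

Feeding it a draft and a later correction with the same request ID leaves only the correction in that group, which is the behaviour the route above relies on.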

For the next use case, we’ll cover the (very similar) UseOriginalAggregationStrategy implementation. As the name would suggest, it “merges” two exchanges together by completely ignoring the new exchange and just taking the original. One example of where this might be useful is when doing a Multicast. Let’s say I wanted to send a copy of a message off to multiple recipients, but really don’t care about their response. After the multicast is completed, I want to perform some transformation on the original message, and then return the result. Instead of rolling my own implementation, I could simply use the one provided. Something like this:

<bean id="useOriginal" class="org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy"/>
<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:acceptRequest"/>
    <multicast strategyRef="useOriginal">
      <to uri="direct:recipient1"/>
      <to uri="direct:recipient2"/>
    </multicast>
    <to uri="xslt:transformOriginal.xsl"/>
  </route>
</camelContext>

The next set of implementations, I’ll cover as a group. They are the GroupedExchangeAggregationStrategy, GroupedMessageAggregationStrategy, and GroupedBodyAggregationStrategy strategies. They will combine the exchanges into a list and then pass the list itself along to the next processor. They only differ by what they put in the list (i.e., the Exchange, the Message, or the body). So, for instance, if you wanted to split a message, process each individual part, and then combine the individual results back into a list, you could do so easily using a Splitter like below:

<bean id="listOfBody" class="org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy"/>
<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:acceptListRequestExpectingListResponse"/>
    <split strategyRef="listOfBody">
      <simple>${body}</simple>
      <to uri="direct:sendIndividualRequest"/>
    </split>
  </route>
</camelContext>
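The split, process, and regroup flow can be sketched without Camel as a simple fold. This is only a model under stated assumptions: `GroupedBodyModel` and its methods are hypothetical names, and the `step` function stands in for whatever `direct:sendIndividualRequest` does. The one detail borrowed from Camel is that the "old" value is null on the first call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Camel-free model (hypothetical names) of a "group the bodies into a list" merge.
class GroupedBodyModel {

    // Mirrors the aggregate(old, new) contract: old is null on the first call.
    static <T> List<T> aggregate(List<T> oldBodies, T newBody) {
        List<T> result = (oldBodies == null) ? new ArrayList<T>() : oldBodies;
        result.add(newBody);
        return result;
    }

    // Splits the input into parts, processes each part, and regroups the results.
    static List<String> splitProcessRegroup(List<String> parts, UnaryOperator<String> step) {
        List<String> aggregated = null; // nothing aggregated yet
        for (String part : parts) {
            aggregated = aggregate(aggregated, step.apply(part));
        }
        return aggregated;
    }
}
```

For example, `splitProcessRegroup(Arrays.asList("a", "b", "c"), String::toUpperCase)` yields a list holding the three processed parts in their original order.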

The final implementation that I’ll cover for this section is the XsltAggregationStrategy. It allows you to provide an XSLT that will be used to merge the original and new exchanges together. A great use case for this is when you want to Enrich an XML request with some extra data retrieved from a backend.

<bean id="xsltEnrichmentStrategy" class="org.apache.camel.util.toolbox.XsltAggregationStrategy">
  <constructor-arg value="/META-INF/xslt/EnrichIndexHtml.xsl"/>
</bean>
<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:acceptRequest"/>
    <to uri="language:constant:classpath:/META-INF/html/index.html"/>
    <enrich strategyRef="xsltEnrichmentStrategy">
      <constant>direct:fetchCds</constant>
    </enrich>
  </route>
</camelContext>

Since this example is a little more complex, it requires more than just a code snippet to explain. So I’ve put together an example application and thrown it up on GitHub. Take a look… https://github.com/joshdreagan/camel-xslt-enricher

It’s amazing how many use cases these “canned” aggregation strategies cover. But what if they’re not quite what you need?

Semi-Custom

In this section, we’ll discuss what I call “semi-custom” strategies. Basically, they’re base/utility classes that make it easy for you to implement a custom strategy with very little Java code.

The first class we’ll talk about is the AbstractListAggregationStrategy. Similar to the grouping implementations mentioned above, the end result of this strategy is a list of items. The difference is that you have total control over what data gets placed in said list as well as where you pull it from. Here’s a very simple example implementation:

package org.apache.camel.examples;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AbstractListAggregationStrategy;

public class SimpleListAggregationStrategy extends AbstractListAggregationStrategy<String> {

  // The value returned here is what gets added to the aggregated list.
  @Override
  public String getValue(Exchange exchange) {
    return exchange.getIn().getHeader("MyAwesomeHeader", String.class);
  }
}

If you need even more control over the aggregation, you can use the FlexibleAggregationStrategy. The FlexibleAggregationStrategy is a fluent strategy builder that lets you define fairly complex aggregation strategy implementations using a very concise syntax. If you’re using the Java DSL to define your Camel routes (or are using any Java-based bean wiring mechanism), you can just use the fluent builder directly. However, if you’re using it from the Spring DSL (using Spring’s XML bean definitions), it might be easier to wrap it in a simple Java implementation. See below for an example:

package org.apache.camel.examples;

import org.apache.camel.Exchange;
import org.apache.camel.model.language.SimpleExpression;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.util.toolbox.AggregationStrategies;

public class CorrelationIdAggregationStrategy implements AggregationStrategy {

  private final AggregationStrategy delegate;

  // The constructor name must match the class name.
  public CorrelationIdAggregationStrategy() {
    // Pick the body of the new exchange and store it in a header of the result.
    delegate = AggregationStrategies.flexible()
        .storeInHeader("MyCorrelationID")
        .pick(new SimpleExpression("${body}"));
  }

  @Override
  public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    return delegate.aggregate(oldExchange, newExchange);
  }
}

You could then use your implementation like this:

<bean id="uuidEnrichmentStrategy" class="org.apache.camel.examples.CorrelationIdAggregationStrategy"/>
<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:acceptRequest"/>
    <enrich strategyRef="uuidEnrichmentStrategy">
      <constant>direct:fetchUuid</constant>
    </enrich>
  </route>
  <route>
    <from uri="direct:fetchUuid"/>
    <bean beanType="java.util.UUID" method="randomUUID"/>
    <convertBodyTo type="java.lang.String"/>
  </route>
</camelContext>

Pretty powerful stuff! But what if you’re feeling even more imaginative?

Custom

The last type of strategy that I’ll talk about is a “completely custom” implementation. This basically just means that you will implement the AggregationStrategy interface directly without using any helper base classes (which might restrict you in some ways). Because of this direct implementation, you are free to do literally anything you want.

One example that I whipped up for a customer a while back is what I called the “semi-streaming aggregation strategy”.

package org.apache.camel.examples;

import java.util.Comparator;
import java.util.Objects;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.processor.aggregate.AggregateProcessor;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.util.ExchangeHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

public class SemiStreamingAggregationStrategy implements AggregationStrategy, CamelContextAware, InitializingBean {

  private static final Logger log = LoggerFactory.getLogger(SemiStreamingAggregationStrategy.class);

  public static final String LAST_PROCESSED_INDEX = "CamelAggregatorLastProcessedIndex";

  private String aggregateProcessorId;
  private CamelContext camelContext;
  private String sequenceIdHeaderName;

  // Lazily initialized helpers.
  private AggregateProcessor _aggregateProcessor;
  private Comparator<Message> _messageComparator;

  public String getAggregateProcessorId() {
    return aggregateProcessorId;
  }

  public void setAggregateProcessorId(String aggregateProcessorId) {
    this.aggregateProcessorId = aggregateProcessorId;
  }

  @Override
  public void setCamelContext(CamelContext camelContext) {
    this.camelContext = camelContext;
  }

  @Override
  public CamelContext getCamelContext() {
    return camelContext;
  }

  public String getSequenceIdHeaderName() {
    return sequenceIdHeaderName;
  }

  public void setSequenceIdHeaderName(String sequenceIdHeaderName) {
    this.sequenceIdHeaderName = sequenceIdHeaderName;
  }

  protected AggregateProcessor _aggregateProcessor() {
    if (_aggregateProcessor == null) {
      _aggregateProcessor = camelContext.getProcessor(aggregateProcessorId, AggregateProcessor.class);
    }
    return _aggregateProcessor;
  }

  // Orders messages by the value of the sequence ID header.
  protected Comparator<Message> _messageComparator() {
    if (_messageComparator == null) {
      _messageComparator = (Message t, Message t1) -> t.getHeader(sequenceIdHeaderName, Comparable.class).compareTo(t1.getHeader(sequenceIdHeaderName, Comparable.class));
    }
    return _messageComparator;
  }

  @Override
  public void afterPropertiesSet() throws Exception {
    Objects.requireNonNull(aggregateProcessorId, "The aggregateProcessorId property must not be null.");
    Objects.requireNonNull(camelContext, "The camelContext property must not be null.");
    Objects.requireNonNull(sequenceIdHeaderName, "The sequenceIdHeaderName property must not be null.");
  }

  @Override
  public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

    Exchange aggregateExchange = initializeAggregateExchange(oldExchange, newExchange);
    log.info(String.format("Pending messages: [%s] messages", aggregateExchange.getIn().getBody(SortedSet.class).size()));

    appendMessage(aggregateExchange, newExchange.getIn());

    findAndEmitSequencedMessages(aggregateExchange);

    return aggregateExchange;
  }

  protected Exchange initializeAggregateExchange(Exchange oldExchange, Exchange newExchange) {

    Exchange aggregateExchange;
    if (oldExchange == null) {
      // First message of the group: start with an empty, sorted set of pending messages.
      aggregateExchange = ExchangeHelper.copyExchangeAndSetCamelContext(newExchange, camelContext);
      SortedSet<Message> pendingMessages = new TreeSet<>(_messageComparator());
      aggregateExchange.getIn().setBody(pendingMessages);
      aggregateExchange.setProperty(LAST_PROCESSED_INDEX, -1L);
    } else {
      aggregateExchange = oldExchange;
    }

    return aggregateExchange;
  }

  protected void appendMessage(Exchange aggregateExchange, Message message) {
    log.info(String.format("Adding message: index [%s], body [%s]", message.getHeader(sequenceIdHeaderName), message.getBody()));
    aggregateExchange.getIn().getBody(SortedSet.class).add(message);
  }

  protected void findAndEmitSequencedMessages(Exchange aggregateExchange) {

    SortedSet<Message> pendingMessages = aggregateExchange.getIn().getBody(SortedSet.class);
    Long lastProcessedIndex = aggregateExchange.getProperty(LAST_PROCESSED_INDEX, Long.class);

    Message currentMessage;
    Long currentMessageIndex;
    SortedSet<Message> messagesToBeEmitted = new TreeSet<>(_messageComparator());
    // Collect the contiguous run of messages that directly follows the last processed index.
    do {
      currentMessage = pendingMessages.first();
      currentMessageIndex = currentMessage.getHeader(sequenceIdHeaderName, Long.class);
      if (currentMessageIndex == lastProcessedIndex + 1) {
        messagesToBeEmitted.add(currentMessage);
        pendingMessages.remove(currentMessage);
        lastProcessedIndex = currentMessageIndex;
      } else {
        break;
      }
    } while (!pendingMessages.isEmpty());
    if (!messagesToBeEmitted.isEmpty()) {
      log.info(String.format("Messages to be emitted: [%s] messages", messagesToBeEmitted.size()));
      aggregateExchange.setProperty(LAST_PROCESSED_INDEX, lastProcessedIndex);
      aggregateExchange.getIn().setBody(pendingMessages);
      Exchange exchangeToBeEmitted = ExchangeHelper.copyExchangeAndSetCamelContext(aggregateExchange, camelContext);
      exchangeToBeEmitted.getIn().setBody(messagesToBeEmitted);
      try {
        // Hand the emitted block to the processors that follow the aggregator.
        for (Processor processor : _aggregateProcessor().next()) {
          processor.process(exchangeToBeEmitted);
        }
      } catch (Exception e) {
        throw new RuntimeCamelException(e);
      }
    }
  }
}

Here’s a link to the full source for your perusal: [https://github.com/joshdreagan/camel-streaming-aggregation]. In this implementation, I was asked to do ordering aggregation of incoming messages. But as the messages came in, if the next sequential block was completed, the customer wanted those messages to be emitted at that time instead of waiting for the entire batch to complete. So, for example, if I got messages [1,3,5], those messages would be aggregated and stored in the aggregation repository. But then, when message [2] came in, messages [1,2,3] would be emitted/processed (while message [5] would remain in the repository). Finally, when message [4] came in, messages [4,5] would be emitted/processed. That’s about as custom as they come!
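The sequencing rule itself can be isolated from Camel in a few lines. The sketch below is only a model under stated assumptions: `SequenceBufferModel` is a hypothetical name, messages are reduced to their sequence numbers, and numbering is assumed to start at 1 (the class above initializes its last processed index to -1, so it expects 0 first). Each arriving index is buffered, and the contiguous run that directly follows the last processed index is released.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Camel-free model (hypothetical name) of the "emit the next contiguous block" rule.
class SequenceBufferModel {

    private final TreeSet<Long> pending = new TreeSet<>();
    private long lastProcessed;

    // firstExpected is an assumption of this sketch, e.g. 1 for 1-based numbering.
    SequenceBufferModel(long firstExpected) {
        this.lastProcessed = firstExpected - 1;
    }

    // Buffers the index, then releases every index that extends the processed run.
    List<Long> offer(long index) {
        pending.add(index);
        List<Long> emitted = new ArrayList<>();
        while (!pending.isEmpty() && pending.first() == lastProcessed + 1) {
            lastProcessed = pending.pollFirst();
            emitted.add(lastProcessed);
        }
        return emitted;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

With arrivals in the order 2, 3, 1, 5, 4, nothing is released for 2 and 3, the arrival of 1 releases 1 through 3, 5 is held, and the arrival of 4 releases 4 and 5.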

Hopefully this helps highlight some of the power and flexibility of Camel. Like I said at the beginning of this post, your imagination is the limit (or rather your use case). Enjoy!


Source: https://blog.joshdreagan.com/2018/08/30/camel_aggregation_strategies/

Understanding Apache Camel EIP - Splitter and Aggregator pattern using example

Define the domain class Employee as follows-
package com.javainuse.domain;

public class Employee {

    public Employee(String empId, String name, String type) {
        this.empId = empId;
        this.name = name;
        this.department = type;
    }

    private String empId;
    private String name;
    private String department;
    private double salary;

    public String getId() {
        return empId;
    }

    public String getName() {
        return name;
    }

    public double getSalary() {
        return salary;
    }

    public void setSalary(double salary) {
        this.salary = salary;
    }

    public String getDepartment() {
        return department;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("Employee [empId=");
        builder.append(empId);
        builder.append(", name=");
        builder.append(name);
        builder.append(", department=");
        builder.append(department);
        builder.append(", salary=");
        builder.append(salary);
        builder.append("]");
        return builder.toString();
    }
}
Define the domain class Department as follows-
package com.javainuse.domain;

import java.util.List;

public class Department {

    private List<Employee> employees;
    private double totalSalary;

    public List<Employee> getEmployees() {
        return employees;
    }

    public void setEmployees(List<Employee> employees) {
        this.employees = employees;
    }

    public double getTotalSalary() {
        return totalSalary;
    }

    public void setTotalSalary(double totalSalary) {
        this.totalSalary = totalSalary;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        // Open the bracket that is closed at the end of the string.
        builder.append("Department [employees=");
        builder.append(employees);
        builder.append(", totalSalary=");
        builder.append(totalSalary);
        builder.append("]");
        return builder.toString();
    }
}

Next define the Route as follows-
package com.javainuse.aggregatesplit;

import org.apache.camel.builder.RouteBuilder;

public class DepartmentRouter extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        // Split the department into employees and aggregate the processed results.
        from("direct:processDept")
            .split(body().method("getEmployees"), new DepartmentEmployeeStrategy())
            .to("direct:processEmployee")
            .end();

        // Route each employee to the handler for their department.
        from("direct:processEmployee")
            .choice()
                .when(body().method("getDepartment").isEqualTo("Finance"))
                    .to("bean:employeeService?method=processFinanceDept")
                .when(body().method("getDepartment").isEqualTo("IT"))
                    .to("bean:employeeService?method=processITDept");
    }
}
Define the ProcessEmployees class which processes the Employee salary based on their department.
package com.javainuse.aggregatesplit;

import com.javainuse.domain.Employee;

public class ProcessEmployees {

    public Employee processITDept(Employee employee) throws InterruptedException {
        System.out.println("handling employee department:" + employee);
        employee.setSalary(10000);
        System.out.println("IT dept employee processed");
        return employee;
    }

    public Employee processFinanceDept(Employee employee) throws InterruptedException {
        System.out.println("handling employee department:" + employee);
        employee.setSalary(5000);
        System.out.println("Finance dept employee processed");
        return employee;
    }
}

Next define the class implementing our AggregationStrategy.
package com.javainuse.aggregatesplit;

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

import com.javainuse.domain.Department;
import com.javainuse.domain.Employee;

public class DepartmentEmployeeStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldEmployeeExchange, Exchange newEmployeeExchange) {
        if (oldEmployeeExchange == null) {
            // First employee: create the Department that accumulates the results.
            Employee newEmployee = newEmployeeExchange.getIn().getBody(Employee.class);
            System.out.println("Aggregate first employee: " + newEmployee);
            Department department = new Department();
            List<Employee> employees = new ArrayList<Employee>();
            employees.add(newEmployee);
            department.setEmployees(employees);
            department.setTotalSalary(newEmployee.getSalary());
            newEmployeeExchange.getIn().setBody(department);
            return newEmployeeExchange;
        }

        // Subsequent employees: add to the existing Department and update the total.
        Department department = oldEmployeeExchange.getIn().getBody(Department.class);
        Employee newEmployee = newEmployeeExchange.getIn().getBody(Employee.class);
        System.out.println("Aggregate old department: " + department);
        System.out.println("Aggregate new employee: " + newEmployee);
        department.getEmployees().add(newEmployee);
        double totalSalary = department.getTotalSalary() + newEmployee.getSalary();
        department.setTotalSalary(totalSalary);
        return oldEmployeeExchange;
    }
}
Next define the camel-context as follows-
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:cxf="http://camel.apache.org/schema/cxf"
    xmlns:jaxrs="http://cxf.apache.org/jaxrs"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://camel.apache.org/schema/cxf http://camel.apache.org/schema/cxf/camel-cxf.xsd
        http://cxf.apache.org/jaxrs http://cxf.apache.org/schemas/jaxrs.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <bean id="deptRouter" class="com.javainuse.aggregatesplit.DepartmentRouter" />
    <bean id="employeeService" class="com.javainuse.aggregatesplit.ProcessEmployees" />

    <camelContext id="departmentCtx" xmlns="http://camel.apache.org/schema/spring">
        <routeBuilder ref="deptRouter" />
    </camelContext>
</beans>
Finally load the camel context and call the routes with the data.
package com.javainuse.aggregatesplit;

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.javainuse.domain.Department;
import com.javainuse.domain.Employee;

public class DepartmentApp {

    public static void main(String[] args) {
        try {
            ApplicationContext springCtx = new ClassPathXmlApplicationContext("camel-context.xml");
            CamelContext context = springCtx.getBean("departmentCtx", CamelContext.class);
            context.start();

            ProducerTemplate producerTemplate = context.createProducerTemplate();

            List<Employee> employees = new ArrayList<Employee>();
            employees.add(new Employee("1", "emp1", "Finance"));
            employees.add(new Employee("2", "emp2", "IT"));
            employees.add(new Employee("3", "emp3", "IT"));

            Department dept = new Department();
            dept.setEmployees(employees);

            // Send the department through the split/aggregate route and wait for the result.
            Department deptDetails = producerTemplate.requestBody("direct:processDept", dept, Department.class);
            System.out.println("Department Details - " + deptDetails);

            context.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

We can see that each employee's salary is set based on their department. We also get the total salary of all employees.
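As a sanity check on what the route should compute, the split-and-aggregate logic can be modelled without Camel. This is a sketch under stated assumptions: `DepartmentTotalModel` and its members are hypothetical names, the salary rule is copied from ProcessEmployees (IT 10000, Finance 5000), and any other department is assumed to keep a salary of 0 because the choice has no otherwise branch.

```java
import java.util.ArrayList;
import java.util.List;

// Camel-free model (hypothetical names) of the split + DepartmentEmployeeStrategy flow.
class DepartmentTotalModel {

    static final class Dept {
        final List<String> names = new ArrayList<>();
        double totalSalary;
    }

    // Salary rule taken from ProcessEmployees; unknown departments stay at 0.
    static double salaryFor(String department) {
        switch (department) {
            case "IT": return 10000;
            case "Finance": return 5000;
            default: return 0;
        }
    }

    // Mirrors aggregate(old, new): old is null for the first employee.
    static Dept aggregate(Dept old, String name, double salary) {
        Dept dept = (old == null) ? new Dept() : old;
        dept.names.add(name);
        dept.totalSalary += salary;
        return dept;
    }

    // Each row is {name, department}; this plays the role of the splitter.
    static Dept run(String[][] employees) {
        Dept result = null;
        for (String[] employee : employees) {
            result = aggregate(result, employee[0], salaryFor(employee[1]));
        }
        return result;
    }
}
```

For the sample data (one Finance employee and two IT employees), the model gives a total of 5000 + 10000 + 10000 = 25000.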

Source: https://www.javainuse.com

Aggregate

correlationExpression

Required. The expression used to calculate the correlation key to use for aggregation. Exchanges that have the same correlation key are aggregated together. If the correlation key cannot be evaluated, an Exception is thrown. You can disable this by using the ignoreInvalidCorrelationKeys option.

ExpressionSubElementDefinition

completionPredicate

A Predicate to indicate when an aggregated exchange is complete. If this is not specified and the AggregationStrategy object implements Predicate, the aggregationStrategy object will be used as the completionPredicate.

ExpressionSubElementDefinition

completionTimeoutExpression

Time in millis that an aggregated exchange should be inactive before it is complete (timeout). This option can be set as either a fixed value or using an Expression, which allows you to evaluate a timeout dynamically (the result is used as a Long). If both are set, Camel falls back to the fixed value if the Expression result is null or 0. You cannot use this option together with completionInterval; only one of the two can be used. By default the timeout checker runs every second; use the completionTimeoutCheckerInterval option to configure how frequently the checker runs. The timeout is an approximation, and there is no guarantee that a timeout is triggered exactly after the timeout value. Very low timeout values or checker intervals are not recommended.

ExpressionSubElementDefinition

completionSizeExpression

Number of messages aggregated before the aggregation is complete. This option can be set as either a fixed value or using an Expression which allows you to evaluate a size dynamically - will use Integer as result. If both are set Camel will fallback to use the fixed value if the Expression result was null or 0.

ExpressionSubElementDefinition

optimisticLockRetryPolicy

Allows to configure retry settings when using optimistic locking.

OptimisticLockRetryPolicyDefinition

parallelProcessing

When aggregated exchanges are completed, they are sent out of the aggregator. This option indicates whether or not Camel should use a thread pool with multiple threads for concurrency. If no custom thread pool has been specified, Camel creates a default pool with 10 concurrent threads.

Boolean

optimisticLocking

Turns on optimistic locking, which requires that the aggregationRepository in use supports it by implementing org.apache.camel.spi.OptimisticLockingAggregationRepository.

Boolean

executorServiceRef

If using parallelProcessing you can specify a custom thread pool to be used. In fact also if you are not using parallelProcessing this custom thread pool is used to send out aggregated exchanges as well.

String

timeoutCheckerExecutorServiceRef

If using either of the completionTimeout, completionTimeoutExpression, or completionInterval options a background thread is created to check for the completion for every aggregator. Set this option to provide a custom thread pool to be used rather than creating a new thread for every aggregator.

String

aggregationRepositoryRef

Sets the custom aggregate repository to use. Will by default use org.apache.camel.processor.aggregate.MemoryAggregationRepository.

String

strategyRef

A reference used to look up the AggregationStrategy in the Registry. The value can refer to a bean to look up, to a singleton bean looked up by its type, or to a new bean to create:

Lookup bean - This is the default behavior: look up an existing bean by the bean id (value).

Reference by type - Values can refer to singleton beans by their type in the registry by prefixing with the #type: syntax, e.g. #type:com.foo.MyClassType

Reference new class - Values can refer to creating new beans by their class name by prefixing with #class, e.g. #class:com.foo.MyClassType. The class is created using a default no-arg constructor. If you need to create the instance via a factory method, specify the method as shown: #class:com.foo.MyClassType#myFactoryMethod. If the factory method requires parameters, they can be specified as follows: #class:com.foo.MyClassType#myFactoryMethod('Hello World', 5, true). If you need to create the instance via constructor parameters, specify the parameters as shown: #class:com.foo.MyClass('Hello World', 5, true).

Configuring an AggregationStrategy is required; it is used to merge the incoming Exchange with the already merged exchanges. On the first call the oldExchange parameter is null. On subsequent invocations oldExchange contains the merged exchanges and newExchange is the new incoming Exchange.

String

strategyMethodName

This option can be used to explicitly declare the method name to use when using beans as the AggregationStrategy.

String

strategyMethodAllowNull

If this option is false, the aggregate method is not used for the very first aggregation. If this option is true, a null value is used as the oldExchange (at the very first aggregation) when using beans as the AggregationStrategy.

Boolean

completionSize

Number of messages aggregated before the aggregation is complete. This option can be set as either a fixed value or using an Expression which allows you to evaluate a size dynamically - will use Integer as result. If both are set Camel will fallback to use the fixed value if the Expression result was null or 0.

Integer

completionInterval

A repeating period in millis by which the aggregator will complete all current aggregated exchanges. Camel has a background task which is triggered every period. You cannot use this option together with completionTimeout, only one of them can be used.

String

completionTimeout

Time in millis that an aggregated exchange should be inactive before it is complete (timeout). This option can be set as either a fixed value or using an Expression, which allows you to evaluate a timeout dynamically (the result is used as a Long). If both are set, Camel falls back to the fixed value if the Expression result is null or 0. You cannot use this option together with completionInterval; only one of the two can be used. By default the timeout checker runs every second; use the completionTimeoutCheckerInterval option to configure how frequently the checker runs. The timeout is an approximation, and there is no guarantee that a timeout is triggered exactly after the timeout value. Very low timeout values or checker intervals are not recommended.

String

completionTimeoutCheckerInterval

Interval in millis that is used by the background task that checks for timeouts (org.apache.camel.TimeoutMap). By default the timeout checker runs every second. The timeout is an approximation, and there is no guarantee that a timeout is triggered exactly after the timeout value. Very low timeout values or checker intervals are not recommended.

1s

String

completionFromBatchConsumer

Enables the batch completion mode, where we aggregate from an org.apache.camel.BatchConsumer and aggregate the total number of exchanges that the org.apache.camel.BatchConsumer has reported as total, by checking the exchange property org.apache.camel.Exchange#BATCH_COMPLETE to see when it is complete. This option cannot be used together with discardOnAggregationFailure.

Boolean

completionOnNewCorrelationGroup

Enables completion of all previous groups when a new correlation group arrives. This can, for example, be used to complete groups with the same correlation keys when they arrive in consecutive order. Note that when this is enabled only 1 correlation group can be in progress, because when a new correlation group starts, the previous group is forced to complete.

Boolean

eagerCheckCompletion

Use eager completion checking which means that the completionPredicate will use the incoming Exchange. As opposed to without eager completion checking the completionPredicate will use the aggregated Exchange.

Boolean

ignoreInvalidCorrelationKeys

If a correlation key cannot be successfully evaluated, the incoming Exchange is ignored and the event is logged at DEBUG level.

Boolean

closeCorrelationKeyOnCompletion

Closes a correlation key when it is complete. Any late exchange received with a correlation key that has been closed will be denied, and a ClosedCorrelationKeyException is thrown.

Integer

discardOnCompletionTimeout

Discards the aggregated message on completion timeout. This means on timeout the aggregated message is dropped and not sent out of the aggregator.

Boolean

discardOnAggregationFailure

Discards the aggregated message when aggregation failed (an exception was thrown from the AggregationStrategy). This means the partly aggregated message is dropped and not sent out of the aggregator. This option cannot be used together with completionFromBatchConsumer.

Boolean

forceCompletionOnStop

Indicates to complete all current aggregated exchanges when the context is stopped.

Boolean

completeAllOnStop

Indicates to wait to complete all current and partial (pending) aggregated exchanges when the context is stopped. This also means that we will wait for all pending exchanges which are stored in the aggregation repository to complete, so the repository is empty before we can stop. You may want to enable this when using the memory-based aggregation repository, which does not store data on disk. When this option is enabled, the aggregator waits to complete all those exchanges before it is stopped, when stopping the CamelContext or the route using it.

Boolean

aggregateControllerRef

To use an org.apache.camel.processor.aggregate.AggregateController to allow external sources to control this aggregator.

String

description

Sets the description of this node.

DescriptionDefinition
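
The difference that eagerCheckCompletion makes can be sketched in plain Java with no Camel dependency. Everything below is an assumption made for illustration: `EagerCheckSketch` and `run` are hypothetical names, message bodies are plain strings, and aggregation is modeled as string concatenation. It illustrates the behavior described in the option table and is not Camel's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in, not Camel API: models one correlation group whose
// messages are strings and whose aggregation is string concatenation.
final class EagerCheckSketch {

    // Returns the completed groups, in completion order.
    static List<String> run(List<String> incoming, Predicate<String> completion, boolean eager) {
        List<String> completed = new ArrayList<>();
        String aggregated = null;
        for (String message : incoming) {
            // Eager: test the incoming message before it is merged.
            boolean done = eager && completion.test(message);
            aggregated = (aggregated == null) ? message : aggregated + "," + message;
            // Non-eager: test the aggregated result after merging.
            if (!eager) {
                done = completion.test(aggregated);
            }
            if (done) {
                completed.add(aggregated);
                aggregated = null;
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        List<String> incoming = Arrays.asList("A", "B", "END", "C");
        Predicate<String> isEnd = body -> body.equals("END");
        System.out.println(run(incoming, isEnd, true));  // prints [A,B,END]
        System.out.println(run(incoming, isEnd, false)); // prints []
    }
}
```

With the same predicate, the eager run completes when the "END" message arrives, while the non-eager run never completes because the concatenated body is never exactly "END".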

Source: https://camel.apache.org/components/3.12.x/eips/aggregate-eip.html
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor.aggregate;

import org.apache.camel.Exchange;

/**
 * A strategy for aggregating two exchanges together into a single exchange.
 * <p/>
 * On the first invocation of the {@link #aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange) aggregate}
 * method the <tt>oldExchange</tt> parameter is <tt>null</tt>. The reason is that we have not aggregated anything yet.
 * So its only the <tt>newExchange</tt> that has a value. Usually you just return the <tt>newExchange</tt> in this
 * situation. But you still have the power to decide what to do, for example you can do some alternation on the exchange
 * or remove some headers. And a more common use case is for instance to count some values from the body payload. That
 * could be to sum up a total amount etc.
 * <p/>
 * Note that <tt>oldExchange</tt> may be <tt>null</tt> more than once when this strategy is throwing a {@link java.lang.RuntimeException}
 * and <tt>parallelProcessing</tt> is used. You can work around this behavior using the <tt>stopOnAggregateException</tt> option.
 * <p/>
 * It is possible that <tt>newExchange</tt> is <tt>null</tt> which could happen if there was no data possible
 * to acquire. Such as when using a {@link org.apache.camel.processor.PollEnricher} to poll from a JMS queue which
 * is empty and a timeout was set.
 * <p/>
 * Possible implementations include performing some kind of combining or delta processing, such as adding line items
 * together into an invoice or just using the newest exchange and removing old exchanges such as for state tracking or
 * market data prices; where old values are of little use.
 * <p/>
 * If an implementation also implements {@link org.apache.camel.Service} then any <a href="http://camel.apache.org/eip">EIP</a>
 * that allowing configuring a {@link AggregationStrategy} will invoke the {@link org.apache.camel.Service#start()}
 * and {@link org.apache.camel.Service#stop()} to control the lifecycle aligned with the EIP itself.
 *
 * @version
 */
public interface AggregationStrategy {

    // TODO: In Camel 3.0 we should move this to org.apache.camel package

    /**
     * Aggregates an old and new exchange together to create a single combined exchange
     *
     * @param oldExchange the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange)
     * @param newExchange the newest exchange (can be <tt>null</tt> if there was no data possible to acquire)
     * @return a combined composite of the two exchanges, favor returning the <tt>oldExchange</tt> whenever possible
     */
    Exchange aggregate(Exchange oldExchange, Exchange newExchange);
}
Source: https://github.com/Talend/apache-camel/blob/master/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java

Aggregate camel

Aggregator

This applies to Camel version 2.2 or older. The Aggregator was rewritten in Camel 2.3, so if you use a newer version you should follow the Aggregator2 link instead.

The Aggregator from the EIP patterns allows you to combine a number of messages together into a single message.

A correlation Expression is used to determine which messages should be aggregated together. If you want to aggregate all messages into a single message, just use a constant expression. An AggregationStrategy is used to combine all the message exchanges for a single correlation key into a single message exchange. The default strategy just chooses the latest message, so it is ideal for throttling messages.

For example, imagine a stock market data system that receives 30,000 messages per second. You may want to throttle down the updates because, say, a GUI cannot cope with such massive update rates. To do that, you can aggregate the messages so that, within a window (defined by a maximum number of messages or a timeout), messages for the same stock are aggregated together by choosing the latest message and discarding the older prices. (You could apply a delta processing algorithm if you prefer to capture some of the history.)
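
The "keep only the latest price per stock" idea can be sketched in plain Java with no Camel dependency. The class name `LatestQuoteSketch`, the method `latestPerSymbol`, and the two-element array representation of a quote are all assumptions made for this illustration; the window itself (message count or timeout) is not modeled.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in, not Camel API: keeps only the newest price per stock
// symbol within one window of incoming quotes.
final class LatestQuoteSketch {

    // Each quote is a two-element array: {symbol, price}.
    static Map<String, String> latestPerSymbol(List<String[]> window) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] quote : window) {
            // A later quote for the same symbol replaces the earlier one.
            latest.put(quote[0], quote[1]);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String[]> window = Arrays.asList(
                new String[] {"IBM", "100"},
                new String[] {"ACME", "7"},
                new String[] {"IBM", "101"});
        System.out.println(latestPerSymbol(window)); // prints {IBM=101, ACME=7}
    }
}
```

Here the symbol plays the role of the correlation key, and overwriting the map entry plays the role of a strategy that always returns the newest message.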

Using the aggregator correctly
Torsten Mielke wrote a nice blog entry with his thoughts and experience on using the aggregator. It is well worth a read.
AggregationStrategy changed in Camel 2.0
In Camel 2.0 the callback has been changed so that it is also invoked on the very first Exchange.

On the first invocation of the aggregate method the oldExchange parameter is null. The reason is that we have not aggregated anything yet.
So it is only the newExchange that has a value. Usually you just return the newExchange in this situation. But you still have the power to decide what to do; for example, you can make some alteration to the exchange or remove some headers. A more common use case is to count some values from the body payload, for instance to sum up a total amount.
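
The null check on the first invocation and the "sum up a total amount" use case can be sketched in plain Java. To keep the sketch runnable without Camel on the classpath, Integer values stand in for Exchange objects; `SumStrategySketch`, `aggregate`, and `total` are hypothetical names, not Camel API.

```java
// Hypothetical stand-in, not Camel API: Integer bodies take the place of
// Exchange objects so the sketch runs without Camel on the classpath.
final class SumStrategySketch {

    // Mirrors the shape of aggregate(oldExchange, newExchange).
    static Integer aggregate(Integer oldTotal, Integer newAmount) {
        if (oldTotal == null) {
            // First invocation: nothing has been aggregated yet.
            return newAmount;
        }
        return oldTotal + newAmount;
    }

    // Feeds the amounts through aggregate one by one, as an aggregator would.
    static Integer total(int... amounts) {
        Integer total = null;
        for (int amount : amounts) {
            total = aggregate(total, amount);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(total(10, 20, 12)); // prints 42
    }
}
```

In a real strategy the same branch structure would apply, with the amounts read from and written back to the exchange bodies.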

BatchTimeout and CompletionPredicate
You cannot combine batchTimeout and completionPredicate so that completion is triggered by whichever reaches its goal first. The batch timeout will always trigger first, at its given interval.

Using the Fluent Builders

The following example shows how to aggregate messages so that only the latest message for a specific value of the cheese header is sent.

from("direct:start")
    .aggregate(header("cheese"))
    .to("mock:result");

from("seda:header")
    .setHeader("visited", constant(true))
    .aggregate(header("cheese"))
    .to("mock:result");

from("direct:predicate")
    .aggregate(header("cheese"), new MyAggregationStrategy())
    .completionPredicate(header("aggregated").isEqualTo(5))
    .to("mock:result");

from("direct:outBatchPredicate")
    .aggregate(header("cheese"), new MyAggregationStrategy())
    .completionPredicate(header("aggregated").isEqualTo(5))
    .outBatchSize(10)
    .to("mock:result");

If you were using JMS then you may wish to use the JMSDestination header as the correlation key; or some custom header for the stock symbol (using the above stock market example).

from("activemq:someReallyFastTopic").aggregator(header("JMSDestination")).to("activemq:someSlowTopicForGuis");

You can of course use many different Expression languages such as XPath, XQuery, SQL or various Scripting Languages.
Here is an example using XPath:

from("seda:start")
    .aggregate().xpath("/stockQuote/@symbol", String.class)
    .batchSize(5)
    .to("mock:result");

from("seda:start")
    .aggregate().xpath("name(/stockQuote[@symbol='APACHE'])", String.class)
    .batchSize(5)
    .to("mock:result");

For further examples of this pattern in use you could look at the junit test case

Using the Spring XML Extensions

The correlationExpression element is available in Camel 2.0. For earlier versions of Camel you will need to specify your expression without the enclosing correlationExpression element.
<aggregator>
  <simple>header.cheese</simple>
  <to uri="mock:result"/>
</aggregator>

The following example shows how to create a simple aggregator using the XML notation; using an Expression for the correlation value used to aggregate messages together.

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <aggregate>
      <correlationExpression>
        <simple>header.cheese</simple>
      </correlationExpression>
      <to uri="mock:result"/>
    </aggregate>
  </route>

  <route>
    <from uri="seda:header"/>
    <process ref="setHeaderProcessor"/>
    <to uri="direct:temp"/>
  </route>

  <route>
    <from uri="direct:temp"/>
    <aggregate>
      <correlationExpression>
        <simple>header.cheese</simple>
      </correlationExpression>
      <to uri="mock:result"/>
    </aggregate>
  </route>

  <route>
    <from uri="direct:predicate"/>
    <aggregate strategyRef="myAggregatorStrategy">
      <correlationExpression>
        <simple>header.cheese</simple>
      </correlationExpression>
      <to uri="mock:result"/>
      <completionPredicate>
        <method bean="myAggregatorStrategy" method="isCompleted"/>
      </completionPredicate>
    </aggregate>
  </route>

  <route>
    <from uri="direct:outBatchPredicate"/>
    <aggregate strategyRef="myAggregatorStrategy" outBatchSize="10">
      <correlationExpression>
        <simple>header.cheese</simple>
      </correlationExpression>
      <to uri="mock:result"/>
      <completionPredicate>
        <method bean="myAggregatorStrategy" method="isCompleted"/>
      </completionPredicate>
    </aggregate>
  </route>

  <!-- This route turns off in batching by setting batchSize to 1 to run unit test for out batching.
       Normal use cases may not want to disable in batching -->
  <route>
    <from uri="direct:outBatchNoInBatching"/>
    <aggregate strategyRef="myAggregatorStrategy" batchSize="1" outBatchSize="10">
      <correlationExpression>
        <simple>header.cheese</simple>
      </correlationExpression>
      <to uri="mock:result"/>
      <completionPredicate>
        <method bean="myAggregatorStrategy" method="isCompleted"/>
      </completionPredicate>
    </aggregate>
  </route>
</camelContext>

You can specify your own AggregationStrategy if you prefer, as shown in the following example:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <aggregate strategyRef="aggregatorStrategy">
      <correlationExpression>
        <simple>header.cheese</simple>
      </correlationExpression>
      <to uri="mock:result"/>
    </aggregate>
  </route>
</camelContext>

<bean id="aggregatorStrategy" class="org.apache.camel.spring.processor.MyAggregator"/>

Notice how the strategyRef attribute is used on the <aggregate> element to refer to the custom strategy bean defined in Spring.

Exchange Properties

The following properties are set on each Exchange that is aggregated:

header type description
int Camel 1.x: The total number of Exchanges aggregated in this combined Exchange.
int Camel 2.0: The total number of Exchanges aggregated into this combined Exchange.
int Camel 2.0: The current index of this Exchange in the batch.

Batch options

The aggregator supports the following batch options:

Option | Default | Description
batchSize | 100 | The in batch size. This is the number of incoming exchanges processed by the aggregator; when this threshold is reached the batch is completed and sent. Camel 1.6.2/2.0: You can disable the batch size, so that the Aggregator is only triggered by timeout, by setting batchSize to 0 (or a negative value). In Camel 1.6.1 or older you can set batchSize to a very large number to achieve the same.
outBatchSize | 0 | Camel 1.5: The out batch size. This is the number of exchanges currently aggregated in the AggregationCollection. When this threshold is reached the batch is completed and sent. By default this option is disabled. The difference from the batchSize option is that this one applies to outgoing exchanges, so setting it to e.g. 50 ensures that the batch contains at most 50 exchanges when it is sent.
batchTimeout | 1000L | Timeout in milliseconds. How long the aggregator waits before it completes and sends whatever it has currently aggregated.
groupExchanges | false | Camel 2.0: If enabled, Camel groups all aggregated Exchanges into a single combined holder class that holds all the aggregated Exchanges. As a result only one Exchange is sent out from the aggregator. Can be used to combine many incoming Exchanges into a single output Exchange without coding a custom AggregationStrategy yourself.
batchConsumer | false | Camel 2.0: Use this option when the exchanges come from a Batch Consumer. When enabled, the Aggregator uses the batch size that the Batch Consumer reports in a message header. See more details at Batch Consumer. This can be used to aggregate all files consumed from a File endpoint in a given poll.
completionPredicate | null | Allows you to use a Predicate to signal when an aggregation is complete. See the warning at the top of this page.

AggregationCollection and AggregationStrategy

This aggregator uses an AggregationCollection to store the exchanges that are currently aggregated. The AggregationCollection uses a correlation Expression and an AggregationStrategy.

  • The correlation Expression is used to correlate the incoming exchanges. The default implementation groups messages based on the correlation expression. Other implementations could, for instance, just add all exchanges as a batch.
  • The strategy is used to aggregate the old exchange (looked up by its correlation id) and the new exchange together into a single exchange. Possible implementations include performing some kind of combining or delta processing, such as adding line items together into an invoice, or just using the newest exchange and removing old exchanges, such as for state tracking or market data prices, where old values are of little use.

Camel provides these implementations:

  • DefaultAggregationCollection
  • PredicateAggregationCollection
  • UseLatestAggregationStrategy

Examples

Default example

By default Camel uses DefaultAggregationCollection and UseLatestAggregationStrategy, so this simple example just keeps the latest received exchange for the given correlation Expression:

from("direct:start")
    .aggregate().header("id")
    .batchTimeout(500L)
    .to("mock:result");

Using PredicateAggregationCollection

The PredicateAggregationCollection is an extension of DefaultAggregationCollection that also uses a Predicate to determine completion. For instance, the Predicate can test for a special header value, a maximum number of exchanges aggregated so far, and so on. Using it makes the routing a bit more complex, as we need to create our AggregationCollection object as follows:

AggregationCollection ag = new PredicateAggregationCollection(
    header("id"),
    new UseLatestAggregationStrategy(),
    property(Exchange.AGGREGATED_SIZE).isEqualTo(3));

from("direct:start")
    .aggregate(ag)
    .batchTimeout(500L)
    .to("mock:result");

In this sample the predicate completes the aggregation once 3 exchanges have been aggregated for the same correlation id. It is defined as:

property(Exchange.AGGREGATED_SIZE).isEqualTo(3)

With this configuration the aggregator completes when we have received 3 exchanges with the same correlation id or when the specified timeout of 500 milliseconds has elapsed, whichever criterion is met first.
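
The size-based half of this completion rule can be sketched in plain Java with no Camel dependency. `SizeCompletionSketch` and its `add` method are hypothetical names; the timeout path is deliberately not modeled, and the sketch returns every body in the completed group (rather than only the latest one) so that the grouping is visible.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in, not Camel API: completes a correlation group once it
// holds a fixed number of messages. The timeout path is not modeled.
final class SizeCompletionSketch {

    private final int completionSize;
    private final Map<String, List<String>> groups = new HashMap<>();

    SizeCompletionSketch(int completionSize) {
        this.completionSize = completionSize;
    }

    // Adds a message; returns the completed group, or null if it is still open.
    List<String> add(String correlationId, String body) {
        List<String> group = groups.computeIfAbsent(correlationId, key -> new ArrayList<>());
        group.add(body);
        if (group.size() == completionSize) {
            // Completed groups are removed so the same id can start a new group.
            groups.remove(correlationId);
            return group;
        }
        return null;
    }

    public static void main(String[] args) {
        SizeCompletionSketch aggregator = new SizeCompletionSketch(3);
        aggregator.add("1", "a");
        aggregator.add("2", "x");
        aggregator.add("1", "b");
        System.out.println(aggregator.add("1", "c")); // prints [a, b, c]
    }
}
```

Messages for id "2" stay pending in this sketch; in the route above, the batch timeout is what would eventually flush them.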

Using custom aggregation strategy

In this example we will aggregate incoming bids and want to aggregate the highest bid. So we provide our own strategy where we implement the code logic:

private static class MyAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        }
        int oldPrice = oldExchange.getIn().getBody(Integer.class);
        int newPrice = newExchange.getIn().getBody(Integer.class);
        return newPrice > oldPrice ? newExchange : oldExchange;
    }
}

Then we set up the routing as follows:

from("direct:start")
    .aggregate(new MyAggregationStrategy()).header("id")
    .batchTimeout(500L)
    .to("mock:result");

Since this is based on a unit test, here is the test code that sends the bids, together with the expected winners:

MockEndpoint result = getMockEndpoint("mock:result");
result.expectedMessageCount(2);
result.expectedBodiesReceived("200", "150");

template.sendBodyAndHeader("direct:start", "100", "id", "1");
template.sendBodyAndHeader("direct:start", "150", "id", "2");
template.sendBodyAndHeader("direct:start", "130", "id", "2");
template.sendBodyAndHeader("direct:start", "200", "id", "1");
template.sendBodyAndHeader("direct:start", "190", "id", "1");

assertMockEndpointsSatisfied();
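
To see why the winners are 200 and 150, the same bids can be replayed in plain Java with no Camel dependency. `HighestBidSketch`, `aggregate`, and `winners` are hypothetical names; Integer values stand in for exchanges, and a map keyed by id stands in for correlation, so this is a sketch of the logic rather than of Camel's internals.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in, not Camel API: replays the bids and keeps the highest
// bid per correlation id.
final class HighestBidSketch {

    // Same decision as MyAggregationStrategy, on plain Integer bodies.
    static Integer aggregate(Integer oldBid, Integer newBid) {
        if (oldBid == null) {
            return newBid;
        }
        return newBid > oldBid ? newBid : oldBid;
    }

    // Each bid is {id, price}; winners are listed by first appearance of each id.
    static List<Integer> winners(int[][] bids) {
        Map<Integer, Integer> best = new LinkedHashMap<>();
        for (int[] bid : bids) {
            best.put(bid[0], aggregate(best.get(bid[0]), bid[1]));
        }
        return new ArrayList<>(best.values());
    }

    public static void main(String[] args) {
        int[][] bids = {{1, 100}, {2, 150}, {2, 130}, {1, 200}, {1, 190}};
        System.out.println(winners(bids)); // prints [200, 150]
    }
}
```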

Using custom aggregation collection

In this example we will aggregate incoming bids and want to aggregate the bids in reverse order (this is just an example). So we provide our own collection where we implement the code logic:

class MyReverseAggregationCollection extends AbstractCollection<Exchange> implements AggregationCollection {

    private List<Exchange> collection = new ArrayList<Exchange>();
    private Expression correlation;
    private AggregationStrategy strategy;

    public Expression getCorrelationExpression() {
        return correlation;
    }

    public void setCorrelationExpression(Expression correlationExpression) {
        this.correlation = correlationExpression;
    }

    public AggregationStrategy getAggregationStrategy() {
        return strategy;
    }

    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
        this.strategy = aggregationStrategy;
    }

    public boolean add(Exchange exchange) {
        return collection.add(exchange);
    }

    public Iterator<Exchange> iterator() {
        // Reverses the underlying list in place before iterating
        Collections.reverse(collection);
        return collection.iterator();
    }

    public int size() {
        return collection.size();
    }

    public void clear() {
        collection.clear();
    }

    public void onAggregation(Object correlationKey, Exchange newExchange) {
        add(newExchange);
    }
}

Then we set up the routing as follows:

from("direct:start")
    .aggregate(new MyReverseAggregationCollection())
    .batchTimeout(500L)
    .to("mock:result");

Since this is based on a unit test, here is the test code that sends the bids, together with the expected reverse order:

MockEndpoint result = getMockEndpoint("mock:result");
result.expectedMessageCount(5);
result.expectedBodiesReceived("190", "200", "130", "150", "100");

template.sendBodyAndHeader("direct:start", "100", "id", "1");
template.sendBodyAndHeader("direct:start", "150", "id", "2");
template.sendBodyAndHeader("direct:start", "130", "id", "2");
template.sendBodyAndHeader("direct:start", "200", "id", "1");
template.sendBodyAndHeader("direct:start", "190", "id", "1");

assertMockEndpointsSatisfied();
Custom aggregation collection in Spring DSL

You can also specify a custom aggregation collection in the Spring DSL. Here is an example for Camel 2.0:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <aggregate batchTimeout="500" collectionRef="aggregatorCollection">
      <to uri="mock:result"/>
    </aggregate>
  </route>
</camelContext>

<bean id="aggregatorCollection" class="org.apache.camel.processor.aggregator.MyReverseAggregationCollection"/>

In Camel 1.5.1 you will need to specify the aggregator as:

<aggregator batchTimeout="500" collectionRef="aggregatorCollection">
  <expression/>
  <to uri="mock:result"/>
</aggregator>

Using Grouped Exchanges

Available as of Camel 2.0

You can enable grouped exchanges to combine all aggregated exchanges into a single holder class that contains all the individual aggregated exchanges. This allows you to process a single Exchange containing all the aggregated exchanges. Let's start with how to configure this in the router:

from("direct:start")
    .aggregate().constant(true)
    .batchTimeout(500L)
    .groupExchanges()
    .to("mock:result");

The next part is taken from a unit test that demonstrates this feature: we send in 5 exchanges, each with a different value in the body.
We get only 1 exchange out of the aggregator, but we can access all the individual aggregated exchanges from the List, which we can extract as a property from the Exchange using the key Exchange.GROUPED_EXCHANGE.

MockEndpoint result = getMockEndpoint("mock:result");
result.expectedMessageCount(1);

template.sendBody("direct:start", "100");
template.sendBody("direct:start", "150");
template.sendBody("direct:start", "130");
template.sendBody("direct:start", "200");
template.sendBody("direct:start", "190");

assertMockEndpointsSatisfied();

Exchange out = result.getExchanges().get(0);
List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);

assertEquals(5, grouped.size());
assertEquals("100", grouped.get(0).getIn().getBody(String.class));
assertEquals("150", grouped.get(1).getIn().getBody(String.class));
assertEquals("130", grouped.get(2).getIn().getBody(String.class));
assertEquals("200", grouped.get(3).getIn().getBody(String.class));
assertEquals("190", grouped.get(4).getIn().getBody(String.class));
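
Conceptually, grouping is just an aggregation that appends each new message to a holder list instead of merging or discarding it. The plain-Java sketch below shows that idea with no Camel dependency; `GroupingSketch`, `aggregate`, and `groupAll` are hypothetical names, and String bodies stand in for exchanges.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in, not Camel API: groups every incoming body into one
// list, in arrival order, instead of merging or discarding bodies.
final class GroupingSketch {

    static List<String> aggregate(List<String> group, String newBody) {
        if (group == null) {
            // First message: start the holder list.
            group = new ArrayList<>();
        }
        group.add(newBody);
        return group;
    }

    // Feeds the bodies through aggregate one by one.
    static List<String> groupAll(String... bodies) {
        List<String> group = null;
        for (String body : bodies) {
            group = aggregate(group, body);
        }
        return group;
    }

    public static void main(String[] args) {
        System.out.println(groupAll("100", "150", "130", "200", "190"));
        // prints [100, 150, 130, 200, 190]
    }
}
```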

Using Batch Consumer

Available as of Camel 2.0

The Aggregator can work together with the Batch Consumer to aggregate the total number of messages that the Batch Consumer has reported. This allows you, for instance, to aggregate all files polled using the File consumer.

For example:

from("file:) .aggregate(xpath("), new AggregateCustomerOrderStrategy()).batchConsumer().batchTimeout(60000).to("bean:processOrder");

When batchConsumer is enabled, Camel automatically adjusts the batchSize to the size reported by the Batch Consumer, in this case the file consumer.
So if we poll 7 files, the aggregator aggregates all 7 files before it completes. As the timeout is still in play, we set it to 60 seconds.

Using This Pattern

If you would like to use this EIP Pattern, please read the Getting Started guide. You may also find the Architecture page useful, particularly the description of Endpoint and URIs. Then you could try out some of the Examples before trying this pattern out.


Source: https://people.apache.org/~dkulp/camel/aggregator.html