Entaxy Docs

Infinispan

Since Camel 2.13

Both producer and consumer are supported

This component allows you to interact with the Infinispan distributed data grid / cache. Infinispan is an extremely scalable, highly available key/value data store and data grid platform written in Java.

Infinispan requires at least Java 8.

The camel-infinispan component includes the following features:

  • Local Camel Consumer - Receives cache change notifications and sends them to be processed. This can be done synchronously or asynchronously, and is also supported with a replicated or distributed cache.

  • Local Camel Producer - A producer creates and sends messages to an endpoint. The camel-infinispan producer uses GET, PUT, REMOVE, and CLEAR operations. The local producer is also supported with a replicated or distributed cache.

  • Remote Camel Producer - In Remote Client-Server mode, the Camel producer can send messages using Hot Rod.

  • Remote Camel Consumer - In Client-Server mode, receives cache change notifications and sends them to be processed. The events are processed asynchronously.

If you use Maven, you must add the following dependency to your pom.xml:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-infinispan</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

URI format

infinispan://cacheName?[options]

URI Options

The producer allows sending messages to a local Infinispan cache configured in the registry, or to a remote cache using the Hot Rod protocol. The consumer allows listening for events from a local Infinispan cache accessible from the registry.

If no cache configuration is provided, an embedded cacheContainer is created directly in the component.

The Infinispan component supports 19 options, which are listed below.

Name Description Default Type

configuration (common)

Component configuration

InfinispanConfiguration

hosts (common)

Specifies the host of the cache on Infinispan instance

String

queryBuilder (common)

Specifies the query builder.

InfinispanQueryBuilder

bridgeErrorHandler (consumer)

Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions that occur while the consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN or ERROR level and ignored.

false

boolean

clusteredListener (consumer)

If true, the listener will be installed for the entire cluster

false

boolean

command (consumer)

Deprecated The operation to perform.

PUT

String

customListener (consumer)

Returns the custom listener in use, if provided

InfinispanCustomListener

eventTypes (consumer)

Specifies the set of event types to register by the consumer. Multiple events can be separated by commas. The possible event types are: CACHE_ENTRY_ACTIVATED, CACHE_ENTRY_PASSIVATED, CACHE_ENTRY_VISITED, CACHE_ENTRY_LOADED, CACHE_ENTRY_EVICTED, CACHE_ENTRY_CREATED, CACHE_ENTRY_REMOVED, CACHE_ENTRY_MODIFIED, TRANSACTION_COMPLETED, TRANSACTION_REGISTERED, CACHE_ENTRY_INVALIDATED, DATA_REHASHED, TOPOLOGY_CHANGED, PARTITION_STATUS_CHANGED

String

sync (consumer)

If true, the consumer will receive notifications synchronously

true

boolean

lazyStartProducer (producer)

Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing.

false

boolean

operation (producer)

The operation to perform. The value can be one of: PUT, PUTASYNC, PUTALL, PUTALLASYNC, PUTIFABSENT, PUTIFABSENTASYNC, GET, GETORDEFAULT, CONTAINSKEY, CONTAINSVALUE, REMOVE, REMOVEASYNC, REPLACE, REPLACEASYNC, SIZE, CLEAR, CLEARASYNC, QUERY, STATS, COMPUTE, COMPUTEASYNC

PUT

InfinispanOperation

basicPropertyBinding (advanced)

Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities

false

boolean

cacheContainer (advanced)

Specifies the cache Container to connect

BasicCacheContainer

cacheContainerConfiguration (advanced)

The CacheContainer configuration. Used if the cacheContainer is not defined. Must be one of the following types: org.infinispan.client.hotrod.configuration.Configuration for remote cache interaction configuration; org.infinispan.configuration.cache.Configuration for embedded cache interaction configuration.

Object

configurationProperties (advanced)

Implementation specific properties for the CacheManager

Map

configurationUri (advanced)

An implementation specific URI for the CacheManager

String

flags (advanced)

A comma-separated list of Flag values to be applied by default on each cache invocation. Not applicable to remote caches.

String

remappingFunction (advanced)

Set a specific remappingFunction to use in a compute operation

BiFunction

resultHeader (advanced)

Store the operation result in a header instead of the message body. By default, resultHeader == null and the query result is stored in the message body; any existing content in the message body is discarded. If resultHeader is set, the value is used as the name of the header to store the query result and the original message body is preserved. This value can be overridden by an In message header named CamelInfinispanOperationResultHeader.

Object

The Infinispan endpoint is configured using URI syntax:

infinispan:cacheName

with the following path and query parameters:

Path Parameters (1 parameter):

Name Description Default Type

cacheName

Required The name of the cache to use. Use current to use the existing cache name from the currently configured cache manager, or use default for the default cache manager name.

String

Query Parameters (21 parameters):

Name Description Default Type

hosts (common)

Specifies the host of the cache on Infinispan instance

String

queryBuilder (common)

Specifies the query builder.

InfinispanQueryBuilder

bridgeErrorHandler (consumer)

Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions that occur while the consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN or ERROR level and ignored.

false

boolean

clusteredListener (consumer)

If true, the listener will be installed for the entire cluster

false

boolean

command (consumer)

Deprecated The operation to perform.

PUT

String

customListener (consumer)

Returns the custom listener in use, if provided

InfinispanCustomListener

eventTypes (consumer)

Specifies the set of event types to register by the consumer. Multiple events can be separated by commas. The possible event types are: CACHE_ENTRY_ACTIVATED, CACHE_ENTRY_PASSIVATED, CACHE_ENTRY_VISITED, CACHE_ENTRY_LOADED, CACHE_ENTRY_EVICTED, CACHE_ENTRY_CREATED, CACHE_ENTRY_REMOVED, CACHE_ENTRY_MODIFIED, TRANSACTION_COMPLETED, TRANSACTION_REGISTERED, CACHE_ENTRY_INVALIDATED, DATA_REHASHED, TOPOLOGY_CHANGED, PARTITION_STATUS_CHANGED

String

sync (consumer)

If true, the consumer will receive notifications synchronously

true

boolean

exceptionHandler (consumer)

To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored.

ExceptionHandler

exchangePattern (consumer)

Sets the exchange pattern when the consumer creates an exchange. The value can be one of: InOnly, InOut, InOptionalOut

ExchangePattern

lazyStartProducer (producer)

Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing.

false

boolean

operation (producer)

The operation to perform. The value can be one of: PUT, PUTASYNC, PUTALL, PUTALLASYNC, PUTIFABSENT, PUTIFABSENTASYNC, GET, GETORDEFAULT, CONTAINSKEY, CONTAINSVALUE, REMOVE, REMOVEASYNC, REPLACE, REPLACEASYNC, SIZE, CLEAR, CLEARASYNC, QUERY, STATS, COMPUTE, COMPUTEASYNC

PUT

InfinispanOperation

basicPropertyBinding (advanced)

Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities

false

boolean

cacheContainer (advanced)

Specifies the cache Container to connect

BasicCacheContainer

cacheContainerConfiguration (advanced)

The CacheContainer configuration. Used if the cacheContainer is not defined. Must be one of the following types: org.infinispan.client.hotrod.configuration.Configuration for remote cache interaction configuration; org.infinispan.configuration.cache.Configuration for embedded cache interaction configuration.

Object

configurationProperties (advanced)

Implementation specific properties for the CacheManager

Map

configurationUri (advanced)

An implementation specific URI for the CacheManager

String

flags (advanced)

A comma-separated list of Flag values to be applied by default on each cache invocation. Not applicable to remote caches.

String

remappingFunction (advanced)

Set a specific remappingFunction to use in a compute operation

BiFunction

resultHeader (advanced)

Store the operation result in a header instead of the message body. By default, resultHeader == null and the query result is stored in the message body; any existing content in the message body is discarded. If resultHeader is set, the value is used as the name of the header to store the query result and the original message body is preserved. This value can be overridden by an In message header named CamelInfinispanOperationResultHeader.

Object

synchronous (advanced)

Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported).

false

boolean
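
The query parameters listed above are appended to the endpoint URI as ordinary key=value pairs. As a minimal sketch, assuming a hypothetical helper class named InfinispanUriSketch (it is not part of camel-infinispan) and plain string assembly without URL encoding, a consumer URI could be put together as follows:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of camel-infinispan.
class InfinispanUriSketch {

    // Joins the cache name and the options into "infinispan:cacheName?k1=v1&k2=v2".
    // No URL encoding is applied, so values are assumed to be URI-safe.
    static String endpointUri(String cacheName, Map<String, String> options) {
        String base = "infinispan:" + cacheName;
        if (options.isEmpty()) {
            return base;
        }
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return base + "?" + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("sync", "false");
        options.put("eventTypes", "CACHE_ENTRY_CREATED,CACHE_ENTRY_REMOVED");
        System.out.println(endpointUri("myCacheName", options));
        // prints: infinispan:myCacheName?sync=false&eventTypes=CACHE_ENTRY_CREATED,CACHE_ENTRY_REMOVED
    }
}
```

The resulting string would be passed to from(...) or to(...) in a route. In most routes the URI is simply written as a literal, so a helper like this is only worthwhile when options are assembled at runtime.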

Camel Operations

This section lists all available operations, along with their header information.

Table 1. Put Operations
Operation Name Description

InfinispanOperation.PUT

Context: Embedded / Remote

Description: Puts a key/value pair in the cache, optionally with expiration

Required Headers: CamelInfinispanKey, CamelInfinispanValue

Optional Headers: CamelInfinispanLifespanTime, CamelInfinispanLifespanTimeUnit, CamelInfinispanMaxIdleTime, CamelInfinispanMaxIdleTimeUnit, CamelInfinispanIgnoreReturnValues

Result Header: CamelInfinispanOperationResult

InfinispanOperation.PUTASYNC

Description: Asynchronously puts a key/value pair in the cache, optionally with expiration

InfinispanOperation.PUTIFABSENT

Description: Puts a key/value pair in the cache if it did not exist, optionally with expiration

InfinispanOperation.PUTIFABSENTASYNC

Description: Asynchronously puts a key/value pair in the cache if it did not exist, optionally with expiration

Table 2. Put All Operations
Operation Name Description

InfinispanOperation.PUTALL

Context: Embedded / Remote

Description: Adds multiple entries to a cache, optionally with expiration

Required Headers: CamelInfinispanMap

Optional Headers: CamelInfinispanLifespanTime, CamelInfinispanLifespanTimeUnit, CamelInfinispanMaxIdleTime, CamelInfinispanMaxIdleTimeUnit

Result Header: None

InfinispanOperation.PUTALLASYNC

Description: Asynchronously adds multiple entries to a cache, optionally with expiration

Table 3. Get Operations
Operation Name Description

InfinispanOperation.GET

Context: Embedded / Remote

Description: Retrieves the value associated with a specific key from the cache

Required Headers: CamelInfinispanKey

Optional Headers: None

Result Header: None

InfinispanOperation.GETORDEFAULT

Context: Embedded / Remote

Description: Retrieves the value, or default value, associated with a specific key from the cache

Required Headers: CamelInfinispanKey

Optional Headers: None

Result Header: None

Table 4. Contains Key Operation
Operation Name Description

InfinispanOperation.CONTAINSKEY

Context: Embedded / Remote

Description: Determines whether a cache contains a specific key

Required Headers: CamelInfinispanKey

Optional Headers: None

Result Header: CamelInfinispanOperationResult

Table 5. Contains Value Operation
Operation Name Description

InfinispanOperation.CONTAINSVALUE

Context: Embedded / Remote

Description: Determines whether a cache contains a specific value

Required Headers: CamelInfinispanValue

Optional Headers: None

Result Header: None

Table 6. Remove Operations
Operation Name Description

InfinispanOperation.REMOVE

Context: Embedded / Remote

Description: Removes an entry from a cache, optionally only if the value matches a given one

Required Headers: CamelInfinispanKey

Optional Headers: CamelInfinispanValue

Result Header: CamelInfinispanOperationResult

InfinispanOperation.REMOVEASYNC

Description: Asynchronously removes an entry from a cache, optionally only if the value matches a given one

Table 7. Replace Operations
Operation Name Description

InfinispanOperation.REPLACE

Context: Embedded / Remote

Description: Conditionally replaces an entry in the cache, optionally with expiration

Required Headers: CamelInfinispanKey, CamelInfinispanValue, CamelInfinispanOldValue

Optional Headers: CamelInfinispanLifespanTime, CamelInfinispanLifespanTimeUnit, CamelInfinispanMaxIdleTime, CamelInfinispanMaxIdleTimeUnit, CamelInfinispanIgnoreReturnValues

Result Header: CamelInfinispanOperationResult

InfinispanOperation.REPLACEASYNC

Description: Asynchronously conditionally replaces an entry in the cache, optionally with expiration

Table 8. Clear Operations
Operation Name Description

InfinispanOperation.CLEAR

Context: Embedded / Remote

Description: Clears the cache

Required Headers: None

Optional Headers: None

Result Header: None

InfinispanOperation.CLEARASYNC

Context: Embedded / Remote

Description: Asynchronously clears the cache

Required Headers: None

Optional Headers: None

Result Header: None

Table 9. Size Operation
Operation Name Description

InfinispanOperation.SIZE

Context: Embedded / Remote

Description: Returns the number of entries in the cache

Required Headers: None

Optional Headers: None

Result Header: CamelInfinispanOperationResult

Table 10. Stats Operation
Operation Name Description

InfinispanOperation.STATS

Context: Embedded / Remote

Description: Returns statistics about the cache

Required Headers: None

Optional Headers: None

Result Header: CamelInfinispanOperationResult

Table 11. Query Operation
Operation Name Description

InfinispanOperation.QUERY

Context: Remote

Description: Executes a query on the cache

Required Headers: CamelInfinispanQueryBuilder

Optional Headers: None

Result Header: CamelInfinispanOperationResult

Any operations that take CamelInfinispanIgnoreReturnValues will receive a null result.
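
The required headers in the tables above can be checked before a message is sent. The following is a minimal sketch under stated assumptions: RequiredHeadersSketch and missingHeaders are hypothetical names, the operation is identified by a plain string, and only a subset of the operations is covered. It is not how the component itself validates messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check for illustration; not part of camel-infinispan.
class RequiredHeadersSketch {

    // Required headers per operation, copied from the operation tables (subset).
    static final Map<String, List<String>> REQUIRED = new HashMap<>();
    static {
        REQUIRED.put("PUT", Arrays.asList("CamelInfinispanKey", "CamelInfinispanValue"));
        REQUIRED.put("PUTALL", Collections.singletonList("CamelInfinispanMap"));
        REQUIRED.put("GET", Collections.singletonList("CamelInfinispanKey"));
        REQUIRED.put("REMOVE", Collections.singletonList("CamelInfinispanKey"));
        REQUIRED.put("REPLACE", Arrays.asList(
                "CamelInfinispanKey", "CamelInfinispanValue", "CamelInfinispanOldValue"));
        REQUIRED.put("QUERY", Collections.singletonList("CamelInfinispanQueryBuilder"));
        REQUIRED.put("CLEAR", Collections.<String>emptyList());
    }

    // Returns the required header names that are absent from the given headers.
    static List<String> missingHeaders(String operation, Map<String, Object> headers) {
        List<String> required = REQUIRED.get(operation);
        if (required == null) {
            throw new IllegalArgumentException("Operation not covered by this sketch: " + operation);
        }
        List<String> missing = new ArrayList<>();
        for (String name : required) {
            if (!headers.containsKey(name)) {
                missing.add(name);
            }
        }
        return missing;
    }
}
```

For example, calling missingHeaders("REPLACE", headers) with only CamelInfinispanKey and CamelInfinispanValue set returns a list containing CamelInfinispanOldValue.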

Message Headers

Name Default Value Type Context Description

CamelInfinispanCacheName

null

String

Shared

The cache participating in the operation or event.

CamelInfinispanOperation

PUT

InfinispanOperation

Producer

The operation to perform.

CamelInfinispanMap

null

Map

Producer

A Map to use with the PUTALL operation.

CamelInfinispanKey

null

Object

Shared

The key to perform the operation to or the key generating the event.

CamelInfinispanValue

null

Object

Producer

The value to use for the operation.

CamelInfinispanEventType

null

String

Consumer

The type of the received event. Possible values defined here org.infinispan.notifications.cachelistener.event.Event.Type

CamelInfinispanIsPre

null

Boolean

Consumer

Infinispan fires two events for each operation: one before and one after the operation. This header indicates whether the event was fired before the operation.

CamelInfinispanLifespanTime

null

long

Producer

The Lifespan time of a value inside the cache. Negative values are interpreted as infinity.

CamelInfinispanLifespanTimeUnit

null

String

Producer

The Time Unit of an entry Lifespan Time.

CamelInfinispanMaxIdleTime

null

long

Producer

The maximum amount of time an entry is allowed to be idle for before it is considered as expired.

CamelInfinispanMaxIdleTimeUnit

null

String

Producer

The Time Unit of an entry Max Idle Time.

CamelInfinispanQueryBuilder

null

InfinispanQueryBuilder

Producer

The QueryBuilder to use for the QUERY command. If not present, the command defaults to the one set in InfinispanConfiguration.

CamelInfinispanIgnoreReturnValues

null

Boolean

Producer

If this header is set, the return value of cache operations that return a value is ignored by the client application.

CamelInfinispanOperationResultHeader

null

String

Producer

Store the operation result in a header instead of the message body
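
The expiration headers work in pairs: a numeric time and the name of its time unit. As a minimal sketch, assuming a hypothetical helper named ExpirationHeadersSketch and using the header names as spelled in the operation tables, the headers for a PUT with expiration could be collected in a plain map:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of camel-infinispan.
class ExpirationHeadersSketch {

    // Builds the headers for a PUT with lifespan and max idle time.
    // The operation header is left out because PUT is the documented default.
    static Map<String, Object> putHeaders(Object key, Object value,
                                          long lifespan, TimeUnit lifespanUnit,
                                          long maxIdle, TimeUnit maxIdleUnit) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("CamelInfinispanKey", key);
        headers.put("CamelInfinispanValue", value);
        headers.put("CamelInfinispanLifespanTime", lifespan);
        // The unit headers are Strings, so the enum name is used.
        headers.put("CamelInfinispanLifespanTimeUnit", lifespanUnit.toString());
        headers.put("CamelInfinispanMaxIdleTime", maxIdle);
        headers.put("CamelInfinispanMaxIdleTimeUnit", maxIdleUnit.toString());
        return headers;
    }
}
```

A map like this could then be handed to a Camel ProducerTemplate, for example through sendBodyAndHeaders, together with an infinispan endpoint URI.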

Examples

  • Retrieve a specific key from the default cache using a custom cache container:

from("direct:start")
    .setHeader(InfinispanConstants.OPERATION).constant(InfinispanOperation.GET)
    .setHeader(InfinispanConstants.KEY).constant("123")
    .to("infinispan?cacheContainer=#cacheContainer");
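
  • Put a key/value into a named cache:

from("direct:start")
    .setHeader(InfinispanConstants.OPERATION).constant(InfinispanOperation.PUT)
    .setHeader(InfinispanConstants.KEY).constant("123")
    // PUT also requires a value; "myValue" is an illustrative placeholder
    .setHeader(InfinispanConstants.VALUE).constant("myValue")
    .to("infinispan:myCacheName");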

  • Put a value with lifespan:

from("direct:start")
    .setHeader(InfinispanConstants.OPERATION).constant(InfinispanOperation.PUT)
    .setHeader(InfinispanConstants.KEY).constant("123")
    // "myValue" is an illustrative placeholder for the value to store
    .setHeader(InfinispanConstants.VALUE).constant("myValue")
    .setHeader(InfinispanConstants.LIFESPAN_TIME).constant(100L)
    .setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT).constant(TimeUnit.MILLISECONDS.toString())
    .to("infinispan:myCacheName");

  • Compute operation through a remapping function on the default cache using a custom cache container:

@BindToRegistry("mappingFunction")
BiFunction<String, String, String> comp = (k, v) -> v + "replay";

from("direct:start")
    .setHeader(InfinispanConstants.OPERATION).constant(InfinispanOperation.COMPUTE)
    .setHeader(InfinispanConstants.KEY).constant("123")
    .to("infinispan?cacheContainer=#cacheContainer&remappingFunction=#mappingFunction");

This will return oldValue + "replay".

This can also be done asynchronously with the InfinispanOperation.COMPUTEASYNC operation.
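
To see what the remapping function does without a running cache, its effect can be modelled on a plain JDK map. This is a sketch only: ComputeSketch is a hypothetical class, and a ConcurrentHashMap stands in for the Infinispan cache on the assumption that the compute operation follows the usual ConcurrentMap.compute contract.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;

// Models the compute operation on a JDK map; the map is a stand-in for the cache.
class ComputeSketch {

    // Same shape as the remapping function bound to the registry in the example above.
    static final BiFunction<String, String, String> APPEND_REPLAY = (k, v) -> v + "replay";

    // Applies the remapping function to the entry and returns the new value.
    static String computeOn(ConcurrentMap<String, String> cache, String key) {
        return cache.compute(key, APPEND_REPLAY);
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
        cache.put("123", "oldValue");
        System.out.println(computeOn(cache, "123")); // prints: oldValuereplay
    }
}
```

With a JDK map, the function receives null as the value when the key is absent, so this particular function would store "nullreplay". A remapping function used in practice would usually handle that case explicitly.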

  • Retrieve a specific key from the remote cache using a cache container configuration with additional parameters (host, port and protocol version):

org.infinispan.client.hotrod.configuration.Configuration cacheContainerConfiguration = new org.infinispan.client.hotrod.configuration.ConfigurationBuilder()
    .addServer()
        .host("localhost")
        .port(9999)
        .version(org.infinispan.client.hotrod.ProtocolVersion.PROTOCOL_VERSION_25)
    .build();
...

from("direct:start")
    .setHeader(InfinispanConstants.OPERATION).constant(InfinispanOperation.GET)
    .setHeader(InfinispanConstants.KEY).constant("123")
    .to("infinispan?cacheContainerConfiguration=#cacheContainerConfiguration");

XML examples

Routing can also be performed using XML configuration. The following example demonstrates camel-infinispan local-camel-producer, a camel route that sends data to an embedded cache created by the local-cache module.

<camelContext id="local-producer" xmlns="http://camel.apache.org/schema/blueprint">
    <route>
        <from uri="timer://local?fixedRate=true&amp;period=5000"/>
        <setHeader headerName="CamelInfinispanKey">
            <constant>CamelTimerCounter</constant>
        </setHeader>
        <setHeader headerName="CamelInfinispanValue">
            <constant>CamelTimerCounter</constant>
        </setHeader>
        <to uri="infinispan://foo?cacheContainer=#cacheManager"/>
        <to uri="log:local-put?showAll=true"/>
    </route>
</camelContext>

The provided example requires you to instantiate the cacheManager.

You can instantiate the cacheManager bean for Spring XML as follows:

<bean id="cacheManager" class="org.infinispan.manager.DefaultCacheManager" init-method="start" destroy-method="stop">
    <constructor-arg type="java.lang.String" value="infinispan.xml"/>
</bean>

The following demonstrates how to instantiate the cacheManager bean using Blueprint XML.

<bean id="cacheManager" class="org.infinispan.manager.DefaultCacheManager" init-method="start" destroy-method="stop">
    <argument value="infinispan.xml" />
</bean>

Both the Spring XML and Blueprint XML examples use the configuration file infinispan.xml for configuration of the cache. This file must be present on the classpath.

Remote Query

When executing remote queries, the cacheManager must be an instance of RemoteCacheManager. Example configurations using a RemoteCacheManager are shown below for both Java and blueprint.xml:

Example 1. Using only Java
from("direct:start")
    .setHeader(InfinispanConstants.OPERATION).constant(InfinispanOperation.QUERY)
    .setHeader(InfinispanConstants.QUERY_BUILDER).constant(
      new InfinispanQueryBuilder() {
        @Override
        public Query build(QueryFactory<Query> queryFactory) {
          // match users whose name contains "abc"
          return queryFactory.from(User.class).having("name").like("%abc%")
                      .build();
        }
      })
    .to("infinispan://localhost?cacheContainer=#cacheManager&cacheName=remote_query_cache");
Example 2. Using Blueprint and Java
Java RemoteCacheManagerFactory class:
public class RemoteCacheManagerFactory {
    ConfigurationBuilder clientBuilder;
    public RemoteCacheManagerFactory(String hostname, int port) {
        clientBuilder = new ConfigurationBuilder();
        clientBuilder.addServer()
            .host(hostname).port(port);
    }
    public RemoteCacheManager newRemoteCacheManager() {
        return new RemoteCacheManager(clientBuilder.build());
    }
}
Java InfinispanQueryExample class:
public class InfinispanQueryExample {
    public InfinispanQueryBuilder getBuilder() {
        return new InfinispanQueryBuilder() {
            public Query build(QueryFactory<Query> queryFactory) {
                return queryFactory.from(User.class)
                         .having("name")
                         .like("%abc%")
                         .build();
            }
        };
    }
}
blueprint.xml:
<bean id="remoteCacheManagerFactory" class="com.datagrid.RemoteCacheManagerFactory">
    <argument value="localhost"/>
    <argument value="11222"/>
</bean>

<bean id="cacheManager"
    factory-ref="remoteCacheManagerFactory"
    factory-method="newRemoteCacheManager">
</bean>

<bean id="queryBuilder" class="org.example.com.InfinispanQueryExample"/>

<camelContext id="route" xmlns="http://camel.apache.org/schema/blueprint">
    <route>
        <from uri="direct:start"/>
            <setHeader headerName="CamelInfinispanOperation">
                <constant>QUERY</constant>
            </setHeader>
            <setHeader headerName="CamelInfinispanQueryBuilder">
                <method ref="queryBuilder" method="getBuilder"/>
            </setHeader>
        <to uri="infinispan://localhost?cacheContainer=#cacheManager&amp;cacheName=remote_query_cache"/>
    </route>
</camelContext>

The remote_query_cache is an arbitrary name for a cache that holds the data, and the results of the query will be a list of domain objects stored as a CamelInfinispanOperationResult header.

In addition, there are the following requirements:

  • The RemoteCacheManager must be configured to use ProtoStreamMarshaller.

  • The ProtoStreamMarshaller must be registered with the RemoteCacheManager's serialization context.

  • The .proto descriptors for domain objects must be registered with the remote Data Grid server.

Custom Listeners for Embedded Cache

Custom Listeners for an embedded cache can be registered through the customListener parameter as shown below:

Using Java
from("infinispan://?cacheContainer=#myCustomContainer&cacheName=customCacheName&customListener=#myCustomListener")
  .to("mock:result");
Using Blueprint
<bean id="myCustomContainer" class="org.infinispan.manager.DefaultCacheManager"
      init-method="start" destroy-method="stop">
      <argument value="infinispan.xml" />
</bean>

<bean id="myCustomListener" class="org.example.com.CustomListener"/>

<camelContext id="route" xmlns="http://camel.apache.org/schema/blueprint">
    <route>
        <from uri="infinispan://?cacheContainer=#myCustomContainer&amp;cacheName=customCacheName&amp;customListener=#myCustomListener"/>
        <to uri="mock:result"/>
    </route>
</camelContext>

The instance of myCustomListener must exist. Users are encouraged to extend org.apache.camel.component.infinispan.embedded.InfinispanEmbeddedCustomListener and annotate the resulting class with the @Listener annotation from org.infinispan.notifications.

Custom filters and converters for embedded caches are currently not supported.

Custom Listeners for Remote Cache

Custom listeners for a remote cache can be registered in the same way as an embedded cache, with the exception that sync=false must be present. For instance:

Example 3. Using only Java
from("infinispan://?cacheContainer=#cacheManager&sync=false&customListener=#myCustomListener")
  .to("mock:result");
Example 4. Using Blueprint and Java
Java class:
public class RemoteCacheManagerFactory {
    ConfigurationBuilder clientBuilder;
    public RemoteCacheManagerFactory(String hostname, int port) {
        clientBuilder = new ConfigurationBuilder();
        clientBuilder.addServer()
            .host(hostname).port(port);
    }
    public RemoteCacheManager newRemoteCacheManager() {
        return new RemoteCacheManager(clientBuilder.build());
    }
}
blueprint.xml:
<bean id="remoteCacheManagerFactory" class="com.datagrid.RemoteCacheManagerFactory">
    <argument value="localhost"/>
    <argument value="11222"/>
</bean>

<bean id="cacheManager"
    factory-ref="remoteCacheManagerFactory"
    factory-method="newRemoteCacheManager">
</bean>

<bean id="myCustomListener" class="org.example.com.CustomListener"/>

<camelContext id="route" xmlns="http://camel.apache.org/schema/blueprint">
    <route>
        <from uri="infinispan://?cacheContainer=#cacheManager&amp;sync=false&amp;customListener=#myCustomListener"/>
        <to uri="mock:result"/>
    </route>
</camelContext>

The instance of myCustomListener must exist. Users are encouraged to extend the org.apache.camel.component.infinispan.remote.InfinispanRemoteCustomListener class and annotate the resulting class with @ClientListener; this annotation is found in org.infinispan.client.hotrod.annotation.

Remote listeners may also be associated with custom filters and converters as shown below:

@ClientListener(includeCurrentState = true, filterFactoryName = "static-filter-factory", converterFactoryName = "static-converter-factory")
private static class MyCustomListener extends InfinispanRemoteCustomListener {
}

In order to use custom filters or converters, classes annotated with @NamedFactory must be implemented. A skeleton that implements the necessary methods is shown below:

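import java.io.Serializable;

import org.infinispan.filter.NamedFactory;
import org.infinispan.metadata.Metadata;
import org.infinispan.notifications.cachelistener.filter.*;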

@NamedFactory(name = "static-converter-factory")
public static class StaticConverterFactory implements CacheEventConverterFactory {
  @Override
  public CacheEventConverter<Integer, String, CustomEvent> getConverter(Object[] params) {
    ...
  }

  static class StaticConverter implements CacheEventConverter<Integer, String, CustomEvent>, Serializable {
    @Override
    public CustomEvent convert(Integer key, String previousValue, Metadata previousMetadata,
                               String value, Metadata metadata, EventType eventType) {
      ...
    }
  }
}

@NamedFactory(name = "static-filter-factory")
public static class StaticCacheEventFilterFactory implements CacheEventFilterFactory {
  @Override
  public CacheEventFilter<Integer, String> getFilter(final Object[] params) {
    ...
  }

  static class StaticCacheEventFilter implements CacheEventFilter<Integer, String>, Serializable {
    @Override
    public boolean accept(Integer key, String previousValue, Metadata previousMetadata,
                          String value, Metadata metadata, EventType eventType) {
      ...
    }
  }
}

Custom filters and converters must be registered with the server.

In order to listen for remote Hot Rod events, the cacheManager must be of type RemoteCacheManager and must be instantiated.

Using the Infinispan based idempotent repository

In this section we will use the Infinispan based idempotent repository.

First, we need to create a cacheManager and then configure our org.apache.camel.component.infinispan.processor.idempotent.InfinispanIdempotentRepository:

<!-- set up the cache manager -->
<bean id="cacheManager"
      class="org.infinispan.manager.DefaultCacheManager"
      init-method="start"
      destroy-method="stop"/>

<!-- set up the repository -->
<bean id="infinispanRepo"
      class="org.apache.camel.component.infinispan.processor.idempotent.InfinispanIdempotentRepository"
      factory-method="infinispanIdempotentRepository">
    <argument ref="cacheManager"/>
    <argument value="idempotent"/>
</bean>

Then we can reference the Infinispan idempotent repository from a route in the Spring XML file as well:

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="JpaMessageIdRepositoryTest">
        <from uri="direct:start" />
        <idempotentConsumer messageIdRepositoryRef="infinispanRepo">
            <header>messageId</header>
            <to uri="mock:result" />
        </idempotentConsumer>
    </route>
</camelContext>
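
The effect of the idempotent consumer in the route above can be modelled in a few lines. This is a sketch under stated assumptions: IdempotentSketch is a hypothetical class, and an in-memory set stands in for the Infinispan cache that the real repository would use to remember message ids.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// In-memory model of an idempotent filter; the real repository is backed by a cache.
class IdempotentSketch {

    // Ids of messages that have already been let through.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Returns true the first time a message id is offered and false for duplicates.
    boolean firstDelivery(String messageId) {
        return seen.add(messageId);
    }

    public static void main(String[] args) {
        IdempotentSketch repo = new IdempotentSketch();
        System.out.println(repo.firstDelivery("msg-1")); // prints: true
        System.out.println(repo.firstDelivery("msg-1")); // prints: false
    }
}
```

In the route, the messageId header plays the role of the id: only exchanges whose id has not been seen before would reach mock:result. Backing the set with a replicated or distributed cache is what would let several Camel instances share the same view of processed ids.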