Entaxy Docs

Azure Storage Queue Service

Since Camel 3.3

Both producer and consumer are supported

The Azure Storage Queue component supports storing and retrieving the messages to/from Azure Storage Queue service using Azure APIs v12. However in case of versions above v12, we will see if this component can adopt these changes depending on how much breaking changes can result.

Prerequisites

You must have a valid Windows Azure Storage account. More information is available at Azure Documentation Portal.

Maven users will need to add the following dependency to their pom.xml for this component:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-azure-storage-queue</artifactId>
    <version>x.x.x</version>
  <!-- use the same version as your Camel core version -->
</dependency>

URI Format

azure-storage-queue://accountName[/queueName][?options]

In case of consumer, accountName and queueName are required. In case of producer, it depends on the operation that being requested, for example if operation is on a service level, e.b: listQueues, only accountName is required, but in case of operation being requested on the queue level, e.g: createQueue, sendMessage.. etc, both accountName and queueName are required.

The queue will be created if it does not already exist. You can append query options to the URI in the following format, ?options=value&option2=value&…​

For example in order to get a message content from the queue messageQueue in the camelazure storage account and, use the following snippet:

from("azure-storage-queue:/camelazure/messageQueue?accountName=yourAccountName&accessKey=yourAccessKey").
to("file://queuedirectory");

URI Options

The Azure Storage Queue Service component supports 12 options, which are listed below.

Name Description Default Type

configuration (common)

The component configurations

QueueConfiguration

serviceClient (common)

Service client to a storage account to interact with the queue service. This client does not hold any state about a particular storage account but is instead a convenient way of sending off appropriate requests to the resource on the service. This client contains all the operations for interacting with a queue account in Azure Storage. Operations allowed by the client are creating, listing, and deleting queues, retrieving and updating properties of the account, and retrieving statistics of the account.

QueueServiceClient

bridgeErrorHandler (consumer)

Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored.

false

boolean

lazyStartProducer (producer)

Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing.

false

boolean

operation (producer)

Queue service operation hint to the producer. The value can be one of: listQueues, createQueue, deleteQueue, clearQueue, sendMessage, deleteMessage, receiveMessages, peekMessages, updateMessage

QueueOperationDefinition

basicPropertyBinding (advanced)

Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities

false

boolean

maxMessages (queue)

Maximum number of messages to get, if there are less messages exist in the queue than requested all the messages will be returned. If left empty only 1 message will be retrieved, the allowed range is 1 to 32 messages.

1

Integer

timeout (queue)

An optional timeout applied to the operation. If a response is not returned before the timeout concludes a RuntimeException will be thrown.

Duration

timeToLive (queue)

How long the message will stay alive in the queue. If unset the value will default to 7 days, if -1 is passed the message will not expire. The time to live must be -1 or any positive number. The format should be in this form: PnDTnHnMn.nS., e.g: PT20.345S — parses as 20.345 seconds, P2D — parses as 2 days However, in case you are using EndpointDsl/ComponentDsl, you can do something like Duration.ofSeconds() since these Java APIs are typesafe.

Duration

visibilityTimeout (queue)

The timeout period for how long the message is invisible in the queue. The timeout must be between 1 seconds and 7 days. The format should be in this form: PnDTnHnMn.nS., e.g: PT20.345S — parses as 20.345 seconds, P2D — parses as 2 days However, in case you are using EndpointDsl/ComponentDsl, you can do something like Duration.ofSeconds() since these Java APIs are typesafe.

Duration

accessKey (security)

Access key for the associated azure account name to be used for authentication with azure queue services

String

credentials (security)

StorageSharedKeyCredential can be injected to create the azure client, this holds the important authentication information

StorageSharedKeyCredential

The Azure Storage Queue Service endpoint is configured using URI syntax:

azure-storage-queue:accountName/queueName

with the following path and query parameters:

Path Parameters (2 parameters):

Name Description Default Type

accountName

Azure account name to be used for authentication with azure queue services

String

queueName

The queue resource name

String

Query Parameters (14 parameters):

Name Description Default Type

serviceClient (common)

Service client to a storage account to interact with the queue service. This client does not hold any state about a particular storage account but is instead a convenient way of sending off appropriate requests to the resource on the service. This client contains all the operations for interacting with a queue account in Azure Storage. Operations allowed by the client are creating, listing, and deleting queues, retrieving and updating properties of the account, and retrieving statistics of the account.

QueueServiceClient

bridgeErrorHandler (consumer)

Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored.

false

boolean

exceptionHandler (consumer)

To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored.

ExceptionHandler

exchangePattern (consumer)

Sets the exchange pattern when the consumer creates an exchange. The value can be one of: InOnly, InOut, InOptionalOut

ExchangePattern

lazyStartProducer (producer)

Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing.

false

boolean

operation (producer)

Queue service operation hint to the producer. The value can be one of: listQueues, createQueue, deleteQueue, clearQueue, sendMessage, deleteMessage, receiveMessages, peekMessages, updateMessage

QueueOperationDefinition

basicPropertyBinding (advanced)

Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities

false

boolean

synchronous (advanced)

Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported).

false

boolean

maxMessages (queue)

Maximum number of messages to get, if there are less messages exist in the queue than requested all the messages will be returned. If left empty only 1 message will be retrieved, the allowed range is 1 to 32 messages.

1

Integer

timeout (queue)

An optional timeout applied to the operation. If a response is not returned before the timeout concludes a RuntimeException will be thrown.

Duration

timeToLive (queue)

How long the message will stay alive in the queue. If unset the value will default to 7 days, if -1 is passed the message will not expire. The time to live must be -1 or any positive number. The format should be in this form: PnDTnHnMn.nS., e.g: PT20.345S — parses as 20.345 seconds, P2D — parses as 2 days However, in case you are using EndpointDsl/ComponentDsl, you can do something like Duration.ofSeconds() since these Java APIs are typesafe.

Duration

visibilityTimeout (queue)

The timeout period for how long the message is invisible in the queue. The timeout must be between 1 seconds and 7 days. The format should be in this form: PnDTnHnMn.nS., e.g: PT20.345S — parses as 20.345 seconds, P2D — parses as 2 days However, in case you are using EndpointDsl/ComponentDsl, you can do something like Duration.ofSeconds() since these Java APIs are typesafe.

Duration

accessKey (security)

Access key for the associated azure account name to be used for authentication with azure queue services

String

credentials (security)

StorageSharedKeyCredential can be injected to create the azure client, this holds the important authentication information

StorageSharedKeyCredential

Required information options:

To use this component, you have 3 options in order to provide the required Azure authentication information:

  • Provide accountName and accessKey for your Azure account, this is the simplest way to get started. The accessKey can be generated through your Azure portal.

  • Provide a StorageSharedKeyCredential instance which can be provided into credentials option.

  • Provide a QueueServiceClient instance which can be provided into serviceClient. Note: You don’t need to create a specific client, e.g: QueueClient, the QueueServiceClient represents the upper level which can be used to retrieve lower level clients.

Batch Consumer

This component implements the Batch Consumer.

This allows you for instance to know how many messages exists in this batch and for instance let the Aggregator aggregate this number of messages.

Usage

Message headers evaluated by the component producer

Header Variable Name Type Operations Description

CamelAzureStorageQueueSegmentOptions

QueueConstants.QUEUES_SEGMENT_OPTIONS

QueuesSegmentOptions

listQueues

Options for listing queues

CamelAzureStorageQueueTimeout

QueueConstants.TIMEOUT

Duration

All

An optional timeout value beyond which a {@link RuntimeException} will be raised.

CamelAzureStorageQueueMetadata

QueueConstants.METADATA

Map<String,String>

createQueue

Metadata to associate with the queue

CamelAzureStorageQueueMessageText

QueueConstants.MESSAGE_TEXT

String

sendMessage, updateMessage

Message text to be sent or update existing message

CamelAzureStorageQueueTimeToLive

QueueConstants.TIME_TO_LIVE

Duration

sendMessage

How long the message will stay alive in the queue. If unset the value will default to 7 days, if -1 is passed the message will not expire. The time to live must be -1 or any positive number.

CamelAzureStorageQueueVisibilityTimeout

QueueConstants.VISIBILITY_TIMEOUT

Duration

sendMessage, receiveMessages, updateMessage

The timeout period for how long the message is invisible in the queue. If unset the value will default to 0 and the message will be instantly visible. The timeout must be between 0 seconds and 7 days.

CamelAzureStorageQueueQueueCreated

QueueConstants.QUEUE_CREATED

boolean

sendMessage

When is set to true, the queue will not be automatically created when sending messages to the queue.

CamelAzureStorageQueuePopReceipt

QueueConstants.POP_RECEIPT

String

deleteMessage, updateMessage

Unique identifier that must match for the message to be deleted or updated.

CamelAzureStorageQueueMessageId

QueueConstants.MESSAGE_ID

String

deleteMessage, updateMessage

The ID of the message to be deleted or updated.

CamelAzureStorageQueueMaxMessages

QueueConstants.MAX_MESSAGES

Integer

receiveMessages, peekMessages

Maximum number of messages to get, if there are less messages exist in the queue than requested all the messages will be returned. If left empty only 1 message will be retrieved, the allowed range is 1 to 32 messages.

CamelAzureStorageQueueOperation

QueueConstants.QUEUE_OPERATION

QueueOperationDefinition

All

Specify the producer operation to execute, please see the doc on this page related to producer operation.

CamelAzureStorageQueueName

QueueConstants.QUEUE_NAME

String

All

Override the queue name.

Message headers set by either component producer or consumer

Header Variable Name Type Description

CamelAzureStorageQueueMessageId

QueueConstants.MESSAGE_ID

String

The ID of message that being sent to the queue.

CamelAzureStorageQueueInsertionTime

QueueConstants.INSERTION_TIME

OffsetDateTime

The time the Message was inserted into the Queue.

CamelAzureStorageQueueExpirationTime

QueueConstants.EXPIRATION_TIME

OffsetDateTime

The time that the Message will expire and be automatically deleted.

CamelAzureStorageQueuePopReceipt

QueueConstants.POP_RECEIPT

String

This value is required to delete/update the Message. If deletion fails using this popreceipt then the message has been dequeued by another client.

CamelAzureStorageQueueTimeNextVisible

QueueConstants.TIME_NEXT_VISIBLE

OffsetDateTime

The time that the message will again become visible in the Queue.

CamelAzureStorageQueueDequeueCount

QueueConstants.DEQUEUE_COUNT

long

The number of times the message has been dequeued.

CamelAzureStorageQueueRawHttpHeaders

QueueConstants.RAW_HTTP_HEADERS

HttpHeaders

Returns non-parsed httpHeaders that can be used by the user.

Advanced Azure Storage Queue configuration

If your Camel Application is running behind a firewall or if you need to have more control over the QueueServiceClient instance configuration, you can create your own instance:

StorageSharedKeyCredential credential = new StorageSharedKeyCredential("yourAccountName", "yourAccessKey");
String uri = String.format("https://%s.queue.core.windows.net", "yourAccountName");

QueueServiceClient client = new QueueServiceClientBuilder()
                          .endpoint(uri)
                          .credential(credential)
                          .buildClient();
// This is camel context
context.getRegistry().bind("client", client);

Then refer to this instance in your Camel azure-storage-queue component configuration:

from("azure-storage-queue://cameldev/queue1?serviceClient=#client")
.to("file://outputFolder?fileName=output.txt&fileExist=Append");

Automatic detection of QueueServiceClient client in registry

The component is capable of detecting the presence of an QueueServiceClient bean into the registry. If it’s the only instance of that type it will be used as client and you won’t have to define it as uri parameter, like the example above. This may be really useful for smarter configuration of the endpoint.

Azure Storage Queue Producer operations

Camel Azure Storage Queue component provides wide range of operations on the producer side:

Operations on the service level

For these operations, accountName is required.

Operation Description

listQueues

Lists the queues in the storage account that pass the filter starting at the specified marker.

Operations on the queue level

For these operations, accountName and queueName are required.

Operation Description

createQueue

Creates a new queue.

deleteQueue

Permanently deletes the queue.

clearQueue

Deletes all messages in the queue..

sendMessage

Default Producer Operation Sends a message with a given time-to-live and a timeout period where the message is invisible in the queue.

deleteMessage

Deletes the specified message in the queue.

receiveMessages

Retrieves up to the maximum number of messages from the queue and hides them from other operations for the timeout period. However it will not dequeue the message from the queue due to reliability reasons.

peekMessages

Peek messages from the front of the queue up to the maximum number of messages.

updateMessage

Updates the specific message in the queue with a new message and resets the visibility timeout.

Refer to the example section in this page to learn how to use these operations into your camel application.

Consumer Examples

To consume a queue into a file component with maximum 5 messages in one batch, this can be done like this:

from("azure-storage-queue://cameldev/queue1?serviceClient=#client&maxMessages=5")
.to("file://outputFolder?fileName=output.txt&fileExist=Append");

Producer Operations Examples

  • listQueues:

from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g, to only returns list of queues with 'awesome' prefix:
      exchange.getIn().setHeader(QueueConstants.QUEUES_SEGMENT_OPTIONS, new QueuesSegmentOptions().setPrefix("awesome"));
     })
    .to("azure-storage-queue://cameldev?serviceClient=#client&operation=listQueues")
    .log("${body}")
    .to("mock:result");
  • createQueue:

from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName");
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=createQueue");
  • deleteQueue:

from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName");
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=deleteQueue");
  • clearQueue:

from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName");
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=clearQueue");
  • sendMessage:

from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      exchange.getIn().setHeader(QueueConstants.MESSAGE_TEXT, "message to send");
      // we set a visibility of 1min
      exchange.getIn().setHeader(QueueConstants.VISIBILITY_TIMEOUT, Duration.ofMinutes(1));
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client");
  • deleteMessage:

from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      // Mandatory header:
      exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, "1");
      // Mandatory header:
      exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, "PAAAAHEEERXXX-1");
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=deleteMessage");
  • receiveMessages:

from("direct:start")
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=receiveMessages")
    .process(exchange -> {
        final List<QueueMessageItem> messageItems = exchange.getMessage().getBody(List.class);
        messageItems.forEach(messageItem -> System.out.println(messageItem.getMessageText()));
    })
   .to("mock:result");
  • peekMessages:

from("direct:start")
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=peekMessages")
    .process(exchange -> {
        final List<PeekedMessageItem> messageItems = exchange.getMessage().getBody(List.class);
        messageItems.forEach(messageItem -> System.out.println(messageItem.getMessageText()));
    })
   .to("mock:result");
  • updateMessage:

from("direct:start")
   .process(exchange -> {
       // set the header you want the producer to evaluate, refer to the previous
       // section to learn about the headers that can be set
       // e.g:
       exchange.getIn().setHeader(QueueConstants.MESSAGE_TEXT, "new message text");
       // Mandatory header:
       exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, "1");
       // Mandatory header:
       exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, "PAAAAHEEERXXX-1");
       // Mandatory header:
       exchange.getIn().setHeader(QueueConstants.VISIBILITY_TIMEOUT, Duration.ofMinutes(1));
    })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=updateMessage");

Development Notes (Important)

When developing on this component, you will need to obtain your Azure accessKey in order to run the integration tests. In addition to the mocked unit tests you will need to run the integration tests with every change you make or even client upgrade as the Azure client can break things even on minor versions upgrade. To run the integration tests, on this component directory, run the following maven command:

mvn verify -PfullTests -DaccountName=myacc -DaccessKey=mykey

Whereby accountName is your Azure account name and accessKey is the access key being generated from Azure portal.