Issue
I am using Java 17 to listen to events from Azure Event Hub. I only have the connection string and the event hub name. According to the Microsoft docs, I can do something like this:
EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
    .credential(connectionString, eventHubName,
        new DefaultAzureCredentialBuilder()
            .build())
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildAsyncConsumerClient();

// Obtain partitionId from EventHubConsumerAsyncClient.getPartitionIds()
String partitionId = "0";
EventPosition startingPosition = EventPosition.latest();

// Keep a reference to `subscription`. When the program is finished receiving events, call
// subscription.dispose(). This will stop fetching events from the Event Hub.
//
// NOTE: This is a non-blocking call and will move to the next line of code after setting up the async
// operation. If the program ends after this, or the class is immediately disposed, no events will be
// received.
Disposable subscription = consumer.receive().subscribe(partitionEvent -> {
    PartitionContext partitionContext = partitionEvent.getPartitionContext();
    EventData event = partitionEvent.getData();
    System.out.printf("Received event from partition '%s'%n", partitionContext.getPartitionId());
    System.out.printf("Contents of event as string: '%s'%n", event.getBodyAsString());
}, error -> {
    // This is a terminal signal. No more events will be received from the same Flux object.
    System.err.print("An error occurred:" + error);
}, () -> {
    // This is a terminal signal. No more events will be received from the same Flux object.
    System.out.print("Stream has ended.");
});
but I always get a connection reset error:
reactor.core.Exceptions$ErrorCallbackNotImplemented: com.azure.core.amqp.exception.AmqpException: Connection reset, errorContext[NAMESPACE: BLABLABLA. ERROR CONTEXT: N/A]
Caused by: com.azure.core.amqp.exception.AmqpException: Connection reset, errorContext[NAMESPACE: BLABLABLA. ERROR CONTEXT: N/A]
at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:85) ~[azure-core-amqp-2.8.2.jar:2.8.2]
at com.azure.core.amqp.implementation.handler.ConnectionHandler.notifyErrorContext(ConnectionHandler.java:351) ~[azure-core-amqp-2.8.2.jar:2.8.2]
at com.azure.core.amqp.implementation.handler.ConnectionHandler.onTransportError(ConnectionHandler.java:253) ~[azure-core-amqp-2.8.2.jar:2.8.2]
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:191) ~[proton-j-0.33.8.jar:na]
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108) ~[proton-j-0.33.8.jar:na]
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324) ~[proton-j-0.33.8.jar:na]
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291) ~[proton-j-0.33.8.jar:na]
at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91) ~[azure-core-amqp-2.8.2.jar:2.8.2]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.5.11.jar:3.5.11]
Caused by: com.azure.core.amqp.exception.AmqpException: Connection reset, errorContext[NAMESPACE: BLABLABLA. ERROR CONTEXT: N/A]
I've omitted the namespace since I cannot share it publicly.
However, using Node.js, I can connect and receive events without any issues by doing the following:
const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);

async function startConsumer() {
  const subscription = consumerClient.subscribe({
    processEvents: async (events, context) => {
      for (const event of events) {
        await processEvent(event);
      }
    },
    processError: async (error) => {
      console.error("An error occurred: ", error);
    }
  });

  // Wait for a few seconds to receive events
  await new Promise((resolve) => setTimeout(resolve, timeout));

  // Stop the consumer
  await subscription.close();
  await consumerClient.close();
}

async function processEvent(eventData) {
  // some code here, cannot share
}

startConsumer().catch((error) => {
  console.error("An error occurred: ", error);
});
Solution
When you construct the Java consumer, you're passing your connection string along with a token credential via .credential. That overload expects the fully qualified namespace (e.g. mything.servicebus.windows.net) as its first argument, so the connection string is interpreted as the namespace and used as the endpoint to connect to.
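To make the distinction concrete, here is a minimal sketch of how a connection string differs from a fully qualified namespace. It assumes the usual `Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=...` layout. The class `ConnectionStringParts` and its methods are hypothetical helpers written for illustration, not part of the Azure SDK, and the sample values are placeholders.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the Azure SDK.
class ConnectionStringParts {

    // Splits "Key=Value;Key=Value" pairs into a map. Only the first '=' in each
    // pair is treated as the separator, because a SharedAccessKey value is
    // base64 text and may itself end in '='.
    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new HashMap<>();
        for (String pair : connectionString.split(";")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                parts.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        return parts;
    }

    // Returns the host of the Endpoint entry, which is the fully qualified namespace.
    static String fullyQualifiedNamespace(String connectionString) {
        String endpoint = parse(connectionString).get("Endpoint");
        if (endpoint == null) {
            throw new IllegalArgumentException("Connection string has no Endpoint entry");
        }
        return URI.create(endpoint).getHost();
    }

    public static void main(String[] args) {
        // Placeholder values only; not a real namespace or key.
        String connectionString = "Endpoint=sb://mything.servicebus.windows.net/;"
            + "SharedAccessKeyName=listen-policy;SharedAccessKey=abc123=";
        System.out.println(fullyQualifiedNamespace(connectionString));
    }
}
```

The host extracted here is the kind of value the three-argument .credential overload expects as its first argument. Passing the whole connection string there gives the client something that is not a resolvable host name.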
To use the connection string, you'll want to call the builder's connectionString method:
EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
    .connectionString(connectionString, eventHubName)
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildAsyncConsumerClient();
Answered By - Jesse Squire