
[BUG] Spark StreamNative Driver should not require Admin URL #113

Closed
vikram-narayan opened this issue Feb 15, 2023 · 7 comments
@vikram-narayan

Describe the bug
Pulsar admins do not allow consumers/producers to access the admin URL; consumers/producers use certificates for authentication/authorization instead. Teams using Spark therefore cannot take advantage of the StreamNative Spark driver due to a lack of access to the admin URL.
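
For context, the kind of configuration these teams want to run looks roughly like the sketch below: certificate-based authentication against the broker's binary endpoint only, with no admin endpoint involved. This is illustrative only; the hosts, topic, and cert paths are placeholders, and the pulsar.client.* pass-through options are assumed from the connector's client-option forwarding. Today the connector rejects this because no admin.url is supplied.

  // Illustrative sketch (placeholder hosts/paths): cert-based auth, no admin endpoint.
  val df = spark
    .readStream
    .format("pulsar")
    .option("service.url", "pulsar+ssl://broker.example.com:6651")
    .option("pulsar.client.authPluginClassName",
      "org.apache.pulsar.client.impl.auth.AuthenticationTls")
    .option("pulsar.client.authParams",
      "tlsCertFile:/path/to/client-cert.pem,tlsKeyFile:/path/to/client-key.pem")
    .option("topic", "persistent://tenant/ns/events")
    .load()   // fails today because the connector also requires admin.url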

To Reproduce
Steps to reproduce the behavior:

  1. Go to '...'
  2. Click on '....'
  3. Scroll down to '....'
  4. See error

Expected behavior
A clear and concise description of what you expected to happen.

Screenshots
If applicable, add screenshots to help explain your problem.

Additional context
Add any other context about the problem here.

@vikram-narayan vikram-narayan changed the title [BUG] [BUG] Spark StreamNative Driver should not require Admin URL Feb 15, 2023
@JTBS

JTBS commented Feb 27, 2023

I noticed this too. For internal workloads we can live with it, but clients want to send data to an externally exposed Pulsar cluster, and the current version expects an Admin URL to be provided.

The reasons this is required, as I understand it, are:

  • to validate the schema on the target topic;
  • to create one if it is not present;
  • to throw an error if the schema on the topic does not match the incoming schema provided;
  • I'm not sure if there are other reasons the client library expects the Admin URL.

Suggestion:
Can it at least be OPTIONAL? If the Admin URL is not provided, the above client-side checks are simply not done.
It's then up to the broker to accept or reject messages based on topic schema compatibility (a rough sketch of both sides follows below).
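
To make the trade-off concrete, here is a rough editorial sketch (not the connector's actual code) of the admin-side operations described above and of the broker-side alternative. The PulsarAdmin calls are from the standard Pulsar admin client; the host, topic, and namespace names are placeholders.

  import org.apache.pulsar.client.admin.PulsarAdmin

  // The kind of checks that require an admin (HTTP) endpoint today:
  val admin = PulsarAdmin.builder()
    .serviceHttpUrl("https://pulsar.example.com:8443")   // the Admin URL many producers cannot reach
    .build()

  // Look up the schema registered on the target topic (fails if none exists).
  val schema = admin.schemas().getSchemaInfo("persistent://tenant/ns/events")

  // The suggested alternative: skip such client-side checks and let the broker
  // enforce compatibility, e.g. by the cluster operator enabling schema validation
  // enforcement on the namespace (also an admin call, but run by the admins,
  // not by every producer/consumer).
  admin.namespaces().setSchemaValidationEnforced("tenant/ns", true)

  admin.close()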

How does reviewing and fixing issues in the library code work once something is approved as an issue?

  • Is there a team monitoring open issues and prioritizing them?
  • Or is the expectation that people who post issues contribute a patch themselves?

@nlu90
Collaborator

nlu90 commented Mar 2, 2023

@JTBS Yes, the Admin URL is needed for topic discovery, topic metadata query, schema management, and subscription/cursor management.

We have discussed internally several times that, to remove the dependency on the Admin URL cleanly, the Pulsar Client needs to be enhanced to perform the above tasks itself. This will require a Pulsar Improvement Proposal and some more time to go through the process and become available in a release.

The short-term workaround I'm thinking of is the same one you mentioned: removing the hard requirement on the Admin URL, with some functionality sacrificed.
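
For reference, the hard requirement looks like this today; under the workaround, leaving out admin.url would simply disable the admin-backed features listed above rather than failing at startup. A minimal sketch with placeholder hosts and topic:

  // Current behavior: both endpoints must be supplied.
  val df = spark
    .readStream
    .format("pulsar")
    .option("service.url", "pulsar+ssl://broker.example.com:6651")  // binary protocol endpoint
    .option("admin.url", "https://broker.example.com:8443")         // HTTP admin endpoint (the one at issue)
    .option("topic", "persistent://tenant/ns/events")
    .load()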

I'll keep you updated.

@nlu90 nlu90 self-assigned this Mar 2, 2023
@JTBS

JTBS commented Mar 2, 2023

@nlu90 Thank you for getting back to us.

@david-streamlio

Do you have any updates on this? For context, Verizon would like to have this by the end of April to avoid renewing a $200k contract with Confluent. So the sooner they have a timeline from us, the better off they are in the contract process. They are unable to terminate it if/until we can commit to having this.

@diyankov

diyankov commented Jun 6, 2023

@nlu90 I just heard back from Verizon on this feature request. Currently, they see the error below on some consumers. They are asking if we can advise them how to avoid this error.

"23/06/05 16:21:36 ERROR Executor: Exception in task 80.0 in stage 11.0 (TID 1360)
org.apache.pulsar.client.api.PulsarClientException$MemoryBufferIsFullError: Client memory buffer is full
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:860)
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:429)
at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:323)
at org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:395)
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:276)
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:220)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:101)
at org.apache.spark.sql.pulsar.PulsarRowWriter.sendRow(PulsarWriteTask.scala:193)
at org.apache.spark.sql.pulsar.PulsarWriteTask.execute(PulsarWriteTask.scala:40)
at org.apache.spark.sql.pulsar.PulsarSinks$.$anonfun$write$2(PulsarSinks.scala:153)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.sql.pulsar.PulsarSinks$.$anonfun$write$1(PulsarSinks.scala:153)
at org.apache.spark.sql.pulsar.PulsarSinks$.$anonfun$write$1$adapted(PulsarSinks.scala:150)
at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1011)
at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1011)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2276)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)"

@nlu90
Collaborator

nlu90 commented Jun 6, 2023

@diyankov I'm checking with the platform team.

@nlu90
Collaborator

nlu90 commented Jun 8, 2023

@diyankov To avoid the issue, Spark job developers need to pass an additional configuration option to set the Pulsar client behavior:

spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar+ssl://localhost:6651")
  .option("pulsar.client.blockIfQueueFull", "true")

Note the pulsar.client.blockIfQueueFull option on the last line of the code section above.
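
Since the stack trace above is raised on the write path (PulsarRowWriter.sendRow), the same pass-through option also needs to reach the sink when writing to Pulsar. A minimal sketch, assuming the sink accepts the same option names; the hosts, topic, and checkpoint path are placeholders:

  // Same option applied on the write side, where MemoryBufferIsFullError is thrown.
  df.writeStream
    .format("pulsar")
    .option("service.url", "pulsar+ssl://localhost:6651")
    .option("admin.url", "https://localhost:8443")              // still required by current releases
    .option("topic", "persistent://tenant/ns/events")
    .option("pulsar.client.blockIfQueueFull", "true")           // block instead of throwing when buffers fill
    .option("checkpointLocation", "/tmp/pulsar-sink-checkpoint")
    .start()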
