Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] replace admin calls with client or consumer calls in PulsarHelper #118

Merged
merged 8 commits into from
Mar 30, 2023

Conversation

nlu90
Copy link
Collaborator

@nlu90 nlu90 commented Mar 28, 2023

(If this PR fixes a github issue, please add Fixes #<xyz>.)

Fixes #

(or if this PR is one task of a github issue, please add Master Issue: #<xyz> to link to the master issue.)

Master Issue: #113

Motivation

Reduce the hard dependency on PulsarAadmin as much as possible

Modifications

Replace all admin calls with the equivalent client or consumer calls:

admin call client or consumer call
admin.topics().createSubscription(...) consumer.subscribe()
admin.topics().resetCursor(...) consumer.seek()
admin.topics().deleteSubscription(...) consumer.unsubscribe()
admin.schemas().getSchemaInfo(...) clientImpl.getSchema(...)
admin.topics().getLastMessageId(...) consumer.getLastMessageId
admin.topics().getPartitionedTopicMetadata(...) clientImpl.getPartitionedTopicMetadata(...)
admin.topics().getList(dest.getNamespace) client.getLookup.getTopicsUnderNamespace(...)

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

With example applications using the connector, we can observe the following logs from spark driver:

-------------------------------------------
Batch: 4
-------------------------------------------
+--------------------+-----+--------------------+--------------------+--------------------+-----------+-------------------+
|               value|__key|             __topic|         __messageId|       __publishTime|__eventTime|__messageProperties|
+--------------------+-----+--------------------+--------------------+--------------------+-----------+-------------------+
|[48 65 6C 6C 6F 2...| null|persistent://publ...|[08 38 10 50 20 0...|2023-03-28 16:07:...|       null|                 {}|
|[48 65 6C 6C 6F 2...| null|persistent://publ...|[08 38 10 51 20 0...|2023-03-28 16:07:...|       null|                 {}|
|[48 65 6C 6C 6F 2...| null|persistent://publ...|[08 38 10 52 20 0...|2023-03-28 16:07:...|       null|                 {}|
|[48 65 6C 6C 6F 2...| null|persistent://publ...|[08 38 10 53 20 0...|2023-03-28 16:07:...|       null|                 {}|
|[48 65 6C 6C 6F 2...| null|persistent://publ...|[08 38 10 54 20 0...|2023-03-28 16:07:...|       null|                 {}|
|[48 65 6C 6C 6F 2...| null|persistent://publ...|[08 38 10 55 20 0...|2023-03-28 16:07:...|       null|                 {}|
|[48 65 6C 6C 6F 2...| null|persistent://publ...|[08 38 10 56 20 0...|2023-03-28 16:07:...|       null|                 {}|
|[48 65 6C 6C 6F 2...| null|persistent://publ...|[08 38 10 57 20 0...|2023-03-28 16:07:...|       null|                 {}|
|[48 65 6C 6C 6F 2...| null|persistent://publ...|[08 38 10 58 20 0...|2023-03-28 16:07:...|       null|                 {}|
|[48 65 6C 6C 6F 2...| null|persistent://publ...|[08 38 10 59 20 0...|2023-03-28 16:07:...|       null|                 {}|
+--------------------+-----+--------------------+--------------------+--------------------+-----------+-------------------+

23/03/28 16:07:30 INFO WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5d7f2978 committed.
23/03/28 16:07:30 INFO CheckpointFileManager: Writing atomically to file:/tmp/bb486861-bd91-44d6-ae7f-c1a5bd7b9bd2/commits/4 using temp file file:/tmp/bb486861-bd91-44d6-ae7f-c1a5bd7b9bd2/commits/.4.2b8a1aa3-d710-42fd-96ac-349a2326e026.tmp
23/03/28 16:07:30 INFO CheckpointFileManager: Renamed temp file file:/tmp/bb486861-bd91-44d6-ae7f-c1a5bd7b9bd2/commits/.4.2b8a1aa3-d710-42fd-96ac-349a2326e026.tmp to file:/tmp/bb486861-bd91-44d6-ae7f-c1a5bd7b9bd2/commits/4
23/03/28 16:07:30 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "bfc00ad2-f3df-4b1f-b289-99c4faf596f6",
  "runId" : "77483b6b-627b-445c-96e2-b105d0f2f6ff",
  "name" : null,
  "timestamp" : "2023-03-28T23:07:30.004Z",
  "batchId" : 4,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.3333222225925802,
  "processedRowsPerSecond" : 26.52519893899204,
  "durationMs" : {
    "addBatch" : 245,
    "getBatch" : 20,
    "getOffset" : 16,
    "queryPlanning" : 5,
    "triggerExecution" : 376,
    "walCommit" : 47
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "org.apache.spark.sql.pulsar.PulsarSource@4aaeb58f",
    "startOffset" : {
      "persistent://public/default/spark-test2" : [ 8, 56, 16, 79, 48, 0 ]
    },
    "endOffset" : {
      "persistent://public/default/spark-test2" : [ 8, 56, 16, 89, 48, 0 ]
    },
    "latestOffset" : {
      "persistent://public/default/spark-test2" : [ 8, 56, 16, 89, 48, 0 ]
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 0.3333222225925802,
    "processedRowsPerSecond" : 26.52519893899204
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@606bc1b6",
    "numOutputRows" : 10
  }
}

Documentation

Check the box below.

Need to update docs?

  • doc-required

    (If you need help on updating docs, create a doc issue)

  • no-need-doc

    Internal implementation details

  • doc

    (If this PR contains doc changes)

@nlu90 nlu90 requested a review from a team as a code owner March 28, 2023 23:24
@nlu90 nlu90 self-assigned this Mar 28, 2023
@github-actions github-actions bot added the no-need-doc This pr does not need any document label Mar 28, 2023
@nlu90 nlu90 changed the title Neng/use consumer in pulsarhelper replace admin calls with client or consumer calls in PulsarHelper Mar 28, 2023
@nlu90 nlu90 changed the title replace admin calls with client or consumer calls in PulsarHelper [Improve] replace admin calls with client or consumer calls in PulsarHelper Mar 28, 2023
@nlu90 nlu90 added this to the 2023-04 v3.2.0.2 milestone Mar 29, 2023
@nlu90
Copy link
Collaborator Author

nlu90 commented Mar 29, 2023

The code style checker identify an wrong error:
image

I'll ignore it for now.

@nlu90 nlu90 merged commit 2a382a8 into master Mar 30, 2023
@delete-merged-branch delete-merged-branch bot deleted the neng/use-consumer-in-pulsarhelper branch March 30, 2023 17:16
nlu90 added a commit that referenced this pull request Apr 3, 2023
…Helper (#118)

* introduce pulsar consumer for pulsar management

* use consumer cache

* lint error
nlu90 added a commit that referenced this pull request Apr 7, 2023
* Revert "Expose some monitor metrics (#106)"

This reverts commit d7d955c.

* Reader need closed when exception (#115)

* fix: Reader did not closed when an exception occurred in readNext

* fix: Reader did not closed when an exception occurred in readNext

* [Improve] replace admin calls with client or consumer calls in PulsarHelper (#118)

* introduce pulsar consumer for pulsar management

* use consumer cache

* lint error

* remove admin url from source related components (#120)

* simplify sink schema check (#121)

---------

Co-authored-by: obobj <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
no-need-doc This pr does not need any document
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants