diff --git a/.jenkins/build.sh b/.jenkins/build.sh new file mode 100755 index 00000000..d7045bdd --- /dev/null +++ b/.jenkins/build.sh @@ -0,0 +1,25 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +set -e + +JENKINS_DIR=`dirname "$0"` +PRJ_HOME=`cd ${JENKINS_DIR}/..;pwd` + +cd ${PRJ_HOME} + +mvn clean license:check scalastyle:check install +retcode=$? +cat target/surefire-reports/ConnectorTestSuite.txt +exit $retcode diff --git a/README.md b/README.md index 1db83b29..c915f3d0 100644 --- a/README.md +++ b/README.md @@ -1,53 +1,52 @@ # pulsar-spark [![Version](https://img.shields.io/github/release/streamnative/pulsar-spark/all.svg)](https://github.com/streamnative/pulsar-spark/releases) + [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0) [![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fstreamnative%2Fpulsar-spark.svg?type=shield)](https://app.fossa.io/projects/git%2Bgithub.com%2Fstreamnative%2Fpulsar-spark?ref=badge_shield) -![contribution](https://img.shields.io/badge/contributions-welcome-brightgreen.svg?style=flat) Unified data processing with [Apache Pulsar](https://pulsar.apache.org) and [Apache Spark](https://spark.apache.org). ## Prerequisites - Java 8 or later -- Spark 3.2.0 or later -- Pulsar 2.10.0 or later +- Spark 3.2.2 or later +- Pulsar 2.10.2 or later ## Preparations ### Link +#### Client library For Scala/Java applications using SBT/Maven project definitions, link your application with the following artifact: ``` -groupId = io.streamnative.connectors -artifactId = pulsar-spark-connector_{{SCALA_BINARY_VERSION}} -version = {{PULSAR_SPARK_VERSION}} + groupId = io.streamnative.connectors + artifactId = pulsar-spark-connector_{{SCALA_BINARY_VERSION}} + version = {{PULSAR_SPARK_VERSION}} ``` ### Deploy -#### Client library - +#### Client library As with any Spark applications, `spark-submit` is used to launch your application. `pulsar-spark-connector_{{SCALA_BINARY_VERSION}}` and its dependencies can be directly added to `spark-submit` using `--packages`. Example -```shell +``` $ ./bin/spark-submit --packages io.streamnative.connectors:pulsar-spark-connector_{{SCALA_BINARY_VERSION}}:{{PULSAR_SPARK_VERSION}} ... ``` -#### CLI - +#### CLI For experimenting on `spark-shell` (or `pyspark` for Python), you can also use `--packages` to add `pulsar-spark-connector_{{SCALA_BINARY_VERSION}}` and its dependencies directly. Example -```shell +``` $ ./bin/spark-shell --packages io.streamnative.connectors:pulsar-spark-connector_{{SCALA_BINARY_VERSION}}:{{PULSAR_SPARK_VERSION}} ... @@ -56,7 +55,9 @@ $ ./bin/spark-shell When locating an artifact or library, `--packages` option checks the following repositories in order: 1. Local maven repository + 2. Maven central repository + 3. Other repositories specified by `--repositories` The format for the coordinates should be `groupId:artifactId:version`. 
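For SBT users, the same coordinates can be declared as a library dependency along these lines; this is only a sketch, in which `%%` appends the Scala binary suffix and the version string is a placeholder.

```scala
// build.sbt sketch: replace the version placeholder with the pulsar-spark release you use.
libraryDependencies += "io.streamnative.connectors" %% "pulsar-spark-connector" % "<PULSAR_SPARK_VERSION>"
```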
@@ -68,9 +69,7 @@ For more information about **submitting applications with external dependencies* ### Read data from Pulsar #### Create a Pulsar source for streaming queries - The following examples are in Scala. - ```scala // Subscribe to 1 topic val df = spark @@ -106,19 +105,17 @@ df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)") .as[(String, String)] ``` -> **Note** -> +> #### Tip > For more information on how to use other language bindings for Spark Structured Streaming, > see [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html). #### Create a Pulsar source for batch queries - If you have a use case that is better suited to batch processing, you can create a Dataset/DataFrame for a defined range of offsets. The following examples are in Scala. - ```scala + // Subscribe to 1 topic defaults to the earliest and latest offsets val df = spark .read @@ -133,7 +130,7 @@ df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)") // Subscribe to multiple topics, specifying explicit Pulsar offsets import org.apache.spark.sql.pulsar.JsonUtils._ val startingOffsets = topicOffsets(Map("topic1" -> messageId1, "topic2" -> messageId2)) -val endingOffsets = topicOffsets(Map("topic1" -> MessageId.latest, "topic2" -> MessageId.latest)) +val endingOffsets = topicOffsets(...) val df = spark .read .format("pulsar") @@ -163,114 +160,185 @@ df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)") The following options must be set for the Pulsar source for both batch and streaming queries. -| Option | Value | Description | -|-----------------|-------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------| -| `topic` | A topic name string | The topic to be consumed. Only one of `topic`, `topics` or `topicsPattern` options can be specified for the Pulsar source. | -| `topics` | A comma-separated list of topics | The topic list to be consumed. Only one of `topic`, `topics` or `topicsPattern` options can be specified for the Pulsar source. | -| `topicsPattern` | A Java regex string | The pattern used to subscribe to topic(s). Only one of `topic`, `topics` or `topicsPattern` options can be specified for the Pulsar source. | -| `service.url` | A service URL of your Pulsar cluster | The Pulsar `serviceUrl` configuration for creating the `PulsarClient` instance. | -| `admin.url` | A service HTTP URL of your Pulsar cluster | The Pulsar `serviceHttpUrl` configuration for creating the `PulsarAdmin` instance. | + + + + + + + + + + + + + + + + + + + + + + + + + + + +
| Option | Value | Description |
|--------|-------|-------------|
| `topic` | A topic name string | The topic to be consumed. Only one of the `topic`, `topics`, or `topicsPattern` options can be specified for the Pulsar source. |
| `topics` | A comma-separated list of topics | The topic list to be consumed. Only one of the `topic`, `topics`, or `topicsPattern` options can be specified for the Pulsar source. |
| `topicsPattern` | A Java regex string | The pattern used to subscribe to topic(s). Only one of the `topic`, `topics`, or `topicsPattern` options can be specified for the Pulsar source. |
| `service.url` | A service URL of your Pulsar cluster | The Pulsar `serviceUrl` configuration. |
| `admin.url` | A service HTTP URL of your Pulsar cluster | The Pulsar `serviceHttpUrl` configuration. |
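Putting the required options together, a minimal streaming read might look like the following sketch; the service URL, admin URL, and topic pattern are placeholders.

```scala
// A sketch only: substitute the URLs of your own cluster and a real topic pattern.
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topicsPattern", "topic.*")
  .load()
```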
The following configurations are optional.

| Option | Value | Default | Query Type | Description |
|--------|-------|---------|------------|-------------|
| `startingOffsets` | `"earliest"` (streaming and batch queries), `"latest"` (streaming query only), or a JSON string, for example `{"topic-1":[8,11,16,101,24,1,32,1],"topic-5":[8,15,16,105,24,5,32,5]}` | Batch query: `"earliest"`; streaming query: `"latest"` | Streaming and batch queries | Controls where a reader starts reading data. `"earliest"`: without a valid offset, the reader reads all the data in the partition, starting from the very beginning. `"latest"`: without a valid offset, the reader reads only the records written after it starts running. A JSON string specifies a starting offset for each topic; you can use `org.apache.spark.sql.pulsar.JsonUtils.topicOffsets(Map[String, MessageId])` to convert message offsets to a JSON string. Note: for batch queries, `"latest"` is not allowed, whether specified implicitly or as `MessageId.latest` (`[8,-1,-1,-1,-1,-1,-1,-1,-1,127,16,-1,-1,-1,-1,-1,-1,-1,-1,127]`) in JSON. For streaming queries, `"latest"` only applies when a new query is started; resuming always picks up from where the query left off. Partitions discovered during a query start at `"earliest"`. |
| `endingOffsets` | `"latest"` (batch query only) or a JSON string, for example `{"topic-1":[8,12,16,102,24,2,32,2],"topic-5":[8,16,16,106,24,6,32,6]}` | `"latest"` | Batch query | Controls where a reader stops reading data. `"latest"`: the reader stops at the latest record. A JSON string specifies an ending offset for each topic. Note: `MessageId.earliest` (`[8,-1,-1,-1,-1,-1,-1,-1,-1,-1,1,16,-1,-1,-1,-1,-1,-1,-1,-1,-1,1]`) is not allowed. |
| `failOnDataLoss` | `true` or `false` | `true` | Streaming query | Controls whether to fail a query when data is lost, for example when topics are deleted or messages are deleted because of the retention policy. This may cause a false alarm; you can set it to `false` when it does not work as you expect. A batch query always fails if it cannot read any data from the provided offsets because of data loss. |
| `allowDifferentTopicSchemas` | Boolean value | `false` | Streaming query | When multiple topics with different schemas are read, this parameter turns off automatic schema-based deserialization of topic values, so that topics with different schemas can be read in the same pipeline, which is then responsible for deserializing the raw values based on some schema. Since only the raw values are returned when this is `true`, Pulsar topic schemas are not taken into account during operation. |
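The following sketch combines several of the optional settings above; all of the values shown are illustrative only.

```scala
// Illustrative values only: read two topics from the earliest offsets, keep the query
// running through data-loss events, and skip schema-based value deserialization.
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .option("allowDifferentTopicSchemas", "true")
  .load()
```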
#### Authentication - Should the Pulsar cluster require authentication, credentials can be set in the following way. The following examples are in Scala. - ```scala // Secure connection with authentication, using the same credentials on the // Pulsar client and admin interface (if not given explicitly, the client configuration @@ -286,7 +354,6 @@ val df = spark .load() df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)") .as[(String, String)] - // Secure connection with authentication, using different credentials for // Pulsar client and admin interfaces. val df = spark @@ -325,7 +392,6 @@ df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)") ``` #### Schema of Pulsar source - - For topics without schema or with primitive schema in Pulsar, messages' payload is loaded to a `value` column with the corresponding type with Pulsar schema. - For topics with Avro or JSON schema, their field names and field types are kept in the result rows. @@ -335,28 +401,43 @@ raw form. In this case it is the responsibility of the pipeline to apply the sch on this content, which is loaded to the `value` column. Besides, each row in the source has the following metadata fields as well. + + + + + + + + + + + + + + + + + + + + + + + + + + +
| Column | Type |
|--------|------|
| `__key` | Binary |
| `__topic` | String |
| `__messageId` | Binary |
| `__publishTime` | Timestamp |
| `__eventTime` | Timestamp |
| `__messageProperties` | `Map<String, String>` |
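For example, the metadata columns above can be projected directly; this is a sketch that assumes `df` was loaded with `format("pulsar")` as in the earlier examples.

```scala
// Project only the per-message metadata columns listed in the table above.
df.select("__topic", "__messageId", "__publishTime", "__eventTime", "__messageProperties")
```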
-| Column | Type | -|-----------------------|-----------------------| -| `__key` | Binary | -| `__topic` | String | -| `__messageId` | Binary | -| `__publishTime` | Timestamp | -| `__eventTime` | Timestamp | -| `__messageProperties` | `Map` | - -**Example** - -The topic of AVRO schema in Pulsar is as below: +** Example** +The topic of AVRO schema _s_ in Pulsar is as below: ```scala -case class Foo(i: Int, f: Float, bar: Bar) -case class Bar(b: Boolean, s: String) -val s = Schema.AVRO(Foo.getClass) + case class Foo(i: Int, f: Float, bar: Bar) + case class Bar(b: Boolean, s: String) + val s = Schema.AVRO(Foo.getClass) ``` - has the following schema as a DataFrame/DataSet in Spark: - ``` root |-- i: integer (nullable = false) @@ -371,12 +452,11 @@ root |-- __messageProperties: map (nullable = true) | |-- key: string | |-- value: string (valueContainsNull = true) -``` + ``` -For Pulsar topic with `Schema.DOUBLE`, it's schema as a DataFrame is: - -``` -root + For Pulsar topic with `Schema.DOUBLE`, it's schema as a DataFrame is: + ``` + root |-- value: double (nullable = false) |-- __key: binary (nullable = true) |-- __topic: string (nullable = true) @@ -386,21 +466,19 @@ root |-- __messageProperties: map (nullable = true) | |-- key: string | |-- value: string (valueContainsNull = true) -``` + ``` ### Write data to Pulsar The DataFrame written to Pulsar can have arbitrary schema, since each record in DataFrame is transformed as one message sent to Pulsar, fields of DataFrame are divided into two groups: `__key` and `__eventTime` fields are encoded as metadata of Pulsar message; other fields are grouped and encoded using AVRO and put in `value()`: - ```scala producer.newMessage().key(__key).value(avro_encoded_fields).eventTime(__eventTime) ``` #### Create a Pulsar sink for streaming queries - The following examples are in Scala. - ```scala + // Write key-value data from a DataFrame to a specific Pulsar topic specified in an option val ds = df .selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)") @@ -422,10 +500,9 @@ val ds = df ``` #### Write the output of batch queries to Pulsar - The following examples are in Scala. - ```scala + // Write key-value data from a DataFrame to a specific Pulsar topic specified in an option df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)") .write @@ -449,66 +526,71 @@ df.selectExpr("__topic", "CAST(__key AS STRING)", "CAST(value AS STRING)") Currently, we provide at-least-once semantic. Consequently, when writing either streaming queries or batch queries to Pulsar, some records may be duplicated. A possible solution to remove duplicates when reading the written data could be to introduce a primary (unique) key that can be used to perform de-duplication when reading. -### Pulsar specific configurations - -The client, producer and reader configuration of Pulsar can be set via `DataStreamReader.option` -with `pulsar.client.`, `pulsar.producer.` and `pulsar.reader.` prefix. -E.g, `stream.option("pulsar.reader.receiverQueueSize", "1000000")`. +### Pulsar specific configurations +Client/producer/reader configurations of Pulsar can be set via `DataStreamReader.option` +with `pulsar.client.`/`pulsar.producer.`/`pulsar.reader.` prefix, e.g, +`stream.option("pulsar.reader.receiverQueueSize", "1000000")`. Since the connector needs to access the Pulsar Admin interface as well, separate configuration of the admin client can be set via the same method with the -`pulsar.admin` prefix. - -For example: `stream.option("pulsar.admin.authParams","token:")`. 
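A combined sketch, in which the URLs, topic, and token value are placeholders, might look like this:

```scala
// Placeholders only: enlarge the reader receiver queue and pass a token to the admin client.
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "topic1")
  .option("pulsar.reader.receiverQueueSize", "1000000")
  .option("pulsar.admin.authParams", "token:<token>")
  .load()
```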
- -This can be useful if a different authentication plugin or token need to be used. -If this is not given explicitly, the client parameters (with `pulsar.client` prefix) will be used for accessing the admin +`pulsar.admin` prefix. For example: `stream.option("pulsar.admin.authParams","token:")`. +This can be useful if a different authentication plugin or +token need to be used. If this is not given explicitly, the client +parameters (with `pulsar.client` prefix) will be used for accessing the admin interface as well. - -For possible Pulsar parameters, check docs at [Pulsar client libraries](https://pulsar.apache.org/docs/en/client-libraries/). +For possible Pulsar parameters, check docs at +[Pulsar client libraries](https://pulsar.apache.org/docs/en/client-libraries/). ## Build Spark Pulsar Connector - If you want to build a Spark-Pulsar connector reading data from Pulsar and writing results to Pulsar, follow the steps below. 1. Checkout the source code. - ```bash - $ git clone https://github.com/streamnative/pulsar-spark.git - $ cd pulsar-spark - ``` -2. Install Docker. Pulsar-spark connector is using [Testcontainers](https://www.testcontainers.org/) for - integration tests. In order to run the integration tests, make sure you - have installed [Docker](https://docs.docker.com/docker-for-mac/install/). +```bash +$ git clone https://github.com/streamnative/pulsar-spark.git +$ cd pulsar-spark +``` + +2. Install Docker. -3. Set a Scala version. Change `scala.version` and `scala.binary.version` in `pom.xml`. - > **Note** - > - > Scala version should be consistent with the Scala version of Spark you use. +> Pulsar-spark connector is using [Testcontainers](https://www.testcontainers.org/) for +> integration tests. In order to run the integration tests, make sure you +> have installed [Docker](https://docs.docker.com/docker-for-mac/install/). + +3. Set a Scala version. +> Change `scala.version` and `scala.binary.version` in `pom.xml`. +> #### Note +> Scala version should be consistent with the Scala version of Spark you use. 4. Build the project. - ```bash - $ mvn clean install -DskipTests - ``` - If you get the following error during compilation, try running Maven with Java 8: - ``` - [ERROR] [Error] : Source option 6 is no longer supported. Use 7 or later. - [ERROR] [Error] : Target option 6 is no longer supported. Use 7 or later. - ``` + +```bash +$ mvn clean install -DskipTests +``` + +If you get the following error during compilation, try running Maven with Java 8: +``` +[ERROR] [Error] : Source option 6 is no longer supported. Use 7 or later. +[ERROR] [Error] : Target option 6 is no longer supported. Use 7 or later. +``` 5. Run the tests. - ```bash - $ mvn clean install - ``` - Note: by configuring `scalatest-maven-plugin` in the [usual ways](https://www.scalatest.org/user_guide/using_the_scalatest_maven_plugin), individual tests can be executed, if that is needed: - ```bash - mvn -Dsuites=org.apache.spark.sql.pulsar.CachedPulsarClientSuite clean install - ``` - This might be handy if test execution is slower, or you get a `java.io.IOException: Too many open files` exception during full suite run. 
+```bash +$ mvn clean install +``` + +Note: by configuring `scalatest-maven-plugin` in the [usual ways](https://www.scalatest.org/user_guide/using_the_scalatest_maven_plugin), individual tests can be executed, if that is needed: + +```bash +mvn -Dsuites=org.apache.spark.sql.pulsar.CachedPulsarClientSuite clean install +``` + +This might be handy if test execution is slower, or you get a `java.io.IOException: Too many open files` exception during full suite run. + +Once the installation is finished, there is a fat jar generated under both local maven repo and `target` directory. - Once the installation is finished, there is a fat jar generated under both local maven repo and `target` directory. ## License [![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fstreamnative%2Fpulsar-spark.svg?type=large)](https://app.fossa.io/projects/git%2Bgithub.com%2Fstreamnative%2Fpulsar-spark?ref=badge_large) diff --git a/pom.xml b/pom.xml index e428c077..5f99c348 100644 --- a/pom.xml +++ b/pom.xml @@ -67,10 +67,11 @@ 2.10.2 - 2.12.17 + 2.12.10 2.12 3.2.14 - 3.2.3 + 3.2.2 + 2.11.0 1.17.6 @@ -78,11 +79,11 @@ 1.1.1640084764.9f463a9 4.1 3.10.1 - 3.4.0 - 3.4.1 + 3.3.0 + 3.4.0 3.2.1 2.22.2 - 4.8.0 + 4.7.2 2.2.0 1.0.0 1.6.13 @@ -131,7 +132,13 @@ - + + commons-io + commons-io + ${commons-io.version} + + + org.apache.spark @@ -145,32 +152,6 @@ - - org.apache.spark - spark-core_${scala.binary.version} - ${spark.version} - tests - test - - - - - - org.apache.spark - spark-streaming_${scala.binary.version} - ${spark.version} - provided - - - org.apache.spark - spark-streaming_${scala.binary.version} - ${spark.version} - test-jar - test - - - - org.apache.spark spark-sql_${scala.binary.version} @@ -189,6 +170,15 @@ ${spark.version} provided + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + tests + test + + org.apache.spark spark-sql_${scala.binary.version} @@ -196,6 +186,7 @@ tests test + org.apache.spark spark-catalyst_${scala.binary.version} @@ -204,8 +195,6 @@ test - - org.scalatest scalatest_${scala.binary.version} @@ -226,6 +215,7 @@ -Werror -Xlint:deprecation -Xlint:unchecked + -Xpkginfo:always @@ -236,6 +226,7 @@ ${license-maven-plugin.version}
src/resources/license.template
+ LICENSE NOTICE @@ -350,8 +341,8 @@ - + org.apache.maven.plugins maven-shade-plugin ${maven-shade-plugin.version} @@ -365,6 +356,7 @@ true true false + com.github.luben:* diff --git a/set-pulsar-version.sh b/set-pulsar-version.sh new file mode 100755 index 00000000..d257bc12 --- /dev/null +++ b/set-pulsar-version.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version=${1#v} +if [[ "x$version" == "x" ]]; then + echo "You need to provide the version of the Pulsar" + exit 1 +fi + +mvn versions:set-property -Dproperty=pulsar.version -DnewVersion=${version} diff --git a/set-version.sh b/set-version.sh new file mode 100755 index 00000000..c5e8f070 --- /dev/null +++ b/set-version.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version=${1#v} +if [[ "x$version" == "x" ]]; then + echo "You need to provide a version number for building sn-pulsar-plugins" + exit 1 +fi + +mvn versions:set -DnewVersion=${version} diff --git a/src/main/scala/org/apache/spark/sql/pulsar/CachedConsumer.scala b/src/main/scala/org/apache/spark/sql/pulsar/CachedConsumer.scala new file mode 100644 index 00000000..f43ddff3 --- /dev/null +++ b/src/main/scala/org/apache/spark/sql/pulsar/CachedConsumer.scala @@ -0,0 +1,103 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.pulsar + +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Success, Try} + +import com.google.common.cache._ +import org.apache.pulsar.client.api.{Consumer, PulsarClient} +import org.apache.pulsar.client.api.schema.GenericRecord +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging + +private[pulsar] object CachedConsumer extends Logging { + + private var client: PulsarClient = null + + private val defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10) + + private lazy val cacheExpireTimeout: Long = + Option(SparkEnv.get) + .map( + _.conf + .getTimeAsMs( + "spark.pulsar.client.cache.timeout", + s"${defaultCacheExpireTimeout}ms")) match { + case Some(timeout) => timeout + case None => defaultCacheExpireTimeout + } + + private val cacheLoader = new CacheLoader[(String, String), Consumer[GenericRecord]]() { + override def load(k: (String, String)): Consumer[GenericRecord] = { + val (topic, subscription) = k + Try( + client + .newConsumer(new AutoConsumeSchema()) + .topic(topic) + .subscriptionName(subscription) + .subscribe()) match { + case Success(consumer) => consumer + case Failure(exception) => + logError( + s"Failed to create consumer to topic ${topic} with subscription ${subscription}") + throw exception + } + } + } + + private val removalListener = new RemovalListener[(String, String), Consumer[GenericRecord]]() { + override def onRemoval( + notification: RemovalNotification[(String, String), Consumer[GenericRecord]]): Unit = { + Try(notification.getValue.close()) match { + case Success(_) => logInfo(s"Closed consumer for ${notification.getKey}") + case Failure(exception) => + logWarning(s"Failed to close consumer for ${notification.getKey}", exception) + } + } + } + + private lazy val guavaCache: LoadingCache[(String, String), Consumer[GenericRecord]] = + CacheBuilder + .newBuilder() + .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS) + .removalListener(removalListener) + .build[(String, String), Consumer[GenericRecord]](cacheLoader) + + private[pulsar] def getOrCreate( + topic: String, + subscription: String, + client: PulsarClient): Consumer[GenericRecord] = { + this.client = client + Try(guavaCache.get((topic, subscription))) match { + case Success(consumer) => consumer + case Failure(exception) => + logError(s"Failed to create consumer to topic ${topic} with subscription ${subscription}") + throw exception + } + } + + private[pulsar] def close(topic: String, subscription: String): Unit = { + guavaCache.invalidate((topic, subscription)) + } + + private[pulsar] def clear(): Unit = { + logInfo("Cleaning up Consumer Cache.") + guavaCache.invalidateAll() + } + +} diff --git a/src/main/scala/org/apache/spark/sql/pulsar/CachedPulsarClient.scala b/src/main/scala/org/apache/spark/sql/pulsar/CachedPulsarClient.scala index eb5c46fe..671eaad2 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/CachedPulsarClient.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/CachedPulsarClient.scala @@ -22,7 +22,7 @@ import scala.util.control.NonFatal import com.google.common.cache._ import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} import org.apache.pulsar.client.api.PulsarClient - +import org.apache.pulsar.client.impl.PulsarClientImpl import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.sql.pulsar.PulsarOptions._ @@ -37,8 +37,8 @@ private[pulsar] object 
CachedPulsarClient extends Logging { .getTimeAsMs("spark.pulsar.client.cache.timeout", s"${defaultCacheExpireTimeout}ms")) .getOrElse(defaultCacheExpireTimeout) - private val cacheLoader = new CacheLoader[ju.Map[String, Object], PulsarClient]() { - override def load(config: ju.Map[String, Object]): PulsarClient = { + private val cacheLoader = new CacheLoader[ju.Map[String, Object], PulsarClientImpl]() { + override def load(config: ju.Map[String, Object]): PulsarClientImpl = { val pulsarServiceUrl = config.get(PulsarOptions.ServiceUrlOptionKey).toString val clientConf = PulsarConfigUpdater("pulsarClientCache", config.asScala.toMap, PulsarOptions.FilteredKeys) @@ -63,7 +63,7 @@ private[pulsar] object CachedPulsarClient extends Logging { s"Created a new instance of PulsarClient for serviceUrl = $pulsarServiceUrl," + s" clientConf = $clientConf.") - pulsarClient + pulsarClient.asInstanceOf[PulsarClientImpl] } catch { case e: Throwable => logError( @@ -75,9 +75,9 @@ private[pulsar] object CachedPulsarClient extends Logging { } } - private val removalListener = new RemovalListener[ju.Map[String, Object], PulsarClient]() { + private val removalListener = new RemovalListener[ju.Map[String, Object], PulsarClientImpl]() { override def onRemoval( - notification: RemovalNotification[ju.Map[String, Object], PulsarClient]): Unit = { + notification: RemovalNotification[ju.Map[String, Object], PulsarClientImpl]): Unit = { val params: ju.Map[String, Object] = notification.getKey val client: PulsarClient = notification.getValue logDebug( @@ -93,19 +93,19 @@ private[pulsar] object CachedPulsarClient extends Logging { } } - private lazy val guavaCache: LoadingCache[ju.Map[String, Object], PulsarClient] = + private lazy val guavaCache: LoadingCache[ju.Map[String, Object], PulsarClientImpl] = CacheBuilder .newBuilder() .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS) .removalListener(removalListener) - .build[ju.Map[String, Object], PulsarClient](cacheLoader) + .build[ju.Map[String, Object], PulsarClientImpl](cacheLoader) /** - * Get a cached PulsarProducer for a given configuration. If matching PulsarProducer doesn't - * exist, a new PulsarProducer will be created. PulsarProducer is thread safe, it is best to - * keep one instance per specified pulsarParams. + * Get a cached PulsarClient for a given configuration. If matching PulsarClient doesn't exist, + * a new PulsarClient will be created. PulsarClient is thread safe, it is best to keep one + * instance per specified pulsarParams. 
*/ - private[pulsar] def getOrCreate(params: ju.Map[String, Object]): PulsarClient = { + private[pulsar] def getOrCreate(params: ju.Map[String, Object]): PulsarClientImpl = { try { guavaCache.get(params) } catch { @@ -115,13 +115,13 @@ private[pulsar] object CachedPulsarClient extends Logging { } } - /** For explicitly closing pulsar producer */ + /** For explicitly closing pulsar client */ private[pulsar] def close(params: ju.Map[String, Object]): Unit = { guavaCache.invalidate(params) } private[pulsar] def clear(): Unit = { - logInfo("Cleaning up guava cache.") + logInfo("Cleaning up PulsarClient cache.") guavaCache.invalidateAll() } } diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarContinuousReader.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarContinuousReader.scala new file mode 100644 index 00000000..5cf3707d --- /dev/null +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarContinuousReader.scala @@ -0,0 +1,311 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.pulsar + +import java.{util => ju} +import java.io.{Externalizable, ObjectInput, ObjectOutput} + +import org.apache.pulsar.client.api.{Message, MessageId, Schema} +import org.apache.pulsar.client.impl.{BatchMessageIdImpl, MessageIdImpl} +import org.apache.pulsar.common.schema.SchemaInfo + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.json.JSONOptionsInRead +import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.connector.read.streaming.{ + ContinuousPartitionReader, + ContinuousPartitionReaderFactory +} +import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, Offset, PartitionOffset} +import org.apache.spark.sql.execution.streaming.{Offset => eOffset} +import org.apache.spark.sql.pulsar.PulsarSourceUtils.{messageIdRoughEquals, reportDataLossFunc} +import org.apache.spark.util.Utils + +/** + * A [[ContinuousReader]] for reading data from Pulsar. + * + * @param clientConf + * @param readerConf + * @param initialOffset + */ +class PulsarContinuousReader( + pulsarHelper: PulsarHelper, + clientConf: ju.Map[String, Object], + readerConf: ju.Map[String, Object], + initialOffset: PerTopicOffset, + pollTimeoutMs: Int, + failOnDataLoss: Boolean, + subscriptionNamePrefix: String, + jsonOptions: JSONOptionsInRead) + extends ContinuousStream + with Logging { + + // Initialized when creating reader factories. If this diverges from the partitions at the latest + // offsets, we need to reconfigure. + // Exposed outside this object only for unit tests. 
+ @volatile private[sql] var knownTopics: Set[String] = _ + + lazy val pulsarSchema: SchemaInfo = pulsarHelper.getPulsarSchema() + + private val reportDataLoss = reportDataLossFunc(failOnDataLoss) + + private var offset: Offset = _ + + private var stopped = false + + override def deserializeOffset(json: String): Offset = { + SpecificPulsarOffset(JsonUtils.topicOffsets(json)) + } + + override def needsReconfiguration(): Boolean = { + knownTopics != null && pulsarHelper.fetchLatestOffsets().topicOffsets.keySet != knownTopics + } + + override def toString: String = s"PulsarSource[$offset]" + + override def stop(): Unit = { + pulsarHelper.removeCursor() + pulsarHelper.close() + } + + override def planInputPartitions(start: Offset): Array[InputPartition] = { + val oldStartPartitionOffsets = + SpecificPulsarOffset.getTopicOffsets(offset.asInstanceOf[eOffset]) + val currentPartitionSet = pulsarHelper.fetchLatestOffsets().topicOffsets.keySet + val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet) + val newPartitionOffsets = pulsarHelper.fetchEarliestOffsets(newPartitions.toSeq) + val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet) + if (deletedPartitions.nonEmpty) { + reportDataLoss(s"Some topics were deleted: $deletedPartitions") + } + + val startOffsets = newPartitionOffsets ++ + oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_)) + knownTopics = startOffsets.keySet + + startOffsets.toSeq.map { case (topic, start) => + new PulsarContinuousTopic( + topic, + pulsarHelper.adminUrl, + new SchemaInfoSerializable(pulsarSchema), + start, + clientConf, + readerConf, + pollTimeoutMs, + failOnDataLoss, + subscriptionNamePrefix, + jsonOptions).asInstanceOf[InputPartition] + }.toArray + } + + override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = + new ContinuousPartitionReaderFactory { + override def createReader( + partition: InputPartition): ContinuousPartitionReader[InternalRow] = { + val pulsarOffset = offset.asInstanceOf[PulsarPartitionOffset] + val pulsarTopic = partition.asInstanceOf[PulsarContinuousTopic] + require( + pulsarOffset.topic == pulsarTopic.topic, + s"Expected topic: $pulsarTopic.topic, but got: ${pulsarOffset.topic}") + new PulsarContinuousTopicReader( + pulsarTopic.topic, + pulsarTopic.adminUrl, + pulsarTopic.schemaInfo, + pulsarOffset.messageId, + clientConf, + readerConf, + pollTimeoutMs, + failOnDataLoss, + subscriptionNamePrefix, + jsonOptions) + } + } + + override def mergeOffsets(partitionOffsets: Array[PartitionOffset]): Offset = { + val mergedMap = partitionOffsets + .map { case PulsarPartitionOffset(t, o) => + Map(t -> o) + } + .reduce(_ ++ _) + SpecificPulsarOffset(mergedMap) + } + + override def initialOffset(): Offset = offset + + override def commit(offset: Offset): Unit = { + val off = SpecificPulsarOffset.getTopicOffsets(offset.asInstanceOf[eOffset]) + pulsarHelper.commitCursorToOffset(off) + } +} + +private[pulsar] class PulsarContinuousTopic( + var topic: String, + var adminUrl: String, + var schemaInfo: SchemaInfoSerializable, + var startingOffsets: MessageId, + var clientConf: ju.Map[String, Object], + var readerConf: ju.Map[String, Object], + var pollTimeoutMs: Int, + var failOnDataLoss: Boolean, + var subscriptionNamePrefix: String, + var jsonOptions: JSONOptionsInRead) + extends InputPartition + with Externalizable { + + def this() = + this(null, null, null, null, null, null, 0, false, null, null) // For deserialization only + + override def writeExternal(out: 
ObjectOutput): Unit = { + out.writeUTF(topic) + out.writeUTF(adminUrl) + out.writeObject(schemaInfo) + out.writeObject(clientConf) + out.writeObject(readerConf) + out.writeInt(pollTimeoutMs) + out.writeBoolean(failOnDataLoss) + out.writeUTF(subscriptionNamePrefix) + + val bytes = startingOffsets.toByteArray + out.writeInt(bytes.length) + out.write(bytes) + if (startingOffsets.isInstanceOf[UserProvidedMessageId]) { + out.writeBoolean(true) + } else { + out.writeBoolean(false) + } + out.writeObject(jsonOptions) + } + + override def readExternal(in: ObjectInput): Unit = { + topic = in.readUTF() + adminUrl = in.readUTF() + schemaInfo = in.readObject().asInstanceOf[SchemaInfoSerializable] + clientConf = in.readObject().asInstanceOf[ju.Map[String, Object]] + readerConf = in.readObject().asInstanceOf[ju.Map[String, Object]] + failOnDataLoss = in.readBoolean() + pollTimeoutMs = in.readInt() + subscriptionNamePrefix = in.readUTF() + + val length = in.readInt() + val bytes = new Array[Byte](length) + in.readFully(bytes) + + val userProvided = in.readBoolean() + startingOffsets = if (userProvided) { + UserProvidedMessageId(MessageId.fromByteArray(bytes)) + } else { + MessageId.fromByteArray(bytes) + } + jsonOptions = in.readObject().asInstanceOf[JSONOptionsInRead] + } +} + +/** + * A per task data reader for continuous pulsar processing. + * + * @param topic + * @param startingOffsets + * @param clientConf + * @param readerConf + */ +class PulsarContinuousTopicReader( + topic: String, + adminUrl: String, + schemaInfo: SchemaInfoSerializable, + startingOffsets: MessageId, + clientConf: ju.Map[String, Object], + readerConf: ju.Map[String, Object], + pollTimeoutMs: Int, + failOnDataLoss: Boolean, + subscriptionNamePrefix: String, + jsonOptions: JSONOptionsInRead) + extends ContinuousPartitionReader[InternalRow] { + + val reportDataLoss = reportDataLossFunc(failOnDataLoss) + + private val deserializer = new PulsarDeserializer(schemaInfo.si, jsonOptions) + private val schema: Schema[_] = SchemaUtils.getPSchema(schemaInfo.si) + private val reader = CachedPulsarClient + .getOrCreate(clientConf) + .newReader(schema) + .subscriptionRolePrefix(subscriptionNamePrefix) + .topic(topic) + .startMessageId(startingOffsets) + .startMessageIdInclusive() + .loadConf(readerConf) + .create() + + var currentMessage: Message[_] = _ + var currentId: MessageId = _ + + if (!startingOffsets.isInstanceOf[UserProvidedMessageId] + && startingOffsets != MessageId.earliest) { + currentMessage = reader.readNext() + currentId = currentMessage.getMessageId + if (startingOffsets != MessageId.earliest && !messageIdRoughEquals( + currentId, + startingOffsets)) { + reportDataLoss( + s"Potential Data Loss: intended to start at $startingOffsets, " + + s"actually we get $currentId") + } + + (startingOffsets, currentId) match { + case (_: BatchMessageIdImpl, _: BatchMessageIdImpl) => + // we seek using a batch message id, we can read next directly in `getNext()` + case (_: MessageIdImpl, cbmid: BatchMessageIdImpl) => + // we seek using a message id, this is supposed to be read by previous task since it's + // inclusive for the last batch (start, end], so we skip this batch + val newStart = + new MessageIdImpl(cbmid.getLedgerId, cbmid.getEntryId + 1, cbmid.getPartitionIndex) + reader.seek(newStart) + case (smid: MessageIdImpl, cmid: MessageIdImpl) => + // current entry is a non-batch entry, we can read next directly in `getNext()` + } + } else if (startingOffsets == MessageId.earliest) { + currentId = MessageId.earliest + } else if 
(startingOffsets.isInstanceOf[UserProvidedMessageId]) { + val id = startingOffsets.asInstanceOf[UserProvidedMessageId].mid + if (id == MessageId.latest) { + Utils.tryWithResource(AdminUtils.buildAdmin(adminUrl, clientConf)) { admin => + currentId = PulsarSourceUtils.seekableLatestMid(admin.topics().getLastMessageId(topic)) + } + } else { + currentId = id + } + } + + // use general `MessageIdImpl` while talking with Spark, + // and internally deal with batchMessageIdImpl and MessageIdImpl + override def getOffset: PartitionOffset = { + PulsarPartitionOffset(topic, PulsarSourceUtils.mid2Impl(currentId)) + } + + override def next(): Boolean = { + currentMessage = reader.readNext() + currentId = currentMessage.getMessageId + true + } + + override def get(): InternalRow = { + deserializer.deserialize(currentMessage) + } + + override def close(): Unit = { + if (reader != null) { + reader.close() + } + } +} diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala index 1d28956f..116377cf 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala @@ -15,35 +15,32 @@ package org.apache.spark.sql.pulsar import java.{util => ju} import java.io.Closeable -import java.util.Optional import java.util.concurrent.TimeUnit import java.util.regex.Pattern import scala.annotation.tailrec import scala.collection.mutable import scala.language.postfixOps +import scala.util.control.NonFatal -import org.apache.pulsar.client.admin.{PulsarAdmin, PulsarAdminException} -import org.apache.pulsar.client.api.{Message, MessageId, PulsarClient} +import org.apache.pulsar.client.api.{Message, MessageId} +import org.apache.pulsar.client.impl.PulsarClientImpl import org.apache.pulsar.client.impl.schema.BytesSchema +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace import org.apache.pulsar.common.naming.TopicName import org.apache.pulsar.common.schema.SchemaInfo import org.apache.pulsar.shade.com.google.common.util.concurrent.Uninterruptibles - import org.apache.spark.internal.Logging import org.apache.spark.sql.pulsar.PulsarOptions._ import org.apache.spark.sql.types.StructType /** - * A Helper class that is responsible for interacting with Pulsar to conduct - * subscription management, cursor management, schema and topic metadata lookup etc. - * + * A Helper class that is responsible for interacting with Pulsar to conduct subscription + * management, cursor management, schema and topic metadata lookup etc. 
*/ private[pulsar] case class PulsarHelper( serviceUrl: String, - adminUrl: String, clientConf: ju.Map[String, Object], - adminClientConf: ju.Map[String, Object], driverGroupIdPrefix: String, caseInsensitiveParameters: Map[String, String], allowDifferentTopicSchemas: Boolean, @@ -53,14 +50,13 @@ private[pulsar] case class PulsarHelper( import scala.collection.JavaConverters._ - protected val admin: PulsarAdmin = AdminUtils.buildAdmin(adminUrl, adminClientConf) - protected var client: PulsarClient = CachedPulsarClient.getOrCreate(clientConf) + protected var client: PulsarClientImpl = CachedPulsarClient.getOrCreate(clientConf) private var topics: Seq[String] = _ private var topicPartitions: Seq[String] = _ override def close(): Unit = { - admin.close() + // do nothing } def setupCursor(startingPos: PerTopicOffset): Unit = { @@ -72,29 +68,19 @@ private[pulsar] case class PulsarHelper( } } - private def setupCursorByMid(offset: SpecificPulsarOffset, subscription: Option[String]): Unit = { + private def setupCursorByMid( + offset: SpecificPulsarOffset, + subscription: Option[String]): Unit = { offset.topicOffsets.foreach { case (tp, mid) => val umid = mid.asInstanceOf[UserProvidedMessageId] - val (subscriptionName, subscriptionPredefined) = extractSubscription(subscription, tp) + val (subscriptionName, _) = extractSubscription(subscription, tp) - // setup the subscription - if (!subscriptionPredefined) { - try { - admin.topics().createSubscription(tp, subscriptionName, umid.mid) - } catch { - case _: PulsarAdminException.ConflictException => - // if subscription already exists, log the info and continue to reset cursor - log.info("Subscription already exists...") - case e: Throwable => - throw new RuntimeException( - s"Failed to setup cursor for ${TopicName.get(tp).toString}", - e) - } - } + // establish connection and setup the subscription if needed + val consumer = CachedConsumer.getOrCreate(tp, subscriptionName, client) // reset cursor position log.info(s"Resetting cursor for $subscriptionName to given offset") - admin.topics().resetCursor(tp, subscriptionName, umid.mid) + consumer.seek(umid.mid) } } @@ -107,30 +93,18 @@ private[pulsar] case class PulsarHelper( case _ => throw new RuntimeException(s"Invalid starting time for $tp: $time") } - val (subscriptionNames, subscriptionPredefined) = extractSubscription(subscription, tp) + val (subscriptionNames, _) = extractSubscription(subscription, tp) - // setup the subscription - if (!subscriptionPredefined) { - try { - admin.topics().createSubscription(tp, s"$subscriptionNames", msgID) - } catch { - case _: PulsarAdminException.ConflictException => - // if subscription already exists, log the info and continue to reset cursor - log.info("subscription already exists...") - case e: Throwable => - throw new RuntimeException( - s"Failed to setup cursor for ${TopicName.get(tp).toString}", - e) - } - } + // establish connection and setup the subscription if needed + val consumer = CachedConsumer.getOrCreate(tp, subscriptionNames, client) // reset cursor position log.info(s"Resetting cursor for $subscriptionNames to given timestamp") time match { case PulsarProvider.EARLIEST_TIME | PulsarProvider.LATEST_TIME => - admin.topics().resetCursor(tp, s"$subscriptionNames", msgID) + consumer.seek(msgID) case _ => - admin.topics().resetCursor(tp, s"$subscriptionNames", time) + consumer.seek(time) } } } @@ -148,10 +122,8 @@ private[pulsar] case class PulsarHelper( offset.foreach { case (tp, mid) => try { val (subscription, _) = 
extractSubscription(predefinedSubscription, tp) - admin.topics().resetCursor(tp, s"$subscription", mid) + CachedConsumer.getOrCreate(tp, subscription, client).seek(mid) } catch { - case e: PulsarAdminException if e.getStatusCode == 404 || e.getStatusCode == 412 => - logInfo(s"Cannot commit cursor since the topic $tp has been deleted during execution.") case e: Throwable => throw new RuntimeException( s"Failed to commit cursor for ${TopicName.get(tp).toString}", @@ -169,11 +141,8 @@ private[pulsar] case class PulsarHelper( // Only delete a subscription if it's not predefined and created by us if (!subscriptionPredefined) { try { - admin.topics().deleteSubscription(tp, s"$subscriptionName") + CachedConsumer.getOrCreate(tp, subscriptionName, client).unsubscribe() } catch { - case e: PulsarAdminException if e.getStatusCode == 404 => - logInfo( - s"Cannot remove cursor since the topic $tp has been deleted during execution.") case e: Throwable => throw new RuntimeException( s"Failed to remove cursor for ${TopicName.get(tp).toString}", @@ -184,7 +153,7 @@ private[pulsar] case class PulsarHelper( } def getAndCheckCompatible(schema: Option[StructType]): StructType = { - val si = getPulsarSchema() + val si = getPulsarSchema val inferredSchema = SchemaUtils.pulsarSourceSchema(si) require( schema.isEmpty || inferredSchema == schema.get, @@ -194,37 +163,36 @@ private[pulsar] case class PulsarHelper( def getPulsarSchema(): SchemaInfo = { getTopics() - if (allowDifferentTopicSchemas) { - SchemaUtils.emptySchemaInfo() - } else { - if (topics.nonEmpty) { - val schemas = topics.map { tp => - getPulsarSchema(tp) - } - val sset = schemas.toSet - if (sset.size != 1) { - throw new IllegalArgumentException( - "Topics to read must share identical schema. Consider setting " + - s"'$AllowDifferentTopicSchemas' to 'false' to read topics with empty " + - s"schemas instead. We got ${sset.size} distinct " + - s"schemas:[${sset.mkString(", ")}]") + allowDifferentTopicSchemas match { + case false => + if (topics.size > 0) { + val schemas = topics.map { tp => + getPulsarSchema(tp) + } + val sset = schemas.toSet + if (sset.size != 1) { + throw new IllegalArgumentException( + "Topics to read must share identical schema. Consider setting " + + s"'$AllowDifferentTopicSchemas' to 'false' to read topics with empty " + + s"schemas instead. 
We got ${sset.size} distinct " + + s"schemas:[${sset.mkString(", ")}]") + } else { + sset.head + } } else { - sset.head + // if no topic exists, and we are getting schema, + // then auto created topic has schema of None + SchemaUtils.emptySchemaInfo() } - } else { - // if no topic exists, and we are getting schema, - // then auto created topic has schema of None - SchemaUtils.emptySchemaInfo() - } + case true => SchemaUtils.emptySchemaInfo() } } private def getPulsarSchema(topic: String): SchemaInfo = { try { - admin.schemas().getSchemaInfo(TopicName.get(topic).toString) + client.getSchema(topic).get().get() } catch { - case e: PulsarAdminException if e.getStatusCode == 404 => - return BytesSchema.of().getSchemaInfo + case e: NoSuchElementException => BytesSchema.of().getSchemaInfo case e: Throwable => throw new RuntimeException( s"Failed to get schema information for ${TopicName.get(topic).toString}", @@ -233,32 +201,17 @@ private[pulsar] case class PulsarHelper( } def fetchLatestOffsets(): SpecificPulsarOffset = { - getTopicPartitions() + getTopicPartitions SpecificPulsarOffset(topicPartitions.map { tp => - (tp -> { - val messageId = - try { - admin.topics().getLastMessageId(tp) - } catch { - case e: PulsarAdminException if e.getStatusCode == 404 => - MessageId.earliest - case e: Throwable => - throw new RuntimeException( - s"Failed to get last messageId for ${TopicName.get(tp).toString}", - e) - } - PulsarSourceUtils.seekableLatestMid(messageId) - }) + (tp -> fetchLatestOffsetForTopic(tp)) }.toMap) } def fetchLatestOffsetForTopic(topic: String): MessageId = { val messageId = try { - admin.topics().getLastMessageId(topic) + getLastMessageId(topic) } catch { - case e: PulsarAdminException if e.getStatusCode == 404 => - MessageId.earliest case e: Throwable => throw new RuntimeException( s"Failed to get last messageId for ${TopicName.get(topic).toString}", @@ -292,33 +245,33 @@ private[pulsar] case class PulsarHelper( waitForTopicIfNeeded() } - private def getTopicPartitions(): Seq[String] = { + private def getTopicPartitions: Seq[String] = { getTopics() topicPartitions = topics.flatMap { tp => - val partNum = admin.topics().getPartitionedTopicMetadata(tp).partitions - if (partNum == 0) { - tp :: Nil - } else { - (0 until partNum).map(tp + PulsarOptions.PartitionSuffix + _) - } + client.getPartitionsForTopic(tp).get().asScala.map(_.toString) } topicPartitions } private def getTopics(topicsPattern: String): Seq[String] = { val dest = TopicName.get(topicsPattern) - val allNonPartitionedTopics: ju.List[String] = - admin - .topics() - .getList(dest.getNamespace) - .asScala - .filter(t => !TopicName.get(t).isPartitioned) - .asJava + val allTopics: ju.List[String] = client.getLookup + .getTopicsUnderNamespace(dest.getNamespaceObject, CommandGetTopicsOfNamespace.Mode.ALL) + .get() + + val allNonPartitionedTopics: ju.List[String] = allTopics.asScala + .filter(t => !TopicName.get(t).isPartitioned) + .asJava val nonPartitionedMatch = topicsPatternFilter(allNonPartitionedTopics, dest.toString) - val allPartitionedTopics: ju.List[String] = - admin.topics().getPartitionedTopicList(dest.getNamespace) + val allPartitionedTopics: ju.List[String] = allTopics.asScala + .filter(t => TopicName.get(t).isPartitioned) + .map(TopicName.get(_).getPartitionedTopicName) // trim partition suffix + .toSet // deduplicate topics + .toSeq + .asJava val partitionedMatch = topicsPatternFilter(allPartitionedTopics, dest.toString) + nonPartitionedMatch ++ partitionedMatch } @@ -339,10 +292,10 @@ private[pulsar] case class 
PulsarHelper( while (waitList.nonEmpty) { val topic = waitList.head try { - admin.topics().getPartitionedTopicMetadata(topic) + client.getPartitionedTopicMetadata(topic).get() waitList -= topic } catch { - case _: PulsarAdminException => + case NonFatal(_) => logInfo(s"The desired $topic doesn't existed, wait for 5 seconds.") Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS) } @@ -354,7 +307,7 @@ private[pulsar] case class PulsarHelper( params: Map[String, String], defaultOffsets: PulsarOffset, optionKey: String): PerTopicOffset = { - getTopicPartitions() + getTopicPartitions val offset = PulsarProvider.getPulsarOffset(params, defaultOffsets, optionKey) offset match { @@ -409,7 +362,7 @@ private[pulsar] case class PulsarHelper( offsetOptionKey: String, defaultOffsets: PulsarOffset): SpecificPulsarOffset = { - getTopicPartitions() + getTopicPartitions val offset = PulsarProvider.getPulsarOffset(params, offsetOptionKey, defaultOffsets) offset match { case LatestOffset => @@ -457,8 +410,7 @@ private[pulsar] case class PulsarHelper( if (time == PulsarProvider.EARLIEST_TIME) { UserProvidedMessageId(MessageId.earliest) } else if (time == PulsarProvider.LATEST_TIME) { - UserProvidedMessageId( - PulsarSourceUtils.seekableLatestMid(admin.topics().getLastMessageId(tp))) + UserProvidedMessageId(PulsarSourceUtils.seekableLatestMid(getLastMessageId(tp))) } else { assert(time > 0, s"time less than 0: $time") val reader = client @@ -508,11 +460,6 @@ private[pulsar] case class PulsarHelper( } } - def getMetrics(): PulsarMetrics = { - getTopics() - new PulsarMetrics(admin, topics, driverGroupIdPrefix) - } - @tailrec private def fetchOffsetForTopic( poolTimeoutMs: Int, @@ -529,8 +476,7 @@ private[pulsar] case class PulsarHelper( case MessageId.earliest => UserProvidedMessageId(off) case MessageId.latest => - UserProvidedMessageId( - PulsarSourceUtils.seekableLatestMid(admin.topics().getLastMessageId(tp))) + UserProvidedMessageId(PulsarSourceUtils.seekableLatestMid(getLastMessageId(tp))) case _ => val reader = client .newReader() @@ -550,4 +496,9 @@ private[pulsar] case class PulsarHelper( } } } + + private def getLastMessageId(topic: String): MessageId = { + val (subscriptionName, _) = extractSubscription(predefinedSubscription, topic) + CachedConsumer.getOrCreate(topic, subscriptionName, client).getLastMessageId + } } diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetrics.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetrics.scala deleted file mode 100644 index 85ff2451..00000000 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetrics.scala +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.sql.pulsar - -import java.util - -import scala.collection.JavaConverters._ - -import com.codahale.metrics.{Gauge, Metric, MetricRegistry, MetricSet} -import com.google.common.collect.ImmutableMap -import org.apache.pulsar.client.admin.PulsarAdmin -import org.apache.pulsar.common.policies.data.SubscriptionStats - -import org.apache.spark.metrics.source.Source - -/** - * The metrics for exposing Pulsar's metrics to the Spark metric system. - */ -private class PulsarMetrics(admin: PulsarAdmin, topics: Seq[String], subscriptionPrefix: String) - extends Source { - - override val sourceName = "pulsar" - override val metricRegistry = new MetricRegistry - - topics.foreach(topic => registerSubscription(topic)) - - private def registerSubscription(topic: String): Unit = { - metricRegistry.registerAll( - subscriptionPrefix, - new MetricSet { - override def getMetrics: util.Map[String, Metric] = { - subscriptionStats(topic, subscriptionPrefix) match { - case None => new util.HashMap() - case Some(stats) => - ImmutableMap - .builder[String, Metric]() - .put( - "msgRateOut", - new Gauge[Double] { - override def getValue: Double = stats.getMsgRateOut - }) - .put( - "msgThroughputOut", - new Gauge[Double] { - override def getValue: Double = stats.getMsgThroughputOut - }) - .put( - "bytesOutCounter", - new Gauge[Long] { - override def getValue: Long = stats.getBytesOutCounter - }) - .put( - "msgOutCounter", - new Gauge[Long] { - override def getValue: Long = stats.getMsgOutCounter - }) - .put( - "msgRateRedeliver", - new Gauge[Double] { - override def getValue: Double = stats.getMsgRateRedeliver - }) - .put( - "messageAckRate", - new Gauge[Double] { - override def getValue: Double = stats.getMessageAckRate - }) - .put( - "chunkedMessageRate", - new Gauge[Int] { - override def getValue: Int = stats.getChunkedMessageRate - }) - .put( - "msgBacklog", - new Gauge[Long] { - override def getValue: Long = stats.getMsgBacklog - }) - .put( - "backlogSize", - new Gauge[Long] { - override def getValue: Long = stats.getBacklogSize - }) - .put( - "earliestMsgPublishTimeInBacklog", - new Gauge[Long] { - override def getValue: Long = stats.getEarliestMsgPublishTimeInBacklog - }) - .put( - "msgBacklogNoDelayed", - new Gauge[Long] { - override def getValue: Long = stats.getMsgBacklogNoDelayed - }) - .put( - "msgDelayed", - new Gauge[Long] { - override def getValue: Long = stats.getMsgDelayed - }) - .put( - "unackedMessages", - new Gauge[Long] { - override def getValue: Long = stats.getUnackedMessages - }) - .put( - "msgRateExpired", - new Gauge[Double] { - override def getValue: Double = stats.getMsgRateExpired - }) - .put( - "totalMsgExpired", - new Gauge[Long] { - override def getValue: Long = stats.getTotalMsgExpired - }) - .put( - "lastExpireTimestamp", - new Gauge[Long] { - override def getValue: Long = stats.getLastExpireTimestamp - }) - .put( - "lastConsumedFlowTimestamp", - new Gauge[Long] { - override def getValue: Long = stats.getLastConsumedFlowTimestamp - }) - .put( - "lastConsumedTimestamp", - new Gauge[Long] { - override def getValue: Long = stats.getLastConsumedTimestamp - }) - .put( - "lastAckedTimestamp", - new Gauge[Long] { - override def getValue: Long = stats.getLastAckedTimestamp - }) - .put( - "lastMarkDeleteAdvancedTimestamp", - new Gauge[Long] { - override def getValue: Long = stats.getLastMarkDeleteAdvancedTimestamp - }) - .build() - } - } - }) - } - - private def subscriptionStats( - topic: String, - subscriptionPrefix: String): Option[SubscriptionStats] = { - val 
subscriptions = admin.topics().getStats(topic).getSubscriptions.asScala - subscriptions.keys.find(sub => sub.startsWith(subscriptionPrefix)) match { - case Some(sub) => subscriptions.get(sub) - case _ => None - } - } -} diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchReader.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchReader.scala new file mode 100644 index 00000000..725619c5 --- /dev/null +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchReader.scala @@ -0,0 +1,292 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.pulsar + +import java.{util => ju} +import java.util.concurrent.TimeUnit + +import org.apache.pulsar.client.api.{Message, MessageId, Schema} +import org.apache.pulsar.client.impl.{BatchMessageIdImpl, MessageIdImpl} +import org.apache.pulsar.common.schema.SchemaInfo + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.json.JSONOptionsInRead +import org.apache.spark.sql.connector.read.{ + InputPartition, + PartitionReader, + PartitionReaderFactory +} +import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset} +import org.apache.spark.sql.execution.streaming.{Offset => eOffset} + +private[pulsar] class PulsarMicroBatchReader( + pulsarHelper: PulsarHelper, + clientConf: ju.Map[String, Object], + readerConf: ju.Map[String, Object], + metadataPath: String, + startingOffsets: PerTopicOffset, + pollTimeoutMs: Int, + failOnDataLoss: Boolean, + subscriptionNamePrefix: String, + jsonOptions: JSONOptionsInRead) + extends MicroBatchStream + with Logging { + + import PulsarSourceUtils._ + + private var startTopicOffsets: Map[String, MessageId] = _ + private var endTopicOffsets: Map[String, MessageId] = _ + private var stopped = false + + val reportDataLoss = reportDataLossFunc(failOnDataLoss) + + private lazy val initialTopicOffsets: SpecificPulsarOffset = { + val metadataLog = + new PulsarSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath) + metadataLog.getInitialOffset(pulsarHelper, startingOffsets, pollTimeoutMs, reportDataLoss) + } + + override def deserializeOffset(json: String): Offset = { + SpecificPulsarOffset(JsonUtils.topicOffsets(json)) + } + + lazy val pulsarSchema: SchemaInfo = pulsarHelper.getPulsarSchema() + + override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = { + val newPartitions = endTopicOffsets.keySet.diff(startTopicOffsets.keySet) + val newPartitionInitialOffsets = pulsarHelper.fetchEarliestOffsets(newPartitions.toSeq) + logInfo(s"Topics added: $newPartitions") + + val deletedPartitions = startTopicOffsets.keySet.diff(endTopicOffsets.keySet) + if (deletedPartitions.nonEmpty) { + reportDataLoss(s"$deletedPartitions are gone. 
Some data may have been missed") + } + + val newStartsOffsets = startTopicOffsets ++ newPartitionInitialOffsets + + val offsetRanges = endTopicOffsets.keySet + .map { tp => + val fromOffset = newStartsOffsets.getOrElse( + tp, { + // this shouldn't happen + throw new IllegalStateException(s"$tp doesn't have a start offset") + }) + val untilOffset = endTopicOffsets(tp) + val sortedExecutors = getSortedExecutorList() + val numExecutors = sortedExecutors.length + val preferredLoc = if (numExecutors > 0) { + // This allows cached PulsarClient in the executors to be re-used to read the same + // partition in every batch. + Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors))) + } else None + PulsarOffsetRange(tp, fromOffset, untilOffset, preferredLoc) + } + .filter { range => + if (range.untilOffset.compareTo(range.fromOffset) < 0 && + range.fromOffset.asInstanceOf[MessageIdImpl] != MessageId.latest) { + reportDataLoss( + s"${range.topic}'s offset was changed " + + s"from ${range.fromOffset} to ${range.untilOffset}, " + + "some data might has been missed") + false + } else if (range.untilOffset.compareTo(range.fromOffset) < 0 && + range.fromOffset.asInstanceOf[MessageIdImpl] == MessageId.latest) { + false + } else { + true + } + } + .toSeq + + offsetRanges.map { range => + new PulsarMicroBatchInputPartition( + range, + new SchemaInfoSerializable(pulsarSchema), + clientConf, + readerConf, + pollTimeoutMs, + failOnDataLoss, + subscriptionNamePrefix, + jsonOptions).asInstanceOf[InputPartition] + }.toArray + } + + override def stop(): Unit = synchronized { + if (!stopped) { + pulsarHelper.removeCursor() + pulsarHelper.close() + stopped = true + } + } + + override def createReaderFactory(): PartitionReaderFactory = new PartitionReaderFactory { + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { + val range = partition.asInstanceOf[PulsarMicroBatchInputPartition].range + val start = range.fromOffset + val end = range.untilOffset + + if (start == end || !messageExists(end)) { + return PulsarMicroBatchEmptyInputPartitionReader + } + new PulsarMicroBatchInputPartitionReader( + range, + new SchemaInfoSerializable(pulsarSchema), + clientConf, + readerConf, + pollTimeoutMs, + failOnDataLoss, + subscriptionNamePrefix, + jsonOptions) + } + } + + override def initialOffset(): Offset = SpecificPulsarOffset(startTopicOffsets) + + override def latestOffset(): Offset = SpecificPulsarOffset(endTopicOffsets) + + override def commit(end: Offset): Unit = { + val endTopicOffsets = SpecificPulsarOffset.getTopicOffsets(end.asInstanceOf[eOffset]) + pulsarHelper.commitCursorToOffset(endTopicOffsets) + } +} + +case class PulsarMicroBatchInputPartition( + range: PulsarOffsetRange, + pulsarSchema: SchemaInfoSerializable, + clientConf: ju.Map[String, Object], + readerConf: ju.Map[String, Object], + pollTimeoutMs: Int, + failOnDataLoss: Boolean, + subscriptionNamePrefix: String, + jsonOptions: JSONOptionsInRead) + extends InputPartition { + override def preferredLocations(): Array[String] = range.preferredLoc.toArray +} + +object PulsarMicroBatchEmptyInputPartitionReader + extends PartitionReader[InternalRow] + with Logging { + + override def next(): Boolean = false + override def get(): InternalRow = null + override def close(): Unit = {} +} + +case class PulsarMicroBatchInputPartitionReader( + range: PulsarOffsetRange, + pulsarSchema: SchemaInfoSerializable, + clientConf: ju.Map[String, Object], + readerConf: ju.Map[String, Object], + pollTimeoutMs: Int, + failOnDataLoss: 
Boolean, + subscriptionNamePrefix: String, + jsonOptions: JSONOptionsInRead) + extends PartitionReader[InternalRow] + with Logging { + + import PulsarSourceUtils._ + + val tp = range.topic + val start = range.fromOffset + val end = range.untilOffset + + val reportDataLoss = reportDataLossFunc(failOnDataLoss) + + private val deserializer = new PulsarDeserializer(pulsarSchema.si, jsonOptions) + private val schema: Schema[_] = SchemaUtils.getPSchema(pulsarSchema.si) + val reader = CachedPulsarClient + .getOrCreate(clientConf) + .newReader(schema) + .subscriptionRolePrefix(subscriptionNamePrefix) + .startMessageId(start) + .startMessageIdInclusive() + .topic(tp) + .loadConf(readerConf) + .create() + + private var inEnd: Boolean = false + private var isLast: Boolean = false + private val enterEndFunc: (MessageId => Boolean) = enteredEnd(end) + + private var nextRow: InternalRow = _ + private var nextMessage: Message[_] = _ + private var nextId: MessageId = _ + + if (!start.isInstanceOf[UserProvidedMessageId] && start != MessageId.earliest) { + nextMessage = reader.readNext(pollTimeoutMs, TimeUnit.MILLISECONDS) + if (nextMessage == null) { + isLast = true + reportDataLoss(s"Cannot read data at offset $start from topic: $tp") + } else { + nextId = nextMessage.getMessageId + if (start != MessageId.earliest && !messageIdRoughEquals(nextId, start)) { + reportDataLoss( + s"Potential Data Loss in reading $tp: intended to start at $start, " + + s"actually we get $nextId") + } + + (start, nextId) match { + case (_: BatchMessageIdImpl, _: BatchMessageIdImpl) => + // we seek using a batch message id, we can read next directly in `getNext()` + case (_: MessageIdImpl, cbmid: BatchMessageIdImpl) => + // we seek using a message id, this is supposed to be read by previous task since it's + // inclusive for the last batch (start, end], so we skip this batch + val newStart = + new MessageIdImpl(cbmid.getLedgerId, cbmid.getEntryId + 1, cbmid.getPartitionIndex) + reader.seek(newStart) + case (smid: MessageIdImpl, cmid: MessageIdImpl) => + // current entry is a non-batch entry, we can read next directly in `getNext()` + } + } + } else { + nextId = start + } + + override def next(): Boolean = { + if (isLast) { + return false + } + + nextMessage = reader.readNext(pollTimeoutMs, TimeUnit.MILLISECONDS) + + if (nextMessage == null) { + // Losing some data. Skip the rest offsets in this partition. 
+ reportDataLoss( + s"we didn't get enough messages as promised from topic $tp, data loss occurs") + return false + } + + nextId = nextMessage.getMessageId + + nextRow = deserializer.deserialize(nextMessage) + + inEnd = enterEndFunc(nextId) + if (inEnd) { + isLast = isLastMessage(nextId) + } + + true + } + + override def get(): InternalRow = { + assert(nextRow != null) + nextRow + } + + override def close(): Unit = { + reader.close() + } +} diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarOffset.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarOffset.scala index aa9534d7..0f664e6a 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarOffset.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarOffset.scala @@ -33,7 +33,7 @@ private[pulsar] case class SpecificPulsarOffset(topicOffsets: Map[String, Messag extends Offset with PerTopicOffset { - override def json(): String = JsonUtils.topicOffsets(topicOffsets) + override val json = JsonUtils.topicOffsets(topicOffsets) } private[pulsar] case class SpecificPulsarTime(topicTimes: Map[String, Long]) diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala index 5f8d2229..3d89fb83 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala @@ -53,6 +53,9 @@ private[pulsar] object PulsarOptions { val AuthPluginClassName: String = "authPluginClassName" val AuthParams: String = "authParams" + val TlsTrustCertsFilePath: String = "tlsTrustCertsFilePath" + val TlsAllowInsecureConnection: String = "tlsAllowInsecureConnection" + val TlsHostnameVerificationEnable: String = "tlsHostnameVerificationEnable" val AllowDifferentTopicSchemas: String = "allowDifferentTopicSchemas".toLowerCase(Locale.ROOT) val WaitingForNonExistedTopic: String = "waitingForNonExistedTopic".toLowerCase(Locale.ROOT) diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala index 5ef09b02..ca783d29 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala @@ -25,7 +25,6 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSessio import org.apache.spark.sql.catalyst.json.JSONOptionsInRead import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.streaming.{Sink, Source} -import org.apache.spark.sql.pulsar.PulsarConfigurationUtils._ import org.apache.spark.sql.pulsar.PulsarSourceUtils.reportDataLossFunc import org.apache.spark.sql.sources._ import org.apache.spark.sql.streaming.OutputMode @@ -57,22 +56,22 @@ private[pulsar] class PulsarProvider parameters: Map[String, String]): (String, StructType) = { val caseInsensitiveParams = validateStreamOptions(parameters) - val (clientConfig, _, adminClientConfig, serviceUrlConfig, adminUrlConfig) = - prepareConfForReader(parameters) + val (clientConfig, _, serviceUrlConfig) = prepareConfForReader(parameters) val subscriptionNamePrefix = s"spark-pulsar-${UUID.randomUUID}" val inferredSchema = Utils.tryWithResource( PulsarHelper( serviceUrlConfig, - adminUrlConfig, clientConfig, - adminClientConfig, subscriptionNamePrefix, caseInsensitiveParams, getAllowDifferentTopicSchemas(parameters), getPredefinedSubscription(parameters))) { pulsarHelper => pulsarHelper.getAndCheckCompatible(schema) } + + logInfo(s"Schema 
of Pulsar source: $inferredSchema") + (shortName(), inferredSchema) } @@ -82,22 +81,24 @@ private[pulsar] class PulsarProvider schema: Option[StructType], providerName: String, parameters: Map[String, String]): Source = { + logDebug(s"Creating Pulsar source: $parameters") + val caseInsensitiveParams = validateStreamOptions(parameters) - val (clientConfig, readerConfig, adminClientConfig, serviceUrl, adminUrl) = - prepareConfForReader(parameters) + val (clientConfig, readerConfig, serviceUrl) = prepareConfForReader(parameters) + logDebug( + s"Client config: $clientConfig; Reader config: $readerConfig; Service URL: $serviceUrl") val subscriptionNamePrefix = getSubscriptionPrefix(parameters) val pulsarHelper = PulsarHelper( serviceUrl, - adminUrl, clientConfig, - adminClientConfig, subscriptionNamePrefix, caseInsensitiveParams, getAllowDifferentTopicSchemas(parameters), getPredefinedSubscription(parameters)) - pulsarHelper.getAndCheckCompatible(schema) + val pSchema = pulsarHelper.getAndCheckCompatible(schema) + logDebug(s"Schema from Spark: $schema; Schema from Pulsar: ${pSchema}") // start from latest offset if not specified to be consistent with Pulsar source val offset = @@ -124,14 +125,11 @@ private[pulsar] class PulsarProvider val subscriptionNamePrefix = getSubscriptionPrefix(parameters, isBatch = true) - val (clientConfig, readerConfig, adminClientConfig, serviceUrl, adminUrl) = - prepareConfForReader(parameters) + val (clientConfig, readerConfig, serviceUrl) = prepareConfForReader(parameters) val (start, end, schema, pSchema) = Utils.tryWithResource( PulsarHelper( serviceUrl, - adminUrl, clientConfig, - adminClientConfig, subscriptionNamePrefix, caseInsensitiveParams, getAllowDifferentTopicSchemas(parameters), @@ -160,7 +158,6 @@ private[pulsar] class PulsarProvider sqlContext, schema, new SchemaInfoSerializable(pSchema), - adminUrl, clientConfig, readerConfig, start, @@ -204,14 +201,10 @@ private[pulsar] class PulsarProvider */ new BaseRelation { override def sqlContext: SQLContext = unsupportedException - // FIXME: integration with pulsar schema override def schema: StructType = unsupportedException - override def needConversion: Boolean = unsupportedException - override def sizeInBytes: Long = unsupportedException - override def unhandledFilters(filters: Array[Filter]): Array[Filter] = unsupportedException private def unsupportedException = @@ -236,7 +229,7 @@ private[pulsar] class PulsarProvider } private[pulsar] object PulsarProvider extends Logging { - + import PulsarConfigurationUtils._ import PulsarOptions._ val LATEST_TIME = -2L @@ -245,7 +238,9 @@ private[pulsar] object PulsarProvider extends Logging { private def getClientParams(parameters: Map[String, String]): Map[String, String] = { val lowercaseKeyMap = parameters.keySet .filter(_.startsWith(PulsarClientOptionKeyPrefix)) - .map { k => k.drop(PulsarClientOptionKeyPrefix.length) -> parameters(k) } + .map { k => + k.drop(PulsarClientOptionKeyPrefix.length).toString -> parameters(k) + } .toMap lowercaseKeyMap.map { case (k, v) => clientConfKeys.getOrElse( @@ -262,10 +257,6 @@ private[pulsar] object PulsarProvider extends Logging { getModuleParams(parameters, PulsarReaderOptionKeyPrefix, readerConfKeys) } - private def getAdminParams(parameters: Map[String, String]): Map[String, String] = { - getModuleParams(parameters, PulsarAdminOptionKeyPrefix, clientConfKeys) - } - private def getModuleParams( connectorConfiguration: Map[String, String], modulePrefix: String, @@ -283,10 +274,6 @@ private[pulsar] object PulsarProvider 
extends Logging { } } - private def hasAdminParams(parameters: Map[String, String]): Boolean = { - getAdminParams(parameters).nonEmpty - } - def getPulsarOffset( params: Map[String, String], defaultOffsets: PulsarOffset, @@ -404,10 +391,6 @@ private[pulsar] object PulsarProvider extends Logging { throw new IllegalArgumentException(s"$ServiceUrlOptionKey must be specified") } - if (!caseInsensitiveParams.contains(AdminUrlOptionKey)) { - throw new IllegalArgumentException(s"$AdminUrlOptionKey must be specified") - } - // validate topic options val topicOptions = caseInsensitiveParams.filter { case (k, _) => TopicOptionKeys.contains(k) @@ -511,29 +494,18 @@ private[pulsar] object PulsarProvider extends Logging { caseInsensitiveParams } - private def prepareConfForReader(parameters: Map[String, String]): ( - ju.Map[String, Object], - ju.Map[String, Object], - ju.Map[String, Object], - String, - String) = { + private def prepareConfForReader(parameters: Map[String, String]) + : (ju.Map[String, Object], ju.Map[String, Object], String) = { val serviceUrl = getServiceUrl(parameters) - val adminUrl = getAdminUrl(parameters) - var clientParams = getClientParams(parameters) clientParams += (ServiceUrlOptionKey -> serviceUrl) val readerParams = getReaderParams(parameters) - val adminParams = Option(getAdminParams(parameters)) - .filter(_.nonEmpty) - .getOrElse(clientParams) ( paramsToPulsarConf("pulsar.client", clientParams), paramsToPulsarConf("pulsar.reader", readerParams), - paramsToPulsarConf("pulsar.admin", adminParams), - serviceUrl, - adminUrl) + serviceUrl) } private def prepareConfForProducer(parameters: Map[String, String]) diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarRelation.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarRelation.scala index 79593695..e5bffa77 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarRelation.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarRelation.scala @@ -29,7 +29,6 @@ private[pulsar] class PulsarRelation( override val sqlContext: SQLContext, override val schema: StructType, schemaInfo: SchemaInfoSerializable, - adminUrl: String, clientConf: ju.Map[String, Object], readerConf: ju.Map[String, Object], startingOffset: SpecificPulsarOffset, @@ -85,7 +84,6 @@ private[pulsar] class PulsarRelation( val rdd = new PulsarSourceRDD4Batch( sqlContext.sparkContext, schemaInfo, - adminUrl, clientConf, readerConf, offsetRanges, diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala index 936a0f6a..cbff38d1 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala @@ -16,10 +16,11 @@ package org.apache.spark.sql.pulsar import java.{util => ju} import java.util.concurrent.TimeUnit -import org.apache.pulsar.client.api.{Producer, Schema} +import scala.util.control.NonFatal +import org.apache.pulsar.client.api.{Producer, PulsarClientException, Schema} import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession, SQLContext} +import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext, SparkSession} import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal} import org.apache.spark.sql.execution.QueryExecution @@ -161,15 +162,28 @@ private[pulsar] object PulsarSinks extends Logging { topic: String, schema: Schema[T]): Producer[T] = { - 
CachedPulsarClient - .getOrCreate(clientConf) - .newProducer(schema) - .topic(topic) - .loadConf(producerConf) - .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS) - // maximizing the throughput - .batchingMaxMessages(5 * 1024 * 1024) - .create() + try { + CachedPulsarClient + .getOrCreate(clientConf) + .newProducer(schema) + .topic(topic) + .loadConf(producerConf) + .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS) + // maximizing the throughput + .batchingMaxMessages(5 * 1024 * 1024) + .create() + } catch { + case e: PulsarClientException.IncompatibleSchemaException => + throw new AnalysisException( + s"Cannot write incompatible data to topic $topic. " + + s"Details: ${e.getMessage}", + cause = Some(e)) + case NonFatal(e) => + throw new AnalysisException( + s"Cannot create pulsar producer for topic $topic. " + + s"Details: ${e.getMessage}", + cause = Some(e)) + } } def toStructType(attrs: Seq[Attribute]): StructType = { diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala index 8fa55a3b..851ddca7 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala @@ -19,7 +19,6 @@ import org.apache.pulsar.client.api.MessageId import org.apache.pulsar.client.impl.MessageIdImpl import org.apache.pulsar.common.schema.SchemaInfo -import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.catalyst.InternalRow @@ -28,16 +27,16 @@ import org.apache.spark.sql.execution.streaming.{Offset, Source} import org.apache.spark.sql.types.StructType private[pulsar] class PulsarSource( - sqlContext: SQLContext, - pulsarHelper: PulsarHelper, - clientConf: ju.Map[String, Object], - readerConf: ju.Map[String, Object], - metadataPath: String, - startingOffsets: PerTopicOffset, - pollTimeoutMs: Int, - failOnDataLoss: Boolean, - subscriptionNamePrefix: String, - jsonOptions: JSONOptionsInRead) + sqlContext: SQLContext, + pulsarHelper: PulsarHelper, + clientConf: ju.Map[String, Object], + readerConf: ju.Map[String, Object], + metadataPath: String, + startingOffsets: PerTopicOffset, + pollTimeoutMs: Int, + failOnDataLoss: Boolean, + subscriptionNamePrefix: String, + jsonOptions: JSONOptionsInRead) extends Source with Logging { @@ -45,7 +44,7 @@ private[pulsar] class PulsarSource( private val sc = sqlContext.sparkContext - private val reportDataLoss = reportDataLossFunc(failOnDataLoss) + val reportDataLoss = reportDataLossFunc(failOnDataLoss) private var stopped = false private lazy val initialTopicOffsets: SpecificPulsarOffset = { @@ -55,7 +54,7 @@ private[pulsar] class PulsarSource( private var currentTopicOffsets: Option[Map[String, MessageId]] = None - private lazy val pulsarSchema: SchemaInfo = pulsarHelper.getPulsarSchema() + private lazy val pulsarSchema: SchemaInfo = pulsarHelper.getPulsarSchema override def schema(): StructType = SchemaUtils.pulsarSourceSchema(pulsarSchema) @@ -82,7 +81,7 @@ private[pulsar] class PulsarSource( if (start.isDefined && start.get == end) { return sqlContext.internalCreateDataFrame( sqlContext.sparkContext.emptyRDD[InternalRow].setName("empty"), - schema(), + schema, isStreaming = true) } @@ -153,13 +152,6 @@ private[pulsar] class PulsarSource( "GetBatch generating RDD of offset range: " + offsetRanges.sortBy(_.topic).mkString(", ")) - // Register the monitor metrics here. 
- val env = SparkEnv.get - if (env != null) { - val metrics = pulsarHelper.getMetrics() - env.metricsSystem.registerSource(metrics) - } - sqlContext.internalCreateDataFrame(rdd.setName("pulsar"), schema(), isStreaming = true) } @@ -174,5 +166,6 @@ private[pulsar] class PulsarSource( pulsarHelper.close() stopped = true } + } } diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala index d6704488..5f5f2b17 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala @@ -14,11 +14,9 @@ package org.apache.spark.sql.pulsar import java.{util => ju} -import java.util.UUID import java.util.concurrent.TimeUnit -import org.apache.pulsar.client.admin.PulsarAdmin -import org.apache.pulsar.client.api.{Message, MessageId, Schema, SubscriptionType} +import org.apache.pulsar.client.api.{Message, MessageId, PulsarClientException, Schema} import org.apache.pulsar.client.impl.{BatchMessageIdImpl, MessageIdImpl} import org.apache.spark.{Partition, SparkContext, TaskContext} @@ -79,59 +77,79 @@ private[pulsar] abstract class PulsarSourceRDDBase( var currentMessage: Message[_] = _ var currentId: MessageId = _ - if (!startOffset.isInstanceOf[UserProvidedMessageId] && startOffset != MessageId.earliest) { - currentMessage = reader.readNext(pollTimeoutMs, TimeUnit.MILLISECONDS) - if (currentMessage == null) { - isLast = true - reportDataLoss(s"cannot read data at $startOffset from topic $topic") - } else { - currentId = currentMessage.getMessageId - if (startOffset != MessageId.earliest && !messageIdRoughEquals( - currentId, - startOffset)) { - reportDataLoss( - s"Potential Data Loss: intended to start at $startOffset, " + - s"actually we get $currentId") - } - - (startOffset, currentId) match { - case (_: BatchMessageIdImpl, _: BatchMessageIdImpl) => - // we seek using a batch message id, we can read next directly in `getNext()` - case (_: MessageIdImpl, cbmid: BatchMessageIdImpl) => - // we seek using a message id, this is supposed to be read by previous task since it's - // inclusive for the last batch (start, end], so we skip this batch - val newStart = new MessageIdImpl( - cbmid.getLedgerId, - cbmid.getEntryId + 1, - cbmid.getPartitionIndex) - reader.seek(newStart) - case (smid: MessageIdImpl, cmid: MessageIdImpl) => - // current entry is a non-batch entry, we can read next directly in `getNext()` + try { + if (!startOffset + .isInstanceOf[UserProvidedMessageId] && startOffset != MessageId.earliest) { + reader.seek(startOffset) + currentMessage = reader.readNext(pollTimeoutMs, TimeUnit.MILLISECONDS) + if (currentMessage == null) { + isLast = true + reportDataLoss(s"cannot read data at $startOffset from topic $topic") + } else { + currentId = currentMessage.getMessageId + if (startOffset != MessageId.earliest && !messageIdRoughEquals( + currentId, + startOffset)) { + reportDataLoss( + s"Potential Data Loss: intended to start at $startOffset, " + + s"actually we get $currentId") + } + + (startOffset, currentId) match { + case (_: BatchMessageIdImpl, _: BatchMessageIdImpl) => + // we seek using a batch message id, we can read next directly in `getNext()` + case (_: MessageIdImpl, cbmid: BatchMessageIdImpl) => + // we seek using a message id, this is supposed to be read by previous task since it's + // inclusive for the last batch (start, end], so we skip this batch + val newStart = new MessageIdImpl( + cbmid.getLedgerId, + 
cbmid.getEntryId + 1, + cbmid.getPartitionIndex) + reader.seek(newStart) + case (smid: MessageIdImpl, cmid: MessageIdImpl) => + // current entry is a non-batch entry, we can read next directly in `getNext()` + } } } + } catch { + case e: PulsarClientException => + logError(s"PulsarClient failed to read message from topic $topic", e) + close() + throw e + case e: Throwable => + throw e } override protected def getNext(): InternalRow = { - if (isLast) { - finished = true - return null - } - currentMessage = reader.readNext(pollTimeoutMs, TimeUnit.MILLISECONDS) - if (currentMessage == null) { - reportDataLoss( - s"We didn't get enough message as promised from topic $topic, data loss occurs") - finished = true - return null - } + try { + if (isLast) { + finished = true + return null + } + currentMessage = reader.readNext(pollTimeoutMs, TimeUnit.MILLISECONDS) + if (currentMessage == null) { + reportDataLoss( + s"We didn't get enough message as promised from topic $topic, data loss occurs") + finished = true + return null + } - currentId = currentMessage.getMessageId + currentId = currentMessage.getMessageId - finished = false - inEnd = enterEndFunc(currentId) - if (inEnd) { - isLast = isLastMessage(currentId) + finished = false + inEnd = enterEndFunc(currentId) + if (inEnd) { + isLast = isLastMessage(currentId) + } + deserializer.deserialize(currentMessage) + } catch { + case e: PulsarClientException => + logError(s"PulsarClient failed to read message from topic $topic", e) + close() + throw e + case e: Throwable => + throw e } - deserializer.deserialize(currentMessage) } override protected def close(): Unit = { @@ -185,7 +203,6 @@ private[pulsar] class PulsarSourceRDD( private[pulsar] class PulsarSourceRDD4Batch( sc: SparkContext, schemaInfo: SchemaInfoSerializable, - adminUrl: String, clientConf: ju.Map[String, Object], readerConf: ju.Map[String, Object], offsetRanges: Seq[PulsarOffsetRange], @@ -209,13 +226,7 @@ private[pulsar] class PulsarSourceRDD4Batch( val part = split.asInstanceOf[PulsarSourceRDDPartition] val tp = part.offsetRange.topic val start = part.offsetRange.fromOffset - val end = part.offsetRange.untilOffset match { - case MessageId.latest => - Utils.tryWithResource(AdminUtils.buildAdmin(adminUrl, clientConf)) { admin => - PulsarSourceUtils.seekableLatestMid(admin.topics().getLastMessageId(tp)) - } - case id => id - } + val end = part.offsetRange.untilOffset if (start == end || !messageExists(end)) { return Iterator.empty diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSources.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSources.scala index 4a1483dd..57f89f47 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSources.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSources.scala @@ -16,9 +16,9 @@ package org.apache.spark.sql.pulsar import java.io._ import java.nio.charset.StandardCharsets +import org.apache.commons.io.IOUtils import org.apache.pulsar.client.api.MessageId import org.apache.pulsar.client.impl.{BatchMessageIdImpl, MessageIdImpl, TopicMessageIdImpl} -import org.apache.pulsar.shade.org.apache.commons.io.IOUtils import org.apache.spark.{SparkContext, SparkEnv} import org.apache.spark.internal.Logging diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarStreamWriter.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarStreamWriter.scala new file mode 100644 index 00000000..df2bf419 --- /dev/null +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarStreamWriter.scala @@ -0,0 +1,134 @@ +/* + * Licensed 
under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.pulsar + +import java.{util => ju} + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.write.{ + DataWriter, + DataWriterFactory, + PhysicalWriteInfo, + WriterCommitMessage +} +import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite} +import org.apache.spark.sql.types.StructType + +/** + * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but + * we don't need to really send one. + */ +case object PulsarWriterCommitMessage extends WriterCommitMessage + +/** + * A [[StreamWriter]] for Pulsar writing. Responsible for generating the writer factory. + * + * @param schema + * The schema of the input data. + * @param clientConf + * Parameters for Pulsar client in each task. + * @param producerConf + * Parameters for Pulsar producers in each task. + * @param topic + * The topic this writer is responsible for. + */ +class PulsarStreamWriter( + schema: StructType, + clientConf: ju.Map[String, Object], + producerConf: ju.Map[String, Object], + topic: Option[String], + adminUrl: String) + extends StreamingWrite { + + override def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory = + new PulsarStreamWriterFactory(schema, clientConf, producerConf, topic, adminUrl) + + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} +} + +/** + * A [[DataWriterFactory]] for Pulsar writing. Will be serialized and sent to executors to + * generate the per-task data writers. + * @param schema + * The schema of the input data. + * @param clientConf + * Parameters for Pulsar client. + * @param producerConf + * Parameters for Pulsar producers in each task. + * @param topic + * The topic that should be written to. + */ +class PulsarStreamWriterFactory( + schema: StructType, + clientConf: ju.Map[String, Object], + producerConf: ju.Map[String, Object], + topic: Option[String], + adminUrl: String) + extends StreamingDataWriterFactory { + + override def createWriter( + partitionId: Int, + taskId: Long, + epochId: Long): DataWriter[InternalRow] = { + new PulsarStreamDataWriter(schema.toAttributes, clientConf, producerConf, topic, adminUrl) + } +} + +/** + * A [[DataWriter]] for Pulsar writing. One data writer will be created in each partition to + * process incoming rows. + * + * @param topic + * The topic that this data writer is targeting. + * @param clientConf + * Parameters to use for the Pulsar client. + * @param producerConf + * Parameters to use for the Pulsar producer. + * @param inputSchema + * The attributes in the input data. 
+ */ +class PulsarStreamDataWriter( + inputSchema: Seq[Attribute], + clientConf: ju.Map[String, Object], + producerConf: ju.Map[String, Object], + topic: Option[String], + adminUrl: String) + extends PulsarRowWriter(inputSchema, clientConf, producerConf, topic, adminUrl) + with DataWriter[InternalRow] { + + def write(row: InternalRow): Unit = { + checkForErrors() + sendRow(row) + } + + def commit(): WriterCommitMessage = { + // Send is asynchronous, but we can't commit until all rows are actually in Pulsar. + // This requires flushing and then checking that no callbacks produced errors. + // We also check for errors before to fail as soon as possible - the check is cheap. + checkForErrors() + producerFlush() + checkForErrors() + PulsarWriterCommitMessage + } + + def abort(): Unit = {} + + def close(): Unit = { + checkForErrors() + producerClose() + checkForErrors() + } +} diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala index 2bf6e4d8..3a8396d4 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala @@ -18,9 +18,7 @@ import java.util.function.BiConsumer import scala.collection.mutable -import org.apache.pulsar.client.admin.PulsarAdmin import org.apache.pulsar.client.api.{MessageId, Producer} - import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection} import org.apache.spark.sql.types._ @@ -137,7 +135,6 @@ private[pulsar] abstract class PulsarRowWriter( // reuse producer through the executor protected lazy val singleProducer = if (topic.isDefined) { - SchemaUtils.uploadPulsarSchema(admin, topic.get, pulsarSchema.getSchemaInfo) PulsarSinks.createProducer(clientConf, producerConf, topic.get, pulsarSchema) } else null protected val topic2Producer: mutable.Map[String, Producer[_]] = mutable.Map.empty @@ -150,7 +147,6 @@ private[pulsar] abstract class PulsarRowWriter( if (topic2Producer.contains(tp)) { topic2Producer(tp).asInstanceOf[Producer[T]] } else { - SchemaUtils.uploadPulsarSchema(admin, tp, pulsarSchema.getSchemaInfo) val p = PulsarSinks.createProducer(clientConf, producerConf, tp, pulsarSchema) topic2Producer.put(tp, p) diff --git a/src/main/scala/org/apache/spark/sql/pulsar/SchemaUtils.scala b/src/main/scala/org/apache/spark/sql/pulsar/SchemaUtils.scala index 9a88e13b..83c574fd 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/SchemaUtils.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/SchemaUtils.scala @@ -15,18 +15,14 @@ package org.apache.spark.sql.pulsar import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.nio.charset.StandardCharsets -import java.nio.charset.StandardCharsets.UTF_8 import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer -import org.apache.pulsar.client.admin.{PulsarAdmin, PulsarAdminException} import org.apache.pulsar.client.api.{Schema => PSchema} import org.apache.pulsar.client.api.schema.{GenericRecord, GenericSchema} import org.apache.pulsar.client.impl.schema._ import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl -import org.apache.pulsar.common.naming.TopicName -import org.apache.pulsar.common.protocol.schema.PostSchemaPayload import org.apache.pulsar.common.schema.{SchemaInfo, SchemaType} import org.apache.pulsar.shade.org.apache.avro.{LogicalTypes, Schema => ASchema, SchemaBuilder} import 
org.apache.pulsar.shade.org.apache.avro.LogicalTypes.{ @@ -85,54 +81,6 @@ private[pulsar] object SchemaUtils { private lazy val nullSchema = ASchema.create(ASchema.Type.NULL) - def uploadPulsarSchema(admin: PulsarAdmin, topic: String, schemaInfo: SchemaInfo): Unit = { - assert(schemaInfo != null, "schemaInfo shouldn't be null") - - val existingSchema = - try { - admin.schemas().getSchemaInfo(TopicName.get(topic).toString) - } catch { - case e: PulsarAdminException if e.getStatusCode == 404 => - null - case e: Throwable => - throw new RuntimeException( - s"Failed to get schema information for ${TopicName.get(topic).toString}", - e) - } - - if (existingSchema == null) { - val pl = new PostSchemaPayload() - pl.setType(schemaInfo.getType.name()) - pl.setSchema(new String(schemaInfo.getSchema, UTF_8)) - pl.setProperties(schemaInfo.getProperties) - try { - admin.schemas().createSchema(TopicName.get(topic).toString, pl) - } catch { - case e: PulsarAdminException if e.getStatusCode == 404 => - throw new RuntimeException( - s"Create schema for ${TopicName.get(topic).toString} got 404") - case e: Throwable => - throw new RuntimeException( - s"Failed to create schema for ${TopicName.get(topic).toString}", - e) - } - } else if (existingSchema - .equals(schemaInfo) || compatibleSchema(existingSchema, schemaInfo)) { - // no need to upload again - } else { - throw new RuntimeException("Writing to a topic which have incompatible schema") - } - } - - def compatibleSchema(x: SchemaInfo, y: SchemaInfo): Boolean = { - (x.getType, y.getType) match { - // None and bytes are compatible - case (SchemaType.NONE, SchemaType.BYTES) => true - case (SchemaType.BYTES, SchemaType.NONE) => true - case _ => false - } - } - def emptySchemaInfo(): SchemaInfo = { SchemaInfo.builder().name("empty").`type`(SchemaType.NONE).schema(new Array[Byte](0)).build() }