diff --git a/.jenkins/build.sh b/.jenkins/build.sh
new file mode 100755
index 00000000..d7045bdd
--- /dev/null
+++ b/.jenkins/build.sh
@@ -0,0 +1,27 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -e
+
+JENKINS_DIR=`dirname "$0"`
+PRJ_HOME=`cd ${JENKINS_DIR}/..;pwd`
+
+cd ${PRJ_HOME}
+
+# Capture the mvn exit code without tripping `set -e`, so the test report
+# below is printed even when the build fails.
+retcode=0
+mvn clean license:check scalastyle:check install || retcode=$?
+cat target/surefire-reports/ConnectorTestSuite.txt
+exit $retcode
diff --git a/README.md b/README.md
index 1db83b29..c915f3d0 100644
--- a/README.md
+++ b/README.md
@@ -1,53 +1,52 @@
 # pulsar-spark
 
 [](https://github.com/streamnative/pulsar-spark/releases)
+ [](https://www.apache.org/licenses/LICENSE-2.0)
 [](https://app.fossa.io/projects/git%2Bgithub.com%2Fstreamnative%2Fpulsar-spark?ref=badge_shield)
-
 Unified data processing with [Apache Pulsar](https://pulsar.apache.org) and [Apache Spark](https://spark.apache.org).
 ## Prerequisites
 
 - Java 8 or later
-- Spark 3.2.0 or later
-- Pulsar 2.10.0 or later
+- Spark 3.2.2 or later
+- Pulsar 2.10.2 or later
 
 ## Preparations
 
 ### Link
 
+#### Client library
+
 For Scala/Java applications using SBT/Maven project definitions, link your application with the following artifact:
 
 ```
-groupId = io.streamnative.connectors
-artifactId = pulsar-spark-connector_{{SCALA_BINARY_VERSION}}
-version = {{PULSAR_SPARK_VERSION}}
+ groupId = io.streamnative.connectors
+ artifactId = pulsar-spark-connector_{{SCALA_BINARY_VERSION}}
+ version = {{PULSAR_SPARK_VERSION}}
 ```
 
 ### Deploy
 
-#### Client library
-
+#### Client library
 As with any Spark applications, `spark-submit` is used to launch your application.
 `pulsar-spark-connector_{{SCALA_BINARY_VERSION}}` and its dependencies can be directly added to `spark-submit` using `--packages`.
 
 Example
 
-```shell
+```
 $ ./bin/spark-submit
   --packages io.streamnative.connectors:pulsar-spark-connector_{{SCALA_BINARY_VERSION}}:{{PULSAR_SPARK_VERSION}}
   ...
 ```
 
-#### CLI
-
+#### CLI
 For experimenting on `spark-shell` (or `pyspark` for Python), you can also use `--packages` to add `pulsar-spark-connector_{{SCALA_BINARY_VERSION}}` and its dependencies directly.
 
 Example
 
-```shell
+```
 $ ./bin/spark-shell
   --packages io.streamnative.connectors:pulsar-spark-connector_{{SCALA_BINARY_VERSION}}:{{PULSAR_SPARK_VERSION}}
   ...
 ```
@@ -56,7 +55,9 @@ $ ./bin/spark-shell
 When locating an artifact or library, `--packages` option checks the following repositories in order:
 
 1. Local maven repository
+
 2. Maven central repository
+
 3. Other repositories specified by `--repositories`
 
 The format for the coordinates should be `groupId:artifactId:version`.
@@ -68,9 +69,7 @@ For more information about **submitting applications with external dependencies**
 
 ### Read data from Pulsar
 
 #### Create a Pulsar source for streaming queries
-
 The following examples are in Scala.
-
 ```scala
 // Subscribe to 1 topic
 val df = spark
@@ -106,19 +105,17 @@ df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
   .as[(String, String)]
 ```
 
-> **Note**
->
+> #### Tip
 > For more information on how to use other language bindings for Spark Structured Streaming,
 > see [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html).
 
 #### Create a Pulsar source for batch queries
-
 If you have a use case that is better suited to batch processing,
 you can create a Dataset/DataFrame for a defined range of offsets.
 
 The following examples are in Scala.
 
-```scala
+```scala
+
 // Subscribe to 1 topic defaults to the earliest and latest offsets
 val df = spark
   .read
@@ -133,7 +130,7 @@ df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
 
 // Subscribe to multiple topics, specifying explicit Pulsar offsets
 import org.apache.spark.sql.pulsar.JsonUtils._
 val startingOffsets = topicOffsets(Map("topic1" -> messageId1, "topic2" -> messageId2))
-val endingOffsets = topicOffsets(Map("topic1" -> MessageId.latest, "topic2" -> MessageId.latest))
+val endingOffsets = topicOffsets(...)
 val df = spark
   .read
   .format("pulsar")
@@ -163,114 +160,185 @@ df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
 
 The following options must be set for the Pulsar source for both batch and streaming queries.
 
-| Option | Value | Description |
-|-----------------|-------------------------------------------|------------------------------------------------------------------------------------------------------------------------------|
-| `topic` | A topic name string | The topic to be consumed. Only one of `topic`, `topics` or `topicsPattern` options can be specified for the Pulsar source. |
-| `topics` | A comma-separated list of topics | The topic list to be consumed. Only one of `topic`, `topics` or `topicsPattern` options can be specified for the Pulsar source. |
-| `topicsPattern` | A Java regex string | The pattern used to subscribe to topic(s). Only one of `topic`, `topics` or `topicsPattern` options can be specified for the Pulsar source. |
-| `service.url` | A service URL of your Pulsar cluster | The Pulsar `serviceUrl` configuration for creating the `PulsarClient` instance. |
-| `admin.url` | A service HTTP URL of your Pulsar cluster | The Pulsar `serviceHttpUrl` configuration for creating the `PulsarAdmin` instance. |
+| Option | Value | Description |
+| --- | --- | --- |
+| `topic` | A topic name string | The topic to be consumed. Only one of `topic`, `topics` or `topicsPattern` options can be specified for Pulsar source. |
+| `topics` | A comma-separated list of topics | The topic list to be consumed. Only one of `topic`, `topics` or `topicsPattern` options can be specified for Pulsar source. |
+| `topicsPattern` | A Java regex string | The pattern used to subscribe to topic(s). Only one of `topic`, `topics` or `topicsPattern` options can be specified for Pulsar source. |
+| `service.url` | A service URL of your Pulsar cluster | The Pulsar `serviceUrl` configuration. |
+| `admin.url` | A service HTTP URL of your Pulsar cluster | The Pulsar `serviceHttpUrl` configuration. |
+
+| Option | Value | Default | Query Type | Description |
+| --- | --- | --- | --- | --- |
+| `startingOffsets` | The following are valid values:<br>* "earliest" (streaming and batch queries)<br>* "latest" (streaming query)<br>* A JSON string<br>**Example**<br>`""" {"topic-1":[8,11,16,101,24,1,32,1],"topic-5":[8,15,16,105,24,5,32,5]} """` | "earliest" (batch query) | Streaming and batch queries | The `startingOffsets` option controls where a reader reads data from.<br>* "earliest": when a partition lacks a valid offset, the reader reads all the data in the partition, starting from the very beginning.<br>* A JSON string: specifies a starting offset for each topic. You can use `org.apache.spark.sql.pulsar.JsonUtils.topicOffsets` to build one, as in the batch example above. |
+| `endingOffsets` | The following are valid values:<br>* "latest" (batch query)<br>* A JSON string<br>**Example**<br>`{"topic-1":[8,12,16,102,24,2,32,2],"topic-5":[8,16,16,106,24,6,32,6]}` | "latest" | Batch query | The `endingOffsets` option controls where a reader stops reading data.<br>* "latest": the reader stops reading data at the latest record.<br>* A JSON string: specifies an ending offset for each topic. |
+| `failOnDataLoss` | The following are valid values:<br>* true<br>* false | true | Streaming query | The `failOnDataLoss` option controls whether to fail a query when data is lost (for example, topics are deleted, or messages are deleted because of retention policy). This may cause a false alarm. You can set it to `false` when it does not work as you expected. A batch query always fails if it fails to read any data from the provided offsets due to data loss. |
+| `allowDifferentTopicSchemas` | Boolean value | `false` | Streaming query | If multiple topics with different schemas are read, this parameter turns off automatic schema-based deserialization of topic values. That way, topics with different schemas can be read in the same pipeline, which is then responsible for deserializing the raw values based on some schema. Since only the raw values are returned when this is `true`, Pulsar topic schema(s) are not taken into account during operation. |
+
+| Column | Type |
+| --- | --- |
+| `__key` | Binary |
+| `__topic` | String |
+| `__messageId` | Binary |
+| `__publishTime` | Timestamp |
+| `__eventTime` | Timestamp |
+| `__messageProperties` | Map<String, String> |
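
Putting the source options and the internal columns together, a minimal batch read might look like the following sketch. This is illustrative only: the service URLs and topic name are placeholders for your own cluster, and the offset values are the batch defaults from the option table.

```scala
import org.apache.spark.sql.SparkSession

object PulsarBatchReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("pulsar-batch-read")
      .getOrCreate()

    // Placeholder connection settings; point these at your own cluster.
    val df = spark.read
      .format("pulsar")
      .option("service.url", "pulsar://localhost:6650")
      .option("admin.url", "http://localhost:8080")
      .option("topic", "topic1")
      .option("startingOffsets", "earliest") // batch default
      .option("endingOffsets", "latest")     // batch default
      .load()

    // Project the message payload alongside the connector's internal columns.
    df.selectExpr(
        "CAST(__key AS STRING)",
        "CAST(value AS STRING)",
        "__topic",
        "__publishTime",
        "__messageProperties")
      .show(truncate = false)

    spark.stop()
  }
}
```

Submit it with `spark-submit --packages io.streamnative.connectors:pulsar-spark-connector_{{SCALA_BINARY_VERSION}}:{{PULSAR_SPARK_VERSION}}` as shown in the Deploy section; it requires a running Pulsar cluster, so it is not a self-contained test.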