diff --git a/README.md b/README.md index 4f936d0..b5de30a 100644 --- a/README.md +++ b/README.md @@ -288,7 +288,7 @@ A possible solution to remove duplicates when reading the written data could be - `poolTimeoutMs` + `pollTimeoutMs` A number string in unit of milliseconds No "120000" diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSources.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSources.scala index 990578d..20d2122 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSources.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSources.scala @@ -190,12 +190,12 @@ class PulsarSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: def getInitialOffset( pulsarHelper: PulsarHelper, startingOffsets: PerTopicOffset, - poolTimeoutMs: Int, + pollTimeoutMs: Int, reportDataLoss: String => Unit): SpecificPulsarOffset = { val deserializedOffset = get(0).map(markOffsetUserProvided(_)) deserializedOffset.getOrElse { val actualOffsets = SpecificPulsarOffset( - pulsarHelper.actualOffsets(startingOffsets, poolTimeoutMs, reportDataLoss)) + pulsarHelper.actualOffsets(startingOffsets, pollTimeoutMs, reportDataLoss)) add(0, actualOffsets) logInfo(s"Initial Offsets: $actualOffsets") actualOffsets