
Commit

test case
ericm-db committed Aug 15, 2023
1 parent 9f6fe43 commit 25c39eb
Showing 2 changed files with 73 additions and 9 deletions.
25 changes: 16 additions & 9 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala
@@ -13,13 +13,18 @@
*/
package org.apache.spark.sql.pulsar

import org.apache.pulsar.client.admin.PulsarAdmin

import java.{util => ju}

import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.mutable

import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
import org.apache.pulsar.client.internal.DefaultImplementation
import org.apache.pulsar.common.schema.SchemaInfo

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
@@ -31,8 +36,6 @@ import org.apache.spark.sql.pulsar.PulsarOptions.ServiceUrlOptionKey
import org.apache.spark.sql.pulsar.SpecificPulsarOffset.getTopicOffsets
import org.apache.spark.sql.types.StructType

import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.mutable

private[pulsar] class PulsarSource(
serviceUrl: String,
@@ -76,11 +79,11 @@ private[pulsar] class PulsarSource(
initialTopicOffsets
val latest = pulsarHelper.fetchLatestOffsets()
currentTopicOffsets = Some(latest.topicOffsets)
logDebug(s"GetOffset: ${latest.topicOffsets.toSeq.map(_.toString).sorted}")
Some(latest.asInstanceOf[Offset])
}

override def latestOffset(startingOffset: streaming.Offset, readLimit: ReadLimit): streaming.Offset = {
override def latestOffset(startingOffset: streaming.Offset,
readLimit: ReadLimit): streaming.Offset = {
initialTopicOffsets
// implement helper inside PulsarHelper in order to use getTopicPartitions
val latestOffsets = pulsarHelper.fetchLatestOffsets().topicOffsets
@@ -92,7 +95,8 @@
Map[String, MessageId]()
}
val newTopics = latestOffsets.keySet.diff(existingStartOffsets.keySet)
val startPartitionOffsets = existingStartOffsets ++ newTopics.map(topicPartition => topicPartition -> pulsarHelper.fetchLatestOffsetForTopic(topicPartition))
val startPartitionOffsets = existingStartOffsets ++ newTopics.map(topicPartition
=> topicPartition -> pulsarHelper.fetchLatestOffsetForTopic(topicPartition))
val totalReadLimit = AdmissionLimits(readLimit).get.bytesToTake
val offsets = mutable.Map[String, MessageId]()
offsets ++= startPartitionOffsets
@@ -105,7 +109,8 @@
val ledgerId = getLedgerId(messageId)
val entryId = getEntryId(messageId)
val stats = pulsarAdmin.topics.getInternalStats(topicPartition)
pulsarAdmin.topics.getInternalStats(topicPartition).ledgers.asScala.filter(_.ledgerId >= ledgerId).sortBy(_.ledgerId).foreach{ ledger =>
pulsarAdmin.topics.getInternalStats(topicPartition).ledgers.
asScala.filter(_.ledgerId >= ledgerId).sortBy(_.ledgerId).foreach{ ledger =>
ledger.entries = stats.currentLedgerEntries
val avgBytesPerEntries = stats.currentLedgerSize / stats.currentLedgerEntries
// approximation of bytes left in ledger to deal with case
@@ -123,9 +128,11 @@
.getDefaultImplementation
.newMessageId(ledger.ledgerId, ledger.entries - 1, -1))
} else {
val numEntriesToRead = Math.max(1, readLimit / avgBytesPerEntries)
val lastEntryRead = Math.min(ledger.entries - 1, entryId + numEntriesToRead)
offsets += (topicPartition -> DefaultImplementation
.getDefaultImplementation
.newMessageId(ledger.ledgerId, entryId + readLimit / avgBytesPerEntries, -1))
.newMessageId(ledger.ledgerId, lastEntryRead, -1))
readLimit = 0
}
}
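The per-ledger loop above turns the remaining byte budget into an entry count: it estimates the average entry size from the ledger statistics, admits at least one entry so the stream always makes progress, and clamps the result to the last entry of the ledger. A minimal sketch of that arithmetic with hypothetical sample numbers (not the connector's actual stats API):

// Sketch of the admission-control arithmetic; readLimit is the remaining
// byte budget and entryId is the last consumed entry in this ledger.
def entriesWithinBudget(readLimit: Long,
                        ledgerEntries: Long,
                        ledgerSize: Long,
                        entryId: Long): Long = {
  // Guard against empty ledgers; the real code reads these values from getInternalStats.
  val avgBytesPerEntry = Math.max(1L, ledgerSize / Math.max(1L, ledgerEntries))
  // Always admit at least one entry so the stream keeps making progress.
  val numEntriesToRead = Math.max(1L, readLimit / avgBytesPerEntry)
  // Never run past the last entry that exists in this ledger.
  Math.min(ledgerEntries - 1, entryId + numEntriesToRead)
}

// Example: a 120-byte budget over ~38-byte entries admits 3 more entries.
// entriesWithinBudget(120, 100, 3800, 5) == 8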
@@ -249,4 +256,4 @@ private[pulsar] class PulsarSource(
}

/** A read limit that admits a soft-max of `maxBytes` per micro-batch. */
case class ReadMaxBytes(maxBytes: Long) extends ReadLimit
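For context on where ReadMaxBytes comes from: Spark's SupportsAdmissionControl API lets a source advertise a default ReadLimit and receive it back in latestOffset. A minimal sketch of that wiring; the maxBytesPerTrigger field and the AdmissionLimits helper shown here are assumptions standing in for definitions outside this diff:

import org.apache.spark.sql.connector.read.streaming.{ReadLimit, SupportsAdmissionControl}

// Hypothetical wiring: expose the soft byte cap as the source's default read limit.
trait MaxBytesAdmission extends SupportsAdmissionControl {
  // Assumed option parsed from "maxBytesPerTrigger"; not part of this diff.
  def maxBytesPerTrigger: Option[Long]

  override def getDefaultReadLimit: ReadLimit =
    maxBytesPerTrigger.map(ReadMaxBytes(_)).getOrElse(ReadLimit.allAvailable())
}

// Assumed shape of the AdmissionLimits helper used by latestOffset above.
class AdmissionLimits(val bytesToTake: Long)

object AdmissionLimits {
  def apply(limit: ReadLimit): Option[AdmissionLimits] = limit match {
    case ReadMaxBytes(bytes) => Some(new AdmissionLimits(bytes))
    case _ => None
  }
}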
57 changes: 57 additions & 0 deletions src/test/scala/org/apache/spark/sql/pulsar/PulsarAdmissionControlSuite.scala
@@ -0,0 +1,57 @@
package org.apache.spark.sql.pulsar

import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.spark.sql.streaming.Trigger.{Once, ProcessingTime}
import org.apache.spark.util.Utils

class PulsarAdmissionControlSuite extends PulsarSourceTest {

import PulsarOptions._
import testImplicits._

override def beforeAll(): Unit = {
super.beforeAll()
}

/**
* Unit tests for the admission-control limits: fake ledger statistics can be
* constructed, and latestOffset() can be called directly from the test.
*
* Each micro-batch should read at most maxBytesPerTrigger bytes (within some threshold).
* Messages of a specific size can be sent via AddPulsarData.
*/

test("Admission Control") {
val topic = newTopic()
sendMessages(topic, Array("-1"))
require(getLatestOffsets(Set(topic)).size === 1)
sparkContext.setLogLevel("INFO")
val pulsar = spark.readStream
.format("pulsar")
.option(TopicSingle, topic)
.option(ServiceUrlOptionKey, serviceUrl)
.option(AdminUrlOptionKey, adminUrl)
.option(MaxBytesPerTrigger, 120)
.load()
.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

val mapped = pulsar.map(kv => kv._2.toInt + 1)

// Each Int adds 38 bytes to the message size, so with a 120-byte limit we expect 3 Ints per micro-batch
testStream(mapped)(
StartStream(trigger = ProcessingTime(100)),
makeSureGetOffsetCalled,
AddPulsarData(Set(topic), 1, 2, 3),
CheckLastBatch(2, 3, 4),
AddPulsarData(Set(topic), 4, 5, 6, 7, 8, 9),
CheckLastBatch(8, 9, 10),
AssertOnQuery { query =>
val recordsRead = query.recentProgress.map(_.numInputRows).sum
recordsRead == 9
}
)

}

}
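A quick sanity check of the estimate used in the test above: with MaxBytesPerTrigger set to 120 and roughly 38 bytes per record, the admission logic admits Math.max(1, 120 / 38) = 3 records per micro-batch, which is why each CheckLastBatch sees exactly three new values. Illustrative arithmetic only, not part of the suite:

// Expected batch size under the 120-byte cap (illustrative, assumed sizes).
val maxBytesPerTrigger = 120L   // value passed to the reader above
val approxBytesPerRecord = 38L  // estimate stated in the test comment
val recordsPerBatch = Math.max(1L, maxBytesPerTrigger / approxBytesPerRecord)
assert(recordsPerBatch == 3)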
