Skip to content

Commit

Permalink
Adding Pulsar option (maxBytesPerTrigger)
Browse files Browse the repository at this point in the history
  • Loading branch information
ericm-db committed Aug 15, 2023
1 parent 939259a commit 9f6fe43
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ private[pulsar] object PulsarOptions {
// Option keys are stored lower-cased (Locale.ROOT); presumably so they can be
// matched against a case-insensitive parameter map -- TODO confirm against callers.
val SubscriptionPrefix: String = "subscriptionPrefix".toLowerCase(Locale.ROOT)
val PredefinedSubscription: String = "predefinedSubscription".toLowerCase(Locale.ROOT)

// Key of the option that PulsarProvider.maxBytesPerTrigger parses as a Long
// (Long.MaxValue when the option is absent).
val MaxBytesPerTrigger: String = "maxBytesPerTrigger".toLowerCase(Locale.ROOT)
// Key of the poll-timeout option; judging by the name the value is in milliseconds.
val PollTimeoutMS: String = "pollTimeoutMs".toLowerCase(Locale.ROOT)
// Key of the failOnDataLoss option.
val FailOnDataLossOptionKey: String = "failOnDataLoss".toLowerCase(Locale.ROOT)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ private[pulsar] class PulsarProvider
metadataPath,
offset,
pollTimeoutMs(caseInsensitiveParams),
maxBytesPerTrigger(caseInsensitiveParams),
failOnDataLoss(caseInsensitiveParams),
subscriptionNamePrefix,
jsonOptions)
Expand Down Expand Up @@ -385,6 +386,13 @@ private[pulsar] object PulsarProvider extends Logging {
(SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000).toString)
.toInt

/**
 * Reads the `maxBytesPerTrigger` source option.
 *
 * @param caseInsensitiveParams source options, looked up by `PulsarOptions.MaxBytesPerTrigger`
 *                              (a lower-cased key; the map is presumably keyed the same way)
 * @return the configured value parsed as a `Long`, or `Long.MaxValue` when the option is absent
 * @throws NumberFormatException if the option is present but cannot be parsed as a `Long`;
 *                               the message names the option and the offending value
 */
private def maxBytesPerTrigger(caseInsensitiveParams: Map[String, String]): Long =
  caseInsensitiveParams.get(PulsarOptions.MaxBytesPerTrigger) match {
    case None => Long.MaxValue
    case Some(raw) =>
      try raw.toLong
      catch {
        case e: NumberFormatException =>
          // Keep the exception type callers would have seen from a bare `.toLong`,
          // but say which option was malformed so the user can fix the configuration.
          val detailed = new NumberFormatException(
            s"Option '${PulsarOptions.MaxBytesPerTrigger}' must be a valid long, got '$raw'")
          detailed.initCause(e)
          throw detailed
      }
  }

private def validateGeneralOptions(
caseInsensitiveParams: Map[String, String]): Map[String, String] = {
if (!caseInsensitiveParams.contains(ServiceUrlOptionKey)) {
Expand Down
27 changes: 5 additions & 22 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ private[pulsar] class PulsarSource(
metadataPath: String,
startingOffsets: PerTopicOffset,
pollTimeoutMs: Int,
maxBytesPerTrigger: Long,
failOnDataLoss: Boolean,
subscriptionNamePrefix: String,
jsonOptions: JSONOptionsInRead)
extends Source
with Logging
with SupportsAdmissionControl
{
with SupportsAdmissionControl {

import PulsarSourceUtils._

Expand Down Expand Up @@ -97,25 +97,6 @@ private[pulsar] class PulsarSource(
val offsets = mutable.Map[String, MessageId]()
offsets ++= startPartitionOffsets
val numPartitions = startPartitionOffsets.size
val startingOffsetStr = startPartitionOffsets.map { case (k, v) =>
val ledgerId = getLedgerId(v) // Assuming getLedgerId is a method of the object
val entryId = getEntryId(v) // Assuming getEntryId is a method of the object
val stats = pulsarAdmin.topics().getInternalStats(k)
val numEntries = stats.numberOfEntries
val numEntriesPerLedger = pulsarAdmin.topics().getInternalStats(k).ledgers.asScala.map{ ledger =>
s"[LedgerID: ${ledger.ledgerId}, Size: ${ledger.size}, Entries: ${ledger.entries}]"
}.mkString(", ")

s"[$k, LedgerId: $ledgerId, " +
s"EntryId: $entryId, " +
s"Entries: ${numEntries}, " +
s"CurrentLedgerEntries: ${stats.currentLedgerEntries}, " +
s"CurrentLedgerSize: ${stats.currentLedgerSize}, " +
s"NumLedgers: ${stats.ledgers.size()}, " +
s"TotalSize: ${stats.totalSize}, " +
s"EntriesPerLedger: ${numEntriesPerLedger}]"
}.mkString(", ")
print(s"STARTOFFSETS: $startingOffsetStr\n")
startPartitionOffsets.keys.filter(topicPartition => {
pulsarAdmin.topics.getInternalStats(topicPartition).currentLedgerEntries > 0
}).foreach { topicPartition =>
Expand Down Expand Up @@ -151,7 +132,9 @@ private[pulsar] class PulsarSource(
}
SpecificPulsarOffset(offsets.toMap)
}

/**
 * Default read limit for this source: a `ReadMaxBytes` built from the
 * `maxBytesPerTrigger` value passed to this source's constructor.
 */
override def getDefaultReadLimit: ReadLimit =
  ReadMaxBytes.apply(maxBytesPerTrigger)
class AdmissionLimits(var bytesToTake: Long)

object AdmissionLimits {
Expand Down

0 comments on commit 9f6fe43

Please sign in to comment.