Skip to content

Commit

Permalink
Improved Tiingo and introduced sendAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
jbaron committed Jan 17, 2024
1 parent 9d3c21f commit 811d777
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,4 @@ internal class TaSamples {

}


Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit
/**
* Tiingo historic feed.
*
* This feed uses CSV format for faster processing and less bandwidth usage.
* This feed requests CSV format from Tiingo for faster processing and less bandwidth usage.
*/
class TiingoHistoricFeed(
configure: TiingoConfig.() -> Unit = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,8 @@ import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import org.roboquant.common.Asset
import org.roboquant.common.AssetType
import org.roboquant.common.Logging
import org.roboquant.feeds.Event
import org.roboquant.feeds.LiveFeed
import org.roboquant.feeds.PriceAction
import org.roboquant.feeds.PriceQuote
import org.roboquant.common.*
import org.roboquant.feeds.*
import java.time.Instant
import java.util.concurrent.TimeUnit
import kotlin.collections.set
Expand All @@ -39,24 +34,26 @@ private val logger = Logging.getLogger(TiingoLiveFeed::class)

private class Handler(private val feed: TiingoLiveFeed) : WebSocketListener() {

val assets = mutableMapOf<String, Asset>()

private fun handleIEX(data: JsonArray) {
val type = data[0].asString
val symbol = data[3].asString.uppercase()
val asset = Config.getOrPutAsset(symbol) { Asset.forexPair(symbol) }
val time = Instant.ofEpochMilli(0).plusNanos(data[2].asLong)

if (type == "Q") {
val symbol = data[3].asString.uppercase()
val asset = assets.getOrPut(symbol) { Asset(symbol) }
val quote = PriceQuote(asset, data[7].asDouble, data[8].asDouble, data[5].asDouble, data[4].asDouble)
val time = Instant.ofEpochMilli(0).plusNanos(data[2].asLong)
feed.deliver(quote, time)
} else if (type == "T") {
val trade = TradePrice(asset, data[9].asDouble, data[10].asDouble)
feed.deliver(trade, time)
}
}

private fun handleFX(data: JsonArray) {
val type = data[0].asString
if (type == "Q") {
val symbol = data[1].asString.uppercase()
val asset = assets.getOrPut(symbol) { Asset.forexPair(symbol) }
val asset = Config.getOrPutAsset(symbol) { Asset.forexPair(symbol) }
val quote = PriceQuote(asset, data[7].asDouble, data[6].asDouble, data[4].asDouble, data[3].asDouble)
val time = Instant.parse(data[2].asString)
feed.deliver(quote, time)
Expand All @@ -65,12 +62,15 @@ private class Handler(private val feed: TiingoLiveFeed) : WebSocketListener() {

private fun handleCrypto(data: JsonArray) {
val type = data[0].asString
val symbol = data[1].asString.uppercase()
val asset = Config.getOrPutAsset(symbol) { Asset(symbol, AssetType.CRYPTO, exchange = Exchange.CRYPTO) }
val time = Instant.parse(data[2].asString)
if (type == "Q") {
val symbol = data[1].asString.uppercase()
val asset = assets.getOrPut(symbol) { Asset(symbol, AssetType.CRYPTO) }
val quote = PriceQuote(asset, data[8].asDouble, data[7].asDouble, data[5].asDouble, data[4].asDouble)
val time = Instant.parse(data[2].asString)
feed.deliver(quote, time)
} else if (type == "T") {
val trade = TradePrice(asset, data[5].asDouble, data[4].asDouble)
feed.deliver(trade, time)
}
}

Expand Down Expand Up @@ -99,19 +99,21 @@ private class Handler(private val feed: TiingoLiveFeed) : WebSocketListener() {
}

/**
* Tiingo live feed
* Retrieve real-time data from Tiingo. It has support for US stocks, Forex and Crypto.
*
* This feed uses web-sockets for low letency and has nanosecond resolution
* This feed uses the Tiingo websocket API for low letency and has nanosecond time resolution
*/
class TiingoLiveFeed private constructor(
private val type: String,
private val thresholdLevel: Int,
private val thresholdLevel: Int = 5,
configure: TiingoConfig.() -> Unit = {}
) : LiveFeed() {

private val config = TiingoConfig()

init {
val types = setOf("iex", "crypto", "fx")
require(type in types) { "invalid type $type, allowed types are $types"}
config.configure()
require(config.key.isNotBlank()) { "no valid key found"}
}
Expand Down Expand Up @@ -178,13 +180,12 @@ class TiingoLiveFeed private constructor(
}

/**
* Subscribe to quotes for provided [symbols].
* Subscribe to real-time trades/quotes for provided [symbols].
*
* If no symbols are provided, all available data is subscribed to. That is a lot of data and can impact
* your monthly quato quickly.
* If no symbols are provided, all available market data is subscribed to. Be aware that this a lot of data.
*
* For crypto and some forex it is challenging to derive the underlying currency from just the symbol name,
* so it is better to use [subscribeAssets] instead.
* For crypto and some forex it is challenging to derive the underlying [Asset] from just its symbol name.
* You can use [Config.registerAsset] to register assets upfront
*/
fun subscribe(vararg symbols: String) {

Expand All @@ -206,22 +207,5 @@ class TiingoLiveFeed private constructor(

}

/**
* Subscribe to quotes for the provided [assets].
*
* If no symbols are provided, all available data is subscribed to. That is a lot of data and can impact
* your monthly quato quickly.
*/
fun subscribeAssets(vararg assets: Asset) {
val tickers = mutableListOf<String>()
for (asset in assets) {
val tiingoTicker = asset.symbol.replace("[^a-zA-Z]+".toRegex(), "")
this.handler.assets[tiingoTicker] = asset
tickers.add(tiingoTicker)
}

@Suppress("SpreadOperator")
subscribe(*tickers.toTypedArray())
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ package org.roboquant.samples
import org.roboquant.Roboquant
import org.roboquant.common.*
import org.roboquant.feeds.AggregatorLiveFeed
import org.roboquant.feeds.PriceAction
import org.roboquant.feeds.filter
import org.roboquant.loggers.ConsoleLogger
import org.roboquant.loggers.MemoryLogger
import org.roboquant.metrics.ProgressMetric
import org.roboquant.strategies.EMAStrategy
import org.roboquant.tiingo.TiingoHistoricFeed
Expand All @@ -33,6 +34,7 @@ internal class TiingoSamples {
@Test
@Ignore
internal fun testLiveFeed() {
System.setProperty(org.slf4j.simple.SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "TRACE")
val feed = TiingoLiveFeed.iex()
feed.subscribe("AAPL", "TSLA")
val rq = Roboquant(EMAStrategy(), ProgressMetric(), logger = ConsoleLogger())
Expand All @@ -46,8 +48,8 @@ internal class TiingoSamples {
val iex = TiingoLiveFeed.iex()
iex.subscribe()
val feed = AggregatorLiveFeed(iex, 5.seconds)
val rq = Roboquant(EMAStrategy(), ProgressMetric(), logger = MemoryLogger())
rq.run(feed, Timeframe.next(3.minutes))
val rq = Roboquant(EMAStrategy(), ProgressMetric(), logger = ConsoleLogger())
rq.run(feed, Timeframe.next(5.minutes))
println(rq.broker.account.fullSummary())
}

Expand All @@ -65,13 +67,28 @@ internal class TiingoSamples {
@Ignore
internal fun testLiveFeedCrypto() {
val feed = TiingoLiveFeed.crypto()
val asset = Asset("BNBFDUSD", AssetType.CRYPTO, "FDUSD")
feed.subscribeAssets(asset)
val asset = Asset("BNB/FDUSD", AssetType.CRYPTO, "FDUSD")
Config.registerAsset("BNBFDUSD", asset)

feed.subscribe("BNBFDUSD")
val rq = Roboquant(EMAStrategy(), ProgressMetric(), logger = ConsoleLogger())
rq.run(feed, Timeframe.next(1.minutes))
println(rq.broker.account.fullSummary())
}


@Test
@Ignore
internal fun testLiveFeedCryptoAll() {
val feed = TiingoLiveFeed.crypto()
feed.subscribe()
feed.filter<PriceAction>(Timeframe.next(1.minutes)) {
println(it)
false
}
feed.close()
}

@Test
@Ignore
internal fun historic() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ internal class TiingoTestIT {
Config.getProperty("TEST_TIINGO") ?: return
val feed = TiingoLiveFeed.crypto()
val asset = Asset("BNBFDUSD", AssetType.CRYPTO, "FDUSD")
feed.subscribeAssets(asset)
Config.registerAsset("BNBFDUSD", asset)
feed.subscribe("BNBFDUSD")
val rq = Roboquant(EMAStrategy(), AccountMetric(), logger = LastEntryLogger())
rq.run(feed, Timeframe.next(1.minutes))
val actions = rq.logger.getMetric("progress.actions").latestRun()
Expand Down
11 changes: 11 additions & 0 deletions roboquant/src/main/kotlin/org/roboquant/common/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ object Config {
private const val ONE_MB = 1024 * 1024
private const val DEFAULT_SEED = 42L

// Maps a symbol to an asset
private val symbolMap = mutableMapOf<String, Asset>()

// Used to handle Double imprecision
internal const val EPS = 1e-10

Expand All @@ -62,6 +65,14 @@ object Config {
val build: String
)

fun registerAsset(symbol: String, asset: Asset) {
symbolMap[symbol] = asset
}

fun getOrPutAsset(symbol: String, defaultValue: () -> Asset) : Asset {
return symbolMap.getOrPut(symbol, defaultValue)
}

/**
* MetadataProvider about the build en environment
*/
Expand Down
12 changes: 9 additions & 3 deletions roboquant/src/main/kotlin/org/roboquant/feeds/LiveFeed.kt
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,17 @@ abstract class LiveFeed(var heartbeatInterval: Long = 10_000) : Feed {
get() = channels.isNotEmpty()

/**
* Subclasses should use this method to send an event. If no channel is active, any event sent will be dropped.
* This is a blocking call to ensure that events are send to multiple channels are in the order they
* where delivered.
* Subclasses should use this method or `sendAsync` to send an event. If no channel is active, any event
* sent will be dropped.
*/
protected fun send(event: Event) = runBlocking {
sendAsync(event)
}

/**
* Subclasses should use this method to send an event. If no channel is active, any event sent will be dropped.
*/
protected suspend fun sendAsync(event: Event) {
for (channel in channels) {
try {
channel.send(event)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ internal class LiveFeedTest {

while (true) {
try {
send(event = Event(actions, Instant.now()))
sendAsync(event = Event(actions, Instant.now()))
delay(delayInMillis)
if (stop) break
} catch (e: Exception) {
Expand Down

0 comments on commit 811d777

Please sign in to comment.