From 811d77757939698755681a46f2c9b16aba336da2 Mon Sep 17 00:00:00 2001 From: Peter Dekkers Date: Wed, 17 Jan 2024 20:01:06 +0100 Subject: [PATCH] Improved Tiingo and introduced sendAsync --- .../kotlin/org/roboquant/samples/TaSamples.kt | 1 + .../roboquant/tiingo/TiingoHistoricFeed.kt | 2 +- .../org/roboquant/tiingo/TiingoLiveFeed.kt | 66 +++++++------------ .../org/roboquant/samples/TiingoSamples.kt | 27 ++++++-- .../org/roboquant/tiingo/TiingoTestIT.kt | 3 +- .../kotlin/org/roboquant/common/Config.kt | 11 ++++ .../kotlin/org/roboquant/feeds/LiveFeed.kt | 12 +++- .../org/roboquant/feeds/LiveFeedTest.kt | 2 +- 8 files changed, 72 insertions(+), 52 deletions(-) diff --git a/roboquant-ta/src/test/kotlin/org/roboquant/samples/TaSamples.kt b/roboquant-ta/src/test/kotlin/org/roboquant/samples/TaSamples.kt index 52901c242..9c6c8535d 100644 --- a/roboquant-ta/src/test/kotlin/org/roboquant/samples/TaSamples.kt +++ b/roboquant-ta/src/test/kotlin/org/roboquant/samples/TaSamples.kt @@ -202,3 +202,4 @@ internal class TaSamples { } + diff --git a/roboquant-tiingo/src/main/kotlin/org/roboquant/tiingo/TiingoHistoricFeed.kt b/roboquant-tiingo/src/main/kotlin/org/roboquant/tiingo/TiingoHistoricFeed.kt index 6a2d967d4..546790567 100644 --- a/roboquant-tiingo/src/main/kotlin/org/roboquant/tiingo/TiingoHistoricFeed.kt +++ b/roboquant-tiingo/src/main/kotlin/org/roboquant/tiingo/TiingoHistoricFeed.kt @@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit /** * Tiingo historic feed. * - * This feed uses CSV format for faster processing and less bandwidth usage. + * This feed requests CSV format from Tiingo for faster processing and less bandwidth usage. */ class TiingoHistoricFeed( configure: TiingoConfig.() -> Unit = {} diff --git a/roboquant-tiingo/src/main/kotlin/org/roboquant/tiingo/TiingoLiveFeed.kt b/roboquant-tiingo/src/main/kotlin/org/roboquant/tiingo/TiingoLiveFeed.kt index f887cccec..08db2fd2d 100644 --- a/roboquant-tiingo/src/main/kotlin/org/roboquant/tiingo/TiingoLiveFeed.kt +++ b/roboquant-tiingo/src/main/kotlin/org/roboquant/tiingo/TiingoLiveFeed.kt @@ -23,13 +23,8 @@ import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.WebSocket import okhttp3.WebSocketListener -import org.roboquant.common.Asset -import org.roboquant.common.AssetType -import org.roboquant.common.Logging -import org.roboquant.feeds.Event -import org.roboquant.feeds.LiveFeed -import org.roboquant.feeds.PriceAction -import org.roboquant.feeds.PriceQuote +import org.roboquant.common.* +import org.roboquant.feeds.* import java.time.Instant import java.util.concurrent.TimeUnit import kotlin.collections.set @@ -39,16 +34,18 @@ private val logger = Logging.getLogger(TiingoLiveFeed::class) private class Handler(private val feed: TiingoLiveFeed) : WebSocketListener() { - val assets = mutableMapOf() - private fun handleIEX(data: JsonArray) { val type = data[0].asString + val symbol = data[3].asString.uppercase() + val asset = Config.getOrPutAsset(symbol) { Asset.forexPair(symbol) } + val time = Instant.ofEpochMilli(0).plusNanos(data[2].asLong) + if (type == "Q") { - val symbol = data[3].asString.uppercase() - val asset = assets.getOrPut(symbol) { Asset(symbol) } val quote = PriceQuote(asset, data[7].asDouble, data[8].asDouble, data[5].asDouble, data[4].asDouble) - val time = Instant.ofEpochMilli(0).plusNanos(data[2].asLong) feed.deliver(quote, time) + } else if (type == "T") { + val trade = TradePrice(asset, data[9].asDouble, data[10].asDouble) + feed.deliver(trade, time) } } @@ -56,7 +53,7 @@ private class Handler(private val feed: TiingoLiveFeed) : WebSocketListener() { val type = data[0].asString if (type == "Q") { val symbol = data[1].asString.uppercase() - val asset = assets.getOrPut(symbol) { Asset.forexPair(symbol) } + val asset = Config.getOrPutAsset(symbol) { Asset.forexPair(symbol) } val quote = PriceQuote(asset, data[7].asDouble, data[6].asDouble, data[4].asDouble, data[3].asDouble) val time = Instant.parse(data[2].asString) feed.deliver(quote, time) @@ -65,12 +62,15 @@ private class Handler(private val feed: TiingoLiveFeed) : WebSocketListener() { private fun handleCrypto(data: JsonArray) { val type = data[0].asString + val symbol = data[1].asString.uppercase() + val asset = Config.getOrPutAsset(symbol) { Asset(symbol, AssetType.CRYPTO, exchange = Exchange.CRYPTO) } + val time = Instant.parse(data[2].asString) if (type == "Q") { - val symbol = data[1].asString.uppercase() - val asset = assets.getOrPut(symbol) { Asset(symbol, AssetType.CRYPTO) } val quote = PriceQuote(asset, data[8].asDouble, data[7].asDouble, data[5].asDouble, data[4].asDouble) - val time = Instant.parse(data[2].asString) feed.deliver(quote, time) + } else if (type == "T") { + val trade = TradePrice(asset, data[5].asDouble, data[4].asDouble) + feed.deliver(trade, time) } } @@ -99,19 +99,21 @@ private class Handler(private val feed: TiingoLiveFeed) : WebSocketListener() { } /** - * Tiingo live feed + * Retrieve real-time data from Tiingo. It has support for US stocks, Forex and Crypto. * - * This feed uses web-sockets for low letency and has nanosecond resolution + * This feed uses the Tiingo websocket API for low letency and has nanosecond time resolution */ class TiingoLiveFeed private constructor( private val type: String, - private val thresholdLevel: Int, + private val thresholdLevel: Int = 5, configure: TiingoConfig.() -> Unit = {} ) : LiveFeed() { private val config = TiingoConfig() init { + val types = setOf("iex", "crypto", "fx") + require(type in types) { "invalid type $type, allowed types are $types"} config.configure() require(config.key.isNotBlank()) { "no valid key found"} } @@ -178,13 +180,12 @@ class TiingoLiveFeed private constructor( } /** - * Subscribe to quotes for provided [symbols]. + * Subscribe to real-time trades/quotes for provided [symbols]. * - * If no symbols are provided, all available data is subscribed to. That is a lot of data and can impact - * your monthly quato quickly. + * If no symbols are provided, all available market data is subscribed to. Be aware that this a lot of data. * - * For crypto and some forex it is challenging to derive the underlying currency from just the symbol name, - * so it is better to use [subscribeAssets] instead. + * For crypto and some forex it is challenging to derive the underlying [Asset] from just its symbol name. + * You can use [Config.registerAsset] to register assets upfront */ fun subscribe(vararg symbols: String) { @@ -206,22 +207,5 @@ class TiingoLiveFeed private constructor( } - /** - * Subscribe to quotes for the provided [assets]. - * - * If no symbols are provided, all available data is subscribed to. That is a lot of data and can impact - * your monthly quato quickly. - */ - fun subscribeAssets(vararg assets: Asset) { - val tickers = mutableListOf() - for (asset in assets) { - val tiingoTicker = asset.symbol.replace("[^a-zA-Z]+".toRegex(), "") - this.handler.assets[tiingoTicker] = asset - tickers.add(tiingoTicker) - } - - @Suppress("SpreadOperator") - subscribe(*tickers.toTypedArray()) - } } diff --git a/roboquant-tiingo/src/test/kotlin/org/roboquant/samples/TiingoSamples.kt b/roboquant-tiingo/src/test/kotlin/org/roboquant/samples/TiingoSamples.kt index dba7fe7fa..ca9fee47f 100644 --- a/roboquant-tiingo/src/test/kotlin/org/roboquant/samples/TiingoSamples.kt +++ b/roboquant-tiingo/src/test/kotlin/org/roboquant/samples/TiingoSamples.kt @@ -19,8 +19,9 @@ package org.roboquant.samples import org.roboquant.Roboquant import org.roboquant.common.* import org.roboquant.feeds.AggregatorLiveFeed +import org.roboquant.feeds.PriceAction +import org.roboquant.feeds.filter import org.roboquant.loggers.ConsoleLogger -import org.roboquant.loggers.MemoryLogger import org.roboquant.metrics.ProgressMetric import org.roboquant.strategies.EMAStrategy import org.roboquant.tiingo.TiingoHistoricFeed @@ -33,6 +34,7 @@ internal class TiingoSamples { @Test @Ignore internal fun testLiveFeed() { + System.setProperty(org.slf4j.simple.SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "TRACE") val feed = TiingoLiveFeed.iex() feed.subscribe("AAPL", "TSLA") val rq = Roboquant(EMAStrategy(), ProgressMetric(), logger = ConsoleLogger()) @@ -46,8 +48,8 @@ internal class TiingoSamples { val iex = TiingoLiveFeed.iex() iex.subscribe() val feed = AggregatorLiveFeed(iex, 5.seconds) - val rq = Roboquant(EMAStrategy(), ProgressMetric(), logger = MemoryLogger()) - rq.run(feed, Timeframe.next(3.minutes)) + val rq = Roboquant(EMAStrategy(), ProgressMetric(), logger = ConsoleLogger()) + rq.run(feed, Timeframe.next(5.minutes)) println(rq.broker.account.fullSummary()) } @@ -65,13 +67,28 @@ internal class TiingoSamples { @Ignore internal fun testLiveFeedCrypto() { val feed = TiingoLiveFeed.crypto() - val asset = Asset("BNBFDUSD", AssetType.CRYPTO, "FDUSD") - feed.subscribeAssets(asset) + val asset = Asset("BNB/FDUSD", AssetType.CRYPTO, "FDUSD") + Config.registerAsset("BNBFDUSD", asset) + + feed.subscribe("BNBFDUSD") val rq = Roboquant(EMAStrategy(), ProgressMetric(), logger = ConsoleLogger()) rq.run(feed, Timeframe.next(1.minutes)) println(rq.broker.account.fullSummary()) } + + @Test + @Ignore + internal fun testLiveFeedCryptoAll() { + val feed = TiingoLiveFeed.crypto() + feed.subscribe() + feed.filter(Timeframe.next(1.minutes)) { + println(it) + false + } + feed.close() + } + @Test @Ignore internal fun historic() { diff --git a/roboquant-tiingo/src/test/kotlin/org/roboquant/tiingo/TiingoTestIT.kt b/roboquant-tiingo/src/test/kotlin/org/roboquant/tiingo/TiingoTestIT.kt index 5bcbf8d38..41e085bfc 100644 --- a/roboquant-tiingo/src/test/kotlin/org/roboquant/tiingo/TiingoTestIT.kt +++ b/roboquant-tiingo/src/test/kotlin/org/roboquant/tiingo/TiingoTestIT.kt @@ -54,7 +54,8 @@ internal class TiingoTestIT { Config.getProperty("TEST_TIINGO") ?: return val feed = TiingoLiveFeed.crypto() val asset = Asset("BNBFDUSD", AssetType.CRYPTO, "FDUSD") - feed.subscribeAssets(asset) + Config.registerAsset("BNBFDUSD", asset) + feed.subscribe("BNBFDUSD") val rq = Roboquant(EMAStrategy(), AccountMetric(), logger = LastEntryLogger()) rq.run(feed, Timeframe.next(1.minutes)) val actions = rq.logger.getMetric("progress.actions").latestRun() diff --git a/roboquant/src/main/kotlin/org/roboquant/common/Config.kt b/roboquant/src/main/kotlin/org/roboquant/common/Config.kt index 9a93f0c0d..5927f0459 100644 --- a/roboquant/src/main/kotlin/org/roboquant/common/Config.kt +++ b/roboquant/src/main/kotlin/org/roboquant/common/Config.kt @@ -42,6 +42,9 @@ object Config { private const val ONE_MB = 1024 * 1024 private const val DEFAULT_SEED = 42L + // Maps a symbol to an asset + private val symbolMap = mutableMapOf() + // Used to handle Double imprecision internal const val EPS = 1e-10 @@ -62,6 +65,14 @@ object Config { val build: String ) + fun registerAsset(symbol: String, asset: Asset) { + symbolMap[symbol] = asset + } + + fun getOrPutAsset(symbol: String, defaultValue: () -> Asset) : Asset { + return symbolMap.getOrPut(symbol, defaultValue) + } + /** * MetadataProvider about the build en environment */ diff --git a/roboquant/src/main/kotlin/org/roboquant/feeds/LiveFeed.kt b/roboquant/src/main/kotlin/org/roboquant/feeds/LiveFeed.kt index 1d8869bab..c817b8257 100644 --- a/roboquant/src/main/kotlin/org/roboquant/feeds/LiveFeed.kt +++ b/roboquant/src/main/kotlin/org/roboquant/feeds/LiveFeed.kt @@ -48,11 +48,17 @@ abstract class LiveFeed(var heartbeatInterval: Long = 10_000) : Feed { get() = channels.isNotEmpty() /** - * Subclasses should use this method to send an event. If no channel is active, any event sent will be dropped. - * This is a blocking call to ensure that events are send to multiple channels are in the order they - * where delivered. + * Subclasses should use this method or `sendAsync` to send an event. If no channel is active, any event + * sent will be dropped. */ protected fun send(event: Event) = runBlocking { + sendAsync(event) + } + + /** + * Subclasses should use this method to send an event. If no channel is active, any event sent will be dropped. + */ + protected suspend fun sendAsync(event: Event) { for (channel in channels) { try { channel.send(event) diff --git a/roboquant/src/test/kotlin/org/roboquant/feeds/LiveFeedTest.kt b/roboquant/src/test/kotlin/org/roboquant/feeds/LiveFeedTest.kt index 34488d999..d3a72fabb 100644 --- a/roboquant/src/test/kotlin/org/roboquant/feeds/LiveFeedTest.kt +++ b/roboquant/src/test/kotlin/org/roboquant/feeds/LiveFeedTest.kt @@ -68,7 +68,7 @@ internal class LiveFeedTest { while (true) { try { - send(event = Event(actions, Instant.now())) + sendAsync(event = Event(actions, Instant.now())) delay(delayInMillis) if (stop) break } catch (e: Exception) {