Skip to content

Commit

Permalink
significant parsing speed improvement (naming series columns at later…
Browse files Browse the repository at this point in the history
… stage)
  • Loading branch information
dleutnant committed Aug 21, 2017
1 parent f881d04 commit 8b0568d
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 6 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ Package: influxdbr
Type: Package
Title: R Interface to InfluxDB
Version: 0.12.0.9000
Date: 2017-08-20
Date: 2017-08-21
Authors@R: person("Dominik", "Leutnant", email = "[email protected]", role = c("aut", "cre"))
Description: An R interface to the InfluxDB time series database <https://www.influxdata.com>. This package allows you to fetch and write time series data from/to an InfluxDB server. Additionally, handy wrappers for the Influx Query Language (IQL) to manage and explore a remote database are provided.
License: GPL-3
Expand Down
16 changes: 12 additions & 4 deletions R/influxdb_json_parser.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ query_list_to_tibble <- function(x, timestamp_format) {
#stop()
#timestamp_format <- "n"

# development options
performance <- FALSE
timer <- function(x, txt) {
  # Development helper: print a timestamped progress note via message(),
  # then return `x` untouched so the call can be dropped into a pipe step.
  stamp <- paste(Sys.time(), txt)
  message(stamp)
  x
}

# create divisor for different timestamp format
div <- switch(timestamp_format,
Expand Down Expand Up @@ -59,19 +62,23 @@ query_list_to_tibble <- function(x, timestamp_format) {

# extract values
series_values <- purrr::map(series_ele$series, "values") %>%
# set names
purrr::map2(., .y = series_columns, ~ purrr::map(.,
purrr::set_names,
nm = .y)) %>%
# transpose for faster data munging
`if`(performance, timer(., "transpose data"), .) %>%
purrr::map( ~ purrr::transpose(.)) %>%
# convert influxdb NULL to NA
`if`(performance, timer(., "convert influxdb NULL to NA"), .) %>%
purrr::map( ~ purrr::map(., ~ purrr::map(., ~ . %||% NA))) %>%
# unlist for faster data munging
`if`(performance, timer(., "unlist data"), .) %>%
purrr::map( ~ purrr::map(., base::unlist)) %>%
# convert int to dbl (required for unnesting)
`if`(performance, timer(., "unify numerics"), .) %>%
purrr::map( ~ purrr::map_if(., is.integer, as.double)) %>%
# set names
`if`(performance, timer(., "setting column names"), .) %>%
purrr::map2(., .y = series_columns, ~ purrr::set_names(., nm = .y)) %>%
# influxdb ALWAYS stores data in GMT!!
`if`(performance, timer(., "set POSIX-based time index"), .) %>%
purrr::map( ~ purrr::map_at(., .at = "time",
~ as.POSIXct(. / div,
origin = "1970-1-1",
Expand Down Expand Up @@ -119,6 +126,7 @@ query_list_to_tibble <- function(x, timestamp_format) {

### in case of CHUNKED responses, concatenate tables with same statement_id
list_of_result <- list_of_result %>% # take the list of results
`if`(performance, timer(., "concatenate tables with same statement_id"), .) %>%
purrr::map("statement_id") %>% # extract "statement_id" of each result
purrr::map_int(unique) %>% # create a vector with unique "statement_id"
rle %>% # perform run length encoding to get the length of each "statement_id"
Expand Down
20 changes: 19 additions & 1 deletion R/influxdb_main.R
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,14 @@ influx_query <- function(con,
return_xts = TRUE,
chunked = FALSE,
simplifyList = FALSE) {

# development options
performance <- FALSE
timer <- function(x, txt) {
  # Profiling aid: emit "<current time> <txt>" as a message and hand the
  # first argument back unchanged.
  note <- paste(Sys.time(), txt)
  message(note)
  x
}

# performance profiler
if (performance) {perf <- timer(NA, "start")}

if (is.null(con)) {
warning("Connection object is NULL.")
return(NULL)
Expand Down Expand Up @@ -259,6 +267,9 @@ influx_query <- function(con,
# add query
q <- c(q, q = query)

# performance profiler
if (performance) {perf <- timer(NA, "sending query")}

# submit query
response <- tryCatch(httr::GET(url = "",
scheme = con$scheme,
Expand All @@ -282,21 +293,28 @@ influx_query <- function(con,

# debug_data <<- rawToChar(response$content)

if (performance) {perf <- timer(NA, "converting raw to char")}

# initiate data conversion, which results in a tibble with list-columns
list_of_result <-
rawToChar(response$content) %>% # convert to chars
`if`(performance, timer(., "converting json to list"), .) %>%
purrr::map(response_to_list) %>% # from json to list
`if`(performance, timer(., "converting list to tibbles"), .) %>%
purrr::map(query_list_to_tibble, # from list to tibble
timestamp_format = timestamp_format) %>%
purrr::flatten(.)

# xts object required?
if (return_xts)
list_of_result <- list_of_result %>%
`if`(performance, timer(., "converting tibbles to xts"), .) %>%
purrr::map(tibble_to_xts)

if (simplifyList && (length(list_of_result[[1]]) == 1))
list_of_result <- list_of_result[[1]][[1]]

if (performance) {perf <- timer(NA, "done.")}

# if not simplified, a list of results, either a list of tibbles or xts objects
# is ALWAYS returned! A wrapping function ALWAYS returns a tibble!
Expand Down

1 comment on commit 8b0568d

@ckatsulis
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice solution! I don't know purrr at all.

Please sign in to comment.