Skip to content

Commit

Permalink
Panda nulls (#137)
Browse files Browse the repository at this point in the history
* Handle pandas timezones

* use_na_values checkpoint

* use_na_values checkpoint

* use_na_values checkpoint

* use_na_values checkpoint

* Pandas nulls checkpoint

* Pandas nulls checkpoint

* Minor cleanup
  • Loading branch information
genzgd authored Feb 27, 2023
1 parent a961b33 commit 7202dc5
Show file tree
Hide file tree
Showing 30 changed files with 448 additions and 221 deletions.
23 changes: 23 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,29 @@ The secondary effect of the `send_progress` argument -- to set `wait_end_of_quer
on whether the query is streaming or not.


## 0.5.13, 2023-02-27

### Improvements
- By default, reading Pandas Dataframes with query_df and query_df_stream now sets a new QueryContext property
of `use_pandas_na` to `True`. When `use_pandas_na` is True, clickhouse_connect will attempt to use Pandas "missing"
values, such as pandas.NaT and pandas.NA, for ClickHouse NULLs (in Nullable columns only), and use the associated
extended Pandas dtype. Closes https://github.com/ClickHouse/clickhouse-connect/issues/132
- There are new low level optimizations for reading some Nullable columns, and writing Pandas dataframes

### Bug Fixes
- Timezone information from ClickHouse DateTime columns with a timezone was lost. There was a workaround implemented
for this issue in v0.5.8 that allowed timezones to be assigned to the query or columns on the client side. ClickHouse now
supports sending this timezone data with the column, but only in server versions 23.2 and later. If such a version is
detected, clickhouse-connect will return timezone-aware DateTime values without a workaround. Fixes
https://github.com/ClickHouse/clickhouse-connect/issues/120
- For certain queries, an incorrect, non-zero "zero value" would be returned when `use_none` was set
to `False`. All NULL values are now properly converted.
- Timezone data was lost when a DateTime64 column with a timezone was converted to a Pandas DataFrame. This has been
fixed. Fixes https://github.com/ClickHouse/clickhouse-connect/issues/136
- send_progress headers were not being correctly requested, which could result in unexpected timeouts for long-running
queries. This has been fixed.


## 0.5.12, 2023-02-16
### Improvements
- A new keyword parameter `server_host_name` is now recognized by the `clickhouse_connect.get_client` method. This identifies
Expand Down
2 changes: 1 addition & 1 deletion clickhouse_connect/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.5.12
0.5.13
2 changes: 0 additions & 2 deletions clickhouse_connect/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

from clickhouse_connect.driver.exceptions import ProgrammingError


def version():
try:
return pkg_resources.get_distribution('clickhouse-connect').version
Expand Down Expand Up @@ -60,4 +59,3 @@ def _init_common(name: str, options: Sequence[Any], default: Any):
_init_common('dict_parameter_format', ('json', 'map'), 'json')
_init_common('invalid_setting_action', ('send', 'drop', 'error'), 'error')
_init_common('product_name', (), '')
_init_common('native_protocol_version', (0, 54337), 0)
77 changes: 43 additions & 34 deletions clickhouse_connect/datatypes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from clickhouse_connect.driver.insert import InsertContext
from clickhouse_connect.driver.query import QueryContext
from clickhouse_connect.driver.types import ByteSource
from clickhouse_connect.driver.options import np
from clickhouse_connect.driver.options import np, pd

logger = logging.getLogger(__name__)
ch_read_formats = {}
Expand Down Expand Up @@ -125,8 +125,7 @@ def read_column(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> S
:return: The decoded column data as a sequence and the updated location pointer
"""
self.read_column_prefix(source)
column = self.read_column_data(source, num_rows, ctx)
return column
return self.read_column_data(source, num_rows, ctx)

def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence:
"""
Expand All @@ -137,22 +136,18 @@ def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext)
:return: The decoded column plus the updated location pointer
"""
if self.low_card:
return self._read_low_card_column(source, num_rows, ctx)
if self.nullable:
return self._read_nullable_column(source, num_rows, ctx)
return self._read_column_binary(source, num_rows, ctx)
column = self._read_low_card_column(source, num_rows, ctx)
elif self.nullable:
column = self._read_nullable_column(source, num_rows, ctx)
else:
column = self._read_column_binary(source, num_rows, ctx)
return self._finalize_column(column, ctx)

def _read_nullable_column(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence:
null_map = source.read_bytes(num_rows)
column = self._read_column_binary(source, num_rows, ctx)
if ctx.use_none:
if ctx.use_numpy or isinstance(column, (tuple, array.array)):
column = [None if null_map[ix] else column[ix] for ix in range(num_rows)]
else:
for ix in range(num_rows):
if null_map[ix]:
column[ix] = None
return column
null_obj = self._active_null(ctx)
return data_conv.build_nullable_column(column, null_map, null_obj)

# The binary methods are really abstract, but they aren't implemented for container classes which
# delegate binary operations to their elements
Expand All @@ -169,6 +164,9 @@ def _read_column_binary(self,
"""
return [], 0

def _finalize_column(self, column: Sequence, _ctx: QueryContext) -> Sequence:
return column

def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: MutableSequence, ctx: InsertContext):
"""
Lowest level write method for ClickHouseType data columns
Expand Down Expand Up @@ -211,19 +209,18 @@ def _read_low_card_column(self, source: ByteSource, num_rows: int, ctx: QueryCon
index_sz = 2 ** (key_data & 0xff)
key_cnt = source.read_uint64()
keys = self._read_column_binary(source, key_cnt, ctx)
use_none = ctx.use_none
if self.nullable:
try:
keys[0] = None if use_none else self._python_null(ctx)
except TypeError:
keys = (None if use_none else self._python_null(ctx),) + tuple(keys[1:])
index_cnt = source.read_uint64()
assert index_cnt == num_rows
index = source.read_array(array_type(index_sz, False), num_rows)
if ctx.use_numpy and hasattr(keys, 'dtype') and keys.dtype != np.object_:
return np.fromiter((keys[ix] for ix in index), dtype=keys.dtype, count=num_rows)
index = source.read_array(array_type(index_sz, False), index_cnt)
if self.nullable:
return self._build_lc_nullable_column(keys, index, ctx)
return self._build_lc_column(keys, index, ctx)

def _build_lc_column(self, keys: Sequence, index: array.array, _ctx: QueryContext):
return [keys[ix] for ix in index]

def _build_lc_nullable_column(self, keys: Sequence, index: array.array, ctx: QueryContext):
return data_conv.build_lc_nullable_column(keys, index, self._active_null(ctx))

def _write_column_low_card(self, column: Iterable, dest: MutableSequence, ctx: InsertContext):
if not column:
return
Expand Down Expand Up @@ -264,7 +261,7 @@ def _write_column_low_card(self, column: Iterable, dest: MutableSequence, ctx: I
write_uint64(len(index), dest)
write_array(array_type(1 << ix_type, False), index, dest)

def _python_null(self, _ctx: QueryContext):
def _active_null(self, _ctx: QueryContext) -> Any:
return None

def _first_value(self, column: Sequence) -> Optional[Any]:
Expand Down Expand Up @@ -302,18 +299,26 @@ def __init_subclass__(cls, registered: bool = True):
cls.byte_size = array.array(cls._array_type).itemsize

def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
if self.read_format(ctx) == 'string':
column = source.read_array(self._array_type, num_rows)
return [str(x) for x in column]
if ctx.use_numpy:
return numpy_conv.read_numpy_array(source, self.np_type, num_rows)
return source.read_array(self._array_type, num_rows)

def _read_nullable_column(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence:
return data_conv.read_nullable_array(source,
self._array_type,
num_rows,
use_none=ctx.use_none or self.python_type == float)
return data_conv.read_nullable_array(source, self._array_type, num_rows, self._active_null(ctx))

def _build_lc_column(self, keys: Sequence, index: array.array, ctx: QueryContext):
if ctx.use_numpy:
return np.fromiter((keys[ix] for ix in index), dtype=keys.dtype, count=len(index))
return super()._build_lc_column(keys, index, ctx)

def _finalize_column(self, column: Sequence, ctx: QueryContext) -> Sequence:
if self.read_format(ctx) == 'string':
return [str(x) for x in column]
if ctx.use_pandas_na and self.nullable:
return pd.array(column, dtype=self.base_type)
if ctx.use_numpy and self.nullable and (not ctx.use_none):
return np.array(column, dtype=self.np_type)
return column

def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: MutableSequence, ctx: InsertContext):
if len(column) and self.nullable:
Expand All @@ -328,7 +333,11 @@ def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: M
column = [0 if x is None else x for x in column]
write_array(self._array_type, column, dest)

def _python_null(self, _ctx):
def _active_null(self, ctx: QueryContext):
if ctx.as_pandas and ctx.use_na_values:
return pd.NA
if ctx.use_none:
return None
return 0


Expand Down
10 changes: 8 additions & 2 deletions clickhouse_connect/datatypes/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class IPv4(ClickHouseType):
python_type = IPv4Address

def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
if self.read_format(ctx) == 'int':
return source.read_array(self._array_type, num_rows)
if self.read_format(ctx) == 'string':
column = source.read_array(self._array_type, num_rows)
return [socket.inet_ntoa(x.to_bytes(4, 'big')) for x in column]
Expand All @@ -38,8 +40,10 @@ def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: M
column = [x._ip for x in column]
write_array(self._array_type, column, dest)

def _python_null(self, ctx: QueryContext):
def _active_null(self, ctx: QueryContext):
fmt = self.read_format(ctx)
if ctx.use_none:
return None
if fmt == 'string':
return '0.0.0.0'
if fmt == 'int':
Expand Down Expand Up @@ -117,5 +121,7 @@ def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: M
b = x.packed
dest += b if len(b) == 16 else (v4mask + b)

def _python_null(self, ctx):
def _active_null(self, ctx):
if ctx.use_none:
return None
return '::' if self.read_format(ctx) == 'string' else V6_NULL
59 changes: 50 additions & 9 deletions clickhouse_connect/datatypes/numeric.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import decimal
from typing import Union, Type, Sequence, MutableSequence

from math import nan

from clickhouse_connect.datatypes.base import TypeDef, ArrayType, ClickHouseType
from clickhouse_connect.driver.common import array_type, write_array, decimal_size, decimal_prec
from clickhouse_connect.driver.ctypes import numpy_conv, data_conv
from clickhouse_connect.driver.insert import InsertContext
from clickhouse_connect.driver.options import pd, np
from clickhouse_connect.driver.query import QueryContext
from clickhouse_connect.driver.types import ByteSource

Expand Down Expand Up @@ -52,17 +55,25 @@ class UInt64(ArrayType):

def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
fmt = self.read_format(ctx)
if fmt == 'string':
column = source.read_array(self._array_type, num_rows)
return [str(x) for x in column]
if ctx.use_numpy:
np_type = '<q' if fmt == 'signed' else '<u8'
return numpy_conv.read_numpy_array(source, np_type, num_rows)
arr_type = 'q' if fmt == 'signed' else 'Q'
return source.read_array(arr_type, num_rows)

def _read_nullable_column(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence:
return data_conv.read_nullable_array(source, self._array_type, num_rows, use_none=ctx.use_none)
return data_conv.read_nullable_array(source, 'q' if self.read_format(ctx) == 'signed' else 'Q',
num_rows, self._active_null(ctx))

def _finalize_column(self, column: Sequence, ctx: QueryContext) -> Sequence:
fmt = self.read_format(ctx)
if fmt == 'string':
return [str(x) for x in column]
if ctx.use_pandas_na and self.nullable:
return pd.array(column, dtype='Int64' if fmt == 'signed' else 'UInt64')
if ctx.use_numpy and self.nullable and (not ctx.use_none):
return np.array(column, dtype='<q' if fmt == 'signed' else '<u8')
return column


class BigInt(ClickHouseType, registered=False):
Expand Down Expand Up @@ -134,16 +145,34 @@ class UInt256(BigInt):
_signed = False


class Float32(ArrayType):
class Float(ArrayType, registered=False):
_array_type = 'f'
np_type = '<f4'
python_type = float

def _finalize_column(self, column: Sequence, ctx: QueryContext) -> Sequence:
if self.read_format(ctx) == 'string':
return [str(x) for x in column]
if ctx.use_numpy and self.nullable and (not ctx.use_none):
return np.array(column, dtype=self.np_type)
return column

def _active_null(self, ctx: QueryContext):
if ctx.use_pandas_na:
return nan
if ctx.use_none:
return None
if ctx.use_numpy:
return nan
return 0.0


class Float32(Float):
np_type = '<f4'


class Float64(ArrayType):
class Float64(Float):
_array_type = 'd'
np_type = '<f8'
python_type = float


class Bool(ClickHouseType):
Expand All @@ -154,6 +183,11 @@ def _read_column_binary(self, source: ByteSource, num_rows: int, _ctx: QueryCont
column = source.read_bytes(num_rows)
return [b != 0 for b in column]

def _finalize_column(self, column: Sequence, ctx: QueryContext) -> Sequence:
if ctx.use_numpy:
return np.array(column)
return column

def _write_column_binary(self, column, dest, _ctx):
write_array('B', [1 if x else 0 for x in column], dest)

Expand All @@ -165,7 +199,7 @@ class Boolean(Bool):
class Enum(ClickHouseType):
__slots__ = '_name_map', '_int_map'
_array_type = 'b'
valid_formats = 'string', 'int'
valid_formats = 'native', 'int'
python_type = str

def __init__(self, type_def: TypeDef):
Expand Down Expand Up @@ -255,6 +289,13 @@ def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: M
else:
write_array(self._array_type, [int(x * mult) for x in column], dest)

def _active_null(self, ctx: QueryContext):
if ctx.use_none:
return None
digits = str('0').rjust(self.prec, '0')
scale = self.scale
return decimal.Decimal(f'{digits[:-scale]}.{digits[-scale:]}')


class BigDecimal(Decimal, registered=False):
def _read_column_binary(self, source: ByteSource, num_rows: int, _ctx):
Expand Down
23 changes: 15 additions & 8 deletions clickhouse_connect/datatypes/string.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Sequence, MutableSequence, Union

import pandas as pd

from clickhouse_connect.datatypes.base import ClickHouseType, TypeDef
from clickhouse_connect.driver.insert import InsertContext
from clickhouse_connect.driver.query import QueryContext
Expand All @@ -10,14 +12,17 @@
class String(ClickHouseType):

def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext):
if ctx.use_numpy and ctx.max_str_len:
return np.array(source.read_str_col(num_rows, ctx.encoding or self.encoding), dtype=f'<U{ctx.max_str_len}')
return source.read_str_col(num_rows, ctx.encoding or self.encoding)

def _read_nullable_column(self, source: ByteSource, num_rows: int, ctx: QueryContext) -> Sequence:
if ctx.use_numpy:
return super()._read_nullable_column(source, num_rows, ctx)
return source.read_str_col(num_rows, ctx.encoding or self.encoding, nullable=True, use_none=ctx.use_none)
return source.read_str_col(num_rows, ctx.encoding or self.encoding, True, self._active_null(ctx))

def _finalize_column(self, column: Sequence, ctx: QueryContext) -> Sequence:
if ctx.use_na_values:
return pd.array(column, dtype=pd.StringDtype())
if ctx.use_numpy and ctx.max_str_len:
return np.array(column, dtype=f'<U{ctx.max_str_len}')
return column

# pylint: disable=duplicate-code
def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: MutableSequence, ctx: InsertContext):
Expand Down Expand Up @@ -51,8 +56,8 @@ def _write_column_binary(self, column: Union[Sequence, MutableSequence], dest: M
app(0x80 | b)
dest += y

def _python_null(self, _ctx):
return ''
def _active_null(self, ctx):
return None if ctx.use_none else ''


class FixedString(ClickHouseType):
Expand All @@ -64,7 +69,9 @@ def __init__(self, type_def: TypeDef):
self._name_suffix = type_def.arg_str
self._empty_bytes = bytes(b'\x00' * self.byte_size)

def python_null(self, ctx: QueryContext):
def _active_null(self, ctx: QueryContext):
if ctx.use_none:
return None
return self._empty_bytes if self.read_format(ctx) == 'native' else ''

@property
Expand Down
Loading

0 comments on commit 7202dc5

Please sign in to comment.