Skip to content

Commit

Permalink
fix aio-libs#590: kafka-python 2.0 support
Browse files Browse the repository at this point in the history
  • Loading branch information
yumendy committed Mar 7, 2020
1 parent dddc19c commit 0160995
Show file tree
Hide file tree
Showing 10 changed files with 29 additions and 28 deletions.
12 changes: 6 additions & 6 deletions aiokafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from aiokafka.protocol.produce import ProduceRequest
from aiokafka.errors import (
KafkaError,
ConnectionError,
KafkaConnectionError,
NodeNotReadyError,
RequestTimedOutError,
UnknownTopicOrPartitionError,
Expand Down Expand Up @@ -222,7 +222,7 @@ async def bootstrap(self):
log.debug('Received cluster metadata: %s', self.cluster)
break
else:
raise ConnectionError(
raise KafkaConnectionError(
'Unable to bootstrap from {}'.format(self.hosts))

# detect api version if need
Expand Down Expand Up @@ -457,10 +457,10 @@ async def send(self, node_id, request, *, group=ConnectionGroup.DEFAULT):
request (Struct): request object (not-encoded)
Raises:
kafka.common.RequestTimedOutError
kafka.common.NodeNotReadyError
kafka.common.ConnectionError
kafka.common.CorrelationIdError
kafka.errors.RequestTimedOutError
kafka.errors.NodeNotReadyError
kafka.errors.ConnectionError
kafka.errors.CorrelationIdError
Returns:
Future: resolves to Response struct
Expand Down
10 changes: 5 additions & 5 deletions aiokafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ def port(self):

def send(self, request, expect_response=True):
if self._writer is None:
raise Errors.ConnectionError(
raise Errors.KafkaConnectionError(
"No connection to broker at {0}:{1}"
.format(self._host, self._port))

Expand All @@ -387,7 +387,7 @@ def send(self, request, expect_response=True):
self._writer.write(size + message)
except OSError as err:
self.close(reason=CloseReason.CONNECTION_BROKEN)
raise Errors.ConnectionError(
raise Errors.KafkaConnectionError(
"Connection at {0}:{1} broken: {2}".format(
self._host, self._port, err))

Expand All @@ -402,7 +402,7 @@ def send(self, request, expect_response=True):

def _send_sasl_token(self, payload, expect_response=True):
if self._writer is None:
raise Errors.ConnectionError(
raise Errors.KafkaConnectionError(
"No connection to broker at {0}:{1}"
.format(self._host, self._port))

Expand All @@ -411,7 +411,7 @@ def _send_sasl_token(self, payload, expect_response=True):
self._writer.write(size + payload)
except OSError as err:
self.close(reason=CloseReason.CONNECTION_BROKEN)
raise Errors.ConnectionError(
raise Errors.KafkaConnectionError(
"Connection at {0}:{1} broken: {2}".format(
self._host, self._port, err))

Expand All @@ -435,7 +435,7 @@ def close(self, reason=None, exc=None):
self._read_task = None
for _, _, fut in self._requests:
if not fut.done():
error = Errors.ConnectionError(
error = Errors.KafkaConnectionError(
"Connection at {0}:{1} closed".format(
self._host, self._port))
if exc is not None:
Expand Down
4 changes: 2 additions & 2 deletions aiokafka/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@

KafkaUnavailableError,
KafkaTimeoutError,
ConnectionError,
KafkaConnectionError,
)

__all__ = [
Expand Down Expand Up @@ -143,7 +143,7 @@

"KafkaUnavailableError",
"KafkaTimeoutError",
"ConnectionError",
"KafkaConnectionError",
]


Expand Down
2 changes: 1 addition & 1 deletion aiokafka/structs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import collections
from kafka.common import (
from kafka.structs import (
OffsetAndMetadata, TopicPartition, BrokerMetadata, PartitionMetadata
)

Expand Down
2 changes: 1 addition & 1 deletion examples/ssl_consume_produce.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from aiokafka.helpers import create_ssl_context
from kafka.common import TopicPartition
from kafka.structs import TopicPartition

context = create_ssl_context(
cafile="./ca-cert", # CA used to sign certificate.
Expand Down
4 changes: 2 additions & 2 deletions tests/_testutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from aiokafka import ConsumerRebalanceListener
from aiokafka.client import AIOKafkaClient
from aiokafka.errors import ConnectionError
from aiokafka.errors import KafkaConnectionError
from aiokafka.producer import AIOKafkaProducer
from aiokafka.helpers import create_ssl_context

Expand Down Expand Up @@ -273,7 +273,7 @@ def wait_kafka(cls):
# brokers. That counts as still not available
if client.cluster.brokers():
return
except ConnectionError:
except KafkaConnectionError:
pass
finally:
cls.loop.run_until_complete(client.close())
Expand Down
10 changes: 5 additions & 5 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import types
from unittest import mock

from kafka.common import (KafkaError, ConnectionError, RequestTimedOutError,
from kafka.errors import (KafkaError, KafkaConnectionError, RequestTimedOutError,
NodeNotReadyError, UnrecognizedBrokerVersion)
from kafka.protocol.metadata import (
MetadataRequest_v0 as MetadataRequest,
Expand Down Expand Up @@ -245,7 +245,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
async def test_bootstrap(self):
client = AIOKafkaClient(loop=self.loop,
bootstrap_servers='0.42.42.42:444')
with self.assertRaises(ConnectionError):
with self.assertRaises(KafkaConnectionError):
await client.bootstrap()

client = AIOKafkaClient(loop=self.loop, bootstrap_servers=self.hosts)
Expand All @@ -267,15 +267,15 @@ async def test_failed_bootstrap(self):
client = AIOKafkaClient(loop=self.loop, bootstrap_servers=self.hosts)
with mock.patch.object(AIOKafkaConnection, 'send') as mock_send:
mock_send.side_effect = KafkaError('some kafka error')
with self.assertRaises(ConnectionError):
with self.assertRaises(KafkaConnectionError):
await client.bootstrap()

@run_until_complete
async def test_failed_bootstrap_timeout(self):
client = AIOKafkaClient(loop=self.loop, bootstrap_servers=self.hosts)
with mock.patch.object(AIOKafkaConnection, 'send') as mock_send:
mock_send.side_effect = asyncio.TimeoutError('Timeout error')
with self.assertRaises(ConnectionError):
with self.assertRaises(KafkaConnectionError):
await client.bootstrap()

@run_until_complete
Expand Down Expand Up @@ -308,7 +308,7 @@ async def test_check_version(self):
await client.check_version(client.get_random_node())

client._get_conn = asyncio.coroutine(lambda _, **kw: None)
with self.assertRaises(ConnectionError):
with self.assertRaises(KafkaConnectionError):
await client.check_version()
await client.close()

Expand Down
8 changes: 4 additions & 4 deletions tests/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from aiokafka.conn import AIOKafkaConnection, create_conn, VersionInfo
from aiokafka.errors import (
ConnectionError, CorrelationIdError, KafkaError, NoError, UnknownError,
KafkaConnectionError, CorrelationIdError, KafkaError, NoError, UnknownError,
UnsupportedSaslMechanismError, IllegalSaslStateError
)
from aiokafka.record.legacy_records import LegacyRecordBatchBuilder
Expand Down Expand Up @@ -136,13 +136,13 @@ async def test_send_to_closed(self):
host, port = self.kafka_host, self.kafka_port
conn = AIOKafkaConnection(host=host, port=port, loop=self.loop)
request = MetadataRequest([])
with self.assertRaises(ConnectionError):
with self.assertRaises(KafkaConnectionError):
await conn.send(request)

conn._writer = mock.MagicMock()
conn._writer.write.side_effect = OSError('mocked writer is closed')

with self.assertRaises(ConnectionError):
with self.assertRaises(KafkaConnectionError):
await conn.send(request)

@run_until_complete
Expand Down Expand Up @@ -229,7 +229,7 @@ async def invoke_osserror(*a, **kw):
# invoke reader task
conn._read_task = conn._create_reader_task()

with self.assertRaises(ConnectionError):
with self.assertRaises(KafkaConnectionError):
await conn.send(request)
self.assertEqual(conn.connected(), False)

Expand Down
2 changes: 1 addition & 1 deletion tests/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
OffsetCommitRequest, OffsetCommitResponse_v2,
OffsetFetchRequest_v1 as OffsetFetchRequest
)
import kafka.common as Errors
import kafka.errors as Errors

from ._testutil import KafkaIntegrationTestCase, run_until_complete

Expand Down
3 changes: 2 additions & 1 deletion tests/test_message_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
from unittest import mock

from kafka.cluster import ClusterMetadata
from kafka.common import (TopicPartition, KafkaTimeoutError,
from kafka.errors import (KafkaTimeoutError,
NotLeaderForPartitionError,
LeaderNotAvailableError)
from kafka.structs import TopicPartition
from ._testutil import run_until_complete
from aiokafka.util import ensure_future
from aiokafka.producer.message_accumulator import (
Expand Down

0 comments on commit 0160995

Please sign in to comment.