Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka message queue backend is not working with gevent socket code. #2325

Open
neerajdlh74 opened this issue Sep 8, 2022 · 1 comment
Open

Comments

@neerajdlh74
Copy link

neerajdlh74 commented Sep 8, 2022

I needed to use kafka as backend message queue with the gevent web-socket, but it is not working, and giving error, i checked that kafka topics and brokers are working properly, but don't know where the problem could be. please someone help me. i used python-socketio framework for web-socket.

error i am getting on running server1.py is:

Traceback (most recent call last):
  File "server1.py", line 16, in <module>
    mgr = socketio.KafkaManager(url=url, channel='bingo.sockets')
  File "/usr/local/lib/python3.6/site-packages/socketio/kafka_manager.py", line 52, in __init__
    self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_urls)
  File "/usr/local/lib/python3.6/site-packages/kafka/producer/kafka.py", line 383, in __init__
    **self.config)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 208, in __init__
    self._selector = self.config['selector']()
  File "/usr/lib64/python3.6/selectors.py", line 399, in __init__
    self._epoll = select.epoll()
AttributeError: module 'select' has no attribute 'epoll'
Exception ignored in: <bound method KafkaClient.__del__ of <kafka.client_async.KafkaClient object at 0x7fed7ed59b00>>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 443, in __del__
    self._close()
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 421, in _close
    self._selector.close()
AttributeError: 'KafkaClient' object has no attribute '_selector'

here i am pasting server1.py code:

import socketio
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
from gevent import monkey
monkey.patch_all()


url=['kafka://127.0.0.1:9092', 'kafka://127.0.0.2:9092', 'kafka://127.0.0.3:9092']
mgr = socketio.KafkaManager(url=url, channel='sockets')
sio = socketio.Server(async_mode='gevent', client_manager=mgr, logger=True, engineio_logger=True)
app = socketio.WSGIApp(sio)

@sio.event
def connect(sid, environ):
      print('connected')
      sio.emit('message', {'foo':'bar'})

@sio.event
def message(sid, data):
      print('msg', data)

@sio.event
def disconnect(sid):
      print('disconnect')

if __name__=='__main__':
    pywsgi.WSGIServer(('0.0.0.0', 8000), app, handler_class=WebSocketHandler).serve_forever()

Here is the version details of the libraries currently I am using right now:

Python 3.6.8

gevent==1.2.2
gevent-websocket==0.10.1
eventlet==0.33.1
kafka-python==2.0.2
python-engineio==4.3.2
python-socketio==5.6.0
websocket-client==1.3.1
websockets==9.1
greenlet==1.1.2
asyncio==3.4.3
@dabbler0606
Copy link

You must put gevent.monkey.patch_all() on top

Please try

import gevent.monkey
gevent.monkey.patch_all()

import socketio
...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants