最近需要用到websocket類似的長連接
使用這個插件:https://github.com/miguelgrinberg/python-socketio
有類似的監聽功能
客戶端安裝:
pip install "python-socketio[client]"
服務端安裝:
pip install python-socketio
客戶端代碼:
import socketio
def create_client():
    """Connect a Socket.IO client to the local server and block until it ends.

    Registers handlers for the built-in ``connect``, ``connect_error`` and
    ``disconnect`` events plus the custom ``serve`` event, connects to
    ``http://localhost:5000`` and then waits on the connection.
    """
    sio = socketio.Client()

    @sio.event
    def connect():
        # Greet the server on its 'client' event as soon as the link is up.
        print('connection established')
        sio.emit('client', {'foo': 'bar'})

    @sio.on('serve')
    def on_message(data):
        # Payloads the server emits on the 'serve' event arrive here.
        print('client received a message!', data)

    # Alternative: handle the default 'message' event instead.
    # @sio.event
    # def message(data):
    #     print('message received with ', data)
    #     sio.emit('client', {'response': 'my response'})

    @sio.event
    def connect_error(data=None):
        # python-socketio passes the error details as one positional argument;
        # a zero-argument handler raises TypeError instead of running.
        # The default keeps the handler callable without arguments as well.
        print("The connection failed!")
        sio.disconnect()

    @sio.event
    def disconnect():
        print('disconnected from server')
        sio.disconnect()

    sio.connect('http://localhost:5000')
    # Block the calling thread until the connection is closed.
    sio.wait()


create_client()
服務端代碼:
import eventlet
import socketio
def create_serve():
    """Run a Socket.IO server on port 5000 using eventlet's WSGI server.

    Registers handlers for the built-in ``connect`` and ``disconnect`` events
    and for the custom ``client`` event, serves ``index.html`` at ``/``, and
    then blocks serving requests.
    """
    sio = socketio.Server()
    app = socketio.WSGIApp(sio, static_files={
        '/': {'content_type': 'text/html', 'filename': 'index.html'}
    })

    @sio.event
    def connect(sid, environ):
        print('connect ', sid)
        # Address the greeting to the client that just connected; an emit
        # without a recipient is broadcast to every connected client.
        sio.emit('serve', {'response': 'connert success'}, to=sid)

    # Alternative handler for the same 'client' event.
    # @sio.on('client')
    # def on_message(sid, data):
    #     print('serve received a message!111', data)

    @sio.on('client')
    def another_event(sid, data):
        # Payloads that clients emit on the 'client' event arrive here.
        print('serve received a message!', data)

    # Alternative: an event named after the function ('my_event').
    # @sio.event
    # def my_event(sid, data):
    #     print('message ', data)
    #     sio.emit('serve', {'response': 'connert success'})

    @sio.event
    def disconnect(sid):
        print('disconnect ', sid)

    # Listen on all interfaces, port 5000, and serve until interrupted.
    eventlet.wsgi.server(eventlet.listen(('', 5000)), app)


# The script guard belongs at the call site: placed inside the function it
# made create_serve() a silent no-op when called from an importing module.
if __name__ == '__main__':
    create_serve()
詳情看官方文檔:
https://python-socketio.readthedocs.io/en/latest/
end
————————————————————————————————————————————
自用鏈接:
https://python-socketio.readthedocs.io/en/latest/client.html
https://python-socketio.readthedocs.io/en/latest/server.html