# encoding:utf-8
# !/usr/bin/env python
import time
from threading import Lock

import MySQLdb
import numpy as np
import pandas as pd
import psutil
from flask import Flask, render_template
from flask_socketio import SocketIO
# Handle of the single background producer task; test_connect() assigns it
# the first time a client connects, until then it stays None.
thread = None
# Guards the check-and-start of `thread` inside test_connect().
thread_lock = Lock()

# Passed straight through to SocketIO below; None leaves the choice of
# async backend to Flask-SocketIO.
async_mode = None

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, async_mode=async_mode)
# Background thread: generates data and pushes it to the front end immediately.
def background_thread():
    """Generate random demo samples forever and push them to '/test' clients.

    Intended to run as a Socket.IO background task. On every iteration it
    emits two events on the '/test' namespace and then sleeps for two
    seconds:

    * 'server_response'  -- 'data' is [t, int(cpu), amount, lastvalue]
    * 'server_response2' -- 'data' is [0, int(cpu), amount]

    Both payloads also carry 'count', the number of iterations so far.
    The function never returns.
    """
    count = 0       # iterations completed, sent with every payload
    lastvalue = 0   # `amount` as it was sent on the previous iteration
    t = 0           # tick counter used as the first data element
    amount = 0      # running value that drifts on each tick

    while True:
        count += 1
        t += 1

        # Positive draws are scaled by 10; non-positive draws fall back to 2.
        n = np.random.randn()
        cpu = n * 10 if n > 0 else 2

        # A separate draw picks the branch: either add a scaled random
        # step or a fixed step of 13.
        if np.random.randn() > 0:
            amount = amount + np.random.randn() * 10
        else:
            amount = amount + 13

        socketio.emit(
            'server_response',
            {'data': [t, int(cpu), amount, lastvalue], 'count': count},
            namespace='/test',
        )
        socketio.emit(
            'server_response2',
            {'data': [0, int(cpu), amount], 'count': count},
            namespace='/test',
        )

        lastvalue = amount
        # Use socketio.sleep (not time.sleep) so the wait goes through the
        # Socket.IO server's own sleep helper.
        socketio.sleep(2)
# Note: the socketio.emit calls in background_thread need no client-connection context; broadcast=True is the default there.
@app.route('/')
def index():
    """Render 'index.html', handing the template the server's async mode."""
    active_mode = socketio.async_mode
    return render_template('index.html', async_mode=active_mode)
@socketio.on('connect', namespace='/test')
def test_connect():
    """Start the background producer when a client connects to '/test'.

    The task is started at most once: the module-level `thread` handle is
    checked and assigned while holding `thread_lock`, so later connections
    find it already set and do nothing.
    """
    global thread
    with thread_lock:
        if thread is None:
            thread = socketio.start_background_task(target=background_thread)
if __name__ == '__main__':
    # NOTE(review): debug=True combined with host='0.0.0.0' exposes the
    # debug server on every network interface -- confirm this is only
    # ever run on a trusted network.
    run_options = {'debug': True, 'port': 5028, 'host': '0.0.0.0'}
    socketio.run(app, **run_options)