flask

# encoding:utf-8


# !/usr/bin/env python

import psutil

import time

from threadingimport Lock

from flaskimport Flask, render_template

from flask_socketioimport SocketIO

import pandasas pd

import MySQLdb

async_mode =None

app = Flask(__name__)

app.config['SECRET_KEY'] ='secret!'

socketio = SocketIO(app,async_mode=async_mode)

thread =None

thread_lock = Lock()


# 后臺線程 產(chǎn)生數(shù)據(jù),即刻推送至前端

def background_thread():

count =0

? ? lastvalue=0

? ? t=0

? ? cpu=0

? ? amount=0

? ? while True:

count +=1


? ? ? ? import numpyas np


? ? ? ? t=t +1

? ? ? ? n=np.random.randn()

cpu= n*10 if n>0 else 2

? ? ? ? amount=amount + np.random.randn()*10 if np.random.randn()>0 else amount+13


? ? ? ? socketio.emit('server_response',

{'data': [t,int(cpu),amount,lastvalue],'count': count},

namespace='/test')

socketio.emit('server_response2',

{'data': [0,int(cpu),amount],'count': count},

namespace='/test')

lastvalue = amount

socketio.sleep(2)

# 注意:這里不需要客戶端連接的上下文,默認(rèn) broadcast = True

@app.route('/')

def index():

return render_template('index.html',async_mode=socketio.async_mode)

@socketio.on('connect',namespace='/test')

def test_connect():

global thread

with thread_lock:

if threadis None:

thread = socketio.start_background_task(target=background_thread)

if __name__ =='__main__':

socketio.run(app,debug=True,port=5028,host='0.0.0.0')

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容