樹莓派/PC實現(xiàn)實時攝像頭數(shù)據(jù)共享(Python—OpenCV)

使用Python和OpenCV實現(xiàn)樹莓派/PC實時攝像頭數(shù)據(jù)共享,主要分為服務器和客戶端兩部分。

服務器Demo如下:

#服務器端

import socket

import threading

import struct

import time

import cv2

import numpy

class Carame_Accept_Object:

? ? def __init__(self,S_addr_port=("",8880)):

? ? ? ? self.resolution=(640,480)? ? ? #分辨率

? ? ? ? self.img_fps=15? ? ? ? ? ? ? ? #每秒傳輸多少幀數(shù)

? ? ? ? self.addr_port=S_addr_port

? ? ? ? self.Set_Socket(self.addr_port)

? ? #設置套接字

? ? def Set_Socket(self,S_addr_port):

? ? ? ? self.server=socket.socket(socket.AF_INET,socket.SOCK_STREAM)

? ? ? ? self.server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #端口可復用

? ? ? ? self.server.bind(S_addr_port)

? ? ? ? self.server.listen(5)

? ? ? ? #print("the process work in the port:%d" % S_addr_port[1])

def check_option(object,client):

? ? #按格式解碼,確定幀數(shù)和分辨率

? ? info=struct.unpack('lhh',client.recv(8))

? ? if info[0]>888:

? ? ? ? object.img_fps=int(info[0])-888? ? ? ? ? #獲取幀數(shù)

? ? ? ? object.resolution=list(object.resolution)

? ? ? ? # 獲取分辨率

? ? ? ? object.resolution[0]=info[1]

? ? ? ? object.resolution[1]=info[2]

? ? ? ? object.resolution = tuple(object.resolution)

? ? ? ? return 1

? ? else:

? ? ? ? return 0

def RT_Image(object,client,D_addr):

? ? if(check_option(object,client)==0):

? ? ? ? return

? ? camera=cv2.VideoCapture(0)? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? #從攝像頭中獲取視頻

? ? img_param=[int(cv2.IMWRITE_JPEG_QUALITY),object.img_fps]? #設置傳送圖像格式、幀數(shù)

? ? while(1):

? ? ? ? time.sleep(0.1)? ? ? ? ? ? #推遲線程運行0.1s

? ? ? ? _,object.img=camera.read()? #讀取視頻每一幀

? ? ? ? object.img=cv2.resize(object.img,object.resolution)? ? #按要求調整圖像大小(resolution必須為元組)

? ? ? ? _,img_encode=cv2.imencode('.jpg',object.img,img_param)? #按格式生成圖片

? ? ? ? img_code=numpy.array(img_encode)? ? ? ? ? ? ? ? ? ? ? ? #轉換成矩陣

? ? ? ? object.img_data=img_code.tostring()? ? ? ? ? ? ? ? ? ? #生成相應的字符串

? ? ? ? try:

? ? ? ? ? ? #按照相應的格式進行打包發(fā)送圖片

? ? ? ? ? ? client.send(struct.pack("lhh",len(object.img_data),object.resolution[0],object.resolution[1])+object.img_data)

? ? ? ? except:

? ? ? ? ? ? camera.release()? ? ? ? #釋放資源

? ? ? ? ? ? return

if __name__ == '__main__':

? ? camera=Carame_Accept_Object()

? ? while(1):

? ? ? ? client,D_addr=camera.server.accept()

? ? ? ? clientThread=threading.Thread(None,target=RT_Image,args=(camera,client,D_addr,))

? ? ? ? clientThread.start()

客戶端Demo如下:

#客戶端

import socket

import cv2

import threading

import struct

import numpy

class Camera_Connect_Object:

? ? def __init__(self,D_addr_port=["",8880]):

? ? ? ? self.resolution=[640,480]

? ? ? ? self.addr_port=D_addr_port

? ? ? ? self.src=888+15? ? ? ? ? ? ? ? #雙方確定傳輸幀數(shù),(888)為校驗值

? ? ? ? self.interval=0? ? ? ? ? ? ? ? #圖片播放時間間隔

? ? ? ? self.img_fps=100? ? ? ? ? ? ? #每秒傳輸多少幀數(shù)

? ? def Set_socket(self):

? ? ? ? self.client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)

? ? ? ? self.client.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)

? ? def Socket_Connect(self):

? ? ? ? self.Set_socket()

? ? ? ? self.client.connect(self.addr_port)

? ? ? ? print("IP is %s:%d" % (self.addr_port[0],self.addr_port[1]))

? ? def RT_Image(self):

? ? ? ? #按照格式打包發(fā)送幀數(shù)和分辨率

? ? ? ? self.name=self.addr_port[0]+" Camera"

? ? ? ? self.client.send(struct.pack("lhh", self.src, self.resolution[0], self.resolution[1]))

? ? ? ? while(1):

? ? ? ? ? ? info=struct.unpack("lhh",self.client.recv(8))

? ? ? ? ? ? buf_size=info[0]? ? ? ? ? ? ? ? ? ? #獲取讀的圖片總長度

? ? ? ? ? ? if buf_size:

? ? ? ? ? ? ? ? try:

? ? ? ? ? ? ? ? ? ? self.buf=b""? ? ? ? ? ? ? ? #代表bytes類型

? ? ? ? ? ? ? ? ? ? temp_buf=self.buf

? ? ? ? ? ? ? ? ? ? while(buf_size):? ? ? ? ? ? #讀取每一張圖片的長度

? ? ? ? ? ? ? ? ? ? ? ? temp_buf=self.client.recv(buf_size)

? ? ? ? ? ? ? ? ? ? ? ? buf_size-=len(temp_buf)

? ? ? ? ? ? ? ? ? ? ? ? self.buf+=temp_buf? ? ? #獲取圖片

? ? ? ? ? ? ? ? ? ? ? ? data = numpy.fromstring(self.buf, dtype='uint8')? ? #按uint8轉換為圖像矩陣

? ? ? ? ? ? ? ? ? ? ? ? self.image = cv2.imdecode(data, 1)? ? ? ? ? ? ? ? ? #圖像解碼

? ? ? ? ? ? ? ? ? ? ? ? gray = cv2.cvtColor(self.image, cv2.COLOR_BGR2GRAY)

? ? ? ? ? ? ? ? ? ? ? ? cv2.imshow(self.name, self.image)? ? ? ? ? ? ? ? ? #展示圖片

? ? ? ? ? ? ? ? except:

? ? ? ? ? ? ? ? ? ? pass;

? ? ? ? ? ? ? ? finally:

? ? ? ? ? ? ? ? ? ? if(cv2.waitKey(10)==27):? ? ? ? #每10ms刷新一次圖片,按‘ESC’(27)退出

? ? ? ? ? ? ? ? ? ? ? ? self.client.close()

? ? ? ? ? ? ? ? ? ? ? ? cv2.destroyAllWindows()

? ? ? ? ? ? ? ? ? ? ? ? break

? ? def Get_Data(self,interval):

? ? ? ? showThread=threading.Thread(target=self.RT_Image)

? ? ? ? showThread.start()

if __name__ == '__main__':

? ? camera=Camera_Connect_Object()

? ? camera.addr_port[0]=input("Please input IP:")

? ? camera.addr_port=tuple(camera.addr_port)

? ? camera.Socket_Connect()

? ? camera.Get_Data(camera.interval)

運行效果還是比較流暢的,因為傳輸網(wǎng)絡視頻流,所以畫質欠佳。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容