使用Python和OpenCV實現(xiàn)樹莓派/PC實時攝像頭數(shù)據(jù)共享,主要分為服務器和客戶端兩部分。
服務器Demo如下:
#服務器端
import socket
import threading
import struct
import time
import cv2
import numpy
class Carame_Accept_Object:
? ? def __init__(self,S_addr_port=("",8880)):
? ? ? ? self.resolution=(640,480)? ? ? #分辨率
? ? ? ? self.img_fps=15? ? ? ? ? ? ? ? #每秒傳輸多少幀數(shù)
? ? ? ? self.addr_port=S_addr_port
? ? ? ? self.Set_Socket(self.addr_port)
? ? #設置套接字
? ? def Set_Socket(self,S_addr_port):
? ? ? ? self.server=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
? ? ? ? self.server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #端口可復用
? ? ? ? self.server.bind(S_addr_port)
? ? ? ? self.server.listen(5)
? ? ? ? #print("the process work in the port:%d" % S_addr_port[1])
def check_option(object,client):
? ? #按格式解碼,確定幀數(shù)和分辨率
? ? info=struct.unpack('lhh',client.recv(8))
? ? if info[0]>888:
? ? ? ? object.img_fps=int(info[0])-888? ? ? ? ? #獲取幀數(shù)
? ? ? ? object.resolution=list(object.resolution)
? ? ? ? # 獲取分辨率
? ? ? ? object.resolution[0]=info[1]
? ? ? ? object.resolution[1]=info[2]
? ? ? ? object.resolution = tuple(object.resolution)
? ? ? ? return 1
? ? else:
? ? ? ? return 0
def RT_Image(object,client,D_addr):
? ? if(check_option(object,client)==0):
? ? ? ? return
? ? camera=cv2.VideoCapture(0)? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? #從攝像頭中獲取視頻
? ? img_param=[int(cv2.IMWRITE_JPEG_QUALITY),object.img_fps]? #設置傳送圖像格式、幀數(shù)
? ? while(1):
? ? ? ? time.sleep(0.1)? ? ? ? ? ? #推遲線程運行0.1s
? ? ? ? _,object.img=camera.read()? #讀取視頻每一幀
? ? ? ? object.img=cv2.resize(object.img,object.resolution)? ? #按要求調整圖像大小(resolution必須為元組)
? ? ? ? _,img_encode=cv2.imencode('.jpg',object.img,img_param)? #按格式生成圖片
? ? ? ? img_code=numpy.array(img_encode)? ? ? ? ? ? ? ? ? ? ? ? #轉換成矩陣
? ? ? ? object.img_data=img_code.tostring()? ? ? ? ? ? ? ? ? ? #生成相應的字符串
? ? ? ? try:
? ? ? ? ? ? #按照相應的格式進行打包發(fā)送圖片
? ? ? ? ? ? client.send(struct.pack("lhh",len(object.img_data),object.resolution[0],object.resolution[1])+object.img_data)
? ? ? ? except:
? ? ? ? ? ? camera.release()? ? ? ? #釋放資源
? ? ? ? ? ? return
if __name__ == '__main__':
? ? camera=Carame_Accept_Object()
? ? while(1):
? ? ? ? client,D_addr=camera.server.accept()
? ? ? ? clientThread=threading.Thread(None,target=RT_Image,args=(camera,client,D_addr,))
? ? ? ? clientThread.start()
客戶端Demo如下:
#客戶端
import socket
import cv2
import threading
import struct
import numpy
class Camera_Connect_Object:
? ? def __init__(self,D_addr_port=["",8880]):
? ? ? ? self.resolution=[640,480]
? ? ? ? self.addr_port=D_addr_port
? ? ? ? self.src=888+15? ? ? ? ? ? ? ? #雙方確定傳輸幀數(shù),(888)為校驗值
? ? ? ? self.interval=0? ? ? ? ? ? ? ? #圖片播放時間間隔
? ? ? ? self.img_fps=100? ? ? ? ? ? ? #每秒傳輸多少幀數(shù)
? ? def Set_socket(self):
? ? ? ? self.client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
? ? ? ? self.client.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
? ? def Socket_Connect(self):
? ? ? ? self.Set_socket()
? ? ? ? self.client.connect(self.addr_port)
? ? ? ? print("IP is %s:%d" % (self.addr_port[0],self.addr_port[1]))
? ? def RT_Image(self):
? ? ? ? #按照格式打包發(fā)送幀數(shù)和分辨率
? ? ? ? self.name=self.addr_port[0]+" Camera"
? ? ? ? self.client.send(struct.pack("lhh", self.src, self.resolution[0], self.resolution[1]))
? ? ? ? while(1):
? ? ? ? ? ? info=struct.unpack("lhh",self.client.recv(8))
? ? ? ? ? ? buf_size=info[0]? ? ? ? ? ? ? ? ? ? #獲取讀的圖片總長度
? ? ? ? ? ? if buf_size:
? ? ? ? ? ? ? ? try:
? ? ? ? ? ? ? ? ? ? self.buf=b""? ? ? ? ? ? ? ? #代表bytes類型
? ? ? ? ? ? ? ? ? ? temp_buf=self.buf
? ? ? ? ? ? ? ? ? ? while(buf_size):? ? ? ? ? ? #讀取每一張圖片的長度
? ? ? ? ? ? ? ? ? ? ? ? temp_buf=self.client.recv(buf_size)
? ? ? ? ? ? ? ? ? ? ? ? buf_size-=len(temp_buf)
? ? ? ? ? ? ? ? ? ? ? ? self.buf+=temp_buf? ? ? #獲取圖片
? ? ? ? ? ? ? ? ? ? ? ? data = numpy.fromstring(self.buf, dtype='uint8')? ? #按uint8轉換為圖像矩陣
? ? ? ? ? ? ? ? ? ? ? ? self.image = cv2.imdecode(data, 1)? ? ? ? ? ? ? ? ? #圖像解碼
? ? ? ? ? ? ? ? ? ? ? ? gray = cv2.cvtColor(self.image, cv2.COLOR_BGR2GRAY)
? ? ? ? ? ? ? ? ? ? ? ? cv2.imshow(self.name, self.image)? ? ? ? ? ? ? ? ? #展示圖片
? ? ? ? ? ? ? ? except:
? ? ? ? ? ? ? ? ? ? pass;
? ? ? ? ? ? ? ? finally:
? ? ? ? ? ? ? ? ? ? if(cv2.waitKey(10)==27):? ? ? ? #每10ms刷新一次圖片,按‘ESC’(27)退出
? ? ? ? ? ? ? ? ? ? ? ? self.client.close()
? ? ? ? ? ? ? ? ? ? ? ? cv2.destroyAllWindows()
? ? ? ? ? ? ? ? ? ? ? ? break
? ? def Get_Data(self,interval):
? ? ? ? showThread=threading.Thread(target=self.RT_Image)
? ? ? ? showThread.start()
if __name__ == '__main__':
? ? camera=Camera_Connect_Object()
? ? camera.addr_port[0]=input("Please input IP:")
? ? camera.addr_port=tuple(camera.addr_port)
? ? camera.Socket_Connect()
? ? camera.Get_Data(camera.interval)
運行效果還是比較流暢的,因為傳輸網(wǎng)絡視頻流,所以畫質欠佳。