姓名:閆偉? 學(xué)號(hào):15020150038
轉(zhuǎn)載自:https://zhuanlan.zhihu.com/p/47341761
【嵌牛導(dǎo)讀】:OpenCV是Intel開(kāi)發(fā)并維護(hù)的一種常用的計(jì)算機(jī)視覺(jué)庫(kù),它在道路交通領(lǐng)域有著廣泛的應(yīng)用
【嵌牛鼻子】:Python OpenCV
【嵌牛提問(wèn)】:如何用基于計(jì)算機(jī)視覺(jué)技術(shù)的方法計(jì)算道路上的車流量
【嵌牛正文】:
在本教程,我們只用到 Python 和 OpenCV,以及在背景減除算法輔助下使用一點(diǎn)很簡(jiǎn)單的運(yùn)動(dòng)檢測(cè)原理。
項(xiàng)目所有代碼地址見(jiàn)文末。
以下是我們的整體計(jì)劃:
理解用于前景檢測(cè)的背景去除算法的主要理念
OpenCV圖像過(guò)濾器
為進(jìn)一步數(shù)據(jù)操作創(chuàng)建數(shù)據(jù)處理工作流
最終我們會(huì)得到如視頻所示的結(jié)果:
背景去除算法

有不少算法可以用于背景去除,但它們的主要理念都比較簡(jiǎn)單。
假設(shè)你有一段自己房間的監(jiān)控視頻,在其中一些視頻幀上沒(méi)有人也沒(méi)有寵物出現(xiàn),那么基本上畫面就是靜止的,我們稱之為 background_layer,即背景層。所以要想獲取視頻畫面上的移動(dòng)物體,我們只需:
foreground_objects = current_frame - background_layer
但在一些情況下,我們無(wú)法得到靜止幀,因?yàn)槌霈F(xiàn)光線發(fā)生改變,某些物體被人拿走,或者總是處于移動(dòng)狀態(tài)等等。在這些情況下,我們需要保存一定數(shù)量的幀,努力找出哪些像素大部分是一樣的,然后讓這些像素成為 background_layer 的一部分。整體上的區(qū)別是我們?cè)撊绾潍@取 background_layer 和用于實(shí)現(xiàn)更精準(zhǔn)檢測(cè)的額外過(guò)濾操作。
在本文,我們會(huì)用 MOG 算法執(zhí)行背景減除操作,經(jīng)過(guò)處理后的背景會(huì)如下所示:

圖:左為原始幀,右為MOG進(jìn)行背景減除后的幀
可以看到,前景蒙版中存在一些噪聲數(shù)據(jù),我們要使用一些過(guò)濾方法將它們?nèi)コ?/p>
此時(shí)我們的代碼如下所示:
import osimport loggingimport logging.handlersimport randomimport numpy as npimport skvideo.ioimport cv2import matplotlib.pyplot as pltimport utils# 沒(méi)有它會(huì)出現(xiàn)一些奇怪的錯(cuò)誤cv2.ocl.setUseOpenCL(False)random.seed(123)# ============================================================================IMAGE_DIR = "./out"VIDEO_SOURCE = "input.mp4"SHAPE = (720, 1280)? # HxW# ============================================================================def train_bg_subtractor(inst, cap, num=500):? ? '''? ? ? ? BG substractor need process some amount of frames to start giving result? ? '''? ? print ('Training BG Subtractor...')? ? i = 0? ? for frame in cap:? ? ? ? inst.apply(frame, None, 0.001)? ? ? ? i += 1? ? ? ? if i >= num:? ? ? ? ? ? return capdef main():? ? log = logging.getLogger("main")? ? # 用緩存中的500幀創(chuàng)建MOG背景減除器和陰影檢測(cè)? ? bg_subtractor = cv2.createBackgroundSubtractorMOG2(? ? ? ? history=500, detectShadows=True)? ? # 設(shè)置圖像源? ? # 你還可以用CV2? ? cap = skvideo.io.vreader(VIDEO_SOURCE)? ? # 跳過(guò)500幀,訓(xùn)練背景減除器? ? train_bg_subtractor(bg_subtractor, cap, num=500)? ? frame_number = -1? ? for frame in cap:? ? ? ? if not frame.any():? ? ? ? ? ? log.error("Frame capture failed, stopping...")? ? ? ? ? ? break? ? ? ? frame_number += 1? ? ? ? utils.save_frame(frame, "./out/frame_%04d.png" % frame_number)? ? ? ? fg_mask = bg_subtractor.apply(frame, None, 0.001)? ? ? ? utils.save_frame(frame, "./out/fg_mask_%04d.png" % frame_number)# ============================================================================if __name__ == "__main__":? ? log = utils.init_logging()? ? if not os.path.exists(IMAGE_DIR):? ? ? ? log.debug("Creating image directory `%s`...", IMAGE_DIR)? ? ? ? os.makedirs(IMAGE_DIR)? ? main()
過(guò)濾
在我們這個(gè)項(xiàng)目中,需要這些過(guò)濾器:Threshold(http://docs.opencv.org/3.1.0/d7/d4d/tutorial_py_thresholding.html),Erode(http://docs.opencv.org/3.1.0/d9/d61/tutorial_py_morphological_ops.html),Dilate(http://docs.opencv.org/3.1.0/d9/d61/tutorial_py_morphological_ops.html),Opening(http://docs.opencv.org/3.1.0/d9/d61/tutorial_py_morphological_ops.html),Closing(http://docs.opencv.org/3.1.0/d9/d61/tutorial_py_morphological_ops.html)。
可以點(diǎn)開(kāi)鏈接,詳細(xì)閱讀它們的工作原理。
現(xiàn)在我們就用這些過(guò)濾器來(lái)移除前景蒙版中的噪聲數(shù)據(jù)。
首先,我們用 Closing 移除各區(qū)域中的間隙,然后用 Opening 移除 1-2 個(gè)像素點(diǎn),之后用 Dilate 讓物體變得更粗一些。
def filter_mask(img):? ? kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))? ? # 填充所有細(xì)小的孔洞? ? closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)? ? # 移除噪聲? ? opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)? ? # 用Dilate融合相鄰斑點(diǎn)? ? dilation = cv2.dilate(opening, kernel, iterations=2)? ? # threshold? ? th = dilation[dilation < 240] = 0? ? return th
經(jīng)過(guò)以上處理后,我們的前景變成這樣:

借助輪廓來(lái)檢測(cè)物體
在物體檢測(cè)這部分,我們使用標(biāo)準(zhǔn)的帶有參數(shù)的 cv2.findContours 方法:
cv2.CV_RETR_EXTERNAL — get only outer contours.cv2.CV_CHAIN_APPROX_TC89_L1 - use Teh-Chin chain approximation algorithm (faster)def get_centroid(x, y, w, h):? ? x1 = int(w / 2)? ? y1 = int(h / 2)? ? cx = x + x1? ? cy = y + y1? ? return (cx, cy)def detect_vehicles(fg_mask, min_contour_width=35, min_contour_height=35):? ? matches = []? ? # 尋找外部輪廓? ? im, contours, hierarchy = cv2.findContours(? ? ? ? fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)? ? # 按照寬度和高度過(guò)濾? ? for (i, contour) in enumerate(contours):? ? ? ? (x, y, w, h) = cv2.boundingRect(contour)? ? ? ? contour_valid = (w >= min_contour_width) and (? ? ? ? ? ? h >= min_contour_height)? ? ? ? if not contour_valid:? ? ? ? ? ? continue? ? ? ? # 獲取邊界框的中心點(diǎn)? ? ? ? centroid = get_centroid(x, y, w, h)? ? ? ? matches.append(((x, y, w, h), centroid))return matches
在畫面出口區(qū)域,我們?cè)黾恿艘恍└鶕?jù)高度、寬度的過(guò)濾,并添加了形心(centroid)。
創(chuàng)建處理工作流
你一定知道在機(jī)器學(xué)習(xí)和計(jì)算機(jī)視覺(jué)中并沒(méi)有一個(gè)很神奇的算法能解決所有事情,即便假想有這么一種算法存在,我們也不會(huì)用它,因?yàn)檫@樣在大規(guī)模應(yīng)用時(shí)就不會(huì)很有效。例如,幾年前 Netflix 懸賞 3 百萬(wàn)美元征集效果最好的電影推薦算法,其中有個(gè)團(tuán)隊(duì)就創(chuàng)建了這樣一種算法,卻無(wú)法大規(guī)模解決問(wèn)題,結(jié)果對(duì) Netflix 幾乎沒(méi)有用處。
所以現(xiàn)在我們創(chuàng)建一個(gè)簡(jiǎn)單的處理工作流,當(dāng)然不是出于大規(guī)模應(yīng)用考慮,而是為了更方便,但二者的理念是一樣的。
class PipelineRunner(object):? ? '''? ? ? ? Very simple pipline.? ? ? ? Just run passed processors in order with passing context from one to? ? ? ? another.? ? ? ? You can also set log level for processors.? ? '''? ? def __init__(self, pipeline=None, log_level=logging.DEBUG):? ? ? ? self.pipeline = pipeline or []? ? ? ? self.context = {}? ? ? ? self.log = logging.getLogger(self.__class__.__name__)? ? ? ? self.log.setLevel(log_level)? ? ? ? self.log_level = log_level? ? ? ? self.set_log_level()? ? def set_context(self, data):? ? ? ? self.context = data? ? def add(self, processor):? ? ? ? if not isinstance(processor, PipelineProcessor):? ? ? ? ? ? raise Exception(? ? ? ? ? ? ? ? 'Processor should be an isinstance of PipelineProcessor.')? ? ? ? processor.log.setLevel(self.log_level)? ? ? ? self.pipeline.append(processor)? ? def remove(self, name):? ? ? ? for i, p in enumerate(self.pipeline):? ? ? ? ? ? if p.__class__.__name__ == name:? ? ? ? ? ? ? ? del self.pipeline[i]? ? ? ? ? ? ? ? return True? ? ? ? return False? ? def set_log_level(self):? ? ? ? for p in self.pipeline:? ? ? ? ? ? p.log.setLevel(self.log_level)? ? def run(self):? ? ? ? for p in self.pipeline:? ? ? ? ? ? self.context = p(self.context)? ? ? ? self.log.debug("Frame #%d processed.", self.context['frame_number'])? ? ? ? return self.contextclass PipelineProcessor(object):? ? '''? ? ? ? Base class for processors.? ? '''? ? def __init__(self):? ? ? ? self.log = logging.getLogger(self.__class__.__name__)
輸入構(gòu)建器會(huì)取一列處理器,按順序運(yùn)行。每個(gè)處理器都是這項(xiàng)工作的一部分。我們來(lái)創(chuàng)建輪廓檢測(cè)處理器。
class ContourDetection(PipelineProcessor):? ? '''? ? ? ? Detecting moving objects.? ? ? ? Purpose of this processor is to subtrac background, get moving objects? ? ? ? and detect them with a cv2.findContours method, and then filter off-by? ? ? ? width and height.? ? ? ? bg_subtractor - background subtractor isinstance.? ? ? ? min_contour_width - min bounding rectangle width.? ? ? ? min_contour_height - min bounding rectangle height.? ? ? ? save_image - if True will save detected objects mask to file.? ? ? ? image_dir - where to save images(must exist).? ? ? ? ? ? '''? ? def __init__(self, bg_subtractor, min_contour_width=35, min_contour_height=35, save_image=False, image_dir='images'):? ? ? ? super(ContourDetection, self).__init__()? ? ? ? self.bg_subtractor = bg_subtractor? ? ? ? self.min_contour_width = min_contour_width? ? ? ? self.min_contour_height = min_contour_height? ? ? ? self.save_image = save_image? ? ? ? self.image_dir = image_dir? ? def filter_mask(self, img, a=None):? ? ? ? '''? ? ? ? ? ? This filters are hand-picked just based on visual tests? ? ? ? '''? ? ? ? kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))? ? ? ? # 找到所有細(xì)小孔洞? ? ? ? closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)? ? ? ? # 移除噪聲數(shù)據(jù)? ? ? ? opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)? ? ? ? # 用Dilate融合鄰近斑點(diǎn)? ? ? ? dilation = cv2.dilate(opening, kernel, iterations=2)? ? ? ? return dilation? ? def detect_vehicles(self, fg_mask, context):? ? ? ? matches = []? ? ? ? # 找到外部輪廓? ? ? ? im2, contours, hierarchy = cv2.findContours(? ? ? ? ? ? fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)? ? ? ? for (i, contour) in enumerate(contours):? ? ? ? ? ? (x, y, w, h) = cv2.boundingRect(contour)? ? ? ? ? ? contour_valid = (w >= self.min_contour_width) and (? ? ? ? ? ? ? ? h >= self.min_contour_height)? ? ? ? ? ? if not contour_valid:? ? ? ? ? ? ? ? continue? ? ? ? ? ? centroid = utils.get_centroid(x, y, w, h)? ? ? ? ? ? matches.append(((x, y, w, h), centroid))? ? ? ? return matches? ? def __call__(self, context):? ? ? ? frame = context['frame'].copy()? ? ? ? frame_number = context['frame_number']? ? ? ? fg_mask = self.bg_subtractor.apply(frame, None, 0.001)? ? ? ? # just thresholding values? ? ? ? fg_mask[fg_mask < 240] = 0? ? ? ? fg_mask = self.filter_mask(fg_mask, frame_number)? ? ? ? if self.save_image:? ? ? ? ? ? utils.save_frame(fg_mask, self.image_dir +? ? ? ? ? ? ? ? ? ? ? ? ? ? "/mask_%04d.png" % frame_number, flip=False)? ? ? ? context['objects'] = self.detect_vehicles(fg_mask, context)? ? ? ? context['fg_mask'] = fg_mask? ? ? ? return contex
我們將背景減除、過(guò)濾和目標(biāo)檢測(cè)部分融合在一起。
現(xiàn)在,我們創(chuàng)建一個(gè)處理器,能把在不同視頻幀上檢測(cè)到的物體連接在一起,并創(chuàng)建路徑,也會(huì)計(jì)算進(jìn)入出口區(qū)域的車輛數(shù)量。
'''? ? ? ? Counting vehicles that entered in exit zone.? ? ? ? Purpose of this class based on detected object and local cache create? ? ? ? objects pathes and count that entered in exit zone defined by exit masks.? ? ? ? exit_masks - list of the exit masks.? ? ? ? path_size - max number of points in a path.? ? ? ? max_dst - max distance between two points.? ? '''? ? def __init__(self, exit_masks=[], path_size=10, max_dst=30, x_weight=1.0, y_weight=1.0):? ? ? ? super(VehicleCounter, self).__init__()? ? ? ? self.exit_masks = exit_masks? ? ? ? self.vehicle_count = 0? ? ? ? self.path_size = path_size? ? ? ? self.pathes = []? ? ? ? self.max_dst = max_dst? ? ? ? self.x_weight = x_weight? ? ? ? self.y_weight = y_weight? ? def check_exit(self, point):? ? ? ? for exit_mask in self.exit_masks:? ? ? ? ? ? try:? ? ? ? ? ? ? ? if exit_mask[point[1]][point[0]] == 255:? ? ? ? ? ? ? ? ? ? return True? ? ? ? ? ? except:? ? ? ? ? ? ? ? return True? ? ? ? return False? ? def __call__(self, context):? ? ? ? objects = context['objects']? ? ? ? context['exit_masks'] = self.exit_masks? ? ? ? context['pathes'] = self.pathes? ? ? ? context['vehicle_count'] = self.vehicle_count? ? ? ? if not objects:? ? ? ? ? ? return context? ? ? ? points = np.array(objects)[:, 0:2]? ? ? ? points = points.tolist()? ? ? ? # 若路徑為空,則添加新的點(diǎn)? ? ? ? if not self.pathes:? ? ? ? ? ? for match in points:? ? ? ? ? ? ? ? self.pathes.append([match])? ? ? ? else:? ? ? ? ? ? # 根據(jù)點(diǎn)之間的最小距離將新點(diǎn)和舊路徑連接在一起? ? ? ? ? ? new_pathes = []? ? ? ? ? ? for path in self.pathes:? ? ? ? ? ? ? ? _min = 999999? ? ? ? ? ? ? ? _match = None? ? ? ? ? ? ? ? for p in points:? ? ? ? ? ? ? ? ? ? if len(path) == 1:? ? ? ? ? ? ? ? ? ? ? ? # 最后一個(gè)點(diǎn)和當(dāng)前點(diǎn)之間的距離? ? ? ? ? ? ? ? ? ? ? ? d = utils.distance(p[0], path[-1][0])? ? ? ? ? ? ? ? ? ? else:? ? ? ? ? ? ? ? ? ? ? ? # 根據(jù)兩個(gè)前面的點(diǎn)預(yù)測(cè)新點(diǎn)? ? ? ? ? ? ? ? ? ? ? ? # 并計(jì)算預(yù)測(cè)的新點(diǎn)和當(dāng)前點(diǎn)的距離? ? ? ? ? ? ? ? ? ? ? ? xn = 2 * path[-1][0][0] - path[-2][0][0]? ? ? ? ? ? ? ? ? ? ? ? yn = 2 * path[-1][0][1] - path[-2][0][1]? ? ? ? ? ? ? ? ? ? ? ? d = utils.distance(? ? ? ? ? ? ? ? ? ? ? ? ? ? p[0], (xn, yn),? ? ? ? ? ? ? ? ? ? ? ? ? ? x_weight=self.x_weight,? ? ? ? ? ? ? ? ? ? ? ? ? ? y_weight=self.y_weight? ? ? ? ? ? ? ? ? ? ? ? )? ? ? ? ? ? ? ? ? ? if d < _min:? ? ? ? ? ? ? ? ? ? ? ? _min = d? ? ? ? ? ? ? ? ? ? ? ? _match = p? ? ? ? ? ? ? ? if _match and _min <= self.max_dst:? ? ? ? ? ? ? ? ? ? points.remove(_match)? ? ? ? ? ? ? ? ? ? path.append(_match)? ? ? ? ? ? ? ? ? ? new_pathes.append(path)? ? ? ? ? ? ? ? # 如果當(dāng)前幀沒(méi)有匹配,不要丟棄路徑? ? ? ? ? ? ? ? if _match is None:? ? ? ? ? ? ? ? ? ? new_pathes.append(path)? ? ? ? ? ? self.pathes = new_pathes? ? ? ? ? ? # 添加新路徑? ? ? ? ? ? if len(points):? ? ? ? ? ? ? ? for p in points:? ? ? ? ? ? ? ? ? ? # 不要添加應(yīng)當(dāng)已經(jīng)被計(jì)算的點(diǎn)? ? ? ? ? ? ? ? ? ? if self.check_exit(p[1]):? ? ? ? ? ? ? ? ? ? ? ? continue? ? ? ? ? ? ? ? ? ? self.pathes.append([p])? ? ? ? # 只保存路徑中最后N個(gè)點(diǎn)? ? ? ? for i, _ in enumerate(self.pathes):? ? ? ? ? ? self.pathes[i] = self.pathes[i][self.path_size * -1:]? ? ? ? # 計(jì)算車輛,并丟棄計(jì)算后的路徑:? ? ? ? new_pathes = []? ? ? ? for i, path in enumerate(self.pathes):? ? ? ? ? ? d = path[-2:]? ? ? ? ? ? if (? ? ? ? ? ? ? ? # 至少需要兩個(gè)點(diǎn)來(lái)計(jì)算? ? ? ? ? ? ? ? len(d) >= 2 and? ? ? ? ? ? ? ? # 前面的點(diǎn)不在出口區(qū)域? ? ? ? ? ? ? ? not self.check_exit(d[0][1]) and? ? ? ? ? ? ? ? # 當(dāng)前點(diǎn)在出口區(qū)域? ? ? ? ? ? ? ? self.check_exit(d[1][1]) and? ? ? ? ? ? ? ? # 路徑長(zhǎng)度大于最小值? ? ? ? ? ? ? ? self.path_size <= len(path)? ? ? ? ? ? ):? ? ? ? ? ? ? ? self.vehicle_count += 1? ? ? ? ? ? else:? ? ? ? ? ? ? ? # 防止和已在出口區(qū)域的路徑相連? ? ? ? ? ? ? ? add = True? ? ? ? ? ? ? ? for p in path:? ? ? ? ? ? ? ? ? ? if self.check_exit(p[1]):? ? ? ? ? ? ? ? ? ? ? ? add = False? ? ? ? ? ? ? ? ? ? ? ? break? ? ? ? ? ? ? ? if add:? ? ? ? ? ? ? ? ? ? new_pathes.append(path)? ? ? ? self.pathes = new_pathes? ? ? ? context['pathes'] = self.pathes? ? ? ? context['objects'] = objects? ? ? ? context['vehicle_count'] = self.vehicle_count? ? ? ? self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count)? ? ? ? return context
這里稍微有些復(fù)雜,我們一步步說(shuō)。
圖像中的綠色蒙版表示出口區(qū)域,就是我們計(jì)算車輛數(shù)量的地方。例如,我們會(huì)只計(jì)算長(zhǎng)度超過(guò) 3 個(gè)點(diǎn)(以去除一些噪聲數(shù)據(jù))的路徑,以及綠色區(qū)域的第 4 個(gè)點(diǎn)。
我們使用蒙版,是因?yàn)橄啾仁褂孟蛄克惴?,它的操作更高效更?jiǎn)單。只需用“binary and”操作檢查區(qū)域內(nèi)的點(diǎn),就行了。我們的設(shè)置如下:
EXIT_PTS = np.array([? ? [[732, 720], [732, 590], [1280, 500], [1280, 720]],? ? [[0, 400], [645, 400], [645, 0], [0, 0]]])base = np.zeros(SHAPE + (3,), dtype='uint8')exit_mask = cv2.fillPoly(base, EXIT_PTS, (255, 255, 255))[:, :, 0]
現(xiàn)在我們將路徑上的點(diǎn)相連。
new_pathes = []for path in self.pathes:? ? _min = 999999? ? _match = None? ? for p in points:? ? ? ? if len(path) == 1:? ? ? ? ? ? # 最后一個(gè)點(diǎn)和當(dāng)前點(diǎn)的距離? ? ? ? ? ? d = utils.distance(p[0], path[-1][0])? ? ? ? else:? ? ? ? ? ? # 根據(jù)前面2個(gè)點(diǎn)預(yù)測(cè)接下來(lái)的點(diǎn)? ? ? ? ? ? # 并計(jì)算預(yù)測(cè)的新點(diǎn)和當(dāng)前點(diǎn)之間的距離? ? ? ? ? ? xn = 2 * path[-1][0][0] - path[-2][0][0]? ? ? ? ? ? yn = 2 * path[-1][0][1] - path[-2][0][1]? ? ? ? ? ? d = utils.distance(? ? ? ? ? ? ? ? p[0], (xn, yn),? ? ? ? ? ? ? ? x_weight=self.x_weight,? ? ? ? ? ? ? ? y_weight=self.y_weight? ? ? ? ? ? )? ? ? ? if d < _min:? ? ? ? ? ? _min = d? ? ? ? ? ? _match = p? ? if _match and _min <= self.max_dst:? ? ? ? points.remove(_match)? ? ? ? path.append(_match)? ? ? ? new_pathes.append(path)? ? # 若當(dāng)前幀未匹配,不要丟棄路徑? ? if _match is None:? ? ? ? new_pathes.append(path)self.pathes = new_pathes# 添加新路徑if len(points):? ? for p in points:? ? ? ? # do not add points that already should be counted? ? ? ? if self.check_exit(p[1]):? ? ? ? ? ? continue? ? ? ? self.pathes.append([p])# 只保存路徑中最后N個(gè)點(diǎn)for i, _ in enumerate(self.pathes):? ? self.pathes[i] = self.pathes[i][self.path_size * -1:]
在第一幀,我們只將所有點(diǎn)添加為新路徑。
接著如果長(zhǎng)度等于 1,對(duì)于緩存中的每個(gè)路徑我們會(huì)嘗試找到新檢測(cè)到的物體的點(diǎn)(形心),它和路徑的最后一個(gè)點(diǎn)的歐幾里得距離最小。
如果長(zhǎng)度大于 1,那么用路徑中的最后兩個(gè)點(diǎn)我們預(yù)測(cè)同一條線上的新的點(diǎn),并找到它和當(dāng)前點(diǎn)之間的最小距離。
將具有最小距離的點(diǎn)添加至當(dāng)前路徑末尾,并從列表移除。
如果經(jīng)過(guò)這部操作后還剩有一些點(diǎn),我們將其添加為新路徑。
另外我們還限制路徑中的點(diǎn)的數(shù)量。
# 計(jì)算車輛,并丟棄計(jì)算后的路徑:new_pathes = []for i, path in enumerate(self.pathes):? ? d = path[-2:]? ? if (? ? ? ? # 需要至少兩個(gè)點(diǎn)來(lái)計(jì)算? ? ? ? len(d) >= 2 and? ? ? ? # 前面的點(diǎn)不在出口區(qū)域中? ? ? ? not self.check_exit(d[0][1]) and? ? ? ? # 當(dāng)前點(diǎn)在出口區(qū)域中? ? ? ? self.check_exit(d[1][1]) and? ? ? ? # 路徑長(zhǎng)度大于最小值? ? ? ? self.path_size <= len(path)? ? ):? ? ? ? self.vehicle_count += 1? ? else:? ? ? ? # 防止和已在出口區(qū)域中的路徑相連? ? ? ? add = True? ? ? ? for p in path:? ? ? ? ? ? if self.check_exit(p[1]):? ? ? ? ? ? ? ? add = False? ? ? ? ? ? ? ? break? ? ? ? if add:? ? ? ? ? ? new_pathes.append(path)self.pathes = new_pathescontext['pathes'] = self.pathescontext['objects'] = objectscontext['vehicle_count'] = self.vehicle_countself.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count)return context
現(xiàn)在我們嘗試計(jì)算進(jìn)入出口區(qū)域內(nèi)的車輛數(shù)量。完成這一步,我們只需取路徑中最后兩個(gè)點(diǎn),在出口區(qū)域檢查它們,還要確保長(zhǎng)度應(yīng)大于限制條件。
剩余部分就是防止發(fā)生回連,將新點(diǎn)連接至當(dāng)前區(qū)域中的點(diǎn)。
最后兩個(gè)處理器是 CSV 寫入器,用于創(chuàng)建 CSV 格式的報(bào)告文件,可視化調(diào)試和創(chuàng)建美觀的圖形。
class CsvWriter(PipelineProcessor):? ? def __init__(self, path, name, start_time=0, fps=15):? ? ? ? super(CsvWriter, self).__init__()? ? ? ? self.fp = open(os.path.join(path, name), 'w')? ? ? ? self.writer = csv.DictWriter(self.fp, fieldnames=['time', 'vehicles'])? ? ? ? self.writer.writeheader()? ? ? ? self.start_time = start_time? ? ? ? self.fps = fps? ? ? ? self.path = path? ? ? ? self.name = name? ? ? ? self.prev = None? ? def __call__(self, context):? ? ? ? frame_number = context['frame_number']? ? ? ? count = _count = context['vehicle_count']? ? ? ? if self.prev:? ? ? ? ? ? _count = count - self.prev? ? ? ? time = ((self.start_time + int(frame_number / self.fps)) * 100? ? ? ? ? ? ? ? + int(100.0 / self.fps) * (frame_number % self.fps))? ? ? ? self.writer.writerow({'time': time, 'vehicles': _count})? ? ? ? self.prev = count? ? ? ? return contextclass Visualizer(PipelineProcessor):? ? def __init__(self, save_image=True, image_dir='images'):? ? ? ? super(Visualizer, self).__init__()? ? ? ? self.save_image = save_image? ? ? ? self.image_dir = image_dir? ? def check_exit(self, point, exit_masks=[]):? ? ? ? for exit_mask in exit_masks:? ? ? ? ? ? if exit_mask[point[1]][point[0]] == 255:? ? ? ? ? ? ? ? return True? ? ? ? return False? ? def draw_pathes(self, img, pathes):? ? ? ? if not img.any():? ? ? ? ? ? return? ? ? ? for i, path in enumerate(pathes):? ? ? ? ? ? path = np.array(path)[:, 1].tolist()? ? ? ? ? ? for point in path:? ? ? ? ? ? ? ? cv2.circle(img, point, 2, CAR_COLOURS[0], -1)? ? ? ? ? ? ? ? cv2.polylines(img, [np.int32(path)], False, CAR_COLOURS[0], 1)? ? ? ? return img? ? def draw_boxes(self, img, pathes, exit_masks=[]):? ? ? ? for (i, match) in enumerate(pathes):? ? ? ? ? ? contour, centroid = match[-1][:2]? ? ? ? ? ? if self.check_exit(centroid, exit_masks):? ? ? ? ? ? ? ? continue? ? ? ? ? ? x, y, w, h = contour? ? ? ? ? ? cv2.rectangle(img, (x, y), (x + w - 1, y + h - 1),? ? ? ? ? ? ? ? ? ? ? ? ? BOUNDING_BOX_COLOUR, 1)? ? ? ? ? ? cv2.circle(img, centroid, 2, CENTROID_COLOUR, -1)? ? ? ? return img? ? def draw_ui(self, img, vehicle_count, exit_masks=[]):? ? ? ? # 為圖像添加綠色蒙版? ? ? ? for exit_mask in exit_masks:? ? ? ? ? ? _img = np.zeros(img.shape, img.dtype)? ? ? ? ? ? _img[:, :] = EXIT_COLOR? ? ? ? ? ? mask = cv2.bitwise_and(_img, _img, mask=exit_mask)? ? ? ? ? ? cv2.addWeighted(mask, 1, img, 1, 0, img)? ? ? ? # 畫出頂部區(qū)域? ? ? ? cv2.rectangle(img, (0, 0), (img.shape[1], 50), (0, 0, 0), cv2.FILLED)? ? ? ? cv2.putText(img, ("Vehicles passed: {total} ".format(total=vehicle_count)), (30, 30),? ? ? ? ? ? ? ? ? ? cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 1)? ? ? ? return img? ? def __call__(self, context):? ? ? ? frame = context['frame'].copy()? ? ? ? frame_number = context['frame_number']? ? ? ? pathes = context['pathes']? ? ? ? exit_masks = context['exit_masks']? ? ? ? vehicle_count = context['vehicle_count']? ? ? ? frame = self.draw_ui(frame, vehicle_count, exit_masks)? ? ? ? frame = self.draw_pathes(frame, pathes)? ? ? ? frame = self.draw_boxes(frame, pathes, exit_masks)? ? ? ? utils.save_frame(frame, self.image_dir +? ? ? ? ? ? ? ? ? ? ? ? "/processed_%04d.png" % frame_number)? ? ? ? return context
CSV 寫入器會(huì)按時(shí)間保存數(shù)據(jù),因?yàn)槲覀冃枰盟鲞M(jìn)一步分析。我用的是這種格式往 unixtimestamp 中添加額外的幀定時(shí):
開(kāi)始時(shí)間 =1 000 000 000,fps=10,我會(huì)得到如下結(jié)果
幀1=1 000 000 000 010
幀2=1 000 000 000 020
…
然后在你獲取完整的 CSV 報(bào)告后,可以按自己需要將數(shù)據(jù)合計(jì)在一起。
結(jié)語(yǔ)
可以看到,也不是很難。但如果你運(yùn)行程序的話會(huì)發(fā)現(xiàn)這項(xiàng)解決方案也算不上完美,前景會(huì)出現(xiàn)物體重疊的問(wèn)題,而且也無(wú)法按類型將車輛分類(在實(shí)際分析中肯定會(huì)需要這個(gè))。但如果有高質(zhì)量的攝像頭(設(shè)在馬路上方),程序還是有很高的準(zhǔn)確率。這也告訴我們,如果是正確使用,即便是很小很簡(jiǎn)單的算法也能得到好結(jié)果。