引言
目前車聯(lián)網(wǎng)和車路協(xié)同 (V2V, V2I, V2P... 統(tǒng)稱 V2X) 是自動駕駛和智能交通領域研究的核心問題之一。簡單來說,V2X 就是為車輛提供了上帝視角,可以獲取更大范圍、更準確的信息,從而優(yōu)化駕駛決策。
在各種智能交通場景中,platooning (車隊編排) 是一個很熱門的話題。借助 V2X, platooning 可以減小車輛之間的安全距離。較小的安全距離可以增加道路的吞吐量,對于在高速公路上行駛的卡車編隊,還可以減小風阻,降低能耗。
對于一般的研究機構,由于各方面的限制,很難在實車、實路上測試智能交通算法的性能。在這種情況下,仿真測試可以在一定程度上驗證算法的有效性。
V2X 仿真中涉及到兩方面:交通場景和網(wǎng)絡通訊。
SUMO 可以實現(xiàn)交通場景的仿真,網(wǎng)絡通訊方面的仿真軟件有OMNeT++, NS3 等.
Veins (vehicles in network simulation) = SUMO + OMNeT++
從名字就可以看出來, Veins 是針對車聯(lián)網(wǎng)的仿真軟件。
Plexe (platooning extension for veins) = Plexe-SUMO + Plexe-Veins
Plexe 是在 Veins 中加入了 platooning 的元素。
在做 V2X 仿真時,如果對網(wǎng)絡通訊方面的真實度要求不高,不需要模擬實際聯(lián)網(wǎng)中的延遲、丟包等情況,可以只用 Plexe-SUMO,這樣程序編寫要簡單很多。
Plexe-SUMO 是在 SUMO 基礎上加入了 platooning 相關的元素,更方便 platooning 場景的搭建。另外,Plexe-SUMO 提供了 Python API,可以在 python 中以 module 的方式調用,程序書寫更簡單。
本文首先介紹了 Plexe-SUMO python API 的安裝方法,然后分析 Plexe-SUMO 官方給的 demo 程序,并在此基礎上嘗試構造我們自己的仿真場景。官方 demo 包括
- brakedemo.py: platoon 協(xié)同剎車
- joindemo.py: 從中間加入 platoon
- enginedemo.py: 三車( alfa-147, audi-r8, bugatti-veyron ) 啟動性能對比
- platoon-lane-change-test.py: platoon 協(xié)同換道超車
- overtake-and-keep-right-test.py: 單輛車超車
本文重點分析 brakedemo.py 和 joindemo.py 兩個程序。
安裝 Plexe-SUMO python API
cd
git clone --recursive https://github.com/eclipse/sumo
sudo apt-get install cmake python libxerces-c-dev libfox-1.6-dev libgl1-mesa-dev libglu1-mesa-dev libgdal-dev libproj-dev
cd ~/sumo/build/cmake_modules
cmake ../..
make -j8
echo "export SUMO_HOME=~/sumo" >> ~/.bashrc
echo "export PATH=$SUMO_HOME/bin:$SUMO_HOME/tools:$PATH" >> ~/.bashrc
cd
git clone https://github.com/michele-segata/plexe-pyapi.git
cd plexe-pyapi
pip install --user .
brakedemo.py
在 platoon 場景中,車輛的速度控制一般采用 CACC (cooperative adaptive cruise control),這里的 cooperative 就意味著車輛之間可以協(xié)作,共享信息。在 brakedemo.py 程序中,platoon 實現(xiàn)協(xié)同剎車,即使跟車距離比較短,也不會發(fā)生碰撞。

程序及注釋如下:
import os
import sys
import random
?
# 調用 utils module,里面包含了 platooning 的中層實現(xiàn)函數(shù)
from utils import add_platooning_vehicle, start_sumo, running, communicate
?
# 確保路徑設置正確,python 能夠搜索到 traci module
if 'SUMO_HOME' in os.environ:
tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
sys.path.append(tools)
else:
sys.exit("please declare environment variable 'SUMO_HOME'")
import traci
?
# 調用 plexe module, 包含了 platooning 的底層實現(xiàn)函數(shù)
from plexe import Plexe, ACC, CACC, RPM, GEAR, RADAR_DISTANCE
?
# vehicle length
LENGTH = 4
# inter-vehicle distance
DISTANCE = 5
# cruising speed
# 120 km/h = 33.3 m/s
SPEED = 120/3.6
# distance between multiple platoons
# 不同 Platoon 之間的距離,在該 demo 中只有一個 platoon
PLATOON_DISTANCE = SPEED * 1.5 + 2
# vehicle who starts to brake
# 最初開始剎車的是頭車,編號為 "v.0.0",即 0 號 platoon 中的 0 號車輛
# 由于只有一個 platoon,所以所有車輛編號都是 "v.0.* "的形式
BRAKING_VEHICLE = "v.0.0"
?
# 構造 platoon,返回 platoon 中車輛之間的通信連接關系
def add_vehicles(plexe, n, n_platoons, real_engine=False):
"""
Adds a set of platoons of n vehicles each to the simulation
:param plexe: API instance
:param n: number of vehicles of the platoon
:param n_platoons: number of platoons
:param real_engine: set to true to use the realistic engine model,
false to use a first order lag model
:return: returns the topology of the platoon, i.e., a dictionary which
indicates, for each vehicle, who is its leader and who is its front
vehicle. The topology can the be used by the data exchange logic to
automatically fetch data from leading and front vehicle to feed the CACC
"""
# add a platoon of n vehicles
# topology 中以 dictionary 的形式存儲車輛之間的通信拓撲結構,包括每輛車的前車和頭車編號
topology = {}
p_length = n * LENGTH + (n - 1) * DISTANCE
for p in range(n_platoons):
for i in range(n):
vid = "v.%d.%d" % (p, i)
# 調用 utils module 中的函數(shù),將車輛編排成 platoon 的形式
add_platooning_vehicle(plexe, vid, (n-p+1) * (p_length + PLATOON_DISTANCE) + (n-i+1) *
(DISTANCE+LENGTH), 0, SPEED, DISTANCE, real_engine)
# 將車輛保持在 lane 0 ,并忽略安全距離限制
plexe.set_fixed_lane(vid, 0, False)
# 設置車輛速度模式。車輛的速度有 5 個影響因素
# https://sumo.dlr.de/wiki/TraCI/Change_Vehicle_State#speed_mode_.280xb3.29
# 1\. safe speed
# 2\. max accel
# 3\. max speed
# 4\. right of way at intersections
# 5\. brake hard to avoid passing red light
# 如果全都不考慮,則設置為 [0, 0, 0, 0, 0] = 0, 此時,車速完全由 traci 控制
# 如果全都考慮,則設置為 [1, 1, 1, 1, 1] = 31
# 如果只考慮 safe speed,則設置為 [0, 0, 0, 0, 1] = 1
traci.vehicle.setSpeedMode(vid, 0)
# 在 platoon 中頭車采用 adaptive cruise control 的控制方式
# 后邊的跟車采用 cooperative adative cruise control 的控制方式
plexe.use_controller_acceleration(vid, False)
if i == 0:
plexe.set_active_controller(vid, ACC)
else:
plexe.set_active_controller(vid, CACC)
if i > 0:
topology[vid] = {"front": "v.%d.%d" % (p, i - 1), "leader": "v.%d.0" % p}
else:
topology[vid] = {}
return topology
??
def main(demo_mode, real_engine, setter=None):
# used to randomly color the vehicles
# 具體著色是在 utils module 的 add_platooning_vehicle 函數(shù)中實現(xiàn)的
random.seed(1)
# 運行 SUMO 的配置文件,后邊的參數(shù) False / True 表示 SUMO server 是否已經在運行了。
# 若為 False,則打開 SUMO 并加載配置文件
# 若為 True,則重新加載配置文件
# freeway.sumo.cfg 中仿真步長為 0.01s
start_sumo("cfg/freeway.sumo.cfg", False)
# 以下設置可以使得每次 traci.simulationStep() 之后都調用一次 plexe
plexe = Plexe()
traci.addStepListener(plexe)
step = 0
topology = dict()
min_dist = 1e6
?
# 主循環(huán)
while running(demo_mode, step, 1500):
?
# when reaching 15 seconds, reset the simulation when in demo_mode
if demo_mode and step == 1500:
print("Min dist: %f" % min_dist)
start_sumo("cfg/freeway.sumo.cfg", True)
step = 0
random.seed(1)
?
traci.simulationStep()
?
# 仿真初始化階段,構造含有 8 輛車的 platoon
# 設置 GUI 中畫面在整個仿真過程中始終聚焦在 v.0.0, 即頭車
# 鏡頭縮放參數(shù) 20000, 這個可以根據(jù)具體場景設置,使得鏡頭既不會拉的太近,也不會拉的太遠。
if step == 0:
# create vehicles and track the braking vehicle
topology = add_vehicles(plexe, 8, 1, real_engine)
traci.gui.trackVehicle("View #0", BRAKING_VEHICLE)
traci.gui.setZoom("View #0", 20000)
# 每隔 10 步車輛之間通信一次,獲得其他車輛的位置、速度、加速度等信息
if step % 10 == 1:
# simulate vehicle communication every 100 ms
communicate(plexe, topology)
# 是否使用 plexe 中改進的更加逼真的引擎模型
if real_engine and setter is not None:
# if we are running with the dashboard, update its values
tracked_id = traci.gui.getTrackedVehicle("View #0")
if tracked_id != "":
ed = plexe.get_engine_data(tracked_id)
vd = plexe.get_vehicle_data(tracked_id)
setter(ed[RPM], ed[GEAR], vd.speed, vd.acceleration)
# 在第 500 步時頭車剎車,由于是 車聯(lián)網(wǎng) 狀態(tài),所以其他車輛也會及時得到頭車信息,幾乎同步剎車,即使跟車距離很小,也不會追尾。這就是 車輛網(wǎng) 的優(yōu)勢。
if step == 500:
plexe.set_fixed_acceleration(BRAKING_VEHICLE, True, -6)
# 記錄在整個仿真過程中車輛間隔的最小距離,有需要的話可以隨后進行分析
if step > 1:
radar = plexe.get_radar_data("v.0.1")
if radar[RADAR_DISTANCE] < min_dist:
min_dist = radar[RADAR_DISTANCE]
?
step += 1
?
traci.close()
?
?
if __name__ == "__main__":
main(True, True)
joindemo.py
該程序實現(xiàn)了單個車輛從中部加入 platoon 的過程

程序及注釋如下:
import os
import sys
import random
from utils import add_platooning_vehicle, communicate, get_distance, start_sumo, running
?
if 'SUMO_HOME' in os.environ:
tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
sys.path.append(tools)
else:
sys.exit("please declare environment variable 'SUMO_HOME'")
?
import traci
from plexe import Plexe, ACC, CACC, FAKED_CACC, RPM, GEAR, ACCELERATION, SPEED
?
# vehicle length
LENGTH = 4
# inter-vehicle distance
DISTANCE = 5
# inter-vehicle distance when leaving space for joining
JOIN_DISTANCE = DISTANCE * 2
# cruising speed
SPEED = 120 / 3.6
?
# maneuver states:
GOING_TO_POSITION = 0
OPENING_GAP = 1
COMPLETED = 2
?
# maneuver actors
# 這里只考慮 1 個 platoon,其中車輛編號依次為 v.0, v.1, v.2, ...
LEADER = "v.0"
JOIN_POSITION = 3
FRONT_JOIN = "v.%d" % (JOIN_POSITION - 1)
BEHIND_JOIN = "v.%d" % JOIN_POSITION
N_VEHICLES = 8
JOINER = "v.%d" % N_VEHICLES
?
?
# 該程序與 brakedemo.py 中類似,向場景中加入車輛,這里包含 1 個 platoon 和 1 個單獨的車輛
# 返回 platoon 的 topology,這個 topology 中不包含后加入的單個車輛
def add_vehicles(plexe, n, real_engine=False):
"""
Adds a platoon of n vehicles to the simulation, plus an additional one
farther away that wants to join the platoon
:param plexe: API instance
:param n: number of vehicles of the platoon
:param real_engine: set to true to use the realistic engine model,
false to use a first order lag model
:return: returns the topology of the platoon, i.e., a dictionary which
indicates, for each vehicle, who is its leader and who is its front
vehicle. The topology can the be used by the data exchange logic to
automatically fetch data from leading and front vehicle to feed the CACC
"""
# add a platoon of n vehicles
topology = {}
for i in range(n):
vid = "v.%d" % i
add_platooning_vehicle(plexe, vid, (n - i + 1) * (DISTANCE + LENGTH) +
50, 0, SPEED, DISTANCE, real_engine)
plexe.set_fixed_lane(vid, 0, safe=False)
traci.vehicle.setSpeedMode(vid, 0)
if i == 0:
plexe.set_active_controller(vid, ACC)
else:
plexe.set_active_controller(vid, CACC)
if i > 0:
topology[vid] = {"front": "v.%d" % (i - 1), "leader": LEADER}
# add a vehicle that wants to join the platoon
vid = "v.%d" % n
add_platooning_vehicle(plexe, vid, 10, 1, SPEED, DISTANCE, real_engine)
plexe.set_fixed_lane(vid, 1, safe=False)
traci.vehicle.setSpeedMode(vid, 0)
plexe.set_active_controller(vid, ACC)
plexe.set_path_cacc_parameters(vid, distance=JOIN_DISTANCE)
return topology
?
# 該函數(shù)的作用是通過改變待加入車輛的通訊拓撲,令其到達指定位置,準備變道加入 platoon
def get_in_position(plexe, jid, fid, topology):
"""
Makes the joining vehicle get close to the join position. This is done by
changing the topology and setting the leader and the front vehicle for
the joiner. In addition, we increase the cruising speed and we switch to
the "fake" CACC, which uses a given GPS distance instead of the radar
distance to compute the control action
:param plexe: API instance
:param jid: id of the joiner
:param fid: id of the vehicle that will become the predecessor of the joiner
:param topology: the current platoon topology
:return: the modified topology
"""
topology[jid] = {"leader": LEADER, "front": fid}
plexe.set_cc_desired_speed(jid, SPEED + 15)
plexe.set_active_controller(jid, FAKED_CACC)
return topology
?
# 該函數(shù)的作用是當待加入車輛到達指定位置時,后車為其留出足夠的距離
# 在此過程中,platoon 分裂,后車成為新的 leader,帶領它的 follower 減速,為待加入車輛留出足夠的空間
def open_gap(plexe, vid, jid, topology, n):
"""
Makes the vehicle that will be behind the joiner open a gap to let the
joiner in. This is done by creating a temporary platoon, i.e., setting
the leader of all vehicles behind to the one that opens the gap and then
setting the front vehicle of the latter to be the joiner. To properly
open the gap, the vehicle leaving space switches to the "fake" CACC,
to consider the GPS distance to the joiner
:param plexe: API instance
:param vid: vehicle that should open the gap
:param jid: id of the joiner
:param topology: the current platoon topology
:param n: total number of vehicles currently in the platoon
:return: the modified topology
"""
index = int(vid.split(".")[1])
for i in range(index + 1, n):
# temporarily change the leader
topology["v.%d" % i]["leader"] = vid
# the front vehicle if the vehicle opening the gap is the joiner
topology[vid]["front"] = jid
plexe.set_active_controller(vid, FAKED_CACC)
plexe.set_path_cacc_parameters(vid, distance=JOIN_DISTANCE)
return topology
?
# 該函數(shù)的作用是,完成 join 操作后,修改通訊網(wǎng)絡拓撲,回歸最初的 CACC 情形,都以頭車作為 leader
def reset_leader(vid, topology, n):
"""
After the maneuver is completed, the vehicles behind the one that opened
the gap, reset the leader to the initial one
:param vid: id of the vehicle that let the joiner in
:param topology: the current platoon topology
:param n: total number of vehicles in the platoon (before the joiner)
:return: the modified topology
"""
index = int(vid.split(".")[1])
for i in range(index + 1, n):
# restore the real leader
topology["v.%d" % i]["leader"] = LEADER
return topology
?
?
def main(demo_mode, real_engine, setter=None):
# used to randomly color the vehicles
random.seed(1)
start_sumo("cfg/freeway.sumo.cfg", False)
plexe = Plexe()
traci.addStepListener(plexe)
step = 0
state = GOING_TO_POSITION
while running(demo_mode, step, 6000):
? # when reaching 60 seconds, reset the simulation when in demo_mode
if demo_mode and step == 6000:
start_sumo("cfg/freeway.sumo.cfg", True)
step = 0
state = GOING_TO_POSITION
random.seed(1)
?
traci.simulationStep()
?
if step == 0:
# create vehicles and track the joiner
topology = add_vehicles(plexe, N_VEHICLES, real_engine)
traci.gui.trackVehicle("View #0", JOINER)
traci.gui.setZoom("View #0", 20000)
if step % 10 == 1:
# simulate vehicle communication every 100 ms
communicate(plexe, topology)
# 程序執(zhí)行 100 步 (0.01s * 100 = 1s) 后,令單個車輛駛近目標位置
if step == 100:
# at 1 second, let the joiner get closer to the platoon
topology = get_in_position(plexe, JOINER, FRONT_JOIN, topology)
# 當車輛達到指定位置,令后車留出足夠的空間,以便匯入。
if state == GOING_TO_POSITION and step > 0:
# when the distance of the joiner is small enough, let the others
# open a gap to let the joiner enter the platoon
if get_distance(plexe, JOINER, FRONT_JOIN) < JOIN_DISTANCE + 1:
state = OPENING_GAP
topology = open_gap(plexe, BEHIND_JOIN, JOINER, topology, N_VEHICLES)
# 當空間足夠大時,完成匯入
if state == OPENING_GAP:
# when the gap is large enough, complete the maneuver
if get_distance(plexe, BEHIND_JOIN, FRONT_JOIN) > 2 * JOIN_DISTANCE + 2:
state = COMPLETED
plexe.set_fixed_lane(JOINER, 0, safe=False)
plexe.set_active_controller(JOINER, CACC)
plexe.set_path_cacc_parameters(JOINER, distance=DISTANCE)
plexe.set_active_controller(BEHIND_JOIN, CACC)
plexe.set_path_cacc_parameters(BEHIND_JOIN, distance=DISTANCE)
topology = reset_leader(BEHIND_JOIN, topology, N_VEHICLES)
if real_engine and setter is not None:
# if we are running with the dashboard, update its values
tracked_id = traci.gui.getTrackedVehicle("View #0")
if tracked_id != "":
ed = plexe.get_engine_data(tracked_id)
vd = plexe.get_vehicle_data(tracked_id)
setter(ed[RPM], ed[GEAR], vd.speed, vd.acceleration)
?
step += 1
?
traci.close()
?
?
if __name__ == "__main__":
main(True, False)
構造自己的仿真場景
以上 demo 涵蓋了很多基本場景模塊,如構造編隊、隊列分割、添加新 leader、換道、剎車等?;谶@些模塊,我們可以構造不同的仿真場景,測試自己的車聯(lián)網(wǎng)和車路協(xié)同策略。
這里我們考慮一個 platoon 通過交通路口的場景。交通燈可以向一定范圍內的車輛發(fā)送信號,信號中包含當前交通燈狀態(tài)以及剩余時間等信息,platoon leader 基于這些信息作出決策,platoon 中哪些車輛可以通過,哪些車輛需要提前剎車避免闖紅燈。

為了實現(xiàn)上述 V2X 場景,可以在 brakedemo.py 基礎上做簡單修改。需要注意的是,這里的 sumo 網(wǎng)絡和配置文件不再是之前的 freeway,而是需要自己設計,加入交通燈等元素。
在車輛顏色方面,為了區(qū)分 leader 和 follower,我對 utils 中的車輛顏色做了一些修改,不再使用 random,而是將 leader 設定為紅色, follower 設定為黃色。
platoon leader 在進入交通燈 100m 范圍內就可以獲得交通燈信息,知道當前狀態(tài)和剩余時間,然后進行后續(xù)的 platoon split 操作,分裂成兩個 platoon。前部的 platoon 會正常行駛通過路口,后部的 platoon 在新的 leader 帶領下剎車,停在路口處,等待下個綠燈通行。
程序及注釋如下:
import os
import sys
from math import floor
?
import random
from utils import add_platooning_vehicle, start_sumo, running, communicate
?
if 'SUMO_HOME' in os.environ:
tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
sys.path.append(tools)
else:
sys.exit("please declare environment variable 'SUMO_HOME'")
import traci
from plexe import Plexe, ACC, CACC, RPM, GEAR, RADAR_DISTANCE
?
# vehicle length
LENGTH = 4
# inter-vehicle distance
DISTANCE = 5
# platoon size
PLATOON_SIZE = 8
# number of platoon
PLATOON_NUM = 1
# cruising speed
SPEED = 33 # m/s
# distance between multiple platoons
PLATOON_DISTANCE = SPEED * 1.5 + 2
# vehicle who starts to brake
BRAKING_VEHICLE = "v.0.0"
# the original leader ID
ORI_LEADER_ID ="v.0.0"
# traffic light ID
TL_ID = "tl_0"
# range of traffic light broadcast
RANGE = 100
# traffic light position
TL_POS = 558
?
?
?
def add_vehicles(plexe, n, n_platoons, real_engine=False):
"""
Adds a set of platoons of n vehicles each to the simulation
:param plexe: API instance
:param n: number of vehicles of the platoon
:param n_platoons: number of platoons
:param real_engine: set to true to use the realistic engine model,
false to use a first order lag model
:return: returns the topology of the platoon, i.e., a dictionary which
indicates, for each vehicle, who is its leader and who is its front
vehicle. The topology can the be used by the data exchange logic to
automatically fetch data from leading and front vehicle to feed the CACC
"""
# add a platoon of n vehicles
topology = {}
p_length = n * LENGTH + (n - 1) * DISTANCE
for p in range(n_platoons):
for i in range(n):
vid = "v.%d.%d" % (p, i)
add_platooning_vehicle(plexe, vid, 200 + (n-i+1) * (DISTANCE+LENGTH), 0, SPEED, DISTANCE, real_engine)
plexe.set_fixed_lane(vid, 0, False)
traci.vehicle.setSpeedMode(vid, 0)
plexe.use_controller_acceleration(vid, False)
if i == 0:
plexe.set_active_controller(vid, ACC)
else:
plexe.set_active_controller(vid, CACC)
if i > 0:
topology[vid] = {"front": "v.%d.%d" % (p, i - 1), "leader": "v.%d.0" % p}
else:
topology[vid] = {}
return topology
?
?
def main(demo_mode, real_engine, setter=None):
# used to randomly color the vehicles
random.seed(1)
start_sumo("cfg/intersection/intersection.sumo.cfg", False)
plexe = Plexe()
traci.addStepListener(plexe)
step = 0
topology = dict()
min_dist = 1e6
split = False
?
while running(demo_mode, step, 3000):
?
traci.simulationStep()
?
if step == 0:
# create vehicles and track the braking vehicle
topology = add_vehicles(plexe, PLATOON_SIZE, PLATOON_NUM, real_engine)
tracked_veh = "v.0.%d" %(PLATOON_SIZE-1)
traci.gui.trackVehicle("View #0", tracked_veh)
traci.gui.setZoom("View #0", 2000)
?
# when the leader is 100m away from the traffic light, it will receive the current phase of the traffic light
# Accordingly, it computes which vehicles could pass safely.
leader_data = plexe.get_vehicle_data(ORI_LEADER_ID)
# the structure of vehicle data is defined in vehicle_data.py file in plexe folder
# self.acceleration, self.speed, self.pos_x, self.pos_y
if leader_data.pos_x >= TL_POS - RANGE and not split:
current_phase = traci.trafficlight.getPhase(TL_ID)
if current_phase == 0:
absolute_time = traci.trafficlight.getNextSwitch(TL_ID)
time_left = absolute_time - traci.simulation.getTime()
new_leader = floor((leader_data.speed * time_left - RANGE)/(LENGTH + DISTANCE))
new_leader_id = "v.0.%d" % new_leader
# change topology: add new leader and decelerate.
for i in range(new_leader+1,PLATOON_SIZE):
topology["v.0.%d" %i]["leader"] = new_leader_id
topology[new_leader_id] = {}
new_leader_data = plexe.get_vehicle_data(new_leader_id)
decel = new_leader_data.speed**2 / (2* (RANGE + new_leader * (LENGTH + DISTANCE)))
plexe.set_fixed_acceleration(new_leader_id, True, -1 * decel)
split = True
?
# set color for leader
for i in range(PLATOON_SIZE):
vid = "v.0.%d" % i
if topology[vid] == {}:
traci.vehicle.setColor(vid, (250,0,0, 255))
?
if step % 10 == 1:
# simulate vehicle communication every 100 ms
communicate(plexe, topology)
if real_engine and setter is not None:
# if we are running with the dashboard, update its values
tracked_id = traci.gui.getTrackedVehicle("View #0")
if tracked_id != "":
ed = plexe.get_engine_data(tracked_id)
vd = plexe.get_vehicle_data(tracked_id)
setter(ed[RPM], ed[GEAR], vd.speed, vd.acceleration)
?
if split == True:
new_leader_data = plexe.get_vehicle_data(new_leader_id)
current_phase = traci.trafficlight.getPhase(TL_ID)
if TL_POS - new_leader_data.pos_x < 10 and current_phase == 0:
plexe.set_fixed_acceleration(new_leader_id, True, 3)
?
if step > 1:
radar = plexe.get_radar_data("v.0.1")
if radar[RADAR_DISTANCE] < min_dist:
min_dist = radar[RADAR_DISTANCE]
?
step += 1
if step > 3000:
break
?
traci.close()
?
?
if __name__ == "__main__":
main(True, True)