基于 Plexe-SUMO 的 V2X 仿真

引言

目前車聯(lián)網(wǎng)和車路協(xié)同 (V2V, V2I, V2P... 統(tǒng)稱 V2X) 是自動駕駛和智能交通領域研究的核心問題之一。簡單來說,V2X 就是為車輛提供了上帝視角,可以獲取更大范圍、更準確的信息,從而優(yōu)化駕駛決策。

在各種智能交通場景中,platooning (車隊編排) 是一個很熱門的話題。借助 V2X, platooning 可以減小車輛之間的安全距離。較小的安全距離可以增加道路的吞吐量,對于在高速公路上行駛的卡車編隊,還可以減小風阻,降低能耗。

對于一般的研究機構,由于各方面的限制,很難在實車、實路上測試智能交通算法的性能。在這種情況下,仿真測試可以在一定程度上驗證算法的有效性。

V2X 仿真中涉及到兩方面:交通場景和網(wǎng)絡通訊。

SUMO 可以實現(xiàn)交通場景的仿真,網(wǎng)絡通訊方面的仿真軟件有OMNeT++, NS3 等.

Veins (vehicles in network simulation) = SUMO + OMNeT++
從名字就可以看出來, Veins 是針對車聯(lián)網(wǎng)的仿真軟件。

Plexe (platooning extension for veins) = Plexe-SUMO + Plexe-Veins
Plexe 是在 Veins 中加入了 platooning 的元素。

在做 V2X 仿真時,如果對網(wǎng)絡通訊方面的真實度要求不高,不需要模擬實際聯(lián)網(wǎng)中的延遲、丟包等情況,可以只用 Plexe-SUMO,這樣程序編寫要簡單很多。

Plexe-SUMO 是在 SUMO 基礎上加入了 platooning 相關的元素,更方便 platooning 場景的搭建。另外,Plexe-SUMO 提供了 Python API,可以在 python 中以 module 的方式調用,程序書寫更簡單。

本文首先介紹了 Plexe-SUMO python API 的安裝方法,然后分析 Plexe-SUMO 官方給的 demo 程序,并在此基礎上嘗試構造我們自己的仿真場景。官方 demo 包括

  • brakedemo.py: platoon 協(xié)同剎車
  • joindemo.py: 從中間加入 platoon
  • enginedemo.py: 三車( alfa-147, audi-r8, bugatti-veyron ) 啟動性能對比
  • platoon-lane-change-test.py: platoon 協(xié)同換道超車
  • overtake-and-keep-right-test.py: 單輛車超車

本文重點分析 brakedemo.py 和 joindemo.py 兩個程序。

安裝 Plexe-SUMO python API

cd

git clone --recursive https://github.com/eclipse/sumo

sudo apt-get install cmake python libxerces-c-dev libfox-1.6-dev libgl1-mesa-dev libglu1-mesa-dev libgdal-dev libproj-dev

cd ~/sumo/build/cmake_modules

cmake ../..

make -j8

echo "export SUMO_HOME=~/sumo" >> ~/.bashrc 

echo "export PATH=$SUMO_HOME/bin:$SUMO_HOME/tools:$PATH" >> ~/.bashrc 

cd 

git clone https://github.com/michele-segata/plexe-pyapi.git

cd plexe-pyapi

pip install --user .

brakedemo.py

在 platoon 場景中,車輛的速度控制一般采用 CACC (cooperative adaptive cruise control),這里的 cooperative 就意味著車輛之間可以協(xié)作,共享信息。在 brakedemo.py 程序中,platoon 實現(xiàn)協(xié)同剎車,即使跟車距離比較短,也不會發(fā)生碰撞。

brakedemo.gif

程序及注釋如下:

import os
import sys
import random
?
# 調用 utils module,里面包含了 platooning 的中層實現(xiàn)函數(shù)
from utils import add_platooning_vehicle, start_sumo, running, communicate
?
# 確保路徑設置正確,python 能夠搜索到 traci module
if 'SUMO_HOME' in os.environ:
    tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
    sys.path.append(tools)
else:
    sys.exit("please declare environment variable 'SUMO_HOME'")
import traci
?
# 調用 plexe module, 包含了 platooning 的底層實現(xiàn)函數(shù)
from plexe import Plexe, ACC, CACC, RPM, GEAR, RADAR_DISTANCE
?
# vehicle length
LENGTH = 4
# inter-vehicle distance
DISTANCE = 5
# cruising speed
# 120 km/h = 33.3 m/s
SPEED = 120/3.6  
# distance between multiple platoons
# 不同 Platoon 之間的距離,在該 demo 中只有一個 platoon
PLATOON_DISTANCE = SPEED * 1.5 + 2
# vehicle who starts to brake
# 最初開始剎車的是頭車,編號為 "v.0.0",即 0 號 platoon 中的 0 號車輛
# 由于只有一個 platoon,所以所有車輛編號都是 "v.0.* "的形式
BRAKING_VEHICLE = "v.0.0"
?
# 構造 platoon,返回 platoon 中車輛之間的通信連接關系
def add_vehicles(plexe, n, n_platoons, real_engine=False):
 """
 Adds a set of platoons of n vehicles each to the simulation
 :param plexe: API instance
 :param n: number of vehicles of the platoon
 :param n_platoons: number of platoons
 :param real_engine: set to true to use the realistic engine model,
 false to use a first order lag model
 :return: returns the topology of the platoon, i.e., a dictionary which
 indicates, for each vehicle, who is its leader and who is its front
 vehicle. The topology can the be used by the data exchange logic to
 automatically fetch data from leading and front vehicle to feed the CACC
 """
 # add a platoon of n vehicles
 # topology 中以 dictionary 的形式存儲車輛之間的通信拓撲結構,包括每輛車的前車和頭車編號
    topology = {}
    p_length = n * LENGTH + (n - 1) * DISTANCE
    for p in range(n_platoons):
        for i in range(n):
           vid = "v.%d.%d" % (p, i)
           # 調用 utils module 中的函數(shù),將車輛編排成 platoon 的形式
           add_platooning_vehicle(plexe, vid, (n-p+1) * (p_length + PLATOON_DISTANCE) + (n-i+1) *       
                                           (DISTANCE+LENGTH), 0, SPEED, DISTANCE,  real_engine)

           # 將車輛保持在 lane 0 ,并忽略安全距離限制
           plexe.set_fixed_lane(vid, 0, False)

           # 設置車輛速度模式。車輛的速度有 5 個影響因素
           # https://sumo.dlr.de/wiki/TraCI/Change_Vehicle_State#speed_mode_.280xb3.29
           # 1\. safe speed  
           # 2\. max accel
           # 3\. max speed
           # 4\. right of way at intersections
           # 5\. brake hard to avoid passing red light
           # 如果全都不考慮,則設置為 [0, 0, 0, 0, 0] = 0, 此時,車速完全由 traci 控制
           # 如果全都考慮,則設置為 [1, 1, 1, 1, 1] = 31
           # 如果只考慮 safe speed,則設置為 [0, 0, 0, 0, 1] = 1
           traci.vehicle.setSpeedMode(vid, 0)

           # 在 platoon 中頭車采用 adaptive cruise control 的控制方式
           # 后邊的跟車采用 cooperative adative cruise control 的控制方式
           plexe.use_controller_acceleration(vid, False)
           if i == 0:
               plexe.set_active_controller(vid, ACC)
           else:
               plexe.set_active_controller(vid, CACC)

           if i > 0:
               topology[vid] = {"front": "v.%d.%d" % (p, i - 1), "leader": "v.%d.0" % p}
           else:
               topology[vid] = {}
    return topology
??
def main(demo_mode, real_engine, setter=None):    
     # used to randomly color the vehicles
     # 具體著色是在 utils module 的 add_platooning_vehicle 函數(shù)中實現(xiàn)的
     random.seed(1)

     # 運行 SUMO 的配置文件,后邊的參數(shù) False / True 表示 SUMO server 是否已經在運行了。
     # 若為 False,則打開 SUMO 并加載配置文件
     # 若為 True,則重新加載配置文件
     # freeway.sumo.cfg 中仿真步長為 0.01s
     start_sumo("cfg/freeway.sumo.cfg", False)

     # 以下設置可以使得每次 traci.simulationStep() 之后都調用一次 plexe 
     plexe = Plexe()
     traci.addStepListener(plexe)

     step = 0
     topology = dict()
     min_dist = 1e6
?
     # 主循環(huán)
     while running(demo_mode, step, 1500):
?
         # when reaching 15 seconds, reset the simulation when in demo_mode
         if demo_mode and step == 1500:
             print("Min dist: %f" % min_dist)
             start_sumo("cfg/freeway.sumo.cfg", True)
             step = 0
             random.seed(1)
?
         traci.simulationStep()
?
         # 仿真初始化階段,構造含有 8 輛車的 platoon
         # 設置 GUI 中畫面在整個仿真過程中始終聚焦在 v.0.0, 即頭車
         # 鏡頭縮放參數(shù) 20000, 這個可以根據(jù)具體場景設置,使得鏡頭既不會拉的太近,也不會拉的太遠。
         if step == 0:
             # create vehicles and track the braking vehicle
             topology = add_vehicles(plexe, 8, 1, real_engine)
             traci.gui.trackVehicle("View #0", BRAKING_VEHICLE)
             traci.gui.setZoom("View #0", 20000)

         # 每隔 10 步車輛之間通信一次,獲得其他車輛的位置、速度、加速度等信息
         if step % 10 == 1:
             # simulate vehicle communication every 100 ms
             communicate(plexe, topology)

         # 是否使用 plexe 中改進的更加逼真的引擎模型
         if real_engine and setter is not None:
             # if we are running with the dashboard, update its values
             tracked_id = traci.gui.getTrackedVehicle("View #0")
             if tracked_id != "":
                 ed = plexe.get_engine_data(tracked_id)
                 vd = plexe.get_vehicle_data(tracked_id)
                 setter(ed[RPM], ed[GEAR], vd.speed, vd.acceleration)
         # 在第 500 步時頭車剎車,由于是 車聯(lián)網(wǎng) 狀態(tài),所以其他車輛也會及時得到頭車信息,幾乎同步剎車,即使跟車距離很小,也不會追尾。這就是 車輛網(wǎng) 的優(yōu)勢。
         if step == 500:
             plexe.set_fixed_acceleration(BRAKING_VEHICLE, True, -6)

         # 記錄在整個仿真過程中車輛間隔的最小距離,有需要的話可以隨后進行分析
         if step > 1:
             radar = plexe.get_radar_data("v.0.1")
             if radar[RADAR_DISTANCE] < min_dist:
                 min_dist = radar[RADAR_DISTANCE]
?
         step += 1
?
     traci.close()
?
?
if __name__ == "__main__":
 main(True, True)

joindemo.py

該程序實現(xiàn)了單個車輛從中部加入 platoon 的過程

joindemo.gif

程序及注釋如下:

import os
import sys
import random
from utils import add_platooning_vehicle, communicate, get_distance, start_sumo, running
?
if 'SUMO_HOME' in os.environ:
   tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
   sys.path.append(tools)
else:
   sys.exit("please declare environment variable 'SUMO_HOME'")
?
import traci
from plexe import Plexe, ACC, CACC, FAKED_CACC, RPM, GEAR, ACCELERATION, SPEED
?
# vehicle length
LENGTH = 4
# inter-vehicle distance
DISTANCE = 5
# inter-vehicle distance when leaving space for joining
JOIN_DISTANCE = DISTANCE * 2
# cruising speed
SPEED = 120 / 3.6
?
# maneuver states:
GOING_TO_POSITION = 0
OPENING_GAP = 1
COMPLETED = 2
?
# maneuver actors
# 這里只考慮 1 個 platoon,其中車輛編號依次為 v.0, v.1, v.2, ...
LEADER = "v.0"
JOIN_POSITION = 3
FRONT_JOIN = "v.%d" % (JOIN_POSITION - 1)
BEHIND_JOIN = "v.%d" % JOIN_POSITION
N_VEHICLES = 8
JOINER = "v.%d" % N_VEHICLES
?
?
# 該程序與 brakedemo.py 中類似,向場景中加入車輛,這里包含 1 個 platoon 和 1 個單獨的車輛
# 返回 platoon 的 topology,這個 topology 中不包含后加入的單個車輛
def add_vehicles(plexe, n, real_engine=False):
 """
 Adds a platoon of n vehicles to the simulation, plus an additional one
 farther away that wants to join the platoon
 :param plexe: API instance
 :param n: number of vehicles of the platoon
 :param real_engine: set to true to use the realistic engine model,
 false to use a first order lag model
 :return: returns the topology of the platoon, i.e., a dictionary which
 indicates, for each vehicle, who is its leader and who is its front
 vehicle. The topology can the be used by the data exchange logic to
 automatically fetch data from leading and front vehicle to feed the CACC
 """
   # add a platoon of n vehicles
   topology = {}
   for i in range(n):
       vid = "v.%d" % i
       add_platooning_vehicle(plexe, vid, (n - i + 1) * (DISTANCE + LENGTH) +
                                             50, 0, SPEED, DISTANCE, real_engine)
       plexe.set_fixed_lane(vid, 0, safe=False)
       traci.vehicle.setSpeedMode(vid, 0)
       if i == 0:
           plexe.set_active_controller(vid, ACC)
       else:
           plexe.set_active_controller(vid, CACC)
       if i > 0:
           topology[vid] = {"front": "v.%d" % (i - 1), "leader": LEADER}
    # add a vehicle that wants to join the platoon
   vid = "v.%d" % n
   add_platooning_vehicle(plexe, vid, 10, 1, SPEED, DISTANCE, real_engine)
   plexe.set_fixed_lane(vid, 1, safe=False)
   traci.vehicle.setSpeedMode(vid, 0)
   plexe.set_active_controller(vid, ACC)
   plexe.set_path_cacc_parameters(vid, distance=JOIN_DISTANCE)
   return topology
?
# 該函數(shù)的作用是通過改變待加入車輛的通訊拓撲,令其到達指定位置,準備變道加入 platoon
def get_in_position(plexe, jid, fid, topology):
 """
 Makes the joining vehicle get close to the join position. This is done by
 changing the topology and setting the leader and the front vehicle for
 the joiner. In addition, we increase the cruising speed and we switch to
 the "fake" CACC, which uses a given GPS distance instead of the radar
 distance to compute the control action
 :param plexe: API instance
 :param jid: id of the joiner
 :param fid: id of the vehicle that will become the predecessor of the joiner
 :param topology: the current platoon topology
 :return: the modified topology
 """
   topology[jid] = {"leader": LEADER, "front": fid}
   plexe.set_cc_desired_speed(jid, SPEED + 15)
   plexe.set_active_controller(jid, FAKED_CACC)
   return topology
?
# 該函數(shù)的作用是當待加入車輛到達指定位置時,后車為其留出足夠的距離
# 在此過程中,platoon 分裂,后車成為新的 leader,帶領它的 follower 減速,為待加入車輛留出足夠的空間
def open_gap(plexe, vid, jid, topology, n):
 """
 Makes the vehicle that will be behind the joiner open a gap to let the
 joiner in. This is done by creating a temporary platoon, i.e., setting
 the leader of all vehicles behind to the one that opens the gap and then
 setting the front vehicle of the latter to be the joiner. To properly
 open the gap, the vehicle leaving space switches to the "fake" CACC,
 to consider the GPS distance to the joiner
 :param plexe: API instance
 :param vid: vehicle that should open the gap
 :param jid: id of the joiner
 :param topology: the current platoon topology
 :param n: total number of vehicles currently in the platoon
 :return: the modified topology
 """
   index = int(vid.split(".")[1])
   for i in range(index + 1, n):
       # temporarily change the leader
       topology["v.%d" % i]["leader"] = vid
    # the front vehicle if the vehicle opening the gap is the joiner
   topology[vid]["front"] = jid
   plexe.set_active_controller(vid, FAKED_CACC)
   plexe.set_path_cacc_parameters(vid, distance=JOIN_DISTANCE)
   return topology
?
# 該函數(shù)的作用是,完成 join 操作后,修改通訊網(wǎng)絡拓撲,回歸最初的 CACC 情形,都以頭車作為 leader
def reset_leader(vid, topology, n):
 """
 After the maneuver is completed, the vehicles behind the one that opened
 the gap, reset the leader to the initial one
 :param vid: id of the vehicle that let the joiner in
 :param topology: the current platoon topology
 :param n: total number of vehicles in the platoon (before the joiner)
 :return: the modified topology
 """
   index = int(vid.split(".")[1])
   for i in range(index + 1, n):
       # restore the real leader
       topology["v.%d" % i]["leader"] = LEADER
   return topology
?
?
def main(demo_mode, real_engine, setter=None):
   # used to randomly color the vehicles
   random.seed(1)
   start_sumo("cfg/freeway.sumo.cfg", False)
   plexe = Plexe()
   traci.addStepListener(plexe)
   step = 0
   state = GOING_TO_POSITION
   while running(demo_mode, step, 6000):
?       # when reaching 60 seconds, reset the simulation when in demo_mode
       if demo_mode and step == 6000:
           start_sumo("cfg/freeway.sumo.cfg", True)
           step = 0
           state = GOING_TO_POSITION
           random.seed(1)
?
       traci.simulationStep()
?
       if step == 0:
           # create vehicles and track the joiner
           topology = add_vehicles(plexe, N_VEHICLES, real_engine)
           traci.gui.trackVehicle("View #0", JOINER)
           traci.gui.setZoom("View #0", 20000)
       if step % 10 == 1:
           # simulate vehicle communication every 100 ms
           communicate(plexe, topology)

       # 程序執(zhí)行 100 步 (0.01s * 100 = 1s) 后,令單個車輛駛近目標位置
       if step == 100:
           # at 1 second, let the joiner get closer to the platoon
           topology = get_in_position(plexe, JOINER, FRONT_JOIN, topology)

        # 當車輛達到指定位置,令后車留出足夠的空間,以便匯入。
       if state == GOING_TO_POSITION and step > 0:
             # when the distance of the joiner is small enough, let the others
             # open a gap to let the joiner enter the platoon
           if get_distance(plexe, JOINER, FRONT_JOIN) < JOIN_DISTANCE + 1:
               state = OPENING_GAP
               topology = open_gap(plexe, BEHIND_JOIN, JOINER, topology, N_VEHICLES)

       # 當空間足夠大時,完成匯入
       if state == OPENING_GAP:
           # when the gap is large enough, complete the maneuver
           if get_distance(plexe, BEHIND_JOIN, FRONT_JOIN) >  2 * JOIN_DISTANCE + 2:
               state = COMPLETED
               plexe.set_fixed_lane(JOINER, 0, safe=False)
               plexe.set_active_controller(JOINER, CACC)
               plexe.set_path_cacc_parameters(JOINER, distance=DISTANCE)
               plexe.set_active_controller(BEHIND_JOIN, CACC)
               plexe.set_path_cacc_parameters(BEHIND_JOIN, distance=DISTANCE)
               topology = reset_leader(BEHIND_JOIN, topology, N_VEHICLES)
       if real_engine and setter is not None:
           # if we are running with the dashboard, update its values
           tracked_id = traci.gui.getTrackedVehicle("View #0")
           if tracked_id != "":
               ed = plexe.get_engine_data(tracked_id)
               vd = plexe.get_vehicle_data(tracked_id)
               setter(ed[RPM], ed[GEAR], vd.speed, vd.acceleration)
?
       step += 1
?
   traci.close()
?
?
if __name__ == "__main__":
 main(True, False)

構造自己的仿真場景

以上 demo 涵蓋了很多基本場景模塊,如構造編隊、隊列分割、添加新 leader、換道、剎車等?;谶@些模塊,我們可以構造不同的仿真場景,測試自己的車聯(lián)網(wǎng)和車路協(xié)同策略。

這里我們考慮一個 platoon 通過交通路口的場景。交通燈可以向一定范圍內的車輛發(fā)送信號,信號中包含當前交通燈狀態(tài)以及剩余時間等信息,platoon leader 基于這些信息作出決策,platoon 中哪些車輛可以通過,哪些車輛需要提前剎車避免闖紅燈。

intersection.gif

為了實現(xiàn)上述 V2X 場景,可以在 brakedemo.py 基礎上做簡單修改。需要注意的是,這里的 sumo 網(wǎng)絡和配置文件不再是之前的 freeway,而是需要自己設計,加入交通燈等元素。

在車輛顏色方面,為了區(qū)分 leader 和 follower,我對 utils 中的車輛顏色做了一些修改,不再使用 random,而是將 leader 設定為紅色, follower 設定為黃色。

platoon leader 在進入交通燈 100m 范圍內就可以獲得交通燈信息,知道當前狀態(tài)和剩余時間,然后進行后續(xù)的 platoon split 操作,分裂成兩個 platoon。前部的 platoon 會正常行駛通過路口,后部的 platoon 在新的 leader 帶領下剎車,停在路口處,等待下個綠燈通行。

程序及注釋如下:

import os
import sys
from math import floor
?
import random
from utils import add_platooning_vehicle, start_sumo, running, communicate
?
if 'SUMO_HOME' in os.environ:
   tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
   sys.path.append(tools)
else:
   sys.exit("please declare environment variable 'SUMO_HOME'")
import traci
from plexe import Plexe, ACC, CACC, RPM, GEAR, RADAR_DISTANCE
?
# vehicle length
LENGTH = 4
# inter-vehicle distance
DISTANCE = 5
# platoon size
PLATOON_SIZE = 8
# number of platoon
PLATOON_NUM = 1
# cruising speed
SPEED = 33 # m/s
# distance between multiple platoons
PLATOON_DISTANCE = SPEED * 1.5 + 2
# vehicle who starts to brake
BRAKING_VEHICLE = "v.0.0"
# the original leader ID
ORI_LEADER_ID ="v.0.0"
# traffic light ID
TL_ID = "tl_0"
# range of traffic light broadcast
RANGE = 100 
# traffic light position
TL_POS = 558
?
?
?
def add_vehicles(plexe, n, n_platoons, real_engine=False):
 """
 Adds a set of platoons of n vehicles each to the simulation
 :param plexe: API instance
 :param n: number of vehicles of the platoon
 :param n_platoons: number of platoons
 :param real_engine: set to true to use the realistic engine model,
 false to use a first order lag model
 :return: returns the topology of the platoon, i.e., a dictionary which
 indicates, for each vehicle, who is its leader and who is its front
 vehicle. The topology can the be used by the data exchange logic to
 automatically fetch data from leading and front vehicle to feed the CACC
 """
 # add a platoon of n vehicles
     topology = {}
     p_length = n * LENGTH + (n - 1) * DISTANCE
     for p in range(n_platoons):
        for i in range(n):
           vid = "v.%d.%d" % (p, i)
           add_platooning_vehicle(plexe, vid, 200 + (n-i+1) * (DISTANCE+LENGTH), 0, SPEED, DISTANCE, real_engine)
           plexe.set_fixed_lane(vid, 0, False)
           traci.vehicle.setSpeedMode(vid, 0)
           plexe.use_controller_acceleration(vid, False)
           if i == 0:
               plexe.set_active_controller(vid, ACC)
           else:
               plexe.set_active_controller(vid, CACC)
           if i > 0:
               topology[vid] = {"front": "v.%d.%d" % (p, i - 1),  "leader": "v.%d.0" % p}
           else:
           topology[vid] = {}
     return topology
?
?
def main(demo_mode, real_engine, setter=None):
     # used to randomly color the vehicles
     random.seed(1)
     start_sumo("cfg/intersection/intersection.sumo.cfg", False)
     plexe = Plexe()
     traci.addStepListener(plexe)
     step = 0
     topology = dict()
     min_dist = 1e6
     split = False
?
     while running(demo_mode, step, 3000):
?
         traci.simulationStep()
?
         if step == 0:
             # create vehicles and track the braking vehicle
             topology = add_vehicles(plexe, PLATOON_SIZE, PLATOON_NUM, real_engine)
             tracked_veh = "v.0.%d" %(PLATOON_SIZE-1)
             traci.gui.trackVehicle("View #0", tracked_veh)
             traci.gui.setZoom("View #0", 2000)
?
         # when the leader is 100m away from the traffic light, it will receive the current phase of the traffic light
         # Accordingly, it computes which vehicles could pass safely.
         leader_data = plexe.get_vehicle_data(ORI_LEADER_ID)
         # the structure of vehicle data is defined in vehicle_data.py file in plexe folder
         # self.acceleration,  self.speed, self.pos_x, self.pos_y 
         if leader_data.pos_x >= TL_POS - RANGE and not split:
             current_phase = traci.trafficlight.getPhase(TL_ID)
             if current_phase == 0:
                 absolute_time = traci.trafficlight.getNextSwitch(TL_ID)
                 time_left = absolute_time - traci.simulation.getTime()
                 new_leader = floor((leader_data.speed * time_left - RANGE)/(LENGTH + DISTANCE))
                 new_leader_id = "v.0.%d" % new_leader
                 # change topology: add new leader and decelerate.
                 for i in range(new_leader+1,PLATOON_SIZE):
                 topology["v.0.%d" %i]["leader"] = new_leader_id
                 topology[new_leader_id] = {}
                 new_leader_data = plexe.get_vehicle_data(new_leader_id)
                 decel = new_leader_data.speed**2 / (2* (RANGE + new_leader * (LENGTH + DISTANCE)))
                 plexe.set_fixed_acceleration(new_leader_id, True, -1 * decel)
              split = True
?
         # set color for leader
         for i in range(PLATOON_SIZE):
             vid = "v.0.%d" % i
             if topology[vid] == {}:
                 traci.vehicle.setColor(vid, (250,0,0, 255))
?
         if step % 10 == 1:
             # simulate vehicle communication every 100 ms
             communicate(plexe, topology)
         if real_engine and setter is not None:
             # if we are running with the dashboard, update its values
             tracked_id = traci.gui.getTrackedVehicle("View #0")
             if tracked_id != "":
                 ed = plexe.get_engine_data(tracked_id)
                 vd = plexe.get_vehicle_data(tracked_id)
                 setter(ed[RPM], ed[GEAR], vd.speed, vd.acceleration)
?
         if split == True:
               new_leader_data = plexe.get_vehicle_data(new_leader_id)
               current_phase = traci.trafficlight.getPhase(TL_ID)
               if TL_POS - new_leader_data.pos_x < 10 and current_phase == 0: 
                   plexe.set_fixed_acceleration(new_leader_id, True, 3)
?
         if step > 1:
               radar = plexe.get_radar_data("v.0.1")
               if radar[RADAR_DISTANCE] < min_dist:
                     min_dist = radar[RADAR_DISTANCE]
?
         step += 1
         if step > 3000:
             break
?
     traci.close()
?
?
if __name__ == "__main__":
 main(True, True)
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容