ROS SMACH 基礎(chǔ)教程

1.SMACH介紹

當(dāng)機(jī)器人在執(zhí)行一些復(fù)雜的計劃時,SMACH將變得很有用,它可以將可能的狀態(tài)和狀態(tài)狀態(tài)轉(zhuǎn)移過程簡化,將不同的任務(wù)模塊整合到一起,讓機(jī)器人實(shí)現(xiàn)復(fù)雜的任務(wù)調(diào)度。

  • 適合用SMACH的情況
    • 快速原型化:基于Python的簡單SMACH語法使快速原型化狀態(tài)機(jī)并開始運(yùn)行狀態(tài)機(jī)變得容易。
    • 復(fù)雜狀態(tài)機(jī):SMACH允許您設(shè)計、維護(hù)和調(diào)試大型復(fù)雜的分層狀態(tài)機(jī)。你可以在這里找到一個復(fù)雜的分層狀態(tài)機(jī)的例子。
    • 內(nèi)省: SMACH讓你充分內(nèi)省狀態(tài)機(jī)、狀態(tài)轉(zhuǎn)換和數(shù)據(jù)流等。查看smach_viewer獲得更多細(xì)節(jié)。
  • 不適合使用狀態(tài)機(jī)
    • 非結(jié)構(gòu)化的任務(wù): 對于你的非結(jié)構(gòu)化任務(wù),SMACH將調(diào)度不足。
    • 低級系統(tǒng): 對于要求高效率的低級系統(tǒng),SMACH并不意味著可以作為一個狀態(tài)機(jī)使用,SMACH是一個任務(wù)級體系。
    • Smash: 當(dāng)你想要使用smash的時候,不要使用SMACH,這時候使用smash.

可以使用SMACH構(gòu)建有限狀態(tài)機(jī),但SMACH可以做更多的事情。SMACH是一個用于任務(wù)級執(zhí)行和協(xié)調(diào)的庫,并提供了幾種類型的“狀態(tài)容器”。其中一個容器是有限狀態(tài)機(jī),但這個容器也可以是另一個容器中的狀態(tài)。有關(guān)SMACH中內(nèi)置的容器和狀態(tài)的列表,請參見教程頁面。

2.SMACH文檔

2.1 概念

2.1.1 結(jié)果語義

對于所有SMACH容器,所包含狀態(tài)的接口是通過狀態(tài)結(jié)果定義的。狀態(tài)的潛在結(jié)果是狀態(tài)實(shí)例的屬性,必須在執(zhí)行之前聲明。如果手工編寫SMACH計劃,則在構(gòu)建時聲明所有可能的結(jié)果,并且可以輕松地檢查狀態(tài)轉(zhuǎn)換的一致性,而無需執(zhí)行它。

狀態(tài)結(jié)果可能會導(dǎo)致不同類型的容器發(fā)生不同的事情,但從狀態(tài)的角度來看,結(jié)果發(fā)出后發(fā)生的事情是無關(guān)緊要的。從這個意義上說,結(jié)果可以被認(rèn)為是特定狀態(tài)的“局部”。

例如,狀態(tài)可能提供“成功”、“中止”或“搶占”等結(jié)果,就狀態(tài)而言,這是它與任務(wù)級狀態(tài)流交互的方式。在狀態(tài)機(jī)的上下文中,這些結(jié)果將與其他狀態(tài)相關(guān)聯(lián),從而形成轉(zhuǎn)換。但是,在另一個容器的上下文中,它們可能被區(qū)別對待。結(jié)果只是作為狀態(tài)和容器之間的公共接口。

2.1.2 用戶數(shù)據(jù)

每個 SMACH 容器都有一個平面數(shù)據(jù)庫,用于在不同狀態(tài)之間協(xié)調(diào)和傳遞數(shù)據(jù)。當(dāng)狀態(tài)計算一些結(jié)果或返回一些傳感器信息時,這變得很有用。這允許將此類數(shù)據(jù)保存在執(zhí)行級別,并可供其他任務(wù)或過程使用。

與結(jié)果類似,在每個狀態(tài)中設(shè)置和檢索的用戶數(shù)據(jù)鍵由狀態(tài)的輸入鍵輸出鍵描述。這些也是狀態(tài)實(shí)例的屬性,必須在執(zhí)行之前聲明。當(dāng) SMACH 計劃變得復(fù)雜時,所有這些交互的聲明可以防止錯誤并有助于調(diào)試。

有關(guān)用戶數(shù)據(jù) API 的信息,請參閱在 SMACH 中操作用戶數(shù)據(jù)。

2.1.3 搶占

搶占傳播內(nèi)置于 SMACH 中。State基類包括一個用于協(xié)調(diào)容器和包含狀態(tài)之間的搶占請求的接口。每種容器類型都有自己明確定義的行為來響應(yīng)搶占請求。這使得系統(tǒng)既可以干凈地響應(yīng)來自最終用戶的終止信號,又可以由更高級別的執(zhí)行人員以編程方式取消。

2.1.4 內(nèi)省

SMACH 容器可以提供調(diào)試接口,允許開發(fā)人員(通過 ROS)設(shè)置 SMACH 樹的完整初始狀態(tài)。這可以通過SMACH Viewer進(jìn)行可視化。這包括每個容器的初始狀態(tài)標(biāo)簽以及每個級別的用戶數(shù)據(jù)結(jié)構(gòu)的內(nèi)容。

2.2 狀態(tài)

“狀態(tài)”在不同的上下文中可能意味著不同的事物。在 SMACH 中,“狀態(tài)”是執(zhí)行的本地狀態(tài),或者等效地,“狀態(tài)”對應(yīng)于執(zhí)行某些任務(wù)的系統(tǒng)。這與正式的狀態(tài)機(jī)不同,其中每個狀態(tài)描述的不是系統(tǒng)正在做什么,而是描述系統(tǒng)的給定配置。這允許用戶專注于系統(tǒng)正在執(zhí)行的內(nèi)容以及所述執(zhí)行的結(jié)果,而不是命名執(zhí)行之間的點(diǎn)。SMACH 中的狀態(tài)更多地對應(yīng)于結(jié)構(gòu)化編程中的狀態(tài)。有關(guān)此區(qū)別的更多信息,以及如何在 SMACH 中可視化狀態(tài),請參閱狀態(tài)圖 VS 流程圖。

2.2.1 提供的狀態(tài)類

描述 結(jié)果
狀態(tài) 狀態(tài)基接口 沒有
SPA狀態(tài) 具有三個常用預(yù)定義結(jié)果的狀態(tài) 成功,搶占,中止
監(jiān)控狀態(tài) 訂閱 ROS 主題并在條件成立時阻塞的狀態(tài)。 有效的,無效的,被搶占的
條件狀態(tài) 執(zhí)行條件回調(diào)的狀態(tài) 真假_ _
簡單動作狀態(tài) 充當(dāng)簡單actionlib操作的代理的狀態(tài) 成功,搶占,中止

2.3.容器

SMACH 庫提供了幾種容器類型。不同的容器提供不同的執(zhí)行語義,但它們都可以被視為其他容器中的狀態(tài)。容器可能有自己的方式來指定包含狀態(tài)的轉(zhuǎn)換,因?yàn)椤稗D(zhuǎn)換”在不同的上下文中意味著不同的東西。

所有 SMACH 容器都具有以下屬性:

  • 它們包含一個狀態(tài)字典,其中實(shí)現(xiàn) smach.State 接口的對象由字符串標(biāo)簽作為鍵。
  • 它們包含一個所有孩子都可以訪問的用戶數(shù)據(jù)結(jié)構(gòu)。
  • 為了修改容器的結(jié)構(gòu),必須打開它(見下文)。

2.3.1 打開容器進(jìn)行施工

這可以通過使用 Python 的with關(guān)鍵字或通過在容器對象上調(diào)用open()close()來完成。with關(guān)鍵字可以與容器對象本身一起使用,也可以在容器的打開方法上使用。

2.3.2 向容器添加狀態(tài)

SMACH 容器提供添加子項(xiàng)的靜態(tài)方法。這些方法訪問當(dāng)前打開的容器并相應(yīng)地添加狀態(tài)。

2.3.3提供的容器類

3. 在各個狀態(tài)傳遞數(shù)據(jù)

說明: 本教程教您如何將數(shù)據(jù)從一個狀態(tài)傳遞到下一個狀態(tài)
教程級別:初學(xué)者

3.1.指定用戶數(shù)據(jù)

一個狀態(tài)可能需要一些輸入數(shù)據(jù)來完成它的工作,并且/或者它可能有一些輸出數(shù)據(jù)想要提供給其他狀態(tài)。狀態(tài)的輸入和輸出數(shù)據(jù)稱為該狀態(tài)的用戶數(shù)據(jù)。在構(gòu)造狀態(tài)時,可以指定它需要/提供的用戶數(shù)據(jù)字段的名稱。

class Foo(smach.State):
    def __init__(self, outcomes=['outcome1', 'outcome2'],
                         input_keys=['foo_input'],
                         output_keys=['foo_output'])
    def execute(self, userdata):
        if userdata.foo_inout == 1:
            return 'outcom1'
        else :
            userdata.foo_output = 3
            return 'outcom2'                 
  • input_keys列表枚舉一個狀態(tài)需要運(yùn)行的所有輸入。狀態(tài)聲明它希望這些字段存在于用戶數(shù)據(jù)中。執(zhí)行方法提供了用戶數(shù)據(jù)結(jié)構(gòu)的副本。狀態(tài)可以讀取它在input_keys列表中枚舉的所有userdata字段,但它不能寫入這些字段中的任何一個。
  • output_keys列表枚舉一個狀態(tài)提供的所有輸出。狀態(tài)可以寫入output_keys列表中枚舉的userdata結(jié)構(gòu)中的所有字段。

注意:通過input_keys從userdata獲取的對象被封裝為不可變性,因此狀態(tài)不能調(diào)用這些對象上的方法。如果您需要一個可變的輸入對象,您必須在input_keys和output_keys中指定相同的鍵。如果不傳遞對象,或者不需要調(diào)用或修改它們的方法,則應(yīng)該在input_keys和output_keys中使用唯一的名稱,以避免混淆和潛在的bug。

狀態(tài)的接口由其結(jié)果、輸入鍵和輸出鍵定義。

3.2.連接用戶數(shù)據(jù)

在向狀態(tài)機(jī)添加狀態(tài)時,還需要連接用戶數(shù)據(jù)字段,以允許狀態(tài)之間傳遞數(shù)據(jù)。例如,如果狀態(tài)FOO產(chǎn)生'foo_output',而狀態(tài)BAR需要'bar_input',那么你可以使用名稱重映射將這兩個用戶數(shù)據(jù)端口連接在一起:

sm_top = smach.StateMachine(outcomes=['outcome4','outcome5'],
                                input_keys=['sm_input'],
                                output_keys=['sm_output'])
with sm_top:
    smach.StateMachine.add('FOO',Foo(),
                                transitions={'outcome1':'BAR',
                                              'outcome2':'outcome4'},
                                remapping={'foo_input':'sm_data',
                                            'foo_output':'sm_data'}
    )
    smach.StateMachine.add('BAR',Bar(),
                                transitions={'outcome2',:'FOO'},
                                remapping={'bar_input':'sm_data',
                                            'bar_output1':'sm_output'})

重映射字段將狀態(tài)的in/output_key映射到狀態(tài)機(jī)的userdata字段。所以當(dāng)你重新映射'x':'y'時:

  • X需要是狀態(tài)的input_key或output_key,并且

  • Y將自動成為狀態(tài)機(jī)的用戶數(shù)據(jù)的一部分。

注意:當(dāng)你的狀態(tài)中使用的用戶數(shù)據(jù)名與狀態(tài)機(jī)中使用的用戶數(shù)據(jù)名相同時,不需要重新映射。然而,重映射使得連接非常顯式,所以建議總是指定重映射,甚至像“重映射={'a':'a'}”這樣的東西。

3.2.1在狀態(tài)之間傳遞數(shù)據(jù)

我們可以使用重映射機(jī)制將數(shù)據(jù)從狀態(tài)FOO傳遞到狀態(tài)BAR。為了實(shí)現(xiàn)這一點(diǎn),我們在添加FOO時需要一個重映射,在添加BAR時需要一個重映射:

  • FOO:remapping={'foo_output':'sm_user_data'}

  • BAR:remapping={'bar_input':'sm_user_data'}

3.2.2 在狀態(tài)機(jī)和狀態(tài)之間傳遞數(shù)據(jù)

我們還可以使用重映射機(jī)制將數(shù)據(jù)從狀態(tài)BAR傳遞到包含BAR的狀態(tài)機(jī)。如果'sm_output'是狀態(tài)機(jī)的輸出鍵:

  • BAR:remapping={'bar_input':'sm_output'}

或者,相反,我們可以將數(shù)據(jù)從狀態(tài)機(jī)傳遞到狀態(tài)FOO。如果'sm_input'是狀態(tài)機(jī)的輸入鍵:

  • FOO:remapping={'foo_output':'sm_input'}


3.3舉例

#!/usr/bin/env python

import roslib; roslib.load_manifest('smach_tutorials')
import rospy
import smach
import smach_ros

# define state Foo
class Foo(smach.State):
    def __init__(self):
        smach.State.__init__(self, 
                             outcomes=['outcome1','outcome2'],
                             input_keys=['foo_counter_in'],
                             output_keys=['foo_counter_out'])

    def execute(self, userdata):
        rospy.loginfo('Executing state FOO')
        if userdata.foo_counter_in < 3:
            userdata.foo_counter_out = userdata.foo_counter_in + 1
            return 'outcome1'
        else:
            return 'outcome2'


# define state Bar
class Bar(smach.State):
    def __init__(self):
        smach.State.__init__(self, 
                             outcomes=['outcome1'],
                             input_keys=['bar_counter_in'])
        
    def execute(self, userdata):
        rospy.loginfo('Executing state BAR')
        rospy.loginfo('Counter = %f'%userdata.bar_counter_in)        
        return 'outcome1'
        

def main():
    rospy.init_node('smach_example_state_machine')

    # Create a SMACH state machine
    sm = smach.StateMachine(outcomes=['outcome4'])
    sm.userdata.sm_counter = 0

    # Open the container
    with sm:
        # Add states to the container
        smach.StateMachine.add('FOO', Foo(), 
                               transitions={'outcome1':'BAR', 
                                            'outcome2':'outcome4'},
                               remapping={'foo_counter_in':'sm_counter', 
                                          'foo_counter_out':'sm_counter'})
        smach.StateMachine.add('BAR', Bar(), 
                               transitions={'outcome1':'FOO'},
                               remapping={'bar_counter_in':'sm_counter'})

    # Execute SMACH plan
    outcome = sm.execute()

if __name__ == '__main__':
    main()

程序輸出結(jié)果:

[INFO] [1647916956.000118]: State machine starting in initial state 'FOO' with userdata: 
    ['sm_counter']
[INFO] [1647916956.000779]: Executing state FOO
[INFO] [1647916956.001367]: State machine transitioning 'FOO':'outcome1'-->'BAR'
[INFO] [1647916956.001626]: Executing state BAR
[INFO] [1647916956.001883]: Counter = 1.000000
[INFO] [1647916956.002094]: State machine transitioning 'BAR':'outcome1'-->'FOO'
[INFO] [1647916956.002342]: Executing state FOO
[INFO] [1647916956.002581]: State machine transitioning 'FOO':'outcome1'-->'BAR'
[INFO] [1647916956.002834]: Executing state BAR
[INFO] [1647916956.003061]: Counter = 2.000000
[INFO] [1647916956.003278]: State machine transitioning 'BAR':'outcome1'-->'FOO'
[INFO] [1647916956.003500]: Executing state FOO
[INFO] [1647916956.003763]: State machine transitioning 'FOO':'outcome1'-->'BAR'
[INFO] [1647916956.003992]: Executing state BAR
[INFO] [1647916956.004216]: Counter = 3.000000
[INFO] [1647916956.004484]: State machine transitioning 'BAR':'outcome1'-->'FOO'
[INFO] [1647916956.004727]: Executing state FOO
[INFO] [1647916956.004958]: State machine terminating 'FOO':'outcome2':'outcome4'

4.創(chuàng)建分級狀態(tài)機(jī)

4.1. 創(chuàng)建一些狀態(tài)

對于本例,我們創(chuàng)建了許多狀態(tài),每個狀態(tài)都指定了許多結(jié)果、輸入鍵和輸出鍵。

  # State Foo
  class Foo(smach.State):
     def __init__(self, outcomes=['outcome1', 'outcome2'])
     
     def execute(self, userdata):
        return 'outcome1'


  # State Bar
  class Bar(smach.State):
     def __init__(self, outcomes=['outcome1'])
     
     def execute(self, userdata):
        return 'outcome4'


  # State Bas
  class Bas(smach.State):
     def __init__(self, outcomes=['outcome3'])
     
     def execute(self, userdata):
        return 'outcome3'

4.2.創(chuàng)建分層狀態(tài)機(jī)

我們創(chuàng)建一個頂級狀態(tài)機(jī),并開始向它添加狀態(tài)。我們添加的一個狀態(tài)是另一個狀態(tài)機(jī):

   # Create the top level SMACH state machine
    sm_top = smach.StateMachine(outcomes=['outcome5'])

    # Open the container
    with sm_top:

        smach.StateMachine.add('BAS', Bas(),
                               transitions={'outcome3':'SUB'})

        # Create the sub SMACH state machine 
        sm_sub = smach.StateMachine(outcomes=['outcome4'])

        # Open the container 
        with sm_sub:

            # Add states to the container 
            smach.StateMachine.add('FOO', Foo(),
                                   transitions={'outcome1':'BAR', 
                                                'outcome2':'outcome4'})
            smach.StateMachine.add('BAR', Bar(),
                                   transitions={'outcome1':'FOO'})

        smach.StateMachine.add('SUB', sm_sub,
                               transitions={'outcome4':'outcome5'})

結(jié)果是這樣的。唯一需要注意的一點(diǎn)是,每個狀態(tài)機(jī)也是一個正常狀態(tài)。因此,可以將狀態(tài)機(jī)添加到另一個狀態(tài)機(jī),方法與將狀態(tài)添加到狀態(tài)機(jī)相同。因此,處理userdata與處理分層狀態(tài)機(jī)沒有什么不同:子狀態(tài)機(jī)指定輸入和輸出鍵,當(dāng)將子狀態(tài)機(jī)添加到頂級狀態(tài)機(jī)時,它們會被映射。


4.3.舉例

#!/usr/bin/env python

import roslib; roslib.load_manifest('smach_tutorials')
import rospy
import smach
import smach_ros

# define state Foo
class Foo(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1','outcome2'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state FOO')
        if self.counter < 3:
            self.counter += 1
            return 'outcome1'
        else:
            return 'outcome2'


# define state Bar
class Bar(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAR')
        return 'outcome1'
        


# define state Bas
class Bas(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome3'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAS')
        return 'outcome3'




def main():
    rospy.init_node('smach_example_state_machine')

    # Create the top level SMACH state machine
    sm_top = smach.StateMachine(outcomes=['outcome5'])
    
    # Open the container
    with sm_top:

        smach.StateMachine.add('BAS', Bas(),
                               transitions={'outcome3':'SUB'})

        # Create the sub SMACH state machine
        sm_sub = smach.StateMachine(outcomes=['outcome4'])

        # Open the container
        with sm_sub:

            # Add states to the container
            smach.StateMachine.add('FOO', Foo(), 
                                   transitions={'outcome1':'BAR', 
                                                'outcome2':'outcome4'})
            smach.StateMachine.add('BAR', Bar(), 
                                   transitions={'outcome1':'FOO'})

        smach.StateMachine.add('SUB', sm_sub,
                               transitions={'outcome4':'outcome5'})

    # Execute SMACH plan
    outcome = sm_top.execute()


if __name__ == '__main__':
    main()

程序輸出結(jié)果:

[INFO] [1647918547.471406]: State machine starting in initial state 'BAS' with userdata: 
    []
[INFO] [1647918547.471781]: Executing state BAS
[INFO] [1647918547.472021]: State machine transitioning 'BAS':'outcome3'-->'SUB'
[INFO] [1647918547.472292]: State machine starting in initial state 'FOO' with userdata: 
    []
[INFO] [1647918547.472532]: Executing state FOO
[INFO] [1647918547.472776]: State machine transitioning 'FOO':'outcome1'-->'BAR'
[INFO] [1647918547.473022]: Executing state BAR
[INFO] [1647918547.473253]: State machine transitioning 'BAR':'outcome1'-->'FOO'
[INFO] [1647918547.473495]: Executing state FOO
[INFO] [1647918547.473755]: State machine transitioning 'FOO':'outcome1'-->'BAR'
[INFO] [1647918547.473993]: Executing state BAR
[INFO] [1647918547.474448]: State machine transitioning 'BAR':'outcome1'-->'FOO'
[INFO] [1647918547.474715]: Executing state FOO
[INFO] [1647918547.474946]: State machine transitioning 'FOO':'outcome1'-->'BAR'
[INFO] [1647918547.475181]: Executing state BAR
[INFO] [1647918547.475405]: State machine transitioning 'BAR':'outcome1'-->'FOO'
[INFO] [1647918547.475659]: Executing state FOO
[INFO] [1647918547.475886]: State machine terminating 'FOO':'outcome2':'outcome4'
[INFO] [1647918547.476117]: State machine terminating 'SUB':'outcome4':'outcome5'

5.從狀態(tài)機(jī)(ROS)調(diào)用操作

from smach_ros import SimpleActionState

您可以簡單地從通用狀態(tài)調(diào)用任何操作,但是SMACH對調(diào)用操作有特定的支持,從而節(jié)省了大量代碼!SMACH提供了一個狀態(tài)類,它充當(dāng)actionlib操作的代理。狀態(tài)的實(shí)例化接受一個主題名稱、動作類型和一些用于生成目標(biāo)的策略。簡單動作狀態(tài)的可能結(jié)果是“成功”、“被搶占”和“中止”。

根據(jù)你如何實(shí)現(xiàn)你的目標(biāo),有簡單的和更復(fù)雜的方法來使用簡單的動作狀態(tài)。

5.1. 目標(biāo)消息

5.1.1 空目標(biāo)消息

這是一個簡單的例子,它將調(diào)用一個動作服務(wù)器,而不需要在目標(biāo)消息中填寫任何內(nèi)容。

sm = StateMachine(['succeeded','aborted','preempted'])
with sm:
    smach.StateMachine.add('TRIGGER_GRIPPER',
                           SimpleActionState('action_server_namespace',
                                             GripperAction),
                           transitions={'succeeded':'APPROACH_PLUG'})

5.1.2 固定目標(biāo)消息

一個稍微高級一點(diǎn)的用法是,讓你指定一個硬編碼的固定目標(biāo),它將被傳遞給動作服務(wù)器:

sm = StateMachine(['succeeded','aborted','preempted'])
with sm:
    gripper_goal = Pr2GripperCommandGoal()
    gripper_goal.command.position = 0.07
    gripper_goal.command.max_effort = 99999
    StateMachine.add('TRIGGER_GRIPPER',
                      SimpleActionState('action_server_namespace',
                                        GripperAction,
                                        goal=gripper_goal),
                      transitions={'succeeded':'APPROACH_PLUG'})

5.1.3 來自用戶的目標(biāo)

假設(shè)用戶數(shù)據(jù)中有許多字段已經(jīng)包含目標(biāo)消息所需的所有結(jié)構(gòu)。然后,您可以簡單地將userdata直接連接到目標(biāo)消息中的字段。因此,從上面的例子中我們知道,抓手動作的目標(biāo)中有兩個字段:max_effort和position。假設(shè)我們的userdata包含相應(yīng)的字段user_data_max和user_data_position。下面的代碼連接了相應(yīng)的字段。

sm = StateMachine(['succeeded','aborted','preempted'])
with sm:
    StateMachine.add('TRIGGER_GRIPPER',
                      SimpleActionState('action_server_namespace',
                                        GripperAction,
                                        goal_slots=['max_effort', 
                                                    'position']),
                      transitions={'succeeded':'APPROACH_PLUG'},
                      remapping={'max_effort':'user_data_max',
                                 'position':'user_data_position'})

同樣的工作對于'result_slots':操作的結(jié)果字段可以自動寫入您的用戶數(shù)據(jù)。還要注意的是,你在'goal_slots'和'result_slots'中指定的所有字段都會自動放到'input_keys'和'output_keys'中。

5.1.4 目標(biāo)回調(diào)

這是終極的力量版本:當(dāng)動作需要目標(biāo)時,你可以得到一個回調(diào),你也可以根據(jù)需要創(chuàng)建自己的目標(biāo)消息。

sm = StateMachine(['succeeded','aborted','preempted'])
with sm:
    def gripper_goal_cb(userdata, goal):
       gripper_goal = GripperGoal()
       gripper_goal.position.x = 2.0
       gripper_goal.max_effort = userdata.gripper_input
       return gripper_goal

    StateMachine.add('TRIGGER_GRIPPER',
                      SimpleActionState('action_server_namespace',
                                        GripperAction,
                                        goal_cb=gripper_goal_cb,
                                        input_keys=['gripper_input'])
                      transitions={'succeeded':'APPROACH_PLUG'},
                      remapping={'gripper_input':'userdata_input'})

在你的目標(biāo)回調(diào)中,你可以使用userdata,只要你在構(gòu)造函數(shù)中列出了input_keys?;卣{(diào)函數(shù)的一個參數(shù)是默認(rèn)目標(biāo)。如果你指定了'goal=…',該對象將被傳遞到回調(diào)函數(shù)中。

要了解更高級的回調(diào)用法,請參見cb_interface(來自ROS Jade的API)裝飾器。

5.2.結(jié)果消息

5.2.1 用戶結(jié)果消息

您可以將操作的結(jié)果直接寫入您的狀態(tài)的用戶數(shù)據(jù)。

sm = StateMachine(['succeeded','aborted','preempted'])
with sm:
    StateMachine.add('TRIGGER_GRIPPER',
                      SimpleActionState('action_server_namespace',
                                        GripperAction,
                                        result_slots=['max_effort', 
                                                      'position']),
                      transitions={'succeeded':'APPROACH_PLUG'},
                      remapping={'max_effort':'user_data_max',
                                 'position':'user_data_position'})

5.2.2 用戶結(jié)果回調(diào)

結(jié)果回調(diào)與目標(biāo)回調(diào)非常相似。它允許您從操作結(jié)果字段讀取任何數(shù)據(jù),甚至返回一個不同于默認(rèn)的'succeeded', 'preempted', 'aborted'的結(jié)果。

sm = StateMachine(['succeeded','aborted','preempted'])
with sm:
    def gripper_result_cb(userdata, status, result):
       if status == GoalStatus.SUCCEEDED:
          userdata.gripper_output = result.num_iterations
          return 'my_outcome'

    StateMachine.add('TRIGGER_GRIPPER',
                      SimpleActionState('action_server_namespace',
                                        GripperAction,
                                        result_cb=gripper_result_cb,
                                        output_keys=['gripper_output'])
                      transitions={'succeeded':'APPROACH_PLUG'},
                      remapping={'gripper_output':'userdata_output'})

在結(jié)果回調(diào)中,您將獲得操作的狀態(tài),它告訴您操作是成功、中止還是被搶占。此外,您還可以訪問用戶數(shù)據(jù)以及操作的結(jié)果。

你可以選擇從結(jié)果回調(diào)返回一個不同的結(jié)果。如果你不返回任何東西,狀態(tài)將返回相應(yīng)的動作結(jié)果。

6.查看狀態(tài)機(jī)(ROS)

這是你想調(diào)試一個正在運(yùn)行的SMACH狀態(tài)機(jī):


SMACH查看器以圖形形式顯示了您的(子)狀態(tài)機(jī)中的所有狀態(tài)、狀態(tài)之間可能的轉(zhuǎn)換、當(dāng)前活動狀態(tài)和userdata的當(dāng)前值。SMACH查看器甚至允許設(shè)置狀態(tài)機(jī)的初始狀態(tài)。本教程將介紹如何開始使用SMACH查看器。

6.1.創(chuàng)建內(nèi)省服務(wù)

SMACH容器可以提供一個調(diào)試接口(通過ROS),允許開發(fā)人員對狀態(tài)機(jī)進(jìn)行全面的內(nèi)省。SMACH查看器可以使用這個調(diào)試接口來可視化狀態(tài)機(jī)并與狀態(tài)機(jī)交互。要將這個調(diào)試接口添加到狀態(tài)機(jī)中,請在代碼中添加以下代碼行:

# First you create a state machine sm
# .....
# Creating of state machine sm finished

# Create and start the introspection server
sis = smach_ros.IntrospectionServer('server_name', sm, '/SM_ROOT')
sis.start()

# Execute the state machine
outcome = sm.execute()

# Wait for ctrl-c to stop the application
rospy.spin()
sis.stop()
  • server_name:為ROS自省主題創(chuàng)建名稱空間。您可以隨意命名這個名稱,只要這個名稱在您的系統(tǒng)中是唯一的。這個名稱沒有顯示在smach查看器中。

  • SM_ROOT:在smach查看器中,狀態(tài)機(jī)將顯示在這個名稱下。所以你可以選擇任何你喜歡的名字。如果你的子狀態(tài)機(jī)在不同的可執(zhí)行文件中,你可以通過一個聰明的方式選擇這個名稱來讓它們顯示為分層狀態(tài)機(jī):如果頂級狀態(tài)機(jī)被稱為“SM_TOP”,您可以調(diào)用子狀態(tài)機(jī)“SM_TOP/SM_SUB”,查看器將識別子狀態(tài)機(jī)作為頂級狀態(tài)機(jī)的一部分。

有關(guān)內(nèi)省服務(wù)器的更多詳細(xì)信息,請參閱API文檔。

smach查看器將自動遍歷sm的子容器(如果存在的話),并為每個子容器添加ros鉤子。因此,您只需要將一個內(nèi)省服務(wù)器連接到頂級狀態(tài)機(jī),而不是連接到子狀態(tài)機(jī)。一旦內(nèi)省服務(wù)器被實(shí)例化,它將發(fā)布一組帶有名稱的主題,這些名稱通過附加到在構(gòu)造時給它的服務(wù)器名稱來構(gòu)造。在這種情況下,廣告會宣傳三個主題:

  • / server_name / smach / container_structure

  • / server_name / smach / container_status

  • / server_name / smach / container_init

前兩個是關(guān)于“server_name”提供的SMACH容器的結(jié)構(gòu)和狀態(tài)的心跳和事件信息。第三個是關(guān)于在ROS上設(shè)置SMACH樹的配置的主題。

“SM_ROOT”參數(shù)只是用于可視化,并強(qiáng)制不同服務(wù)器嵌套。

6.2.運(yùn)行SMACH查看器

一旦你在ROS系統(tǒng)中運(yùn)行了一個或多個內(nèi)省服務(wù)器,你可以使用以下命令啟動smach查看器:

 rosrun smach_viewer smach_viewer.py

查看器將自動連接到所有正在運(yùn)行的內(nèi)省服務(wù)器。
運(yùn)行結(jié)果。


7.并發(fā)狀態(tài)機(jī)

以下所有示例無需修改即可運(yùn)行。可以在示例目錄中的smach_教程包中找到它們。每個文件開頭的注釋大致顯示了運(yùn)行腳本的輸出應(yīng)該是什么樣子。


#!/usr/bin/env python

import roslib; roslib.load_manifest('smach_tutorials')
import rospy
import smach
import smach_ros

# define state Foo
class Foo(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1','outcome2'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state FOO')
        if self.counter < 3:
            self.counter += 1
            return 'outcome1'
        else:
            return 'outcome2'


# define state Bar
class Bar(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAR')
        return 'outcome1'
        


# define state Bas
class Bas(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome3'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAS')
        return 'outcome3'




def main():
    rospy.init_node('smach_example_state_machine')

    # Create the top level SMACH state machine
    sm_top = smach.StateMachine(outcomes=['outcome6'])
    
    # Open the container
    with sm_top:

        smach.StateMachine.add('BAS', Bas(),
                               transitions={'outcome3':'CON'})

        # Create the sub SMACH state machine
        sm_con = smach.Concurrence(outcomes=['outcome4','outcome5'],
                                   default_outcome='outcome4',
                                   outcome_map={'outcome5':
                                       { 'FOO':'outcome2',
                                         'BAR':'outcome1'}})

        # Open the container
        with sm_con:
            # Add states to the container
            smach.Concurrence.add('FOO', Foo())
            smach.Concurrence.add('BAR', Bar())

        smach.StateMachine.add('CON', sm_con,
                               transitions={'outcome4':'CON',
                                            'outcome5':'outcome6'})

    # Execute SMACH plan
    outcome = sm_top.execute()


if __name__ == '__main__':
    main()

8.簡單動作狀態(tài)(ROS)

以下所有示例無需修改即可運(yùn)行。可以在示例目錄中的smach_教程包中找到它們。每個文件開頭的注釋大致顯示了運(yùn)行腳本的輸出應(yīng)該是什么樣子。


#!/usr/bin/env python

import roslib; roslib.load_manifest('smach_tutorials')
import rospy
import smach
import smach_ros

from smach_tutorials.msg import TestAction, TestGoal
from actionlib import *
from actionlib_msgs.msg import *


# Create a trivial action server
class TestServer:
    def __init__(self,name):
        self._sas = SimpleActionServer(name,
                TestAction,
                execute_cb=self.execute_cb)

    def execute_cb(self, msg):
        if msg.goal == 0:
            self._sas.set_succeeded()
        elif msg.goal == 1:
            self._sas.set_aborted()
        elif msg.goal == 2:
            self._sas.set_preempted()

def main():
    rospy.init_node('smach_example_actionlib')

    # Start an action server
    server = TestServer('test_action')

    # Create a SMACH state machine
    sm0 = smach.StateMachine(outcomes=['succeeded','aborted','preempted'])

    # Open the container
    with sm0:
        # Add states to the container

        # Add a simple action state. This will use an empty, default goal
        # As seen in TestServer above, an empty goal will always return with
        # GoalStatus.SUCCEEDED, causing this simple action state to return
        # the outcome 'succeeded'
        smach.StateMachine.add('GOAL_DEFAULT',
                               smach_ros.SimpleActionState('test_action', TestAction),
                               {'succeeded':'GOAL_STATIC'})

        # Add another simple action state. This will give a goal
        # that should abort the action state when it is received, so we
        # map 'aborted' for this state onto 'succeeded' for the state machine.
        smach.StateMachine.add('GOAL_STATIC',
                               smach_ros.SimpleActionState('test_action', TestAction,
                                                       goal = TestGoal(goal=1)),
                               {'aborted':'GOAL_CB'})

        
        # Add another simple action state. This will give a goal
        # that should abort the action state when it is received, so we
        # map 'aborted' for this state onto 'succeeded' for the state machine.
        def goal_callback(userdata, default_goal):
            goal = TestGoal()
            goal.goal = 2
            return goal

        smach.StateMachine.add('GOAL_CB',
                               smach_ros.SimpleActionState('test_action', TestAction,
                                                       goal_cb = goal_callback),
                               {'aborted':'succeeded'})

        # For more examples on how to set goals and process results, see 
        # executive_smach/smach_ros/tests/smach_actionlib.py

    # Execute SMACH plan
    outcome = sm0.execute()

    rospy.signal_shutdown('All done.')


if __name__ == '__main__':
    main()

參考文獻(xiàn):roswiki

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 簡介 laravel 使實(shí)施認(rèn)證的變得非常簡單,事實(shí)上,它提供了非常全面的配置項(xiàng)以適應(yīng)應(yīng)用的業(yè)務(wù)。認(rèn)證的配置文件存...
    Dearmadman閱讀 6,335評論 2 13
  • Linux基礎(chǔ)教程 一、常用命令使用 1.1 常用命令使用 1.1.1 登錄和退出Linux系統(tǒng) 1. 啟動和登陸...
    Garfield貓閱讀 873評論 0 3
  • Core ML 將機(jī)器學(xué)習(xí)模型集成到您的應(yīng)用程序中。 使用教程 使用Core ML將機(jī)器學(xué)習(xí)模型集成到您的應(yīng)用程序...
    iCloudEnd閱讀 554評論 0 1
  • uptime ? ? uptime命令功能比較簡單,主要功能如下所示: 查看服務(wù)器的開機(jī)時長 查看CPU負(fù)載 基本...
    Surpassme閱讀 486評論 0 1
  • 第 1 章為何選擇 Flink 許多情況下,人們希望用低延遲或者實(shí)時的流處理來獲得數(shù)據(jù)的高時效性,前提是流處理本身...
    yeedom閱讀 7,187評論 0 1

友情鏈接更多精彩內(nèi)容