py_trees源码分析(五)
@ 晚风 · Thursday, Apr 18, 2024 · 5 分钟阅读 · 更新于 4月 18, 2024

py_trees源码分析之黑板

版本信息

Python: 3.9

py_trees: 2.2.3

本文的代码与示例绝大部分来自github,只分析了部分重要的代码,包含基础行为模块、通用行为模块、通用模块、组合节点模块、装饰节点模块、黑板模块,且较简单的模块用表格汇总。

黑板(Blackboard)

黑板类

属性

  • storage:一个字典,用于存储键值对。
  • metadata:一个字典,用于存储键的元数据信息。
  • clients:一个字典,用于存储客户端的id-name对。
  • activity_stream:一个可选的ActivityStream对象,用于记录活动日志。
  • separator:一个字符串,用作命名空间分隔符。默认为/

方法

⚠️注意:下列的variable_name其实并非一定作为黑板的key,如果他是xx.xx.xx的形式,仅利用以.划分的第一个元素作为key,而剩下的字符串作为key_attributes。

方法 参数 作用 返回
keys 返回所有元数据metadata的key
get variable_name: 变量名。
可以是a.b 或者是a这种形式
按照.划分variable_name为两部分,第一部分为key, 第二部分为key_attributes。首先从storage中取出key对应的value,然后从value中获取key_attributes对应的值。 value
set variable_name: 变量名
value: 对应值
按照.划分variable_name为两部分,第一部分为key, 第二部分如果没有key_attributes则直接设置storage,如果有则为Blackboard.storage[key]设置key_attributes,最后在metadata中设置key
unset key:要删除的key 删除storage中对应的key bool
exists name:查询的key 调用get方法判断key是否存在 bool
keys_filtered_by_regex regex: 正则表达式 返回metadata中通过regex匹配到的所有key Set
keys_filtered_by_clients client_ids 查询metadata中与client_ids有交集的key Set
enable_activity_stream maximum_size 初始化activity_stream=ActivityStream(size)
disable_activity_stream 使activity_stream=None
clear 清空上述的类属性
absolute_name namespace
key
利用namespace把key打造成绝对路径
如果key以/开头直接返回,否则使用namespace+separator+key返回
str
relative_name namespace
key
利用namespace把key打造成相对路径
如果key不以/开头直接返回,如果key以namespace开头,直接去掉namespace+separator
str
key variable_name 获取变量的key,直接返回以.截取变量的第一个元素 str
key_with_attributes variable_name 获取变量的key和key_attributes tuple

客户端

键值存储的客户端,用于在行为之间共享数据,同时使用这些数据决定行为。他依赖于黑板类

属性

  • name: 客户端的标识符

  • namespace: 命名空间

  • read: 存储拥有读权限的客户端集合

  • write: 存储拥有写权限de客户端集合

  • exclusive: 存储对键有独占写入权的集合

  • remappings: 存储name的映射关系

重要方法解析

_setattr_

  1. 先利用namespace获得一个完整的name名称
  2. 如果此key没有写权限,则在Blackboard.activity_stream记录此事件,其中name为名称,ActivityType.ACCESS_DENIED意思是访问被拒绝,然后返回异常。
  3. 获取映射的name(如果在register_key注册时,并没有设置映射参数,则name和remapped_name其实是一样的)
  4. 仍然在Blackboard.activity_stream记录赋值事件!分别是写事件和初始化事件
  5. 直接存储于Blackboard.storage
name = Blackboard.absolute_name(super().__getattribute__("namespace"), name)
if (name not in super().__getattribute__("write")) and (
    name not in super().__getattribute__("exclusive")
):
    if Blackboard.activity_stream is not None:
        Blackboard.activity_stream.push(
            self._generate_activity_item(name, ActivityType.ACCESS_DENIED)
        )
    raise AttributeError(
        "client '{}' does not have write access to '{}'".format(self.name, name)
    )
remapped_name = super().__getattribute__("remappings")[name]
if Blackboard.activity_stream is not None:
    if remapped_name in Blackboard.storage.keys():
        Blackboard.activity_stream.push(
            self._generate_activity_item(
                key=remapped_name,
                activity_type=ActivityType.WRITE,
                previous_value=Blackboard.storage[remapped_name],
                current_value=value,
            )
        )
    else:
        Blackboard.activity_stream.push(
            self._generate_activity_item(
                key=remapped_name,
                activity_type=ActivityType.INITIALISED,
                current_value=value,
            )
        )
Blackboard.storage[remapped_name] = value

_getattr_

  1. 先利用namespace获得一个完整的name名称

  2. 获取权限,这里有一个注意的点,如果此name不属于任何一个权限,则进行进一步的判断,如果他在命名空间中,则返回IntermediateVariableFetcher对象,否则记录事件并抛出异常

  3. 获取映射的name

  4. 区分权限,如果name是读权限,则key原始类型->READ;复合类型->ACCESSED,请注意这里区分权限只是为了往activity_stream中推数据

  5. 返回Blackboard.storage[remapped_name]

name = Blackboard.absolute_name(super().__getattribute__("namespace"), name)
read_key = False
write_key = False
if name in super().__getattribute__("read"):
    read_key = True
elif name in super().__getattribute__("write"):
    write_key = True
elif name in super().__getattribute__("exclusive"):
    write_key = True
else:
    if name in super().__getattribute__("namespaces"):
        return IntermediateVariableFetcher(blackboard=self, namespace=name)
    if Blackboard.activity_stream is not None:
        Blackboard.activity_stream.push(
            self._generate_activity_item(name, ActivityType.ACCESS_DENIED)
        )
    raise AttributeError(
        "client '{}' does not have read/write access to '{}'".format(
            self.name, name
        )
    )
remapped_name = super().__getattribute__("remappings")[name]
try:
    if write_key:
        if Blackboard.activity_stream is not None:
            if utilities.is_primitive(Blackboard.storage[remapped_name]):
                activity_type = ActivityType.READ
            else:  # could be a nested class object being accessed to write an attribute
                activity_type = ActivityType.ACCESSED
            Blackboard.activity_stream.push(
                self._generate_activity_item(
                    key=remapped_name,
                    activity_type=activity_type,
                    current_value=Blackboard.storage[remapped_name],
                )
            )
        return Blackboard.storage[remapped_name]
    if read_key:
        if Blackboard.activity_stream is not None:
            Blackboard.activity_stream.push(
                self._generate_activity_item(
                    key=remapped_name,
                    activity_type=ActivityType.READ,
                    current_value=Blackboard.storage[remapped_name],
                )
            )
        return Blackboard.storage[remapped_name]
except KeyError as e:
    if Blackboard.activity_stream is not None:
        Blackboard.activity_stream.push(
            self._generate_activity_item(remapped_name, ActivityType.NO_KEY)
        )
    raise KeyError(
        f"client '{self.name}' tried to access '{remapped_name}' but it does not yet exist on the blackboard"
    ) from e

set

首先和__setattr__方法相比,set增加overwrite参数允许覆盖赋值,其次增加name解析为key和key_attr,然后再调用__setattr__方法。

  1. 解析name为key和key_attr
  2. 如果key不在write和exclusive中,记录事件并抛出异常
  3. 获取映射的name
  4. 如果不允许覆盖,则判断是否已经存在,如果存在则记录事件并返回False
  5. 如果key_attr为空,则直接赋值,否则递归赋值
name = Blackboard.absolute_name(super().__getattribute__("namespace"), name)
name_components = name.split(".")
key = name_components[0]
key_attributes = ".".join(name_components[1:])
if (key not in super().__getattribute__("write")) and (
    key not in super().__getattribute__("exclusive")
):
    if Blackboard.activity_stream is not None:
        Blackboard.activity_stream.push(
            self._generate_activity_item(key, ActivityType.ACCESS_DENIED)
        )
    raise AttributeError(
        "client '{}' does not have write access to '{}'".format(self.name, name)
    )
remapped_key = super().__getattribute__("remappings")[key]
if not overwrite:
    if remapped_key in Blackboard.storage:
        if Blackboard.activity_stream is not None:
            Blackboard.activity_stream.push(
                self._generate_activity_item(
                    key=remapped_key,
                    activity_type=ActivityType.NO_OVERWRITE,
                    current_value=Blackboard.storage[remapped_key],
                )
            )
        return False
if not key_attributes:
    setattr(self, key, value)
    return True
else:
    blackboard_object = getattr(self, key)
    try:
        setattr(blackboard_object, key_attributes, value)
        return True
    except AttributeError:  # when the object doesn't have the attributes
        return False

register_key

参数
  • key: 键
  • access: 权限
  • required: 是否为必要的key
  • remap_to: 映射到黑板的键
代码解读
# 1.解析并获取映射key
key = Blackboard.absolute_name(super().__getattribute__("namespace"), key)
super().__getattribute__("remappings")[key] = (
    key if remap_to is None else remap_to
)
remapped_key = super().__getattribute__("remappings")[key]
# 2.判断权限并把key加入权限集合
if access == common.Access.READ:
    super().__getattribute__("read").add(key)
    Blackboard.metadata.setdefault(remapped_key, KeyMetaData())
    Blackboard.metadata[remapped_key].read.add(
        super().__getattribute__("unique_identifier")
    )
# 如果是写权限,则判断key是否存在于`exclusive`即互斥写中,如果存在则抛异常
elif access == common.Access.WRITE:
    conflicts = set()
    try:
        for unique_identifier in Blackboard.metadata[remapped_key].exclusive:
            conflicts.add(Blackboard.clients[unique_identifier])
            if conflicts:
                raise AttributeError(
                    (
                        f"'{super().__getattribute__('name')}' requested write on key '{remapped_key}', "
                        f"but this key is already associated with an exclusive writer[{conflicts}]"
                    )
                )
    except KeyError:
        pass  # no readers or writers on the key yet
    # 更新写权限集合,更新metadata
    super().__getattribute__("write").add(key)
    Blackboard.metadata.setdefault(remapped_key, KeyMetaData())
    Blackboard.metadata[remapped_key].write.add(
        super().__getattribute__("unique_identifier")
    )
# 如果是互斥写权限,则判断key是否存在于写、互斥写中,如果存在抛异常
elif access == common.Access.EXCLUSIVE_WRITE:
    try:
        key_metadata = Blackboard.metadata[remapped_key]
        conflicts = set()
        for unique_identifier in key_metadata.write | key_metadata.exclusive:
            conflicts.add(Blackboard.clients[unique_identifier])
        if conflicts:
            raise AttributeError(
                "'{}' requested exclusive write on key '{}', but this key is already associated [{}]".format(
                    super().__getattribute__("name"), remapped_key, conflicts
                )
            )
    except KeyError:
        pass  # no readers or writers on the key yet
    # 更新互斥写权限集合,更新metadata
    super().__getattribute__("exclusive").add(key)
    Blackboard.metadata.setdefault(remapped_key, KeyMetaData())
    Blackboard.metadata[remapped_key].exclusive.add(
        super().__getattribute__("unique_identifier")
    )
else:
    raise TypeError(
        "access argument is of incorrect type [{}]".format(type(access))
    )
# 如果required为True,则为必要字段,存于required变量内
if required:
    super().__getattribute__("required").add(key)
# 更新命名空间
self._update_namespaces(added_key=key)

unregister_key

参数
  • key: 键
  • clear: 是否清除
  • update_namespace_cache: 是否更新命名空间
代码解读

这里我们结合register_key来看

  1. 先获取完整的key
  2. 删除客户端存储的read、write、exclusive权限集合中的key
  3. 删除黑板中存储的拥有read、write、exclusive权限集合中当前客户端的id(第二步删除的是客户端的key,而第三步删除的是拥有此key客户端)
  4. 如果已无任何客户端拥有此key的权限,则Blackboard.metadata直接删除key
  5. 如果clear为True,则删除Blackboard.storage中此key拥有的任何数据
  6. 更新命名空间
key = Blackboard.absolute_name(super().__getattribute__("namespace"), key)
remapped_key = super().__getattribute__("remappings")[key]
super().__getattribute__("read").discard(
    key
)  # doesn't throw exceptions if it not present
super().__getattribute__("write").discard(key)
super().__getattribute__("exclusive").discard(key)
Blackboard.metadata[remapped_key].read.discard(
    super().__getattribute__("unique_identifier")
)
Blackboard.metadata[remapped_key].write.discard(
    super().__getattribute__("unique_identifier")
)
Blackboard.metadata[remapped_key].exclusive.discard(
    super().__getattribute__("unique_identifier")
)
if (
    (not Blackboard.metadata[remapped_key].read)
    and (not Blackboard.metadata[remapped_key].write)
    and (not Blackboard.metadata[remapped_key].exclusive)
):
    del Blackboard.metadata[remapped_key]
    if clear:
        try:
            del Blackboard.storage[remapped_key]
        except KeyError:
            pass  # perfectly legitimate for a registered key to not exist on the blackboard
del super().__getattribute__("remappings")[key]
if update_namespace_cache:
    self._update_namespaces()

示例

import py_trees

a = py_trees.blackboard.Client(name="a", namespace="na")
b = py_trees.blackboard.Client(name="b", namespace="na")

a.register_key("x", access=py_trees.common.Access.WRITE)
b.register_key("x", access=py_trees.common.Access.WRITE)

a.x = 20

print(a.x)  # >> 20
print(b.x)  # >> 20 可以看出针对同样的命名空间和key,则设置的值是一样的

a.unregister_key("x", False) 
print(a.x) # >> KeyError 如果注销key,首先a客户端已无此属性
print(b.x) # >> 20 而b客户端仍有此属性 这说明了Blackboard.metadata[remapped_key].xx为集合的重要性

# 接下来我们看clear参数
a.unregister_key("x", True)  # 如果clear=True,注意此时客户端a和b都拥有x的读权限,那么这里即使设置clear为True,则仍未清理x因为Blackboard.metadata[remapped_key].write仍有b客户端id
# 如果此时重新注册,不用赋值,a.x仍然存在
a.register_key("x", access=py_trees.common.Access.WRITE)
print(a.x)  # >> 20

# 如果
a.register_key("y", access=py_trees.common.Access.WRITE)
a.y = 10
a.unregister_key("y", True)
a.register_key("y", access=py_trees.common.Access.WRITE)
print(a.y) # >> KeyError 已经清理y,没有重新赋值,则报错

方法汇总

方法 参数 作用 返回
exists name 判断某个key是否存在,调用get方法 bool
absolute_name key 先调用is_registered判断key是否注册,然后调用Blackboard.absolute_name获取key的绝对命名,即命名空间+key str
get name 如果没有key_attr,调用__getattr__方法获取value,否则嵌套获取value Any
unset key 删除Blackboard.storge的key(映射后即remapped_key) bool
_generate_activity_item key
activety_type
previous_value
current_value
记录事件,在多个方法中可见到 ActivityItem
_update_namespaces added_key 如果added_key不为空,则命名空间新增;否则重新构建命名空间。这里细节不再详细叙述。
unregister clear 调用unregister_all_keys,然后删除Blackboard.clientsid
unregister_all_keys clear 获取所有权限类型包含的值,然后调用unregister_key并更新命名空间
verify_required_keys_exist 检查client.required的值是否存在,这里的required是在register_key时的变量required控制,是否是必要的key
is_registered key
access
如果设置权限值,则在对应权限值的集合中寻找key,如果没有,则在所有权限集合中寻找key bool
unregister_key key
clear
update_namespace_cache
注销一个key,

依赖类汇总

参数 作用 返回
KeyMetaData 存储3个元数据<read、write、exclusive>
ActivityType(枚举)
ActivityItem key
client_name
client_id
activity_type
previous_value
current_value
存储黑板的活动数据
ActivityStream maximum_size(尺寸)
push(func):activity_item
存储记录黑板活动的事件流(列表模拟流)
IntermediateVariableFetcher blackboard
namespace
返回namespace对应的key操作实例,在__getattr__方法中可以看到,当预查的key是一个命名空间时,则返回嵌套可操作对象。

关于我

❤️

姓名: lwz


性别: 男


年龄: 29


星座: 摩羯座


职业: python工程师


爱好: 秋、ps5、运动


主要的技术栈是:

  • python
  • 自动驾驶仿真验证

学习网站: leetcode


公司: 国科础石


– 2025 年 2 月 25 日更新

我的一些开源项目

等等?项目呢?不会没有吧??

其他

如果你喜欢我的开源项目或者它们可以给你带来帮助,可以赏一杯咖啡 ☕ 给我。~

It is better to attach some information or leave a message so that I can record the donation 📝, thank you very much 🙏.

社交链接