# {py:mod}`ros_sugar.robot.bus`

```{py:module} ros_sugar.robot.bus
```

```{autodoc2-docstring} ros_sugar.robot.bus
:allowtitles:
```

## Module Contents

### Classes

````{list-table}
:class: autosummary longtable
:align: left

* - {py:obj}`BusHandle <ros_sugar.robot.bus.BusHandle>`
  - ```{autodoc2-docstring} ros_sugar.robot.bus.BusHandle
    :summary:
    ```
* - {py:obj}`FeedbackBus <ros_sugar.robot.bus.FeedbackBus>`
  - ```{autodoc2-docstring} ros_sugar.robot.bus.FeedbackBus
    :summary:
    ```
* - {py:obj}`InProcessFeedbackBus <ros_sugar.robot.bus.InProcessFeedbackBus>`
  - ```{autodoc2-docstring} ros_sugar.robot.bus.InProcessFeedbackBus
    :summary:
    ```
* - {py:obj}`SocketFeedbackBus <ros_sugar.robot.bus.SocketFeedbackBus>`
  - ```{autodoc2-docstring} ros_sugar.robot.bus.SocketFeedbackBus
    :summary:
    ```
````

### API

````{py:class} BusHandle(unsubscribe: typing.Callable[[], None])
:canonical: ros_sugar.robot.bus.BusHandle

```{autodoc2-docstring} ros_sugar.robot.bus.BusHandle
```

````

`````{py:class} FeedbackBus
:canonical: ros_sugar.robot.bus.FeedbackBus

Bases: {py:obj}`abc.ABC`

```{autodoc2-docstring} ros_sugar.robot.bus.FeedbackBus
```

````{py:method} start() -> None
:canonical: ros_sugar.robot.bus.FeedbackBus.start
:abstractmethod:

```{autodoc2-docstring} ros_sugar.robot.bus.FeedbackBus.start
```

````

````{py:method} connect() -> None
:canonical: ros_sugar.robot.bus.FeedbackBus.connect
:abstractmethod:

```{autodoc2-docstring} ros_sugar.robot.bus.FeedbackBus.connect
```

````

````{py:method} publish(channel: str, data: bytes) -> None
:canonical: ros_sugar.robot.bus.FeedbackBus.publish
:abstractmethod:

```{autodoc2-docstring} ros_sugar.robot.bus.FeedbackBus.publish
```

````

````{py:method} subscribe(channel: str, on_data: typing.Callable[[bytes], None]) -> ros_sugar.robot.bus.BusHandle
:canonical: ros_sugar.robot.bus.FeedbackBus.subscribe
:abstractmethod:

```{autodoc2-docstring} ros_sugar.robot.bus.FeedbackBus.subscribe
```

````

````{py:method} close() -> None
:canonical: ros_sugar.robot.bus.FeedbackBus.close
:abstractmethod:

```{autodoc2-docstring} ros_sugar.robot.bus.FeedbackBus.close
```

````

````{py:property} endpoint
:canonical: ros_sugar.robot.bus.FeedbackBus.endpoint
:type: typing.Optional[str]

```{autodoc2-docstring} ros_sugar.robot.bus.FeedbackBus.endpoint
```

````

`````

`````{py:class} InProcessFeedbackBus()
:canonical: ros_sugar.robot.bus.InProcessFeedbackBus

Bases: {py:obj}`ros_sugar.robot.bus.FeedbackBus`

```{autodoc2-docstring} ros_sugar.robot.bus.InProcessFeedbackBus
```

````{py:method} start() -> None
:canonical: ros_sugar.robot.bus.InProcessFeedbackBus.start

```{autodoc2-docstring} ros_sugar.robot.bus.InProcessFeedbackBus.start
```

````

````{py:method} connect() -> None
:canonical: ros_sugar.robot.bus.InProcessFeedbackBus.connect

```{autodoc2-docstring} ros_sugar.robot.bus.InProcessFeedbackBus.connect
```

````

````{py:method} publish(channel: str, data: bytes) -> None
:canonical: ros_sugar.robot.bus.InProcessFeedbackBus.publish

```{autodoc2-docstring} ros_sugar.robot.bus.InProcessFeedbackBus.publish
```

````

````{py:method} subscribe(channel: str, on_data: typing.Callable[[bytes], None]) -> ros_sugar.robot.bus.BusHandle
:canonical: ros_sugar.robot.bus.InProcessFeedbackBus.subscribe

```{autodoc2-docstring} ros_sugar.robot.bus.InProcessFeedbackBus.subscribe
```

````

````{py:method} close() -> None
:canonical: ros_sugar.robot.bus.InProcessFeedbackBus.close

```{autodoc2-docstring} ros_sugar.robot.bus.InProcessFeedbackBus.close
```

````

````{py:property} endpoint
:canonical: ros_sugar.robot.bus.InProcessFeedbackBus.endpoint
:type: typing.Optional[str]

```{autodoc2-docstring} ros_sugar.robot.bus.InProcessFeedbackBus.endpoint
```

````

`````

`````{py:class} SocketFeedbackBus(endpoint: typing.Optional[str] = None)
:canonical: ros_sugar.robot.bus.SocketFeedbackBus

Bases: {py:obj}`ros_sugar.robot.bus.FeedbackBus`

```{autodoc2-docstring} ros_sugar.robot.bus.SocketFeedbackBus
```

````{py:property} endpoint
:canonical: ros_sugar.robot.bus.SocketFeedbackBus.endpoint
:type: typing.Optional[str]

```{autodoc2-docstring} ros_sugar.robot.bus.SocketFeedbackBus.endpoint
```

````

````{py:method} start() -> None
:canonical: ros_sugar.robot.bus.SocketFeedbackBus.start

```{autodoc2-docstring} ros_sugar.robot.bus.SocketFeedbackBus.start
```

````

````{py:method} connect() -> None
:canonical: ros_sugar.robot.bus.SocketFeedbackBus.connect

```{autodoc2-docstring} ros_sugar.robot.bus.SocketFeedbackBus.connect
```

````

````{py:method} publish(channel: str, data: bytes) -> None
:canonical: ros_sugar.robot.bus.SocketFeedbackBus.publish

```{autodoc2-docstring} ros_sugar.robot.bus.SocketFeedbackBus.publish
```

````

````{py:method} subscribe(channel: str, on_data: typing.Callable[[bytes], None]) -> ros_sugar.robot.bus.BusHandle
:canonical: ros_sugar.robot.bus.SocketFeedbackBus.subscribe

```{autodoc2-docstring} ros_sugar.robot.bus.SocketFeedbackBus.subscribe
```

````

````{py:method} close() -> None
:canonical: ros_sugar.robot.bus.SocketFeedbackBus.close

```{autodoc2-docstring} ros_sugar.robot.bus.SocketFeedbackBus.close
```

````

`````
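
The signatures above (`publish(channel, data)`, `subscribe(channel, on_data)` returning a handle built from an `unsubscribe` callable) suggest a channel-based publish/subscribe pattern. The following is a minimal, self-contained sketch of that pattern only. It does not import `ros_sugar`; `ToyBus`, `ToyHandle`, and the `ToyHandle.unsubscribe()` method name are hypothetical stand-ins, and the real `InProcessFeedbackBus` and `BusHandle` may behave differently (for example around threading, `start()`/`connect()`, or delivery order).

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class ToyHandle:
    """Hypothetical stand-in for BusHandle: wraps an unsubscribe callable."""

    def __init__(self, unsubscribe: Callable[[], None]) -> None:
        self._unsubscribe = unsubscribe

    def unsubscribe(self) -> None:
        # Method name is an assumption made for this sketch.
        self._unsubscribe()


class ToyBus:
    """Hypothetical in-process bus mirroring the publish/subscribe signatures."""

    def __init__(self) -> None:
        self._subs: DefaultDict[str, List[Callable[[bytes], None]]] = defaultdict(list)

    def publish(self, channel: str, data: bytes) -> None:
        # Iterate over a copy so callbacks may unsubscribe while being called.
        for on_data in list(self._subs.get(channel, [])):
            on_data(data)

    def subscribe(self, channel: str, on_data: Callable[[bytes], None]) -> ToyHandle:
        self._subs[channel].append(on_data)

        def _unsubscribe() -> None:
            # Removing twice is harmless: the second call finds nothing to remove.
            if on_data in self._subs[channel]:
                self._subs[channel].remove(on_data)

        return ToyHandle(_unsubscribe)


received: List[bytes] = []
bus = ToyBus()
handle = bus.subscribe("status", received.append)
bus.publish("status", b"ok")       # delivered to the subscriber
handle.unsubscribe()
bus.publish("status", b"ignored")  # no subscriber left on this channel
print(received)  # prints [b'ok']
```

Returning a handle that closes over the callback keeps unsubscription local to the subscriber, so callers do not need to hold a reference to the bus or remember the channel name.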