Source code for armor_api.armor_utils_client

"""
System utility commands for Armor Python API --ArmorPy.
"""

import rospy
# from armor_api.exceptions import ArmorServiceCallError, ArmorServiceInternalError

__author__ = "Alessio Capitanelli"
__copyright__ = "Copyright 2016, ArmorPy"
__license__ = "GNU"
__version__ = "1.0.0"
__maintainer__ = "Alessio Capitanelli"
__email__ = "alessio.capitanelli@dibris.unige.it"
__status__ = "Development"

# =========================================     UTILITY     ========================================= #


[docs]class ArmorUtilsClient(object): _client = None class __TermColors: HEADER = '\033[95m' OKBLUE = '\033[94m' OKGREEN = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m' def __init__(self, client): self._client = client
[docs] def apply_buffered_changes(self): """ Apply buffered manipulations to the ontology when its reference has been initialized in buffered manipulation mode Returns: bool: True if ontology is consistent, False otherwise Raises: armor_api.exceptions.ArmorServiceCallError: if call to ARMOR fails armor_api.exceptions.ArmorServiceInternalError: if ARMOR reports an internal error Note: It returns the boolean consistency state of the ontology. This value is not updated to the last operation if you are working in buffered reasoner or manipulation mode! """ try: res = self._client.call('APPLY', '', '', []) rospy.loginfo( "{0}Buffered manipulations to {1} applied.{2}" .format(self.__TermColors.OKGREEN, self._client.reference_name, self.__TermColors.ENDC)) except rospy.ServiceException, e: raise ArmorServiceCallError( "Armor service internal error. Buffered changes have not been applied. Exception: {0}".format(e)) except rospy.ROSException: raise ArmorServiceCallError("Cannot reach ARMOR client: Timeout Expired. Check if ARMOR is running.") if res.success: return res.is_consistent else: raise ArmorServiceInternalError(res.error_description, res.exit_code)
[docs] def sync_buffered_reasoner(self): """ Sync ontology reference reasoner when operating in buffered reasoner mode Returns: bool: True if ontology is consistent, False otherwise Raises: armor_api.exceptions.ArmorServiceCallError: if call to ARMOR fails armor_api.exceptions.ArmorServiceInternalError: if ARMOR reports an internal error """ try: res = self._client.call('REASON', '', '', []) rospy.loginfo( "{0}Reasoner synced on {1} applied.{2}".format( self.__TermColors.OKGREEN, self._client.reference_name, self.__TermColors.ENDC)) except rospy.ServiceException: raise ArmorServiceCallError("Armor service internal error. Buffered reasoner could not be synced.") except rospy.ROSException: raise ArmorServiceCallError("Cannot reach ARMOR client: Timeout Expired. Check if ARMOR is running.") if res.success: return res.is_consistent else: raise ArmorServiceInternalError(res.error_description, res.exit_code)
[docs] def save_ref_with_inferences(self, filepath): """ Save ontology with latest inferred axioms to specified filepath Args: filepath (str): Returns: None: Raises: armor_api.exceptions.ArmorServiceCallError: if call to ARMOR fails armor_api.exceptions.ArmorServiceInternalError: if ARMOR reports an internal error """ try: res = self._client.call('SAVE', 'INFERENCE', '', [filepath]) rospy.loginfo( "{0}Reference {1} saved to {2}.{3}" .format(self.__TermColors.OKGREEN, self._client.reference_name, filepath, self.__TermColors.ENDC)) except rospy.ServiceException: raise ArmorServiceCallError( "Armor service internal error. Cannot save reference {0} to {1}.".format( self._client.reference_name, filepath)) except rospy.ROSException: raise ArmorServiceCallError("Cannot reach ARMOR client: Timeout Expired. Check if ARMOR is running.") if not res.success: raise ArmorServiceInternalError(res.error_description, res.exit_code)
[docs] def load_ref_from_file(self, owl_file_path, iri, buffered_manipulation=False, reasoner="PELLET", buffered_reasoner=False, mounted=False): """ Loads an ontology into armor from an .owl file. Args: owl_file_path (str): iri (str): buffered_manipulation (bool): set buffered manipulations, default False reasoner (str): set which reasoner to use (PELLET, HERMIT, SNOROCKET, FACT), default PELLET buffered_reasoner (bool): set if reasoner should be buffered, default False mounted (bool): set if the client should be mounted on the reference, default False Raises: armor_api.exceptions.ArmorServiceCallError: if call to ARMOR fails armor_api.exceptions.ArmorServiceInternalError: if ARMOR reports an internal error """ try: if mounted: res = self._client.call('LOAD', 'FILE', 'MOUNTED', [owl_file_path, iri, str(buffered_manipulation), reasoner, str(buffered_reasoner)]) else: res = self._client.call('LOAD', 'FILE', '', [owl_file_path, iri, str(buffered_manipulation), reasoner, str(buffered_reasoner)]) except rospy.ServiceException: raise ArmorServiceCallError("Service call failed upon reference {0} from {1}" .format(self._client.reference_name, self._client.client_id)) except rospy.ROSException: raise ArmorServiceCallError("Cannot reach ARMOR client: Timeout Expired. Check if ARMOR is running.") if not res.success: raise ArmorServiceInternalError(res.error_description, res.exit_code)
[docs] def mount_on_ref(self): """ Mounts a client on an ontology reference. Raises: armor_api.exceptions.ArmorServiceCallError: if call to ARMOR fails armor_api.exceptions.ArmorServiceInternalError: if ARMOR reports an internal error """ try: res = self._client.call('MOUNT', '', '', []) rospy.loginfo( "{0}{1} mounted on {2}.{3}".format(self.__TermColors.WARNING, self._client.client_id, self._client.reference_name, self.__TermColors.ENDC)) except rospy.ServiceException: raise ArmorServiceCallError( "Service call failed upon mounting {0} on reference {1}.".format( self._client.client_id, self._client.reference_name)) except rospy.ROSException: raise ArmorServiceCallError("Cannot reach ARMOR client: Timeout Expired. Check if ARMOR is running.") if not res.success: raise ArmorServiceInternalError(res.error_description, res.exit_code)
[docs] def unmount_from_ref(self): """ Unmount a client from an ontology. Raises: armor_api.exceptions.ArmorServiceCallError: if call to ARMOR fails armor_api.exceptions.ArmorServiceInternalError: if ARMOR reports an internal error """ try: res = self._client.call('UNMOUNT', '', '', []) rospy.loginfo( "{0}{1} unmounted from {2}.{3}".format(self.__TermColors.WARNING, self._client.client_id, self._client.reference_name, self.__TermColors.ENDC)) except rospy.ServiceException: raise ArmorServiceCallError( "Service call failed while unmounting {0} from reference {1}.".format(self._client.client_id, self._client.reference_name)) except rospy.ROSException: raise ArmorServiceCallError("Cannot reach ARMOR client: Timeout Expired. Check if ARMOR is running.") if not res.success: raise ArmorServiceInternalError(res.error_description, res.exit_code)
[docs] def set_log_to_terminal(self, switch): """ Set ARMOR to log to terminal. Args: switch (bool): Raises: armor_api.exceptions.ArmorServiceCallError: if call to ARMOR fails armor_api.exceptions.ArmorServiceInternalError: if ARMOR reports an internal error """ try: if switch: res = self._client.call('LOG', 'SCREEN', 'ON', []) else: res = self._client.call('LOG', 'SCREEN', 'OFF', []) rospy.loginfo("{0}Toggled log to terminal.{1}".format(self.__TermColors.WARNING, self.__TermColors.ENDC)) except rospy.ServiceException: raise ArmorServiceCallError("Service call failed while toggling logger to terminal") except rospy.ROSException: raise ArmorServiceCallError("Cannot reach ARMOR client: Timeout Expired. Check if ARMOR is running.") if not res.success: raise ArmorServiceInternalError(res.error_description, res.exit_code)
[docs] def set_log_to_file(self, switch, filepath=''): """ Set ARMOR to log to file. Args: switch (bool): filepath (str): Raises: armor_api.exceptions.ArmorServiceCallError: if call to ARMOR fails armor_api.exceptions.ArmorServiceInternalError: if ARMOR reports an internal error """ try: if switch: if filepath != '': res = self._client.call('LOG', 'FILE', 'ON', [filepath]) else: raise ArmorServiceCallError( "ARMOR_PY ERROR: please select a filepath to log to when you toggle on ARMOR logging to file.") else: res = self._client.call('', '', 'LOG', 'FILE', 'OFF', []) rospy.loginfo("{0}Toggled log to file: {1}.{2}".format(self.__TermColors.WARNING, filepath, self.__TermColors.ENDC)) except rospy.ServiceException: raise ArmorServiceCallError("Service call failed while toggling log to file.") except rospy.ROSException: raise ArmorServiceCallError("Cannot reach ARMOR client: Timeout Expired. Check if ARMOR is running.") if not res.success: raise ArmorServiceInternalError(res.error_description, res.exit_code)