"""
Querying commands for Armor Python API --ArmorPy.
"""
import rospy
# from armor_api.exceptions import ArmorServiceInternalError, ArmorServiceCallError
__author__ = "Alessio Capitanelli"
__copyright__ = "Copyright 2016, ArmorPy"
__license__ = "GNU"
__version__ = "1.0.0"
__maintainer__ = "Alessio Capitanelli"
__email__ = "alessio.capitanelli@dibris.unige.it"
__status__ = "Development"
# ========================================= QUERY ========================================= #
[docs]class ArmorQueryClient(object):
_client = None
def __init__(self, client):
self._client = client
[docs] def ind_b2_class(self, class_name):
"""
Query the list of all individuals belonging to a class.
Args:
class_name (str): a class in the ontology
Returns:
list(str): the list of individual belonging to the class
Raises:
armor_api.exceptions.ArmorServiceCallError: if call to ARMOR fails.
armor_api.exceptions.ArmorServiceInternalError: if ARMOR reports an internal error.
"""
try:
res = self._client.call('QUERY', 'IND', 'CLASS', [class_name])
except rospy.ServiceException:
raise ArmorServiceCallError(
"Service call failed upon querying individuals belonging to class {0}".format(class_name))
except rospy.ROSException:
raise ArmorServiceCallError("Cannot reach ARMOR client: Timeout Expired. Check if ARMOR is running.")
if res.success:
return res.queried_objects
else:
raise ArmorServiceInternalError(res.error_description, res.exit_code)
[docs] def dataprop_b2_ind(self, dataprop_name, ind_name):
"""
Query all values of a data property associated with an individual.
Args:
dataprop_name (str): data property whose value you want to query.
ind_name (str): individual whose value you want to query.
Returns:
list(str): list of queried values as strings.
"""
try:
res = self._client.call('QUERY', 'DATAPROP', 'IND', [dataprop_name, ind_name])
except rospy.ServiceException:
raise ArmorServiceCallError(
"Service call failed upon querying property {0} to individual {1}.".format(dataprop_name, ind_name))
except rospy.ROSException:
raise ArmorServiceCallError("Cannot reach ARMOR client: Timeout Expired. Check if ARMOR is running.")
if res.success:
return res.queried_objects
else:
raise ArmorServiceInternalError(res.error_description, res.exit_code)
[docs] def check_ind_exists(self, ind_name):
"""
Utility function to check if an arbitrary named individual exists.
Args:
ind_name (str): the individual to be checked
Returns:
bool: True if individual exists, else False
Raises:
armor_api.exceptions.ArmorServiceCallError: if call to ARMOR fails.
armor_api.exceptions.ArmorServiceInternalError: if ARMOR reports an internal error.
"""
try:
query = self.ind_b2_class('Thing')
except rospy.ServiceException:
raise ArmorServiceCallError("Service call failed upon querying {0} from {1}".format(
self._client.reference_name, self._client.client_id))
except rospy.ROSException:
raise ArmorServiceCallError("Cannot reach ARMOR client: Timeout Expired. Check if ARMOR is running.")
if ind_name in query:
return True
else:
return False