Skip to content

Commit

Permalink
Merge pull request #8 from adambinch/sentor_devel
Browse files Browse the repository at this point in the history
New top-level arg `lambdas_when_published` that ensures that lambda e…
  • Loading branch information
marc-hanheide authored Feb 22, 2020
2 parents 0cc0d93 + c80edcb commit 1fea4f5
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 56 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The config file contains the list of topics to be monitored and the definition,
- "cowsay"
- "moo"
timeout: 0.0
lambdas_when_published: False
lock_exec: False
repeat_exec: False
default_notifications: True
Expand Down Expand Up @@ -82,6 +83,7 @@ The config file contains the list of topics to be monitored and the definition,
- "msg.header.frame_id = '/map'"
- "msg.pose.pose.orientation.w = 1.0"
timeout: 0.0
lambdas_when_published: False
lock_exec: False
repeat_exec: False
default_notifications: True
Expand All @@ -100,6 +102,7 @@ Top-level arguments:
- `signal_lambdas`: optional, it's a list of (pythonic) lambda expressions such that when they are satisfied a warning is sent. You can use the python package `math` in your lambda expressions. See 'Child arguments of `signal_lambdas`' below.
- `execute`: optional, a list of processes to execute if `signal_when` is satisfied, or if all lambda expressions are satisfied. They will be executed in sequence. See 'Child arguments of `execute`' below.
- `timeout`: optional (default=0.1), amount of time (in seconds) for which the signal has to be satisfied before sending the warning/executing processes.
- `lambdas_when_published`: optional (default=False), setting this to 'True' will ensure that lambda expressions can be satisfied (for `timeout` seconds) only if the topic is currently being published.
- `lock_exec`: optional (default=False), lock out other threads while this one is executing its sequence of processes.
- `repeat_exec`: optional (default=False), default behaviour is to execute processes once after conditions (`signal_when` or lambdas) have been satisfied for `timeout` seconds. They will not execute again until a change occurs (i.e. conditions become unsatisfied, then satisfied again). If `repeat_exec` is set to 'True' then processes will be executed every `timeout` seconds whilst the conditions are satisfied.
- `default_notifications`: optional (default=True), setting this to 'False' will turn off the default warnings given when conditions (`signal_when` or lambdas) are satisfied.
Expand Down Expand Up @@ -149,9 +152,9 @@ Child arguments of `log`:
- `msg_args`: optional, you can supply message data from the topic you are monitoring in your logs. See the first monitor in the example config.

## Safety critical conditions
A topic monitor's signal when condition, and each of its lambda expressions, can be tagged as *safety critical*. If any safety critical condition in any topic monitor is satisfied then the boolean message from the topic `safe operation` will be set to False.
A topic monitor's `signal_when` condition, and each of its lambda expressions, can be tagged as *safety critical*. If any safety critical condition in any topic monitor is satisfied then the boolean message from the topic `safe operation` will be set to 'False'.

By setting the arg `auto_safety_tagging` (see `sentor.launch`) to True sentor will automatically set `safe operation` to True when all safety critical condition across all monitors are unsatisfied. If `auto_safety_tagging` is set to `False` then the service `/sentor/set_safety_tag` must be called.
By setting the arg `auto_safety_tagging` (see `sentor.launch`) to 'True' sentor will automatically set `safe operation` to True when all safety critical condition across all monitors are unsatisfied. If `auto_safety_tagging` is set to 'False' then the service `/sentor/set_safety_tag` must be called.

## Using sentor with this example config
You will need the RASberry repo (<a href="https://github.com/LCAS/RASberry">get it here</a>) and all its dependencies. Also install `cowsay`. Create a file `.rasberryrc` in your home directory and put the following inside it:
Expand Down
2 changes: 2 additions & 0 deletions config/execute.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
- "cowsay"
- "moo"
timeout: 0.0
lambdas_when_published: False
lock_exec: False
repeat_exec: False
default_notifications: True
Expand Down Expand Up @@ -67,6 +68,7 @@
- "msg.header.frame_id = '/map'"
- "msg.pose.pose.orientation.w = 1.0"
timeout: 0.0
lambdas_when_published: False
lock_exec: False
repeat_exec: False
default_notifications: True
Expand Down
5 changes: 4 additions & 1 deletion scripts/sentor_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def event_callback(string, type, msg=""):
lock_exec = False
repeat_exec = False
timeout = 0
lambdas_when_published = False
default_notifications = True
include = True
if 'signal_when' in topic.keys():
Expand All @@ -114,6 +115,8 @@ def event_callback(string, type, msg=""):
repeat_exec = topic['repeat_exec']
if 'timeout' in topic.keys():
timeout = topic['timeout']
if 'lambdas_when_published' in topic.keys():
lambdas_when_published = topic['lambdas_when_published']
if 'default_notifications' in topic.keys():
default_notifications = topic['default_notifications']
if 'include' in topic.keys():
Expand All @@ -122,7 +125,7 @@ def event_callback(string, type, msg=""):
if include:
topic_monitor = TopicMonitor(topic_name, signal_when, safety_critical,
signal_lambdas, processes, lock_exec, repeat_exec,
timeout, default_notifications, event_callback)
timeout, lambdas_when_published, default_notifications, event_callback)
topic_monitors.append(topic_monitor)
safety_monitor.register_monitors(topic_monitor)

Expand Down
119 changes: 66 additions & 53 deletions src/sentor/TopicMonitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ class bcolors:
class TopicMonitor(Thread):


def __init__(self, topic_name, signal_when, safety_critical, signal_lambdas,
processes, lock_exec, repeat_exec, timeout, default_notifications, event_callback):
def __init__(self, topic_name, signal_when, safety_critical, signal_lambdas, processes,
lock_exec, repeat_exec, timeout, lambdas_when_published, default_notifications,
event_callback):
Thread.__init__(self)

self.topic_name = topic_name
Expand All @@ -35,6 +36,7 @@ def __init__(self, topic_name, signal_when, safety_critical, signal_lambdas,
self.timeout = timeout
else:
self.timeout = 0.1
self.lambdas_when_published = lambdas_when_published
self.default_notifications = default_notifications
self.event_callback = event_callback
self.satisfied_expressions = []
Expand Down Expand Up @@ -78,6 +80,8 @@ def _instantiate_monitors(self):
if self.signal_when.lower() == 'not published' and self.safety_critical:
self.signal_when_is_safe = False
return False

self.hz_monitor = self._instantiate_hz_monitor(real_topic, self.topic_name, msg_class)

if self.signal_when.lower() == 'published':
print "Signaling 'published' for "+ bcolors.OKBLUE + self.topic_name + bcolors.ENDC +" initialized"
Expand All @@ -92,7 +96,6 @@ def _instantiate_monitors(self):
elif self.signal_when.lower() == 'not published':
print "Signaling 'not published' for "+ bcolors.BOLD + str(self.timeout) + " seconds" + bcolors.ENDC +" for " + bcolors.OKBLUE + self.topic_name + bcolors.ENDC +" initialized"
# signal when it is not published
self.hz_monitor = self._instantiate_hz_monitor(real_topic, self.topic_name, msg_class)

# if there is something else then we have a filter on the message
if len(self.signal_lambdas):
Expand Down Expand Up @@ -154,46 +157,48 @@ def run(self):
self.is_instantiated = True

def cb(_):
if self.safety_critical:
self.signal_when_is_safe = False
if self.default_notifications and self.safety_critical:
self.event_callback("SAFETY CRITICAL: Topic %s is not published anymore" % self.topic_name, "warn")
elif self.default_notifications:
self.event_callback("Topic %s is not published anymore" % self.topic_name, "warn")
if not self.repeat_exec:
self.execute()
if self.signal_when.lower() == 'not published':
if self.safety_critical:
self.signal_when_is_safe = False
if self.default_notifications and self.safety_critical:
self.event_callback("SAFETY CRITICAL: Topic %s is not published anymore" % self.topic_name, "warn")
elif self.default_notifications:
self.event_callback("Topic %s is not published anymore" % self.topic_name, "warn")
if not self.repeat_exec:
self.execute()

def repeat_cb(_):
self.execute()
if self.signal_when.lower() == 'not published':
self.execute()

timer = None
timer_repeat = None
while not self._killed_event.isSet():
while not self._stop_event.isSet():
# check it is still published (None if not)
if self.hz_monitor is not None:
# check it is still published (None if not)
rate = self.hz_monitor.get_hz()

# # if the publishing rate is less than 1Hz we assume it's a latch message
# if rate is not None:
# self.is_latch = (rate < 0.5)


if rate is None and self.is_topic_published: #and not self.is_latch:
self.is_topic_published = False

timer = rospy.Timer(rospy.Duration.from_sec(self.timeout), cb, oneshot=True)

if self.repeat_exec:
timer_repeat = rospy.Timer(rospy.Duration.from_sec(self.timeout), repeat_cb, oneshot=False)
# self.event_callback("Topic %s is not published anymore" % self.topic_name, "warn")

if rate is not None:# and not self.is_topic_published:# and not self.is_latch:
self.is_topic_published = True

if self.safety_critical:
self.signal_when_is_safe = True

if timer is not None:
timer.shutdown()
timer = None
Expand All @@ -202,7 +207,7 @@ def repeat_cb(_):
if timer_repeat is not None:
timer_repeat.shutdown()
timer_repeat = None

# self._lock.acquire()
# while len(self.satisfied_expressions) > 0:
# expr = self.satisfied_expressions.pop()
Expand All @@ -219,31 +224,33 @@ def repeat_cb(_):
# self.published_filters_list.remove(expr)
# #print "-", expr
# self._lock.release()

time.sleep(0.3)
time.sleep(1)

def lambda_satisfied_cb(self, expr, msg, safety_critical_lambda):
if not self._stop_event.isSet():
if safety_critical_lambda:
if not expr in self.sat_expr_crit_timer.keys():
# self.satisfied_expressions.append(expr)
def crit_cb(_):
self.lambdas_are_safe = False
if self.default_notifications:
self.event_callback("SAFETY CRITICAL: Expression '%s' for %s seconds on topic %s satisfied" % (expr, self.timeout, self.topic_name), "warn", msg)

process_lambda, self.sat_expr_crit_timer = ProcessLambda(self.sat_expr_crit_timer)
if process_lambda:
self.lambdas_are_safe = False
if self.default_notifications:
self.event_callback("SAFETY CRITICAL: Expression '%s' for %s seconds on topic %s satisfied" % (expr, self.timeout, self.topic_name), "warn", msg)

self._lock.acquire()
self.sat_expr_crit_timer.update({expr: rospy.Timer(rospy.Duration.from_sec(self.timeout), crit_cb, oneshot=True)})
self._lock.release()

if not expr in self.sat_expressions_timer.keys():
# self.satisfied_expressions.append(expr)
def cb(_):
if self.default_notifications and not safety_critical_lambda:
self.event_callback("Expression '%s' for %s seconds on topic %s satisfied" % (expr, self.timeout, self.topic_name), "warn", msg)
if not self.repeat_exec and len(self.sat_expressions_timer.keys()) == len(self.signal_lambdas):
self.execute(msg)
process_lambda, self.sat_expressions_timer = ProcessLambda(self.sat_expressions_timer)
if process_lambda:
if self.default_notifications and not safety_critical_lambda:
self.event_callback("Expression '%s' for %s seconds on topic %s satisfied" % (expr, self.timeout, self.topic_name), "warn", msg)
if not self.repeat_exec and len(self.sat_expressions_timer.keys()) == len(self.signal_lambdas):
self.execute(msg)

self._lock.acquire()
self.sat_expressions_timer.update({expr: rospy.Timer(rospy.Duration.from_sec(self.timeout), cb, oneshot=True)})
Expand All @@ -252,42 +259,49 @@ def cb(_):
if self.repeat_exec:
if not expr in self.sat_expr_repeat_timer.keys():
def repeat_cb(_):
if len(self.sat_expr_repeat_timer.keys()) == len(self.signal_lambdas):
self.execute(msg)
for expr in self.sat_expr_repeat_timer.keys():
self.sat_expr_repeat_timer[expr].shutdown()
self.sat_expr_repeat_timer.pop(expr)
process_lambda, self.sat_expr_repeat_timer = ProcessLambda(self.sat_expr_repeat_timer)
if process_lambda:
if len(self.sat_expr_repeat_timer.keys()) == len(self.signal_lambdas):
self.execute(msg)
for expr in self.sat_expr_repeat_timer.keys():
self.sat_expr_repeat_timer = self.kill_timer(self.sat_expr_repeat_timer, expr)

self._lock.acquire()
self.sat_expr_repeat_timer.update({expr: rospy.Timer(rospy.Duration.from_sec(self.timeout), repeat_cb, oneshot=True)})
self._lock.release()
#print "sat", msg

def ProcessLambda(timer_dict):
if self.lambdas_when_published and not self.is_topic_published:
process_lambda = False
for expr in timer_dict.keys():
timer_dict = self.kill_timer(timer_dict, expr)
else:
process_lambda = True
return process_lambda, timer_dict

def lambda_unsatisfied_cb(self, expr):
if not self._stop_event.isSet():
if expr in self.sat_expr_crit_timer.keys():
self._lock.acquire()
self.sat_expr_crit_timer[expr].shutdown()
self.sat_expr_crit_timer.pop(expr)
self._lock.release()

if not self.sat_expr_crit_timer:
self.lambdas_are_safe = True
self.sat_expr_crit_timer = self.kill_timer(self.sat_expr_crit_timer, expr)

if expr in self.sat_expressions_timer.keys():
self._lock.acquire()
self.sat_expressions_timer[expr].shutdown()
self.sat_expressions_timer.pop(expr)
self._lock.release()
self.sat_expressions_timer = self.kill_timer(self.sat_expressions_timer, expr)

if expr in self.sat_expr_repeat_timer.keys():
self._lock.acquire()
self.sat_expr_repeat_timer[expr].shutdown()
self.sat_expr_repeat_timer.pop(expr)
self._lock.release()
self.sat_expr_repeat_timer = self.kill_timer(self.sat_expr_repeat_timer, expr)

if not self.sat_expr_crit_timer:
self.lambdas_are_safe = True
# if not msg in self.unsatisfied_expressions and msg in self.satisfied_expressions:
# self.unsatisfied_expressions.append(msg)
#print "unsat", msg

def kill_timer(self, timer_dict, expr):
self._lock.acquire()
timer_dict[expr].shutdown()
timer_dict.pop(expr)
self._lock.release()
return timer_dict

def published_cb(self, msg):
if not self._stop_event.isSet():
Expand All @@ -312,7 +326,6 @@ def execute(self, msg=None):
self.executor.execute(msg)

def safety_cb(self, event=None):

if self.signal_when_is_safe and self.lambdas_are_safe:
self.thread_is_safe = True
else:
Expand Down

0 comments on commit 1fea4f5

Please sign in to comment.