OSVE Callback Datapacks ======================= The CALLBACK datapacks are intended to notify and provide to the OSVE caller program the steps, the data and events risen during the simulation. Note that in the example, the "filePath" properties were set, but the logic says that this filePath will not be required because no any file is going to be written. But OSVE will use this "filePath" string as an identifier of the datapack, to be able to notify to the OsveSubscriber linked to it with the specified data in the datapack definition. An OsveSubscriber is a Python object with some callback methods invoked from the OSVE execute/simulation process. The available callback methods to register to are ``onSimulationStart()``, ``onSimulationTimeStep()``, ``onSimulationEnd()``, ``onEventStateChaged()``. For fully detailed comments about the ``OsveSubscribers``, read ``osve/osve_subscriber.py`` documentation. In the example above, the CALLBACK datapacks have the first column as the time in UTC format. The callbacksDatapack1 will then will provide the EPS_SA_POWER (Available solar arrays power) column and the callbacksDatapack2 will provide the SC quaternions. So for each CALLBACK dataPack, the user shall indicate at least the filePath containing a string identifier, the '"type": "CALLBACK"' and at least one datapack field, usually the 'time' because is computationally straight forward. How to implement an OsveSubscriber from Python scripting -------------------------------------------------------- To properly understand how these OSVE callbacks needs to be implemented, first we are going to explain the most general and flexible way to handle them, that is implementing a sub class of the OsveSubscriberAbstract class. Inheriting the OsveSubscriberAbstract class +++++++++++++++++++++++++++++++++++++++++++ In the example below we are going to show how to register a general OSVE Subscriber in order to validate during the simulation the Spacecraft total body rates. In case that during the simulation the total body rate exceeds 0.02 degrees/sec2 the simulation shall be aborted. For doing this only overriding the function onSimulationTimeStep() would be necessary, but we implemented all of them just to show how they shall be implemented:: from osve import osve from osve_subscriber import OsveSubscriberAbstract class OsveSubscriber(OsveSubscriberAbstract): def __init__(self): super().__init__("dataPack1.csv") def onSimulationStart(self, data) -> int: # Notifies that simulation started, data contains the selected overlay name's and unit's, # if returned int is smaller than 0 then simulation is aborted. print ("onSimulationStart: " + str(data)) return 0 def onSimulationTimeStep(self, data) -> int: # Notifies that a time step has been simulated, data contains the selected overlay name's and values, # if returned int is smaller than 0 then simulation is aborted. print ("onSimulationTimeStep: " + str(data)) if data["TOTAL_BODY_RATE"] > 0.02: print ("TOTAL_BODY_RATE CONSTRAINT BREAK!! TOTAL_BODY_RATE: " + str(data["TOTAL_BODY_RATE"])) return -1 return 0 def onSimulationEnd(self, data) -> int: # Notifies that simulation finished, data contains the simulation end time, # if returned int is smaller than 0 then simulation is aborted. print ("onSimulationEnd: " + str(data)) return 0 def onEventStateChanged(self, data) -> int: # Notifies that an event has changed its state, data contains the event properties, # if returned int is smaller than 0 then simulation is aborted. print ("onEventStateChanged: " + str(data)) return 0 test_input_path = "SOME_PATH_POINTING_TO_SCENARIO" test_input_config_path = "SOME_PATH_POINTING_TO_SESSION_FILE" # First, instantiate the OSVE Simulator sim = osve.osve() # Second, register the defined OsveSubscriber sim.register_subscriber(OsveSubscriber()) # Finally, run simulation sim.execute(test_input_path, test_input_config_path) And this is how this dataPack shall be defined in the session file:: ... "outputFiles": { ... "dataPacks": [ { "filePath": "callbacksDatapack1", "type": "CALLBACK", "fields": [ { "type": "time", "format": "utc" }, { "type": "MAPPS", "overlayId": "TOTAL_BODY_RATE" } ] } ] }, ... Inheriting the OsveEventSubscriber class ++++++++++++++++++++++++++++++++++++++++ In case that you only need to register to the risen OSVE Events, we suggest you to take advantage of this helper class intended to minimise the development effort:: from osve import osve from osve_subscriber import OsveEventSubscriber def onEventStateChanged(data) -> int: # Notifies that an event has changed its state # if returned int is smaller than 0 then simulation is aborted. print ("onEventStateChanged: " + str(data)) return 0 test_input_path = "SOME_PATH_POINTING_TO_SCENARIO" test_input_config_path = "SOME_PATH_POINTING_TO_SESSION_FILE" # First, register required OsveSubscribers theOsveSubscriber = OsveEventSubscriber("callbacksDatapack1", onEventStateChanged) # Second, instantiate the OSVE Simulator sim = osve.osve() # Finally, run simulation the_osve.execute(test_input_path, test_input_config_path) And this is how this dataPack shall be defined in the session file:: ... "outputFiles": { ... "dataPacks": [ { "filePath": "callbacksDatapack1", "type": "CALLBACK", "fields": [ { "type": "time", "format": "utc" } ] } ] }, ... Inheriting the OsvePtrAbstract class ++++++++++++++++++++++++++++++++++++ The OsvePtrAbstract is a helper OSVE Subscriber class that allows to process the generated data per each executed PTR block. So classes inheriting it, shall implement a method "onPtrBlockEnd()" that will receive as an argument a dictionary with all the block details and generated data during the simulation of the block time period. In this example we are just going to show the PTR Block details and the number of datapack steps data, the number of events risen and the generated logs since the PTR block start until its end:: from osve import osve from osve_subscriber import OsvePtrAbstract class OsvePtrSubscriber(OsvePtrAbstract): def __init__(self): super().__init__("theOsvePtrSubscriber") def onPtrBlockEnd(self, blockData) -> int: print("onPtrBlockEnd -> " + str(blockData["block_start"]) + " - " + str(blockData["block_end"]) + " T: " + str(blockData["block_type"]) + " M: " + str(blockData["block_mode"]) + " S-> " + str(len(blockData["block_steps"])) + " E-> " + str(len(blockData["block_events"])) + " L-> " + str(len(blockData["block_logs"]))) return 0 test_input_path = "SOME_PATH_POINTING_TO_SCENARIO" test_input_config_path = "SOME_PATH_POINTING_TO_SESSION_FILE" # First, instantiate the OSVE Simulator sim = osve.osve() # Second, register the OsvePtrSubscriber theOsvePtrSubscriber = OsvePtrSubscriber() sim.register_subscriber(theOsvePtrSubscriber) sim.register_logger(theOsvePtrSubscriber) # Finally, run simulation sim.execute(test_input_path, test_input_config_path) And this is how this dataPack shall be defined in the session file:: ... "outputFiles": { ... "dataPacks": [ { "filePath": "theOsvePtrSubscriber", "type": "CALLBACK", "fields": [ { "type": "time", "format": "utc" }, { "type": "MAPPS", "overlayId": "TOTAL_BODY_RATE" } ] } ] }, ... Grouping logs per PTR Block using the OsvePtrAbstract class +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Here is a basic example of how to make a report per PTR Block of the log messages generated per PTR Block time span:: from osve import osve from osve_subscriber import OsvePtrAbstract class OsvePtrLogger(osve_subscriber.OsvePtrAbstract): blocks_data = [] def __init__(self): super().__init__("theOsvePtrLogger") def onPtrBlockEnd(self, blockData) -> int: self.blocks_data.append(blockData) return 0 def report(self): print ("") print ("----------------------------------------------") print (" theOsvePtrLogger:") print ("----------------------------------------------") print ("") idx = 0 for blockData in self.blocks_data: print("BLOCK " + str(idx) + ", From: " + str(blockData["block_start"]) + " , To: " + str(blockData["block_end"])) for log_data in blockData["block_logs"]: print(" -> " + str(log_data["severity"]) + " , " + str(log_data["module"]) + " , " + str(log_data["time"]) + " , " + str(log_data["text"])) idx += 1 test_input_path = "SOME_PATH_POINTING_TO_SCENARIO" test_input_config_path = "SOME_PATH_POINTING_TO_SESSION_FILE" # First, instantiate the OSVE Simulator sim = osve.osve() # Second, register the OsvePtrLogger theOsvePtrLogger = OsvePtrLogger() sim.register_subscriber(theOsvePtrLogger) sim.register_logger(theOsvePtrLogger) # Finally, run simulation and generate the report sim.execute(test_input_path, test_input_config_path) theOsvePtrLogger.report() And this is how this dataPack shall be defined in the session file:: ... "outputFiles": { ... "dataPacks": [ { "filePath": "theOsvePtrSubscriber", "type": "CALLBACK", "fields": [ { "type": "time", "format": "utc" } ] } ] }, ... Note that even we are not using this datapack "time" field data in our code, this field is required to allow OSVE to invoke "onPtrBlockEnd()" callback function, nevertheless, other callback functions needs to be invoked internally during the simulation. More helper classes would be implemented in the future, such as a class to perform averages or the total sum of a series or profile. But all these will be intended only to minimise the development effort at Python side, because all these features could be implemented already using a subclass implementation of the OsveSubscriberAbstract class.