...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
from __future__ import (print_function) from time import sleep import os from universal_extension import UniversalExtension from universal_extension import ExtensionResult from universal_extension import event class Extension(UniversalExtension): """Required class that serves as the entry point for the extension """ def __init__(self): """Initializes an instance of the 'Extension' class """ # Call the base class initializer super(Extension, self).__init__() # Flag to control the event loop below self.run = True def extension_start(self, fields): """Required method that serves as the starting point for work performed for a task instance. Parameters ---------- fields : dict populated with field values from the associated task instance launched in the Controller Returns ------- ExtensionResult once the work is done, an instance of ExtensionResult must be returned. See the documentation for a full list of parameters that can be passed to the ExtensionResult class constructor """ # Get the value of the 'sleep_value' field sleep_value = fields.get('sleep_value', 3) # Get the value of the 'directory' field directory = fields.get('directory', '') # Verify the directory exists if not directory: return ExtensionResult( rc=-1, unv_output="specified directory is empty" ) elif not os.path.exists(directory): return ExtensionResult( rc=-1, unv_output="specified directory does not exist" ) prev_filelist = set() # loop that publishes events continuously as long as self.run is True while self.run: curr_filelist = set(os.listdir(directory)) # Subtracting 'prev_filelist' from 'curr_filelist' will ensure the # 'filelist' only contains the newly detected files. filelist = curr_filelist - prev_filelist filelist = ','.join(filelist) prev_filelist = curr_filelist.copy() if filelist: print(filelist) # Publish the event event.publish( 'publisher_event', {"filelist": filelist} ) # sleep before publishing next event sleep(sleep_value) # Return the result with a payload marking the end of extension_start() return ExtensionResult( unv_output='extension_start() finished' ) def extension_cancel(self): """Optional method that allows the Extension to do any cleanup work before finishing """ # Set self.run to False which will end the event loop above self.run = False |
Line 3 | Imports the os module used to list files in an directory. |
Line 44 | Extract the directory field from the task instance passed down by the Controller. |
Line 47-56 | Verify the directory exists |
Line 58-80 | Get the current directory listing and remove the files from the previous iteration, only leaving new files discovered in the current iteration. Then, publish the filtered file listing as a comma-separated string to the publisher_event event. |
Save the changes to extension.py
and execute the UIP: Push command as shown below:
...