Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagepy
titleextension.py
linenumberstrue
from __future__ import (print_function)
from time import sleep
import os
from universal_extension import UniversalExtension
from universal_extension import ExtensionResult
from universal_extension import event


class Extension(UniversalExtension):
    """Required class that serves as the entry point for the extension
    """

    def __init__(self):
        """Initializes an instance of the 'Extension' class
        """
        # Call the base class initializer
        super(Extension, self).__init__()

        # Flag to control the event loop below
        self.run = True

    def extension_start(self, fields):
        """Required method that serves as the starting point for work performed
        for a task instance.

        Parameters
        ----------
        fields : dict
            populated with field values from the associated task instance
            launched in the Controller

        Returns
        -------
        ExtensionResult
            once the work is done, an instance of ExtensionResult must be
            returned. See the documentation for a full list of parameters that
            can be passed to the ExtensionResult class constructor
        """

        # Get the value of the 'sleep_value' field
        sleep_value = fields.get('sleep_value', 3)

        # Get the value of the 'directory' field
        directory = fields.get('directory', '')

        # Verify the directory exists
        if not directory:
            return ExtensionResult(
                rc=-1,
                unv_output="specified directory is empty"
            )
        elif not os.path.exists(directory):
            return ExtensionResult(
                rc=-1,
                unv_output="specified directory does not exist"
            )

        prev_filelist = set()
        # loop that publishes events continuously as long as self.run is True
        while self.run:
            curr_filelist = set(os.listdir(directory))

            # Subtracting 'prev_filelist' from 'curr_filelist' will ensure the
            # 'filelist' only contains the newly detected files.
            filelist = curr_filelist - prev_filelist
            filelist = ','.join(filelist)

            prev_filelist = curr_filelist.copy()

            if filelist:
                print(filelist)

            # Publish the event
            event.publish(
                'publisher_event',
                {"filelist": filelist}
            )

            # sleep before publishing next event
            sleep(sleep_value)

        # Return the result with a payload marking the end of extension_start()
        return ExtensionResult(
            unv_output='extension_start() finished'
        )

    def extension_cancel(self):
        """Optional method that allows the Extension to do any cleanup work
        before finishing
        """
        # Set self.run to False which will end the event loop above
        self.run = False


Line 3Imports the os module used to list files in an directory.
Line 44Extract the directory field from the task instance passed down by the Controller.
Line 47-56Verify the directory exists
Line 58-80Get the current directory listing and remove the files from the previous iteration, only leaving new files discovered in the current iteration. Then, publish the filtered file listing as a comma-separated string to the publisher_event event.

Save the changes to extension.py and execute the UIP: Push command as shown below:

...