One script Do It All

Thanks to Python syntax, and provided there is no need to use include directives to merge code, a single Python script can be run from the command line and also be used, unchanged, as a task wrapper. The same script may also provide the ability to load its task into the expected server/suite, for operation and for test.

Sweeper is used in operation to start product generation earlier.

../../_images/image17.png
Listing 121 meter sweeper
#!/usr/bin/env python

"""
python docstring: demonstrate python cli + ecFlow task wrapper + task loader
 
$manual
DESCRIPTION:
 
sweeper:
 
find out min/max among modeleps_nemo tasks
 
OPERATORS: please, set complete if problematic
 
ANALYST: example as pure python task - job
$end
 
$comment
  comments can be added ...
$end
 
"""
import time
import os.path
import getopt
import sys

PATH = "/usr/local/apps/ecflow/4.0.9/lib/python2.7/site-packages/ecflow"
sys.path.insert(0, PATH)
import ecflow as ec

# Micro character handed to ecFlow for this task: create_task() sets
# ECF_MICRO to MICRO[0]. It is written twice so the pair stays balanced when
# this very file is preprocessed as a task wrapper.
MICRO = "$$"  # double dollar to please ecFlow micro character balance
# Substring a task name must contain for process() to look at its meters
# (default used by Sweep when no task is given).
TARGET_TASK = "modeleps_nemo"  # use option -t to overwrite
# Name of the meter whose values feed the min/max computed by process().
TARGET_METER = "step"  # would this need an option???
# Default node paths swept by Sweep.update(); loader() also uses this tuple
# as extern list and to build the trigger expression of the sweeper task.
MC_STEPS_LIST = (
    "/mc/main/12/legA/fc",  # use option -p for single path
    "/mc/main/12/legB/fc",
    "/mc/main/18bc/legA/fc",
    "/mc/main/00/legA/fc",
    "/mc/main/00/legB/fc",
    "/mc/main/06bc/legA/fc",
    "/mofc/thu/01/legC/fc",
    "/mofc/thu/hind/14/back",
    "/mofc/mon/01/legC/fc",
    "/mofc/mon/hind/14/back",
)


def focus_below(node):
    """Tell whether *node* bears one of the names the sweep descends into.

    The accepted names are site specific; please update them for external
    use. Returns True when ``node.name()`` is one of them, False otherwise.
    """
    expected = ("00", "12", "18bc", "06bc", "legA", "legB", "pf", "cf", "fc", "main")
    label = node.name()
    return label in expected


def _suite_names(path):
    """Return the set of suite names addressed by *path*, or None for no filter.

    *path* is one absolute node path or a sequence of them; the suite name is
    the first component after the leading slash.
    """
    if path is None:
        return None
    paths = (path,) if isinstance(path, str) else path
    return set(str(one.split("/")[1]) for one in paths)


def process(item, path=None, low=999, high=-1, task=None):
    """Track the nodes to follow and update the min/max of the target meter.

    item -- what to inspect: a string (handed to ec.Defs), an ec.Client
            (synchronised first), an ec.Defs, an ec.Family or an ec.Task.
            Any other object is reported by printing its type and leaves
            the bounds untouched.
    path -- absolute node path, or sequence of such paths. It is only used
            to select suites by name when a whole definition is walked;
            None selects every suite.
    low, high -- bounds accumulated so far.
    task -- substring a task name must contain to be considered;
            None falls back to TARGET_TASK.

    Returns the tuple (low, high) after merging the values of every meter
    named TARGET_METER found on the matching tasks.
    """
    if task is None:
        task = TARGET_TASK

    if isinstance(item, str):
        defs = ec.Defs(item)
    elif isinstance(item, ec.Client):
        item.sync_local()
        defs = item.get_defs()
    elif isinstance(item, ec.Defs):
        defs = item
    else:
        defs = None

    if defs:
        wanted = _suite_names(path)
        for suite in defs.suites:
            if wanted is not None and suite.name() not in wanted:
                continue
            status = "%s" % suite.get_state()
            if suite.is_suspended() or status == "unknown":
                continue
            for child in suite.nodes:
                low, high = process(child, path, low, high, task)

    elif isinstance(item, ec.Family):
        for node in item.nodes:
            if focus_below(node):
                # NOTE(review): returning here stops at the first child
                # accepted by focus_below(); its later siblings are never
                # visited. Kept as found - confirm whether every accepted
                # child should contribute to the bounds instead.
                return process(node, path, low, high, task)
            low, high = process(node, path, low, high, task)

    elif isinstance(item, ec.Task):
        path2node = item.get_abs_node_path()

        # ignore cal-val # ecmwf specific
        if "/cv/" in path2node or task not in item.name():
            return low, high

        try:
            for att in item.meters:
                if att.name() != TARGET_METER:
                    continue
                value = att.value()
                # two independent tests: a value may be both the new
                # maximum and the new minimum (e.g. the very first one)
                if value > high:
                    high = value
                if value < low:
                    low = value

        except Exception as excpt:
            # best effort: a faulty attribute must not stop the sweep
            print("#! problem with line:", excpt)

    else:
        print(type(item))

    return low, high


def create_task():
    """Return the ``sweeper`` task definition.

    The task carries its job variables (script location and extension, job
    submission command, micro character), an ``info`` label, a defcomplete
    attribute and the two meters ``min`` and ``max``.
    """
    sys.path.append("/home/ma/emos/o/def")
    import ecf

    steps = 360
    sweeper = ecf.Task("sweeper")
    settings = ecf.Variables(
        ECF_FILES=os.getenv("ECF_FDI", None),
        ECF_EXTN=".py",
        ECF_JOB_CMD="ssh -x $WSHOST$ $ECF_JOB$ > $ECF_JOBOUT$ 2>&1 &",
        # a single micro character is expected here, MICRO holds it twice
        ECF_MICRO=MICRO[0],
    )
    info = ecf.Label("info", "")
    defcomplete = ecf.Defcomplete()
    meter_min = ecf.Meter("min", -1, steps, steps)
    meter_max = ecf.Meter("max", -1, steps, steps)
    return sweeper.add(settings, info, defcomplete, meter_min, meter_max)


def loader(test=1, host=os.getenv("ECF_TEST_HOST", None)):
    """Replace the sweeper task on the test or the operational server.

    test -- when true the server is taken from ECF_TEST_HOST/ECF_TEST_PORT
            and the node replaced is /admine/steps; otherwise the server is
            taken from ECF_OPER_HOST/ECF_OPER_PORT and the node is
            /admin/steps.
    host -- value given to the WSHOST variable of the task. Its default is
            read from the environment once, when the function is defined.
    """
    sys.path.append("/home/ma/emos/o/def")
    import ecf

    if test:
        host_var, port_var, target = "ECF_TEST_HOST", "ECF_TEST_PORT", "/admine/steps"
    else:
        host_var, port_var, target = "ECF_OPER_HOST", "ECF_OPER_PORT", "/admin/steps"
    env = {
        "host": os.getenv(host_var, None),
        "port": os.getenv(port_var, None),
        "path": target,
    }
    defs = ecf.Defs()

    path = env["path"]
    pieces = path.split("/")
    sname = str(pieces[1])  # suite name
    fname = str(pieces[2])  # family name

    # objects are created in the same order as a nested expression would
    suite_node = ecf.Suite(sname)
    externs = ecf.Extern(MC_STEPS_LIST, defs)
    family_node = ecf.Family(fname)
    task_node = create_task()
    cron = ecf.Cron("04:30 23:59 03:00")
    # the task runs as soon as any of the listed nodes is active
    trigger = ecf.Trigger("==active or ".join(MC_STEPS_LIST) + "==active")
    variables = ecf.Variables(WSHOST=host, QUEUE="test")
    suite = suite_node.add(
        externs,
        family_node.add(task_node.add(cron, trigger, variables)),
    )

    defs.add_suite(suite)

    print("#MSG: replacing", path, env["host"], env["port"])
    client = ecf.Client(env["host"], env["port"])
    client.replace(path, defs, 1, 1)


class Sweep(object):
    """Find the min/max meter value below a set of nodes.

    self.clt is the client used to read the server definition. self.cl2 is
    only created when the script runs as an ecFlow job (node and task name
    known); it is then used to send child commands (meters, label, complete,
    abort) for the sweeper task itself.
    """

    # seconds to wait before asking the server again after a RuntimeError
    RETRY_DELAY = 30

    def __init__(self, host=None, port=None, path=None, task=None):
        """Create the client(s) and, when run as a job, register as a child.

        host, port -- server to query; default to ECF_NODE / ECF_PORT from
                      the environment, else to the values substituted by
                      ecFlow when this file is processed into a job.
        path -- one node path or a sequence of node paths to sweep;
                None selects MC_STEPS_LIST.
        task -- substring of the task names to inspect; None selects
                TARGET_TASK.
        """
        import signal

        if host is None:
            host = os.getenv("ECF_NODE", "$ECF_NODE$")
        if port is None:
            port = int(os.getenv("ECF_PORT", "$ECF_PORT$"))
        self.clt = ec.Client(host, port)
        self.cl2 = None
        self.path = MC_STEPS_LIST if path is None else self._as_paths(path)
        self.task = TARGET_TASK if task is None else task

        # shall make sense when processed into job by ecflow
        # name remains when not processed...
        ecfv = {
            "ECF_NODE": "$ECF_NODE$",
            "ECF_PASS": "$ECF_PASS$",
            "ECF_NAME": "$ECF_NAME$",
            "ECF_PORT": "$ECF_PORT$",
            "ECF_TRYNO": "$ECF_TRYNO$",
        }

        # a value still holding its own key was not substituted:
        # fall back to the environment (None when unset)
        for key, val in ecfv.items():
            if key in val:
                ecfv[key] = os.getenv(key, None)
        if ecfv["ECF_NODE"] and ecfv["ECF_NAME"]:
            print("#MSG will communicated with server...")
            print("#kill: ssh %s kill -15 %d" % (ecfv["ECF_NODE"], os.getpid()))
            self.cl2 = ec.Client()
            self.cl2.set_host_port(ecfv["ECF_NODE"], ecfv["ECF_PORT"])
            self.cl2.set_child_pid(os.getpid())
            self.cl2.set_child_path(ecfv["ECF_NAME"])
            self.cl2.set_child_password(ecfv["ECF_PASS"])
            self.cl2.set_child_try_no(int(ecfv["ECF_TRYNO"]))
            self.cl2.child_init()
            self.cl2.set_child_timeout(20)

            for sig in (
                signal.SIGINT,
                signal.SIGHUP,
                signal.SIGQUIT,
                signal.SIGILL,
                signal.SIGTRAP,
                signal.SIGIOT,
                signal.SIGBUS,
                signal.SIGFPE,
                signal.SIGUSR1,
                signal.SIGUSR2,
                signal.SIGPIPE,
                signal.SIGTERM,
                signal.SIGXCPU,
                signal.SIGPWR,
            ):
                signal.signal(sig, self.signal_handler)

    @staticmethod
    def _as_paths(path):
        """Return *path* as a sequence of node paths.

        A single path string is wrapped into a one element tuple, so that
        update() loops over paths and not over the characters of the string.
        """
        if isinstance(path, str):
            return (path,)
        return path

    def signal_handler(self, signum, frame):
        """Catch a signal and report the task as aborted to the server."""
        print("Aborting: Signal handler called with signal ", signum)
        # NOTE(review): the process keeps running after the abort has been
        # reported - confirm whether it should exit here.
        self.cl2.child_abort("Signal handler called with signal " + str(signum))

    def report(self, msg, meter=None):
        """Communicate with the ecFlow server; no-op when not run as a job.

        msg -- meter name when *meter* is given, the word "stop" to set the
               task complete and exit, else the text of the info label.
        meter -- meter value to send; 0 is a valid value.
        """
        if not self.cl2:
            return
        if meter is not None:
            self.cl2.child_meter(msg, meter)
        elif msg == "stop":
            self.cl2.child_complete()
            sys.exit(0)
        else:
            self.cl2.child_label("info", msg)

    def update(self):
        """Poll the server for each path and report min/max until it is done.

        A path is left when its node is complete, when its node cannot be
        found, or when min equals max while the node is queued or aborted.
        Paths containing /back get a cycle number and a leg appended; these
        move on (legA, then legB, then the previous number) each time a
        node is found complete.
        """
        num = 20
        leg = "legA"
        for path in self.path:
            first = 1
            low = 0
            high = 0
            self.clt.ch_register(False, [str(path.split("/")[1])])
            if "/back" in path:
                path += "/%02d/%s" % (num, leg)

            while 1:
                clt = self.clt
                status = ""
                try:
                    if clt.news_local() or first:  # has the server changed
                        clt.sync_local()
                        first = 0
                        defs = clt.get_defs()
                        node = defs.find_abs_node(path)
                        if node:
                            status = "%s" % node.get_state()
                        low, high = process(node, path, 999, -1, self.task)
                        sleep = 30
                    else:
                        sleep = 90
                    msg = "... %ds %s %s %d %d" % (sleep, path, status, low, high)
                    print(msg)
                    if type(low) != type(high):
                        print(type(low), type(high))

                    if status == "complete":
                        if leg == "legB":
                            num -= 1
                            leg = "legA"
                        else:
                            leg = "legB"
                        break

                    no_progress = low == high and status in (
                        "queued",
                        "aborted",
                        "complete",
                    )
                    if no_progress or node is None:
                        if node:
                            msg = "# %s %s" % (node.get_abs_node_path(), status)
                            print(msg)
                            self.report(msg)
                        break

                    self.report(msg)
                    self.report("min", low)
                    self.report("max", high)
                    sys.stdout.flush()
                    time.sleep(sleep)
                except RuntimeError as exp:
                    print(str(exp))
                    sys.stdout.flush()
                    # back off instead of hammering the server in a tight loop
                    time.sleep(self.RETRY_DELAY)


def usage():
    """Print the command line help on standard output."""
    print(
        """client
  -o: operational node is to be replaced, default is test node,
      provide this option BEFORE -r
  -r: replace task node
  -p: path to look below, by default internal list MC_STEPS_LIST will be used
  -t: name of the tasks to look at, by default TARGET_TASK will be used
  -e: start sweeping at once, options given AFTER -e are ignored
  -h: this help
 
ECF_NODE=localhost ECF_PORT=31415 ./client.py -p /mc/main/18bc/legA/fc -e
 
"""
    )


if __name__ == "__main__":
    # Command line entry point; also what runs when ecFlow submits the file
    # as a job (no argument: sweep, then report the task complete).
    try:
        OPTS, ARGS = getopt.getopt(
            sys.argv[1:],
            "hep:rot:",
            # long options taking a value must end with "="
            ["help", "ens", "path=", "replace", "oper", "task="],
        )
    except getopt.GetoptError as err:
        print("# what? %s" % err)
        usage()
        sys.exit(2)

    TEST = 1
    PATH = None
    TASK = None
    # options are acted upon in the order given: -o must precede -r,
    # -p and -t must precede -e
    for o, a in OPTS:
        if o in ("-h", "--help"):
            usage()
            sys.exit(0)
        elif o in ("-e", "--ens"):
            Sweep(path=PATH, task=TASK).update()
            sys.exit(0)
        elif o in ("-r", "--replace"):
            loader(TEST)
            sys.exit(0)
        elif o in ("-p", "--path"):
            # Sweep loops over a sequence of paths
            PATH = (a,)
        elif o in ("-o", "--oper"):
            TEST = 0
        elif o in ("-t", "--task"):
            TASK = a
    CLT = Sweep(path=PATH, task=TASK)
    CLT.update()
    CLT.report("stop")