ecflow.Client
- class ecflow.Client
Bases:
instance
Class client provides an interface to communicate with the ecflow_server.:
Client(
string host, # The server name. Cannot be empty.
string port # The port on the server, must be unique to the server
)
Client(
string host, # The server name. Cannot be empty.
int port # The port on the server, must be unique to the server
)
Client(
string host_port, # Expects <host>:<port> || <host>@<port>
)
The client reads in the following environment variables. For child commands,(i.e these are commands called in the .ecf/jobs files), these variables are used. For the python interface these environment variable are not really applicable but documented for completeness:
ECF_NAME <string> : Full path name to the task
ECF_PASS <string> : The jobs password, allocated by server, then used by server to authenticate client request
ECF_TRYNO <int> : The number of times the job has run. Used in file name generation. Set to 1 by begin() and re-queue commands.
ECF_TIMEOUT <int> : Max time in seconds for client to deliver message to main server
ECF_HOSTFILE <string> : File that lists alternate hosts to try, if connection to main host fails
ECF_DENIED <any> : Provides a way for child to exit with an error, if server denies connection. Avoids 24hr wait. Note: when you have hundreds of tasks, using this approach requires a lot of manual intervention to determine job status
NO_ECF <any> : If set exit’s immediately with success. Used to test jobs without communicating with server
The following environment variables are used by the python interface and child commands
ECF_HOST <string> : The host name of the main server. defaults to ‘localhost’
ECF_PORT <int> : The TCP/IP port to call on the server. Must be unique to a server
The ECF_HOST and ECF_PORT can be overridden by using the Constructor or set_host_port() member function. For optimal usage it is best to reuse the same Client rather than recreating for each client server interaction By default the Client interface will throw exceptions for error’s.
Usage:
try:
ci = Client('localhost:3150') # for errors will throw RuntimeError
ci.terminate_server()
except RuntimeError, e:
print(str(e))
- Client.alter((Client)arg1, (list)paths, (str)alter_type, (str)attribute_type[, (str)name=''[, (str)value='']]) None :
- Alter command is used to change the attributes of a node
void alter( (list | string ) paths(s) : A single or list of paths. Path name to the node whose attributes are to be changed string alter_type : This must be one of [ 'add' | 'change' | 'delete' | 'set_flag' | 'clear_flag' ] string attr_type : This varies according to the 'alter_type'. valid strings are: add : [ variable,time,today,date,day,label,zombie,late] delete : [ variable,time,today,date,day,label,cron,event,meter,trigger,complete,repeat,limit,inlimit,limit_path,zombie,late] change : [ variable,clock-type,clock-gain,event,meter,label,trigger,complete,repeat,limit-max,limit-value,late,time,today] set_flag and clear_flag: [ force_aborted | user_edit | task_aborted | edit_failed | ecfcmd_failed | statuscmd_failed | killcmd_failed | no_script | killed | status | late | message | complete | queue_limit | task_waiting | locked | zombie ] string name : used to locate the attribute, when multiple attributes of the same type, optional for some.i.e. when changing, attributes like variable,meter,event,label,limits string value : Only used when 'changing' a attribute. provides a new value )Exceptions can be raised because:
absolute_node_path does not exist.
parsing fails
The following describes the parameters in more detail:
add variable variable_name variable_value add time format # when format is +hh:mm | hh:mm | hh:mm(start) hh:mm(finish) hh:mm(increment) add today format # when format is +hh:mm | hh:mm | hh:mm(start) hh:mm(finish) hh:mm(increment) add date format # when format dd.mm.yyyy, can use '*' to indicate any day,month, or year add day format # when format is one of [ sunday,monday,tuesday,wednesday,friday,saturday ] add zombie format # when format is one of <zombie-type>:<child>:<server-action>|<client-action>:<zombie-lifetime> # <zombie-type> := [ user | ecf | path ] # <child> := [ init, event, meter, label, wait, abort, complete ] # <server-action> := [ adopt | delete ] # <client-action> := [ fob | fail | block(default) ] # <zombie-lifetime>:= lifetime of zombie in the server # example # add zombie :label:fob:0 # fob all child label request, & remove zombie as soon as possible delete variable name # if name is empty will delete -all- variables on the node delete time name # To delete a specific time, enter the time in same format as show above, # or as specified in the defs file # an empty name will delete all time attributes on the node delete today name # To delete a specific today attribute, enter in same format as show above, # or as specified in the defs file. # an empty name will delete all today attributes on the node delete date name # To delete a specific date attribute, enter in same format as show above, # or as specified in the defs file # an empty name will delete all date attributes on the node delete day name # To delete a specific day attribute, enter in same format as show above, # or as specified in the defs file # an empty name will delete all day attributes on the node delete cron name # To delete a specific cron attribute, enter in same as specified in the defs file # an empty name will delete all cron attributes on the node delete event name # To delete a specific event, enter name or number # an empty name will delete all events on the node delete meter name # To delete a specific meter , enter the meter name # an empty name will delete all meter on the node delete label name # To delete a specific label , enter the label name # an empty name will delete all labels on the node delete limit name # To delete a specific limit , enter the limit name # an empty name will delete all limits on the node delete inlimit name # To delete a specific inlimit , enter the inlimit name # an empty name will delete all inlimits on the node delete limit_path limit_name limit_path # To delete a specific limit path delete trigger # A node can only have one trigger expression, hence the name is not required delete complete # A node can only have one complete expression, hence the name is not required delete repeat # A node can only have one repeat, hence the name is not required change variable name value # Find the specified variable, and set the new value. change clock_type name # The name must be one of 'hybrid' or 'real'. change clock_gain name # The gain must be convertible to an integer. change clock_sync name # Sync suite calendar with the computer. change event name(optional ) # if no name specified the event is set, otherwise name must be 'set' or 'clear' change meter name value # The meter value must be convertible to an integer, and between meter min-max range. change label name value # sets the label change trigger name # The name must be expression. returns an error if the expression does not parse change complete name # The name must be expression. returns an error if the expression does not parse change limit_max name value # Sets the max value of the limit. The value must be convertible to an integer change limit_value name value # Sets the consumed tokens to value.The value must be convertible to an integer change repeat value # If the repeat is a date, then the value must be a valid YMD ( ie. yyyymmdd) # and be convertible to an integer, additionally the value must be in range # of the repeat start and end dates. Like wise for repeat integer. For repeat # string and enum, the name must either be an integer, that is a valid index or # if it is a string, it must correspond to one of enum's or strings list
Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) ci.alter('/suite/task','change','trigger','b2 == complete') except RuntimeError, e: print(str(e))
alter( (Client)arg1, (str)abs_node_path, (str)alter_type, (str)attribute_type [, (str)name=’’ [, (str)value=’’]]) -> None
- Client.archive((Client)arg1, (str)arg2) None :
- Archives suite or family nodes. Saves the suite/family nodes to disk, and then removes then from the definition
This saves memory in the server, when dealing with huge definitions that are not needed. If the node is re-queued or begun, it is automatically restored Use –restore to reload the archived nodes manually The nodes are saved to ECF_HOME/ECF_NAME.check Usage:
string archive( list paths # List of paths. ) string archive( string absolute_node_path )
Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) print ci.archive('/suite1') except RuntimeError, e: print str(e)
archive( (Client)arg1, (list)arg2) -> None
- Client.begin_all_suites((Client)arg1[, (bool)force=False]) int :
Begin playing all the suites in the ecflow_server
Note
using the force option may cause zombies if suite has running jobs
void begin_all_suites(
[(bool)force=False] : bypass the checks for submitted and active jobs
)
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.begin_all_suites() # begin playing all the suites
ci.begin_all_suites(True) # begin playing all the suites, by passing checks
except RuntimeError, e:
print(str(e))
- Client.begin_suite((Client)arg1, (str)suite_name[, (bool)force=False]) int :
Begin playing the chosen suites in the ecflow_server
Note
using the force option may cause zombies if suite has running jobs
void begin_suite
string suite_name : begin playing the given suite
[(bool)force=False] : bypass the checks for submitted and active jobs
)
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.begin_suite('/suite1') # begin playing suite '/suite1'
ci.begin_suite('/suite1',True) # begin playing suite '/suite1' bypass any checks except RuntimeError, e:
print(str(e))
- Client.ch_add((Client)arg1, (int)arg2, (list)arg3) None :
Add a set of suites, to an existing registered handle
When dealing with large definitions, where a user is only interested in a small subset of suites, registering them, improves download performance from the server. Registered suites have an associated handle.
integer ch_add( integer handle : the handle obtained after ch_register list suite_names : list of strings representing suite names ) integer ch_add( list suite_names : list of strings representing suite names )Usage:
try: with Client() as ci: # use default host(ECF_HOST) & port(ECF_PORT) ci.ch_register(True,[]) # register interest in any new suites ci.ch_add(['s1','s2']) # add suites s1,s2 to the last added handle except RuntimeError, e: print(str(e))
ch_add( (Client)arg1, (list)arg2) -> None
- Client.ch_auto_add((Client)arg1, (int)arg2, (bool)arg3) int :
Change an existing handle so that new suites can be added automatically
When dealing with large definitions, where a user is only interested in a small subset of suites, registering them, improves download performance from the server. Registered suites have an associated handle.
void ch_auto_add( integer handle, : the handle obtained after ch_register bool auto_add_new_suite : automatically add new suites, this handle when they are created ) void ch_auto_add( bool auto_add_new_suite : automatically add new suites using handle on the client )Usage:
try: with Client() as ci: # use default host(ECF_HOST) & port(ECF_PORT) ci.ch_register(True,['s1','s2','s3']) # register interest in suites s1,s2,s3 and any new suites ci.ch_auto_add( False ) # disable adding newly created suites to my handle except RuntimeError, e: print(str(e))
ch_auto_add( (Client)arg1, (bool)arg2) -> int
- Client.ch_drop((Client)arg1, (int)arg2) int :
Drop/de-register the client handle.
When dealing with large definitions, where a user is only interested in a small subset of suites, registering them, improves download performance from the server. Registered suites have an associated handle. Client must ensure un-used handle are dropped otherwise they will stay, in the ecflow_server
void ch_drop( int client_handle : The handle must be an integer that is > 0 ) void ch_drop() : Uses the local handle stored on the client, from last call to ch_register()Exception:
RunTimeError thrown if handle has not been previously registered
Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) ci.ch_register(False,['s1','s2']) while( 1 ): # get incremental changes to suites s1 & s2, uses data stored on ci/defs ci.sync_local() # will only retrieve data for suites s1 & s2 update(ci.get_defs()) finally: ci.ch_drop()To automatically drop the handle(Preferred) use with :
try: with Client() as ci: ci.ch_register(False,['s1','s2']) while( 1 ): # get incremental changes to suites s1 & s2, uses data stored on ci/defs ci.sync_local() # will only retrieve data for suites s1 & s2 update(ci.get_defs()) .... # will automatically drop last handle except RuntimeError, e: print(str(e))
ch_drop( (Client)arg1) -> int
- Client.ch_drop_user((Client)arg1, (str)arg2) int :
Drop/de-register all handles associated with user.
When dealing with large definitions, where a user is only interested in a small subset of suites, registering them, improves download performance from the server. Registered suites have an associated handle. Client must ensure un-used handle are dropped otherwise they will stay, in the ecflow_server
void ch_drop_user(
string user # If empty string will drop current user
)
Exception:
RunTimeError thrown if handle has not been previously registered
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.ch_register(False,['s1','s2'])
while( 1 ):
# get incremental changes to suites s1 & s2, uses data stored on ci/defs
update(ci.get_defs())
finally:
ci.ch_drop_user('') # drop all handles associated with current user
- Client.ch_handle((Client)arg1) int :
Register interest in a set of suites.
If a definition has lots of suites, but the client is only interested in a small subset.
Then using this command can reduce network bandwidth and synchronisation will be quicker.
This command will create a client handle. This handle is held locally on the ecflow.Client, and
can be used implicitly by ch_drop(),ch_add(),ch_remove() and ch_auto_add().
Registering a client handle affects the news() and sync() commands:
void ch_register(
bool auto_add_new_suites : true means add new suites to my list, when they are created
list suite_names : should be a list of suite names, names not in the definition are ignored
)
Usage:
try:
ci = Client()
suite_names = [ 's1', 's2', 's3' ]
ci.ch_register(True,suite_names) # register interest in suites s1,s2,s3 and any new suites
ci.ch_register(False,suite_names) # register interest in suites s1,s2,s3 only
except RuntimeError, e:
print(str(e))
The client ‘ci’ will hold locally the client handle. Since we have made multiple calls to register a handle, the variable ‘ci’ will hold the handle for the last call only. The handle associated with the suite can be manually retrieved:
try:
ci = Client()
ci.ch_register(True,['s1','s2','s3']) # register interest in suites s1,s2,s3 and any new suites
client_handle = ci.ch_handle() # get the handle associated with last call to ch_register
.... # after a period of time
except RuntimeError, e:
print(str(e))
finally:
ci.ch_drop( client_handle ) # de-register the handle
To automatically drop the handle(preferred) use with:
try:
with Client() as ci:
ci.ch_register(True,['s1','s2','s3']) # register interest in suites s1,s2,s3 and any new suites
client_handle = ci.ch_handle() # get the handle associated with last call to ch_register
.... # will automatically drop last handle
except RuntimeError, e:
print(str(e))
- Client.ch_register((Client)arg1, (bool)arg2, (list)arg3) None :
Register interest in a set of suites.
If a definition has lots of suites, but the client is only interested in a small subset.
Then using this command can reduce network bandwidth and synchronisation will be quicker.
This command will create a client handle. This handle is held locally on the ecflow.Client, and
can be used implicitly by ch_drop(),ch_add(),ch_remove() and ch_auto_add().
Registering a client handle affects the news() and sync() commands:
void ch_register(
bool auto_add_new_suites : true means add new suites to my list, when they are created
list suite_names : should be a list of suite names, names not in the definition are ignored
)
Usage:
try:
ci = Client()
suite_names = [ 's1', 's2', 's3' ]
ci.ch_register(True,suite_names) # register interest in suites s1,s2,s3 and any new suites
ci.ch_register(False,suite_names) # register interest in suites s1,s2,s3 only
except RuntimeError, e:
print(str(e))
The client ‘ci’ will hold locally the client handle. Since we have made multiple calls to register a handle, the variable ‘ci’ will hold the handle for the last call only. The handle associated with the suite can be manually retrieved:
try:
ci = Client()
ci.ch_register(True,['s1','s2','s3']) # register interest in suites s1,s2,s3 and any new suites
client_handle = ci.ch_handle() # get the handle associated with last call to ch_register
.... # after a period of time
except RuntimeError, e:
print(str(e))
finally:
ci.ch_drop( client_handle ) # de-register the handle
To automatically drop the handle(preferred) use with:
try:
with Client() as ci:
ci.ch_register(True,['s1','s2','s3']) # register interest in suites s1,s2,s3 and any new suites
client_handle = ci.ch_handle() # get the handle associated with last call to ch_register
.... # will automatically drop last handle
except RuntimeError, e:
print(str(e))
- Client.ch_remove((Client)arg1, (int)arg2, (list)arg3) None :
Remove a set of suites, from an existing handle
When dealing with large definitions, where a user is only interested in a small subset of suites, registering them, improves download performance from the server. Registered suites have an associated handle.
integer ch_remove( integer handle : the handle obtained after ch_register list suite_names : list of strings representing suite names ) integer ch_remove( list suite_names : list of strings representing suite names )Usage:
try: with Client() as ci: # use default host(ECF_HOST) & port(ECF_PORT) ci.ch_register(True,['s1','s2','s3']) # register interest in suites s1,s2,s3 and any new suites ci.ch_remove( ['s1'] ) # remove suites s1 from the last added handle except RuntimeError, e: print(str(e))
ch_remove( (Client)arg1, (list)arg2) -> None
- Client.ch_suites((Client)arg1) None :
Writes to standard out the list of registered handles and the suites they reference.
When dealing with large definitions, where a user is only interested in a small subset of suites, registering them, improves download performance from the server. Registered suites have an associated handle.
- property Client.changed_node_paths
After a call to sync_local() we can access the list of nodes that changed
The returned list consists of node paths. IF the list is empty assume that whole definition changed. This should be expected after the first call to sync_local() since that always retrieves the full definition from the server:
void changed_node_paths()
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
if ci.news_local(): # has the server changed
print('Server Changed') # server changed bring client in sync with server
ci.sync_local() # get the full definition from the server if first time
# otherwise apply incremental changes to Client definition,
# bringing it in sync with the server definition
defs = ci.get_defs() # get the updated/synchronised definition
for path in ci.changed_node_paths:
if path == '/': # path '/' represent change to server node/defs
print('defs changed') # defs state change or user variables changed
else:
node = defs.find_abs_node(path)
# if changed_node_paths is empty, then assume entire definition changed
print(defs) # print the synchronised definition. Should be same as server
except RuntimeError, e:
print(str(e))
- Client.check((Client)arg1, (str)arg2) str :
Check trigger and complete expressions and limits
The ecflow_server does not store externs. Hence all unresolved references are reported as errors. Returns a non empty string for any errors or warning
string check( list paths # List of paths. ) string check( string absolute_node_path )Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) print(ci.check('/suite1')) except RuntimeError, e: print(str(e))
check( (Client)arg1, (list)arg2) -> str
- Client.checkpt((Client)arg1[, (CheckPt)mode=ecflow.CheckPt.UNDEFINED[, (int)check_pt_interval=0[, (int)check_pt_save_alarm_time=0]]]) int :
Request the ecflow_server check points the definition held in the server immediately
This effectively saves the definition held in the server to disk, in a platform independent manner. This is the default when no arguments are specified. The saved file will include node state, passwords, etc. The default file name is <host>.<port>.ecf.check and is saved in ECF_HOME directory. The check point file name can be overridden via ECF_CHECK server environment variable. The back up check point file name can be overridden via ECF_CHECKOLD server environment variable:
void checkpt(
[(CheckPt::Mode)mode=CheckPt.UNDEFINED]
: Must be one of [ NEVER, ON_TIME, ALWAYS, UNDEFINED ]
NEVER : Never check point the definition in the server
ON_TIME: Turn on automatic check pointing at interval stored on server
or with interval specified as the second argument
ALWAYS: Check point at any change in node tree, *NOT* recommended for large definitions
UNDEFINED:The default, which allows for immediate check pointing, or alarm setting
[(int)interval=120] : This specifies the interval in seconds when server should automatically check pt.
This will only take effect if mode is on_time/CHECK_ON_TIME
Should ideally be a value greater than 60 seconds, default is 120 seconds
[(int)alarm=30] : Specifies check pt save alarm time. If saving the check pt takes longer than
the alarm time, then the late flag is set on the server.
This flag will need to be cleared manually.
)
Note
When the time taken to save the check pt is excessive, it can interfere with job scheduling. It may be an indication of the following:
slow disk
file system full
The definition is very large and needs to split
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.checkpt() # Save the definition held in the server to disk
ci.checkpt(CheckPt.NEVER) # Switch off check pointing
ci.checkpt(CheckPt.ON_TIME) # Start automatic check pointing at the interval stored in the server
ci.checkpt(CheckPt.ON_TIME,180) # Start automatic check pointing every 180 seconds
ci.checkpt(CheckPt.ALWAYS) # Check point at any state change in node tree. *not* recommended for large defs
ci.checkpt(CheckPt.UNDEFINED,0,35) # Change check point save time alarm to 35 seconds
# With these arguments mode and interval remain unchanged
except RuntimeError, e:
print(str(e))
- Client.child_abort((Client)arg1[, (str)reason='']) None :
Child command,notify server job has aborted, can provide an optional reason
- Client.child_complete((Client)arg1) None :
Child command,notify server job has complete
- Client.child_event((Client)arg1, (str)event_name[, (bool)value=True]) None :
Child command,notify server event occurred, requires the event name
- Client.child_init((Client)arg1) None :
Child command,notify server job has started
- Client.child_label((Client)arg1, (str)arg2, (str)arg3) None :
Child command,notify server label changed, requires label name, and new value
- Client.child_meter((Client)arg1, (str)arg2, (int)arg3) None :
Child command,notify server meter changed, requires meter name and value
- Client.child_queue((Client)arg1, (str)queue_name, (str)action[, (str)step=''[, (str)path_to_node_with_queue='']]) str :
Child command,active:return current step as string, then increment index, requires queue name, and optionally path to node with the queue
- Client.child_wait((Client)arg1, (str)arg2) None :
Child command,wait for expression to come true
- Client.clear_log((Client)arg1) int :
Request the ecflow_server to clear log file.
Log file will be empty after this call.
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.clear_log() # log file is now empty
except RuntimeError, e:
print(str(e))
- Client.debug((Client)arg1, (bool)arg2) None :
enable/disable client api debug
- Client.debug_server_off((Client)arg1) int :
Disable server debug
- Client.debug_server_on((Client)arg1) int :
Enable server debug, Will dump to standard out on server host.
- Client.delete((Client)arg1, (str)abs_node_path[, (bool)force=False]) int :
Delete the node (s) specified.
If a node is submitted or active, then a Exception will be raised. To force the deletion at the expense of zombie creation, then set the force parameter to true
void delete( list paths : List of paths. [(bool)force=False] : If true delete even if in 'active' or 'submitted' states Which risks creating zombies. ) void delete( string absolute_node_path: Path name of node to delete. [(bool)force=False] : If true delete even if in 'active' or 'submitted' states )Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) ci.delete('/s1/f1/task1') paths = ['/s1/f1/t1','/s2/f1/t2'] ci.delete(paths) # delete all tasks specified in the paths except RuntimeError, e: print(str(e))
delete( (Client)arg1, (list)paths [, (bool)force=False]) -> None
- Client.delete_all((Client)arg1[, (bool)force=False]) int :
Delete all the nodes held in the ecflow_server.
The suite definition in the server will be empty, after this call. Use with care If a node is submitted or active, then a Exception will be raised. To force the deletion at the expense of zombie creation, then set the force parameter to true
void delete_all(
[(bool)force=False] : If true delete even if in 'active' or 'submitted' states
Which risks creating zombies.
)
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.delete_all()
ci.get_server_defs()
except RuntimeError, e:
print(str(e)); # expect failure since all nodes deleted
- Client.disable_ssl((Client)arg1) None :
- ecFlow client and server are SSL enabled. To use SSL choose between:
export ECF_SSL=1 # search for server.crt otherwise <host>.<port>.crt
export ECF_SSL=<host>.<port> # Use server specific certificates <host>.<port>.***
- use –ssl # argument on ecflow_client/ecflow_server, same as option 1.
# Typically ssl server can be started with ecflow_start.sh -s
Client.enable_ssl() # for python client
ecFlow expects the certificates to be in directory $HOME/.ecflowrc/ssl The certificates can be shared if you have multiple servers running on the same machine. In this case use ECF_SSL=1, then ecflow_server expects the following files in $HOME/.ecflowrc/ssl
dh2048.pem
server.crt
server.key
server.passwd (optional) if this exists it must contain the pass phrase used to create server.key
ecflow_client expects the following files in : $HOME/.ecflowrc/ssl
server.crt (this must be the same as server)
Alternatively you can have different setting for each server ECF_SSL=<host>.<port> Then server expect files of the type:
<host>.<port>.pem
<host>.<port>.crt
<host>.<port>.key
<host>.<port>.passwd (optional)
and client expect files of the type:
<host>.<port>.crt # as before this must be same as the server
The server/client will automatically check existence of both variants, but will give preference to NON <host>.<port>.*** variants first, when ECF_SSL=1 The following steps, show you how to create the certificate files. This may need to be adapted if you want to use <host>.<port>.***
Generate a password protected private key. This will request a pass phrase. This key is a 1024 bit RSA key which is encrypted using Triple-DES and stored in a PEM format so that it is readable as ASCII text
> openssl genrsa -des3 -out server.key 1024 # Password protected private key
Additional security. If you want additional security, create a file called ‘server.passwd’ and add the pass phrase to the file. Then set the file permission so that file is only readable by the server process. Or you can choose to remove password requirement. In that case we don’t need server.passwd file.
> cp server.key server.key.secure > openssl rsa -in server.key.secure -out server.key # remove password requirement
Sign certificate with private key (self signed certificate).Generate Certificate Signing Request(CSR). This will prompt with a number of questions. However please ensure ‘common name’ matches the host where your server is going to run.
> openssl req -new -key server.key -out server.csr # Generate Certificate Signing Request(CSR)
Generate a self signed certificate CRT, by using the CSR and private key.
> openssl x509 -req -days 3650 -in server.csr -signkey server.key -out server.crt
- Generate dhparam file. ecFlow expects 2048 key.
> openssl dhparam -out dh2048.pem 2048
- Client.edit_script_edit((Client)arg1, (str)arg2) str :
get script for Edit
- Client.edit_script_preprocess((Client)arg1, (str)arg2) str :
get script for Edit Preprocess
- Client.edit_script_submit((Client)arg1, (str)arg2, (list)arg3, (list)arg4, (bool)arg5, (bool)arg6) int :
submit script from Edit/Preprocess to run as alias or not:
ci = Client()
ci.edit_script_submit(path_to_task,
used_variables, # array name=value
file_contents, # strings array
alias, # bool False,
run # bool true
)
- Client.enable_ssl((Client)arg1) None :
- ecFlow client and server are SSL enabled. To use SSL choose between:
export ECF_SSL=1 # search for server.crt otherwise <host>.<port>.crt
export ECF_SSL=<host>.<port> # Use server specific certificates <host>.<port>.***
- use –ssl # argument on ecflow_client/ecflow_server, same as option 1.
# Typically ssl server can be started with ecflow_start.sh -s
Client.enable_ssl() # for python client
ecFlow expects the certificates to be in directory $HOME/.ecflowrc/ssl The certificates can be shared if you have multiple servers running on the same machine. In this case use ECF_SSL=1, then ecflow_server expects the following files in $HOME/.ecflowrc/ssl
dh2048.pem
server.crt
server.key
server.passwd (optional) if this exists it must contain the pass phrase used to create server.key
ecflow_client expects the following files in : $HOME/.ecflowrc/ssl
server.crt (this must be the same as server)
Alternatively you can have different setting for each server ECF_SSL=<host>.<port> Then server expect files of the type:
<host>.<port>.pem
<host>.<port>.crt
<host>.<port>.key
<host>.<port>.passwd (optional)
and client expect files of the type:
<host>.<port>.crt # as before this must be same as the server
The server/client will automatically check existence of both variants, but will give preference to NON <host>.<port>.*** variants first, when ECF_SSL=1 The following steps, show you how to create the certificate files. This may need to be adapted if you want to use <host>.<port>.***
Generate a password protected private key. This will request a pass phrase. This key is a 1024 bit RSA key which is encrypted using Triple-DES and stored in a PEM format so that it is readable as ASCII text
> openssl genrsa -des3 -out server.key 1024 # Password protected private key
Additional security. If you want additional security, create a file called ‘server.passwd’ and add the pass phrase to the file. Then set the file permission so that file is only readable by the server process. Or you can choose to remove password requirement. In that case we don’t need server.passwd file.
> cp server.key server.key.secure > openssl rsa -in server.key.secure -out server.key # remove password requirement
Sign certificate with private key (self signed certificate).Generate Certificate Signing Request(CSR). This will prompt with a number of questions. However please ensure ‘common name’ matches the host where your server is going to run.
> openssl req -new -key server.key -out server.csr # Generate Certificate Signing Request(CSR)
Generate a self signed certificate CRT, by using the CSR and private key.
> openssl x509 -req -days 3650 -in server.csr -signkey server.key -out server.crt
- Generate dhparam file. ecFlow expects 2048 key.
> openssl dhparam -out dh2048.pem 2048
- Client.flush_log((Client)arg1) int :
Request the ecflow_server to flush and then close log file
It is best that the server is shutdown first, as log file will be reopened whenever a command wishes to log any changes.
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.flush_log() # Log can now opened by external program
except RuntimeError, e:
print(str(e))
- Client.force_event((Client)arg1, (str)arg2, (str)arg3) None :
- Set or clear a event
void force_event( string absolute_node_path:event: Path name to node: < event name | number> The paths must begin with a leading '/' string signal : [ set | clear ] ) void force_event( list paths : A list of absolute node paths. Each path must include a event name The paths must begin with a leading '/' string signal : [ set | clear ] )Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) ci.force_event('/s1/f1:event_name','set') # Set or clear a event for a list of events paths = [ '/s1/t1:ev1', '/s2/t2:ev2' ] ci.force_event(paths,'clear') except RuntimeError, e: print(str(e))
force_event( (Client)arg1, (list)arg2, (str)arg3) -> None
- Client.force_state((Client)arg1, (str)arg2, (State)arg3) None :
Force a node(s) to a given state
When a task is set to complete, it may be automatically re-queued if it has multiple time dependencies. In the specific case where a task has a single time dependency and we want to interactively set it to complete a flag is set so that it is not automatically re-queued when set to complete. The flag is applied up the node hierarchy until reach a node with a repeat or cron attribute. This behaviour allow repeat values to be incremented interactively. A repeat attribute is incremented when all the child nodes are complete in this case the child nodes are automatically re-queued
void force_state( string absolute_node_path: Path name to node. The path must begin with a leading '/' State::State state : [ unknown | complete | queued | submitted | active | aborted ] ) void force_state( list paths : A list of absolute node paths. The paths must begin with a leading '/' State::State state : [ unknown | complete | queued | submitted | active | aborted ] )Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) # force a single node to complete ci.force_state('/s1/f1',State.complete) # force a list of nodes to complete paths = [ '/s1/t1', '/s1/t2', '/s1/f1/t1' ] ci.force_state(paths,State.complete) except RuntimeError, e: print(str(e))Effect:
Lets see the effect of forcing complete on the following defs
suite s1 task t1; time 10:00 # will complete straight away task t2; time 10:00 13:00 01:00 # will re-queue 3 times and complete on fourthIn the last case (task t2) after each force complete, the next time slot is incremented. This can be seen by calling the Why command.
force_state( (Client)arg1, (list)arg2, (State)arg3) -> None
- Client.force_state_recursive((Client)arg1, (str)arg2, (State)arg3) None :
- Force node(s) to a given state recursively
void force_state_recursive( string absolute_node_path: Path name to node.The paths must begin with a leading '/' State::State state : [ unknown | complete | queued | submitted | active | aborted ] ) void force_state_recursive( list paths : A list of absolute node paths.The paths must begin with a leading '/' State::State state : [ unknown | complete | queued | submitted | active | aborted ] )
Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) ci.force_state_recursive('/s1/f1',State.complete) # recursively force a list of nodes to complete paths = [ '/s1', '/s2', '/s1/f1/t1' ] ci.force_state_recursive(paths,State.complete) except RuntimeError, e: print(str(e))
force_state_recursive( (Client)arg1, (list)arg2, (State)arg3) -> None
- Client.free_all_dep((Client)arg1, (str)arg2) None :
- Free all trigger, date and all time(day, today, cron,etc) dependencies
void free_all_dep( string absolute_node_path : Path name to node )
After freeing the time related dependencies (i.e time,today,cron) the next time slot will be missed.
Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) ci.free_all_dep('/s1/task') except RuntimeError, e: print(str(e))
free_all_dep( (Client)arg1, (list)arg2) -> None
- Client.free_date_dep((Client)arg1, (str)arg2) None :
- Free date dependencies for a node
void free_date_dep( string absolute_node_path : Path name to node )
Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) ci.free_date_dep('/s1/task') except RuntimeError, e: print(str(e))
free_date_dep( (Client)arg1, (list)arg2) -> None
- Client.free_time_dep((Client)arg1, (str)arg2) None :
- Free all time dependencies. i.e time, day, today, cron
void free_time_dep( string absolute_node_path : Path name to node )
After freeing the time related dependencies (i.e time,today,cron) the next time slot will be missed.
Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) ci.free_time_dep('/s1/task') except RuntimeError, e: print(str(e))
free_time_dep( (Client)arg1, (list)arg2) -> None
- Client.free_trigger_dep((Client)arg1, (str)arg2) None :
- Free trigger dependencies for a node
void free_trigger_dep( string absolute_node_path : Path name to node )
Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) ci.free_trigger_dep('/s1/f1/task') except RuntimeError, e: print(str(e))
free_trigger_dep( (Client)arg1, (list)arg2) -> None
- Client.get_defs((Client)arg1) Defs :
Returns the suite definition stored on the Client.
Use ecflow.Client.sync_local() to retrieve the definition from the server first.
The definition is retained in memory until the next call to sync_local().
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.sync_local() # get the definition from the server and store on 'ci'
print(ci.get_defs()) # print out definition stored in the client
print(ci.get_defs()) # print again, this shows that defs is retained on ci
except RuntimeError, e:
print(str(e))
- Client.get_file((Client)arg1, (str)task[, (str)type='script'[, (str)max_lines='10000'[, (bool)as_bytes=False]]]) object :
The File command is used to request the various file types associated with a node.
By default, the output is composed of the last 10000 lines of the file. The number of lines can be customised via the max_lines parameter.
The content can be retrieved as a sequence of ‘bytes’. This allows to download a file that contains invalid Unicode sequence, without causing an UnicodeDecodeError to be raised.
string get_file(
string absolute_node_path : Path name to node
[(string)file_type='script'] : file_type = [ script<default> | job | jobout | manual | kill | stat ]
[(string)max_lines='10000'] : The number of lines in the file to return
[(bool)as_bytes=False] : A flag indicating if the output should be 'bytes'; by default the output is of type 'str'
)
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
for file in [ 'script', 'job', 'jobout', 'manual', 'kill', 'stat' ]:
print(ci.get_file('/suite/f1/t1',file)) # print the contents of the file
except RuntimeError, e:
print(str(e))
- Client.get_host((Client)arg1) str :
Return the host, assume set_host_port() has been set, otherwise return localhost
- Client.get_log((Client)arg1, (int)arg2) str :
Request the ecflow_server to return the log file contents as a string
Use with caution as the returned string could be several megabytes. Only enabled in the debug build of ECF.
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
print(ci.get_log(100) # get the 100 last lines from server log file
except RuntimeError, e:
print(str(e))
- Client.get_port((Client)arg1) str :
Return the port, assume set_host_port() has been set. otherwise returns 3141
- Client.get_server_defs((Client)arg1) int :
Get all suite Node tree’s from the ecflow_server.
The definition is retained in memory until the next call to get_server_defs().
This is important since get_server_defs() could return several megabytes of data.
Hence we only want to call it once, and then access it locally with get_defs().
If you need to access the server definition in a loop use ecflow.Client.sync_local instead
since this is capable of returning incremental changes, and thus considerably
reducing the network load.
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.get_server_defs() # get the definition from the server and store on 'ci'
print(ci.get_defs()) # print out definition stored in the client
print(ci.get_defs()) # print again, this shows that defs is retained on ci
except RuntimeError, e:
print(str(e))
- Client.group((Client)arg1, (str)arg2) int :
Allows a series of commands to be executed in the ecflow_server
void group(
string cmds : a list of ';' separated commands
)
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.group('get; show')
ci.group('get; show state') # show node states and trigger abstract syntax trees
except RuntimeError, e:
print(str(e))
- Client.halt_server((Client)arg1) int :
Halt the ecflow_server
Stop server communication with jobs, and new job scheduling, and stops check pointing. See server states
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.halt_server()
except RuntimeError, e:
print(str(e))
- Client.in_sync((Client)arg1) bool :
Returns true if the definition on the client is in sync with the ecflow_server
Warning
Calling in_sync() is only valid after a call to sync_local().
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.sync_local() # very first call gets the full Defs
client_defs = ci.get_defs() # End user access to the returned Defs
... after a period of time
ci.sync_local() # Subsequent calls to sync_local() users the local Defs to sync incrementally
if ci.in_sync(): # returns true changed and changes applied to client
print('Client is now in sync with server')
client_defs = ci.get_defs() # End user access to the returned Defs
except RuntimeError, e:
print(str(e))
- Client.is_auto_sync_enabled((Client)arg1) bool :
Returns true if automatic syncing enabled
- Client.job_generation((Client)arg1, (str)arg2) int :
Job submission for chosen Node based on dependencies
The ecflow_server traverses the node tree every 60 seconds, and if the dependencies are free does job creation and submission. Sometimes the user may free time/date dependencies to avoid waiting for the server poll, this commands allows early job generation
void job_generation(
string absolute_node_path: Path name for job generation to start from
)
If empty string specified generates for full definition.
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.job_generation('/s1') # generate jobs for suite '/s1
except RuntimeError, e:
print(str(e))
- Client.kill((Client)arg1, (str)arg2) None :
- Kills the job associated with the node
void kill( list paths: List of paths. Paths must begin with a leading '/' character ) void kill( string absolute_node_path: Path name to node to kill. )
If a family or suite is selected, will kill hierarchically. Kill uses the ECF_KILL_CMD variable. After variable substitution it is invoked as a command. The ECF_KILL_CMD variable should be written in such a way that the output is written to %ECF_JOB%.kill, i.e:
kill -15 %ECF_RID% > %ECF_JOB%.kill 2>&1 /home/ma/emos/bin/ecfkill %USER% %HOST% %ECF_RID% %ECF_JOB% > %ECF_JOB%.kill 2>&1
Exceptions can be raised because:
The absolute_node_path does not exist in the server
ECF_KILL_CMD variable is not defined
variable substitution fails
Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) ci.kill('/s1/f1') time.sleep(2) print(ci.file('/s1/t1','kill')) # request kill output except RuntimeError, e: print(str(e))
kill( (Client)arg1, (list)arg2) -> None
- Client.load((Client)arg1, (str)path_to_defs[, (bool)force=False[, (bool)check_only=False[, (bool)print=False[, (bool)stats=False]]]]) int :
- Load a suite definition or checkpoint file given by the file_path argument into the ecflow_server
void load( string file_path : path name to the definition file [(bool)force=False] : If true overwrite suite of same name [(bool)print=False] : print parsed defs to standard out )
By default throws a RuntimeError exception for errors. If force is not used and suite of the same name already exists in the server, then a error is thrown
Usage:
defs_file = 'Hello.def' defs = Defs() suite = def.add_suite('s1') family = suite.add_family('f1') for i in [ '_1', '_2', '_3' ]: family.add_task( 't' + i ) defs.save_as_defs(defs_file) # write out in memory defs into the file 'Hello.def' ... try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) ci.load(defs_file) # open and parse defs or checkpoint file, and load into server. except RuntimeError, e: print(str(e))
- load( (Client)arg1, (Defs)defs [, (bool)force=False]) -> int :
Load a in memory suite definition into the ecflow_server
void load( Defs defs : A in memory definition [(bool)force=False] : for true overwrite suite of same name )
If force is not used and suite already exists in the server, then a error is thrown.
Usage:
defs = Defs() suite = defs.add_suite('s1') family = suite.add_family('f1') for i in [ '_1', '_2', '_3' ]: family.add_task( Task( 't' + i) ) ... try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) ci.load(defs) # Load in memory defs, into the server except RuntimeError, e: print(str(e))
- Client.log_msg((Client)arg1, (str)arg2) int
- Client.new_log((Client)arg1[, (str)path='']) int :
Request the ecflow_server to use the path provided, as the new log file
The old log file is released.
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.new_log('/path/log.log') # use '/path/log,log' as the new log file
# To keep track of log file Can change ECF_LOG
ci.alter('','change','variable','ECF_LOG','/new/path.log')
ci.new_log()
except RuntimeError, e:
print(str(e))
- Client.news_local((Client)arg1) bool :
Query the ecflow_server to detect any changes.
This returns a simple bool, if there has been changes, the user should call ecflow.Client.sync_local.
This will bring the client in sync with changes in the server. If sync_local() is not called
then calling news_local() will always return true.
news_local() uses the definition stored on the client:
bool news_local()
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
if ci.news_local(): # has the server changed
print('Server Changed') # server changed bring client in sync with server
ci.sync_local() # get the full definition from the server if first time
# otherwise apply incremental changes to Client definition,
# bringing it in sync with the server definition
print(ci.get_defs()) # print the synchronised definition. Should be same as server
except RuntimeError, e:
print(str(e))
- Client.order((Client)arg1, (str)arg2, (str)arg3) None :
Re-orders the nodes in the suite definition held by the ecflow_server
It should be noted that in the absence of dependencies, the order in which tasks are submitted, depends on the order in the definition. This changes the order and hence affects the submission order
void order(
string absolute_node_path: Path name to node.
string order_type : Must be one of [ top | bottom | alpha | order | up | down ]
)
o top raises the node within its parent, so that it is first
o bottom lowers the node within its parent, so that it is last
o alpha Arranges for all the peers of selected note to be sorted alphabetically
o order Arranges for all the peers of selected note to be sorted in reverse alphabet
o up Moves the selected node up one place amongst its peers
o down Moves the selected node down one place amongst its peers
Exceptions can be raised because:
The absolute_node_path does not exist in the server
The order_type is not the right type
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.order('/s1/f1','top')
except RuntimeError, e:
print(str(e))
- Client.ping((Client)arg1) int :
Checks if the ecflow_server is running
void ping()
The default behaviour is to check on host ‘localhost’ and port 3141 It should be noted that any Client function will fail if the server is is not running. Hence ping() is not strictly required. However its main distinction from other Client function is that it is quite fast.
Usage:
try:
ci = Client('localhost','3150')
ci.ping()
print('------- Server already running------')
do_something_with_server(ci)
except RuntimeError, e:
print('------- Server *NOT* running------' + str(e))
- Client.plug((Client)arg1, (str)arg2, (str)arg3) int :
Plug command is used to move nodes
The destination node can be on another ecflow_server. In which case the destination path should be of the form ‘//<host>:<port>/suite/family/task
void plug(
string source_absolute_node_path : Path name to source node
string destination_absolute_node_path : Path name to destination node. Note if only
'//host:port' is specified the whole suite can be moved
)
By default throws a RuntimeError exception for errors.
Exceptions can be raised because:
Another user already has an lock.
source/destination paths do not exist on the corresponding servers
If the destination node path is empty, i.e. only host:port is specified, then the source node must correspond to a suite.
If the source node is added as a child, then its name must be unique
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.plug('/suite','host3:3141')
except RuntimeError, e:
print(str(e))
- Client.query((Client)arg1, (str)arg2, (str)arg3[, (str)arg4]) str :
Query the status of event, meter, state, variable, limit, limit_max or trigger expression without blocking
state return [unknown | complete | queued | aborted | submitted | active] to standard out
dstate return [unknown | complete | queued | suspended | aborted | submitted | active] to standard out
event return ‘set’ | ‘clear’ to standard out
meter return value of the meter to standard out
limit return value of the limit to standard out
limit_max return max value of the limit to standard out
variable return value to standard out
trigger returns ‘true’ if the expression is true, otherwise ‘false’
:
string query(
string query_type # [ event | meter | variable | trigger | limit | limit_max ]
string path_to_attribute # path to the attribute
string attribute # name of the attribute or trigger expression
)
By default throws a exception for errors.
Exceptions can be raised if the path to the attribute does not exist and because:
No event of the given name exists on the specified node
No meter of the given name exists on the specified node
No limit of the given name exists on the specified node
No variable of the given name (repeat or generated variable) exists on the specified node or any of its parent
trigger expression does not parse, or if references to node/attributes are not defined
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
res = ci.query('event','/path/to/node','event_name') # returns 'SET' | 'CLEAR'
res = ci.query('meter','/path/to/node','meter_name') # returns meter value as a string
res = ci.query('limit','/path/to/node','limit_name') # returns limit value as a string
res = ci.query('limit_max','/path/to/node','limit_name') # returns max limit value as a string
res = ci.query('variable','/path/to/node,'var') # returns variable value as a string
res = ci.query('trigger','/path/to/node','/joe90 == complete') # return 'true' | 'false' as a string
res = ci.query('state','/path/to/node') # return node state as a string
res = ci.query('dstate','/path/to/node') # return node state as a string,can include suspended
except RuntimeError, e:
print str(e)
- Client.reload_custom_passwd_file((Client)arg1) int :
reload the custom passwd file. <host>.<port>.ecf.custom_passwd. For users using ECF_USER or –user or set_user_name()
- Client.reload_passwd_file((Client)arg1) int :
reload the passwd file. <host>.<port>.ecf.passwd
- Client.reload_wl_file((Client)arg1) int :
Request that the ecflow_server reload the white list file.
The white list file if present, can be used to control who has read/write access to the ecflow_server:
void reload_wl_file()
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.reload_wl_file()
except RuntimeError, e:
print(str(e))
- Client.replace((Client)arg1, (str)arg2, (str)arg3, (bool)arg4, (bool)arg5) int :
- Replaces a node in a suite definition with the given path. The definition is in the ecflow_server
void replace( string absolute_node_path: Path name to node in the client defs. This is also the node we want to replace in the server. string client_defs_file : File path to defs files, that provides the definition of the new node [(bool)parent=False] : create parent families or suite as needed, when absolute_node_path does not exist in the server [(bool)force=False] : check for zombies, if force = true, bypass checks ) void replace( string absolute_node_path: Path name to node in the client defs. This is also the node we want to replace in the server. Defs client_defs : In memory client definition that provides the definition of the new node [(bool)parent=False] : create parent families or suite as needed, when absolute_node_path does not exist in the server [(bool)force=False] : check for zombies, force = true, bypass checks )Exceptions can be raised because:
The absolute_node_path does not exist in the provided definition
The provided client definition must be free of errors
If the third argument is not provided, then the absolute_node_path must exist in the server defs
replace will fail, if child task nodes are in active / submitted state
After replace is done, we check trigger expressions. These are reported to standard output. It is up to the user to correct invalid trigger expressions, otherwise the tasks will not run. Please note, you can use check() to check trigger expression and limits in the server.
Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) ci.replace('/s1/f1','/tmp/defs.def') except RuntimeError, e: print(str(e)) try: ci.replace('/s1',client_defs) # replace suite 's1' in the server, with 's1' in the client_defs except RuntimeError, e: print(str(e))
replace( (Client)arg1, (str)arg2, (Defs)arg3, (bool)arg4, (bool)arg5) -> int
replace( (Client)arg1, (str)arg2, (Defs)arg3) -> None
replace( (Client)arg1, (str)arg2, (str)arg3) -> None
- Client.requeue((Client)arg1, (str)abs_node_path[, (str)option='']) None :
- Re queues the specified node (s)
void requeue( list paths : A list of paths. Node paths must begin with a leading '/' character [(str)option=''] : option = ('' | 'abort' | 'force') '' : empty string, the default, re-queue the node abort: means re-queue only aborted tasks below node force: means re-queueing even if there are nodes that are active or submitted ) void requeue( string absolute_node_path : Path name to node [(string)option=''] : option = ('' | 'abort' | 'force') )Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) ci.requeue('/s1','abort') # re-queue aborted tasks below suite /s1 path_list = ['/s1/f1/t1','/s2/f1/t2'] ci.requeue(path_list) except RuntimeError, e: print(str(e))
requeue( (Client)arg1, (list)paths [, (str)option=’’]) -> None
- Client.reset((Client)arg1) None :
reset client definition, and handle number
- Client.restart_server((Client)arg1) int :
Restart the ecflow_server
Start job scheduling, communication with jobs, and respond to all requests. See server states
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.restart_server()
except RuntimeError, e:
print(str(e))
- Client.restore((Client)arg1, (str)arg2) None :
- Restore archived nodes.
Usage:
string restore( list paths # List of paths. ) string restore( string absolute_node_path )
Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) print ci.restore('/suite1') except RuntimeError, e: print str(e)
restore( (Client)arg1, (list)arg2) -> None
- Client.restore_from_checkpt((Client)arg1) int :
Request the ecflow_server loads the check point file from disk
The server will first try to open file at ECF_HOME/ECF_CHECK if that fails it will then try path ECF_HOME/ECF_CHECKOLD. An error is returned if the server has not been halted or contains a suite definition
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.halt_server() # server must be halted, otherwise restore_from_checkpt will throw
ci.restore_from_checkpt() # restore the definition from the check point file
except RuntimeError, e:
print(str(e))
- Client.resume((Client)arg1, (str)arg2) None :
- Resume job creation / generation for the given node
void resume( list paths: List of paths. Paths must begin with a leading '/' character ) void resume( string absolute_node_path: Path name to node to resume. )
Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) ci.resume('/s1/f1/task1') paths = ['/s1/f1/t1','/s2/f1/t2'] ci.resume(paths) except RuntimeError, e: print(str(e))
resume( (Client)arg1, (list)arg2) -> None
- Client.run((Client)arg1, (str)arg2, (bool)arg3) None :
Immediately run the jobs associated with the input node.
Ignore triggers, limits, suspended, time or date dependencies, just run the task. When a job completes, it may be automatically re-queued if it has multiple time dependencies. In the specific case where a task has a SINGLE time dependency and we want to avoid re running the task then a flag is set so that it is not automatically re-queued when set to complete. The flag is applied up the node hierarchy until we reach a node with a repeat or cron attribute. This behaviour allow repeat values to be incremented interactively. A repeat attribute is incremented when all the child nodes are complete in this case the child nodes are automatically re-queued
void run( string absolute_node_path : Path name to node. If the path is suite/family will recursively run all child tasks [(bool)force=False] : If true, run even if there are nodes that are active or submitted. ) void run( list paths : List of paths. If the path is suite/family will recursively run all child tasks [(bool)force=False] : If true, run even if there are nodes that are active or submitted. )Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) ci.run('/s1') # run all tasks under suite /s1 ci.run(['/s1/f1/t1','/s2/f1/t2']) # run all tasks specified except RuntimeError, e: print(str(e))Effect:
Lets see the effect of run command on the following defs:
suite s1 task t1; time 10:00 # will complete straight away task t2; time 10:00 13:00 01:00 # will re-queue 3 times and complete on fourth runIn the last case (task t2) after each run the next time slot is incremented. This can be seen by calling the Why command.
run( (Client)arg1, (list)arg2, (bool)arg3) -> None
- Client.server_version((Client)arg1) str :
Returns the server version, can throw for old servers, that did not implement this request.
- Client.set_auto_sync((Client)arg1, (bool)arg2) None :
If true automatically sync with local definition after each call.
- Client.set_child_complete_del_vars((Client)arg1, (list)arg2) None :
Set the list of variables to be deleted when a task becomes complete Needs a list of strings, representing the variable names.
- Client.set_child_init_add_vars((Client)arg1, (dict)arg2) None :
- Set the list of variables to be added when a task becomes active
Needs a dictionary of name/value pairs, or a list of ecflow Variables
- set_child_init_add_vars( (Client)arg1, (list)arg2) -> None :
Set the list of variables to be added when a task becomes active Needs a dictionary of name/value pairs, or a list of ecflow Variables
- Client.set_child_password((Client)arg1, (str)arg2) None :
Set the password, needed for authentication, provided by the server using %ECF_PASS%
By default the environment variable ECF_PASS is read for the jobs password This can be overridden for the python child api
- Client.set_child_path((Client)arg1, (str)arg2) None :
Set the path to the task, obtained from server using %ECF_NAME%
By default the environment variable ECF_NAME is read for the task path This can be overridden for the python child api
- Client.set_child_pid((Client)arg1, (str)arg2) None :
Set the process id of this job
By default the environment variable ECF_RID is read for the jobs process or remote id This can be overridden for the python child api
- set_child_pid( (Client)arg1, (int)arg2) -> None :
Set the process id of this job
By default the environment variable ECF_RID is read for the jobs process or remote id This can be overridden for the python child api
- Client.set_child_timeout((Client)arg1, (int)arg2) None :
Set timeout if child cannot connect to server, default is 24 hours. The input is required to be in seconds
By default the environment variable ECF_TIMEOUT is read to control how long child command should attempt to connect to the server This can be overridden for the python child api
- Client.set_child_try_no((Client)arg1, (int)arg2) None :
Set the try no, i.e the number of times this job has run, obtained from the server, using %ECF_TRYNO%
By default the environment variable ECF_TRYNO is read to record number of times job has been run This can be overridden for the python child api
- Client.set_connection_attempts((Client)arg1, (int)arg2) None :
Set the number of times to connect to ecflow_server, in case of connection failures
The period between connection attempts is handled by Client.set_retry_connection_period(). If the network is unreliable the connection attempts can be be increased, likewise when the network is stable this number could be reduced to one. This can increase responsiveness and reduce latency. Default value is set as 2. Setting a value less than one is ignored, will default to 1 in this case:
set_connection_attempts(
int attempts # must be an integer >= 1
)
Exceptions:
None
Usage:
ci = Client()
ci.set_connection_attempts(3) # make 3 attempts for server connection
ci.set_retry_connection_period(1) # wait 1 second between each attempt
- Client.set_host_port((Client)arg1, (str)arg2, (str)arg3) None :
Explicitly set the host and port to be used by the client, overriding the default host name (localhost) and port (3141) and the environment variables: ECF_HOST and ECF_PORT.
set_host_port( string host, # The server name. Cannot be empty. string port # The port on the server, must be unique to the server ) set_host_port( string host, # The server name. Cannot be empty. int port # The port on the server, must be unique to the server ) set_host_port( string host_port, # Expects <host>:<port> or <host>@<port> )Exceptions:
Raise a RuntimeError if the host or port is empty
Usage:
try: ci = Client() ci.set_host_port('localhost','3150') ci.set_host_port('avi',3150) ci.set_host_port('avi:3150') except RuntimeError, e: print(str(e))
set_host_port( (Client)arg1, (str)arg2) -> None
set_host_port( (Client)arg1, (str)arg2, (int)arg3) -> None
- Client.set_retry_connection_period((Client)arg1, (int)arg2) None :
Set the sleep period between connection attempts
Whenever there is a connection failure we wait a number of seconds before trying again. i.e. to get round glitches in the network. For the ping command this is hard wired as 1 second. This wait between connection attempts can be configured here. i.e This could be reduced to increase responsiveness. Default: In debug this period is 1 second and in release mode 10 seconds.
set_retry_connection_period(
int period # must be an integer >= 0
)
Exceptions:
None
Usage:
ci = Client()
ci.set_connection_attempts(3) # make 3 attempts for server connection
ci.set_retry_connection_period(1) # wait 1 second between each attempt
- Client.set_user_name((Client)arg1, (str)arg2) None :
set user name. A password must be provided in the file <host>.<port>.ecf.custom_passwd
- Client.set_zombie_child_timeout((Client)arg1, (int)arg2) None :
Set timeout for zombie child commands,that cannot connect to server, default is 24 hours. The input is required to be in seconds
- Client.shutdown_server((Client)arg1) int :
Shut down the ecflow_server
Stop server from scheduling new jobs. See server states
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.shutdown_server()
except RuntimeError, e:
print(str(e))
- Client.sort_attributes((Client)arg1, (str)abs_node_path, (str)attribute_name[, (bool)recursive=True]) None
sort_attributes( (Client)arg1, (list)paths, (str)attribute_name [, (bool)recursive=True]) -> None
- Client.stats((Client)arg1[, (bool)to_stdout]) str :
Returns the ecflow_server statistics as a string
Warning
When called without arguments, this function will print the statistics to stdout, before returning the information as a string.
To avoid printing the output, set the boolean flag to_stdout to False.
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
stats = ci.stats() # prints to stdout
stats = ci.stats(True) # prints to stdout
stats = ci.stats(False) # does not print to stdout
print(stats)
except RuntimeError, e:
print(str(e))
- Client.stats_reset((Client)arg1) None :
Resets the statistical data in the server
void stats_reset()
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.stats_reset()
except RuntimeError, e:
print(str(e))
- Client.status((Client)arg1, (str)arg2) None :
- Shows the status of a job associated with a task
void status( list paths: List of paths. Paths must begin with a leading '/' character ) void status( string absolute_node_path )
If a family or suite is selected, will invoke status command hierarchically. Status uses the ECF_STATUS_CMD variable. After variable substitution it is invoked as a command. The command should be written in such a way that the output is written to %ECF_JOB%.stat, i.e:
/home/ma/emos/bin/ecfstatus %USER% %HOST% %ECF_RID% %ECF_JOB% > %ECF_JOB%.stat 2>&1
Exceptions can be raised because:
The absolute_node_path does not exist in the server
ECF_STATUS_CMD variable is not defined
variable substitution fails
Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) ci.status('/s1/t1') time.sleep(2) print(ci.file('/s1/t1','stats')) # request status output except RuntimeError, e: print(str(e))
status( (Client)arg1, (list)arg2) -> None
- Client.suites((Client)arg1) list :
Returns a list strings representing the suite names
list(string) suites()
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
suites = ci.suites()
print(suites)
except RuntimeError, e:
print(str(e))
- Client.suspend((Client)arg1, (str)arg2) None :
- Suspend job creation / generation for the given node
void suspend( list paths: List of paths. Paths must begin with a leading '/' character ) void suspend( string absolute_node_path: Path name to node to suspend. )
Usage:
try: ci = Client() # use default host(ECF_HOST) & port(ECF_PORT) ci.suspend('/s1/f1/task1') paths = ['/s1/f1/t1','/s2/f1/t2'] ci.suspend(paths) except RuntimeError, e: print(str(e))
suspend( (Client)arg1, (list)arg2) -> None
- Client.sync_local((Client)arg1[, (bool)sync_suite_clock=False]) int :
Requests that ecflow_server returns the full definition or incremental change made and applies them to the client Defs
When there is a very large definition, calling ecflow.Client.get_server_defs each time can be very expensive
both in terms of memory, speed, and network bandwidth. The alternative is to call
this function, which will get the incremental changes, and apply them local client suite definition
effectively synchronising the client and server Defs.
If the period of time between two sync() calls is too long, then the full server definition
is returned and assigned to the client Defs.
We can determine if the changes were applied by calling in_sync() after the call to sync_local():
void sync_local(); # The very first call, will get the full Defs.
Exceptions:
raise a RuntimeError if the delta change cannot be applied.
this could happen if the client Defs bears no resemblance to server Defs
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.sync_local() # Very first call gets the full Defs
client_defs = ci.get_defs() # End user access to the returned Defs
... after a period of time
ci.sync_local() # Subsequent calls to sync_local() users the local Defs to sync incrementally
if ci.in_sync(): # returns true server changed and changes applied to client
print('Client is now in sync with server')
client_defs = ci.get_defs() # End user access to the returned Defs
except RuntimeError, e:
print(str(e))
Calling sync_local() is considerably faster than calling get_server_defs() for large Definitions
- Client.terminate_server((Client)arg1) int :
Terminate the ecflow_server
Usage:
try:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
ci.terminate_server()
except RuntimeError, e:
print(str(e))
- Client.version((Client)arg1) str :
Returns the current client version
- Client.wait_for_server_reply((Client)arg1[, (int)time_out=60]) bool :
Wait for a response from the ecflow_server:
void wait_for_server_reply(
int time_out : (default = 60)
)
This is used to check if server has started. Typically for tests. Returns true if server(ping) replies before time out, otherwise false
Usage:
ci = Client() # use default host(ECF_HOST) & port(ECF_PORT)
if ci.wait_for_server_reply(30):
print('Server is alive')
else:
print('Timed out after 30 second wait for server response.?')
- Client.zombie_adopt((Client)arg1, (str)arg2) int
zombie_adopt( (Client)arg1, (list)arg2) -> None
- Client.zombie_block((Client)arg1, (str)arg2) int
zombie_block( (Client)arg1, (list)arg2) -> None
- Client.zombie_fail((Client)arg1, (str)arg2) int
zombie_fail( (Client)arg1, (list)arg2) -> None
- Client.zombie_fob((Client)arg1, (str)arg2) int
zombie_fob( (Client)arg1, (list)arg2) -> None
- Client.zombie_kill((Client)arg1, (str)arg2) int
zombie_kill( (Client)arg1, (list)arg2) -> None
- Client.zombie_remove((Client)arg1, (str)arg2) int
zombie_remove( (Client)arg1, (list)arg2) -> None