obr.signac_wrapper.operations

Module Contents

Classes

OpenFOAMProject

A signac project class specialized for workflow management.

Functions

is_case(→ bool)

is_job(→ bool)

Checks that OBR_JOB is set to the current job.id; used to prevent multiple execution of jobs if --job=id is set

operation_complete(→ bool)

An operation is considered to be complete if an entry in the job document with same arguments exists and state is success

basic_eligible(→ bool)

Dispatches to standard checks if operations are eligible for given job

parent_job_is_ready(→ str)

Checks whether the parent of the given job is ready

_link_path(base, dst, parent_id, copy_instead_link)

creates a file tree under dst with the same folder structure as base, but all files are relative symlinks

needs_initialization(→ bool)

Check if this job has been initialized already, without performing the initialization

initialize_if_required(→ bool)

check if this job has been already linked to

get_args(→ Union[dict, str])

An operation can get its args either via the function call or from its statepoint

execute_operation(→ Literal[True])

check whether an operation is requested

execute_post_build(operation_name, job)

check whether an operation is requested

execute_pre_build(operation_name, job)

check whether an operation is requested

start_job_state(→ None)

end_job_state(→ Literal[True])

dispatch_pre_hooks(operation_name, job)

just forwards to start_job_state and execute_pre_build

dispatch_post_hooks(operation_name, job)

Forwards to execute_post_build, performs md5sum calculation of case files and finishes with end_job_state

set_failure(operation_name, error, job)

just forwards to start_job_state and execute_pre_build

copy_on_uses(args, job, path, target)

copies the file specified in args['uses'] to path/target

controlDict(job[, args])

sets up the controlDict

MultiCase(job[, args])

Dummy operation to generate multiple cases

blockMesh(job[, args])

shell(job[, args])

initialConditions(job[, args])

A special operation to allow copying from 0.orig folders

fvSolution(job[, args])

fvSchemes(job[, args])

transportProperties(job[, args])

turbulenceProperties(job[, args])

setKeyValuePair(job[, args])

has_mesh(→ bool)

Check whether all mesh files are files (owning) or symlinks (non-owning)

decomposePar(job[, args])

fetchCase(job[, args])

is_locked(→ bool)

Cases that are already started are set to tmp_lock; do not try to execute them

refineMesh(job[, args])

allClean(job[, args])

checkMesh(job[, args])

get_number_of_procs(→ int)

Deduces the number of processors

get_values(→ set)

find all different statepoint values

run_cmd_builder(→ str)

Builds the CLI command to run an OpenFOAM application

validate_state_impl(→ None)

Perform a detailed update of the job state

resetCase(→ None)

Dummy operation that calls resetCase

validateState(→ None)

Dummy operation which forwards to validate_state_impl. The reason for keeping this function is that it can be called from the CLI to force a detailed update.

runParallelSolver(→ str)

runSerialSolver(job[, args])

archive(→ Literal[True])

apply(*jobs[, args])

Attributes

obr.signac_wrapper.operations.logger
class obr.signac_wrapper.operations.OpenFOAMProject(*args, **kwargs)

Bases: flow.FlowProject

A signac project class specialized for workflow management.

This class is used to define, execute, and submit workflows based on operations and conditions.

Users typically interact with this class through its command line interface.

This is a typical example of how to use this class:

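from flow import FlowProject

# Register a function as an operation of the project.
@FlowProject.operation
def hello(job):
    print('hello', job)

# Start the command line interface.
FlowProject().main()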

Parameters

path : str, optional

The project directory. By default, the current working directory (Default value = None).

environment : flow.environment.ComputeEnvironment

An environment to use for scheduler submission. If None, the environment is automatically identified. The default is None.

entrypoint : dict

A dictionary with two possible keys: 'executable' and 'path'. The path represents the location of the script file (the script file must call FlowProject.main()). The executable represents the location of the Python interpreter used for the execution of BaseFlowOperation that are Python functions.

filtered_jobs: list[signac.job.Job] = []
print_operations()
filter_jobs(filters: list[str]) → list[signac.job.Job]

filter_jobs accepts a list of filters.

The filters will be applied to all jobs inside the OpenFOAMProject instance and the filtered jobs will be returned as a list.

query(jobs: list[signac.job.Job], query: list[obr.core.queries.Query]) → list[dict]

Returns a list of job ids as the result of the given Query objects.

set_entrypoint(entrypoint: dict)

Sets the entrypoint for a project. This is useful for submit, so that the generated scripts call obr run -o <args> instead of the default signac run -o <args>.

obr.signac_wrapper.operations.generate
obr.signac_wrapper.operations.simulate
obr.signac_wrapper.operations.is_case(job: signac.job.Job) → bool
obr.signac_wrapper.operations.is_job(job: signac.job.Job) → bool

Checks that OBR_JOB is set to the current job.id; used to prevent multiple execution of jobs if --job=id is set
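The check described above could look roughly like the following. This is a minimal sketch, not the library's implementation: the helper name is_selected_job is hypothetical, a SimpleNamespace stands in for a signac job, and treating an unset OBR_JOB as "no restriction" is an assumption.

```python
import os
from types import SimpleNamespace


def is_selected_job(job) -> bool:
    """Return True if OBR_JOB is unset or equals job.id (assumed semantics)."""
    requested = os.environ.get("OBR_JOB")
    if requested is None:
        # Assumption: without OBR_JOB every job may run.
        return True
    return requested == job.id


# Stand-in for a signac job; only the `id` attribute is used here.
job = SimpleNamespace(id="abc123")

os.environ["OBR_JOB"] = "abc123"
matches = is_selected_job(job)

os.environ["OBR_JOB"] = "other"
rejected = not is_selected_job(job)

# Leave the environment as we found it.
os.environ.pop("OBR_JOB", None)
```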

obr.signac_wrapper.operations.operation_complete(job: signac.job.Job, operation: str) → bool

An operation is considered complete if the job document contains an entry with the same arguments and its state is success
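To illustrate the completeness rule, here is a small sketch on a plain dictionary. The document layout (a "history" list of records with "operation", "args" and "state" keys) and the function name are assumptions made for this example; the real job document may be organised differently.

```python
def operation_complete_sketch(document: dict, operation: str, args: dict) -> bool:
    """True if a record for `operation` with equal args and state 'success' exists."""
    for record in document.get("history", []):
        if (
            record.get("operation") == operation
            and record.get("args") == args
            and record.get("state") == "success"
        ):
            return True
    return False


# Hypothetical job document contents.
document = {
    "history": [
        {"operation": "blockMesh", "args": {}, "state": "failure"},
        {"operation": "controlDict", "args": {"endTime": 10}, "state": "success"},
    ]
}

control_dict_done = operation_complete_sketch(document, "controlDict", {"endTime": 10})
block_mesh_done = operation_complete_sketch(document, "blockMesh", {})
```

A record only counts when both the arguments and the state match, so a failed run or a run with different arguments does not mark the operation as complete.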

obr.signac_wrapper.operations.basic_eligible(job: signac.job.Job, operation: str) → bool

Dispatches to the standard checks that decide whether an operation is eligible for the given job

This includes:
  • check for a lock, to avoid running an operation again when 'obr run' is called before the operation has finished

  • check if the parent case is ready

  • check that the operation has been requested for the job

  • copy and link files and folders

obr.signac_wrapper.operations.parent_job_is_ready(job: signac.job.Job) → str

Checks whether the parent of the given job is ready

obr.signac_wrapper.operations._link_path(base, dst, parent_id, copy_instead_link)

creates a file tree under dst with the same folder structure as base, but all files are relative symlinks
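The relative-symlink file tree described above could be built along these lines. This is a sketch using only the standard library; the function name link_tree is hypothetical, and the parent_id and copy_instead_link parameters of _link_path are deliberately left out because their exact behaviour is not documented here.

```python
import os
import tempfile
from pathlib import Path


def link_tree(base: Path, dst: Path) -> None:
    """Mirror the folder structure of `base` under `dst`, linking every file."""
    for root, _dirs, files in os.walk(base):
        target_dir = dst / Path(root).relative_to(base)
        target_dir.mkdir(parents=True, exist_ok=True)
        for name in files:
            source = Path(root) / name
            # Relative link target, computed from the directory holding the link.
            rel_target = os.path.relpath(source, start=target_dir)
            os.symlink(rel_target, target_dir / name)


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp) / "base"
    (base / "system").mkdir(parents=True)
    (base / "system" / "controlDict").write_text("application icoFoam;\n")

    dst = Path(tmp) / "case"
    link_tree(base, dst)

    link = dst / "system" / "controlDict"
    is_link = link.is_symlink()
    link_target = os.readlink(link)
    content = link.read_text()
```

Because the links are relative, the pair of directories can be moved together without breaking them.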

obr.signac_wrapper.operations.needs_initialization(job: signac.job.Job) → bool

Check if this job has been initialized already, without performing the initialization

obr.signac_wrapper.operations.initialize_if_required(job: signac.job.Job) → bool

check if this job has been already linked to

The default strategy is to link all files. If a file is modified, the modifying operation is responsible for unlinking and copying it.

obr.signac_wrapper.operations.get_args(job: signac.job.Job, args: dict | str) → dict | str

An operation can get its args either via the function call or from its statepoint. If no args are passed via the function call, the args from the statepoint are taken.

The args can also be just a str in the case of shell scripts.
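The fallback from call arguments to statepoint arguments might be sketched as follows. The statepoint key "args", the function name get_args_sketch and the SimpleNamespace job stand-in are assumptions for illustration only.

```python
from types import SimpleNamespace
from typing import Union


def get_args_sketch(job, args: Union[dict, str, None] = None) -> Union[dict, str]:
    """Prefer explicitly passed args; otherwise fall back to the statepoint."""
    if args:
        # A dict of parameters, or a plain string for shell scripts.
        return args
    # Hypothetical layout: the statepoint stores the arguments under "args".
    return job.statepoint.get("args", {})


job = SimpleNamespace(statepoint={"args": {"endTime": 10}})

from_call = get_args_sketch(job, {"endTime": 20})
from_statepoint = get_args_sketch(job, {})
shell_args = get_args_sketch(job, "echo done")
```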

obr.signac_wrapper.operations.execute_operation(job: signac.job.Job, operation_name: str, operations) → Literal[True]

check whether an operation is requested

An operation can be a simple operation defined by a keyword such as blockMesh, or an operation with parameters defined by a dictionary
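The two forms of an operation, a bare keyword or a dictionary with parameters, can be brought to a common shape before they are processed. The sketch below assumes a one-entry dictionary of the form {name: parameters}; this layout and the helper normalize_operation are hypothetical.

```python
from typing import Tuple, Union


def normalize_operation(entry: Union[str, dict]) -> Tuple[str, dict]:
    """Turn a keyword or a one-entry dict into a (name, parameters) pair."""
    if isinstance(entry, str):
        # Simple operation such as "blockMesh": no parameters.
        return entry, {}
    if isinstance(entry, dict) and len(entry) == 1:
        name, params = next(iter(entry.items()))
        return name, dict(params)
    raise ValueError(f"unsupported operation entry: {entry!r}")


requested = ["blockMesh", {"controlDict": {"endTime": 10}}]
normalized = [normalize_operation(entry) for entry in requested]
```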

obr.signac_wrapper.operations.execute_post_build(operation_name: str, job: signac.job.Job)

check whether an operation is requested

An operation can be a simple operation defined by a keyword such as blockMesh, or an operation with parameters defined by a dictionary

obr.signac_wrapper.operations.execute_pre_build(operation_name: str, job: signac.job.Job)

check whether an operation is requested

An operation can be a simple operation defined by a keyword such as blockMesh, or an operation with parameters defined by a dictionary

obr.signac_wrapper.operations.start_job_state(_, job: signac.job.Job) → None
obr.signac_wrapper.operations.end_job_state(_, job: signac.job.Job) → Literal[True]
obr.signac_wrapper.operations.dispatch_pre_hooks(operation_name: str, job: signac.job.Job)

just forwards to start_job_state and execute_pre_build

obr.signac_wrapper.operations.dispatch_post_hooks(operation_name: str, job: signac.job.Job)

Forwards to execute_post_build, performs md5sum calculation of case files and finishes with end_job_state
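The md5sum step could be approximated with hashlib as shown below. This is only a sketch: which files are hashed and where the digests are stored are not specified above, so hashing every file below the case directory and returning a dictionary are assumptions, as is the function name.

```python
import hashlib
import tempfile
from pathlib import Path


def md5_of_case_files(case_dir: Path) -> dict:
    """Map each file below case_dir (POSIX-style relative path) to its MD5 hex digest."""
    checksums = {}
    for path in sorted(case_dir.rglob("*")):
        if path.is_file():
            digest = hashlib.md5(path.read_bytes()).hexdigest()
            checksums[path.relative_to(case_dir).as_posix()] = digest
    return checksums


with tempfile.TemporaryDirectory() as tmp:
    case_dir = Path(tmp)
    (case_dir / "system").mkdir()
    (case_dir / "system" / "controlDict").write_bytes(b"endTime 10;\n")
    checksums = md5_of_case_files(case_dir)
```

Comparing such a dictionary before and after an operation shows which case files the operation changed.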

obr.signac_wrapper.operations.set_failure(operation_name: str, error, job: signac.job.Job)

just forwards to start_job_state and execute_pre_build

obr.signac_wrapper.operations.copy_on_uses(args: dict, job: signac.job.Job, path: str, target: str)

copies the file specified in args['uses'] to path/target
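A sketch of the copy step, under the assumption that args['uses'] names a file located in the same directory as the target. The real function also receives the job, which is omitted here, and the name copy_on_uses_sketch is hypothetical.

```python
import shutil
import tempfile
from pathlib import Path


def copy_on_uses_sketch(args: dict, path: Path, target: str) -> bool:
    """Copy path/args['uses'] to path/target; return False if 'uses' is absent."""
    uses = args.get("uses")
    if not uses:
        return False
    # Assumption: args["uses"] is a file name relative to `path`.
    shutil.copyfile(path / uses, path / target)
    return True


with tempfile.TemporaryDirectory() as tmp:
    system = Path(tmp)
    (system / "controlDict.coarse").write_text("endTime 10;\n")

    copied = copy_on_uses_sketch({"uses": "controlDict.coarse"}, system, "controlDict")
    skipped = copy_on_uses_sketch({}, system, "controlDict")
    copied_text = (system / "controlDict").read_text()
```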

obr.signac_wrapper.operations.controlDict(job: signac.job.Job, args={})

sets up the controlDict

obr.signac_wrapper.operations.MultiCase(job: signac.job.Job, args={})

Dummy operation to generate multiple cases

obr.signac_wrapper.operations.blockMesh(job: signac.job.Job, args={})
obr.signac_wrapper.operations.shell(job: signac.job.Job, args={})
obr.signac_wrapper.operations.initialConditions(job: signac.job.Job, args={})

A special operation to allow copying from 0.orig folders

obr.signac_wrapper.operations.fvSolution(job: signac.job.Job, args={})
obr.signac_wrapper.operations.fvSchemes(job: signac.job.Job, args={})
obr.signac_wrapper.operations.transportProperties(job: signac.job.Job, args={})
obr.signac_wrapper.operations.turbulenceProperties(job: signac.job.Job, args={})
obr.signac_wrapper.operations.setKeyValuePair(job: signac.job.Job, args={})
obr.signac_wrapper.operations.has_mesh(job: signac.job.Job) → bool

Check whether all mesh files are files (owning) or symlinks (non-owning)

TODO check also for .obr files for state of operation
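The distinction between owning and non-owning meshes can be illustrated with pathlib. The sketch assumes the standard OpenFOAM mesh files under constant/polyMesh and reports a status string instead of the bool returned by has_mesh; treating a mesh with at least one symlink as non-owning is an assumption, and mesh_status is a hypothetical name.

```python
import tempfile
from pathlib import Path

# Standard polyMesh files of an OpenFOAM case.
MESH_FILES = ("points", "faces", "owner", "neighbour", "boundary")


def mesh_status(case_dir: Path) -> str:
    """Return 'missing', 'owning' (regular files) or 'non-owning' (any symlink)."""
    mesh_dir = case_dir / "constant" / "polyMesh"
    paths = [mesh_dir / name for name in MESH_FILES]
    if not all(p.exists() for p in paths):
        return "missing"
    return "non-owning" if any(p.is_symlink() for p in paths) else "owning"


with tempfile.TemporaryDirectory() as tmp:
    parent = Path(tmp) / "parent"
    child = Path(tmp) / "child"
    for case in (parent, child):
        (case / "constant" / "polyMesh").mkdir(parents=True)

    empty_status = mesh_status(parent)

    for name in MESH_FILES:
        source = parent / "constant" / "polyMesh" / name
        source.write_text("")
        # The child case only links to the mesh of the parent case.
        (child / "constant" / "polyMesh" / name).symlink_to(source)

    parent_status = mesh_status(parent)
    child_status = mesh_status(child)
```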

obr.signac_wrapper.operations.decomposePar(job: signac.job.Job, args={})
obr.signac_wrapper.operations.fetchCase(job: signac.job.Job, args={})
obr.signac_wrapper.operations.is_locked(job: signac.job.Job) → bool

Cases that are already started are set to tmp_lock; do not try to execute them

obr.signac_wrapper.operations.refineMesh(job: signac.job.Job, args={})
obr.signac_wrapper.operations.allClean(job: signac.job.Job, args={})
obr.signac_wrapper.operations.checkMesh(job: signac.job.Job, args={})
obr.signac_wrapper.operations.get_number_of_procs(job: signac.job.Job) → int

Deduces the number of processors. For performance reasons, the cache is used to store the number of subdomains.
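The caching idea can be shown with functools.lru_cache standing in for the cache mentioned above, which is a swapped-in technique, not the library's mechanism. Reading numberOfSubdomains from a decomposeParDict with a regular expression and defaulting to 1 are likewise assumptions of this sketch.

```python
import re
import tempfile
from functools import lru_cache
from pathlib import Path


@lru_cache(maxsize=None)
def number_of_subdomains(decompose_par_dict: str) -> int:
    """Parse numberOfSubdomains once per path; later calls hit the cache."""
    text = Path(decompose_par_dict).read_text()
    match = re.search(r"^\s*numberOfSubdomains\s+(\d+)\s*;", text, re.MULTILINE)
    # Assumption: a case without the entry runs on a single processor.
    return int(match.group(1)) if match else 1


with tempfile.TemporaryDirectory() as tmp:
    dict_path = Path(tmp) / "decomposeParDict"
    dict_path.write_text("numberOfSubdomains 4;\nmethod simple;\n")
    first = number_of_subdomains(str(dict_path))

    # The file changes, but the cached value is returned without re-reading it.
    dict_path.write_text("numberOfSubdomains 8;\n")
    cached = number_of_subdomains(str(dict_path))
```

The second call shows the trade-off of any such cache: it avoids re-reading the file, but it has to be invalidated when the decomposition changes.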

obr.signac_wrapper.operations.get_values(jobs: list, key: str) → set

find all different statepoint values
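Collecting the distinct statepoint values for a key amounts to a set comprehension. In this sketch SimpleNamespace objects stand in for signac jobs, and skipping jobs that lack the key is an assumption.

```python
from types import SimpleNamespace


def distinct_values(jobs: list, key: str) -> set:
    """Collect the distinct statepoint values stored under `key`."""
    return {job.statepoint[key] for job in jobs if key in job.statepoint}


jobs = [
    SimpleNamespace(statepoint={"solver": "icoFoam", "cells": 100}),
    SimpleNamespace(statepoint={"solver": "icoFoam", "cells": 200}),
    SimpleNamespace(statepoint={"solver": "pisoFoam"}),
]

solvers = distinct_values(jobs, "solver")
cells = distinct_values(jobs, "cells")
```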

obr.signac_wrapper.operations.run_cmd_builder(job: signac.job.Job, cmd_format: str, args: dict) → str

Builds the CLI command to run an OpenFOAM application
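One plausible reading of the cmd_format and args parameters is a format string whose placeholders are filled from a dictionary. The placeholder names np and solver below are invented for this sketch, and the job parameter of the real function is omitted.

```python
def build_run_cmd(cmd_format: str, args: dict) -> str:
    """Fill the placeholders of cmd_format with the values in args."""
    return cmd_format.format(**args)


# Hypothetical format string and arguments.
cmd = build_run_cmd(
    "mpirun -np {np} {solver} -parallel",
    {"np": 4, "solver": "icoFoam"},
)
```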

obr.signac_wrapper.operations.validate_state_impl(_: str, job: signac.job.Job) → None

Perform a detailed update of the job state

obr.signac_wrapper.operations.resetCase(job: signac.job.Job, args={}) → None

Dummy operation that calls resetCase

obr.signac_wrapper.operations.validateState(job: signac.job.Job, args={}) → None

Dummy operation which forwards to validate_state_impl. The reason for keeping this function is that it can be called from the CLI to force a detailed update.

obr.signac_wrapper.operations.runParallelSolver(job: signac.job.Job, args={}) → str
obr.signac_wrapper.operations.runSerialSolver(job: signac.job.Job, args={})
obr.signac_wrapper.operations.archive(job: signac.job.Job, args={}) → Literal[True]
obr.signac_wrapper.operations.apply(*jobs, args={})