obr.signac_wrapper.operations

Module Contents

Classes

OpenFOAMProject

A signac project class specialized for workflow management.

Functions

is_case(→ bool)

is_job(→ bool)

Checks whether OBR_JOB is set to the current job.id. Used to prevent multiple execution of jobs if --job=id is set.

operation_complete(→ bool)

An operation is considered to be complete if an entry in the job document with same arguments exists and state is success

basic_eligible(→ bool)

Dispatches to standard checks if operations are eligible for given job

parent_job_is_ready(→ str)

Checks whether the parent of the given job is ready

_link_path(base, dst, parent_id, copy_instead_link)

Creates a file tree under dst with the same folder structure as base, but all files are relative symlinks.

needs_initialization(→ bool)

Check if this job has been initialized already, without performing the initialization

initialize_if_required(→ bool)

Checks whether this job has already been linked to.

get_args(→ Union[dict, str])

An operation can get its args either via the function call or from its statepoint.

execute_operation(→ Literal[True])

check whether an operation is requested

execute_post_build(operation_name, job)

check whether an operation is requested

execute_pre_build(operation_name, job)

check whether an operation is requested

start_job_state(→ None)

end_job_state(→ Literal[True])

dispatch_pre_hooks(operation_name, job)

just forwards to start_job_state and execute_pre_build

dispatch_post_hooks(operation_name, job)

Forwards to execute_post_build, performs md5sum calculation of case files and finishes with end_job_state

set_failure(operation_name, error, job)

just forwards to start_job_state and execute_pre_build

copy_on_uses(args, job, path, target)

copies the file specified in args['uses'] to path/target

controlDict(job[, args])

sets up the controlDict

MultiCase(job[, args])

Dummy operation to generate multiple cases

blockMesh(job[, args])

replaceMesh(job[, args])

shell(job[, args])

initialConditions(job[, args])

A special operation to allow copying from 0.orig folders

fvSolution(job[, args])

fvSchemes(job[, args])

transportProperties(job[, args])

turbulenceProperties(job[, args])

setKeyValuePair(job[, args])

has_mesh(→ bool)

Check whether all mesh files are files (owning) or symlinks (non-owning)

decomposePar(job[, args])

fetchCase(job[, args])

is_locked(→ bool)

Cases that are already started are set to tmp_lock

refineMesh(job[, args])

allClean(job[, args])

checkMesh(job[, args])

get_number_of_procs(→ int)

Deduces the number of processors

get_values(→ set)

find all different statepoint values

run_cmd_builder(→ str)

Builds the CLI command to run an OpenFOAM application

validate_state_impl(→ None)

Perform a detailed update of the job state

resetCase(→ None)

Dummy operation that calls resetCase

validateState(→ None)

Dummy operation which forwards to validate_state_impl. The reason for keeping this function is that it can be called from the CLI to force a detailed update.

runParallelSolver(→ str)

runSerialSolver(job[, args])

archive(→ Literal[True])

apply(*jobs[, args])

Attributes

obr.signac_wrapper.operations.logger
class obr.signac_wrapper.operations.OpenFOAMProject(*args, **kwargs)

Bases: flow.FlowProject

A signac project class specialized for workflow management.

This class is used to define, execute, and submit workflows based on operations and conditions.

Users typically interact with this class through its command line interface.

This is a typical example of how to use this class:

from flow import FlowProject

@FlowProject.operation
def hello(job):
    print('hello', job)

FlowProject().main()

Parameters

path : str, optional

The project directory. By default, the current working directory (Default value = None).

environment : flow.environment.ComputeEnvironment

An environment to use for scheduler submission. If None, the environment is automatically identified. The default is None.

entrypoint : dict

A dictionary with two possible keys: 'executable' and 'path'. The path represents the location of the script file (the script file must call FlowProject.main()). The executable represents the location of the Python interpreter used for the execution of BaseFlowOperation that are Python functions.

filtered_jobs: list[signac.job.Job] = []
print_operations()
filter_jobs(filters: list[str]) → list[signac.job.Job]

filter_jobs accepts a list of filters.

The filters will be applied to all jobs inside the OpenFOAMProject instance and the filtered jobs will be returned as a list.

query(jobs: list[signac.job.Job], query: list[obr.core.queries.Query]) → list[dict]

Returns a list of job ids as the result of Query.

set_entrypoint(entrypoint: dict)

Sets the entrypoint for a project. This is useful for submit, so that submit writes scripts that call obr run -o <args> instead of the default signac run -o <args>.

group_jobs(jobs, view_id_map: dict[str, str], summarize: int = 0) → dict[str, list[str]]

Returns the list of jobs of the given OpenFOAMProject where the last summarize levels are grouped together at the corresponding parent view. Returns a dict[str, list[str]] which maps the view path to a list of child jobs.
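The grouping described above can be illustrated with a small sketch. It is not the OBR implementation: the helper name `group_by_parent_view` is hypothetical, and the sketch assumes that `view_id_map` maps a view path to a job id and that `summarize` counts trailing path components to strip.

```python
from collections import defaultdict
from pathlib import PurePosixPath


def group_by_parent_view(
    view_id_map: dict[str, str], summarize: int = 0
) -> dict[str, list[str]]:
    """Group job ids under the view path shortened by `summarize` levels."""
    groups: dict[str, list[str]] = defaultdict(list)
    for view_path, job_id in view_id_map.items():
        parts = PurePosixPath(view_path).parts
        # Drop the last `summarize` components, never going below zero.
        keep = parts[: max(len(parts) - summarize, 0)]
        groups[str(PurePosixPath(*keep))].append(job_id)
    return dict(groups)
```

With `summarize=1`, two views that differ only in their last path component end up in the same list under their common parent path.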

obr.signac_wrapper.operations.generate
obr.signac_wrapper.operations.simulate
obr.signac_wrapper.operations.is_case(job: signac.job.Job) → bool
obr.signac_wrapper.operations.is_job(job: signac.job.Job) → bool

Checks whether OBR_JOB is set to the current job.id. Used to prevent multiple execution of jobs if --job=id is set.
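A minimal sketch of such a check, assuming OBR_JOB is an environment variable. The helper name `matches_obr_job` and the `environ` parameter are hypothetical, and treating an unset variable as "no match" is a choice made for this sketch only.

```python
import os
from typing import Mapping, Optional


def matches_obr_job(job_id: str, environ: Optional[Mapping[str, str]] = None) -> bool:
    """Return True if OBR_JOB names exactly this job id (hypothetical helper)."""
    env = os.environ if environ is None else environ
    # Assumption: an unset OBR_JOB counts as "no match" in this sketch.
    return env.get("OBR_JOB") == job_id
```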

obr.signac_wrapper.operations.operation_complete(job: signac.job.Job, operation: str) → bool

An operation is considered to be complete if an entry in the job document with same arguments exists and state is success
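The completion rule can be sketched against a plain dictionary standing in for the job document. The layout used here (a `history` list of entries with `operation`, `args`, and `state` keys) and the helper name `is_operation_complete` are assumptions for illustration, not the documented OBR schema.

```python
from typing import Any


def is_operation_complete(doc: dict[str, Any], operation: str, args: Any) -> bool:
    """Return True if doc records `operation` with equal args and state 'success'."""
    # Assumed layout: doc["history"] is a list of
    # {"operation": ..., "args": ..., "state": ...} dictionaries.
    for entry in doc.get("history", []):
        if (
            entry.get("operation") == operation
            and entry.get("args") == args
            and entry.get("state") == "success"
        ):
            return True
    return False
```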

obr.signac_wrapper.operations.basic_eligible(job: signac.job.Job, operation: str) → bool

Dispatches to standard checks if operations are eligible for given job

this includes:
  • check for lock, to avoid running operations when calling ‘obr run’ before operation is finished

  • check if parent case is ready

  • operation has been requested for job

  • copy and link files and folder

obr.signac_wrapper.operations.parent_job_is_ready(job: signac.job.Job) → str

Checks whether the parent of the given job is ready

obr.signac_wrapper.operations._link_path(base, dst, parent_id, copy_instead_link)

Creates a file tree under dst with the same folder structure as base, but all files are relative symlinks.
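The relative-symlink tree described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the OBR code: the helper name `mirror_with_relative_symlinks` is hypothetical, the `parent_id` and `copy_instead_link` parameters are left out, and a platform that supports symlinks is assumed.

```python
import os
from pathlib import Path


def mirror_with_relative_symlinks(base: Path, dst: Path) -> None:
    """Recreate base's folder structure under dst and link every file relatively."""
    for root, _dirs, files in os.walk(base):
        target_dir = dst / Path(root).relative_to(base)
        target_dir.mkdir(parents=True, exist_ok=True)
        for name in files:
            source = Path(root) / name
            link = target_dir / name
            # The link target is expressed relative to the link's own directory,
            # so the pair of trees can be moved together without breaking links.
            link.symlink_to(os.path.relpath(source, start=target_dir))
```

Directories are created as real directories and only files become links, which matches the description that the folder structure is the same while all files are symlinks.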

obr.signac_wrapper.operations.needs_initialization(job: signac.job.Job) → bool

Check if this job has been initialized already, without performing the initialization

obr.signac_wrapper.operations.initialize_if_required(job: signac.job.Job) → bool

Checks whether this job has already been linked to.

The default strategy is to link all files. If a file is modified, the modifying operations are responsible for unlinking and copying.

obr.signac_wrapper.operations.get_args(job: signac.job.Job, args: dict | str) → dict | str

An operation can get its args either via the function call or from its statepoint. If no args are passed via the function call, the args from the statepoint are taken.

The args can also be just a str, as in the case of shell scripts.
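The fallback order can be sketched as follows. The helper name `resolve_args` is hypothetical, and two details are assumptions of this sketch rather than documented behaviour: that an empty dict or empty string means "nothing was passed", and that the statepoint stores the arguments under the key `args`.

```python
from typing import Any, Mapping, Union

Args = Union[dict, str]


def resolve_args(statepoint: Mapping[str, Any], args: Args) -> Args:
    """Prefer explicitly passed args; otherwise fall back to the statepoint."""
    # Assumption: an empty dict or empty string means "nothing was passed".
    if args:
        return args
    # Assumption: the statepoint keeps the arguments under the key "args".
    return statepoint.get("args", {})
```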

obr.signac_wrapper.operations.execute_operation(job: signac.job.Job, operation_name: str, operations) → Literal[True]

Checks whether an operation is requested.

An operation can be a simple operation defined by a keyword, like blockMesh, or an operation with parameters defined by a dictionary.

obr.signac_wrapper.operations.execute_post_build(operation_name: str, job: signac.job.Job)

Checks whether an operation is requested.

An operation can be a simple operation defined by a keyword, like blockMesh, or an operation with parameters defined by a dictionary.

obr.signac_wrapper.operations.execute_pre_build(operation_name: str, job: signac.job.Job)

Checks whether an operation is requested.

An operation can be a simple operation defined by a keyword, like blockMesh, or an operation with parameters defined by a dictionary.

obr.signac_wrapper.operations.start_job_state(_, job: signac.job.Job) → None
obr.signac_wrapper.operations.end_job_state(_, job: signac.job.Job) → Literal[True]
obr.signac_wrapper.operations.dispatch_pre_hooks(operation_name: str, job: signac.job.Job)

just forwards to start_job_state and execute_pre_build

obr.signac_wrapper.operations.dispatch_post_hooks(operation_name: str, job: signac.job.Job)

Forwards to execute_post_build, performs md5sum calculation of case files and finishes with end_job_state
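The md5sum step can be sketched with `hashlib`. Which files OBR actually hashes and where it stores the digests is not stated here, so the helper name `md5_of_files` and the choice to hash every regular file below a directory are assumptions of this sketch.

```python
import hashlib
from pathlib import Path


def md5_of_files(case_dir: Path) -> dict[str, str]:
    """Map each regular file below case_dir to its hex MD5 digest."""
    digests: dict[str, str] = {}
    for path in sorted(case_dir.rglob("*")):
        if path.is_file():
            # Keys are paths relative to case_dir, written with forward slashes.
            key = path.relative_to(case_dir).as_posix()
            digests[key] = hashlib.md5(path.read_bytes()).hexdigest()
    return digests
```

Comparing two such dictionaries taken before and after an operation shows which case files the operation changed.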

obr.signac_wrapper.operations.set_failure(operation_name: str, error, job: signac.job.Job)

just forwards to start_job_state and execute_pre_build

obr.signac_wrapper.operations.copy_on_uses(args: dict, job: signac.job.Job, path: str, target: str)

Copies the file specified in args['uses'] to path/target.
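A minimal sketch of this copy step. The helper name `copy_used_file` and the `source_dir` parameter are hypothetical: the text above does not say which directory `args['uses']` is resolved against, so the sketch takes it as an explicit argument.

```python
import shutil
from pathlib import Path


def copy_used_file(args: dict, source_dir: Path, path: Path, target: str) -> bool:
    """Copy source_dir/args['uses'] to path/target; return False if 'uses' is absent."""
    uses = args.get("uses")
    if not uses:
        # Nothing to copy when the operation does not name a file to use.
        return False
    shutil.copyfile(source_dir / uses, path / target)
    return True
```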

obr.signac_wrapper.operations.controlDict(job: signac.job.Job, args={})

sets up the controlDict

obr.signac_wrapper.operations.MultiCase(job: signac.job.Job, args={})

Dummy operation to generate multiple cases

obr.signac_wrapper.operations.blockMesh(job: signac.job.Job, args={})
obr.signac_wrapper.operations.replaceMesh(job: signac.job.Job, args={})
obr.signac_wrapper.operations.shell(job: signac.job.Job, args={})
obr.signac_wrapper.operations.initialConditions(job: signac.job.Job, args={})

A special operation to allow copying from 0.orig folders

obr.signac_wrapper.operations.fvSolution(job: signac.job.Job, args={})
obr.signac_wrapper.operations.fvSchemes(job: signac.job.Job, args={})
obr.signac_wrapper.operations.transportProperties(job: signac.job.Job, args={})
obr.signac_wrapper.operations.turbulenceProperties(job: signac.job.Job, args={})
obr.signac_wrapper.operations.setKeyValuePair(job: signac.job.Job, args={})
obr.signac_wrapper.operations.has_mesh(job: signac.job.Job) → bool

Check whether all mesh files are files (owning) or symlinks (non-owning)

TODO check also for .obr files for state of operation

obr.signac_wrapper.operations.decomposePar(job: signac.job.Job, args={})
obr.signac_wrapper.operations.fetchCase(job: signac.job.Job, args={})
obr.signac_wrapper.operations.is_locked(job: signac.job.Job) → bool

Cases that are already started are set to tmp_lock; do not try to execute them.

obr.signac_wrapper.operations.refineMesh(job: signac.job.Job, args={})
obr.signac_wrapper.operations.allClean(job: signac.job.Job, args={})
obr.signac_wrapper.operations.checkMesh(job: signac.job.Job, args={})
obr.signac_wrapper.operations.get_number_of_procs(job: signac.job.Job) → int

Deduces the number of processors. For performance reasons, the cache is used to store the number of subdomains.

obr.signac_wrapper.operations.get_values(jobs: list, key: str) → set

find all different statepoint values
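Collecting the distinct values amounts to a set comprehension. The sketch below works on plain dictionaries standing in for job statepoints. The helper name `distinct_values` is hypothetical, and skipping statepoints that lack the key is an assumption of this sketch.

```python
def distinct_values(statepoints: list[dict], key: str) -> set:
    """Return the set of values stored under `key` across all statepoints."""
    # Statepoints without the key are skipped; values must be hashable.
    return {sp[key] for sp in statepoints if key in sp}
```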

obr.signac_wrapper.operations.run_cmd_builder(job: signac.job.Job, cmd_format: str, args: dict) → str

Builds the CLI command to run an OpenFOAM application

obr.signac_wrapper.operations.validate_state_impl(_: str, job: signac.job.Job) → None

Perform a detailed update of the job state

obr.signac_wrapper.operations.resetCase(job: signac.job.Job, args={}) → None

Dummy operation that calls resetCase

obr.signac_wrapper.operations.validateState(job: signac.job.Job, args={}) → None

Dummy operation which forwards to validate_state_impl. The reason for keeping this function is that it can be called from the CLI to force a detailed update.

obr.signac_wrapper.operations.runParallelSolver(job: signac.job.Job, args={}) → str
obr.signac_wrapper.operations.runSerialSolver(job: signac.job.Job, args={})
obr.signac_wrapper.operations.archive(job: signac.job.Job, args={}) → Literal[True]
obr.signac_wrapper.operations.apply(*jobs, args={})