Darwin  1.0
Event loop based prototype framework
PrefixCommand

Description

General utilities for prefix commands (try, parallel, submit).
The explicit options (`cmds`) are unchanged; among the implicit options,
the inputs are checked (if any), and the output is adapted to each thread.

Public Member Functions

None __init__ (self, str prefix, List[str] cmds)
 
None parse (self, List[str] args)
 
int get_entries (self, input)
 
None prepare_single (self)
 
None prepare_multi (self)
 
None prepare_fire_and_forget (self)
 
None __init__ (self, str prefix, List[str] cmds)
 
None parse (self, List[str] args)
 
int get_entries (self, input)
 
None prepare_single (self)
 
None prepare_multi (self)
 
None prepare_fire_and_forget (self)
 

Public Attributes

 name
 
 cmds
 
 helper
 
 extension
 
 inputs
 
 output
 
 args
 
 absoutput
 
 absinputs
 

Static Public Attributes

 str
 
 Path
 

Private Member Functions

str _help_text (self)
 
List[str_synopsis (self)
 
str _help_text (self)
 
List[str_synopsis (self)
 

Constructor & Destructor Documentation

◆ __init__() [1/2]

None __init__ (   self,
str  prefix,
List[str cmds 
)
Constructor from a separated list of arguments.
153  def __init__(self, prefix: str, cmds: List[str]) -> None:
154  """Constructor from a separated list of arguments."""
155 
156  self.name = os.path.basename(prefix)
157  self.cmds = cmds
158 
159  self.helper = self._help_text()
160  if not can_parallelize(self.helper[1:]):
161  raise ValueError(
162  f"`{self.cmds.exec}`"
163  + " does not match the requirements to be run with this prefix command: "
164  + "exactly one `output` argument and `nSplit` argument must be defined."
165  )
166 
167  self.extension = check_output([self.cmds.exec] + ["-o"]).decode().strip().split("\n")[0].split(' ')[0]
168 

◆ __init__() [2/2]

None __init__ (   self,
str  prefix,
List[str cmds 
)
Constructor from a separated list of arguments.
153  def __init__(self, prefix: str, cmds: List[str]) -> None:
154  """Constructor from a separated list of arguments."""
155 
156  self.name = os.path.basename(prefix)
157  self.cmds = cmds
158 
159  self.helper = self._help_text()
160  if not can_parallelize(self.helper[1:]):
161  raise ValueError(
162  f"`{self.cmds.exec}`"
163  + " does not match the requirements to be run with this prefix command: "
164  + "exactly one `output` argument and `nSplit` argument must be defined."
165  )
166 
167  self.extension = check_output([self.cmds.exec] + ["-o"]).decode().strip().split("\n")[0].split(' ')[0]
168 

Member Function Documentation

◆ _help_text() [1/2]

str _help_text (   self)
private
Returns the command's normal help text.
169  def _help_text(self) -> str:
170  """Returns the command's normal help text."""
171  # if executable is given, then the helper is adapted to match with the prefix command
172  text = (
173  check_output(self.cmds.exec + " -h", shell=True)
174  .decode()
175  .strip()
176  .split("\n")
177  )
178  return text
179 

◆ _help_text() [2/2]

str _help_text (   self)
private
Returns the command's normal help text.
169  def _help_text(self) -> str:
170  """Returns the command's normal help text."""
171  # if executable is given, then the helper is adapted to match with the prefix command
172  text = (
173  check_output(self.cmds.exec + " -h", shell=True)
174  .decode()
175  .strip()
176  .split("\n")
177  )
178  return text
179 

◆ _synopsis() [1/2]

List[str] _synopsis (   self)
private
Extract synopsis from helper (after undoing the bold characters).
180  def _synopsis(self) -> List[str]:
181  """Extract synopsis from helper (after undoing the bold characters)."""
182  synopsis = self.helper[0].replace("\x1b[1m", "").replace("\x1b[0m", "")
183  return synopsis.split()[1:]
184 

◆ _synopsis() [2/2]

List[str] _synopsis (   self)
private
Extract synopsis from helper (after undoing the bold characters).
180  def _synopsis(self) -> List[str]:
181  """Extract synopsis from helper (after undoing the bold characters)."""
182  synopsis = self.helper[0].replace("\x1b[1m", "").replace("\x1b[0m", "")
183  return synopsis.split()[1:]
184 

◆ get_entries() [1/2]

int get_entries (   self,
  input 
)
Obtain the number of entries from input n-tuple.
198  def get_entries(self, input) -> int:
199  """Obtain the number of entries from input n-tuple."""
200 
201  output = check_output(["printEntries"] + [input.strip('\\"')])
202  nevents = int(output.decode().splitlines()[0].replace("\n", ""))
203  if nevents == 0:
204  raise ValueError("Empty input tree.")
205  return nevents
206 

◆ get_entries() [2/2]

int get_entries (   self,
  input 
)
Obtain the number of entries from input n-tuple.
198  def get_entries(self, input) -> int:
199  """Obtain the number of entries from input n-tuple."""
200 
201  output = check_output(["printEntries"] + [input.strip('\\"')])
202  nevents = int(output.decode().splitlines()[0].replace("\n", ""))
203  if nevents == 0:
204  raise ValueError("Empty input tree.")
205  return nevents
206 

◆ parse() [1/2]

None parse (   self,
List[str args 
)
Put input(s), output, and other arguments in dedicated members.
185  def parse(self, args: List[str]) -> None:
186  """Put input(s), output, and other arguments in dedicated members."""
187 
188  synopsis = self._synopsis()
189  i = synopsis.index("output")
190  if i >= len(args):
191  raise ValueError("Unable to identify `output` in given arguments")
192 
193  self.inputs = args[:i]
194  self.output = args[i]
195  if len(args) > i:
196  self.args = args[i + 1 :]
197 

◆ parse() [2/2]

None parse (   self,
List[str args 
)
Put input(s), output, and other arguments in dedicated members.
185  def parse(self, args: List[str]) -> None:
186  """Put input(s), output, and other arguments in dedicated members."""
187 
188  synopsis = self._synopsis()
189  i = synopsis.index("output")
190  if i >= len(args):
191  raise ValueError("Unable to identify `output` in given arguments")
192 
193  self.inputs = args[:i]
194  self.output = args[i]
195  if len(args) > i:
196  self.args = args[i + 1 :]
197 

◆ prepare_fire_and_forget() [1/2]

None prepare_fire_and_forget (   self)
Copies the executable, libraries, and dictionaries to the output directory.
271  def prepare_fire_and_forget(self) -> None:
272  """Copies the executable, libraries, and dictionaries to the output directory."""
273 
274  # copy executables and libraries to working directory
275  exec = Path(which(self.cmds.exec)).absolute()
276  copy_exec = self.absoutput / exec.name
277  if exec != copy_exec:
278  copy2(exec, self.output)
279  exec = copy_exec
280  self.cmds.exec = str(exec)
281 
282  for lib in glob(os.environ["DARWIN_FIRE_AND_FORGET"] + "/*so"):
283  copy2(lib, self.output)
284  for dic in glob(os.environ["DARWIN_FIRE_AND_FORGET"] + "/*pcm"):
285  copy2(dic, self.output)
286 
287 

◆ prepare_fire_and_forget() [2/2]

None prepare_fire_and_forget (   self)
Copies the executable, libraries, and dictionaries to the output directory.
271  def prepare_fire_and_forget(self) -> None:
272  """Copies the executable, libraries, and dictionaries to the output directory."""
273 
274  # copy executables and libraries to working directory
275  exec = Path(which(self.cmds.exec)).absolute()
276  copy_exec = self.absoutput / exec.name
277  if exec != copy_exec:
278  copy2(exec, self.output)
279  exec = copy_exec
280  self.cmds.exec = str(exec)
281 
282  for lib in glob(os.environ["DARWIN_FIRE_AND_FORGET"] + "/*so"):
283  copy2(lib, self.output)
284  for dic in glob(os.environ["DARWIN_FIRE_AND_FORGET"] + "/*pcm"):
285  copy2(dic, self.output)
286 
287 

◆ prepare_multi() [1/2]

None prepare_multi (   self)
Prepare the members for parallel running (local or on farm).
222  def prepare_multi(self) -> None:
223  """Prepare the members for parallel running (local or on farm)."""
224 
225  for x in self.inputs:
226  self.absinputs += glob(x)
227  if len(self.inputs) > 0 and len(self.absinputs) == 0:
228  raise ValueError("Inputs could not be found: " + " ".join(self.inputs))
229 
230  self.absinputs = [str(Path(x).absolute()) for x in self.absinputs]
231 
232  self.inputs = ['"' + x + '"' for x in self.inputs]
233  self.absoutput = Path(self.output).absolute()
234  if self.absoutput.suffix != "":
235  raise ValueError(
236  f"A directory with an extension is misleading: {self.output}"
237  )
238 
239  if self.absoutput.exists():
240  if self.absoutput.is_file():
241  raise ValueError(self.output + " is not a directory")
242 
243  new_rootfiles = []
244  for k in range(self.cmds.nSplit):
245  rootfile = self.absoutput / f"{k}{self.extension}"
246  if rootfile.exists() and not os.access(rootfile, os.W_OK):
247  raise ValueError(
248  "The root files in " + self.output + " are not writable"
249  )
250  new_rootfiles += [str(rootfile)]
251 
252  existing_rootfiles = glob(str(self.absoutput / f"*{self.extension}"))
253  if len(set(existing_rootfiles).difference(new_rootfiles)) > 0:
254  raise ValueError(
255  "Residual files from a former run will not be overwritten."
256  )
257  else:
258  self.absoutput.mkdir(parents=True)
259 
260  # the following lines are specific for EOS at CERN, where
261  # `/eos/home-b/bejones` should be replaced with `/eos/user/b/bejones`
262  try:
263  institute = find_institute()
264  if find_institute() == "CERN" and self.absoutput.parts[1] == "eos":
265  user = str(self.absoutput.parts[3])
266  homedir = Path(*self.absoutput.parts[3:])
267  self.absoutput = Path(f"/eos/user/{user[0]}") / homedir
268  except RuntimeError:
269  pass
270 

◆ prepare_multi() [2/2]

None prepare_multi (   self)
Prepare the members for parallel running (local or on farm).
222  def prepare_multi(self) -> None:
223  """Prepare the members for parallel running (local or on farm)."""
224 
225  for x in self.inputs:
226  self.absinputs += glob(x)
227  if len(self.inputs) > 0 and len(self.absinputs) == 0:
228  raise ValueError("Inputs could not be found: " + " ".join(self.inputs))
229 
230  self.absinputs = [str(Path(x).absolute()) for x in self.absinputs]
231 
232  self.inputs = ['"' + x + '"' for x in self.inputs]
233  self.absoutput = Path(self.output).absolute()
234  if self.absoutput.suffix != "":
235  raise ValueError(
236  f"A directory with an extension is misleading: {self.output}"
237  )
238 
239  if self.absoutput.exists():
240  if self.absoutput.is_file():
241  raise ValueError(self.output + " is not a directory")
242 
243  new_rootfiles = []
244  for k in range(self.cmds.nSplit):
245  rootfile = self.absoutput / f"{k}{self.extension}"
246  if rootfile.exists() and not os.access(rootfile, os.W_OK):
247  raise ValueError(
248  "The root files in " + self.output + " are not writable"
249  )
250  new_rootfiles += [str(rootfile)]
251 
252  existing_rootfiles = glob(str(self.absoutput / f"*{self.extension}"))
253  if len(set(existing_rootfiles).difference(new_rootfiles)) > 0:
254  raise ValueError(
255  "Residual files from a former run will not be overwritten."
256  )
257  else:
258  self.absoutput.mkdir(parents=True)
259 
260  # the following lines are specific for EOS at CERN, where
261  # `/eos/home-b/bejones` should be replaced with `/eos/user/b/bejones`
262  try:
263  institute = find_institute()
264  if find_institute() == "CERN" and self.absoutput.parts[1] == "eos":
265  user = str(self.absoutput.parts[3])
266  homedir = Path(*self.absoutput.parts[3:])
267  self.absoutput = Path(f"/eos/user/{user[0]}") / homedir
268  except RuntimeError:
269  pass
270 

◆ prepare_single() [1/2]

None prepare_single (   self)
Prepare the working area for a single-run command.
207  def prepare_single(self) -> None:
208  """Prepare the working area for a single-run command."""
209 
210  self.absoutput = Path(self.output).absolute()
211 
212  if self.absoutput.exists():
213  if self.absoutput.is_dir():
214  self.absoutput /= "0" + self.extension
215  if self.absoutput.exists() and not os.access(self.absoutput, os.W_OK):
216  raise ValueError(self.absoutput + " is not writable")
217  else:
218  if len(self.absoutput.suffix) == 0: # then assume it's a directory
219  self.absoutput.mkdir(parents=True)
220  self.absoutput /= "0" + self.extension
221 

◆ prepare_single() [2/2]

None prepare_single (   self)
Prepare the working area for a single-run command.
207  def prepare_single(self) -> None:
208  """Prepare the working area for a single-run command."""
209 
210  self.absoutput = Path(self.output).absolute()
211 
212  if self.absoutput.exists():
213  if self.absoutput.is_dir():
214  self.absoutput /= "0" + self.extension
215  if self.absoutput.exists() and not os.access(self.absoutput, os.W_OK):
216  raise ValueError(self.absoutput + " is not writable")
217  else:
218  if len(self.absoutput.suffix) == 0: # then assume it's a directory
219  self.absoutput.mkdir(parents=True)
220  self.absoutput /= "0" + self.extension
221 

Member Data Documentation

◆ absinputs

absinputs

◆ absoutput

absoutput

◆ args

args

◆ cmds

cmds

◆ extension

extension

◆ helper

helper

◆ inputs

inputs

◆ name

name

◆ output

output

◆ Path

Path
static

◆ str

str
static

The documentation for this class was generated from the following file:
prefix.find_institute
def find_institute()
Definition: prefix.py:121
Darwin::Tools::split
@ split
activate -k and -j to define slice
Definition: Options.h:26
prefix.can_parallelize
bool can_parallelize(helper)
Definition: prefix.py:109