############################################################
#
# Author(s): Georg Schnabel
# Email: g.schnabel@iaea.org
# Creation date: 2022/05/30
# Last modified: 2025/02/05
# License: MIT
# Copyright (c) 2022-2024 International Atomic Energy Agency (IAEA)
#
############################################################
from collections.abc import Mapping
import logging
import re
from .logging_utils import setup_logger, write_info, RingBuffer
from appdirs import user_cache_dir
from os.path import exists as file_exists
from endf_parserpy.utils.tree_utils import (
is_tree,
get_child,
get_child_value,
retrieve_value,
)
from endf_parserpy.utils.accessories import EndfDict, EndfPath
from .endf_mappings import (
map_cont_dic,
map_head_dic,
map_text_dic,
map_dir_dic,
map_intg_dic,
map_tab1_dic,
map_tab2_dic,
map_list_dic,
)
from .endf_mapping_utils import eval_expr_without_unknown_var, get_varname
from .meta_control_utils import (
cycle_for_loop,
evaluate_if_clause,
open_section,
close_section,
should_proceed,
initialize_working_vars,
introduce_abbreviation,
remove_working_vars,
)
from .endf_utils import (
read_cont,
write_cont,
read_ctrl,
get_ctrl,
write_head,
read_head,
read_text,
write_text,
read_intg,
write_intg,
read_dir,
write_dir,
read_tab1,
write_tab1,
read_tab2,
write_tab2,
read_send,
write_send,
write_fend,
write_mend,
write_tend,
read_list,
write_list,
split_sections,
skip_blank_lines,
add_linenumbers_to_section,
)
from .custom_exceptions import (
InconsistentSectionBracketsError,
StopException,
ParserException,
VariableNotFoundError,
UnexpectedControlRecordError,
MissingSectionError,
)
from .endf_recipe_utils import (
get_recipe_parsetree_dic,
get_responsible_recipe_parsetree,
get_responsible_recipe_parsefun,
)
from endf_parserpy.endf_recipes import get_recipe_dict
from endf_parserpy.utils.debugging_utils import TrackingDict
from .helpers import array_dict_to_list
[docs]
class EndfParser:
"""Class for parsing and writing ENDF-6 formatted data.
This class provides functions for
(1) parsing ENDF-6 formatted data, and (2) converting data
given in a :class:`dict`-like object into the ENDF-6 format.
The ENDF-6 formatted data may be given
in a text file, a string, or a list of strings containing
separate lines. The essential methods of this class
are :func:`parsefile` and :func:`writefile`.
"""
def __init__(
    self,
    ignore_number_mismatch=False,
    ignore_zero_mismatch=True,
    ignore_varspec_mismatch=True,
    fuzzy_matching=True,
    abuse_signpos=False,
    skip_intzero=False,
    prefer_noexp=False,
    accept_spaces=True,
    ignore_blank_lines=False,
    ignore_send_records=False,
    ignore_missing_tpid=False,
    keep_E=False,
    preserve_value_strings=False,
    include_linenum=True,
    width=11,
    check_arrays=True,
    strict_datatypes=False,
    array_type="dict",
    explain_missing_variable=True,
    cache_dir=None,
    print_cache_info=True,
    endf_format="endf6-ext",
    recipes=None,
    parsing_funs=None,
    loglevel=logging.WARNING,
):
    """Initialization of options for parsing and writing ENDF-6 data.

    Parsing can be tuned by many options that decide how inconsistencies
    in ENDF-6 formatted data are handled and how much flexibility is
    granted regarding unusual number representations. Such parameters
    are tagged with `(parsing)` below. Writing data into the ENDF-6
    format can be tuned as well, e.g., regarding the output precision.
    Those parameters are tagged with `(writing)`.

    Parameters
    ----------
    ignore_number_mismatch : bool
        Tolerate mismatches between numbers in ENDF-6 formatted
        data and the numbers expected according to the ENDF-6 recipes.
        *(parsing)*
    ignore_zero_mismatch : bool
        Tolerate non-zero numbers in ENDF-6 formatted data that
        are required to be zero according to the ENDF-6 recipes.
        *(parsing)*
    ignore_varspec_mismatch : bool
        Tolerate distinct numbers that are supposed to be mapped
        to the same symbol name. For the time being, even with this
        option enabled, possibly inconsistent variable assignments
        have to be marked with a question mark in the ENDF-6 recipe.
        *(parsing)*
    fuzzy_matching : bool
        Tolerate small inconsistencies between fields when they
        are linked by a mathematical relationship. *(parsing)*
    abuse_signpos : bool
        Permit positive numbers to start in the first character slot
        of an ENDF-6 field, which is usually reserved for the sign,
        to enhance numerical precision. *(writing)*
    skip_intzero : bool
        For numbers written out in decimal notation, eliminate
        the integer part if zero, e.g. `0.12` becomes `.12`, to
        increase the attainable precision. *(writing)*
    prefer_noexp : bool
        Switch to decimal (i.e. non-scientific) representation if it
        leads to an increase of accuracy, e.g. `1.234567-1` becomes
        `0.12345678`. *(writing)*
    accept_spaces : bool
        Eliminate spaces in a number before trying to parse it, e.g.
        `1.234 +8` is transformed to `1.234+8`. *(parsing)*
    ignore_blank_lines : bool
        If ``True``, skip blank lines in ENDF-6 formatted input without
        complaining. Otherwise, blank lines lead to parsing failure.
        *(parsing)*
    ignore_send_records : bool
        If ``True``, the correct positioning of SEND/FEND/MEND/TEND
        records indicating the end of a section is not checked.
        *(parsing)*
    ignore_missing_tpid : bool
        If ``True``, the parser tolerates a missing TPID
        record at the beginning of the file. *(parsing)*
    keep_E : bool
        If ``True``, include the `e` character in scientific notation,
        e.g. `1.23e-8` instead of `1.23-8`. The inclusion establishes
        compatibility with programming languages different from Fortran
        while the omission enhances numerical precision. *(writing)*
    preserve_value_strings : bool
        If ``True``, the string representations of float numbers are
        recorded as well during parsing (via the
        :class:`~endf_parserpy.interpreter.fortran_utils.EndfFloat` class).
        These string representations, when available, are used verbatim
        for outputting ENDF-6 formatted data (irrespective of the
        ``preserve_value_strings`` option), overruling any of the other
        options provided for controlling the output format of floats.
        *(parsing)*
    include_linenum : bool
        Controls whether the 5-digit line number should be
        included at the end of each line. *(writing)*
    width : int
        The number of character slots in an ENDF-6 field. The ENDF-6
        format requires 11 but the user may opt for a different width
        for their storage/application needs. *(parsing, writing)*
    check_arrays : bool
        Ensures that index ranges provided in the Python :class:`dict`
        passed as argument to the :func:`write` and :func:`writefile`
        method are consistent with the values of counter variables.
        If ``True``, a :class:`dict` containing larger index ranges than
        expected by counter variables leads to failure, otherwise
        the presence of additional indices is ignored. *(writing)*
    strict_datatypes : bool
        Strict data type checking leads to failure if a `float`
        needs to be cast to an `int`. If ``False``, the writing process
        only fails if the value of the `float` cannot be
        perfectly represented by an `int`. *(writing)*
    array_type : str
        The Python datatype to use for representing arrays read from
        ENDF-6 files. The two options are ``"dict"`` (default) and
        ``"list"``. *(parsing)*
    explain_missing_variable : bool
        If the :func:`write` or :func:`writefile` method
        fails because a variable is missing in the dictionary,
        include the available explanation if this argument is ``True``.
    cache_dir
        Directory to store the parsing trees associated with ENDF-6
        recipes. If ``None``, the directory is determined automatically
        relying on the `appdirs` package. If ``False``, no cache
        directory is used and ENDF-6 recipes are compiled on the fly
        whenever this class is instantiated. Finally, the user can
        provide a custom directory as a string.
    print_cache_info : bool
        If ``True``, print a message regarding the location of the
        cache directory if it was determined automatically.
    endf_format : str
        Allows the user to pick a specific ENDF format flavor.
        The default `endf6-ext` tolerates deviations from the
        ENDF-6 format encountered in some nuclear data libraries.
        Other choices are `endf6` for strict compliance with the
        ENDF-6 formats manual and `jendl` with JENDL specific
        conventions, which are also implemented in `endf6-ext`.
    recipes : dict_like
        The default ENDF-6 recipes can be overridden by providing a
        nested dictionary with custom recipes. Inspect the default
        recipe dictionary to see the required structure
        (`from endf_parserpy.endf_recipes import endf_recipe_dictionary`).
    parsing_funs : dict_like
        Optional custom parsing functions. During :func:`parse`, a
        function found for an MF/MT section takes precedence over the
        recipe and is called with the text of that section. The
        expected layout of this container is defined by
        ``get_responsible_recipe_parsefun``. Defaults to an empty dict.
    loglevel : int
        Controls the level of detail for logging output. Default is
        `logging.WARN`. Many ENDF-6 files in nuclear data libraries
        contain auxiliary information in unused fields (expected to be
        zero), which triggers warnings. Use `logging.ERROR` to suppress
        these warnings (you will need to `import logging`).
    """
    # Resolve the cache location of the compiled recipe parse trees;
    # the hint is only shown when the location was chosen automatically.
    if cache_dir is None:
        cache_dir = user_cache_dir("endf_parserpy", "gschnabel")
        if print_cache_info:
            print(
                f"Compiled ENDF recipes are cached in {cache_dir}\n"
                "Specify `cache_dir=False` to disable the creation "
                "and use of a cache directory.\n"
                "Alternatively, pass a path as `cache_dir` argument.\n"
                "To suppress this message, specify `print_cache_info=False`."
            )
    if recipes is None:
        recipes = get_recipe_dict(endf_format)
    # parse trees of the language in which the recipes are formulated
    self.tree_dic = get_recipe_parsetree_dic(recipes, cache_dir)
    self.parsing_funs = {} if parsing_funs is None else parsing_funs

    # handlers for recipe nodes that correspond to ENDF records
    self.endf_actions = {
        "head_or_cont_line": self.process_head_or_cont_line,
        "text_line": self.process_text_line,
        "dir_line": self.process_dir_line,
        "intg_line": self.process_intg_line,
        "tab1_line": self.process_tab1_line,
        "tab2_line": self.process_tab2_line,
        "list_line": self.process_list_line,
        "send_line": self.process_send_line,
        "stop_line": self.process_stop_line,
    }
    # handlers for recipe nodes that steer the program flow
    self.meta_actions = {
        "for_loop": self.process_for_loop,
        "if_clause": self.process_if_clause,
        "section": self.process_section,
        "abbreviation": self.process_abbreviation,
        "comment_block": self.process_comment_block,
    }

    # option bundles handed to the mapping, writing and reading helpers
    self.parse_opts = dict(
        ignore_zero_mismatch=ignore_zero_mismatch,
        ignore_number_mismatch=ignore_number_mismatch,
        ignore_varspec_mismatch=ignore_varspec_mismatch,
        fuzzy_matching=fuzzy_matching,
        array_type=array_type,
    )
    self.write_opts = dict(
        abuse_signpos=abuse_signpos,
        skip_intzero=skip_intzero,
        prefer_noexp=prefer_noexp,
        keep_E=keep_E,
        preserve_value_strings=preserve_value_strings,
        include_linenum=include_linenum,
        width=width,
        check_arrays=check_arrays,
        strict_datatypes=strict_datatypes,
    )
    self.read_opts = dict(
        accept_spaces=accept_spaces,
        ignore_blank_lines=ignore_blank_lines,
        ignore_send_records=ignore_send_records,
        ignore_missing_tpid=ignore_missing_tpid,
        width=width,
        preserve_value_strings=preserve_value_strings,
    )
    self.explain_missing_variable = explain_missing_variable
    self.variable_descriptions = EndfDict()
    self.current_path = None

    # every instance gets its own, consecutively numbered logger
    EndfParser.instance_counter = getattr(EndfParser, "instance_counter", 0) + 1
    self.name = f"EndfParserInstance{EndfParser.instance_counter}"
    self.logger = setup_logger(self.name, loglevel)
[docs]
def explain(self, varpath, stdout=True):
    """Explain the meaning of a variable.

    ENDF-6 recipes can contain descriptions of variables, which are
    collected while :func:`parsefile`, :func:`parse`, :func:`write`
    or :func:`writefile` is executed. Given the path to a variable,
    this method looks up the associated description. At every path
    level but the last one, the literal path element is tried first
    and the wildcard key ``"*"`` second.

    Parameters
    ----------
    varpath : EndfPath
        The EndfPath to the variable or an object accepted
        by the constructor of the EndfPath class.
    stdout : bool
        If ``True``, print the description on stdout.
        Otherwise, return the description.

    Returns
    -------
    str
        If ``stdout=True`` return ``None``, otherwise the description
        as a ``str`` (or ``None`` if no description is available).
    """
    varpath = EndfPath(varpath)
    last_level = len(varpath) - 1
    # Depth-first search with backtracking: `attempts[i]` counts how
    # often level i was visited, `nodes[i]` is the dictionary at level i.
    attempts = [0]
    nodes = [self.variable_descriptions]
    depth = 0
    while depth >= 0:
        attempts[depth] += 1
        tries = attempts[depth]
        node = nodes[depth]
        key = varpath[depth]
        at_last_level = depth == last_level
        if at_last_level and node.exists(key) and isinstance(node[key], str):
            if not stdout:
                return node[key]
            print(node[key])
            return
        if at_last_level or tries == 3:
            # nothing (more) to find below this node -> backtrack
            nodes.pop()
            attempts.pop()
            depth -= 1
            continue
        # first visit: literal path element, second visit: wildcard
        candidate = key if tries == 1 else "*"
        if node.exists(candidate):
            nodes.append(node[candidate])
            attempts.append(0)
            depth += 1
    if stdout:
        print(f"No description for `{str(varpath)}` available")
    return None
def process_comment_block(self, tree):
    """Collect variable descriptions from a recipe comment block.

    A comment line of the form ``# var NAME [...] : text`` opens the
    description of variable ``NAME``. The description consists of
    ``text`` (or, if ``text`` is blank, of the following comment line)
    plus all subsequent comment lines up to the next ``var``
    declaration. The result is stored in ``self.variable_descriptions``
    under the key ``(self.current_path, NAME)``.

    Parameters
    ----------
    tree
        Parse-tree node of the comment block; its ``COMMENT`` child
        value holds the raw comment text.

    Raises
    ------
    ValueError
        If a ``var`` declaration without text is not followed by a
        description line, i.e. the next line is another declaration
        or the comment block ends.
    """
    # compiled once instead of being reassembled for every line
    line_pattern = re.compile(
        r" *#(?P<indentstr>( *var *"
        r"(?P<varname>[a-zA-Z0-9/*]+) *(\[[^]]*\])?"
        r" *:)?)?"
        r"(?P<comment>.*)"
    )

    def extract_info(line):
        # -> (variable name or None, remaining text, width of the
        #     `var NAME:` prefix, which is zero for plain comment lines)
        fields = line_pattern.match(line).groupdict()
        return fields["varname"], fields["comment"], len(fields["indentstr"])

    comment_lines = get_child_value(tree, "COMMENT").splitlines()
    num_lines = len(comment_lines)
    idx = 0
    while idx < num_lines:
        varname, text, indent = extract_info(comment_lines[idx])
        if varname is None:
            # plain comment outside of any variable description
            idx += 1
            continue
        if text.strip() != "":
            # description starts on the declaration line itself
            firstindent = indent + len(text) - len(text.lstrip())
            curdescr = [text.lstrip()]
        else:
            # description has to start on the following line
            idx += 1
            if idx >= num_lines:
                # formerly an IndexError without any hint at the cause
                raise ValueError(f"empty explaination of {varname}")
            other_varname, text, _ = extract_info(comment_lines[idx])
            if other_varname is not None:
                raise ValueError(f"empty explaination of {varname}")
            firstindent = len(text) - len(text.lstrip())
            curdescr = [text[firstindent:]]
        idx += 1
        # gather continuation lines; stop (without consuming the line)
        # at the next variable declaration
        while idx < num_lines:
            next_varname, text, _ = extract_info(comment_lines[idx])
            if next_varname is not None:
                break
            curindent = len(text) - len(text.lstrip())
            # never strip more than the indentation of the first line
            curdescr.append(text[min(firstindent, curindent):])
            idx += 1
        vardescrs = self.variable_descriptions
        vardescrs[self.current_path, varname] = "\n".join(curdescr).strip()
def process_stop_line(self, tree):
    """Handle a stop instruction of a recipe.

    The instruction is recorded in the log buffer (together with the
    current input line in read mode) and a :class:`StopException`
    carrying the ``STOP_MESSAGE`` of the node is raised. If the node
    has no message, ``"stop instruction"`` is used.
    """
    if self.rwmode != "read":
        self.logbuffer.save_reduced_record_log(tree)
    else:
        self.logbuffer.save_record_log(self.ofs, self.lines[self.ofs], tree)
    message = retrieve_value(tree, "STOP_MESSAGE")
    if message is None:
        message = "stop instruction"
    raise StopException(message)
def process_text_line(self, tree):
    """Read or write a TEXT record, depending on ``self.rwmode``.

    In read mode the record at the current offset is read, mapped onto
    ``self.datadic`` and the offset is advanced. In any other mode the
    record is assembled from ``self.datadic`` and the resulting lines
    are appended to ``self.lines``.
    """
    if self.rwmode != "read":
        self.logbuffer.save_reduced_record_log(tree)
        record = map_text_dic(
            tree,
            {},
            self.datadic,
            self.loop_vars,
            self.rwmode,
            parse_opts=self.parse_opts,
            logger=self.logger,
        )
        record.update(get_ctrl(self.datadic))
        rendered = write_text(record, with_ctrl=True, write_opts=self.write_opts)
        self.lines += rendered
        return

    start = skip_blank_lines(self.lines, self.ofs)
    self.ofs = start
    self.loop_vars["__ofs"] = start
    self.logbuffer.save_record_log(start, self.lines[start], tree)
    write_info(self.logger, "Reading a TEXT record", start)
    record, self.ofs = read_text(
        self.lines, start, with_ctrl=True, read_opts=self.read_opts
    )
    record.update(self.logbuffer.get_last_entry(key_prefix="__"))
    map_text_dic(
        tree,
        record,
        self.datadic,
        self.loop_vars,
        self.rwmode,
        parse_opts=self.parse_opts,
        logger=self.logger,
    )
    # Store MAT, MF, MT in the dictionary. Required here because the
    # tape head (mf=0, mt=0) does not start with a HEAD record, which
    # is the only other place where this information is added.
    self.datadic.update(get_ctrl(record))
def process_head_or_cont_line(self, tree):
    """Forward the node to the HEAD or CONT handler.

    The decision is taken from the ``CONT_SUBTYPE`` child value of
    the node.

    Raises
    ------
    TypeError
        If the subtype is neither ``"HEAD"`` nor ``"CONT"``, which
        indicates that parser code and grammar are out of sync.
    """
    subtype = get_child_value(tree, "CONT_SUBTYPE")
    if subtype == "HEAD":
        handler = self.process_head_line
    elif subtype == "CONT":
        handler = self.process_cont_line
    else:
        raise TypeError("parser code / grammar mismatch")
    handler(tree)
def process_head_line(self, tree):
    """Read or write a HEAD record, depending on ``self.rwmode``.

    In read mode the record at the current offset is read, mapped onto
    ``self.datadic`` and the control numbers of the record are stored
    in ``self.datadic``. In any other mode the record is assembled
    from ``self.datadic`` and appended to ``self.lines``.
    """
    if self.rwmode != "read":
        self.logbuffer.save_reduced_record_log(tree)
        record = map_head_dic(
            tree,
            {},
            self.datadic,
            self.loop_vars,
            self.rwmode,
            parse_opts=self.parse_opts,
            logger=self.logger,
        )
        record.update(get_ctrl(self.datadic))
        rendered = write_head(record, with_ctrl=True, write_opts=self.write_opts)
        self.lines += rendered
        return

    start = skip_blank_lines(self.lines, self.ofs)
    self.ofs = start
    self.loop_vars["__ofs"] = start
    write_info(self.logger, "Reading a HEAD record", start)
    self.logbuffer.save_record_log(start, self.lines[start], tree)
    record, self.ofs = read_head(
        self.lines,
        start,
        with_ctrl=True,
        read_opts=self.read_opts,
    )
    record.update(self.logbuffer.get_last_entry(key_prefix="__"))
    # note: the offset reported here is the one *after* the record
    write_info(
        self.logger, "Content of the HEAD record: " + str(record), self.ofs
    )
    map_head_dic(
        tree,
        record,
        self.datadic,
        self.loop_vars,
        self.rwmode,
        parse_opts=self.parse_opts,
        logger=self.logger,
    )
    self.datadic.update(get_ctrl(record))
def process_cont_line(self, tree):
    """Read or write a CONT record, depending on ``self.rwmode``.

    In read mode the record at the current offset is read and mapped
    onto ``self.datadic``. In any other mode the record is assembled
    from ``self.datadic`` and appended to ``self.lines``.
    """
    if self.rwmode != "read":
        self.logbuffer.save_reduced_record_log(tree)
        record = map_cont_dic(
            tree,
            {},
            self.datadic,
            self.loop_vars,
            self.rwmode,
            parse_opts=self.parse_opts,
            logger=self.logger,
        )
        record.update(get_ctrl(self.datadic))
        rendered = write_cont(record, with_ctrl=True, write_opts=self.write_opts)
        self.lines += rendered
        return

    start = skip_blank_lines(self.lines, self.ofs)
    self.ofs = start
    self.loop_vars["__ofs"] = start
    write_info(self.logger, "Reading a CONT record", start)
    self.logbuffer.save_record_log(start, self.lines[start], tree)
    record, self.ofs = read_cont(
        self.lines,
        start,
        read_opts=self.read_opts,
    )
    record.update(self.logbuffer.get_last_entry(key_prefix="__"))
    write_info(self.logger, "Content of the CONT record: " + str(record))
    map_cont_dic(
        tree,
        record,
        self.datadic,
        self.loop_vars,
        self.rwmode,
        parse_opts=self.parse_opts,
        logger=self.logger,
    )
def process_dir_line(self, tree):
    """Read or write a DIR record, depending on ``self.rwmode``.

    In read mode the record at the current offset is read and mapped
    onto ``self.datadic``. In any other mode the record is assembled
    from ``self.datadic`` and appended to ``self.lines``.
    """
    if self.rwmode != "read":
        self.logbuffer.save_reduced_record_log(tree)
        record = map_dir_dic(
            tree,
            {},
            self.datadic,
            self.loop_vars,
            self.rwmode,
            parse_opts=self.parse_opts,
            logger=self.logger,
        )
        record.update(get_ctrl(self.datadic))
        rendered = write_dir(record, with_ctrl=True, write_opts=self.write_opts)
        self.lines += rendered
        return

    start = skip_blank_lines(self.lines, self.ofs)
    self.ofs = start
    self.loop_vars["__ofs"] = start
    self.logbuffer.save_record_log(start, self.lines[start], tree)
    record, self.ofs = read_dir(
        self.lines,
        start,
        read_opts=self.read_opts,
    )
    record.update(self.logbuffer.get_last_entry(key_prefix="__"))
    map_dir_dic(
        tree,
        record,
        self.datadic,
        self.loop_vars,
        self.rwmode,
        parse_opts=self.parse_opts,
        logger=self.logger,
    )
def process_intg_line(self, tree):
    """Read or write an INTG record, depending on ``self.rwmode``.

    The number of digits is obtained by evaluating the ``ndigit_expr``
    child of the node against ``self.datadic`` and ``self.loop_vars``.
    In read mode the record at the current offset is read and mapped
    onto ``self.datadic``. In any other mode the record is assembled
    from ``self.datadic`` and appended to ``self.lines``.
    """
    if self.rwmode != "read":
        self.logbuffer.save_reduced_record_log(tree)
        record = map_intg_dic(
            tree,
            {},
            self.datadic,
            self.loop_vars,
            self.rwmode,
            parse_opts=self.parse_opts,
            logger=self.logger,
        )
        record.update(get_ctrl(self.datadic))
        num_digits = eval_expr_without_unknown_var(
            get_child(tree, "ndigit_expr"), self.datadic, self.loop_vars
        )
        rendered = write_intg(
            record, with_ctrl=True, ndigit=num_digits, write_opts=self.write_opts
        )
        self.lines += rendered
        return

    start = skip_blank_lines(self.lines, self.ofs)
    self.ofs = start
    self.loop_vars["__ofs"] = start
    self.logbuffer.save_record_log(start, self.lines[start], tree)
    num_digits = eval_expr_without_unknown_var(
        get_child(tree, "ndigit_expr"), self.datadic, self.loop_vars
    )
    record, self.ofs = read_intg(
        self.lines,
        start,
        ndigit=num_digits,
        read_opts=self.read_opts,
    )
    record.update(self.logbuffer.get_last_entry(key_prefix="__"))
    map_intg_dic(
        tree,
        record,
        self.datadic,
        self.loop_vars,
        self.rwmode,
        parse_opts=self.parse_opts,
        logger=self.logger,
    )
def process_tab1_line(self, tree):
    """Read or write a TAB1 record, depending on ``self.rwmode``.

    In read mode the record at the current offset is read and mapped
    onto ``self.datadic``. In any other mode the record is assembled
    from ``self.datadic`` (the mapping helper additionally receives
    ``self.current_path``) and appended to ``self.lines``.
    """
    if self.rwmode != "read":
        self.logbuffer.save_reduced_record_log(tree)
        record = map_tab1_dic(
            tree,
            {},
            self.datadic,
            self.loop_vars,
            self.rwmode,
            parse_opts=self.parse_opts,
            path=self.current_path,
            logger=self.logger,
        )
        record.update(get_ctrl(self.datadic))
        rendered = write_tab1(record, with_ctrl=True, write_opts=self.write_opts)
        self.lines += rendered
        return

    start = skip_blank_lines(self.lines, self.ofs)
    self.ofs = start
    self.loop_vars["__ofs"] = start
    write_info(self.logger, "Reading a TAB1 record", start)
    self.logbuffer.save_record_log(start, self.lines[start], tree)
    record, self.ofs = read_tab1(
        self.lines,
        start,
        read_opts=self.read_opts,
    )
    record.update(self.logbuffer.get_last_entry(key_prefix="__"))
    map_tab1_dic(
        tree,
        record,
        self.datadic,
        self.loop_vars,
        self.rwmode,
        parse_opts=self.parse_opts,
        logger=self.logger,
    )
def process_tab2_line(self, tree):
    """Read or write a TAB2 record, depending on ``self.rwmode``.

    In read mode the record at the current offset is read and mapped
    onto ``self.datadic``. In any other mode the record is assembled
    from ``self.datadic`` and appended to ``self.lines``.
    """
    if self.rwmode != "read":
        self.logbuffer.save_reduced_record_log(tree)
        record = map_tab2_dic(
            tree,
            {},
            self.datadic,
            self.loop_vars,
            self.rwmode,
            parse_opts=self.parse_opts,
            logger=self.logger,
        )
        record.update(get_ctrl(self.datadic))
        rendered = write_tab2(record, with_ctrl=True, write_opts=self.write_opts)
        self.lines += rendered
        return

    start = skip_blank_lines(self.lines, self.ofs)
    self.ofs = start
    self.loop_vars["__ofs"] = start
    write_info(self.logger, "Reading a TAB2 record", start)
    self.logbuffer.save_record_log(start, self.lines[start], tree)
    record, self.ofs = read_tab2(
        self.lines,
        start,
        read_opts=self.read_opts,
    )
    record.update(self.logbuffer.get_last_entry(key_prefix="__"))
    map_tab2_dic(
        tree,
        record,
        self.datadic,
        self.loop_vars,
        self.rwmode,
        parse_opts=self.parse_opts,
        logger=self.logger,
    )
def process_list_line(self, tree):
    """Read or write a LIST record, depending on ``self.rwmode``.

    In read mode the record at the current offset is read and mapped
    onto ``self.datadic``. In any other mode the record is assembled
    from ``self.datadic`` and appended to ``self.lines``.
    """
    if self.rwmode != "read":
        self.logbuffer.save_reduced_record_log(tree)
        record = map_list_dic(
            tree,
            {},
            self.datadic,
            self.loop_vars,
            self.rwmode,
            parse_opts=self.parse_opts,
            logger=self.logger,
        )
        record.update(get_ctrl(self.datadic))
        rendered = write_list(record, with_ctrl=True, write_opts=self.write_opts)
        self.lines += rendered
        return

    start = skip_blank_lines(self.lines, self.ofs)
    self.ofs = start
    self.loop_vars["__ofs"] = start
    write_info(self.logger, "Reading a LIST record", start)
    self.logbuffer.save_record_log(start, self.lines[start], tree)
    record, self.ofs = read_list(
        self.lines,
        start,
        read_opts=self.read_opts,
    )
    record.update(self.logbuffer.get_last_entry(key_prefix="__"))
    map_list_dic(
        tree,
        record,
        self.datadic,
        self.loop_vars,
        self.rwmode,
        parse_opts=self.parse_opts,
        logger=self.logger,
    )
def process_send_line(self, tree):
    """Read or write a SEND (section end) record.

    In read mode the record at the current offset is passed to
    ``read_send``; its result is discarded and the offset is not
    advanced beyond the record. In any other mode a SEND record is
    generated from ``self.datadic`` (honoring ``self.zero_as_blank``)
    and appended to ``self.lines``.
    """
    if self.rwmode != "read":
        self.logbuffer.save_reduced_record_log(tree)
        rendered = write_send(
            self.datadic,
            with_ctrl=True,
            zero_as_blank=self.zero_as_blank,
            write_opts=self.write_opts,
        )
        self.lines += rendered
        return

    start = skip_blank_lines(self.lines, self.ofs)
    self.ofs = start
    self.logbuffer.save_record_log(start, self.lines[start], tree)
    read_send(
        self.lines,
        start,
        read_opts=self.read_opts,
    )
def process_section(self, tree):
    """Execute the body of a recipe section.

    The section is opened via ``open_section`` (missing sections are
    only created in read mode), the instructions of its body are run
    with ``self.datadic`` and ``self.current_path`` referring to the
    section, and afterwards the section is closed and the previous
    path is restored.

    Raises
    ------
    InconsistentSectionBracketsError
        If the names in the section head and the section tail differ.
    """
    self.loop_vars["__ofs"] = self.ofs
    head = get_child(tree, "section_head")
    if self.rwmode == "write":
        self.logbuffer.save_reduced_record_log(head)
    tail = get_child(tree, "section_tail")
    head_name = get_varname(head)
    tail_name = get_varname(tail)
    if head_name != tail_name:
        raise InconsistentSectionBracketsError(
            "The section name in the tail does not correspond to "
            f"the one in the head (`{head_name}` vs `{tail_name}`)"
        )
    in_read_mode = self.rwmode == "read"
    outer_path = self.current_path
    # descend into the section
    self.datadic, self.current_path = open_section(
        head,
        self.datadic,
        self.loop_vars,
        self.parse_opts,
        in_read_mode,
        path=self.current_path,
        logger=self.logger,
    )
    body = get_child(tree, "section_body")
    initialize_working_vars(self.datadic)
    self.run_instruction(body)
    remove_working_vars(self.datadic)
    # ascend to the enclosing level again
    self.datadic = close_section(head, self.datadic, logger=self.logger)
    self.current_path = outer_path
def process_for_loop(self, tree):
    """Execute a recipe for-loop by delegating to ``cycle_for_loop``.

    In write mode the loop head is recorded in the log buffer first.
    The return value of ``cycle_for_loop`` is passed through.
    """
    if self.rwmode == "write":
        self.logbuffer.save_reduced_record_log(get_child(tree, "for_head"))
    return cycle_for_loop(
        tree,
        self.run_instruction,
        self.datadic,
        self.loop_vars,
        self.parse_opts,
        logger=self.logger,
    )
def process_if_clause(self, tree):
    """Execute a recipe if-clause by delegating to ``evaluate_if_clause``.

    Besides the data and loop variables, the helper receives
    ``run_instruction`` to execute branch bodies and the getter/setter
    of the parser state.
    NOTE(review): presumably the state accessors allow the helper to
    snapshot and restore the parser state while probing branches --
    confirm in ``meta_control_utils``.
    """
    evaluate_if_clause(
        tree,
        self.datadic,
        self.loop_vars,
        self.parse_opts,
        self.run_instruction,
        set_parser_state=self.set_parser_state,
        get_parser_state=self.get_parser_state,
        logger=self.logger,
    )
def process_abbreviation(self, tree):
    """Register the abbreviation defined by ``tree`` for ``self.datadic``.

    The work is done by ``introduce_abbreviation``.
    """
    introduce_abbreviation(tree, self.datadic)
def run_instruction(self, tree):
    """Execute the recipe instruction(s) represented by ``tree``.

    Nodes registered in ``self.endf_actions`` or ``self.meta_actions``
    are passed to their handler, provided ``should_proceed`` agrees.
    Any other node is traversed: its subtree children are executed in
    order until ``should_proceed`` returns a false value.
    """
    node_type = tree.data
    registries = (
        (self.endf_actions, "endf_action"),
        (self.meta_actions, "meta_action"),
    )
    for handlers, action_type in registries:
        if node_type in handlers:
            if should_proceed(
                self.datadic, self.loop_vars, action_type=action_type
            ):
                handlers[node_type](tree)
            return
    # structural node without dedicated handler
    for child in tree.children:
        if not is_tree(child):
            continue
        if not should_proceed(
            self.datadic, self.loop_vars, action_type="unspecified"
        ):
            break
        self.run_instruction(child)
def reset_parser_state(self, rwmode="read", lines=None, datadic=None):
    """Bring the parser into a pristine state for a new section.

    Parameters
    ----------
    rwmode : str
        Mode of operation stored in ``self.rwmode``
        (``"read"`` or ``"write"`` are used in this class).
    lines : list, optional
        Lines to operate on; a new empty list if omitted.
        The object is stored as is, not copied.
    datadic : dict_like, optional
        Data dictionary to operate on; a new empty dict if omitted.
        The object is stored as is, not copied.
    """
    self.datadic = {} if datadic is None else datadic
    self.lines = [] if lines is None else lines
    self.loop_vars = {"__ofs": 0}
    self.rwmode = rwmode
    self.ofs = 0
    self.logbuffer = RingBuffer(capacity=20)
    self.current_path = None
def get_parser_state(self):
    """Return the current parser state as a dictionary.

    The dictionary references the state objects themselves (no copies
    are made); the log buffer contributes the result of its
    ``dump_state`` method under the key ``"logbuffer_state"``.
    """
    state = {}
    for attr in ("loop_vars", "datadic", "lines", "rwmode", "ofs"):
        state[attr] = getattr(self, attr)
    state["logbuffer_state"] = self.logbuffer.dump_state()
    for attr in ("parse_opts", "current_path"):
        state[attr] = getattr(self, attr)
    return state
def set_parser_state(self, parser_state):
    """Restore a parser state obtained from :func:`get_parser_state`.

    Parameters
    ----------
    parser_state : dict
        Dictionary with the keys produced by :func:`get_parser_state`.
        The stored objects are assigned as is; the log buffer is
        restored via its ``load_state`` method.
    """
    for attr in ("loop_vars", "datadic", "lines", "rwmode", "ofs"):
        setattr(self, attr, parser_state[attr])
    self.logbuffer.load_state(parser_state["logbuffer_state"])
    for attr in ("parse_opts", "current_path"):
        setattr(self, attr, parser_state[attr])
def should_skip_section(self, mf, mt, exclude=None, include=None):
    """Decide whether the MF/MT section is to be skipped.

    Parameters
    ----------
    mf : int
        MF number of the section.
    mt : int
        MT number of the section.
    exclude : Union[None, int, tuple[Union[int, tuple[int, int]]]]
        MF numbers and/or (MF, MT) pairs to skip. If given,
        ``include`` has no effect.
    include : Union[None, int, tuple[Union[int, tuple[int, int]]]]
        MF numbers and/or (MF, MT) pairs to keep; everything else is
        skipped. Only effective if ``exclude`` is ``None``.

    Returns
    -------
    bool
        ``True`` if the section should be skipped, ``False`` otherwise.
    """

    def as_selection(spec):
        # A single int becomes a one-element selection; iterable
        # members (e.g. lists) become tuples so that they compare
        # equal to the (mf, mt) tuple.
        if spec is None:
            return None
        if isinstance(spec, int):
            spec = (spec,)
        return tuple(
            tuple(item) if hasattr(item, "__iter__") else item for item in spec
        )

    include = as_selection(include)
    exclude = as_selection(exclude)
    if exclude is not None:
        return mf in exclude or (mf, mt) in exclude
    if include is not None:
        return mf not in include and (mf, mt) not in include
    return False
[docs]
def parse(self, lines, exclude=None, include=None, nofail=False):
    """Parse ENDF-6 formatted data.

    Parameters
    ----------
    lines : Union[str, list[str]]
        The lines of text containing the ENDF-6 formatted data.
        This argument can be either a list of strings with each
        string storing a single line, or a string containing
        all ENDF-6 formatted data including linebreaks.
    exclude : Union[None, tuple[Union[int, tuple[int, int]]]]
        See explanation of parameter ``exclude`` in
        :func:`parsefile` for details.
    include : Union[None, tuple[Union[int, tuple[int, int]]]]
        See explanation of parameter ``include`` in
        :func:`parsefile` for details.
    nofail : bool
        See explanation of parameter ``nofail`` in
        :func:`parsefile` for details.

    Returns
    -------
    dict
        Nested dictionary with MF numbers as keys of the first level
        and MT numbers as keys of the second level. Parsed sections
        hold the parsing result. Sections that were skipped, have no
        recipe, or failed to parse with ``nofail=True`` keep the
        unmodified value produced by ``split_sections``.

    Raises
    ------
    ParserException
        If a section cannot be parsed and ``nofail`` is false. The
        exception is re-created with the same type and the record
        log prepended to the message.
    Exception
        If a custom parsing function fails; re-created with the same
        type and the MF/MT numbers prepended to the message.
    """
    if isinstance(lines, str):
        lines = lines.split("\n")
    array_type = self.parse_opts["array_type"]
    # temporary option, only valid for the duration of this call
    self.parse_opts["internal_array_type"] = (
        "list" if array_type == "list_slow" else "dict"
    )
    try:
        tree_dic = self.tree_dic
        self.variable_descriptions = EndfDict()
        mfmt_dic = split_sections(lines, read_opts=self.read_opts)
        for mf in mfmt_dic:
            write_info(self.logger, f"Parsing section MF{mf}")
            for mt in mfmt_dic[mf]:
                curmat = read_ctrl(mfmt_dic[mf][mt][0], read_opts=self.read_opts)
                write_info(self.logger, f"Parsing subsection MF/MT {mf}/{mt}")
                cur_tree = get_responsible_recipe_parsetree(tree_dic, mf, mt)
                cur_parsefun = get_responsible_recipe_parsefun(
                    self.parsing_funs, mf, mt
                )
                if self.should_skip_section(mf, mt, exclude, include):
                    continue
                if cur_parsefun is None and cur_tree is None:
                    continue
                # Work on a copy: the SEND line appended below must not
                # end up in the raw section stored in `mfmt_dic`, which
                # is what the caller gets if parsing fails with `nofail`.
                curlines = list(mfmt_dic[mf][mt])
                if cur_parsefun is not None:
                    try:
                        curlines += write_send(
                            curmat, with_ctrl=True, write_opts=self.write_opts
                        )
                        mfmt_dic[mf][mt] = cur_parsefun("".join(curlines))
                    except Exception as exc:
                        raise type(exc)(
                            f"parsing function for MF={mf}/MT={mt} failed "
                            + "with error message:\n"
                            + str(exc)
                        ) from exc
                    continue
                # we add the SEND line so that parsing fails
                # if the MT section cannot be completely parsed
                curlines += write_send(
                    curmat, with_ctrl=True, write_opts=self.write_opts
                )
                self.reset_parser_state(rwmode="read", lines=curlines)
                self.current_path = EndfPath((mf, mt))
                try:
                    initialize_working_vars(self.datadic)
                    self.run_instruction(cur_tree)
                    remove_working_vars(self.datadic)
                    mfmt_dic[mf][mt] = self.datadic
                    if self.parse_opts["array_type"] == "list":
                        array_dict_to_list(mfmt_dic[mf][mt])
                except ParserException as exc:
                    if not nofail:
                        logstr = self.logbuffer.display_record_logs()
                        raise type(exc)(
                            "\nHere is the parser record log until failure:\n\n"
                            + logstr
                            + "Error message: "
                            + str(exc)
                        ) from exc
        return mfmt_dic
    finally:
        # also executed on failure so that the option never leaks
        self.parse_opts.pop("internal_array_type", None)
[docs]
def write(self, endf_dic, exclude=None, include=None, zero_as_blank=False):
    """Convert data into the ENDF-6 format.

    All parameters are explained in the description of
    :func:`writefile`.

    Returns
    -------
    list[str]
        List of lines with the ENDF-6 formatted data.

    Raises
    ------
    UnexpectedControlRecordError
        If the ``MF`` or ``MT`` value stored in a section differs
        from the key under which the section is stored.
    Exception
        An error raised while a recipe is executed is re-created with
        the same type; the record log and, if enabled and applicable,
        the explanation of the missing variable or section are added
        to the message.
    """
    if isinstance(endf_dic, EndfDict):
        endf_dic = endf_dic.unwrap()
    # temporary state, only valid for the duration of this call;
    # `zero_as_blank` is read by `process_send_line`
    self.zero_as_blank = zero_as_blank
    array_type = self.parse_opts["array_type"]
    self.parse_opts["internal_array_type"] = (
        "list" if array_type in ("list", "list_slow") else "dict"
    )
    try:
        self.reset_parser_state(rwmode="write", datadic={})
        self.variable_descriptions = EndfDict()
        should_check_arrays = self.write_opts["check_arrays"]
        tree_dic = self.tree_dic
        lines = []
        for mf in sorted(endf_dic):
            some_mf_output = False
            for mt in sorted(endf_dic[mf]):
                should_skip = self.should_skip_section(mf, mt, exclude, include)
                if should_skip:
                    continue
                cur_tree = get_responsible_recipe_parsetree(tree_dic, mf, mt)
                is_parsed = isinstance(endf_dic[mf][mt], Mapping)
                if cur_tree is not None and is_parsed:
                    datadic = endf_dic[mf][mt]
                    if should_check_arrays:
                        datadic = TrackingDict(datadic)
                    self.reset_parser_state(rwmode="write", datadic=datadic)
                    self.current_path = EndfPath((mf, mt))
                    # the control numbers in the section must agree
                    # with the keys under which the section is stored
                    datadic.setdefault("MF", mf)
                    if datadic["MF"] != mf:
                        raise UnexpectedControlRecordError(
                            f"expected MF={mf} but found MF={datadic['MF']}"
                        )
                    datadic.setdefault("MT", mt)
                    if datadic["MT"] != mt:
                        raise UnexpectedControlRecordError(
                            f"expected MT={mt} but found MT={datadic['MT']}"
                        )
                    try:
                        initialize_working_vars(self.datadic)
                        self.run_instruction(cur_tree)
                        remove_working_vars(self.datadic)
                    except Exception as exc:
                        logstr = self.logbuffer.display_reduced_record_logs()
                        errmsg = (
                            "\nHere is the parser record log until failure:\n"
                            + "--------------------------------------------\n"
                            + logstr
                            + "\n"
                            + "Error message: "
                            + str(exc)
                        )
                        if isinstance(
                            exc, (VariableNotFoundError, MissingSectionError)
                        ):
                            if self.explain_missing_variable:
                                if isinstance(exc, VariableNotFoundError):
                                    varpath = self.current_path + exc.varname
                                    eltype = "variable"
                                elif isinstance(exc, MissingSectionError):
                                    varpath = self.current_path + exc.section_name
                                    eltype = exc.section_type
                                explanation = self.explain(varpath, stdout=False)
                                if explanation is None:
                                    explanation = "No explanation available"
                                explain_header = (
                                    f"Explanation of missing {eltype} `{varpath}`"
                                )
                                errmsg += "\n\n" + explain_header + "\n"
                                errmsg += "-" * len(explain_header) + "\n"
                                errmsg += explanation
                        raise type(exc)(errmsg) from exc
                    # check if arrays have been written in their entirety
                    if should_check_arrays:
                        self.datadic.verify_complete_retrieval()
                    # add the NS number to the lines except last one
                    # because the SEND (=section end) record already
                    # contains it. For mf=0 (tape head), no SEND present
                    curlines = self.lines[:-1] if mf != 0 else self.lines
                    curlines = add_linenumbers_to_section(
                        curlines, write_opts=self.write_opts
                    )
                    # prepare the SEND (=section end) line
                    if mf != 0:
                        curline_send = self.lines[-1]
                        curlines.append(curline_send)
                    lines.extend(curlines)
                    # NOTE: the SEND record is part of the recipe
                    # and therefore will be added by the parser in
                    # process_send_line method. Hence there is no
                    # need to add it here, in contrast to the
                    # branch of the if-statement below to deal
                    # with non-parsable MF/MT sections.
                else:
                    # nothing is parsed here, but in the spirit of
                    # defensive coding, we reset the parser nevertheless
                    self.reset_parser_state(rwmode="write")
                    # if no recipe is available to parse a
                    # MF/MT section, it will be preserved as a
                    # list of strings in the parse step
                    # and we output that unchanged
                    curlines = endf_dic[mf][mt].copy()
                    curlines = add_linenumbers_to_section(
                        curlines, write_opts=self.write_opts
                    )
                    lines.extend(curlines)
                    # update the MAT, MF, MT number
                    self.datadic = read_ctrl(lines[-1], read_opts=self.read_opts)
                    # add the SEND record in between the MT subsections
                    # if it was not a tape head record (mf=0)
                    if mf != 0:
                        lines.extend(
                            write_send(
                                self.datadic,
                                with_ctrl=True,
                                zero_as_blank=zero_as_blank,
                                write_opts=self.write_opts,
                            )
                        )
                some_mf_output = True
            # we output the file end (fend) record only if something has
            # been written to this mf section and it is not the tape
            # head (mf=0)
            if some_mf_output and mf != 0:
                lines.extend(
                    write_fend(
                        self.datadic,
                        with_ctrl=True,
                        zero_as_blank=zero_as_blank,
                        write_opts=self.write_opts,
                    )
                )
        lines.extend(
            write_mend(
                with_ctrl=True,
                zero_as_blank=zero_as_blank,
                write_opts=self.write_opts,
            )
        )
        lines.extend(
            write_tend(
                with_ctrl=True,
                zero_as_blank=zero_as_blank,
                write_opts=self.write_opts,
            )
        )
        return lines
    finally:
        # also executed on failure so that the temporary state
        # never outlives the call
        if hasattr(self, "zero_as_blank"):
            del self.zero_as_blank
        self.parse_opts.pop("internal_array_type", None)
[docs]
def parsefile(self, filename, exclude=None, include=None, nofail=False):
    """Read an ENDF-6 formatted file and parse its content.

    The file is read line by line and the resulting list of strings
    is handed over to :func:`parse` together with the section
    selection arguments.

    Parameters
    ----------
    filename : str
        Path to the ENDF-6 file
    exclude : Union[None, tuple[Union[int, tuple[int, int]]]]
        MF/MT sections that should not be parsed. Such sections
        are only available as a list of strings in the result.
        With the default `None` nothing is excluded.
        Integers in the `tuple` denote complete `MF` sections,
        whereas tuples of two integers denote individual
        `MF`/`MT` combinations. For instance, ``(3,)`` excludes
        MF section 3 and ``(3, (2, 151))`` excludes MF section 3
        and additionally MF=2/MT=151.
        Useful to speed up the parsing process or to ensure
        a verbatim copy of data in a read/write sequence.
    include : Union[None, tuple[Union[int, tuple[int, int]]]]
        The MF/MT sections to be parsed. All other sections are
        only present as a list of strings. This argument is only
        active if ``exclude=None``. Sections are specified in
        exactly the same way as for the ``exclude`` argument.
    nofail : bool
        If ``True``, the parser attempts to parse *all* MF/MT
        sections desired by the user, irrespective of a failure
        in the parsing of any section. Sections where parsing
        failed are only available as a list of strings.
        With ``nofail=False`` the parser aborts immediately upon
        the first parsing failure.

    Returns
    -------
    dict
        A nested dictionary. The keys of the first level are
        MF numbers and of the second level MT numbers.
        The structure of the :class:`dict` stored under a specific
        `MF`/`MT` combination is determined by the
        corresponding ENDF recipe.
    """
    with open(filename, "r") as endf_file:
        # iterating a text file yields its lines including line endings
        file_lines = list(endf_file)
    parsed_dic = self.parse(file_lines, exclude, include, nofail=nofail)
    return parsed_dic
[docs]
def writefile(
    self,
    filename,
    endf_dic,
    exclude=None,
    include=None,
    zero_as_blank=False,
    overwrite=False,
):
    """Convert nuclear data to ENDF-6 format and store it in a file.

    The lines produced by :func:`write` are joined by newline
    characters and written to ``filename``.

    Parameters
    ----------
    filename : str
        Path of the file to be created.
    endf_dic : dict
        Nested dictionary with nuclear data. Keys of first level
        are MF numbers and the keys of second level MT numbers.
        The structure of the :class:`dict` stored under an MF/MT
        combination depends on the corresponding ENDF recipe.
    exclude : Union[None, tuple[Union[int, tuple[int, int]]]]
        Sections listed here are not written to the file.
        For an explanation of how to specify MF/MT sections to be
        excluded, see the ``exclude`` argument of :func:`parsefile`.
    include : Union[None, tuple[Union[int, tuple[int, int]]]]
        This argument is only considered if ``exclude=None``.
        With ``None``, all sections are written to the file.
        Otherwise, only the indicated sections are written.
        For an explanation of how to define which sections to
        include, see the ``include`` argument of :func:`parsefile`.
    zero_as_blank : bool
        Forwarded unchanged to :func:`write`.
    overwrite : bool
        Existing files will only be overwritten if this argument
        is ``True``, otherwise this function will abort.

    Raises
    ------
    FileExistsError
        If ``filename`` already exists and ``overwrite`` is not true.
    """
    already_present = file_exists(filename)
    if already_present and not overwrite:
        raise FileExistsError(
            f"file {filename} already exists. "
            "Change overwrite option to True if you "
            "really want to overwrite this file."
        )
    # the ENDF lines are generated before the output file is opened
    endf_lines = self.write(endf_dic, exclude, include, zero_as_blank)
    with open(filename, "w") as outfile:
        # lines are separated by newlines; no newline after the last one
        outfile.write("\n".join(endf_lines))
# DEPRECATED NAME
class BasicEndfParser(EndfParser):
    """Deprecated alias of :class:`EndfParser`.

    Behaves like :class:`EndfParser` but logs a deprecation
    notice each time an instance is created.
    """

    def __init__(self, *args, **kwargs):
        # all arguments are passed on untouched to EndfParser
        super().__init__(*args, **kwargs)
        deprecation_notice = (
            "DEPRECATION NOTICE: The name of the class is now `EndfParser`. "
            "The alias `BasicEndfParser` will be abandoned soon."
        )
        logging.warning(deprecation_notice)