# Source code for endf_parserpy.tape.index

############################################################
#
# Author(s):       Georg Schnabel
# Email:           g.schnabel@iaea.org
# Creation date:   2026/05/15
# Last modified:   2026/05/18
# License:         MIT
# Copyright (c) 2026 International Atomic Energy Agency (IAEA)
#
############################################################

"""Structural index of a multi-material ENDF tape.

The index records, for every material on a tape, its MAT number, the
ZA/AWR identifiers read from the first HEAD record, the byte range it
occupies in the file and the byte range of every (MF, MT) section it
contains.

It is built by a single linear scan that inspects only the *structural*
parts of the ENDF-6 format: the MAT/MF/MT control fields (columns
67-75) and the universal HEAD-record layout (``C1=ZA``, ``C2=AWR``). No
ENDF recipe is consulted and no section body is interpreted, so the
index is cheap to build and completely independent of the parsing
engine.

The scan is memory-bounded: :meth:`TapeIndex.from_file` reads the file
in chunks and never holds it wholly in memory. When ``numpy`` is
installed and the tape is a uniform array of fixed-width records (the
common case), a vectorized fast path extracts every record's control
field in one bulk operation per chunk; it produces an index identical
to the streaming line-by-line scan it falls back to for any tape that
is not uniform-width.
"""

import math
import os
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple

from .errors import TapeStructureError
from .records import _control_int, _MAT_COLS, _MF_COLS, _MT_COLS, _CTRL_COLS

try:
    import numpy as _np
except ImportError:  # pragma: no cover - numpy is an optional accelerator
    _np = None

# Default block size, in bytes, that from_file() reads at a time (16 MiB).
# Peak memory while indexing stays a small multiple of this value, whatever
# the size of the tape.
_DEFAULT_CHUNK_BYTES = 16 * 1024 * 1024


def _endf_float(field_text):
    """Parse an 11-column ENDF number field; return ``None`` on failure.

    ENDF numbers may use an implicit exponent, e.g. ``9.223800+4`` for
    ``9.223800e+4``. ``None`` is returned rather than raising, so that
    an unparsable identifier field never aborts an index build.

    A value that is not finite is treated as unparsable as well. This
    covers the spellings ``inf``/``nan`` accepted by :func:`float` and
    an exponent so large that the result overflows to infinity (e.g.
    ``1.0+999``); such a value cannot be an identifier and would make
    the integer conversion of ZA raise.

    This is a deliberately minimal number parser, kept independent of
    the parsing engine so the index stays recipe- and engine-free; it
    only needs to recover the ZA/AWR identifiers from a HEAD record.

    Parameters
    ----------
    field_text : str
        The raw text of the field; surrounding blanks are ignored.

    Returns
    -------
    float or None
        The finite value of the field, or ``None`` if the field is
        blank, malformed or not finite.
    """
    text = field_text.strip()
    if not text:
        return None
    try:
        value = float(text)
    except ValueError:
        value = None
        # implicit-exponent form: a sign that is neither the leading sign
        # nor preceded by 'e'/'E' marks the start of the exponent; only
        # the right-most such sign is tried
        for i in range(len(text) - 1, 0, -1):
            if text[i] in "+-" and text[i - 1] not in "eE":
                try:
                    value = float(text[:i] + "e" + text[i:])
                except ValueError:
                    return None
                break
    if value is None or not math.isfinite(value):
        return None
    return value


def _read_za_awr(line):
    """Read ``(ZA, AWR)`` from the C1/C2 fields of a HEAD record.

    C1 is taken from characters ``[0:11]`` and C2 from ``[11:22]`` of
    ``line``; both are parsed with :func:`_endf_float`.

    Parameters
    ----------
    line : str
        The decoded text of the HEAD record.

    Returns
    -------
    tuple
        ``(za, awr)`` where ``za`` is C1 rounded to the nearest integer
        and ``awr`` is C2 as a float. Either is ``None`` if its field
        could not be read. ``za`` is also ``None`` when C1 is not a
        finite number, so a damaged identifier never aborts the index
        build.
    """
    za = _endf_float(line[0:11])
    awr = _endf_float(line[11:22])
    try:
        za_int = int(round(za)) if za is not None else None
    except (OverflowError, ValueError):
        # round() raises OverflowError for an infinite value and
        # ValueError for NaN; neither can be a ZA identifier
        za_int = None
    return za_int, awr


@dataclass
class SectionIndexEntry:
    """Location of one (MF, MT) section within the tape file.

    Entries are created by the structural scan and stored in
    :attr:`MaterialIndexEntry.sections` under their ``(MF, MT)`` key.

    Attributes
    ----------
    offset : int
        Byte offset of the section's first record.
    length : int
        Total byte length of the section, including its SEND record.
        For a section that was left open (no SEND record was seen) it
        covers only the records accumulated up to that point.
    line_count : int
        Number of records in the section, including its SEND record.
    """

    offset: int
    length: int
    line_count: int


@dataclass
class MaterialIndexEntry:
    """Location and identity of one material on the tape.

    Instances are intended to be treated as read-only. The scanner
    itself fills them in incrementally: ``byte_length`` is created as
    ``0`` and set once the closing MEND record is seen, and
    ``sections`` grows as sections are completed.

    Attributes
    ----------
    position : int
        Zero-based position of the material on the tape; this is the
        material's canonical identity (see design decision D1).
    mat : int
        ENDF MAT number. Not unique: PENDF tapes repeat the same
        MAT at different temperatures.
    za : int or None
        ZA identifier (``1000*Z + A``) from the first HEAD record, or
        ``None`` if it could not be read.
    awr : float or None
        Atomic weight ratio from the first HEAD record, or ``None``.
    byte_offset : int
        Byte offset of the material's first record.
    byte_length : int
        Total byte length of the material, including its MEND record.
    sections : dict[tuple[int, int], SectionIndexEntry]
        The (MF, MT) sections contained in the material. Being a dict,
        it keeps one entry per ``(MF, MT)`` key; a later section with
        the same key replaces the earlier one.
    """

    position: int
    mat: int
    za: Optional[int]
    awr: Optional[float]
    byte_offset: int
    byte_length: int
    # default_factory gives every instance its own, initially empty dict
    sections: Dict[Tuple[int, int], SectionIndexEntry] = field(default_factory=dict)


def _iter_file_records(fh):
    """Stream the records of a binary file as ``(byte_length, line)`` pairs.

    The handle is iterated directly, so records end at ``"\\n"`` only
    and a single record is held in memory at a time. ``byte_length``
    is the full on-disk size of the record, terminator included, while
    ``line`` is the record without its trailing ``"\\n"``. A ``"\\r"``
    in front of the terminator is left in place: the scanner looks
    only at the control columns and strips it itself where text is
    needed. A final record without a terminator is yielded unchanged.
    """
    for chunk in fh:
        body = chunk[:-1] if chunk.endswith(b"\n") else chunk
        yield len(chunk), body


def _iter_line_records(lines):
    """Turn an iterable of text lines into ``(byte_length, line)`` records.

    Each line loses its trailing ``"\\r"``/``"\\n"`` characters and is
    encoded as latin-1, with unencodable characters replaced. The
    reported byte length is the encoded length plus one, i.e. every
    line is assumed to end in a single ``"\\n"``; the offsets derived
    from it are therefore exact only for a file written that way. The
    input is consumed lazily.
    """

    def _as_record(text):
        payload = text.rstrip("\r\n").encode("latin-1", errors="replace")
        return len(payload) + 1, payload

    yield from map(_as_record, lines)


class _ScanState:
    """Mutable progress record of one structural tape scan.

    It carries everything the MAT/MF/MT state machine needs between
    records: the finished materials, the material being assembled and
    the section being assembled. The streaming :func:`_scan` advances
    it one record at a time, the vectorized :func:`_scan_chunk_runs`
    one run at a time and across block boundaries; both do so through
    the shared :func:`_consume_run`.
    """

    __slots__ = (
        "materials",
        "cur",
        "sec_key",
        "sec_offset",
        "sec_length",
        "sec_lines",
        "done",
        "uniform",
    )

    def __init__(self):
        # finished materials, in tape order
        self.materials = []
        # the MaterialIndexEntry being assembled and the (MF, MT) key of
        # the section being assembled; None while there is none
        self.cur = self.sec_key = None
        # start offset, byte length and record count of the open section
        self.sec_offset = self.sec_length = self.sec_lines = 0
        # done: a TEND record has been seen
        # uniform: cleared once the tape proves not to be uniform-width
        self.done, self.uniform = False, True

    def flush_section(self):
        """Store the open section, if any, and mark no section as open.

        Used for a section that was left open, e.g. because its SEND
        record is missing. Nothing is stored unless both a section and
        a material are under construction.
        """
        key, material = self.sec_key, self.cur
        if not (key is None or material is None):
            material.sections[key] = SectionIndexEntry(
                self.sec_offset, self.sec_length, self.sec_lines
            )
        self.sec_key = None


def _consume_run(st, mat, mf, mt, offset, length, line_count, head_line):
    """Advance the scan state by one record or one run of equal records.

    A record is classified here, and only here, by its MAT/MF/MT
    control field; the streaming :func:`_scan` and the vectorized
    :func:`_scan_chunk_runs` both delegate to this function.

    Parameters
    ----------
    st : _ScanState
        The scan state; modified in place.
    mat, mf, mt : int
        The control field shared by every record of the run.
    offset, length, line_count : int
        Byte offset, byte length and number of records of the run.
    head_line : str
        Decoded text of the first record of the run. It is read only
        when the run opens a new material, to obtain ZA and AWR.

    The classification, in order of precedence:

    * ``MAT == -1`` -- TEND: ``st.done`` is set, nothing else changes.
    * ``MAT, MF, MT > 0`` -- section record(s): a material is opened if
      none is open, and the run starts or extends a section.
    * ``MF != 0`` and ``MT == 0`` -- SEND: the open section is stored,
      the SEND run included.
    * all three zero -- MEND: the open material is completed.
    * anything else (e.g. FEND): a section left open is stored.
    """
    if mat == -1:
        # TEND terminates the tape
        st.done = True
        return

    if mat > 0 and mf > 0 and mt > 0:
        if st.cur is None:
            za, awr = _read_za_awr(head_line)
            # byte_length is a placeholder (0) until the MEND record of
            # the material has been reached
            st.cur = MaterialIndexEntry(len(st.materials), mat, za, awr, offset, 0)
        key = (mf, mt)
        if key == st.sec_key:
            # the run continues the section that is already open
            st.sec_length += length
            st.sec_lines += line_count
        else:
            # the run opens a new section; store one left open first
            st.flush_section()
            st.sec_key = key
            st.sec_offset = offset
            st.sec_length = length
            st.sec_lines = line_count
        return

    if mf != 0 and mt == 0:
        # SEND closes the section and belongs to it
        if st.sec_key is not None and st.cur is not None:
            st.cur.sections[st.sec_key] = SectionIndexEntry(
                offset=st.sec_offset,
                length=st.sec_length + length,
                line_count=st.sec_lines + line_count,
            )
        st.sec_key = None
        return

    if mat == 0 and mf == 0 and mt == 0:
        # MEND closes the material and belongs to it
        st.flush_section()
        material = st.cur
        if material is not None:
            material.byte_length = offset + length - material.byte_offset
            st.materials.append(material)
            st.cur = None
        return

    # FEND (mat > 0, mf == 0, mt == 0) or any other record: a section
    # cannot continue across it
    st.flush_section()


def _scan(records):
    """Build the material list from a stream of ``(byte_length, line)`` records.

    Returns ``(materials, (tpid_line, offset, length))``. ``records``
    is consumed lazily, so with a streaming source such as
    :func:`_iter_file_records` peak memory does not depend on the size
    of the tape.

    Only structural information is looked at. Every record carries its
    MAT/MF/MT control fields in the byte slice ``[66:75]``, and all
    records of one section share that slice. A record whose slice
    equals that of the preceding section record therefore continues
    the open section and is merely counted, without parsing a single
    integer. That shortcut handles the great majority of records; any
    record that starts a new run is handed to :func:`_consume_run`.

    Raises
    ------
    TapeStructureError
        If there is no non-blank record, if the first non-blank record
        is not a tape head (TPID) record, or if the records end while
        a material is still open.
    """
    stream = iter(records)
    state = _ScanState()
    position = 0  # byte offset of the next record to be read
    tpid = None

    # phase 1: the tape head (TPID) is the first non-blank record
    for nbytes, raw in stream:
        record_start = position
        position += nbytes
        if not raw.strip():
            continue
        mat = _control_int(raw[_MAT_COLS])
        mf = _control_int(raw[_MF_COLS])
        mt = _control_int(raw[_MT_COLS])
        if mat < 0 or mf != 0 or mt != 0:
            raise TapeStructureError(
                "the tape does not begin with a tape head (TPID) "
                f"record (found MAT={mat}, MF={mf}, MT={mt})"
            )
        tpid = (raw.rstrip(b"\r").decode("latin-1"), record_start, nbytes)
        break
    if tpid is None:
        raise TapeStructureError("the tape does not contain any records")

    # phase 2: everything after the TPID
    run_ctrl = None  # control slice of the section run that may be extended
    for nbytes, raw in stream:
        record_start = position
        position += nbytes
        ctrl = raw[_CTRL_COLS]

        if ctrl == run_ctrl:
            # shortcut: same control fields, so the open section continues
            state.sec_length += nbytes
            state.sec_lines += 1
            continue

        if not raw.strip():
            # blank padding record
            continue

        mat = _control_int(ctrl[0:4])
        mf = _control_int(ctrl[4:6])
        mt = _control_int(ctrl[6:9])
        _consume_run(
            state, mat, mf, mt, record_start, nbytes, 1, raw.decode("latin-1")
        )
        if state.done:
            # TEND: nothing meaningful follows
            break
        # the shortcut may extend a run of regular section records only;
        # after any framing record the next record is classified afresh
        run_ctrl = ctrl if (mat > 0 and mf > 0 and mt > 0) else None

    if state.cur is not None:
        raise TapeStructureError(
            "the tape ends in the middle of a material; the final MEND or "
            "TEND record is missing"
        )
    return state.materials, tpid


def _scan_chunk_runs(buf, num_records, start_row, base, line_width, st):
    """Run the structural state machine over one block of whole records.

    Parameters
    ----------
    buf : bytes
        The block; it holds at least ``num_records`` records of
        ``line_width`` bytes each, starting at ``buf[0]``.
    num_records : int
        Number of whole records of ``buf`` to consider.
    start_row : int
        Index of the first record to process (skips the TPID in the
        first block). Nothing is done if it is not below
        ``num_records``.
    base : int
        Byte offset of ``buf[0]`` within the file.
    line_width : int
        Width of one record in bytes, terminator included.
    st : _ScanState
        The carried scan state; updated in place.

    ``st.uniform`` is cleared and the scan of the block abandoned if
    the block is not an array of fixed-width records -- a row that is
    not newline-terminated, or a newline anywhere but at the end of a
    row -- or if a data record carries a blank control field. The
    caller then falls back to the streaming scan.

    The MAT/MF/MT control field of every record is the byte slice
    ``[66:75]``; a section or material boundary is exactly a row whose
    control field differs from its predecessor, so the per-record scan
    collapses into a loop over *runs* of identical control fields. Each
    run is then classified by the shared :func:`_consume_run`.
    """
    if num_records <= start_row:
        return
    used = num_records * line_width
    flat = _np.frombuffer(buf, dtype=_np.uint8)
    arr = flat[:used].reshape(num_records, line_width)
    # fixed-width check, part 1: the last byte of every record is "\n"
    if not bool(_np.all(arr[:, line_width - 1] == 0x0A)):
        st.uniform = False
        return
    # fixed-width check, part 2: there is no further newline. Two short
    # lines whose lengths add up to line_width would pass part 1, but
    # the control field of that row would be read from the wrong bytes
    if buf.count(b"\n", 0, used) != num_records:
        st.uniform = False
        return
    ctrl = arr[:, _CTRL_COLS]  # the MAT/MF/MT control field of every record

    # segment the rows into runs of identical control fields
    window = ctrl[start_row:]
    if len(window) == 1:
        starts = [start_row]
    else:
        boundary = _np.any(window[1:] != window[:-1], axis=1)
        starts = [start_row] + (_np.flatnonzero(boundary) + 1 + start_row).tolist()
    ends = starts[1:] + [num_records]

    for start, end in zip(starts, ends):
        ctrl_field = ctrl[start].tobytes()
        run_offset = base + start * line_width
        run_length = (end - start) * line_width
        if not ctrl_field.strip():  # blank-control run
            if buf[start * line_width : end * line_width].strip():
                st.uniform = False  # a record with data but a blank control
                return
            continue  # blank padding records are skipped
        mat = _control_int(ctrl_field[0:4])
        mf = _control_int(ctrl_field[4:6])
        mt = _control_int(ctrl_field[6:9])
        # text of the first record of the run; _consume_run reads it only
        # when the run opens a new material
        head_line = buf[start * line_width : start * line_width + line_width].decode(
            "latin-1"
        )
        _consume_run(st, mat, mf, mt, run_offset, run_length, end - start, head_line)
        if st.done:  # TEND: end of tape
            return


def _vec_scan_file(fh, chunk_bytes):
    """Chunked vectorized structural scan of an open binary tape file.

    Read the tape in blocks of about ``chunk_bytes`` and apply the
    vectorized scan to each, so peak memory stays a small multiple of
    ``chunk_bytes`` regardless of the tape size. Return
    ``(materials, tpid)`` for a clean uniform fixed-width tape, or
    ``None`` for any tape that is not uniform-width or not cleanly
    structured; the caller then falls back to :func:`_scan`, which is
    the authority for both the index and any structural error.

    A tape whose only irregularity is an unterminated final TEND record
    and/or a few trailing blank lines is still accepted: the partial
    trailing record is trimmed, gated on that tail being benign (only
    whitespace, or a TEND record), so a genuinely truncated record is
    never silently dropped.

    Parameters
    ----------
    fh : binary file object
        The open tape file; it must support ``read`` and ``seek``.
        Reading is assumed to start at offset 0, since the later
        ``seek`` and all recorded offsets are absolute.
    chunk_bytes : int
        Approximate number of bytes to read per block.

    Returns
    -------
    tuple or None
        ``(materials, (tpid_line, tpid_offset, tpid_length))``, or
        ``None`` if the fast path does not apply. When ``None`` is
        returned the file position is wherever the last read left it.

    Notes
    -----
    The function uses ``_np`` unconditionally, so it must only be
    called when numpy could be imported.
    """
    first = fh.read(chunk_bytes)
    if not first:
        return None
    # the width of the first line is taken as the width of every record
    first_nl = first.find(b"\n")
    if first_nl < 0:
        return None
    line_width = first_nl + 1
    # the control field ends at column 75, so a record must be wide
    # enough to contain it
    if line_width < 76:
        return None
    # number of whole records in the first block
    num0 = len(first) // line_width
    if num0 == 0:
        return None

    # locate the TPID -- the first record with a non-blank control field
    flat = _np.frombuffer(first, dtype=_np.uint8)
    arr0 = flat[: num0 * line_width].reshape(num0, line_width)
    # every whole record of the first block must end in "\n"
    if not bool(_np.all(arr0[:, line_width - 1] == 0x0A)):
        return None
    ctrl0 = arr0[:, _CTRL_COLS]
    # a control field counts as blank only if it consists of spaces (0x20)
    nonblank = _np.any(ctrl0 != 0x20, axis=1)
    if not bool(_np.any(nonblank)):
        return None
    # argmax of a boolean array is the index of its first True element
    tpid_row = int(_np.argmax(nonblank))
    if tpid_row and first[: tpid_row * line_width].strip():
        return None  # non-blank content before the TPID
    head = ctrl0[tpid_row].tobytes()
    if (
        _control_int(head[0:4]) < 0
        or _control_int(head[4:6]) != 0
        or _control_int(head[6:9]) != 0
    ):
        return None  # not a TPID record (a TEND has MAT=-1)
    tpid_offset = tpid_row * line_width
    # same triple as _scan returns: (text without line ending, offset, length)
    tpid = (
        first[tpid_offset : tpid_offset + line_width].rstrip(b"\r\n").decode("latin-1"),
        tpid_offset,
        line_width,
    )

    st = _ScanState()
    # scan the records of the first block that follow the TPID
    _scan_chunk_runs(first, num0, tpid_row + 1, 0, line_width, st)
    if not st.uniform:
        return None

    if not st.done:
        # continue with record-aligned blocks; rewind to the last whole
        # record of the first block so its partial tail is re-read
        read_size = max(1, chunk_bytes // line_width) * line_width
        fh.seek(num0 * line_width)
        base = num0 * line_width  # file offset of the next block
        while not st.done:
            block = fh.read(read_size)
            if not block:
                break
            # a short read is taken to be the last block of the file
            final = len(block) < read_size
            remainder = len(block) % line_width
            if remainder:
                # a partial trailing record: accept it only if benign
                tail = block[len(block) - remainder :]
                # NOTE(review): for a tail that ends before the MAT
                # columns the slice below is truncated or empty; the
                # outcome then depends on how _control_int treats such
                # input -- confirm
                if tail.strip() and _control_int(tail[_MAT_COLS]) != -1:
                    return None
                # the tail is dropped, not scanned, so a TEND found
                # there does not set st.done
                block = block[: len(block) - remainder]
            if block:
                num_records = len(block) // line_width
                _scan_chunk_runs(block, num_records, 0, base, line_width, st)
                if not st.uniform:
                    return None
                base += len(block)
            if final:
                break

    if st.cur is not None:
        return None  # truncated tape -- let _scan raise the structural error
    return st.materials, tpid


class TapeIndex:
    """A structural index over the materials of an ENDF tape.

    Build one with :meth:`from_file` (exact on-disk byte offsets) or
    :meth:`from_lines`. The index supports ``len()``, iteration and
    integer position indexing, and provides :meth:`by_mat` and
    :meth:`by_za` secondary lookups. It is recipe-free and picklable.

    Attributes
    ----------
    materials : list[MaterialIndexEntry]
        The materials, in tape order.
    tpid_line : str
        The tape head (TPID) record.
    tpid_offset, tpid_length : int
        Byte location of the TPID record.
    source : str or None
        Path of the indexed file, if built with :meth:`from_file`.
    source_size, source_mtime_ns : int or None
        Size and modification time of the source file at index time;
        usable to detect that the file changed after indexing.
    """

    def __init__(
        self,
        materials,
        tpid_line,
        tpid_offset,
        tpid_length,
        source=None,
        source_size=None,
        source_mtime_ns=None,
    ):
        self.materials = list(materials)
        self.tpid_line = tpid_line
        self.tpid_offset = tpid_offset
        self.tpid_length = tpid_length
        self.source = source
        self.source_size = source_size
        self.source_mtime_ns = source_mtime_ns
        # secondary lookups: identifier -> positions, in tape order
        self._by_mat = {}
        self._by_za = {}
        for entry in self.materials:
            self._by_mat.setdefault(entry.mat, []).append(entry.position)
            if entry.za is not None:
                self._by_za.setdefault(entry.za, []).append(entry.position)

    @classmethod
    def from_file(cls, path, *, chunk_bytes=_DEFAULT_CHUNK_BYTES):
        """Build an index of the ENDF tape stored at ``path``.

        The tape is read in blocks of about ``chunk_bytes``, so peak
        memory during indexing stays a small multiple of
        ``chunk_bytes`` regardless of the tape size.

        ``source_size`` and ``source_mtime_ns`` are taken from the open
        file handle before the first byte is read. They therefore
        describe the very file that is indexed, and a modification
        made during or after the scan shows up as a difference to the
        recorded values.
        """
        path = os.fspath(path)
        with open(path, "rb") as fh:
            # stat the handle, not the path: the path may be replaced or
            # the file modified while it is being scanned
            stat = os.fstat(fh.fileno())
            result = None
            if _np is not None:
                result = _vec_scan_file(fh, chunk_bytes)
                if result is None:
                    fh.seek(0)  # the fast path consumed part of the file
            if result is None:
                result = _scan(_iter_file_records(fh))
        materials, tpid = result
        return cls(
            materials,
            tpid[0],
            tpid[1],
            tpid[2],
            source=path,
            source_size=stat.st_size,
            source_mtime_ns=stat.st_mtime_ns,
        )

    @classmethod
    def from_lines(cls, lines, source=None):
        """Build an index from an iterable of ENDF tape lines.

        The lines are consumed lazily. Byte offsets are computed
        assuming a single ``"\\n"`` terminates each line; they are
        therefore only exact for a file written that way. Use
        :meth:`from_file` when exact on-disk offsets are required.
        """
        materials, tpid = _scan(_iter_line_records(lines))
        return cls(materials, tpid[0], tpid[1], tpid[2], source=source)

    def by_mat(self, mat):
        """Return the positions of all materials with this MAT number."""
        return list(self._by_mat.get(mat, ()))

    def by_za(self, za):
        """Return the positions of all materials with this ZA identifier."""
        return list(self._by_za.get(za, ()))

    def __len__(self):
        return len(self.materials)

    def __iter__(self):
        return iter(self.materials)

    def __getitem__(self, position):
        return self.materials[position]

    def __repr__(self):
        source = f", source={self.source!r}" if self.source else ""
        return f"TapeIndex({len(self.materials)} materials{source})"