# -*- coding: utf-8 -*-
#############################################################################
# zlib License
#
# (C) 2025 Cristóvão Beirão da Cruz e Silva <cbeiraod@cern.ch>
#
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
# claim that you wrote the original software. If you use this software
# in a product, an acknowledgment in the product documentation would be
# appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
# misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
#############################################################################
import mmap
import re
import struct
from contextlib import contextmanager
from dataclasses import dataclass
from dataclasses import field
from datetime import datetime
from pathlib import Path
from struct import Struct
from typing import Any
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Tuple
import awkward as ak
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import uproot
from natsort import natsorted
from pyarrow.ipc import new_file
from termcolor import colored
# SchemaInfo
SAMPIC_Schema_Info = {
# Format:
# Name: (pandas, pyarrow, numpy for root, optional array size)
"HITNumber": ("int32", pa.int32(), np.int32),
"UnixTime": ("float64", pa.float64(), np.double),
"Channel": ("int32", pa.int32(), np.int32),
"Cell": ("int32", pa.int32(), np.int32),
"TimeStampA": ("int32", pa.int32(), np.int32),
"TimeStampB": ("int32", pa.int32(), np.int32),
"FPGATimeStamp": ("uint64", pa.uint64(), np.double),
"StartOfADCRamp": ("int32", pa.int32(), np.int32),
"RawTOTValue": ("int32", pa.int32(), np.int32),
"TOTValue": ("int32", pa.int32(), np.int32),
"PhysicalCell0Time": ("float64", pa.float64(), np.double),
"OrderedCell0Time": ("float64", pa.float64(), np.double),
"Time": ("float64", pa.float64(), np.double),
"Baseline": ("float32", pa.float32(), np.float32),
"RawPeak": ("float32", pa.float32(), np.float32),
"Amplitude": ("float32", pa.float32(), np.float32),
"ADCCounterLatched": ("int32", pa.int32(), np.int32),
"DataSize": ("int32", pa.int32(), np.int32),
"TriggerPosition": (None, pa.list_(pa.int32()), np.int32, 64),
"DataSample": (None, pa.list_(pa.float32()), np.float32, 64),
# … etc …
}
[docs]
def build_schema(
metadata: Optional[Dict[bytes, bytes]] = None,
schemaInfo: Dict[str, Tuple] = SAMPIC_Schema_Info,
) -> pa.Schema:
"""
Construct a PyArrow Schema from predefined field types and optional metadata.
This utility builds a fully-specified Arrow Schema by iterating over
a `schemaInfo` mapping of field names to type definitions. It
includes only those fields whose PyArrow type is non-null, and
attaches any provided metadata as key/value byte pairs.
Parameters
----------
metadata : dict of bytes->bytes, optional
Key/value metadata to attach to the schema (e.g. from file header).
If None, no metadata will be set on the schema.
schemaInfo : dict of str->tuple
Mapping from field name to a tuple defining:
- pandas dtype string (unused here)
- PyArrow DataType (used)
- NumPy dtype for ROOT output
- optional list/array size for array fields
Only entries where the PyArrow DataType is not None are used.
Returns
-------
pa.Schema
A PyArrow Schema containing all enabled fields and attached metadata.
Raises
------
KeyError
If any key in `schemaInfo` is missing its PyArrow type entry.
"""
# Build list of fields with valid PyArrow types
try:
fields = [pa.field(name, schemaInfo[name][1]) for name in schemaInfo if schemaInfo[name][1] is not None]
except KeyError as e:
raise KeyError(f"Field '{e.args[0]}' missing PyArrow type in schemaInfo")
schema = pa.schema(fields)
# Attach metadata if provided
if metadata is not None:
schema = schema.with_metadata(metadata)
return schema
[docs]
def convert_df_with_schema(df: pd.DataFrame, schemaInfo: Dict[str, Tuple] = SAMPIC_Schema_Info) -> pd.DataFrame:
"""
Cast DataFrame columns to types defined in the schema information.
Parameters
----------
df : pandas.DataFrame
Input DataFrame whose columns should be converted in-place.
schemaInfo : dict of str -> tuple
Mapping of column names to schema tuples of the form:
(pandas_dtype, pyarrow_type, numpy_dtype, [optional array size]).
Only the pandas_dtype at index 0 is used for conversion when not None.
Returns
-------
pandas.DataFrame
The same DataFrame with its columns cast to the specified pandas dtypes.
Notes
-----
- Columns not present in schemaInfo or with None pandas dtype are left unchanged.
- Conversion is done in-place; the returned DataFrame is the same object.
"""
for col in df.columns:
dtype_info = schemaInfo.get(col)
if dtype_info is None:
continue
pandas_dtype = dtype_info[0]
if pandas_dtype:
df[col] = df[col].astype(pandas_dtype)
return df
[docs]
def get_root_data_with_schema(df: pd.DataFrame, schemaInfo: Dict[str, Tuple] = SAMPIC_Schema_Info) -> Dict[str, np.ndarray]:
"""
Prepare a dictionary of numpy arrays from a DataFrame for ROOT writing.
Parameters
----------
df : pandas.DataFrame
Input DataFrame containing fields defined in the schema.
schemaInfo : dict of str -> tuple
Mapping of column names to schema tuples of the form:
(pandas_dtype, pyarrow_type, numpy_dtype, [optional array size]).
The numpy_dtype at index 2 is used for array construction when not None.
Returns
-------
dict
Mapping of column names to numpy.ndarray with appropriate dtype.
Raises
------
ValueError
If conversion of any column fails due to incompatible data or dtype.
Notes
-----
- Only columns with a non-None numpy dtype in schemaInfo are included.
- Any ValueError during conversion prints the offending column names for debugging.
"""
ret_val: Dict[str, np.ndarray] = {}
for col, info in schemaInfo.items():
numpy_dtype = None
if len(info) > 2:
numpy_dtype = info[2]
if numpy_dtype is None:
continue
try:
# Convert column to numpy array of target dtype
# ret_val[col] = np.array(df[col].values, dtype=numpy_dtype)
if len(info) > 3:
ret_val[col] = np.array(df[col].tolist(), dtype=numpy_dtype)
else:
ret_val[col] = np.array(df[col], dtype=numpy_dtype)
except Exception as e:
# Debugging output
print(f"Conversion error for column '{col}': dtype {numpy_dtype}")
raise e
return ret_val
[docs]
def build_empty_root_data_with_schema(schemaInfo: Dict[str, Tuple] = SAMPIC_Schema_Info) -> Dict[str, np.ndarray]:
"""
Construct an empty data dictionary for ROOT branches based on schema.
Parameters
----------
schemaInfo : dict of str -> tuple
Mapping of column names to schema tuples of the form:
(pandas_dtype, pyarrow_type, numpy_dtype, [optional array size]).
The numpy_dtype at index 2 and optional array size at index 3 are used.
Returns
-------
dict
Mapping of column names to empty numpy.ndarray with correct shape and dtype.
Notes
-----
- Scalar fields produce 1D arrays of length zero.
- Fixed-size array fields (length provided in schemaInfo[3]) produce 2D arrays
with shape (0, size).
"""
ret_val: Dict[str, np.ndarray] = {}
for col, info in schemaInfo.items():
numpy_dtype = None
arr_size = None
if len(info) > 2:
numpy_dtype = info[2]
if len(info) > 3:
arr_size = info[3]
if numpy_dtype is None:
continue
if arr_size is not None:
# Fixed-length vector field
ret_val[col] = np.empty((0, arr_size), dtype=numpy_dtype)
else:
ret_val[col] = np.empty(0, dtype=numpy_dtype)
return ret_val
[docs]
class SAMPIC_Run_Decoder:
"""
Decode and process a complete SAMPIC run.
Provides a one-pass, memory-efficient workflow for:
1. Reading raw SAMPIC binary files from a run directory.
2. Extracting and decoding header metadata.
3. Streaming hit records in fixed-size chunks.
4. Writing decoded hits and metadata to Feather, Parquet, or ROOT formats.
The class preserves metadata both as raw bytes (for Arrow/Parquet) and
as native Python types (for ROOT), and supports arbitrarily large files
without loading everything into memory.
Attributes
----------
run_base_path : pathlib.Path
Path to the directory containing all binary files for one run.
run_header : SampicHeader
Parsed header metadata for the current file being processed.
run_files : list[pathlib.Path]
List of all SAMPIC binary files in `run_base_path`, in sort order.
"""
front_end_fpga_re = re.compile(r"^FRONT-END FPGA INDEX: (\d+) FIRMWARE VERSION (.+) BASELINE VALUE: ([\d\.]+)")
timestamp_re = re.compile(r"^UnixTime = (.+) date = (.+) time = (.+ms)")
def __init__(
self,
run_dir_path: Path,
):
"""
Initialize a SAMPIC run decoder.
Parameters
----------
run_dir_path : pathlib.Path
Directory containing the SAMPIC binary files for a single run.
Raises
------
FileNotFoundError
If `run_dir_path` does not exist or is not a directory.
"""
self.run_base_path = run_dir_path
self.run_files = natsorted(list(self.run_base_path.glob("*.bin*")))
[docs]
@contextmanager
def open_sampic_file_in_chunks_and_get_header(
self,
file_path: Path,
extra_header_bytes: int,
chunk_size: int = 64 * 1024,
debug: bool = False,
) -> Generator[Tuple[bytes, Generator[bytes, None, None]], None, None]:
"""
Memory-map a SAMPIC file, extract its header, and stream the remainder in chunks.
This context manager opens `file_path` in read-only mode, mmaps the entire
file, and locates the header boundary as the last '=' byte before the first
`0x00`. It returns the header (including `extra_header_bytes`) and a
generator yielding the file body in `chunk_size`-byte blocks. On exit,
both the file and the mmap are cleanly closed.
During this process, `self.current_filesize` is set to the size of the file.
Parameters
----------
file_path : pathlib.Path
Path to the binary SAMPIC file to read.
extra_header_bytes : int
Number of bytes to include *after* the header delimiter (`=`) in the
returned header.
chunk_size : int, optional
Size of each chunk (in bytes) produced by the body generator.
Default is 64 KiB.
debug : bool, optional
If True, print debugging information. Default is False.
Yields
------
header_bytes : bytes
The raw header bytes, from the file start up through the computed end.
body_gen : generator of bytes
Generator yielding successive `chunk_size`-byte slices of the file body.
Raises
------
ValueError
If the header delimiter cannot be located (i.e. no '=' before the
first 0x00), indicating a malformed file.
"""
f = file_path.open('rb')
try:
self.current_filesize = file_path.stat().st_size
mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
mm_size = mm.size()
# 1) find first null
first_null = mm.find(b'\x00')
if first_null <= 0: # Explicitly do not accept situation where the first null is the first byte in the file
raise ValueError("No null byte (0x00) found in file")
# 2) within header region, find last '='
last_eq = mm.rfind(b'===\n', 0, first_null)
if last_eq < 0:
raise ValueError("No '=' found before first 0x00")
# 3) compute header slice
header_end = min(last_eq + 3 + extra_header_bytes, mm_size)
header = mm[:header_end]
if debug:
print(file_path.name)
print(header_end)
# 4) define body generator
def body_gen() -> Generator[bytes, None, None]:
offset = header_end
while offset < mm_size:
yield mm[offset : offset + chunk_size]
offset += chunk_size
yield header, body_gen()
finally:
delattr(self, "current_filesize")
mm.close()
f.close()
@staticmethod
def _parse_header_field( # noqa: max-complexity=20
field: str,
header: SampicHeader,
keep_unparsed: bool = True,
) -> None:
"""
Parse a single header field string and populate the corresponding attribute.
This helper inspects a raw field fragment (text between “===” delimiters),
extracts the key and value(s), converts them to the appropriate type, and
assigns them on the supplied `SampicHeader` instance. If the key is not
one of the recognized header attributes and `keep_unparsed` is True, the
raw field text is stored in `header.extra`.
Parameters
----------
field : str
Raw header fragment, e.g. "param1: value1 part2 = 42".
header : SampicHeader
The dataclass instance to be populated in-place.
keep_unparsed : bool, optional
If True (default), any unrecognized field is appended to
`header.extra` under its raw key; if False, unrecognized
fields are ignored.
Returns
-------
None
Raises
------
ValueError
If the field text cannot be split into a key and a value,
or if a known key's value fails conversion (e.g. non-numeric
text for an integer field).
"""
if "==" in field:
key = None
sub_fields = [f.strip() for f in field.split("==") if f.strip()]
for sub_field in sub_fields:
SAMPIC_Run_Decoder._parse_header_field(sub_field, header, keep_unparsed=keep_unparsed)
elif " " in field:
key = None
sub_fields = [f.strip() for f in field.split(" ") if f.strip()]
for sub_field in sub_fields:
SAMPIC_Run_Decoder._parse_header_field(sub_field, header, keep_unparsed=keep_unparsed)
elif "MEZZA_SAMPIC BOARD" == field[:18]:
key = "MEZZA_SAMPIC BOARD"
key_l = "sampic_mezzanine_board_version"
val = field[19:]
elif "NB OF CHANNELS IN SYSTEM" == field[:24]:
key = "NB OF CHANNELS IN SYSTEM"
key_l = "num_channels"
val = int(field[25:])
elif "CTRL FPGA FIRMWARE VERSION" == field[:26]:
key = "CTRL FPGA FIRMWARE VERSION"
key_l = "ctrl_fpga_firmware_version"
val = field[27:]
elif "SAMPLING FREQUENCY" == field[:18]:
key = "SAMPLING FREQUENCY"
key_l = "sampling_frequency"
val = field[19:]
elif "FRONT-END FPGA INDEX" == field[:20]:
key = None
match = SAMPIC_Run_Decoder.front_end_fpga_re.match(field)
index = int(match.group(1))
version = match.group(2)
baseline = float(match.group(3))
start_len = len(header.front_end_fpga_firmware_version)
if start_len < index + 1:
header.front_end_fpga_firmware_version = header.front_end_fpga_firmware_version + [None] * (index + 1 - start_len)
header.front_end_fpga_baseline = header.front_end_fpga_baseline + [None] * (index + 1 - start_len)
header.front_end_fpga_firmware_version[index] = version
header.front_end_fpga_baseline[index] = baseline
elif ":" in field:
key, val = (p.strip() for p in field.split(":", 1))
# Perform data conversion where needed
key_l = None
if "DATA FILE SAVED WITH SOFTWARE VERSION" == key:
key_l = "software_version"
elif "DATE OF RUN" == key:
key = None
match = SAMPIC_Run_Decoder.timestamp_re.match(val)
dt_local = datetime.fromtimestamp(float(match.group(1)))
key = "TIMESTAMP"
key_l = "timestamp"
val = dt_local
elif "Enabled Channels Mask" == key:
val = int(val, base=16)
elif "REDUCED DATA TYPE" == key:
if val == "NO":
val = False
else:
val = True
elif "WITHOUT WAVEFORM" == key:
if val == "NO":
val = False
else:
val = True
elif "TDC-LIKE FILES" == key:
if val == "NO":
val = False
else:
val = True
elif "INL Correction" == key[-14:]:
SAMPIC_Run_Decoder._parse_header_field(key[:-19].strip(), header, keep_unparsed=keep_unparsed)
key = "INL Correction"
if val == "ON":
val = True
else:
val = False
elif "ADC Correction" == key:
if val == "ON":
val = True
else:
val = False
if (key_l is None) and (key is not None):
key_l = key.lower().replace(" ", "_").replace("-", "_")
elif field[:4] == "Ch (":
setattr(header, "data_format", field)
key = None
elif field[-1] == ']':
index = field.find('[')
key = field[:index]
val = field[index + 1 : -1]
if "DataSamples" == key:
key_l = "data_samples_format"
elif "TriggerPosition" == key:
key_l = "trigger_position_format"
else:
key_l = key
elif field[-1] == ')':
index = field.find('(')
key = field[:index].strip()
val = field[index + 1 : -1].strip()
if "HIT number" == key:
key_l = "hit_number_format"
elif "UnixTime" == key:
key_l = "unix_time_format"
else:
key_l = key
else:
# fallback: dump everything into extra with a generic key
key = None
if keep_unparsed:
header.extra[f"unparsed_{len(header.extra)}"] = field
if key is not None:
if hasattr(header, key_l):
setattr(header, key_l, val)
else:
header.extra[key] = val
[docs]
def parse_hit_records( # noqa: max-complexity=26
self,
limit_hits: int = 0,
extra_header_bytes: int = 1,
chunk_size: int = 64 * 1024,
) -> Generator[Dict[str, Any], None, None]:
"""
Stream and decode hit records from all files in the run.
This generator method opens each SAMPIC binary file in turn, extracts
its header (via `open_sampic_file_in_chunks_and_get_header`), checks
for header consistency across files, then streams the body in fixed-size
chunks, parsing out complete hit records until either the file ends or
`limit_hits` is reached.
Parameters
----------
limit_hits : int, optional
Maximum number of hit records to yield across all files.
A value of 0 (default) means no limit (process all hits).
extra_header_bytes : int, optional
Number of bytes to include _after_ the header delimiter when
extracting the header (default is 1 to include the newline).
chunk_size : int, optional
Size in bytes of each data chunk read from the body (default is
64 KiB). Larger chunks may be more efficient but use more memory.
Yields
------
record : dict
A mapping from field names (str) to parsed values (int, float,
bool, list, etc.) for each hit record.
Raises
------
ValueError
If header parsing fails or a file's header does not match the
previously parsed header (mismatched run files).
Notes
-----
- Uses a rolling buffer to accumulate bytes from the stream until a
full record can be parsed by `try_parse_record`.
- After parsing each record, advances the buffer and continues until
all records are yielded or `limit_hits` is reached.
"""
mismatched_header_errors = []
buffer = bytearray()
hits = 0
# TODO: Adjust the field_specs according to the header
# Pre-Compile Struct Formats
# See https://docs.python.org/3/library/struct.html
s_i32 = Struct('<i')
# s_i64 = Struct('<q')
s_ui64 = Struct('<Q')
s_f32 = Struct('<f')
s_f64 = Struct('<d')
field_specs = [
# ("HIT number", 4, lambda b: int.from_bytes(b, "little", signed=True), "S"),
# ("Unix time", 8, lambda b: struct.unpack('d', b)[0], "S"),
# ("Channel", 4, lambda b: int.from_bytes(b, "little", signed=True), "S"),
# ("Cell", 4, lambda b: int.from_bytes(b, "little", signed=True), "S"),
# ("TimeStampA", 4, lambda b: int.from_bytes(b, "little", signed=True), "S"),
# ("TimeStampB", 4, lambda b: int.from_bytes(b, "little", signed=True), "S"),
# ("FPGATimeStamp", 8, lambda b: int.from_bytes(b, "little", signed=False), "S"),
# ("StartOfADCRamp", 4, lambda b: int.from_bytes(b, "little", signed=True), "S"),
# ("RawTOTValue", 4, lambda b: int.from_bytes(b, "little", signed=True), "S"),
# ("TOTValue", 4, lambda b: struct.unpack('f', b)[0], "S"),
# ("PhysicalCell0Time", 8, lambda b: struct.unpack('d', b)[0], "S"),
# ("OrderedCell0Time", 8, lambda b: struct.unpack('d', b)[0], "S"),
# ("Time", 8, lambda b: struct.unpack('d', b)[0], "S"),
# ("Baseline", 4, lambda b: struct.unpack('f', b)[0], "S"),
# ("RawPeak", 4, lambda b: struct.unpack('f', b)[0], "S"),
# ("Amplitude", 4, lambda b: struct.unpack('f', b)[0], "S"),
# ("ADCCounterLatched", 4, lambda b: int.from_bytes(b, "little", signed=True), "S"),
# ("DataSize", 4, lambda b: int.from_bytes(b, "little", signed=True), "S"),
# ("TriggerPosition", 4, lambda b: int.from_bytes(b, "little", signed=True), "A[DataSize]"),
# ("DataSample", 4, lambda b: struct.unpack('f', b)[0], "A[DataSize]"),
# ("field1", 4, lambda b: int.from_bytes(b, "little", signed=True), "S"),
# ("field2", 8, lambda b: struct.unpack("<d", b)[0], "S"),
# ("flag", 1, lambda b: bool(b[0]), "S"),
# …etc…
("HITNumber", 4, lambda v, o: s_i32.unpack_from(v, o)[0], "S"),
# ("UnixTime", 8, lambda v, o: datetime.fromtimestamp(s_f64.unpack_from(v, o)[0]), "S"),
("UnixTime", 8, lambda v, o: s_f64.unpack_from(v, o)[0], "S"),
("Channel", 4, lambda v, o: s_i32.unpack_from(v, o)[0], "S"),
("Cell", 4, lambda v, o: s_i32.unpack_from(v, o)[0], "S"),
("TimeStampA", 4, lambda v, o: s_i32.unpack_from(v, o)[0], "S"),
("TimeStampB", 4, lambda v, o: s_i32.unpack_from(v, o)[0], "S"),
("FPGATimeStamp", 8, lambda v, o: s_ui64.unpack_from(v, o)[0], "S"),
("StartOfADCRamp", 4, lambda v, o: s_i32.unpack_from(v, o)[0], "S"),
("RawTOTValue", 4, lambda v, o: s_i32.unpack_from(v, o)[0], "S"),
("TOTValue", 4, lambda v, o: s_f32.unpack_from(v, o)[0], "S"),
("PhysicalCell0Time", 8, lambda v, o: s_f64.unpack_from(v, o)[0], "S"),
("OrderedCell0Time", 8, lambda v, o: s_f64.unpack_from(v, o)[0], "S"),
("Time", 8, lambda v, o: s_f64.unpack_from(v, o)[0], "S"),
("Baseline", 4, lambda v, o: s_f32.unpack_from(v, o)[0], "S"),
("RawPeak", 4, lambda v, o: s_f32.unpack_from(v, o)[0], "S"),
("Amplitude", 4, lambda v, o: s_f32.unpack_from(v, o)[0], "S"),
("ADCCounterLatched", 4, lambda v, o: s_i32.unpack_from(v, o)[0], "S"),
("DataSize", 4, lambda v, o: s_i32.unpack_from(v, o)[0], "S"),
("TriggerPosition", 4, lambda v, o: s_i32.unpack_from(v, o)[0], "A[DataSize]"),
("DataSample", 4, lambda v, o: s_f32.unpack_from(v, o)[0], "A[DataSize]"),
]
# Helper to try parsing one record from the buffer
def try_parse_record() -> Dict[str, Any] | None:
view = memoryview(buffer)
# First ensure we have at least the fixed portion
fixed_len = sum(n for _, n, _, t in field_specs if t == "S")
if len(buffer) < fixed_len:
return None # need more data
# Parse fixed fields to extract the fixed portion, counts should be in this portion so we can parse the rest of the data structure
record: Dict[str, Any] = {}
offset = 0
for name, nbytes, conv, field_type in field_specs:
if field_type != "S":
break # Stop once we find the first non scalar/single type field
# record[name] = conv(view[offset : offset + nbytes])
record[name] = conv(view, offset)
offset += nbytes
# Compute total required length (fixed + arrays)
total_len = 0
for name, nbytes, _, field_type in field_specs:
multiplier = 1
if field_type != "S": # For non-scalar field types:
if field_type[0] == 'A': # For Vector and Array field types
if field_type[1] != '[' or field_type[-1] != ']':
raise RuntimeError(
f"Malformed field type for {name}, please double check configuration is correct: {field_type}"
)
count_name = field_type[2:-1]
if count_name not in record:
raise RuntimeError(
f"Could not find the count field ({count_name}), either the name is wrong or the field is not in the fixed portion of the record"
)
multiplier = record[count_name]
else:
raise RuntimeError(
"Unknown field type defined, unable to parse data, so we are aborting since we can not guarantee the data is correctly interpreted"
)
total_len += nbytes * multiplier
if len(buffer) < total_len:
return None # wait for more data
# Parse remaining data, including arrays
offset = 0
for name, nbytes, conv, field_type in field_specs:
multiplier = 1
if field_type != "S": # For non-scalar field types:
if field_type[0] == 'A': # For Vector and Array field types
# Don't need to perform check below because they were performed above
# if field_type[1] != '[' or field_type[-1] != ']':
# raise RuntimeError(f"Malformed field type for {name}, please double check configuration is correct: {field_type}")
count_name = field_type[2:-1]
# if count_name not in record:
# raise RuntimeError(f"Could not find the count field ({count_name}), either the name is wrong or the field is not in the fixed portion of the record")
multiplier = record[count_name]
array = []
for i in range(multiplier):
start = offset + i * nbytes
# end = start + nbytes
# array.append(conv(view[start : end]))
array.append(conv(view, start))
record[name] = array
# else:
# raise RuntimeError("Unknown field type defined, unable to parse data, so we are aborting since we can not guarantee the data is correctly interpreted")
else:
pass
else:
if name not in record:
# record[name] = conv(view[offset : offset + nbytes])
record[name] = conv(view, offset)
offset += nbytes * multiplier
# consume bytes from buffer
del view
del buffer[:total_len]
return record
first_header = None
return_now = False
for file in self.run_files:
with self.open_sampic_file_in_chunks_and_get_header(file, extra_header_bytes, chunk_size) as (raw_header, body_gen):
header = self.decode_sampic_header(raw_header, keep_unparsed=True)
if first_header is not None:
if not (header == first_header):
mismatched_header_errors.append(file)
else:
first_header = header
self.run_header = header
# Stream through chunks
for chunk in body_gen:
buffer.extend(chunk)
# parse as many complete records as we can
while True:
if limit_hits > 0 and hits >= limit_hits:
return_now = True
break
rec = try_parse_record()
if rec is None:
break
hits += 1
yield rec
if return_now:
break
if return_now:
break
# Cleanup
if len(mismatched_header_errors) > 0:
print(
colored("Warning:", "yellow"),
f"Found mismatches in the headers of the files that make this run. The list of mismatched files is: {mismatched_header_errors}",
)
return
[docs]
def decode_data( # noqa: max-complexity=24
self,
limit_hits: int = 0,
feather_path: Optional[Path] = None,
parquet_path: Optional[Path] = None,
root_path: Optional[Path] = None,
root_tree: str = "sampic_hits",
extra_header_bytes: int = 1,
chunk_size: int = 64 * 1024,
batch_size: int = 100_000,
) -> None:
"""
Decode hit records from SAMPIC run files and export to Feather, Parquet, and/or ROOT.
This method streams parsed hit-record dictionaries (via
`parse_hit_records`), accumulates them in batches to build a
pandas DataFrame, and then writes each batch to the specified
output formats. It never holds all records in memory at once.
Parameters
----------
limit_hits : int, optional
Maximum number of hit records to process across all run files.
A value of 0 (default) means “no limit” (process all hits).
feather_path : pathlib.Path or None, optional
If not None, path to write the DataFrame in Feather format.
parquet_path : pathlib.Path or None, optional
If not None, path to write the DataFrame in Parquet format.
root_path : pathlib.Path or None, optional
If not None, path to write the DataFrame to a ROOT file.
root_tree : str, optional
Name of the TTree inside the ROOT file (default: `"sampic_hits"`).
extra_header_bytes : int, optional
Number of bytes to include *after* the detected header boundary
(default: 1 to capture the trailing newline).
chunk_size : int, optional
Byte size for each memory-mapped file read chunk
(default: 64 KiB).
batch_size : int, optional
Number of records to collect before flushing to output
(default: 100 000).
Raises
------
ValueError
If header parsing fails, or if writing to any format encounters
missing or mismatched branch/column schemas.
Notes
-----
- Feather and Parquet outputs preserve exact column dtypes by
casting before writing.
- ROOT output is written via `uproot` using dict-of-NumPy-arrays
(or `mktree` + `extend`) to ensure correct branch types.
- Each batch is written immediately; the final partial batch is
flushed at the end.
"""
buffer: list[dict] = []
first = True
# Schema related objects
# TODO: Change this to use info from the header
schema = build_schema()
# Writers placeholders
parquet_writer = None
feather_writer = None
root_tree_obj = None
for hit_record in self.parse_hit_records(limit_hits=limit_hits, extra_header_bytes=extra_header_bytes, chunk_size=chunk_size):
buffer.append(hit_record)
if len(buffer) < batch_size:
continue
df_batch = pd.DataFrame(buffer)
convert_df_with_schema(df_batch)
if first:
schema = schema.with_metadata(self.prepare_header_metadata())
table = pa.Table.from_pandas(df_batch, schema=schema, preserve_index=False)
df_batch.reset_index(drop=True, inplace=True)
# Initialize & write first batch
root_written = False
if first:
if parquet_path:
parquet_writer = pq.ParquetWriter(parquet_path, table.schema)
if feather_path:
sink = open(feather_path, "wb")
feather_writer = new_file(sink, table.schema)
if root_path:
froot = uproot.recreate(root_path)
froot[root_tree] = get_root_data_with_schema(df_batch) # df_batch.to_dict(orient="list")
root_tree_obj = froot[root_tree]
root_written = True
first = False
# Append subsequent batches
if parquet_writer:
parquet_writer.write_table(table)
if feather_writer:
feather_writer.write(table)
if root_tree_obj and not root_written:
root_tree_obj.extend(get_root_data_with_schema(df_batch)) # df_batch.to_dict(orient="list"))
buffer.clear()
# Flush final partial batch
if buffer:
df_batch = pd.DataFrame(buffer)
convert_df_with_schema(df_batch)
if first:
schema = schema.with_metadata(self.prepare_header_metadata())
table = pa.Table.from_pandas(df_batch, schema=schema, preserve_index=False)
df_batch.reset_index(drop=True, inplace=True)
root_written = False
if first:
if parquet_path:
parquet_writer = pq.ParquetWriter(parquet_path, table.schema)
if feather_path:
sink = open(feather_path, "wb")
feather_writer = new_file(sink, table.schema)
if root_path:
froot = uproot.recreate(root_path)
froot[root_tree] = get_root_data_with_schema(df_batch) # df_batch.to_dict(orient="list")
root_tree_obj = froot[root_tree]
root_written = True
first = False
if parquet_writer:
parquet_writer.write_table(table)
if feather_writer:
feather_writer.write(table)
if root_tree_obj and not root_written:
root_tree_obj.extend(get_root_data_with_schema(df_batch)) # df_batch.to_dict(orient="list"))
if root_path:
self.write_root_header(froot)
# Close writers
if parquet_writer:
parquet_writer.close()
if feather_writer:
feather_writer.close()
sink.close()
# ROOT file closed by context of recreate()