# -*- coding: utf-8 -*-
#############################################################################
# zlib License
#
# (C) 2025 Cristóvão Beirão da Cruz e Silva <cbeiraod@cern.ch>
#
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
# claim that you wrote the original software. If you use this software
# in a product, an acknowledgment in the product documentation would be
# appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
# misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
#############################################################################
import datetime
import heapq
import itertools
import math
import struct
from collections import Counter
from contextlib import contextmanager
from dataclasses import dataclass
from dataclasses import field
from pathlib import Path
from typing import Any
from typing import Dict
from typing import Iterator
from typing import List
from typing import Literal
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Union
import awkward as ak
import matplotlib.pyplot as plt
import mplhep as hep
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.feather as feather
import pyarrow.ipc as ipc
import pyarrow.parquet as pq
import uproot
from matplotlib.axes import Axes
from matplotlib.dates import AutoDateFormatter
from matplotlib.dates import AutoDateLocator
from matplotlib.lines import Line2D
from matplotlib.ticker import FormatStrFormatter
from natsort import natsorted
from pyarrow import RecordBatch
from scipy.signal import resample
from scipy.signal import resample_poly
from sampiclyser.sampic_decoder import SAMPIC_Schema_Info
from sampiclyser.sampic_decoder import build_empty_root_data_with_schema
from sampiclyser.sampic_decoder import build_schema
from sampiclyser.sampic_decoder import get_root_data_with_schema
from sampiclyser.sampic_decoder import prepare_header_metadata_in_bytes
sampiclyser_style = hep.style.CMS
[docs]
@dataclass(order=True)
class TimestampedRecord:
"""
Container for a hit record with an associated timestamp, sortable by time.
Attributes
----------
timestamp : float
The hit timestamp (seconds since epoch or reconstructed).
record : Any
The full hit data (e.g., dict of field values). Not used for ordering.
"""
timestamp: float
record: Any = field(compare=False)
[docs]
def set_mplhep_style(style: str = "CMS"):
global sampiclyser_style
if style == "CMS":
sampiclyser_style = hep.style.CMS
elif style == "ALICE":
sampiclyser_style = hep.style.ALICE
elif style == "ATLAS":
sampiclyser_style = hep.style.ATLAS
elif style == "ATLAS1":
sampiclyser_style = hep.style.ATLAS1
elif style == "ATLAS2":
sampiclyser_style = hep.style.ATLAS2
elif style == "LHCb1":
sampiclyser_style = hep.style.LHCb1
elif style == "LHCb2":
sampiclyser_style = hep.style.LHCb2
elif style == "DUNE":
sampiclyser_style = hep.style.DUNE
elif style == "DUNE1":
sampiclyser_style = hep.style.DUNE1
else:
raise RuntimeError(f"Unknown MPLHEP style: {style}")
[docs]
def open_hit_reader(
file_path: Path, cols: Sequence[str], batch_size: int = 100_000, root_tree: str = "sampic_hits"
) -> Iterator[Union[RecordBatch, ak.highlevel.Array]]:
"""
Stream selected columns from a SAMPIC output file in memory-efficient batches.
This function reads only the specified columns from large data files (Parquet,
Feather, or ROOT) in fixed-size batches, yielding either Arrow RecordBatches
(for Parquet/Feather) or dictionaries of NumPy arrays (for ROOT).
Parameters
----------
file_path : pathlib.Path
Path to the input file. Supported extensions:
- `.parquet` or `.pq` for Parquet
- `.feather` for Arrow Feather IPC
- `.root` for ROOT files containing a TTree named `root_tree`
cols : sequence of str
Names of the columns (or ROOT branches) to read.
batch_size : int, optional
Maximum number of rows/entries per yielded batch (default: 100000).
root_tree : str, optional
Name of the ROOT TTree to read from (default: "sampic_hits").
Yields
------
RecordBatch or dict
- For Parquet/Feather: `pyarrow.RecordBatch` containing the requested columns.
- For ROOT: dict mapping branch names to NumPy arrays for each batch.
Raises
------
ValueError
If the file extension is not among the supported types.
"""
suffix = file_path.suffix.lower()
if suffix in ('.parquet', '.pq'):
pqf = pq.ParquetFile(str(file_path))
yield from pqf.iter_batches(batch_size=batch_size, columns=list(cols))
elif suffix == ".feather":
dataset = ds.dataset(str(file_path), format="feather")
scanner = dataset.scanner(batch_size=batch_size, columns=list(cols))
yield from scanner.to_batches()
elif suffix == ".root":
tree_path = f"{file_path}:{root_tree}"
yield from uproot.iterate(tree_path, list(cols), step_size=batch_size)
else:
raise ValueError(f"Unsupported file format: {suffix}")
[docs]
def get_channel_hits(file_path: Path, batch_size: int = 100_000, root_tree: str = "sampic_hits") -> pd.DataFrame:
"""
Compute per-channel hit counts by streaming only the 'Channel' column.
Supports Feather, Parquet, or ROOT (.root) files written by the Sampic decoder.
Reads data in batches (to bound memory use) and tallies the number of rows
(hits) observed on each channel.
Parameters
----------
file_path : pathlib.Path
Path to the input data file. Must have suffix `.feather`, `.parquet`, or `.root`.
batch_size : int, optional
Number of entries to read per iteration (default: 100000).
root_tree : str, optional
Name of the TTree inside the ROOT file to read (only used if `file_path` is `.root`;
default: `"sampic_hits"`).
Returns
-------
pandas.DataFrame
A DataFrame with two columns:
- `Channel` (int): channel identifier
- `Hits` (int): total number of hits on that channel
Rows are sorted by increasing `Channel`.
Raises
------
ValueError
If the file suffix is not one of `.feather`, `.parquet`, or `.root`.
"""
counts = Counter()
for batch in open_hit_reader(file_path=file_path, cols=["Channel"], batch_size=batch_size, root_tree=root_tree):
arr = batch["Channel"].to_numpy()
uniques, cnts = np.unique(arr, return_counts=True)
for ch, cnt in zip(uniques, cnts):
counts[int(ch)] += int(cnt)
# Build and return the summary DataFrame
df = pd.DataFrame(sorted(counts.items()), columns=["Channel", "Hits"])
return df
[docs]
def plot_channel_hits(
df: pd.DataFrame,
first_channel: int,
last_channel: int,
label: str = "PPS",
log_y: bool = False,
figsize: tuple[float, float] = (6, 4),
rlabel: str = "(13 TeV)",
is_data: bool = True,
color="C0",
title: str | None = None,
) -> plt.Figure:
"""
Draw a CMS-style bar histogram of hit counts per channel.
Parameters
----------
df : pandas.DataFrame
Summary table with two columns:
- `Channel` (int): channel indices
- `Hits` (int): hit counts per channel
first_channel : int
Lowest channel index to include on the x-axis.
last_channel : int
Highest channel index to include on the x-axis.
label : str, optional
Text label for the experiment (default: "PPS").
log_y : bool, optional
If True, use a logarithmic y-axis (default: False).
figsize : tuple of float, optional
Figure size in inches as (width, height) (default: (6, 4)).
rlabel : str, optional
Right-hand text label, typically collision energy (default: "(13 TeV)").
is_data : bool, optional
If True, annotate the plot as “Data”; if False, annotate as “Simulation”
(default: True).
color : any, optional
Matplotlib color spec for the bars (default: "C0").
title : str or None, optional
Main title displayed above the axes; if None, no title is shown.
Returns
-------
matplotlib.figure.Figure
The Figure object containing the histogram.
Raises
------
ValueError
If `last_channel` is less than `first_channel`.
Notes
-----
- Channels missing from `df` are shown with zero hits.
- In linear mode, y-axis tick labels are formatted in uppercase scientific
notation (e.g. "4.0E6").
- The plot uses `mplhep.style.*` with `label` and `rlabel` positioned
according to respective styling conventions.
- The `is_data` flag controls the “Data” vs. “Simulation” annotation.
"""
# Build the full channel range and corresponding hit counts (0 if missing)
channels = list(range(first_channel, last_channel + 1))
hits_map = dict(zip(df["Channel"], df["Hits"]))
counts = [hits_map.get(ch, 0) for ch in channels]
# Apply selected sampiclyser style from mplhep
with plt.style.context(sampiclyser_style):
# Create figure and axis with custom size and create the bar histogram
fig, ax = plt.subplots(figsize=figsize)
ax.bar(channels, counts, align='center', width=1.0, edgecolor='black', color=color)
# label with customizable right text
if sampiclyser_style == hep.style.CMS:
hep.cms.label(label, data=is_data, rlabel=rlabel, loc=0, ax=ax)
elif sampiclyser_style in [hep.style.ATLAS, hep.style.ATLAS1, hep.style.ATLAS2]:
hep.atlas.label(label, data=is_data, rlabel=rlabel, loc=0, ax=ax)
elif sampiclyser_style in [hep.style.LHCb1, hep.style.LHCb2]:
hep.lhcb.label(label, data=is_data, rlabel=rlabel, loc=0, ax=ax)
elif sampiclyser_style in [hep.style.DUNE, hep.style.DUNE1]:
hep.dune.label(label, data=is_data, rlabel=rlabel, loc=0, ax=ax)
# Optional main title
if title:
ax.set_title(title, pad=12, weight="bold")
# Y-axis scale and formatting
if log_y:
ax.set_yscale('log')
else:
# scientific notation for linear scale
ax.yaxis.set_major_formatter(FormatStrFormatter('%.1E'))
# Axis labels and limits
ax.set_xlabel("Channel")
ax.set_ylabel("Hits per Channel")
ax.set_xlim(first_channel - 0.5, last_channel + 0.5)
ax.set_xticks(channels)
plt.tight_layout()
return fig
[docs]
def plot_hit_rate( # noqa: max-complexity=22
file_path: Path,
bin_size: float = 1.0,
batch_size: int = 100_000,
plot_hits: bool = False,
start_time: datetime.datetime | float | None = None,
end_time: datetime.datetime | float | None = None,
root_tree: str = "sampic_hits",
scale_factor: float = 1.0,
label: str = "PPS",
log_y: bool = False,
figsize: tuple[float, float] = (6, 4),
rlabel: str = "(13 TeV)",
is_data: bool = True,
color="C0",
title: str | None = None,
) -> plt.Figure:
"""
Plot the hit rate (or raw hits) as a function of time from large data files.
Streams the “UnixTime” column in batches from a Feather, Parquet, or ROOT file,
bins events into fixed-width time intervals, and renders a CMS-style time series.
Parameters
----------
file_path : pathlib.Path
Path to the input data file; supported suffixes are `.feather`, `.parquet`, `.pq`, and `.root`.
bin_size : float, optional
Width of each time bin in seconds; values below 0.1 are rounded up to 0.1 (default: 1.0).
batch_size : int, optional
Number of entries to read per I/O batch (default: 100000).
plot_hits : bool, optional
If True, plot the raw count per bin; otherwise plot the rate
(count divided by `bin_size`) (default: False).
start_time : datetime.datetime, float, or None, optional
Start of the time window for plotting, as a datetime or UNIX timestamp.
If None, uses the file's “start_of_run” metadata. Aligned to the
nearest lower multiple of `bin_size` (default: None).
end_time : datetime.datetime, float, or None, optional
End of the time window for plotting, as a datetime or UNIX timestamp.
If None, determined from the data. Aligned to the nearest upper
multiple of `bin_size` (default: None).
root_tree : str, optional
Name of the TTree in a ROOT file (only used if `file_path` ends in `.root`;
default: `"sampic_hits"`).
scale_factor : float, optional
Multiplier applied to each bin's count (e.g. to account for
central trigger multiplicity) before plotting (default: 1.0).
label : str, optional
experiment label (default: `"PPS"`).
log_y : bool, optional
If True, use a logarithmic y-axis (default: False).
figsize : tuple of float, optional
Figure size in inches as (width, height) (default: (6, 4)).
rlabel : str, optional
Additional right-hand label (e.g. collision energy) (default: `"(13 TeV)"`).
is_data : bool, optional
If True, annotate plots as “Data”; if False, annotate as “Simulation”
(default: True).
color : color spec, optional
Matplotlib color for the line or bars (default: `"C0"`).
title : str or None, optional
Main title for the figure; if None, no title is drawn (default: None).
Returns
-------
fig : matplotlib.figure.Figure
Figure object containing the hit-rate (or hit-count) vs. time plot,
styled according to CMS conventions.
Raises
------
ValueError
If `file_path` has an unsupported suffix.
Notes
-----
- Time bins are computed as `floor((t - t0)/bin_size)` indices,
then shifted back to absolute times for plotting.
- X-axis tick formatting uses Matplotlib's `AutoDateLocator` and
`AutoDateFormatter` for sensible date/time labels across variable spans.
"""
# enforce minimum bin size
bin_size = max(bin_size, 0.1)
# fetch run‐start from metadata; override if start_time provided
metadata = get_file_metadata(file_path)
run_start = metadata.get("timestamp")
if isinstance(run_start, datetime.datetime):
run_start_ts = run_start.timestamp()
else:
run_start_ts = float(run_start)
# align to bin boundary
run_start_ts = math.floor(run_start_ts / bin_size) * bin_size
# apply user override
if start_time is not None:
st = start_time.timestamp() if isinstance(start_time, datetime.datetime) else float(start_time)
if st > run_start_ts:
run_start_ts = math.floor(st / bin_size) * bin_size
counts = Counter()
for batch in open_hit_reader(file_path=file_path, cols=["UnixTime"], batch_size=batch_size, root_tree=root_tree):
arr = batch["UnixTime"].to_numpy()
for t in arr:
idx = int((t - run_start_ts) // bin_size)
if idx >= 0:
counts[idx] += 1
if not counts:
raise RuntimeError("No hits found in file.")
for idx in range(max(counts.keys())):
if idx not in counts:
counts[idx] = 0
# Build sorted time and rate arrays
bins = np.array(sorted(counts.keys()), dtype=int)
times = bins * bin_size + run_start_ts
# apply end_time override
if end_time is not None:
et = end_time.timestamp() if isinstance(end_time, datetime.datetime) else float(end_time)
max_bin = math.ceil((et - run_start_ts) / bin_size)
mask = bins <= max_bin
bins = bins[mask]
times = bins * bin_size + run_start_ts
# convert to datetime for plotting
dtimes = [datetime.datetime.fromtimestamp(ts) for ts in times]
if plot_hits:
rates = np.array([counts[b] * scale_factor for b in bins], dtype=int)
else:
rates = np.array([counts[b] * scale_factor / bin_size for b in bins], dtype=int)
# Apply selected sampiclyser style from mplhep
with plt.style.context(sampiclyser_style):
# Create figure and axis with custom size and create the plot
fig, ax = plt.subplots(figsize=figsize)
ax.step(dtimes, rates, where="mid", color=color)
# label with customizable right text
if sampiclyser_style == hep.style.CMS:
hep.cms.label(label, data=is_data, rlabel=rlabel, loc=0, ax=ax)
elif sampiclyser_style in [hep.style.ATLAS, hep.style.ATLAS1, hep.style.ATLAS2]:
hep.atlas.label(label, data=is_data, rlabel=rlabel, loc=0, ax=ax)
elif sampiclyser_style in [hep.style.LHCb1, hep.style.LHCb2]:
hep.lhcb.label(label, data=is_data, rlabel=rlabel, loc=0, ax=ax)
elif sampiclyser_style in [hep.style.DUNE, hep.style.DUNE1]:
hep.dune.label(label, data=is_data, rlabel=rlabel, loc=0, ax=ax)
# Optional main title
if title:
ax.set_title(title, pad=12, weight="bold")
# Y-axis scale and formatting
if log_y:
ax.set_yscale('log')
ax.set_xlabel("Time")
if plot_hits:
ax.set_ylabel(f"Hits per {bin_size:.1f} s")
else:
ax.set_ylabel("Hit Rate [Hz]")
# date formatting
locator = AutoDateLocator()
formatter = AutoDateFormatter(locator)
ax.xaxis.set_major_locator(locator)
ax.xaxis.set_major_formatter(formatter)
ax.set_xlim(dtimes[0], dtimes[-1])
# format x-axis as dates
fig.autofmt_xdate()
plt.tight_layout()
return fig
[docs]
def plot_channel_hit_rate( # noqa: max-complexity=22
file_path: Path,
channel: int = 0,
bin_size: float = 1.0,
batch_size: int = 100_000,
plot_hits: bool = False,
start_time: datetime.datetime | float | None = None,
end_time: datetime.datetime | float | None = None,
root_tree: str = "sampic_hits",
scale_factor: float = 1.0,
label: str = "PPS",
log_y: bool = False,
figsize: tuple[float, float] = (6, 4),
rlabel: str = "(13 TeV)",
is_data: bool = True,
color="C0",
title: str | None = None,
) -> plt.Figure:
"""
Plot the hit rate (or raw hits) as a function of time from large data files.
Streams the “UnixTime” column in batches from a Feather, Parquet, or ROOT file,
bins events into fixed-width time intervals, and renders a CMS-style time series.
Parameters
----------
file_path : pathlib.Path
Path to the input data file; supported suffixes are `.feather`, `.parquet`, `.pq`, and `.root`.
channel : int, optional
The SAMPIC channel to plot (default: 0).
bin_size : float, optional
Width of each time bin in seconds; values below 0.1 are rounded up to 0.1 (default: 1.0).
batch_size : int, optional
Number of entries to read per I/O batch (default: 100000).
plot_hits : bool, optional
If True, plot the raw count per bin; otherwise plot the rate
(count divided by `bin_size`) (default: False).
start_time : datetime.datetime, float, or None, optional
Start of the time window for plotting, as a datetime or UNIX timestamp.
If None, uses the file's “start_of_run” metadata. Aligned to the
nearest lower multiple of `bin_size` (default: None).
end_time : datetime.datetime, float, or None, optional
End of the time window for plotting, as a datetime or UNIX timestamp.
If None, determined from the data. Aligned to the nearest upper
multiple of `bin_size` (default: None).
root_tree : str, optional
Name of the TTree in a ROOT file (only used if `file_path` ends in `.root`;
default: `"sampic_hits"`).
scale_factor : float, optional
Multiplier applied to each bin's count (e.g. to account for
central trigger multiplicity) before plotting (default: 1.0).
label : str, optional
experiment label (default: `"PPS"`).
log_y : bool, optional
If True, use a logarithmic y-axis (default: False).
figsize : tuple of float, optional
Figure size in inches as (width, height) (default: (6, 4)).
rlabel : str, optional
Additional right-hand label (e.g. collision energy) (default: `"(13 TeV)"`).
is_data : bool, optional
If True, annotate plots as “Data”; if False, annotate as “Simulation”
(default: True).
color : color spec, optional
Matplotlib color for the line or bars (default: `"C0"`).
title : str or None, optional
Main title for the figure; if None, no title is drawn (default: None).
Returns
-------
fig : matplotlib.figure.Figure
Figure object containing the hit-rate (or hit-count) vs. time plot,
styled according to CMS conventions.
Raises
------
ValueError
If `file_path` has an unsupported suffix.
Notes
-----
- Time bins are computed as `floor((t - t0)/bin_size)` indices,
then shifted back to absolute times for plotting.
- X-axis tick formatting uses Matplotlib's `AutoDateLocator` and
`AutoDateFormatter` for sensible date/time labels across variable spans.
"""
# enforce minimum bin size
bin_size = max(bin_size, 0.1)
# fetch run‐start from metadata; override if start_time provided
metadata = get_file_metadata(file_path)
run_start = metadata.get("timestamp")
if isinstance(run_start, datetime.datetime):
run_start_ts = run_start.timestamp()
else:
run_start_ts = float(run_start)
# align to bin boundary
run_start_ts = math.floor(run_start_ts / bin_size) * bin_size
# apply user override
if start_time is not None:
st = start_time.timestamp() if isinstance(start_time, datetime.datetime) else float(start_time)
if st > run_start_ts:
run_start_ts = math.floor(st / bin_size) * bin_size
counts = Counter()
for batch in open_hit_reader(file_path=file_path, cols=["Channel", "UnixTime"], batch_size=batch_size, root_tree=root_tree):
# Duck‐type: try Arrow first, else assume Awkward
# if hasattr(batch, "column"):
# # PyArrow RecordBatch
# ch_arr = batch.column("Channel").to_numpy()
# time_arr = batch.column("UnixTime").to_numpy()
# else:
# # Awkward Array from uproot.iterate
# # convert to numpy via __array__ interface
# ch_arr = np.asarray(batch["Channel"])
# time_arr = np.asarray(batch["UnixTime"])
ch_arr = batch["Channel"].to_numpy()
time_arr = batch["UnixTime"].to_numpy()
arr = time_arr[ch_arr == channel]
for t in arr:
idx = int((t - run_start_ts) // bin_size)
if idx >= 0:
counts[idx] += 1
if not counts:
raise RuntimeError("No hits found in file.")
for idx in range(max(counts.keys())):
if idx not in counts:
counts[idx] = 0
# Build sorted time and rate arrays
bins = np.array(sorted(counts.keys()), dtype=int)
times = bins * bin_size + run_start_ts
# apply end_time override
if end_time is not None:
et = end_time.timestamp() if isinstance(end_time, datetime.datetime) else float(end_time)
max_bin = math.ceil((et - run_start_ts) / bin_size)
mask = bins <= max_bin
bins = bins[mask]
times = bins * bin_size + run_start_ts
# convert to datetime for plotting
dtimes = [datetime.datetime.fromtimestamp(ts) for ts in times]
if plot_hits:
rates = np.array([counts[b] * scale_factor for b in bins], dtype=int)
else:
rates = np.array([counts[b] * scale_factor / bin_size for b in bins], dtype=int)
# Apply selected sampiclyser style from mplhep
with plt.style.context(sampiclyser_style):
# Create figure and axis with custom size and create the plot
fig, ax = plt.subplots(figsize=figsize)
ax.step(dtimes, rates, where="mid", color=color)
# label with customizable right text
if sampiclyser_style == hep.style.CMS:
hep.cms.label(label, data=is_data, rlabel=rlabel, loc=0, ax=ax)
elif sampiclyser_style in [hep.style.ATLAS, hep.style.ATLAS1, hep.style.ATLAS2]:
hep.atlas.label(label, data=is_data, rlabel=rlabel, loc=0, ax=ax)
elif sampiclyser_style in [hep.style.LHCb1, hep.style.LHCb2]:
hep.lhcb.label(label, data=is_data, rlabel=rlabel, loc=0, ax=ax)
elif sampiclyser_style in [hep.style.DUNE, hep.style.DUNE1]:
hep.dune.label(label, data=is_data, rlabel=rlabel, loc=0, ax=ax)
# Optional main title
if title:
ax.set_title(title, pad=12, weight="bold")
# Y-axis scale and formatting
if log_y:
ax.set_yscale('log')
ax.set_xlabel("Time")
if plot_hits:
ax.set_ylabel(f"Channel {channel} hits per {bin_size:.1f} s")
else:
ax.set_ylabel(f"Channel {channel} hit Rate [Hz]")
# date formatting
locator = AutoDateLocator()
formatter = AutoDateFormatter(locator)
ax.xaxis.set_major_locator(locator)
ax.xaxis.set_major_formatter(formatter)
ax.set_xlim(dtimes[0], dtimes[-1])
# format x-axis as dates
fig.autofmt_xdate()
plt.tight_layout()
return fig
[docs]
def windowed_sinc_interpolation(t_orig: np.ndarray, y_orig: np.ndarray, t_new: np.ndarray, window: str = 'hann', M: int = 8) -> np.ndarray:
r"""
Band-limited interpolation of a uniformly sampled signal using a windowed sinc kernel.
Constructs a truncated sinc filter of half-width `M` samples, applies a smooth
tapering window (Hann or Hamming) to reduce ringing, and convolves it with the
input data to estimate values at new time points.
Parameters
----------
t_orig : ndarray of float, shape (N,)
Original sample times (must be uniformly spaced).
y_orig : ndarray of float, shape (N,)
Original sample values.
t_new : ndarray of float, shape (M_new,)
Desired output times (must lie within the range of `t_orig`).
window : {'hann', 'hamming'}, optional
Type of tapering window to apply to the sinc kernel. Default is 'hann'.
M : int, optional
Half-width of the truncated sinc kernel, in number of original samples.
Total kernel length will be `2*M + 1`. Default is 8.
Returns
-------
y_new : ndarray of float, shape (M_new,)
Interpolated values at `t_new`.
Raises
------
ValueError
If `t_orig` is not at least two points, if `window` is not recognized,
or if `t_new` contains values outside the range of `t_orig`.
Notes
-----
- This implementation assumes `t_orig` is **uniformly** spaced. If that is
not the case, consider first resampling to a uniform grid or using a
more general interpolator.
- The Hann window is defined as
$$ w[k] = 0.5 + 0.5\cos\bigl(2\pi k/(2M+1)\bigr),\quad k=-M\ldots M. $$
- The Hamming window uses
$$ w[k] = 0.54 + 0.46\cos\bigl(2\pi k/(2M+1)\bigr). $$
- Each output sample `y_new[i]` is
$$ \sum_{k=-M}^{M} y_{\!n}\,\text{sinc}\bigl((t_{\!new}-t_{\!orig,n})/T\bigr)\,w[k], $$
where \(T\) is the uniform sample spacing and \(n\) is chosen so that
\(t_{\!orig,n}\) is the nearest original sample to each \(t_{\!new,i}\).
Examples
--------
>>> import numpy as np
>>> t = np.linspace(0, 1, 11)
>>> y = np.sin(2*np.pi*5*t)
>>> t_fine = np.linspace(0, 1, 101)
>>> y_fine = windowed_sinc_interpolation(t, y, t_fine, window='hann', M=4)
"""
# Validate inputs
t_orig = np.asarray(t_orig, dtype=float)
y_orig = np.asarray(y_orig, dtype=float)
t_new = np.asarray(t_new, dtype=float)
# Basic validation
if t_orig.ndim != 1 or y_orig.ndim != 1:
raise ValueError("t_orig and y_orig must be 1D arrays")
if t_orig.size < 2:
raise ValueError("Need at least two original samples for interpolation")
if t_new.ndim != 1:
raise ValueError("t_new must be a 1D array")
if t_new.min() < t_orig.min() or t_new.max() > t_orig.max():
raise ValueError("t_new values must lie within the range of t_orig")
if window not in ('hann', 'hamming'):
raise ValueError(f"Unknown window type '{window}'; use 'hann' or 'hamming'")
# Uniform spacing
T = t_orig[1] - t_orig[0]
if not np.allclose(np.diff(t_orig), T, atol=1e-8 * T):
raise ValueError("t_orig must be uniformly spaced")
# Precompute windowed‐sinc kernel indices and window
k = np.arange(-M, M + 1)
if window == 'hann':
w = 0.5 + 0.5 * np.cos(2 * np.pi * k / (2 * M + 1))
else: # 'hamming'
w = 0.54 + 0.46 * np.cos(2 * np.pi * k / (2 * M + 1))
w = w / w[M] # normalize to 1 at center
y_new = np.empty_like(t_new, dtype=float)
# For each target time, center the kernel at the nearest original sample
for i, tn in enumerate(t_new):
# find nearest index in t_orig
n0 = int(np.round((tn - t_orig[0]) / T))
idx = n0 + k
# mask out-of-bounds indices
valid = (idx >= 0) & (idx < t_orig.size)
ti = t_orig[idx[valid]]
yi = y_orig[idx[valid]]
wi = w[valid]
# compute sinc((tn - ti)/T)
sinc_vals = np.sinc((tn - ti) / T)
y_new[i] = np.dot(yi * sinc_vals, wi)
return y_new
[docs]
def lanczos_interpolation(t_orig: np.ndarray, y_orig: np.ndarray, t_new: np.ndarray, a: int = 3) -> np.ndarray:
"""
Interpolate a uniformly sampled signal using the Lanczos kernel.
The Lanczos filter uses a windowed sinc kernel of order `a`:
L(x) = sinc(x) · sinc(x / a) for |x| ≤ a, zero otherwise. This
yields near-ideal bandlimited interpolation with reduced ringing.
Parameters
----------
t_orig : ndarray of float, shape (N,)
Original, uniformly spaced sample times.
y_orig : ndarray of float, shape (N,)
Original sample values.
t_new : ndarray of float, shape (M,)
Desired output times (must lie within the range of `t_orig`).
a : int, optional
Lanczos order (kernel half-width in samples). Common choices are
2 or 3. Default is 3.
Returns
-------
y_new : ndarray of float, shape (M,)
Interpolated values at `t_new`.
Raises
------
ValueError
If `t_orig` and `y_orig` are not 1D arrays of equal length,
or if `t_new` lies outside the range `[t_orig.min(), t_orig.max()]`,
or if `a` is not a positive integer.
Notes
-----
- Assumes uniform spacing in `t_orig`. If spacing varies, results
will be invalid.
- For each `t_new[i]`, the kernel covers indices `k` from
`⌈(t_new[i]-t_orig[0])/T⌉ - a + 1` to `⌊(t_new[i]-t_orig[0])/T⌋ + a`,
where `T = t_orig[1] - t_orig[0]`.
- Out-of-bounds sample indices are clipped to the valid range.
- The kernel is defined as:
```
L_n = sinc((t_new-t_orig[n])/T) * sinc((t_new-t_orig[n])/(a*T))
```
and `y_new[i] = Σ_n y_orig[n] · L_n`.
Examples
--------
>>> import numpy as np
>>> t = np.linspace(0, 1, 11)
>>> y = np.sin(2*np.pi*5*t)
>>> t_fine = np.linspace(0, 1, 101)
>>> y_fine = lanczos_interpolation(t, y, t_fine, a=3)
"""
# Validate inputs
t_orig = np.asarray(t_orig, dtype=float)
y_orig = np.asarray(y_orig, dtype=float)
t_new = np.asarray(t_new, dtype=float)
if t_orig.ndim != 1 or y_orig.ndim != 1:
raise ValueError("t_orig and y_orig must be 1D arrays")
if t_orig.shape != y_orig.shape:
raise ValueError("t_orig and y_orig must have the same length")
if t_new.ndim != 1:
raise ValueError("t_new must be a 1D array")
if a < 1 or not isinstance(a, int):
raise ValueError("Lanczos order 'a' must be a positive integer")
if t_new.min() < t_orig.min() or t_new.max() > t_orig.max():
raise ValueError("t_new values must lie within the range of t_orig")
t_min, _ = t_orig[0], t_orig[-1]
# Uniform sample interval
T = t_orig[1] - t_orig[0]
if not np.allclose(np.diff(t_orig), T, atol=1e-8 * abs(T)):
raise ValueError("t_orig must be uniformly spaced")
y_new = np.empty_like(t_new, dtype=float)
# Precompute sinc denominator factor
for i, tn in enumerate(t_new):
# position in sample units
x = (tn - t_min) / T
m = int(np.floor(x))
# kernel support indices
k = np.arange(m - a + 1, m + a + 1, dtype=int)
# Keep only in-bounds sample indices
mask = (k >= 0) & (k < len(t_orig))
k_clipped = k[mask]
t_k = t_orig[k_clipped]
y_k = y_orig[k_clipped]
# compute lanczos kernel: sinc + windowed sinc
arg = (tn - t_k) / T
lanczos_kernel = np.sinc(arg) * np.sinc(arg / a)
# Renormalize so the truncated kernel sums to 1
lanczos_kernel /= np.sum(lanczos_kernel)
y_new[i] = np.dot(y_k, lanczos_kernel)
return y_new
[docs]
def apply_interpolation_method(
x_orig: np.ndarray,
y_orig: np.ndarray,
period: float,
interpolation_method: Optional[str] = "sinc",
interpolation_factor: int = 4,
interpolation_parameter: int = 8,
offset: Optional[float] = None,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Interpolate a uniformly sampled waveform using various methods.
Parameters
----------
x_orig : ndarray, shape (N,)
Original sample times (must be monotonically increasing and
uniformly spaced by `period`).
y_orig : ndarray, shape (N,)
Original sample values.
period : float
Time interval between consecutive samples in `x_orig`.
interpolation_method : {'sinc', 'hann', 'hamming', 'lanczos', 'resample', 'resample_poly'}, optional
Which method to use:
- `'sinc'` : ideal sinc interpolation (no window)
- `'hann'` : windowed-sinc with Hann window
- `'hamming'` : windowed-sinc with Hamming window
- `'lanczos'` : Lanczos kernel of order `interpolation_parameter`
- `'resample'` : FFT-based resample via `scipy.signal.resample`
- `'resample_poly'`: polyphase FIR via `scipy.signal.resample_poly`
Default is `'sinc'`.
interpolation_factor : int, optional
Upsampling factor: number of output points = `len(x_orig) * interpolation_factor`.
Must be ≥ 1. Default is 4.
interpolation_parameter : int, optional
Secondary parameter for certain methods:
- For `'hann'`/`'hamming'`, this is the half-width `M` of the windowed-sinc.
- For `'lanczos'`, this is the Lanczos order `a`.
- For `'resample_poly'`, this is the FIR window's beta (in a Kaiser window).
Ignored by `'sinc'` and `'resample'`. Default is 8.
offset : float or None, optional
Baseline offset to subtract before interpolation, then add back afterward.
If None, treated as zero. Default is None.
Returns
-------
x_fine : ndarray, shape (N * interpolation_factor,)
Uniformly spaced output time axis from `x_orig[0]` to `x_orig[-1]`.
y_fine : ndarray, same shape as `x_fine`
Interpolated sample values.
Raises
------
ValueError
- If `x_orig` and `y_orig` have mismatched lengths or are not 1-D.
- If `interpolation_factor` < 1.
- If `interpolation_method` is unrecognized.
Examples
--------
>>> import numpy as np
>>> t = np.linspace(0, 1, 11)
>>> y = np.sin(2*np.pi*5*t)
>>> t_fine, y_fine = apply_interpolation_method(t, y, t[1]-t[0], interpolation_method='sinc')
"""
# Input validation
x_orig = np.asarray(x_orig, dtype=float)
y_orig = np.asarray(y_orig, dtype=float)
if x_orig.ndim != 1 or y_orig.ndim != 1:
raise ValueError("x_orig and y_orig must be 1D arrays")
if x_orig.shape != y_orig.shape:
raise ValueError("x_orig and y_orig must have the same length")
if interpolation_factor < 1:
raise ValueError("interpolation_factor must be >= 1")
offset = 0.0 if offset is None else float(offset)
# Build the fine grid once
num_fine = len(x_orig) * interpolation_factor
x_fine = np.linspace(x_orig[0], x_orig[-1], num_fine)
# Dispatch to the chosen method
method = interpolation_method.lower() if interpolation_method else ""
if method == "sinc":
kernel = np.sinc((x_fine[:, None] - x_orig[None, :]) / period)
y_fine = kernel.dot(y_orig - offset) + offset
elif method in ("hann", "hamming"):
y_fine = (
windowed_sinc_interpolation(t_orig=x_orig, y_orig=y_orig - offset, t_new=x_fine, window=method, M=interpolation_parameter)
+ offset
)
elif method == "lanczos":
y_fine = lanczos_interpolation(t_orig=x_orig, y_orig=y_orig - offset, t_new=x_fine, a=interpolation_parameter) + offset
elif method == "resample":
# scipy.signal.resample returns (y_new, x_new) if given t=x_orig
y_fine, x_temp = resample(y_orig - offset, num=num_fine, t=x_orig)
# override x_fine to match returned times
x_fine = x_temp
y_fine = y_fine + offset
elif method == "resample_poly":
y_fine = (
resample_poly(
x=y_orig - offset, up=interpolation_factor, down=1, window=('kaiser', interpolation_parameter), padtype='constant', cval=0.0
)
+ offset
)
else:
raise ValueError(f"Unknown interpolation method: {interpolation_method!r}")
return x_fine, y_fine
[docs]
def reorder_circular_samples_with_trigger(
trig_arr: np.ndarray,
samp_arr: np.ndarray,
reorder_samples: bool,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Rotate a circular buffer so that the contiguous trigger block (1s) appears at the end.
Verify that all trigger markers (1s) in a circular array are contiguous,
then rotate both the trigger array and optionally the associated sample array so
that the block of 1s appears at the end of the array.
This is useful when interpreting a circular buffer of ADC samples where
the trigger position-marked by one or more consecutive 1s—may wrap
around the end of the buffer. After reordering, the data will be in true
time-order, with the trigger block at the end.
In circular context, a block of 1s may wrap from the end back to the start
of the array (e.g. [1,0,0,1] is a valid 2-wide trigger block). This function
verifies that all 1s form exactly one circularly-contiguous block, then
performs a roll so that those 1s occupy the last positions in the array.
The sample array is optionally rotated identically to preserve alignment.
Parameters
----------
trig_arr : ndarray of int (0 or 1), shape (N,)
Circular array marking trigger positions. Must contain one or more
contiguous 1s; all other entries must be 0.
samp_arr : ndarray, shape (N,)
Sample values corresponding to each position in `trig_arr`.
reorder_samples : bool
If True, rotate `samp_arr` identically to `trig_arr`; if False, leave
`samp_arr` unchanged.
Returns
-------
trig_reordered : ndarray of int, shape (N,)
The trigger array rotated so that its contiguous 1s occupy the final
positions of the array.
samp_reordered : ndarray, shape (N,)
The sample array, either rotated in lock-step (if `reorder_samples`)
or returned unchanged.
start_indicator : ndarray of int (0 or 1), shape (N,)
All zeros except a single 1 at the index where the original buffer start
appears in the reordered buffer.
Raises
------
ValueError
If `trig_arr` and `samp_arr` have different lengths.
If `trig_arr` does not contain any 1s.
If the 1s in `trig_arr` are not contiguous.
Examples
--------
>>> trig = np.array([0, 0, 1, 1, 0, 0])
>>> samp = np.arange(6)
>>> t_new, s_new, start_mask = reorder_circular_samples_with_trigger(trig, samp)
>>> t_new
array([0, 0, 0, 0, 1, 1])
>>> s_new
array([4, 5, 0, 1, 2, 3])
>>> start_mask
array([0, 0, 1, 0, 0, 0])
"""
# Basic validation
if trig_arr.ndim != 1 or samp_arr.ndim != 1:
raise ValueError("Both arrays must be 1D")
if trig_arr.shape != samp_arr.shape:
raise ValueError("trig_arr and samp_arr must have the same shape")
n = trig_arr.size
# Find indices of ones
ones_idx = np.flatnonzero(trig_arr == 1)
if ones_idx.size == 0:
raise ValueError("trig_arr must contain at least one '1'")
# Check contiguity (in circular sense)
# Identify breaks in the sorted ones_idx sequence
diffs = np.diff(ones_idx)
breaks = np.nonzero(diffs != 1)[0] # indices where run breaks (non-contiguous)
# Check cases:
# 1) no breaks → single linear run
# 2) one break AND wrap condition → circular (wrap) run
if breaks.size == 0:
# linear run
start = ones_idx[0]
elif breaks.size == 1 and ones_idx[0] == 0 and ones_idx[-1] == n - 1:
# wrap-around run: pick the second run’s start
brk = breaks[0]
start = ones_idx[brk + 1]
else:
raise ValueError("1s must form exactly one contiguous block (possibly wrapping) in trig_arr")
m = ones_idx.size
# We want the block of length m to occupy indices [n-m ... n-1].
# The element at 'start' should move to index n-m, so shift = (n-m) - start.
shift = (n - m) - int(start)
# Positive shift in np.roll moves elements right
trig_reordered = np.roll(trig_arr, shift)
# Build the one‐hot start‐indicator
start_indicator = np.zeros(n, dtype=int)
start_indicator[shift] = 1
if reorder_samples:
samp_reordered = np.roll(samp_arr, shift)
return trig_reordered, samp_reordered, start_indicator
else:
return trig_reordered, samp_arr, start_indicator
# Original taken from: https://stackoverflow.com/a/20007730
# Then updated with docstring by me and minor tweaks
[docs]
def ordinal(n: int) -> str:
"""
Convert an integer to its English ordinal string (e.g., 1 → "1st").
Original function from https://stackoverflow.com/a/20007730, then adjusted with minor tweaks
Parameters
----------
n : int
The integer to convert.
Returns
-------
str
The integer followed by its ordinal suffix:
"st" for numbers ending in 1,
"nd" for numbers ending in 2,
"rd" for numbers ending in 3,
and "th" otherwise. Special cases 11, 12, and 13 all use "th".
Notes
-----
- English ordinals use "th" for the teens (11, 12, 13), even though they end in 1-3.
- For all other numbers, the suffix is chosen by the last digit:
1→"st", 2→"nd", 3→"rd", otherwise "th".
- This simple list-based lookup (with `min(n % 10, 4)`) is a common Python recipe.
"""
if 11 <= (n % 100) <= 13:
suffix = "th"
else:
suffix = ["th", "st", "nd", "rd", "th"][min(n % 10, 4)]
return f"{n}{suffix}"
[docs]
def sampic_reconstruct_time_dict(rec: dict) -> float:
"""
Function implementing the custom SAMPIC time reconstruction logic, operating on a dict of SAMPIC recorded fields
Parameters
----------
rec : dict
Dictionary containing the required SAMPIC fields for time reconstruction
Required:
UnixTime -
Raises
------
ValueError
If `use_unix_time` is False and no reconstruction algorithm
is provided in `_reconstruct_time`.
"""
# Placeholder for custom SAMPIC time reconstruction logic
# Must return a float timestamp for a hit record `rec`
# return rec['UnixTime']
raise ValueError("Custom time reconstruction not implemented")
[docs]
def extract_unix_time_and_record(batch: Union[RecordBatch, ak.highlevel.Array], idx: int) -> Tuple[float, Dict[str, Any]]:
"""
Extract the UnixTime timestamp and full record from a batch at a given index.
This utility supports both PyArrow RecordBatch and Awkward Array inputs.
It builds a dictionary of all fields for the specified hit and returns
its UnixTime timestamp along with the record dict.
Parameters
----------
batch : RecordBatch or awkward.highlevel.Array
A batch of hit records containing a 'UnixTime' field among others.
idx : int
Zero-based index within the batch to extract.
Returns
-------
ts : float
The UnixTime timestamp (in seconds) of the hit.
rec : dict
Mapping of field names to their Python values for that hit.
Raises
------
KeyError
If 'UnixTime' or any other field is missing from the batch.
IndexError
If `idx` is out of bounds for the batch.
TypeError
If `batch` is neither a RecordBatch nor an Awkward Array.
"""
# Determine field names
if isinstance(batch, RecordBatch):
field_names = batch.schema.names
elif isinstance(batch, ak.highlevel.Array):
field_names = batch.fields
else:
raise TypeError(f"Unsupported batch type {type(batch)}; expected RecordBatch or ak.Array")
rec: Dict[str, Any] = {}
# Extract all fields
for col in field_names:
try:
if isinstance(batch, RecordBatch):
rec[col] = batch.column(col)[idx].as_py()
else:
rec[col] = np.asarray(batch[col])[idx]
except KeyError:
raise KeyError(f"Missing required field '{col}' in batch")
except IndexError:
raise IndexError(f"Index {idx} out of bounds for field '{col}'")
# Extract timestamp
try:
ts = float(rec['UnixTime'])
except KeyError:
raise KeyError("'UnixTime' field not found for timestamp extraction")
except (TypeError, ValueError):
raise ValueError(f"Invalid UnixTime value: {rec.get('UnixTime')}")
return ts, rec
[docs]
def extract_SAMPIC_time_and_record(batch: Union[RecordBatch, ak.highlevel.Array], idx: int) -> Tuple[float, Dict[str, Any]]:
"""
Reconstruct SAMPIC hit timestamp and return full record from a batch.
Builds a dict of all fields for the specified hit and uses
`sampic_reconstruct_time_dict` to compute its timestamp.
Parameters
----------
batch : RecordBatch or awkward.highlevel.Array
A batch of hit records containing all fields required for reconstruction.
idx : int
Zero-based index within the batch to extract.
Returns
-------
ts : float
The reconstructed timestamp (in seconds) for the hit.
rec : dict
Mapping of field names to their Python values for that hit.
Raises
------
KeyError
If any required field is missing from the batch.
IndexError
If `idx` is out of bounds for the batch.
TypeError
If `batch` is neither a RecordBatch nor an Awkward Array.
ValueError
If `sampic_reconstruct_time_dict(rec)` fails or returns a non-numeric value.
"""
# Determine field names
if isinstance(batch, RecordBatch):
field_names = batch.schema.names
elif isinstance(batch, ak.highlevel.Array):
field_names = batch.fields
else:
raise TypeError(f"Unsupported batch type {type(batch)}; expected RecordBatch or ak.Array")
rec: Dict[str, Any] = {}
# Extract all fields
for col in field_names:
try:
if isinstance(batch, RecordBatch):
rec[col] = batch.column(col)[idx].as_py()
else:
rec[col] = np.asarray(batch[col])[idx]
except KeyError:
raise KeyError(f"Missing required field '{col}' in batch")
except IndexError:
raise IndexError(f"Index {idx} out of bounds for field '{col}'")
# Reconstruct timestamp
ts = sampic_reconstruct_time_dict(rec)
if not isinstance(ts, (int, float)):
raise ValueError(f"Reconstructed timestamp must be numeric, got {type(ts)}")
return float(ts), rec
[docs]
def check_time_ordering(
file_path: Path, use_unix_time: bool = False, find_all: bool = False, batch_size: int = 100_000, root_tree: str = "sampic_hits"
) -> List[Tuple[int, float, float]]:
"""
Verify that hit records in a SAMPIC output file are non-decreasing in time.
Streams through the file in memory-efficient batches, reconstructs or
reads each hit's timestamp, and checks for any out-of-order intervals.
Parameters
----------
file_path : pathlib.Path
Path to the input data file (.parquet, .feather, or .root).
use_unix_time : bool, optional
If True, use the 'UnixTime' column directly as the hit timestamp.
Otherwise, applies a custom reconstruction algorithm (must be
implemented in `_reconstruct_time`). Default is False.
find_all : bool, optional
If True, continue scanning the entire file and collect all
out-of-order events; if False, stop at the first detection.
Default is False.
batch_size : int, optional
Number of rows to read per batch from `open_hit_reader`. Default is 100000.
root_tree : str, optional
Name of the TTree inside a ROOT file (only used for .root). Default is "sampic_hits".
Returns
-------
list of (hit_index, previous_time, current_time)
A list of tuples for each detected out-of-order event, where:
- `hit_index` is the zero-based index of the later (out-of-order) hit.
- `previous_time` is the timestamp of the immediately preceding hit.
- `current_time` is the timestamp of the out-of-order hit.
If no violations are found, an empty list is returned.
Raises
------
ValueError
If `use_unix_time` is False and no reconstruction algorithm
is provided in `_reconstruct_time`.
"""
violations: List[Tuple[int, float, float]] = []
last_time: Optional[float] = None
hit_idx = 0
# Select extractor
if use_unix_time:
extractor = lambda batch, i: extract_ts_unix_time(batch, i)
else:
extractor = lambda batch, i: extract_ts_SAMPIC(batch, i)
# Stream through hits
for batch in open_hit_reader(file_path, cols=['UnixTime'], batch_size=batch_size, root_tree=root_tree):
# determine number of entries in this batch
n = batch.num_rows if isinstance(batch, RecordBatch) else len(batch['UnixTime'])
for i in range(n):
ts = extractor(batch, i)
if last_time is not None and ts < last_time:
violations.append((hit_idx, last_time, ts))
if not find_all:
return violations
last_time = ts
hit_idx += 1
return violations
[docs]
@contextmanager
def reprocess_data_files(
input_path: Path,
output_feather_path: Optional[Path] = None,
output_parquet_path: Optional[Path] = None,
output_root_path: Optional[Path] = None,
root_tree: str = "sampic_hits",
schemaInfo: Dict[str, Tuple] = SAMPIC_Schema_Info,
new_columns: Optional[List[str]] = None,
restrict_columns: Optional[List[str]] = None,
) -> Iterator[
Tuple[List[str], pa.Schema, Optional[ipc.RecordBatchFileWriter], Optional[pq.ParquetWriter], Optional[uproot.writing.WritableTree]]
]:
"""
Context manager to open/prepare readers and writers for re-processing SAMPIC data.
Reads schema & metadata from `input_path` (Parquet, Feather, or ROOT),
optionally restricts or extends the column list, then yields:
1. `columns`: final list of column names
2. `schema`: Arrow Schema with metadata embedded
3. `feather_writer`: IPC writer for Feather (or None)
4. `parquet_writer`: ParquetWriter instance (or None)
5. `root_tree_obj`: Writable ROOT TTree (or None)
Upon exit, finalizes metadata and closes all writers.
Parameters
----------
input_path : pathlib.Path
Path to an existing decoded SAMPIC input file. Supported extensions:
.parquet/.pq, .feather, .root.
output_feather_path : pathlib.Path or None, optional
Path for the output Feather file; if None, Feather writing is disabled.
output_parquet_path : pathlib.Path or None, optional
Path for the output Parquet file; if None, Parquet writing is disabled.
output_root_path : pathlib.Path or None, optional
Path for the output ROOT file; if None, ROOT writing is disabled.
root_tree : str
Name of the TTree for ROOT I/O (default: "sampic_hits").
schemaInfo
Mapping of column names to (pandas_dtype, pa_type, numpy_dtype, optional_size).
Used to rebuild the Arrow schema if `new_columns` or `restrict_columns` is given.
new_columns : list of str
Extra column names to append to those discovered in `input_path`.
restrict_columns : list of str
If given, only the intersection of this list and the discovered columns is used.
Yields
------
columns : list of str
The final ordered list of column names for downstream reads/writes.
schema : pa.Schema
Arrow schema for reading and writing data.
feather_writer : pa.ipc.RecordBatchFileWriter or None
Open IPC writer for Feather, or None if disabled.
parquet_writer : pq.ParquetWriter or None
Open ParquetWriter, or None if disabled.
root_tree_obj : uproot.writing.WritableTree or None
Writable TTree for ROOT output, or None if disabled.
Raises
------
ValueError
If `input_path` has unsupported suffix.
Notes
-----
- Input metadata (from get_file_metadata) is embedded in the Arrow schema
and, upon context exit, written into the ROOT `metadata` TTree if used.
- All open writers are closed automatically in the `finally` block.
- If both `new_columns` and `restrict_columns` are None, the input file’s
native schema is preserved; otherwise a new schema is built via `build_schema`.
"""
# Initialize writers
feather_writer = None
parquet_writer = None
root_tree_obj = None
froot = None
sink = None
# Determine input format and read metadata
input_suffix = input_path.suffix.lower()
metadata = get_file_metadata(input_path)
metadata_bytes = prepare_header_metadata_in_bytes(metadata)
rebuild_schema = False
# Read existing schema/columns
if input_suffix in ('.parquet', '.pq'):
pqf = pq.ParquetFile(str(input_path))
columns = pqf.schema_arrow.names
schema = pqf.schema_arrow
elif input_suffix == '.feather':
rf = feather.read_table(str(input_path), memory_map=True)
columns = rf.schema.names
schema = rf.schema
elif input_suffix == '.root':
reader = uproot.open(str(input_path))
columns = list(reader[root_tree].keys())
reader.close()
rebuild_schema = True
else:
raise ValueError(f"Unsupported input format: {input_suffix}. Expected one of ['.parquet', '.pq', '.feather', '.root']")
# Apply column restrictions or additions
if restrict_columns is not None or new_columns is not None or rebuild_schema:
if restrict_columns is not None:
columns = [c for c in columns if c in restrict_columns]
if new_columns is not None:
columns = columns + [c for c in new_columns if c not in columns]
schema = build_schema(metadata=metadata_bytes, schemaInfo=schemaInfo)
try:
# Open output writers
if output_parquet_path:
# parquet_writer = pq.ParquetWriter(str(output_path), schema,
# version='2.0', use_deprecated_int96_timestamps=False,
# metadata=metadata)
parquet_writer = pq.ParquetWriter(str(output_parquet_path), schema)
if output_feather_path:
sink = pa.OSFile(str(output_feather_path), 'wb')
feather_writer = ipc.new_file(sink, schema)
if output_root_path:
froot = uproot.recreate(output_root_path)
froot[root_tree] = build_empty_root_data_with_schema(schemaInfo=schemaInfo)
root_tree_obj = froot[root_tree]
yield (columns, schema, feather_writer, parquet_writer, root_tree_obj)
finally:
# Write metadata TTree for ROOT
if root_tree_obj is not None and output_root_path:
# Convert to Awkward Arrays of strings for variable-length support
keys = ak.from_iter(list(metadata.keys()), highlevel=True)
vals = ak.from_iter([str(v) for v in metadata.values()], highlevel=True)
froot['metadata'] = {'key': keys, 'value': vals}
froot.close()
# Close Parquet
if parquet_writer is not None:
parquet_writer.close()
# Close Feather
if feather_writer is not None:
feather_writer.close()
sink.close()
def _write_to_outputs(
rec: Dict[str, Any],
schema: pa.Schema,
schemaInfo: Dict[str, Tuple],
feather_writer: Optional[Any],
parquet_writer: Optional[pq.ParquetWriter],
root_tree_obj: Optional[Any],
) -> None:
"""
Internal helper: write a single record to enabled output writers.
"""
if feather_writer or parquet_writer:
# Convert to Table for Arrow formats
table = pa.Table.from_pydict({k: [v] for k, v in rec.items()}, schema=schema)
if feather_writer:
feather_writer.write_batch(table.to_batches()[0])
# feather_writer.write(tbl)
if parquet_writer:
parquet_writer.write_table(table)
if root_tree_obj:
root_data = {
col: np.array([rec[col]], dtype=schemaInfo[col][2]) for col in rec if col in schemaInfo and schemaInfo[col][2] is not None
}
root_tree_obj.extend(root_data)
[docs]
def reorder_hits(
input_path: Path,
output_feather_path: Optional[Path] = None,
output_parquet_path: Optional[Path] = None,
output_root_path: Optional[Path] = None,
root_tree: str = "sampic_hits",
use_unix_time: bool = False,
batch_size: int = 100_000,
max_time_offset: Optional[float] = None,
schemaInfo: Dict[str, Tuple] = SAMPIC_Schema_Info,
) -> None:
"""
Stream and reorder hit records into non-decreasing time order, writing to a new file.
This function reads hit records from `input_path` (Feather, Parquet, or ROOT) in
memory-efficient batches, reconstructs or reads each hit's timestamp, and emits
them in sorted order (non-decreasing time) to `output_path` in the same or different
format. A sliding window (heap) is used to handle small out-of-order intervals
up to `max_time_offset` if provided, otherwise fully buffers.
Parameters
----------
input_path : pathlib.Path
Path to the input decoded SAMPIC data file (.parquet, .pq, .feather, or .root).
output_feather_path : pathlib.Path or None, optional
If not None, path to write the reordered data in Feather format.
output_parquet_path : pathlib.Path or None, optional
If not None, path to write the reordered data in Parquet format.
output_root_path : pathlib.Path or None, optional
If not None, path to write the reordered data to a ROOT file. Writes
data TTree with same `root_tree` name and a metadata TTree.
root_tree : str, optional
Name of the TTree inside the ROOT file (default: `"sampic_hits"`).
use_unix_time : bool, default False
If True, use the 'UnixTime' column directly; otherwise use
`sampic_reconstruct_time_dict` to compute timestamps.
batch_size : int, default 100000
Number of rows/entries to read per batch from the input.
max_time_offset : float or None, optional
Maximum lag (seconds) allowed before emitting buffered records.
Records older than (current_max_time - max_time_offset) are written.
If None, all records are buffered and sorted fully before writing.
schemaInfo : dict, default SAMPIC_Schema_Info
Mapping of field names to type definitions used for ROOT dtype.
Returns
-------
None
Raises
------
ValueError
If timestamp reconstruction fails or required columns missing.
RuntimeError
If no output path is provided.
Notes
-----
- Metadata from the input file is propagated to all output formats.
- Parquet and Feather writing use PyArrow writers with an explicit schema.
- ROOT writing uses uproot to extend a TTree in streaming fashion.
"""
# Ensure at least one output
if not any([output_feather_path, output_parquet_path, output_root_path]):
raise RuntimeError("At least one output path must be specified")
# Use context manager for opening files and writers
with reprocess_data_files(
input_path=input_path,
output_feather_path=output_feather_path,
output_parquet_path=output_parquet_path,
output_root_path=output_root_path,
root_tree=root_tree,
schemaInfo=schemaInfo,
) as (columns, schema, feather_writer, parquet_writer, root_tree_obj):
# Select extractor
if use_unix_time:
extractor = lambda batch, i: extract_unix_time_and_record(batch, i)
else:
extractor = lambda batch, i: extract_SAMPIC_time_and_record(batch, i)
heap: List[TimestampedRecord] = []
current_max_time = None
# Stream input
for batch in open_hit_reader(input_path, cols=columns, batch_size=batch_size, root_tree=root_tree):
n = batch.num_rows if isinstance(batch, RecordBatch) else len(batch[columns[0]])
for i in range(n):
ts, rec = extractor(batch, i)
if current_max_time is None or ts > current_max_time:
current_max_time = ts
heapq.heappush(heap, TimestampedRecord(ts, rec))
# Emit due records
if max_time_offset is not None and current_max_time is not None:
cutoff = current_max_time - max_time_offset
while heap and heap[0].timestamp <= cutoff:
write_record = heapq.heappop(heap).record
_write_to_outputs(write_record, schema, schemaInfo, feather_writer, parquet_writer, root_tree_obj)
# Flush remaining
while heap:
write_record = heapq.heappop(heap).record
_write_to_outputs(write_record, schema, schemaInfo, feather_writer, parquet_writer, root_tree_obj)
[docs]
def reprocess_noop(
input_path: Path,
output_feather_path: Optional[Path] = None,
output_parquet_path: Optional[Path] = None,
output_root_path: Optional[Path] = None,
root_tree: str = "sampic_hits",
batch_size: int = 100_000,
schemaInfo: Dict[str, Tuple] = SAMPIC_Schema_Info,
) -> None:
"""
A “no-op” reprocessor that copies Sampic data from one format to another in batches.
Reads the same columns from `input_path` (Parquet, Feather, or ROOT) and
writes them unmodified to any of the enabled outputs (Feather, Parquet, or ROOT),
streaming in `batch_size` chunks. Metadata is inherited and re-emitted
automatically, so this can convert file formats or update compression, etc.
Parameters
----------
input_path : pathlib.Path
Path to the input decoded SAMPIC data file. Supported suffixes:
`.parquet`/`.pq`, `.feather`, or `.root`. For ROOT, `root_tree`
must point to an existing TTree of hits.
output_feather_path : pathlib.Path or None, default None
If provided, write output as an Arrow IPC/Feather file.
output_parquet_path : pathlib.Path or None, default None
If provided, write output as a Parquet file.
output_root_path : pathlib.Path or None, default None
If provided, write output as a ROOT file with the same TTree name
plus a metadata TTree.
root_tree : str, default "sampic_hits"
Name of the hit TTree for ROOT input/output.
batch_size : int, default 100000
Number of rows/entries to read per batch from the input.
schemaInfo : dict, default SAMPIC_Schema_Info
Mapping of field names to type definitions used for ROOT dtype.
Returns
-------
None
Raises
------
RuntimeError
If no output path is specified.
ValueError
If required columns are missing or conversion fails.
Notes
-----
- This performs **no** transformation on the data itself—fields,
datatypes, and ordering are preserved.
- For ROOT output, array columns are converted via
`get_root_data_with_schema` to fixed-length NumPy vectors.
- Metadata from the input file is propagated to all output formats.
- Parquet and Feather writing use PyArrow writers with an explicit schema.
"""
# Ensure at least one output
if not any([output_feather_path, output_parquet_path, output_root_path]):
raise RuntimeError("Must specify at least one output")
# Use context manager for opening files and writers
with reprocess_data_files(
input_path=input_path,
output_feather_path=output_feather_path,
output_parquet_path=output_parquet_path,
output_root_path=output_root_path,
root_tree=root_tree,
schemaInfo=schemaInfo,
) as (columns, schema, feather_writer, parquet_writer, root_tree_obj):
# Stream input
for batch in open_hit_reader(input_path, cols=columns, batch_size=batch_size, root_tree=root_tree):
table = None
df = None
record_batch = None
# 1) Normalize to a RecordBatch
if isinstance(batch, RecordBatch):
record_batch = batch
else:
arrays = [np.asarray(batch[col]) for col in columns]
record_batch = RecordBatch.from_arrays(arrays, columns)
# 2) Write out in Feather IPC (full batches)
if feather_writer:
feather_writer.write_batch(record_batch)
# 3) Write out in Parquet (full batches)
if parquet_writer:
table = pa.Table.from_batches([record_batch], schema=schema)
# table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
parquet_writer.write_table(table)
# 4) Write out in ROOT
if root_tree_obj:
# fastest: avoid pandas roundtrip if possible
df = record_batch.to_pandas()
root_data = get_root_data_with_schema(df, schemaInfo=schemaInfo)
root_tree_obj.extend(root_data)