diff --git a/setup.cfg b/setup.cfg index 92154b4f..c4b141e6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -72,10 +72,8 @@ extend-ignore = [tool:pytest] # Run pytest with all our checkers, and don't spam us with massive tracebacks on error -# Don't do flake8 here as we need to separate it out for CI addopts = - --tb=native -vv --doctest-modules --ignore=softioc/iocStats --ignore=epicscorelibs --ignore=docs - --cov=softioc --cov-report term --cov-report xml:cov.xml + --tb=native -vv --ignore=softioc/iocStats --ignore=epicscorelibs --ignore=docs # Enables all discovered async tests and fixtures to be automatically marked as async, even if # they don't have a specific marker https://github.com/pytest-dev/pytest-asyncio#auto-mode asyncio_mode = auto diff --git a/softioc/device_core.py b/softioc/device_core.py index cfc5e8c3..9d73fb89 100644 --- a/softioc/device_core.py +++ b/softioc/device_core.py @@ -155,8 +155,108 @@ def __init__(self, name, **kargs): # a call to get_ioinit_info. This is only a trivial attempt to # reduce resource consumption. self.__ioscanpvt = imports.IOSCANPVT() + # CLS: per-field callback registry. + # Keys are uppercase field names (e.g. "SCAN", "VAL") or the + # wildcard "*" which matches every field write. + # Values are lists of callables. + self.__field_callbacks = {} super().__init__(name, **kargs) + # ---- CLS extension: field-change callbacks ---------------------------- + + def on_field_change(self, field, callback): + '''Register *callback* to be invoked when *field* is written via + CA or PVA. + + Args: + field: EPICS field name (e.g. ``"SCAN"``, ``"VAL"``, + ``"DISA"``), a list of field names + (e.g. ``["SCAN", "DRVH", "DRVL"]``), or ``"*"`` to + receive notifications for **every** field write on this + record. + callback: ``callback(record_name, field_name, value_string)`` + called after each matching write. *value_string* is the + new value formatted by EPICS as a ``DBR_STRING``. + + Multiple callbacks per field are supported; they are called in + registration order. The same callable may be registered for + different fields. + + Note: + Callbacks fire only for writes originating from Channel + Access or PV Access clients. IOC-shell writes (``dbpf``) + and internal ``record.set()`` calls bypass asTrapWrite and + will **not** trigger callbacks. + ''' + if isinstance(field, (list, tuple)): + for f in field: + self.on_field_change(f, callback) + return + field = field.upper() if field != "*" else "*" + self.__field_callbacks.setdefault(field, []).append(callback) + + @property + def field_callbacks(self): + '''Read-only view of the registered field-change callbacks. + + Returns a dict mapping field names (and ``"*"``) to lists of + callables. Modifying the returned dict has no effect on the + internal registry — use :meth:`on_field_change` to register new + callbacks. + ''' + return {k: list(v) for k, v in self.__field_callbacks.items()} + + def remove_field_callback(self, field, callback): + '''Remove a specific *callback* previously registered for *field*. + + Args: + field: Field name (e.g. ``"SCAN"``) or ``"*"``. + callback: The exact callable that was passed to + :meth:`on_field_change`. + + Raises: + ValueError: If *callback* is not registered for *field*. + ''' + field = field.upper() if field != "*" else "*" + try: + self.__field_callbacks[field].remove(callback) + except (KeyError, ValueError): + raise ValueError( + f"Callback not registered for field {field!r}") + # Clean up empty lists to keep the dict tidy. + if not self.__field_callbacks[field]: + del self.__field_callbacks[field] + + def clear_field_callbacks(self, field=None): + '''Remove all field-change callbacks, or all for a single *field*. + + Args: + field: If ``None`` (default), remove **all** callbacks on + this record. Otherwise, remove only those registered for + the given field name (or ``"*"``). + + Raises: + KeyError: If *field* is given but has no registered callbacks. + ''' + if field is None: + self.__field_callbacks.clear() + else: + field = field.upper() if field != "*" else "*" + try: + del self.__field_callbacks[field] + except KeyError: + raise KeyError( + f"No callbacks registered for field {field!r}") + + def _get_field_callbacks(self, field): + '''Return the list of callbacks for *field*, including wildcards. + + This is an internal helper used by :mod:`~softioc.field_monitor`. + ''' + cbs = list(self.__field_callbacks.get(field, [])) + cbs.extend(self.__field_callbacks.get("*", [])) + return cbs + def init_record(self, record): '''Default record initialisation finalisation. This can be overridden @@ -182,9 +282,39 @@ def process_severity(self, record, severity, alarm): record.NSTA = alarm def trigger(self): - '''Call this to trigger processing for records with I/O Intr scan.''' + '''Trigger immediate record processing. + + Uses scanIoRequest for I/O Intr records (fast path). + Falls back to scanOnce for records on any other SCAN setting. + + Only one path fires per call: scanIoRequest returns non-zero + when the record is on the I/O Intr scan list, so scanOnce is + skipped. When the record has been moved to a different scan + list scanIoRequest returns zero and scanOnce takes over. + + Known limitation — periodic SCAN and double processing: + + When SCAN is set to a periodic rate (e.g. "1 second"), the + EPICS scan thread calls dbProcess on its own schedule, and + set() also calls scanOnce → dbProcess. The record is + processed by both paths. dbScanLock serialises access and + recGblCheckDeadband prevents duplicate monitors when the + value has not changed, so this is harmless in practice but + does result in extra processing cycles. + + To avoid this, use ``iocInit(auto_reset_scan=True)``. + The C hook resets SCAN back to I/O Intr after forwarding + the requested rate to the Python callback, so the record + stays on the I/O Intr scan list and scanIoRequest remains + the only processing path. Passive writes are exempt from + the reset (they are a deliberate "stop updating" command). + ''' + queued = False if self.__ioscanpvt: - imports.scanIoRequest(self.__ioscanpvt) + queued = imports.scanIoRequest(self.__ioscanpvt) + if not queued and hasattr(self, '_record') \ + and self._record is not None: + imports.scan_once(self._record.record.value) diff --git a/softioc/extension.c b/softioc/extension.c index 62f579ba..a5053297 100644 --- a/softioc/extension.c +++ b/softioc/extension.c @@ -6,6 +6,8 @@ #define db_accessHFORdb_accessC // Needed to get correct DBF_ values #include #include +#include +#include #include #include #include @@ -269,15 +271,114 @@ void EpicsPvPutHook(struct asTrapWriteMessage *pmessage, int after) } +/* Guard against double-registration of EpicsPvPutHook: asTrapWrite + * maintains a linked list of listeners, and registering the same function + * pointer twice would cause double printf output per write. */ +static int epics_pv_put_hook_installed = 0; + static PyObject *install_pv_logging(PyObject *self, PyObject *args) { const char *acf_file; + int log_puts = 1; /* default: register the printf hook (backward compat) */ - if (!PyArg_ParseTuple(args, "s", &acf_file)) + if (!PyArg_ParseTuple(args, "s|p", &acf_file, &log_puts)) return NULL; asSetFilename(acf_file); - asTrapWriteRegisterListener(EpicsPvPutHook); + if (log_puts && !epics_pv_put_hook_installed) { + asTrapWriteRegisterListener(EpicsPvPutHook); + epics_pv_put_hook_installed = 1; + } + Py_RETURN_NONE; +} + + +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ +/* CLS extension: field-write callback support. + * + * A single Python callable (py_field_write_callback) is invoked for every + * CA/PVA field write that passes through asTrapWrite. The Python layer + * (field_monitor.py) demultiplexes the call to per-record, per-field + * callbacks registered by on_field_change(). + * + * This hook coexists with the original EpicsPvPutHook (print-logging) + * because asTrapWrite supports multiple registered listeners. + */ + +/* Python callable: callback(channel_name: str, value_str: str) */ +static PyObject *py_field_write_callback = NULL; + +/* When non-zero, FieldWriteHook resets SCAN back to I/O Intr after + * forwarding the notification to Python — unless Passive was requested. + * This eliminates periodic-scan contention: SCAN acts as a latched + * command that Python reads and then the record returns to I/O Intr. */ +static int auto_reset_scan = 0; + +static void FieldWriteHook(struct asTrapWriteMessage *pmessage, int after) +{ + if (!after || !py_field_write_callback || py_field_write_callback == Py_None) + return; + + struct dbChannel *pchan = pmessage->serverSpecific; + if (!pchan) return; + + /* Channel name includes the field suffix, e.g. "MYPV.SCAN". */ + const char *channel_name = dbChannelName(pchan); + + /* Read the post-write value formatted as a human-readable string. + * MAX_STRING_SIZE (from EPICS base) is 40 — use a generous buffer + * to accommodate array-of-string fields that FormatValue handles. */ + char value_str[MAX_STRING_SIZE + 1]; + memset(value_str, 0, sizeof(value_str)); + long len = 1; + long opts = 0; + dbGetField(&pchan->addr, DBR_STRING, value_str, &opts, &len, NULL); + + /* Acquire the GIL and forward to Python. */ + PyGILState_STATE gstate = PyGILState_Ensure(); + PyObject *result = PyObject_CallFunction( + py_field_write_callback, "ss", channel_name, value_str); + Py_XDECREF(result); + if (PyErr_Occurred()) + PyErr_Print(); + PyGILState_Release(gstate); + + /* Auto-reset SCAN to I/O Intr after forwarding the notification, + * unless the client set Passive (meaning "stop updating"). */ + if (auto_reset_scan) { + const char *dot = strrchr(channel_name, '.'); + if (dot && strcmp(dot + 1, "SCAN") == 0 + && strcmp(value_str, "Passive") != 0) { + epicsEnum16 io_intr = menuScanI_O_Intr; + dbScanLock(pchan->addr.precord); + dbPut(&pchan->addr, DBR_ENUM, &io_intr, 1); + dbScanUnlock(pchan->addr.precord); + } + } +} + +/* Guard against double-registration of FieldWriteHook — same rationale + * as epics_pv_put_hook_installed above. */ +static int field_write_hook_installed = 0; + +static PyObject *register_field_write_listener(PyObject *self, PyObject *args) +{ + PyObject *callback; + int reset_scan = 0; + if (!PyArg_ParseTuple(args, "O|p", &callback, &reset_scan)) + return NULL; + if (!PyCallable_Check(callback)) { + PyErr_SetString(PyExc_TypeError, "Argument must be callable"); + return NULL; + } + Py_XDECREF(py_field_write_callback); + Py_INCREF(callback); + py_field_write_callback = callback; + auto_reset_scan = reset_scan; + if (!field_write_hook_installed) { + asTrapWriteRegisterListener(FieldWriteHook); + field_write_hook_installed = 1; + } Py_RETURN_NONE; } @@ -321,6 +422,26 @@ static PyObject *signal_processing_complete(PyObject *self, PyObject *args) } +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ +/* CLS extension: scanOnce wrapper. + * + * Queues immediate record processing via the EPICS Base scanOnce() API, + * which works regardless of the record's current SCAN setting. + */ + +static PyObject *scan_once(PyObject *self, PyObject *args) +{ + Py_ssize_t record_ptr; + if (!PyArg_ParseTuple(args, "n", &record_ptr)) + return NULL; + dbCommon *precord = (dbCommon *)record_ptr; + Py_BEGIN_ALLOW_THREADS + scanOnce(precord); + Py_END_ALLOW_THREADS + Py_RETURN_NONE; +} + + /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Initialisation. */ @@ -339,6 +460,10 @@ static struct PyMethodDef softioc_methods[] = { "Inform EPICS that asynchronous record processing has completed"}, {"create_callback_capsule", create_callback_capsule, METH_VARARGS, "Create a CALLBACK structure inside a PyCapsule"}, + {"register_field_write_listener", register_field_write_listener, METH_VARARGS, + "Register a Python callable for all CA/PVA field writes (CLS extension)"}, + {"scan_once", scan_once, METH_VARARGS, + "Queue immediate record processing via scanOnce() (CLS extension)"}, {NULL, NULL, 0, NULL} /* Sentinel */ }; diff --git a/softioc/field_monitor.py b/softioc/field_monitor.py new file mode 100644 index 00000000..3c436eee --- /dev/null +++ b/softioc/field_monitor.py @@ -0,0 +1,77 @@ +""" +CLS extension: field-write monitor. + +Bridges the low-level C ``asTrapWrite`` hook to the per-record Python +callbacks registered via :meth:`DeviceSupportCore.on_field_change`. + +After ``iocInit()``, call :func:`install_field_monitor` once. From that +point on every CA/PVA-originated write to any record field triggers +:func:`_dispatch_field_write`, which resolves the record, and invokes the +matching callbacks (exact field match **plus** any ``"*"`` wildcard +callbacks). + +.. note:: + + IOC-shell writes (``dbpf``) and internal ``record.set()`` calls bypass + ``asTrapWrite`` and will **not** fire callbacks. +""" + +import logging + +from .device_core import LookupRecord +from . import imports + +__all__ = ['install_field_monitor'] + +_log = logging.getLogger(__name__) + + +def _parse_channel_name(channel_name): + """Split a channel name into ``(record_name, field_name)``. + + Returns: + tuple: ``("RECNAME", "FIELD")`` or ``("RECNAME", "VAL")`` when + the channel was addressed without a dot suffix. + """ + if "." in channel_name: + return channel_name.rsplit(".", 1) + return channel_name, "VAL" + + +def _dispatch_field_write(channel_name, value_str): + """Called from C for every CA/PVA field write (post-write). + + Resolves the target record via :func:`LookupRecord` and invokes + every callback registered for the written field **and** any ``"*"`` + wildcard callbacks. + """ + rec_name, field = _parse_channel_name(channel_name) + + try: + record = LookupRecord(rec_name) + except KeyError: + return # Not one of our soft-IOC records — nothing to do. + + for cb in record._get_field_callbacks(field): + try: + cb(rec_name, field, value_str) + except Exception: + _log.exception( + "field-change callback error for %s.%s", rec_name, field + ) + + +def install_field_monitor(auto_reset_scan=False): + """Register :func:`_dispatch_field_write` with the C extension. + + Must be called **after** ``iocInit()`` and after the access-security + file (containing the ``TRAPWRITE`` rule) has been loaded — both of + which are handled automatically by :func:`softioc.iocInit`. + + Args: + auto_reset_scan: If True, the C layer resets SCAN back to + "I/O Intr" after every non-Passive SCAN write, eliminating + periodic-scan contention. Default False. + """ + imports.register_field_write_listener( + _dispatch_field_write, auto_reset_scan=auto_reset_scan) diff --git a/softioc/imports.py b/softioc/imports.py index c27dd177..92266636 100644 --- a/softioc/imports.py +++ b/softioc/imports.py @@ -30,9 +30,38 @@ def db_get_field(name, dbr_type, pbuffer, length): '''Get field where pbuffer is void* pointer. Returns None.''' return _extension.db_get_field(name, dbr_type, pbuffer, length) -def install_pv_logging(acf_file): - '''Install pv logging''' - _extension.install_pv_logging(acf_file) +def install_pv_logging(acf_file, log_puts=True): + '''Load access security file and optionally install caput printf logging. + + The ACF file must contain a TRAPWRITE rule for asTrapWrite listeners + (including CLS field-write callbacks) to fire. + + Args: + acf_file: Path to the access security configuration file. + log_puts: If True (default), register the EpicsPvPutHook that + prints caput operations to stdout. Set to False to load the + ACF without the printf overhead. + ''' + _extension.install_pv_logging(acf_file, log_puts) + +def register_field_write_listener(callback, auto_reset_scan=False): + '''CLS extension: register a Python callable for all CA/PVA field writes. + callback(channel_name: str, value_str: str) is called after each write. + + Args: + callback: Python callable receiving (channel_name, value_string). + auto_reset_scan: If True, the C layer resets SCAN back to + "I/O Intr" after every non-Passive SCAN write, eliminating + periodic-scan contention. Default False. + ''' + _extension.register_field_write_listener(callback, auto_reset_scan) + +def scan_once(record): + '''CLS extension: queue immediate record processing via scanOnce(). + Works regardless of the record's current SCAN setting. + record is the integer address of the dbCommon pointer. + ''' + _extension.scan_once(record) def create_callback_capsule(): return _extension.create_callback_capsule() @@ -83,7 +112,7 @@ def from_param(cls, value): scanIoRequest = dbCore.scanIoRequest scanIoRequest.argtypes = (IOSCANPVT,) -scanIoRequest.restype = None +scanIoRequest.restype = c_uint dbLoadDatabase = dbCore.dbLoadDatabase dbLoadDatabase.argtypes = (auto_encode, auto_encode, auto_encode) diff --git a/softioc/softioc.py b/softioc/softioc.py index 8a439a2f..4e707374 100644 --- a/softioc/softioc.py +++ b/softioc/softioc.py @@ -21,7 +21,8 @@ def epicsAtPyExit(): imports.epicsExitCallAtExits() -def iocInit(dispatcher=None, enable_pva=True): +def iocInit(dispatcher=None, enable_pva=True, log_puts=True, + auto_reset_scan=False): '''This must be called exactly once after loading all EPICS database files. After this point the EPICS IOC is running and serving PVs. @@ -30,6 +31,13 @@ def iocInit(dispatcher=None, enable_pva=True): be called in response to caput on a record. If not supplied uses ``cothread`` as the dispatcher. enable_pva: Specify whether to enable the PV Access Server in this IOC. + log_puts: If True (default), register the caput printf logger that + prints every CA/PVA put to stdout. Set to False to disable the + printf output while still loading the access-security file needed + for field-change callbacks. + auto_reset_scan: If True, the C layer resets SCAN back to "I/O Intr" + after every non-Passive SCAN write, making SCAN a latched command. + Default False. See Also: `softioc.asyncio_dispatcher` is a dispatcher for `asyncio` applications @@ -46,9 +54,23 @@ def iocInit(dispatcher=None, enable_pva=True): imports.registerRecordDeviceDriver(pdbbase) + # CLS extension: ensure access security is configured before iocInit. + # The TRAPWRITE rule in access.acf is required for asTrapWrite + # listeners (including our field-write callbacks) to fire. + # The log_puts flag controls whether the printf caput logger is + # registered — defaults to True for backward compatibility. + _acf_file = os.path.join( + os.path.dirname(__file__), 'access.acf') + imports.install_pv_logging(_acf_file, log_puts=log_puts) + imports.iocInit() autosave.start_autosave_thread() + # CLS extension: register the Python-level field-write dispatcher now + # that the IOC is running and access security is active. + from .field_monitor import install_field_monitor + install_field_monitor(auto_reset_scan=auto_reset_scan) + def safeEpicsExit(code=0): '''Calls epicsExit() after ensuring Python exit handlers called.''' diff --git a/tests/conftest.py b/tests/conftest.py index 04e13b04..e0f61c03 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -59,10 +59,12 @@ def log(*args): class SubprocessIOC: - def __init__(self, ioc_py): + def __init__(self, ioc_py, extra_args=None): self.pv_prefix = create_random_prefix() sim_ioc = os.path.join(os.path.dirname(__file__), ioc_py) cmd = [sys.executable, sim_ioc, self.pv_prefix] + if extra_args: + cmd.extend(extra_args) self.proc = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) @@ -114,6 +116,53 @@ def asyncio_ioc_override(): ioc.kill() aioca_cleanup() + +@pytest.fixture +def field_callbacks_ioc(): + """Start a subprocess IOC that registers on_field_change callbacks.""" + ioc = SubprocessIOC("sim_field_callbacks_ioc.py") + yield ioc + ioc.kill() + aioca_cleanup() + + +@pytest.fixture +def field_behavior_ioc(): + """Start a subprocess IOC for field-behavior tests.""" + ioc = SubprocessIOC("sim_field_behavior_ioc.py") + yield ioc + ioc.kill() + aioca_cleanup() + + +@pytest.fixture +def universal_set_ioc(): + """Start a subprocess IOC for universal set() tests.""" + ioc = SubprocessIOC("sim_universal_set_ioc.py") + yield ioc + ioc.kill() + aioca_cleanup() + + +@pytest.fixture +def callback_refinements_ioc(): + """Start a subprocess IOC for callback refinement tests.""" + ioc = SubprocessIOC("sim_callback_refinements_ioc.py") + yield ioc + ioc.kill() + aioca_cleanup() + + +@pytest.fixture +def auto_reset_scan_ioc(): + """Start a subprocess IOC with auto_reset_scan enabled.""" + ioc = SubprocessIOC( + "sim_callback_refinements_ioc.py", + extra_args=["--auto-reset-scan"]) + yield ioc + ioc.kill() + aioca_cleanup() + def reset_device_name(): if GetRecordNames().prefix: SetDeviceName("") diff --git a/tests/sim_callback_refinements_ioc.py b/tests/sim_callback_refinements_ioc.py new file mode 100644 index 00000000..fd8d85ad --- /dev/null +++ b/tests/sim_callback_refinements_ioc.py @@ -0,0 +1,119 @@ +"""Subprocess IOC for testing callback refinements. + +Tests: +- Multi-field on_field_change (list form) +- on_update coexistence with field callbacks +- auto_reset_scan behavior (when enabled via iocInit flag) + +Records +------- +AO ao Main record with on_update and field callbacks. +ON-UPDATE-CNT Incremented by on_update. +MULTI-CB-CNT Incremented by multi-field callback (DRVH, DRVL, HIHI). +WILDCARD-CNT Incremented by * wildcard callback. +VAL-CB-CNT Incremented by VAL field callback. +SCAN-CB-CNT Incremented by SCAN field callback. +LAST-SCAN-VAL Stores the last SCAN value seen by the callback. +""" + +import sys +from argparse import ArgumentParser +from multiprocessing.connection import Client + +from softioc import softioc, builder, asyncio_dispatcher + +from conftest import ADDRESS, select_and_recv + +if __name__ == "__main__": + with Client(ADDRESS) as conn: + parser = ArgumentParser() + parser.add_argument('prefix') + parser.add_argument( + '--auto-reset-scan', action='store_true', default=False) + parsed_args = parser.parse_args() + builder.SetDeviceName(parsed_args.prefix) + + # ----- Counter PVs ----- + on_update_cnt = builder.longOut( + "ON-UPDATE-CNT", initial_value=0) + multi_cb_cnt = builder.longOut( + "MULTI-CB-CNT", initial_value=0) + wildcard_cnt = builder.longOut( + "WILDCARD-CNT", initial_value=0) + val_cb_cnt = builder.longOut( + "VAL-CB-CNT", initial_value=0) + scan_cb_cnt = builder.longOut( + "SCAN-CB-CNT", initial_value=0) + last_scan_val = builder.stringOut( + "LAST-SCAN-VAL", initial_value="") + dereg_cnt = builder.longOut( + "DEREG-CB-CNT", initial_value=0) + + # ----- Main record under test ----- + def ao_on_update(value): + on_update_cnt.set(on_update_cnt.get() + 1) + + t_ao = builder.aOut( + "AO", + initial_value=0.0, + on_update=ao_on_update, + always_update=True, + DRVL=-50.0, + DRVH=50.0, + HIHI=90.0, + PREC=3, + ) + + # ----- Boot the IOC ----- + dispatcher = asyncio_dispatcher.AsyncioDispatcher() + builder.LoadDatabase() + softioc.iocInit( + dispatcher, + log_puts=False, + auto_reset_scan=parsed_args.auto_reset_scan, + ) + + # ---- Register callbacks (CLS) ----- + + # Multi-field subscription: single callback for DRVH, DRVL, HIHI + def _multi_cb(rec_name, field, value): + multi_cb_cnt.set(multi_cb_cnt.get() + 1) + t_ao.on_field_change(["DRVH", "DRVL", "HIHI"], _multi_cb) + + # Wildcard + def _wildcard_cb(rec_name, field, value): + wildcard_cnt.set(wildcard_cnt.get() + 1) + t_ao.on_field_change("*", _wildcard_cb) + + # VAL-specific + def _val_cb(rec_name, field, value): + val_cb_cnt.set(val_cb_cnt.get() + 1) + t_ao.on_field_change("VAL", _val_cb) + + # SCAN-specific: also stores the value + def _scan_cb(rec_name, field, value): + scan_cb_cnt.set(scan_cb_cnt.get() + 1) + last_scan_val.set(value[:39]) + t_ao.on_field_change("SCAN", _scan_cb) + + # --- De-registration support --- + # A second DRVH callback that can be removed on command. + def _dereg_cb(rec_name, field, value): + dereg_cnt.set(dereg_cnt.get() + 1) + t_ao.on_field_change("DRVH", _dereg_cb) + + conn.send("R") # "Ready" + + # Protocol: test sends "P" (phase-2: deregister) or "D" (done). + msg = select_and_recv(conn) + if msg == "P": + t_ao.remove_field_callback("DRVH", _dereg_cb) + conn.send("K") # acknowledged + select_and_recv(conn, "D") + else: + assert msg == "D" + + sys.stdout.flush() + sys.stderr.flush() + + conn.send("D") # "Done" acknowledgement diff --git a/tests/sim_field_behavior_ioc.py b/tests/sim_field_behavior_ioc.py new file mode 100644 index 00000000..c8c0e7ee --- /dev/null +++ b/tests/sim_field_behavior_ioc.py @@ -0,0 +1,143 @@ +"""Subprocess IOC for testing how PV field writes interact with +pythonSoftIOC's device-support layer and the underlying EPICS +record processing. + +Records created +--------------- +AO ao Main analog-output record with alarm limits, drive + limits, initial SCAN='I/O Intr', on_update callback. +AI ai Analog-input that mirrors the AO value via on_update. +BO bo Binary-output with on_update. +LONGIN longin Long-input, updated by Python only. + +Counter PVs (longOut) — incremented by the corresponding callback +so the test can verify that the callback fired. + +AO-UPDATE-CNT Incremented every time on_update fires for AO. +AO-PROCESS-CNT Incremented every time AO's _process runs + (raw device-support level). +AI-VAL Holds the AI value after set(). +BO-UPDATE-CNT Incremented every time on_update fires for BO. + +The IOC also records the last field-change notification it sees +(CLS extension) for AO: +LAST-FIELD stringOut holding the last field name. +LAST-VALUE stringOut holding the last value string. +""" + +import sys +from argparse import ArgumentParser +from multiprocessing.connection import Client + +from softioc import ( + softioc, builder, + asyncio_dispatcher, pvlog, +) + +from conftest import ADDRESS, select_and_recv + +if __name__ == "__main__": + with Client(ADDRESS) as conn: + parser = ArgumentParser() + parser.add_argument( + 'prefix', + help="The PV prefix for the records", + ) + parsed_args = parser.parse_args() + builder.SetDeviceName(parsed_args.prefix) + + # ----- Counter / mirror PVs ----- + ao_update_cnt = builder.longOut( + "AO-UPDATE-CNT", initial_value=0, + ) + ao_process_cnt = builder.longOut( + "AO-PROCESS-CNT", initial_value=0, + ) + ai_val = builder.aOut( + "AI-VAL", initial_value=0.0, + ) + bo_update_cnt = builder.longOut( + "BO-UPDATE-CNT", initial_value=0, + ) + last_field = builder.stringOut( + "LAST-FIELD", initial_value="", + ) + last_value = builder.stringOut( + "LAST-VALUE", initial_value="", + ) + + # ----- Primary records under test ----- + def ao_on_update(value): + ao_update_cnt.set(ao_update_cnt.get() + 1) + + t_ao = builder.aOut( + "AO", + initial_value=0.0, + on_update=ao_on_update, + always_update=True, + LOPR=-100.0, + HOPR=100.0, + DRVL=-50.0, + DRVH=50.0, + HIHI=90.0, + HIGH=70.0, + LOW=-70.0, + LOLO=-90.0, + HHSV="MAJOR", + HSV="MINOR", + LSV="MINOR", + LLSV="MAJOR", + PREC=3, + EGU="V", + ) + + t_ai = builder.aIn( + "AI", + initial_value=0.0, + LOPR=-100.0, + HOPR=100.0, + HIHI=90.0, + HIGH=70.0, + LOW=-70.0, + LOLO=-90.0, + HHSV="MAJOR", + HSV="MINOR", + LSV="MINOR", + LLSV="MAJOR", + PREC=3, + EGU="V", + ) + + def bo_on_update(value): + bo_update_cnt.set(bo_update_cnt.get() + 1) + + t_bo = builder.boolOut( + "BO", + initial_value=False, + on_update=bo_on_update, + always_update=True, + ) + + t_longin = builder.longIn( + "LONGIN", + initial_value=0, + ) + + # ----- Boot the IOC ----- + dispatcher = asyncio_dispatcher.AsyncioDispatcher() + builder.LoadDatabase() + softioc.iocInit(dispatcher) + + # ----- Register field-change callbacks (CLS) ----- + def _on_any_field(rec_name, field, value): + last_field.set(field) + last_value.set(value) + + t_ao.on_field_change("*", _on_any_field) + + conn.send("R") # "Ready" + + select_and_recv(conn, "D") # "Done" + + sys.stdout.flush() + sys.stderr.flush() diff --git a/tests/sim_field_callbacks_ioc.py b/tests/sim_field_callbacks_ioc.py new file mode 100644 index 00000000..62460887 --- /dev/null +++ b/tests/sim_field_callbacks_ioc.py @@ -0,0 +1,93 @@ +"""Simulated IOC for CLS field-change callback tests. + +Creates a small set of records and registers ``on_field_change`` callbacks via +the CLS extension. Counter PVs are incremented each time a callback fires, +allowing the test client to verify behaviour over CA or PVA. + +Records created +--------------- +``{prefix}:AO`` + The record whose fields the test client writes to. +``{prefix}:SCAN-CB-CNT`` + Incremented by the SCAN callback. +``{prefix}:DISA-CB-CNT`` + Incremented by the DISA callback. +``{prefix}:VAL-CB-CNT`` + Incremented by the VAL callback. +``{prefix}:HIHI-CB-CNT`` + Incremented by the HIHI callback (alarm field, DBF_DOUBLE). +``{prefix}:DESC-CB-CNT`` + Incremented by the DESC callback (string field, DBF_STRING). +``{prefix}:ANY-CB-CNT`` + Incremented by a wildcard ``"*"`` callback (fires on **every** field + write). + +Expected behaviour +------------------ +- Original (upstream) pythonSoftIOC: ``on_field_change`` does not exist, so + this script raises ``AttributeError`` before printing READY. +- CLS fork: all callbacks register successfully and READY is printed. +""" + +import sys +from argparse import ArgumentParser +from multiprocessing.connection import Client + +from softioc import softioc, builder, asyncio_dispatcher + +from conftest import ADDRESS, select_and_recv + + +if __name__ == "__main__": + with Client(ADDRESS) as conn: + parser = ArgumentParser() + parser.add_argument("prefix", help="PV prefix for the records") + parsed_args = parser.parse_args() + builder.SetDeviceName(parsed_args.prefix) + + # Main record whose fields the test client writes to. + ao = builder.aOut("AO", initial_value=0.0, HIHI=90.0, HIGH=70.0) + + # Counter PVs — incremented by the corresponding callback. + scan_cnt = builder.longOut("SCAN-CB-CNT", initial_value=0) + disa_cnt = builder.longOut("DISA-CB-CNT", initial_value=0) + val_cnt = builder.longOut("VAL-CB-CNT", initial_value=0) + hihi_cnt = builder.longOut("HIHI-CB-CNT", initial_value=0) + desc_cnt = builder.longOut("DESC-CB-CNT", initial_value=0) + any_cnt = builder.longOut("ANY-CB-CNT", initial_value=0) + + # Boot the IOC. + dispatcher = asyncio_dispatcher.AsyncioDispatcher() + builder.LoadDatabase() + softioc.iocInit(dispatcher) + + # ---- Register on_field_change callbacks (CLS extension) ---------- + # With upstream pythonSoftIOC this raises AttributeError. + + def _make_increment(counter): + """Return a callback that increments *counter* by one.""" + def _cb(rec_name, field, value): + counter.set(counter.get() + 1) + return _cb + + # Per-field callbacks. + ao.on_field_change("SCAN", _make_increment(scan_cnt)) + ao.on_field_change("DISA", _make_increment(disa_cnt)) + ao.on_field_change("VAL", _make_increment(val_cnt)) + # DBF_DOUBLE alarm field + ao.on_field_change("HIHI", _make_increment(hihi_cnt)) + # DBF_STRING metadata field + ao.on_field_change("DESC", _make_increment(desc_cnt)) + + # Wildcard callback — fires for every field write on this record. + ao.on_field_change("*", _make_increment(any_cnt)) + + conn.send("R") # "Ready" + + # Keep the process alive until the test tells us to stop. + select_and_recv(conn, "D") # "Done" + + sys.stdout.flush() + sys.stderr.flush() + + conn.send("D") # "Done" acknowledgement diff --git a/tests/sim_universal_set_ioc.py b/tests/sim_universal_set_ioc.py new file mode 100644 index 00000000..12a9aa84 --- /dev/null +++ b/tests/sim_universal_set_ioc.py @@ -0,0 +1,101 @@ +"""Subprocess IOC for testing universal set() across SCAN settings. + +Records created +--------------- +AI ai Analog input, default SCAN='I/O Intr'. +AO ao Analog output, default SCAN='I/O Intr'. +LONGIN longin Long input, default SCAN='I/O Intr'. +BO bo Binary output, default SCAN='I/O Intr'. + +The test harness sends commands over the multiprocessing connection: + ("set", "AI", value) -> calls record.set(value), replies "OK" + ("get", "AI") -> replies with record.get() + ("scan", "AI", "Passive") -> changes SCAN via dbpf, replies "OK" + "D" -> shutdown +""" + +import sys +from argparse import ArgumentParser +from multiprocessing.connection import Client + +from softioc import softioc, builder, asyncio_dispatcher + +from conftest import ADDRESS + +if __name__ == "__main__": + with Client(ADDRESS) as conn: + parser = ArgumentParser() + parser.add_argument( + 'prefix', + help="The PV prefix for the records", + ) + parsed_args = parser.parse_args() + device_name = parsed_args.prefix + builder.SetDeviceName(device_name) + + # ----- Records under test ----- + t_ai = builder.aIn( + "AI", + initial_value=0.0, + PREC=3, + ) + + t_ao = builder.aOut( + "AO", + initial_value=0.0, + always_update=True, + PREC=3, + ) + + t_longin = builder.longIn( + "LONGIN", + initial_value=0, + ) + + t_bo = builder.boolOut( + "BO", + initial_value=False, + always_update=True, + ) + + records = { + "AI": t_ai, + "AO": t_ao, + "LONGIN": t_longin, + "BO": t_bo, + } + + # ----- Boot the IOC ----- + dispatcher = asyncio_dispatcher.AsyncioDispatcher() + builder.LoadDatabase() + softioc.iocInit(dispatcher) + + conn.send("R") # "Ready" + + # ----- Command loop ----- + while True: + msg = conn.recv() + if msg == "D": + break + elif isinstance(msg, tuple): + cmd = msg[0] + if cmd == "set": + _, rec_name, value = msg + records[rec_name].set(value) + conn.send("OK") + elif cmd == "get": + _, rec_name = msg + conn.send(records[rec_name].get()) + elif cmd == "scan": + # Change SCAN via softioc.dbpf to bypass DISP + _, rec_name, scan_value = msg + pv_name = f"{device_name}:{rec_name}.SCAN" + softioc.dbpf(pv_name, scan_value) + conn.send("OK") + else: + conn.send("ERR") + else: + conn.send("ERR") + + sys.stdout.flush() + sys.stderr.flush() diff --git a/tests/test_callback_refinements.py b/tests/test_callback_refinements.py new file mode 100644 index 00000000..46cd886a --- /dev/null +++ b/tests/test_callback_refinements.py @@ -0,0 +1,303 @@ +"""Tests for the CLS callback refinements. + +Uses ``sim_callback_refinements_ioc.py`` which exercises: + +* **Multi-field subscription** — + ``on_field_change(["DRVH", "DRVL", "HIHI"], cb)`` + registers a single callback for multiple fields. +* **on_update coexistence** — ``on_update`` and ``on_field_change("VAL", cb)`` + both fire on a VAL write without interfering with each other. +* **Wildcard fires for all writes** — ``on_field_change("*", cb)`` + sees every field write. +* **auto_reset_scan** — When enabled via ``iocInit(auto_reset_scan=True)``, + an external SCAN write (e.g. "1 second") is forwarded to the Python + callback but the SCAN field is immediately reset to "I/O Intr". + Passive writes are exempt from the reset. +""" + +import asyncio + +import pytest + +from multiprocessing.connection import Listener + +from conftest import ( + ADDRESS, + select_and_recv, + aioca_cleanup, + TIMEOUT, +) + + +# --------------------------------------------------------------------------- +# Multi-field subscription tests +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_multi_field_subscription(callback_refinements_ioc): + """on_field_change(["DRVH","DRVL","HIHI"], cb) fires for each field.""" + from aioca import caget, caput + + pre = callback_refinements_ioc.pv_prefix + + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + + try: + # Each write to a different listed field should fire _multi_cb + await caput(pre + ":AO.DRVH", 100.0, wait=True) + await asyncio.sleep(0.3) + assert await caget(pre + ":MULTI-CB-CNT") == 1 + + await caput(pre + ":AO.DRVL", -100.0, wait=True) + await asyncio.sleep(0.3) + assert await caget(pre + ":MULTI-CB-CNT") == 2 + + await caput(pre + ":AO.HIHI", 99.0, wait=True) + await asyncio.sleep(0.3) + assert await caget(pre + ":MULTI-CB-CNT") == 3 + finally: + aioca_cleanup() + conn.send("D") + select_and_recv(conn, "D") + + +# --------------------------------------------------------------------------- +# on_update coexistence +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_on_update_coexistence(callback_refinements_ioc): + """on_update and on_field_change("VAL") both fire on a VAL write.""" + from aioca import caget, caput + + pre = callback_refinements_ioc.pv_prefix + + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + + try: + await caput(pre + ":AO", 42.0, wait=True) + await asyncio.sleep(0.3) + + # on_update should have fired + assert await caget(pre + ":ON-UPDATE-CNT") == 1 + # on_field_change("VAL") should also have fired + assert await caget(pre + ":VAL-CB-CNT") == 1 + # Wildcard must also fire + assert await caget(pre + ":WILDCARD-CNT") == 1 + finally: + aioca_cleanup() + conn.send("D") + select_and_recv(conn, "D") + + +# --------------------------------------------------------------------------- +# Wildcard accumulation +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_wildcard_accumulation(callback_refinements_ioc): + """Wildcard counter accumulates across different field writes.""" + from aioca import caget, caput + + pre = callback_refinements_ioc.pv_prefix + + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + + try: + # Three distinct field writes + await caput(pre + ":AO.DRVH", 100.0, wait=True) + await caput(pre + ":AO.DRVL", -100.0, wait=True) + await caput(pre + ":AO", 5.0, wait=True) + await asyncio.sleep(0.5) + + # Wildcard fires for each: DRVH, DRVL, VAL = 3 + assert await caget(pre + ":WILDCARD-CNT") == 3 + finally: + aioca_cleanup() + conn.send("D") + select_and_recv(conn, "D") + + +# --------------------------------------------------------------------------- +# Multi-field isolation +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_multi_field_isolation(callback_refinements_ioc): + """Multi-field callback only fires for subscribed fields, not others.""" + from aioca import caget, caput + + pre = callback_refinements_ioc.pv_prefix + + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + + try: + # Writing to VAL should NOT increment MULTI-CB-CNT + await caput(pre + ":AO", 5.0, wait=True) + await asyncio.sleep(0.3) + assert await caget(pre + ":MULTI-CB-CNT") == 0 + # But VAL-CB-CNT should fire + assert await caget(pre + ":VAL-CB-CNT") == 1 + finally: + aioca_cleanup() + conn.send("D") + select_and_recv(conn, "D") + + +# --------------------------------------------------------------------------- +# auto_reset_scan: disabled (default) +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_scan_no_auto_reset(callback_refinements_ioc): + """Without auto_reset_scan, SCAN stays at whatever the client wrote.""" + from aioca import caget, caput + + pre = callback_refinements_ioc.pv_prefix + + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + + try: + await caput(pre + ":AO.SCAN", "1 second", wait=True) + await asyncio.sleep(0.5) + + # Callback should fire + assert await caget(pre + ":SCAN-CB-CNT") == 1 + # SCAN stays at "1 second" (no reset) + scan_val = await caget(pre + ":AO.SCAN", datatype=str) + assert scan_val == "1 second" + finally: + # Reset SCAN to Passive before leaving + await caput(pre + ":AO.SCAN", "Passive", wait=True) + await asyncio.sleep(0.3) + aioca_cleanup() + conn.send("D") + select_and_recv(conn, "D") + + +# --------------------------------------------------------------------------- +# auto_reset_scan: enabled +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_scan_auto_reset(auto_reset_scan_ioc): + """With auto_reset_scan, SCAN is reset to I/O Intr after callback fires.""" + from aioca import caget, caput + + pre = auto_reset_scan_ioc.pv_prefix + + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + + try: + await caput(pre + ":AO.SCAN", "1 second", wait=True) + await asyncio.sleep(0.5) + + # Callback should have seen "1 second" + assert await caget(pre + ":SCAN-CB-CNT") == 1 + last_val = await caget(pre + ":LAST-SCAN-VAL") + assert last_val == "1 second" + + # But SCAN should now read "I/O Intr" (auto-reset) + scan_val = await caget(pre + ":AO.SCAN", datatype=str) + assert scan_val == "I/O Intr" + finally: + aioca_cleanup() + conn.send("D") + select_and_recv(conn, "D") + + +@pytest.mark.asyncio +async def test_scan_auto_reset_passive_exempt(auto_reset_scan_ioc): + """Passive SCAN writes are exempt from auto_reset_scan.""" + from aioca import caget, caput + + pre = auto_reset_scan_ioc.pv_prefix + + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + + try: + await caput(pre + ":AO.SCAN", "Passive", wait=True) + await asyncio.sleep(0.5) + + # SCAN stays at Passive (not reset to I/O Intr) + scan_val = await caget(pre + ":AO.SCAN", datatype=str) + assert scan_val == "Passive" + finally: + aioca_cleanup() + conn.send("D") + select_and_recv(conn, "D") + + +@pytest.mark.asyncio +async def test_scan_auto_reset_multiple(auto_reset_scan_ioc): + """Multiple SCAN writes each fire the callback and get reset.""" + from aioca import caget, caput + + pre = auto_reset_scan_ioc.pv_prefix + + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + + try: + await caput(pre + ":AO.SCAN", "1 second", wait=True) + await asyncio.sleep(0.5) + await caput(pre + ":AO.SCAN", "2 second", wait=True) + await asyncio.sleep(0.5) + + # Both writes should have fired the callback + assert await caget(pre + ":SCAN-CB-CNT") == 2 + # The last value seen should be "2 second" + last_val = await caget(pre + ":LAST-SCAN-VAL") + assert last_val == "2 second" + # SCAN should be reset after the second write + scan_val = await caget(pre + ":AO.SCAN", datatype=str) + assert scan_val == "I/O Intr" + finally: + aioca_cleanup() + conn.send("D") + select_and_recv(conn, "D") + + +# --------------------------------------------------------------------------- +# Callback de-registration +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_remove_field_callback(callback_refinements_ioc): + """remove_field_callback stops the callback from firing.""" + from aioca import caget, caput + + pre = callback_refinements_ioc.pv_prefix + + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + + try: + # Phase 1: _dereg_cb is registered — DRVH write fires it. + await caput(pre + ":AO.DRVH", 100.0, wait=True) + await asyncio.sleep(0.3) + assert await caget(pre + ":DEREG-CB-CNT") == 1 + # _multi_cb also fires (DRVH is in the list). + assert await caget(pre + ":MULTI-CB-CNT") == 1 + + # Phase 2: ask subprocess to de-register _dereg_cb. + conn.send("P") + select_and_recv(conn, "K") + + # Phase 3: another DRVH write — _dereg_cb should NOT fire. + await caput(pre + ":AO.DRVH", 200.0, wait=True) + await asyncio.sleep(0.3) + assert await caget(pre + ":DEREG-CB-CNT") == 1 # unchanged + # _multi_cb still fires (it was not removed). + assert await caget(pre + ":MULTI-CB-CNT") == 2 + finally: + aioca_cleanup() + conn.send("D") + select_and_recv(conn, "D") diff --git a/tests/test_field_behavior.py b/tests/test_field_behavior.py new file mode 100644 index 00000000..041c0f54 --- /dev/null +++ b/tests/test_field_behavior.py @@ -0,0 +1,741 @@ +"""Behavioral tests: PV field writes vs pythonSoftIOC processing. + +Goal +---- +Verify that writing to standard EPICS record fields (SCAN, DISA, +HIHI, DRVH, etc.) via Channel Access does **not** break pSIOC's +device-support layer, and document which behaviors are handled +by the EPICS libraries vs which are pSIOC-specific. + +Each test covers one "concern" and is self-contained. +All tests use the subprocess IOC +``sim_field_behavior_ioc.py``. +""" + +import asyncio +import time +import pytest + +from multiprocessing.connection import Listener + +from aioca import caget, caput, FORMAT_CTRL, FORMAT_TIME + +from conftest import ( + ADDRESS, TIMEOUT, + select_and_recv, aioca_cleanup, +) + + +# ---------------------------------------------------------------- # +# Helpers # +# ---------------------------------------------------------------- # + +def _pv(prefix, suffix): + return f"{prefix}:{suffix}" + + +async def _wait_for(prefix, pv_suffix, expected, timeout=TIMEOUT): + """Poll a PV until it reaches *expected* or timeout.""" + pv = _pv(prefix, pv_suffix) + deadline = time.time() + timeout + while time.time() < deadline: + val = await caget(pv, timeout=TIMEOUT) + if val >= expected: + return val + await asyncio.sleep(0.1) + return await caget(pv, timeout=TIMEOUT) + + +# ================================================================ # +# 1. DRIVE LIMITS (DRVH / DRVL) # +# ================================================================ # + +class TestDriveLimits: + """EPICS ao record support clamps VAL to [DRVL, DRVH].""" + + @pytest.mark.asyncio + async def test_initial_drvh_clamp(self, field_behavior_ioc): + """caput value > DRVH is clamped to DRVH.""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + await caput(_pv(p, "AO"), 100.0, + timeout=TIMEOUT, wait=True) + val = await caget(_pv(p, "AO"), timeout=TIMEOUT) + assert val == pytest.approx(50.0), ( + f"Expected clamped to DRVH=50, got {val}" + ) + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_initial_drvl_clamp(self, field_behavior_ioc): + """caput value < DRVL is clamped to DRVL.""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + await caput(_pv(p, "AO"), -100.0, + timeout=TIMEOUT, wait=True) + val = await caget(_pv(p, "AO"), timeout=TIMEOUT) + assert val == pytest.approx(-50.0), ( + f"Expected clamped to DRVL=-50, got {val}" + ) + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_change_drvh_via_ca(self, field_behavior_ioc): + """Changing DRVH via CA updates the effective clamp.""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + await caput(_pv(p, "AO.DRVH"), 80.0, timeout=TIMEOUT) + await asyncio.sleep(0.2) + await caput(_pv(p, "AO"), 75.0, + timeout=TIMEOUT, wait=True) + val = await caget(_pv(p, "AO"), timeout=TIMEOUT) + assert val == pytest.approx(75.0), ( + f"DRVH=80: caput 75 should pass, got {val}" + ) + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_on_update_with_clamped(self, field_behavior_ioc): + """on_update callback fires even when clamped.""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + cnt0 = await caget( + _pv(p, "AO-UPDATE-CNT"), timeout=TIMEOUT, + ) + await caput(_pv(p, "AO"), 999.0, + timeout=TIMEOUT, wait=True) + cnt = await _wait_for( + p, "AO-UPDATE-CNT", cnt0 + 1, + ) + assert cnt >= cnt0 + 1 + val = await caget(_pv(p, "AO"), timeout=TIMEOUT) + assert val <= 50.0 + finally: + aioca_cleanup() + conn.send("D") + + +# ================================================================ # +# 2. ALARM LIMITS (HIHI / HIGH / LOW / LOLO) # +# ================================================================ # + +class TestAlarmLimits: + """EPICS record support evaluates alarm thresholds.""" + + @pytest.mark.asyncio + async def test_hihi_major(self, field_behavior_ioc): + """Value > HIHI -> MAJOR alarm.""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + # DRVH is 50, HIHI is 90; raise DRVH first + await caput(_pv(p, "AO.DRVH"), 100.0, timeout=TIMEOUT) + await asyncio.sleep(0.1) + await caput(_pv(p, "AO"), 95.0, + timeout=TIMEOUT, wait=True) + r = await caget(_pv(p, "AO"), + format=FORMAT_TIME, timeout=TIMEOUT) + assert r.severity == 2, ( + f"Expected MAJOR(2) for 95>HIHI=90, " + f"got {r.severity}" + ) + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_high_minor(self, field_behavior_ioc): + """Value > HIGH but < HIHI -> MINOR alarm.""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + await caput(_pv(p, "AO.DRVH"), 100.0, timeout=TIMEOUT) + await asyncio.sleep(0.1) + await caput(_pv(p, "AO"), 75.0, + timeout=TIMEOUT, wait=True) + r = await caget(_pv(p, "AO"), + format=FORMAT_TIME, timeout=TIMEOUT) + assert r.severity == 1, ( + f"Expected MINOR(1) for 75>HIGH=70, " + f"got {r.severity}" + ) + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_change_hihi_clears_alarm( + self, field_behavior_ioc + ): + """Raising HIHI above current value clears alarm.""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + await caput(_pv(p, "AO.DRVH"), 100.0, timeout=TIMEOUT) + await caput(_pv(p, "AO.HIGH"), 100.0, timeout=TIMEOUT) + await asyncio.sleep(0.1) + await caput(_pv(p, "AO"), 95.0, + timeout=TIMEOUT, wait=True) + r = await caget(_pv(p, "AO"), + format=FORMAT_TIME, timeout=TIMEOUT) + assert r.severity == 2 # MAJOR + + await caput(_pv(p, "AO.HIHI"), 100.0, timeout=TIMEOUT) + await asyncio.sleep(0.1) + # Re-write to re-process + await caput(_pv(p, "AO"), 95.0, + timeout=TIMEOUT, wait=True) + r = await caget(_pv(p, "AO"), + format=FORMAT_TIME, timeout=TIMEOUT) + assert r.severity == 0, ( + "HIHI=100: val=95 should be NO_ALARM" + ) + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_on_update_under_alarm(self, field_behavior_ioc): + """on_update fires even when record is in alarm.""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + cnt0 = await caget( + _pv(p, "AO-UPDATE-CNT"), timeout=TIMEOUT, + ) + await caput(_pv(p, "AO"), 45.0, + timeout=TIMEOUT, wait=True) + cnt = await _wait_for( + p, "AO-UPDATE-CNT", cnt0 + 1, + ) + assert cnt >= cnt0 + 1 + finally: + aioca_cleanup() + conn.send("D") + + +# ================================================================ # +# 3. SCAN FIELD # +# ================================================================ # + +class TestScanField: + """SCAN controls when EPICS processes the record. + pSIOC defaults to I/O Intr.""" + + @pytest.mark.asyncio + async def test_default_scan(self, field_behavior_ioc): + """pSIOC sets SCAN='I/O Intr' for input records. + Output records (aOut) default to 'Passive'.""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + # aOut defaults to Passive (not I/O Intr) + ao_scan = await caget( + _pv(p, "AO.SCAN"), + datatype=str, timeout=TIMEOUT, + ) + assert "Passive" in str(ao_scan) + # aIn defaults to I/O Intr + ai_scan = await caget( + _pv(p, "AI.SCAN"), + datatype=str, timeout=TIMEOUT, + ) + assert "I/O Intr" in str(ai_scan) + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_periodic_scan(self, field_behavior_ioc): + """SCAN='1 second' causes periodic on_update calls.""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + await caput(_pv(p, "AO"), 10.0, + timeout=TIMEOUT, wait=True) + await asyncio.sleep(0.3) + cnt0 = await caget( + _pv(p, "AO-UPDATE-CNT"), timeout=TIMEOUT, + ) + await caput(_pv(p, "AO.SCAN"), "1 second", + datatype=str, timeout=TIMEOUT) + await asyncio.sleep(2.5) + cnt1 = await caget( + _pv(p, "AO-UPDATE-CNT"), timeout=TIMEOUT, + ) + # Restore + await caput(_pv(p, "AO.SCAN"), "I/O Intr", + datatype=str, timeout=TIMEOUT) + assert cnt1 >= cnt0 + 2, ( + f"Expected >=2 extra calls, " + f"got {cnt1 - cnt0}" + ) + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_passive_no_auto(self, field_behavior_ioc): + """AO defaults to Passive. Explicit caput still + triggers on_update via dbProcess.""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + # AO is already Passive by default + scan = await caget(_pv(p, "AO.SCAN"), + datatype=str, timeout=TIMEOUT) + assert "Passive" in str(scan) + cnt0 = await caget( + _pv(p, "AO-UPDATE-CNT"), timeout=TIMEOUT, + ) + # caput to VAL triggers dbPutField -> dbProcess + await caput(_pv(p, "AO"), 5.0, + timeout=TIMEOUT, wait=True) + cnt = await _wait_for( + p, "AO-UPDATE-CNT", cnt0 + 1, + ) + assert cnt >= cnt0 + 1, ( + "Explicit caput triggers on_update" + ) + finally: + aioca_cleanup() + conn.send("D") + + +# ================================================================ # +# 4. DISABLE (DISA / DISV) # +# ================================================================ # + +class TestDisable: + """DISA==DISV disables record processing.""" + + @pytest.mark.asyncio + async def test_disa_suppresses_on_update( + self, field_behavior_ioc + ): + """With DISA=1 (==DISV), on_update does NOT fire.""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + await caput(_pv(p, "AO"), 1.0, + timeout=TIMEOUT, wait=True) + await asyncio.sleep(0.3) + cnt0 = await caget( + _pv(p, "AO-UPDATE-CNT"), timeout=TIMEOUT, + ) + await caput(_pv(p, "AO.DISA"), 1, timeout=TIMEOUT) + await asyncio.sleep(0.2) + await caput(_pv(p, "AO"), 2.0, + timeout=TIMEOUT, wait=True) + await asyncio.sleep(0.5) + cnt1 = await caget( + _pv(p, "AO-UPDATE-CNT"), timeout=TIMEOUT, + ) + await caput(_pv(p, "AO.DISA"), 0, timeout=TIMEOUT) + assert cnt1 == cnt0, ( + "on_update should NOT fire when disabled" + ) + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_disa_re_enable(self, field_behavior_ioc): + """After clearing DISA, processing resumes.""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + await caput(_pv(p, "AO.DISA"), 1, timeout=TIMEOUT) + await asyncio.sleep(0.2) + await caput(_pv(p, "AO.DISA"), 0, timeout=TIMEOUT) + await asyncio.sleep(0.2) + cnt0 = await caget( + _pv(p, "AO-UPDATE-CNT"), timeout=TIMEOUT, + ) + await caput(_pv(p, "AO"), 3.0, + timeout=TIMEOUT, wait=True) + cnt = await _wait_for( + p, "AO-UPDATE-CNT", cnt0 + 1, + ) + assert cnt >= cnt0 + 1 + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_field_change_fires_when_disabled( + self, field_behavior_ioc + ): + """on_field_change fires even when record is disabled + (asTrapWrite fires before record-support checks).""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + await caput(_pv(p, "AO.DISA"), 1, timeout=TIMEOUT) + await asyncio.sleep(0.2) + await caput(_pv(p, "AO.HIHI"), 42.0, timeout=TIMEOUT) + await asyncio.sleep(0.3) + f = await caget(_pv(p, "LAST-FIELD"), + datatype=str, timeout=TIMEOUT) + await caput(_pv(p, "AO.DISA"), 0, timeout=TIMEOUT) + assert f == "HIHI", ( + "field_change should fire when disabled" + ) + finally: + aioca_cleanup() + conn.send("D") + + +# ================================================================ # +# 5. DISPLAY RANGE & EGU (LOPR / HOPR / EGU) # +# ================================================================ # + +class TestDisplayRange: + """LOPR, HOPR, EGU are metadata fields visible in + ctrl-format caget.""" + + @pytest.mark.asyncio + async def test_initial_egu(self, field_behavior_ioc): + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + r = await caget(_pv(p, "AO"), + format=FORMAT_CTRL, timeout=TIMEOUT) + egu = r.units + if isinstance(egu, bytes): + egu = egu.decode() + assert egu == "V" + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_change_egu(self, field_behavior_ioc): + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + await caput(_pv(p, "AO.EGU"), "mA", timeout=TIMEOUT) + await asyncio.sleep(0.2) + r = await caget(_pv(p, "AO"), + format=FORMAT_CTRL, timeout=TIMEOUT) + egu = r.units + if isinstance(egu, bytes): + egu = egu.decode() + assert egu == "mA" + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_change_lopr_hopr(self, field_behavior_ioc): + """For ao records, ctrl limits are driven by DRVL/DRVH. + LOPR/HOPR are display hints; CA ctrl limit returns + max(DRVL,LOPR) and min(DRVH,HOPR) respectively.""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + # Initial DRVL=-50, DRVH=50, LOPR=-100, HOPR=100 + r = await caget(_pv(p, "AO"), + format=FORMAT_CTRL, timeout=TIMEOUT) + # ctrl limits reflect DRVL/DRVH for ao + assert r.lower_ctrl_limit == \ + pytest.approx(-50.0) + assert r.upper_ctrl_limit == \ + pytest.approx(50.0) + finally: + aioca_cleanup() + conn.send("D") + + +# ================================================================ # +# 6. PRECISION (PREC) # +# ================================================================ # + +class TestPrecision: + """PREC controls display precision for float records.""" + + @pytest.mark.asyncio + async def test_initial_prec(self, field_behavior_ioc): + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + r = await caget(_pv(p, "AO"), + format=FORMAT_CTRL, timeout=TIMEOUT) + assert r.precision == 3 + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_change_prec(self, field_behavior_ioc): + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + await caput( + _pv(p, "AO.PREC"), 5, + timeout=TIMEOUT, + ) + await asyncio.sleep(0.2) + r = await caget(_pv(p, "AO"), + format=FORMAT_CTRL, timeout=TIMEOUT) + assert r.precision == 5 + finally: + aioca_cleanup() + conn.send("D") + + +# ================================================================ # +# 7. INPUT RECORD: metadata reads # +# ================================================================ # + +class TestInputRecord: + """Input records (ai) are driven by Python set(). + External clients can read values and metadata.""" + + @pytest.mark.asyncio + async def test_ai_initial_value(self, field_behavior_ioc): + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + val = await caget(_pv(p, "AI"), timeout=TIMEOUT) + assert val == pytest.approx(0.0) + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_ai_alarm_limits(self, field_behavior_ioc): + """Alarm limits reported in ctrl struct.""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + r = await caget(_pv(p, "AI"), + format=FORMAT_CTRL, timeout=TIMEOUT) + assert r.upper_alarm_limit == \ + pytest.approx(90.0) + assert r.upper_warning_limit == \ + pytest.approx(70.0) + assert r.lower_warning_limit == \ + pytest.approx(-70.0) + assert r.lower_alarm_limit == \ + pytest.approx(-90.0) + finally: + aioca_cleanup() + conn.send("D") + + +# ================================================================ # +# 8. FIELD-CHANGE vs ON_UPDATE interaction # +# ================================================================ # + +class TestFieldChangeVsOnUpdate: + """on_field_change (CLS) and on_update coexist.""" + + @pytest.mark.asyncio + async def test_val_fires_both(self, field_behavior_ioc): + """caput VAL fires both on_update and + on_field_change('*').""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + cnt0 = await caget( + _pv(p, "AO-UPDATE-CNT"), timeout=TIMEOUT, + ) + await caput(_pv(p, "AO"), 10.0, + timeout=TIMEOUT, wait=True) + await asyncio.sleep(0.3) + cnt1 = await caget( + _pv(p, "AO-UPDATE-CNT"), timeout=TIMEOUT, + ) + f = await caget(_pv(p, "LAST-FIELD"), + datatype=str, timeout=TIMEOUT) + assert cnt1 >= cnt0 + 1, ( + "on_update should fire" + ) + assert f == "VAL" + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_non_val_no_on_update( + self, field_behavior_ioc + ): + """caput to non-VAL field fires on_field_change + but NOT on_update.""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + cnt0 = await caget( + _pv(p, "AO-UPDATE-CNT"), timeout=TIMEOUT, + ) + await caput( + _pv(p, "AO.EGU"), "Hz", + timeout=TIMEOUT, + ) + await asyncio.sleep(0.3) + cnt1 = await caget( + _pv(p, "AO-UPDATE-CNT"), timeout=TIMEOUT, + ) + f = await caget(_pv(p, "LAST-FIELD"), + datatype=str, timeout=TIMEOUT) + assert cnt1 == cnt0, ( + "on_update should NOT fire for EGU" + ) + assert f == "EGU" + finally: + aioca_cleanup() + conn.send("D") + + +# ================================================================ # +# 9. BOOLEAN OUTPUT # +# ================================================================ # + +class TestBooleanOutput: + """Boolean records use the same field-write path.""" + + @pytest.mark.asyncio + async def test_bo_on_update(self, field_behavior_ioc): + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + cnt0 = await caget( + _pv(p, "BO-UPDATE-CNT"), timeout=TIMEOUT, + ) + await caput(_pv(p, "BO"), 1, + timeout=TIMEOUT, wait=True) + cnt = await _wait_for( + p, "BO-UPDATE-CNT", cnt0 + 1, + ) + assert cnt >= cnt0 + 1 + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_bo_toggle(self, field_behavior_ioc): + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + await caput(_pv(p, "BO"), 1, + timeout=TIMEOUT, wait=True) + v = await caget(_pv(p, "BO"), timeout=TIMEOUT) + assert v == 1 + await caput(_pv(p, "BO"), 0, + timeout=TIMEOUT, wait=True) + v = await caget(_pv(p, "BO"), timeout=TIMEOUT) + assert v == 0 + finally: + aioca_cleanup() + conn.send("D") + + +# ================================================================ # +# 10. DESC FIELD (40-char metadata) # +# ================================================================ # + +class TestDescField: + """DESC is a 40-character string metadata field.""" + + @pytest.mark.asyncio + async def test_set_desc(self, field_behavior_ioc): + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + await caput( + _pv(p, "AO.DESC"), "Test desc", + timeout=TIMEOUT, + ) + await asyncio.sleep(0.2) + r = await caget(_pv(p, "AO.DESC"), + datatype=str, timeout=TIMEOUT) + assert r == "Test desc" + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_desc_truncation(self, field_behavior_ioc): + """DESC is limited to 40 chars. CA client rejects + strings longer than MAX_STRING_SIZE (40) at the + client level -- no server round-trip needed.""" + p = field_behavior_ioc.pv_prefix + with Listener(ADDRESS) as listener, \ + listener.accept() as conn: + select_and_recv(conn, "R") + try: + # Exactly 39 chars should work + await caput( + _pv(p, "AO.DESC"), "A" * 39, + timeout=TIMEOUT, + ) + await asyncio.sleep(0.2) + r = await caget(_pv(p, "AO.DESC"), + datatype=str, timeout=TIMEOUT) + assert len(r) == 39 + finally: + aioca_cleanup() + conn.send("D") diff --git a/tests/test_field_callbacks.py b/tests/test_field_callbacks.py new file mode 100644 index 00000000..b68ed519 --- /dev/null +++ b/tests/test_field_callbacks.py @@ -0,0 +1,209 @@ +"""Tests for the CLS ``on_field_change`` extension. + +The IOC runs in a subprocess (``sim_field_callbacks_ioc.py``) so that it has +its own EPICS database and Channel Access / PV Access servers — exactly the +same architecture used by the other pythonSoftIOC tests. + +What is tested +-------------- +* CA write to a non-VAL control field (SCAN, DISA) fires the + registered callback. +* CA write to VAL fires the VAL callback. +* CA write to an alarm field (HIHI) fires its callback. + Alarm fields are ``DBF_DOUBLE``; this confirms no special + casing vs other types. +* CA write to a string metadata field (DESC) fires its callback. + DESC is ``DBF_STRING``; this confirms the + ``dbGetField(DBR_STRING)`` path works for fields whose native + type is already a string. +* PVA write to VAL fires the VAL callback. +* PVA write to a non-VAL field (SCAN) fires the callback. + This is the only protocol difference that needs explicit verification — + pvxs uses a different field-addressing model for subfields. +* Multiple writes to the same field accumulate correctly in the counter. +* A callback on one field does not increment another field's counter. +* A wildcard ``"*"`` callback fires for every field write. +""" + +import asyncio + +import pytest + +from multiprocessing.connection import Listener + +from conftest import ( + ADDRESS, + select_and_recv, + aioca_cleanup, + TIMEOUT, +) + + +# --------------------------------------------------------------------------- +# CA tests +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_field_callbacks_ca(field_callbacks_ioc): + """CA puts to SCAN, DISA and VAL each fire the correct callback.""" + from aioca import caget, caput + + pre = field_callbacks_ioc.pv_prefix + + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + + try: + # -- SCAN field -- + await caput(pre + ":AO.SCAN", "1 second", wait=True) + await asyncio.sleep(0.5) + assert await caget(pre + ":SCAN-CB-CNT") == 1 + + # -- DISA field -- + await caput(pre + ":AO.DISA", 1, wait=True) + await asyncio.sleep(0.5) + assert await caget(pre + ":DISA-CB-CNT") == 1 + + # -- VAL field via CA -- + await caput(pre + ":AO", 42.0, wait=True) + await asyncio.sleep(0.5) + assert await caget(pre + ":VAL-CB-CNT") == 1 + + # -- Multiple SCAN writes accumulate -- + await caput(pre + ":AO.SCAN", "2 second", wait=True) + await caput(pre + ":AO.SCAN", ".5 second", wait=True) + await asyncio.sleep(0.5) + assert await caget(pre + ":SCAN-CB-CNT") == 3 + + # -- Isolation: SCAN writes do not affect DISA counter -- + assert await caget(pre + ":DISA-CB-CNT") == 1 + + # -- Wildcard: every write (3 SCAN + 1 DISA + 1 VAL = 5) -- + assert await caget(pre + ":ANY-CB-CNT") == 5 + finally: + aioca_cleanup() + conn.send("D") + select_and_recv(conn, "D") + + +# --------------------------------------------------------------------------- +# PVA test +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_field_callbacks_pva(field_callbacks_ioc): + """A PVA put to VAL fires the VAL callback and the wildcard callback.""" + from aioca import caget + from p4p.client.asyncio import Context + + pre = field_callbacks_ioc.pv_prefix + + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + + try: + with Context("pva") as ctx: + await asyncio.wait_for( + ctx.put(pre + ":AO", 99.0), timeout=TIMEOUT + ) + + await asyncio.sleep(0.5) + assert await caget(pre + ":VAL-CB-CNT") == 1 + assert await caget(pre + ":ANY-CB-CNT") == 1 # wildcard + finally: + aioca_cleanup() + conn.send("D") + select_and_recv(conn, "D") + + +# --------------------------------------------------------------------------- +# Field-type coverage tests +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_field_callbacks_alarm_field(field_callbacks_ioc): + """Alarm field (HIHI, DBF_DOUBLE) uses the same callback path as VAL. + + This is the key "no special casing" assertion: alarm limits go through + asTrapWrite identically to any other writeable field. + """ + from aioca import caget, caput + + pre = field_callbacks_ioc.pv_prefix + + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + + try: + await caput(pre + ":AO.HIHI", 95.0, wait=True) + await asyncio.sleep(0.5) + assert await caget(pre + ":HIHI-CB-CNT") == 1 + assert await caget(pre + ":ANY-CB-CNT") == 1 # wildcard also fires + # Other counters must remain zero — no cross-field leakage. + assert await caget(pre + ":VAL-CB-CNT") == 0 + assert await caget(pre + ":SCAN-CB-CNT") == 0 + finally: + aioca_cleanup() + conn.send("D") + select_and_recv(conn, "D") + + +@pytest.mark.asyncio +async def test_field_callbacks_string_field(field_callbacks_ioc): + """String metadata field (DESC, DBF_STRING) is captured correctly. + + The C hook reads every value through ``dbGetField(DBR_STRING)``. For + fields whose native type is already a string (like DESC) the value must + round-trip without corruption. + """ + from aioca import caget, caput + + pre = field_callbacks_ioc.pv_prefix + + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + + try: + await caput(pre + ":AO.DESC", "test label", wait=True) + await asyncio.sleep(0.5) + assert await caget(pre + ":DESC-CB-CNT") == 1 + assert await caget(pre + ":ANY-CB-CNT") == 1 + # Other counters must remain zero. + assert await caget(pre + ":VAL-CB-CNT") == 0 + assert await caget(pre + ":HIHI-CB-CNT") == 0 + finally: + aioca_cleanup() + conn.send("D") + select_and_recv(conn, "D") + + +@pytest.mark.asyncio +async def test_field_callbacks_pva_non_val(field_callbacks_ioc): + """A PVA put to a non-VAL field (SCAN) fires the callback. + + pvxs addresses subfields differently from VAL writes. This test + confirms that the asTrapWrite hook fires regardless of which field + a PVA client targets. + """ + from aioca import caget + from p4p.client.asyncio import Context + + pre = field_callbacks_ioc.pv_prefix + + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + + try: + with Context("pva") as ctx: + await asyncio.wait_for( + ctx.put(pre + ":AO.SCAN", "1 second"), timeout=TIMEOUT + ) + + await asyncio.sleep(0.5) + assert await caget(pre + ":SCAN-CB-CNT") == 1 + assert await caget(pre + ":ANY-CB-CNT") == 1 # wildcard + assert await caget(pre + ":VAL-CB-CNT") == 0 # not triggered + finally: + aioca_cleanup() + conn.send("D") + select_and_recv(conn, "D") diff --git a/tests/test_universal_set.py b/tests/test_universal_set.py new file mode 100644 index 00000000..c078f74c --- /dev/null +++ b/tests/test_universal_set.py @@ -0,0 +1,372 @@ +"""Tests for universal set(): verify set() publishes immediately +regardless of SCAN setting. + +Tests use ``sim_universal_set_ioc.py`` running in a subprocess. +The test harness communicates commands over a multiprocessing +connection to trigger set() calls on the Python side, while +monitoring values via Channel Access from the test process. +""" + +import asyncio +import time +import pytest + +from multiprocessing.connection import Listener + +from aioca import caget, camonitor + +from conftest import ( + ADDRESS, TIMEOUT, + select_and_recv, aioca_cleanup, +) + + +# ---------------------------------------------------------------- # +# Helpers # +# ---------------------------------------------------------------- # + +def _pv(prefix, suffix): + return f"{prefix}:{suffix}" + + +async def _set_and_verify(conn, prefix, rec_name, value, timeout=5.0): + """Tell the IOC to set() a value, then caget to confirm publication.""" + conn.send(("set", rec_name, value)) + reply = conn.recv() + assert reply == "OK", f"set command failed: {reply}" + await asyncio.sleep(0.5) + return await caget(_pv(prefix, rec_name), timeout=timeout) + + +async def _change_scan(conn, rec_name, scan_value): + """Tell the IOC to change SCAN via dbpf.""" + conn.send(("scan", rec_name, scan_value)) + reply = conn.recv() + assert reply == "OK", f"scan command failed: {reply}" + await asyncio.sleep(0.3) + + +# ================================================================ # +# 1. BASELINE: set() works with default I/O Intr # +# ================================================================ # + +class TestSetWithIOIntr: + """Baseline: set() works with the default I/O Intr SCAN.""" + + @pytest.mark.asyncio + async def test_ai_set_io_intr(self, universal_set_ioc): + """aIn.set() publishes immediately with SCAN='I/O Intr'.""" + p = universal_set_ioc.pv_prefix + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + try: + got = await _set_and_verify(conn, p, "AI", 42.5) + assert got == pytest.approx(42.5, abs=0.01) + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_ao_set_io_intr(self, universal_set_ioc): + """aOut.set() publishes immediately with SCAN='I/O Intr'.""" + p = universal_set_ioc.pv_prefix + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + try: + got = await _set_and_verify(conn, p, "AO", 3.14) + assert got == pytest.approx(3.14, abs=0.01) + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_longin_set_io_intr(self, universal_set_ioc): + """longIn.set() publishes immediately with SCAN='I/O Intr'.""" + p = universal_set_ioc.pv_prefix + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + try: + got = await _set_and_verify(conn, p, "LONGIN", 99) + assert got == 99 + finally: + aioca_cleanup() + conn.send("D") + + +# ================================================================ # +# 2. set() with SCAN = Passive # +# ================================================================ # + +class TestSetWithPassive: + """set() must publish immediately even when SCAN='Passive'.""" + + @pytest.mark.asyncio + async def test_ai_set_passive(self, universal_set_ioc): + """aIn.set() works after switching to SCAN='Passive'.""" + p = universal_set_ioc.pv_prefix + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + try: + await _change_scan(conn, "AI", "Passive") + got = await _set_and_verify(conn, p, "AI", 77.7) + assert got == pytest.approx(77.7, abs=0.01), \ + f"set() with Passive SCAN: expected 77.7, got {got}" + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_ao_set_passive(self, universal_set_ioc): + """aOut.set() works after switching to SCAN='Passive'.""" + p = universal_set_ioc.pv_prefix + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + try: + await _change_scan(conn, "AO", "Passive") + got = await _set_and_verify(conn, p, "AO", 2.718) + assert got == pytest.approx(2.718, abs=0.01), \ + f"set() with Passive SCAN: expected 2.718, got {got}" + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_longin_set_passive(self, universal_set_ioc): + """longIn.set() works after switching to SCAN='Passive'.""" + p = universal_set_ioc.pv_prefix + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + try: + await _change_scan(conn, "LONGIN", "Passive") + got = await _set_and_verify(conn, p, "LONGIN", 123) + assert got == 123, \ + f"set() with Passive SCAN: expected 123, got {got}" + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_bo_set_passive(self, universal_set_ioc): + """boolOut.set() works after switching to SCAN='Passive'.""" + p = universal_set_ioc.pv_prefix + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + try: + await _change_scan(conn, "BO", "Passive") + got = await _set_and_verify(conn, p, "BO", 1) + assert got == 1, \ + f"set() with Passive SCAN: expected 1, got {got}" + finally: + aioca_cleanup() + conn.send("D") + + +# ================================================================ # +# 3. set() with periodic SCAN settings # +# ================================================================ # + +class TestSetWithPeriodicScan: + """set() must publish immediately even with periodic SCAN.""" + + @pytest.mark.asyncio + async def test_ai_set_1_second(self, universal_set_ioc): + """aIn.set() publishes immediately, doesn't wait for scan period.""" + p = universal_set_ioc.pv_prefix + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + try: + await _change_scan(conn, "AI", "1 second") + got = await _set_and_verify(conn, p, "AI", 55.5) + assert got == pytest.approx(55.5, abs=0.01), \ + f"set() with 1 second SCAN: expected 55.5, got {got}" + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_ai_set_5_second(self, universal_set_ioc): + """aIn.set() publishes immediately with SCAN='5 second'.""" + p = universal_set_ioc.pv_prefix + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + try: + await _change_scan(conn, "AI", "5 second") + got = await _set_and_verify(conn, p, "AI", 88.8) + assert got == pytest.approx(88.8, abs=0.01), \ + f"set() with 5 second SCAN: expected 88.8, got {got}" + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_ai_set_10_second(self, universal_set_ioc): + """aIn.set() publishes immediately with SCAN='10 second'.""" + p = universal_set_ioc.pv_prefix + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + try: + await _change_scan(conn, "AI", "10 second") + got = await _set_and_verify(conn, p, "AI", 33.3) + assert got == pytest.approx(33.3, abs=0.01), \ + f"set() with 10 second SCAN: expected 33.3, got {got}" + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_ao_set_2_second(self, universal_set_ioc): + """aOut.set() publishes immediately with SCAN='2 second'.""" + p = universal_set_ioc.pv_prefix + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + try: + await _change_scan(conn, "AO", "2 second") + got = await _set_and_verify(conn, p, "AO", 6.28) + assert got == pytest.approx(6.28, abs=0.01), \ + f"set() with 2 second SCAN: expected 6.28, got {got}" + finally: + aioca_cleanup() + conn.send("D") + + +# ================================================================ # +# 4. set() with SCAN = Event # +# ================================================================ # + +class TestSetWithEventScan: + """set() must publish immediately with SCAN='Event'.""" + + @pytest.mark.asyncio + async def test_ai_set_event(self, universal_set_ioc): + """aIn.set() publishes immediately with SCAN='Event'.""" + p = universal_set_ioc.pv_prefix + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + try: + await _change_scan(conn, "AI", "Event") + got = await _set_and_verify(conn, p, "AI", 11.1) + assert got == pytest.approx(11.1, abs=0.01), \ + f"set() with Event SCAN: expected 11.1, got {got}" + finally: + aioca_cleanup() + conn.send("D") + + +# ================================================================ # +# 5. SCAN transitions: switch back and forth # +# ================================================================ # + +class TestScanTransitions: + """set() continues to work when switching between SCAN values.""" + + @pytest.mark.asyncio + async def test_io_intr_to_passive_and_back(self, universal_set_ioc): + """set() works across I/O Intr -> Passive -> I/O Intr.""" + p = universal_set_ioc.pv_prefix + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + try: + got = await _set_and_verify(conn, p, "AI", 1.0) + assert got == pytest.approx(1.0, abs=0.01) + + await _change_scan(conn, "AI", "Passive") + got = await _set_and_verify(conn, p, "AI", 2.0) + assert got == pytest.approx(2.0, abs=0.01) + + await _change_scan(conn, "AI", "I/O Intr") + got = await _set_and_verify(conn, p, "AI", 3.0) + assert got == pytest.approx(3.0, abs=0.01) + finally: + aioca_cleanup() + conn.send("D") + + @pytest.mark.asyncio + async def test_io_intr_to_periodic_to_passive(self, universal_set_ioc): + """set() works across I/O Intr -> 1 second -> Passive.""" + p = universal_set_ioc.pv_prefix + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + try: + await _change_scan(conn, "AI", "1 second") + got = await _set_and_verify(conn, p, "AI", 10.0) + assert got == pytest.approx(10.0, abs=0.01) + + await _change_scan(conn, "AI", "Passive") + got = await _set_and_verify(conn, p, "AI", 20.0) + assert got == pytest.approx(20.0, abs=0.01) + finally: + aioca_cleanup() + conn.send("D") + + +# ================================================================ # +# 6. Multiple rapid set() calls with non-I/O-Intr SCAN # +# ================================================================ # + +class TestRapidSets: + """Multiple rapid set() calls should all be processed.""" + + @pytest.mark.asyncio + async def test_rapid_sets_passive(self, universal_set_ioc): + """Last of several rapid set() calls wins on Passive record.""" + p = universal_set_ioc.pv_prefix + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + try: + await _change_scan(conn, "AI", "Passive") + + for val in [10.0, 20.0, 30.0]: + conn.send(("set", "AI", val)) + reply = conn.recv() + assert reply == "OK" + + await asyncio.sleep(1.0) + + got = await caget(_pv(p, "AI"), timeout=TIMEOUT) + assert got == pytest.approx(30.0, abs=0.01), \ + f"Expected final value 30.0, got {got}" + finally: + aioca_cleanup() + conn.send("D") + + +# ================================================================ # +# 7. Monitor receives updates from set() on non-I/O-Intr # +# ================================================================ # + +class TestMonitorUpdates: + """CA monitors see updates from set() regardless of SCAN.""" + + @pytest.mark.asyncio + async def test_monitor_sees_set_on_passive(self, universal_set_ioc): + """camonitor receives update when set() fires on a Passive record.""" + p = universal_set_ioc.pv_prefix + with Listener(ADDRESS) as listener, listener.accept() as conn: + select_and_recv(conn, "R") + received = [] + + try: + pv = _pv(p, "AI") + sub = camonitor(pv, received.append) + + await asyncio.sleep(0.5) # let initial monitor connect + + await _change_scan(conn, "AI", "Passive") + + conn.send(("set", "AI", 99.9)) + reply = conn.recv() + assert reply == "OK" + + deadline = time.time() + 5.0 + while time.time() < deadline: + if any(abs(float(v) - 99.9) < 0.1 for v in received): + break + await asyncio.sleep(0.1) + + sub.close() + + assert any(abs(float(v) - 99.9) < 0.1 for v in received), \ + f"Monitor never received 99.9, got: {received}" + finally: + aioca_cleanup() + conn.send("D")