From f109fdcb91d14ffe602e30d14302dc6f39811abc Mon Sep 17 00:00:00 2001 From: Jason Thistlethwaite Date: Sat, 25 Apr 2026 00:53:49 +0000 Subject: [PATCH] Add outbox worker processing policy --- AGENT.md | 2 +- AGENTS.md | 4 +- README.md | 52 +++++++++++-------- docs/event_outbox_spec.md | 27 +++++++--- docs/outbox_worker_policy.md | 63 ++++++++++++++++++++++ docs/redmineup_local_fork_changelog.md | 32 +++++++++++- docs/test_instance_post_import.md | 30 ++++++++++- redmine_outbox_worker.py | 72 +++++++++++++++++++++++++- 8 files changed, 249 insertions(+), 33 deletions(-) create mode 100644 docs/outbox_worker_policy.md diff --git a/AGENT.md b/AGENT.md index 18de2ed..fb1629d 100644 --- a/AGENT.md +++ b/AGENT.md @@ -174,7 +174,7 @@ It: - supports `--dry-run` for non-mutating previews - uses `locked_at`, `locked_by`, `processed_at`, `attempts`, and `last_error` - enriches helpdesk ticket/message/contact events with read-only joins -- writes derived JSONL to `.cache/redmine_outbox/derived_documents.jsonl` +- writes derived JSONL to `/tmp/redmine-outbox/derived_documents.jsonl` - marks rows processed only after a successful local write This is still a prototype output target, not the final vector index. diff --git a/AGENTS.md b/AGENTS.md index 3af9a4c..23492bf 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -37,6 +37,7 @@ Dry-run operational helpers before applying changes: ./validate_test_instance.py ./helpdesk_smoke_test.py ./validate_helpdesk_outbox_worker.py +./redmine_outbox_worker.py --status ./redmine_outbox_worker.py --dry-run --batch-size 10 ``` @@ -56,7 +57,8 @@ against the LAN Redmine copy using controlled projects such as `fud-helpdesk` or deployment assumptions change. The post-import workflow lives in `docs/test_instance_post_import.md`; the Helpdesk/redMCP live smoke test is documented in `docs/helpdesk_smoke_test.md`; the Helpdesk outbox worker -validator is documented in `docs/helpdesk_outbox_worker_validation.md`. 
+validator is documented in `docs/helpdesk_outbox_worker_validation.md`; worker +processing policy is documented in `docs/outbox_worker_policy.md`. ## Commit & Pull Request Guidelines diff --git a/README.md b/README.md index cecf033..d5968e9 100644 --- a/README.md +++ b/README.md @@ -222,16 +222,22 @@ It runs outside Redmine and consumes `event_outbox_events` over SSH/MySQL. The initial output target is deterministic local JSONL rather than a live search service: -- default output: `.cache/redmine_outbox/derived_documents.jsonl` +- default output: `/tmp/redmine-outbox/derived_documents.jsonl` Current behavior: - dry-runs pending rows without marking them processed +- reports queue counts with `--status` - claims bounded batches with `locked_at` and `locked_by` - enriches helpdesk ticket/message/contact-related events with read-only joins - writes derived event/ticket/message/contact documents as JSONL - marks rows processed only after a successful local write - increments `attempts` and writes `last_error` when processing fails +- previews or applies processed-row cleanup with `--purge-processed-days` + +The worker processing policy is documented in: + +- [docs/outbox_worker_policy.md](docs/outbox_worker_policy.md) ### 7. Test Helpdesk Mail Reset @@ -335,18 +341,10 @@ The deployed helpdesk search routes were verified with: ## What Is Not Finished Yet -### 1. Worker Processing Policy +### 1. External Search Index -The Helpdesk outbox worker now has a repeatable live validator. The next worker -decision is operational policy: - -- choose when to mark rows processed on the test instance -- decide where durable JSONL/index output should live -- define retention or replay expectations for `event_outbox_events` - -### 2. External Search Index - -We have not yet built the actual external index. +The worker can now publish bounded JSONL batches and mark successful rows +processed. We have not yet built the actual external index. 
Planned direction: @@ -358,7 +356,7 @@ Planned direction: This should wait until the worker validation pass has documented the derived document shape that will feed the index. -### 3. Pre-Existing UI/Plugin Bugs +### 2. Pre-Existing UI/Plugin Bugs We discovered old plugin issues while working: @@ -401,10 +399,9 @@ If continuing this project, the next best work is: 2. Generate controlled Helpdesk activity in `fud-helpdesk`. 3. Inspect `event_outbox_events` for the corresponding issue, journal, Helpdesk ticket, and journal message rows. -4. Run `redmine_outbox_worker.py --dry-run --batch-size 10` and document the - derived JSONL output shape. -5. Process a bounded batch only after the dry-run output is correct. -6. Choose the first external index target after the worker output is validated. +4. Check worker queue state with `redmine_outbox_worker.py --status`. +5. Process a bounded JSONL batch after dry-run output is correct. +6. Choose the first external index target after JSONL publishing is stable. ## Practical Commands @@ -448,12 +445,24 @@ processed: ./redmine_outbox_worker.py --dry-run --batch-size 10 ``` -Process a bounded outbox batch into local JSONL and mark successful rows: +Check queue status: + +```sh +./redmine_outbox_worker.py --status +``` + +Process a bounded outbox batch into JSONL and mark successful rows: ```sh ./redmine_outbox_worker.py --batch-size 20 ``` +Preview processed-row cleanup: + +```sh +./redmine_outbox_worker.py --purge-processed-days 30 +``` + Search the cached helpdesk documents: ```sh @@ -474,7 +483,6 @@ not cosmetic UI work. It is the architectural clarification that helpdesk data is the authoritative customer communication layer, plus the first safe event and export tooling around it. -The repo is now at the point where the next meaningful leap is an external -worker validation pass, not more Redmine UI surface. The Qdrant/OpenAI index -work should follow once the worker output is proven against controlled Helpdesk -activity. 
+The repo is now at the point where bounded external worker publishing is proven +against the LAN test instance. The next meaningful leap is choosing and building +the first external search/index target, likely Qdrant with OpenAI embeddings. diff --git a/docs/event_outbox_spec.md b/docs/event_outbox_spec.md index 1f735ae..814da71 100644 --- a/docs/event_outbox_spec.md +++ b/docs/event_outbox_spec.md @@ -384,7 +384,7 @@ row exists. ## Worker/Rake Task -Current implemented worker-facing command: +Currently implemented Redmine-side worker-facing command: ```sh bundle exec rake redmine_event_outbox:dump RAILS_ENV=production @@ -392,7 +392,7 @@ bundle exec rake redmine_event_outbox:dump RAILS_ENV=production This prints pending rows as JSON and does not mark them processed. -Next worker command: +Previously planned Redmine-side publish command: ```sh bundle exec rake redmine_event_outbox:publish RAILS_ENV=production @@ -411,10 +411,11 @@ bundle exec rake redmine_event_outbox:publish bundle exec rake redmine_event_outbox:publish RETRY=1 ``` -Initial publisher target can be stdout or a local JSONL file. That lets us test -the Redmine side before choosing Redis Streams or RabbitMQ. +The current external publisher is `redmine_outbox_worker.py`. Its initial +publisher target is JSONL at `/tmp/redmine-outbox/derived_documents.jsonl`, +which lets us test the Redmine side before choosing Redis Streams or RabbitMQ. 
-The next implementation should keep the worker small and conservative: +The worker stays small and conservative: - select pending rows in id order - lock or claim a bounded batch @@ -424,6 +425,19 @@ The next implementation should keep the worker small and conservative: - leave failed rows available for retry - make duplicate delivery acceptable to consumers +Current external worker commands: + +```sh +./redmine_outbox_worker.py --status +./redmine_outbox_worker.py --dry-run --batch-size 10 +./redmine_outbox_worker.py --batch-size 20 +./redmine_outbox_worker.py --purge-processed-days 30 +./redmine_outbox_worker.py --purge-processed-days 30 --apply-purge +``` + +Processed rows are retained by default. Purge is explicit, age-gated, and +intended for test-instance cleanup, not normal delivery. + Later publisher targets: - Redis Streams @@ -550,7 +564,8 @@ decision code fails. ## Open Questions -- How long should processed outbox rows be retained? +- Do production processed rows need a formal retention window, or should they + be retained until external index/message-bus replay policy is settled? - Do we need a Redmine admin page to inspect outbox health, or is a rake task/log enough? - What is the current production mail ingestion path? - Which vector-capable external search index should be used first? diff --git a/docs/outbox_worker_policy.md b/docs/outbox_worker_policy.md new file mode 100644 index 0000000..b138176 --- /dev/null +++ b/docs/outbox_worker_policy.md @@ -0,0 +1,63 @@ +# Outbox Worker Policy + +`redmine_outbox_worker.py` is the first external publisher for +`event_outbox_events`. It currently publishes enriched documents to JSONL, but +the processing policy is intended to carry forward to a later message bus. + +## Default Output + +The default output is: + +```sh +/tmp/redmine-outbox/derived_documents.jsonl +``` + +Use `--output /path/to/file.jsonl` to override it. The worker creates parent +directories as needed. 
+ +## Processing Semantics + +- Read eligible pending rows globally in id order. +- Claim a bounded batch with `locked_at` and `locked_by`. +- Enrich each event with read-only joins. +- Append derived JSON documents to the output. +- Mark a row `processed_at` only after the output write succeeds. +- On failure, increment `attempts`, write `last_error`, and release the lock. + +This is at-least-once delivery. Downstream consumers must tolerate duplicate +documents keyed by `event_id` or `doc_id`. + +## Operations + +Inspect queue state: + +```sh +./redmine_outbox_worker.py --status +``` + +Preview a batch without claiming or marking rows processed: + +```sh +./redmine_outbox_worker.py --dry-run --batch-size 10 +``` + +Process a bounded batch: + +```sh +./redmine_outbox_worker.py --batch-size 20 +``` + +Preview processed-row cleanup: + +```sh +./redmine_outbox_worker.py --purge-processed-days 30 +``` + +Apply cleanup explicitly: + +```sh +./redmine_outbox_worker.py --purge-processed-days 30 --apply-purge +``` + +Processed rows are kept by default. Purge is an administrative cleanup tool for +test instances and should not be part of normal publishing. diff --git a/docs/redmineup_local_fork_changelog.md b/docs/redmineup_local_fork_changelog.md index f81a41f..7295fe8 100644 --- a/docs/redmineup_local_fork_changelog.md +++ b/docs/redmineup_local_fork_changelog.md @@ -30,7 +30,37 @@ environment. Before risky edits, archive the current plugin directories in - Helpdesk outbox/worker validation now has a repeatable live script. - Next meaningful milestone: - Run `validate_helpdesk_outbox_worker.py` after outbox or worker changes, - then choose the external index target and durable processing policy. + then choose the external index target. + +## 2026-04-25 - Outbox Worker Processing Policy + +- Touched areas: + - External outbox worker + - Worker operations documentation +- Purpose: + - Make the worker usable for bounded JSONL publishing before a message bus is + chosen. 
+ - Add queue visibility and explicit processed-row cleanup for test-instance + maintenance. +- Behavior changed: + - Default worker output is `/tmp/redmine-outbox/derived_documents.jsonl`. + - `--status` reports pending, ready, locked, failed, and processed counts. + - `--purge-processed-days N` previews processed-row cleanup; `--apply-purge` + is required to delete rows. + - Normal processing still marks rows processed only after successful output. +- LAN test result: + - `./redmine_outbox_worker.py --status` reported 100 total rows, 100 ready, + 0 locked, 0 failed, and 0 processed before the processing run. + - `./redmine_outbox_worker.py --dry-run --batch-size 5` previewed the first + five rows without marking them processed. + - `./redmine_outbox_worker.py --batch-size 5` processed five rows and wrote + six JSONL documents to `/tmp/redmine-outbox/derived_documents.jsonl`. + - Follow-up status reported 95 pending, 95 ready, 0 locked, 0 failed, and + 5 processed. + - `./redmine_outbox_worker.py --purge-processed-days 30` previewed zero rows. + - Follow-up `./validate_helpdesk_outbox_worker.py` passed with controlled + Helpdesk issue `#39875`; final status reported 118 total rows, 113 ready, + 0 locked, 0 failed, and 5 processed. ## 2026-04-25 - Helpdesk Outbox Worker Validation diff --git a/docs/test_instance_post_import.md b/docs/test_instance_post_import.md index ad032d8..004a409 100644 --- a/docs/test_instance_post_import.md +++ b/docs/test_instance_post_import.md @@ -149,7 +149,35 @@ Helpdesk behavior, checks the matching `event_outbox_events` rows, and dry-runs worker enrichment without claiming or marking rows processed. Details are in `docs/helpdesk_outbox_worker_validation.md`. -## 9. Re-Run The Read-Only Validator +## 9. 
Check Or Process The Outbox Worker + +Inspect outbox queue state: + +```sh +./redmine_outbox_worker.py --status +``` + +Preview worker output without marking rows processed: + +```sh +./redmine_outbox_worker.py --dry-run --batch-size 10 +``` + +Process a small bounded batch only after the dry-run output looks correct: + +```sh +./redmine_outbox_worker.py --batch-size 5 +``` + +The default JSONL output path is +`/tmp/redmine-outbox/derived_documents.jsonl`. Use `--output` to override it. +Processed rows are retained by default. To preview test-instance cleanup: + +```sh +./redmine_outbox_worker.py --purge-processed-days 30 +``` + +## 10. Re-Run The Read-Only Validator Finish by running: diff --git a/redmine_outbox_worker.py b/redmine_outbox_worker.py index 056b482..5dc1b0b 100755 --- a/redmine_outbox_worker.py +++ b/redmine_outbox_worker.py @@ -25,7 +25,7 @@ from typing import Any, Iterable DEFAULT_SSH_HOST = "reddev@192.168.50.170" DEFAULT_SSH_KEY = Path("/tmp/reddev") DEFAULT_REMOTE_REDMINE = "/usr/share/redmine" -DEFAULT_OUTPUT = Path(".cache/redmine_outbox/derived_documents.jsonl") +DEFAULT_OUTPUT = Path("/tmp/redmine-outbox/derived_documents.jsonl") class OutboxWorkerError(RuntimeError): @@ -102,12 +102,30 @@ def main() -> int: parser.add_argument("--stale-lock-minutes", type=int, default=30) parser.add_argument("--dry-run", action="store_true", help="Fetch and enrich pending rows without locking or marking them.") parser.add_argument("--claim-only", action="store_true", help="Claim rows, print them, then release the claim without marking processed.") + parser.add_argument("--status", action="store_true", help="Report outbox row counts and exit.") + parser.add_argument( + "--purge-processed-days", + type=int, + help="Preview purging processed rows older than this many days, or delete with --apply-purge.", + ) + parser.add_argument("--apply-purge", action="store_true", help="Actually delete rows selected by --purge-processed-days.") args = parser.parse_args() 
remote = RemoteRedmine(args.ssh_host, args.ssh_key, args.remote_redmine) worker_id = make_worker_id() try: + if args.apply_purge and args.purge_processed_days is None: + raise OutboxWorkerError("--apply-purge requires --purge-processed-days.") + if args.status: + print(json.dumps(outbox_status(remote, args.max_attempts, args.stale_lock_minutes), indent=2, sort_keys=True)) + return 0 + if args.purge_processed_days is not None: + purge_count = purge_processed(remote, args.purge_processed_days, apply=args.apply_purge) + action = "purged" if args.apply_purge else "would purge" + print(f"{action} {purge_count} processed outbox row(s)") + return 0 + if args.dry_run: events = pending_events(remote, args.batch_size, args.max_attempts, args.stale_lock_minutes) else: @@ -138,6 +156,8 @@ def main() -> int: action = "previewed" if args.dry_run else "processed" print(f"{action} {processed} outbox event(s)") + if processed and not args.dry_run: + print(f"output: {args.output}") return 0 except OutboxWorkerError as exc: print(f"error: {exc}", file=sys.stderr) @@ -170,6 +190,32 @@ LIMIT {sql_int(limit)}; ) +def outbox_status(remote: RemoteRedmine, max_attempts: int, stale_lock_minutes: int) -> dict[str, Any]: + rows = remote.mysql_json_lines( + f""" +SELECT HEX(CAST(JSON_OBJECT( + 'total', COUNT(*), + 'pending', SUM(IF(processed_at IS NULL AND attempts < {sql_int(max_attempts)}, 1, 0)), + 'ready', SUM(IF(processed_at IS NULL + AND attempts < {sql_int(max_attempts)} + AND (locked_at IS NULL OR locked_at < UTC_TIMESTAMP() - INTERVAL {sql_int(stale_lock_minutes)} MINUTE), 1, 0)), + 'locked', SUM(IF(processed_at IS NULL + AND locked_at IS NOT NULL + AND locked_at >= UTC_TIMESTAMP() - INTERVAL {sql_int(stale_lock_minutes)} MINUTE, 1, 0)), + 'failed', SUM(IF(processed_at IS NULL AND attempts >= {sql_int(max_attempts)}, 1, 0)), + 'processed', SUM(IF(processed_at IS NOT NULL, 1, 0)), + 'oldest_pending_id', MIN(IF(processed_at IS NULL, id, NULL)), + 'newest_pending_id', MAX(IF(processed_at 
IS NULL, id, NULL)), + 'oldest_processed_at', DATE_FORMAT(MIN(processed_at), '%Y-%m-%dT%H:%i:%sZ'), + 'newest_processed_at', DATE_FORMAT(MAX(processed_at), '%Y-%m-%dT%H:%i:%sZ') +) AS CHAR)) AS document +FROM event_outbox_events; +""" + ) + status = rows[0] if rows else {} + return {key: (0 if value is None and key in {"total", "pending", "ready", "locked", "failed", "processed"} else value) for key, value in status.items()} + + def claim_events( remote: RemoteRedmine, worker_id: str, @@ -212,6 +258,30 @@ LIMIT {sql_int(limit)}; ) +def purge_processed(remote: RemoteRedmine, days: int, apply: bool) -> int: + if days < 0: + raise OutboxWorkerError("--purge-processed-days must be zero or greater.") + count_sql = f""" +SELECT HEX(CAST(JSON_OBJECT( + 'count', COUNT(*) +) AS CHAR)) AS document +FROM event_outbox_events +WHERE processed_at IS NOT NULL + AND processed_at < UTC_TIMESTAMP() - INTERVAL {sql_int(days)} DAY; +""" + rows = remote.mysql_json_lines(count_sql) + count = int((rows[0] if rows else {}).get("count") or 0) + if apply and count: + remote.mysql( + f""" +DELETE FROM event_outbox_events +WHERE processed_at IS NOT NULL + AND processed_at < UTC_TIMESTAMP() - INTERVAL {sql_int(days)} DAY; +""" + ) + return count + + def enrich_event(remote: RemoteRedmine, event: dict[str, Any]) -> list[dict[str, Any]]: payload = parse_payload(event.get("payload")) documents: list[dict[str, Any]] = [event_document(event, payload)]