Add Helpdesk outbox worker validation

This commit is contained in:
Jason Thistlethwaite
2026-04-25 00:31:09 +00:00
parent dde4dca8a2
commit c9f4c69525
8 changed files with 388 additions and 27 deletions
+228
View File
@@ -0,0 +1,228 @@
#!/usr/bin/env python3
"""Live Helpdesk outbox/worker validation for the LAN Redmine copy.
This creates one controlled Helpdesk ticket through Mailpit, then inspects the
outbox rows for that issue and dry-runs worker enrichment without claiming or
marking rows processed.
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from collections import Counter
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from helpdesk_smoke_test import (
DEFAULT_CONTROL_PROJECT,
DEFAULT_MAILPIT_HOST,
DEFAULT_MAILPIT_HTTP_PORT,
DEFAULT_MAILPIT_SMTP_PORT,
DEFAULT_PROJECT,
DEFAULT_REDMINE_URL,
DEFAULT_REMOTE_REDMINE,
DEFAULT_SSH_HOST,
DEFAULT_SSH_KEY,
SmokeConfig,
SmokeError,
load_env_file,
run_smoke,
)
from redmine_outbox_worker import OutboxWorkerError, RemoteRedmine, enrich_event, sql_int
EXPECTED_EVENT_TYPES = {
"helpdesk_ticket.created",
"helpdesk_ticket.updated",
"journal_message.created",
"journal.created",
"issue.updated",
}
class ValidationError(RuntimeError):
pass
@dataclass(frozen=True)
class ValidationConfig:
smoke: SmokeConfig
remote: RemoteRedmine
def main() -> int:
parser = argparse.ArgumentParser(
description="Create a controlled Helpdesk issue and validate outbox worker enrichment for it."
)
parser.add_argument("--redmine-url", default=os.getenv("REDMINE_URL", DEFAULT_REDMINE_URL))
parser.add_argument("--api-key", default=os.getenv("REDMINE_API_KEY") or os.getenv("REDMNINE_API_KEY"))
parser.add_argument("--env-file", type=Path, default=Path("redMCP/.env"))
parser.add_argument("--mailpit-host", default=DEFAULT_MAILPIT_HOST)
parser.add_argument("--mailpit-http-port", type=int, default=DEFAULT_MAILPIT_HTTP_PORT)
parser.add_argument("--mailpit-smtp-port", type=int, default=DEFAULT_MAILPIT_SMTP_PORT)
parser.add_argument("--project", default=DEFAULT_PROJECT)
parser.add_argument("--control-project", default=DEFAULT_CONTROL_PROJECT)
parser.add_argument("--ssh-host", default=os.getenv("REDMINE_SSH_HOST", DEFAULT_SSH_HOST))
parser.add_argument("--ssh-key", type=Path, default=Path(os.getenv("REDMINE_SSH_KEY", str(DEFAULT_SSH_KEY))))
parser.add_argument("--remote-redmine", default=os.getenv("REDMINE_REMOTE_PATH", DEFAULT_REMOTE_REDMINE))
parser.add_argument("--skip-preflight", action="store_true", help="Skip validate_test_instance.py.")
parser.add_argument("--keep-open", action="store_true", help="Do not close the created Helpdesk issue.")
args = parser.parse_args()
try:
config = build_config(args)
run_validation(config)
return 0
except (SmokeError, OutboxWorkerError, ValidationError) as exc:
print(f"error: {exc}", file=sys.stderr)
return 1
def build_config(args: argparse.Namespace) -> ValidationConfig:
env = load_env_file(args.env_file)
redmine_url = args.redmine_url or env.get("REDMINE_URL") or DEFAULT_REDMINE_URL
api_key = args.api_key or env.get("REDMINE_API_KEY") or env.get("REDMNINE_API_KEY")
if not api_key:
raise ValidationError("missing Redmine API key. Set REDMINE_API_KEY or redMCP/.env.")
smoke = SmokeConfig(
redmine_url=redmine_url.rstrip("/"),
api_key=api_key,
mailpit_host=args.mailpit_host,
mailpit_http_port=args.mailpit_http_port,
mailpit_smtp_port=args.mailpit_smtp_port,
project=args.project,
control_project=args.control_project,
ssh_host=args.ssh_host,
ssh_key=args.ssh_key,
remote_redmine=args.remote_redmine,
skip_preflight=args.skip_preflight,
keep_open=args.keep_open,
)
remote = RemoteRedmine(args.ssh_host, args.ssh_key, args.remote_redmine)
return ValidationConfig(smoke=smoke, remote=remote)
def run_validation(config: ValidationConfig) -> None:
smoke_result = run_smoke(config.smoke)
events = fetch_issue_events(config.remote, smoke_result.issue_id)
assert_expected_events(events)
assert_events_unprocessed(events)
documents: list[dict[str, Any]] = []
for event in events:
documents.extend(enrich_event(config.remote, event))
assert_worker_documents(events, documents)
assert_no_bcc_address_leak(documents)
print_summary(smoke_result.issue_id, events, documents)
def fetch_issue_events(remote: RemoteRedmine, issue_id: int) -> list[dict[str, Any]]:
events = remote.mysql_json_lines(
f"""
SELECT HEX(CAST(JSON_OBJECT(
'id', id,
'event_type', event_type,
'source_type', source_type,
'source_id', source_id,
'project_id', project_id,
'issue_id', issue_id,
'journal_id', journal_id,
'user_id', user_id,
'occurred_at', DATE_FORMAT(occurred_at, '%Y-%m-%dT%H:%i:%sZ'),
'attempts', attempts,
'processed_at', IF(processed_at IS NULL, NULL, DATE_FORMAT(processed_at, '%Y-%m-%dT%H:%i:%sZ')),
'locked_at', IF(locked_at IS NULL, NULL, DATE_FORMAT(locked_at, '%Y-%m-%dT%H:%i:%sZ')),
'payload', payload
) AS CHAR)) AS document
FROM event_outbox_events
WHERE issue_id = {sql_int(issue_id)}
ORDER BY id;
"""
)
if not events:
raise ValidationError(f"no event_outbox_events rows found for issue #{issue_id}")
return events
def assert_expected_events(events: list[dict[str, Any]]) -> None:
found = {str(event.get("event_type") or "") for event in events}
missing = sorted(EXPECTED_EVENT_TYPES - found)
if missing:
raise ValidationError(f"missing expected Helpdesk outbox event types: {', '.join(missing)}")
def assert_events_unprocessed(events: list[dict[str, Any]]) -> None:
mutated = [
event
for event in events
if event.get("processed_at") is not None or event.get("locked_at") is not None
]
if mutated:
ids = ", ".join(str(event.get("id")) for event in mutated)
raise ValidationError(f"new smoke-test outbox rows were already processed or locked: {ids}")
def assert_worker_documents(events: list[dict[str, Any]], documents: list[dict[str, Any]]) -> None:
doc_types = {str(document.get("doc_type") or "") for document in documents}
required_doc_types = {"event", "ticket", "message"}
missing_doc_types = sorted(required_doc_types - doc_types)
if missing_doc_types:
raise ValidationError(f"worker dry-run did not derive document types: {', '.join(missing_doc_types)}")
event_doc_ids = {document.get("event_id") for document in documents if document.get("doc_type") == "event"}
missing_events = [str(event.get("id")) for event in events if event.get("id") not in event_doc_ids]
if missing_events:
raise ValidationError(f"worker dry-run did not emit event documents for rows: {', '.join(missing_events)}")
def assert_no_bcc_address_leak(documents: list[dict[str, Any]]) -> None:
for document in documents:
if contains_key(document, "bcc_address"):
raise ValidationError(f"worker document leaked raw bcc_address: {document.get('doc_id')}")
def contains_key(value: Any, needle: str) -> bool:
if isinstance(value, dict):
return any(key == needle or contains_key(child, needle) for key, child in value.items())
if isinstance(value, list):
return any(contains_key(child, needle) for child in value)
return False
def print_summary(issue_id: int, events: list[dict[str, Any]], documents: list[dict[str, Any]]) -> None:
event_types = Counter(str(event.get("event_type") or "") for event in events)
doc_types = Counter(str(document.get("doc_type") or "") for document in documents)
ticket_ids = sorted(
{
int(document["helpdesk_ticket_id"])
for document in documents
if document.get("doc_type") == "ticket" and document.get("helpdesk_ticket_id")
}
)
message_ids = sorted(
{
int(document["journal_message_id"])
for document in documents
if document.get("doc_type") == "message" and document.get("journal_message_id")
}
)
event_rows = [f"{event.get('id')}:{event.get('event_type')}" for event in events]
print("\nHelpdesk outbox worker validation passed.")
print(f" issue_id: {issue_id}")
print(f" helpdesk_ticket_ids: {json.dumps(ticket_ids)}")
print(f" journal_message_ids: {json.dumps(message_ids)}")
print(f" event_rows: {', '.join(event_rows)}")
print(f" event_type_counts: {json.dumps(dict(sorted(event_types.items())), sort_keys=True)}")
print(f" derived_doc_type_counts: {json.dumps(dict(sorted(doc_types.items())), sort_keys=True)}")
print(" worker_mode: dry-run enrichment only; rows were not claimed or marked processed")
if __name__ == "__main__":
raise SystemExit(main())