Add outbox worker processing policy

This commit is contained in:
Jason Thistlethwaite
2026-04-25 00:53:49 +00:00
parent c9f4c69525
commit f109fdcb91
8 changed files with 249 additions and 33 deletions
+71 -1
View File
@@ -25,7 +25,7 @@ from typing import Any, Iterable
DEFAULT_SSH_HOST = "reddev@192.168.50.170"
DEFAULT_SSH_KEY = Path("/tmp/reddev")
DEFAULT_REMOTE_REDMINE = "/usr/share/redmine"
DEFAULT_OUTPUT = Path(".cache/redmine_outbox/derived_documents.jsonl")
DEFAULT_OUTPUT = Path("/tmp/redmine-outbox/derived_documents.jsonl")
class OutboxWorkerError(RuntimeError):
@@ -102,12 +102,30 @@ def main() -> int:
parser.add_argument("--stale-lock-minutes", type=int, default=30)
parser.add_argument("--dry-run", action="store_true", help="Fetch and enrich pending rows without locking or marking them.")
parser.add_argument("--claim-only", action="store_true", help="Claim rows, print them, then release the claim without marking processed.")
parser.add_argument("--status", action="store_true", help="Report outbox row counts and exit.")
parser.add_argument(
"--purge-processed-days",
type=int,
help="Preview purging processed rows older than this many days, or delete with --apply-purge.",
)
parser.add_argument("--apply-purge", action="store_true", help="Actually delete rows selected by --purge-processed-days.")
args = parser.parse_args()
remote = RemoteRedmine(args.ssh_host, args.ssh_key, args.remote_redmine)
worker_id = make_worker_id()
try:
if args.apply_purge and args.purge_processed_days is None:
raise OutboxWorkerError("--apply-purge requires --purge-processed-days.")
if args.status:
print(json.dumps(outbox_status(remote, args.max_attempts, args.stale_lock_minutes), indent=2, sort_keys=True))
return 0
if args.purge_processed_days is not None:
purge_count = purge_processed(remote, args.purge_processed_days, apply=args.apply_purge)
action = "purged" if args.apply_purge else "would purge"
print(f"{action} {purge_count} processed outbox row(s)")
return 0
if args.dry_run:
events = pending_events(remote, args.batch_size, args.max_attempts, args.stale_lock_minutes)
else:
@@ -138,6 +156,8 @@ def main() -> int:
action = "previewed" if args.dry_run else "processed"
print(f"{action} {processed} outbox event(s)")
if processed and not args.dry_run:
print(f"output: {args.output}")
return 0
except OutboxWorkerError as exc:
print(f"error: {exc}", file=sys.stderr)
@@ -170,6 +190,32 @@ LIMIT {sql_int(limit)};
)
def outbox_status(remote: RemoteRedmine, max_attempts: int, stale_lock_minutes: int) -> dict[str, Any]:
rows = remote.mysql_json_lines(
f"""
SELECT HEX(CAST(JSON_OBJECT(
'total', COUNT(*),
'pending', SUM(IF(processed_at IS NULL AND attempts < {sql_int(max_attempts)}, 1, 0)),
'ready', SUM(IF(processed_at IS NULL
AND attempts < {sql_int(max_attempts)}
AND (locked_at IS NULL OR locked_at < UTC_TIMESTAMP() - INTERVAL {sql_int(stale_lock_minutes)} MINUTE), 1, 0)),
'locked', SUM(IF(processed_at IS NULL
AND locked_at IS NOT NULL
AND locked_at >= UTC_TIMESTAMP() - INTERVAL {sql_int(stale_lock_minutes)} MINUTE, 1, 0)),
'failed', SUM(IF(processed_at IS NULL AND attempts >= {sql_int(max_attempts)}, 1, 0)),
'processed', SUM(IF(processed_at IS NOT NULL, 1, 0)),
'oldest_pending_id', MIN(IF(processed_at IS NULL, id, NULL)),
'newest_pending_id', MAX(IF(processed_at IS NULL, id, NULL)),
'oldest_processed_at', DATE_FORMAT(MIN(processed_at), '%Y-%m-%dT%H:%i:%sZ'),
'newest_processed_at', DATE_FORMAT(MAX(processed_at), '%Y-%m-%dT%H:%i:%sZ')
) AS CHAR)) AS document
FROM event_outbox_events;
"""
)
status = rows[0] if rows else {}
return {key: (0 if value is None and key in {"total", "pending", "ready", "locked", "failed", "processed"} else value) for key, value in status.items()}
def claim_events(
remote: RemoteRedmine,
worker_id: str,
@@ -212,6 +258,30 @@ LIMIT {sql_int(limit)};
)
def purge_processed(remote: RemoteRedmine, days: int, apply: bool) -> int:
if days < 0:
raise OutboxWorkerError("--purge-processed-days must be zero or greater.")
count_sql = f"""
SELECT HEX(CAST(JSON_OBJECT(
'count', COUNT(*)
) AS CHAR)) AS document
FROM event_outbox_events
WHERE processed_at IS NOT NULL
AND processed_at < UTC_TIMESTAMP() - INTERVAL {sql_int(days)} DAY;
"""
rows = remote.mysql_json_lines(count_sql)
count = int((rows[0] if rows else {}).get("count") or 0)
if apply and count:
remote.mysql(
f"""
DELETE FROM event_outbox_events
WHERE processed_at IS NOT NULL
AND processed_at < UTC_TIMESTAMP() - INTERVAL {sql_int(days)} DAY;
"""
)
return count
def enrich_event(remote: RemoteRedmine, event: dict[str, Any]) -> list[dict[str, Any]]:
payload = parse_payload(event.get("payload"))
documents: list[dict[str, Any]] = [event_document(event, payload)]