#!/usr/bin/env python3
"""External worker for Redmine event outbox rows.

This worker runs outside Redmine. It claims pending `event_outbox_events` rows
from the Redmine database over SSH/MySQL, enriches them with read-only joins,
writes JSONL records (sorted keys, stable `doc_id` values), and marks rows
processed only after the write succeeds.
"""

import argparse
import json
import os
import socket
import subprocess
import sys
import time
import uuid
from pathlib import Path
from typing import Any, Iterable

DEFAULT_SSH_HOST = "reddev@192.168.50.170"
DEFAULT_SSH_KEY = Path("/tmp/reddev")
DEFAULT_REMOTE_REDMINE = "/usr/share/redmine"
DEFAULT_OUTPUT = Path("/tmp/redmine-outbox/derived_documents.jsonl")

# Counter keys reported by outbox_status(); SQL SUM() over an empty table yields
# NULL, which is reported as 0 instead.
_STATUS_COUNTER_KEYS = frozenset({"total", "pending", "ready", "locked", "failed", "processed"})

# SELECT ... FROM prefix shared by every query that returns raw outbox events.
# Each result row is one hex-encoded JSON object (see RemoteRedmine.mysql_json_lines).
_EVENT_SELECT_SQL = """
    SELECT HEX(CAST(JSON_OBJECT(
        'id', id,
        'event_type', event_type,
        'source_type', source_type,
        'source_id', source_id,
        'project_id', project_id,
        'issue_id', issue_id,
        'journal_id', journal_id,
        'user_id', user_id,
        'occurred_at', DATE_FORMAT(occurred_at, '%Y-%m-%dT%H:%i:%sZ'),
        'attempts', attempts,
        'payload', payload
    ) AS CHAR)) AS document
    FROM event_outbox_events
"""


class OutboxWorkerError(RuntimeError):
    """Expected operational failure; main() reports it and exits with status 1."""


class RemoteRedmine:
    """Run SQL against the Redmine database through the `mysql` command-line client.

    Credentials come from Redmine's own `config/database.yml` (production
    section) on the target host, so this worker never stores them. Commands run
    over SSH unless `local` is true, in which case the same command runs through
    the local shell.
    """

    def __init__(self, ssh_host, ssh_key, remote_redmine, local=False, timeout=None):
        self.ssh_host = ssh_host
        self.ssh_key = ssh_key
        self.remote_redmine = remote_redmine
        self.local = local
        # Seconds to wait for one mysql invocation; None waits indefinitely.
        self.timeout = timeout

    def mysql_json_lines(self, sql):
        """Run `sql` and decode every non-blank output line as hex-encoded UTF-8 JSON.

        Queries select `HEX(CAST(JSON_OBJECT(...) AS CHAR))`, so text columns
        containing tabs or newlines cannot break the line-oriented
        `--batch --raw` output.

        Raises:
            OutboxWorkerError: a line is not hex, not UTF-8, or not JSON.
        """
        rows = []
        for raw_line in self.mysql(sql).splitlines():
            line = raw_line.strip()
            if not line:
                continue
            try:
                rows.append(json.loads(bytes.fromhex(line).decode("utf-8")))
            # Order matters: both JSONDecodeError and UnicodeDecodeError subclass ValueError.
            except json.JSONDecodeError as exc:
                raise OutboxWorkerError(f"Remote query returned non-JSON row: {raw_line[:200]}") from exc
            except UnicodeDecodeError as exc:
                raise OutboxWorkerError(f"Remote query returned non-UTF-8 row: {raw_line[:200]}") from exc
            except ValueError as exc:
                raise OutboxWorkerError(f"Remote query returned non-hex row: {raw_line[:200]}") from exc
        return rows

    def mysql(self, sql):
        """Feed `sql` to the mysql client on stdin and return its stdout as text.

        Raises:
            OutboxWorkerError: the command could not be started, exceeded
                `self.timeout`, or exited non-zero (stderr becomes the message).
        """
        runner = self._mysql_runner_command()
        if self.local:
            # The runner is a shell snippet (`cd ... && ruby -e ...`), so it needs a shell.
            command, use_shell, tool = runner, True, "local shell"
        else:
            command = ["ssh", "-i", str(self.ssh_key), "-o", "IdentitiesOnly=yes", self.ssh_host, runner]
            use_shell, tool = False, "ssh"
        try:
            result = subprocess.run(
                command,
                input=sql,
                # Explicit UTF-8 matches the utf8mb4 connection charset and does not
                # depend on the caller's locale (e.g. a bare cron environment).
                encoding="utf-8",
                errors="replace",
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=False,
                shell=use_shell,
                timeout=self.timeout,
            )
        except subprocess.TimeoutExpired as exc:
            raise OutboxWorkerError(f"MySQL command timed out after {exc.timeout} seconds.") from exc
        except OSError as exc:
            raise OutboxWorkerError(f"Could not run {tool}: {exc}") from exc
        if result.returncode != 0:
            raise OutboxWorkerError(result.stderr.strip() or "Remote MySQL command failed.")
        return result.stdout

    def _mysql_runner_command(self):
        """Build the shell command that execs `mysql` with Redmine's production credentials.

        A Ruby one-liner reads `config/database.yml` and hands the password to
        mysql through the MYSQL_PWD environment variable rather than its argv.
        utf8mb4 is required so 4-byte UTF-8 characters survive `CAST(... AS CHAR)`.
        """
        ruby = (
            "require 'yaml'; "
            "c = YAML.load_file('config/database.yml')['production']; "
            "ENV['MYSQL_PWD'] = c['password'].to_s; "
            "args = ['--batch', '--raw', '--quick', '--skip-column-names', "
            "'--default-character-set=utf8mb4', '-h', c['host'].to_s, "
            "'-P', (c['port'] || 3306).to_s, '-u', c['username'].to_s, c['database'].to_s]; "
            "exec('mysql', *args)"
        )
        return f"cd {shell_quote(self.remote_redmine)} && ruby -e {shell_quote(ruby)}"


def main():
    """Command-line entry point; returns the process exit status (0 ok, 1 on OutboxWorkerError)."""
    args = _build_parser().parse_args()
    remote = RemoteRedmine(
        args.ssh_host,
        args.ssh_key,
        args.remote_redmine,
        local=args.local,
        timeout=args.mysql_timeout,
    )
    worker_id = make_worker_id()
    try:
        return _run(args, remote, worker_id)
    except OutboxWorkerError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1


def _build_parser():
    """Return the argument parser for the worker CLI."""
    parser = argparse.ArgumentParser(description="Process Redmine event outbox rows into enriched JSONL documents.")
    parser.add_argument("--ssh-host", default=os.getenv("REDMINE_SSH_HOST", DEFAULT_SSH_HOST))
    parser.add_argument("--ssh-key", type=Path, default=Path(os.getenv("REDMINE_SSH_KEY", str(DEFAULT_SSH_KEY))))
    parser.add_argument("--local", action="store_true", help="Read the Redmine database locally instead of over SSH.")
    parser.add_argument("--remote-redmine", default=os.getenv("REDMINE_REMOTE_PATH", DEFAULT_REMOTE_REDMINE))
    parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT)
    parser.add_argument("--batch-size", type=int, default=20)
    parser.add_argument("--max-attempts", type=int, default=5)
    parser.add_argument("--stale-lock-minutes", type=int, default=30)
    parser.add_argument(
        "--mysql-timeout",
        type=float,
        default=None,
        help="Seconds to wait for each MySQL command before giving up (default: no limit).",
    )
    parser.add_argument("--dry-run", action="store_true", help="Fetch and enrich pending rows without locking or marking them.")
    parser.add_argument("--claim-only", action="store_true", help="Claim rows, print them, then release the claim without marking processed.")
    parser.add_argument("--status", action="store_true", help="Report outbox row counts and exit.")
    parser.add_argument(
        "--purge-processed-days",
        type=int,
        help="Preview purging processed rows older than this many days, or delete with --apply-purge.",
    )
    parser.add_argument("--apply-purge", action="store_true", help="Actually delete rows selected by --purge-processed-days.")
    return parser


def _run(args, remote, worker_id):
    """Dispatch to the mode selected on the command line (status, purge, claim-only, processing)."""
    if args.apply_purge and args.purge_processed_days is None:
        raise OutboxWorkerError("--apply-purge requires --purge-processed-days.")

    if args.status:
        status = outbox_status(remote, args.max_attempts, args.stale_lock_minutes)
        print(json.dumps(status, indent=2, sort_keys=True))
        return 0

    if args.purge_processed_days is not None:
        purge_count = purge_processed(remote, args.purge_processed_days, apply=args.apply_purge)
        action = "purged" if args.apply_purge else "would purge"
        print(f"{action} {purge_count} processed outbox row(s)")
        return 0

    if args.dry_run:
        events = pending_events(remote, args.batch_size, args.max_attempts, args.stale_lock_minutes)
    else:
        events = claim_events(remote, worker_id, args.batch_size, args.max_attempts, args.stale_lock_minutes)

    if args.claim_only:
        print(json.dumps(events, indent=2, sort_keys=True))
        if not args.dry_run:
            release_claims(remote, worker_id)
        return 0

    if args.dry_run:
        return _process_events(args, remote, worker_id, events)

    try:
        return _process_events(args, remote, worker_id, events)
    except BaseException:
        # The loop aborted (e.g. mark_failed could not reach MySQL, or Ctrl-C):
        # free the rows still locked by this worker now instead of waiting for
        # the stale-lock timeout, then let the original error propagate.
        _release_claims_quietly(remote, worker_id)
        raise


def _process_events(args, remote, worker_id, events):
    """Enrich each event and either print (dry run) or append + mark it; returns exit status 0."""
    processed = 0
    for event in events:
        try:
            documents = enrich_event(remote, event)
            if args.dry_run:
                for document in documents:
                    print(json.dumps(document, ensure_ascii=False, sort_keys=True))
            else:
                # Write first, mark second: a crash in between leaves the row
                # locked, so it is retried after the stale-lock timeout rather
                # than lost (at-least-once delivery).
                append_jsonl(args.output, documents)
                mark_processed(remote, event["id"], worker_id)
            processed += 1
        except Exception as exc:
            if args.dry_run:
                raise
            mark_failed(remote, event["id"], worker_id, exc)
            print(f"event #{event.get('id')} failed: {exc}", file=sys.stderr)

    action = "previewed" if args.dry_run else "processed"
    print(f"{action} {processed} outbox event(s)")
    if processed and not args.dry_run:
        print(f"output: {args.output}")
    return 0


def _release_claims_quietly(remote, worker_id):
    """Best-effort release of this worker's remaining locks; failures only produce a warning."""
    try:
        release_claims(remote, worker_id)
    except OutboxWorkerError as exc:
        print(f"warning: could not release claimed rows: {exc}", file=sys.stderr)


def _claimable_sql(max_attempts, stale_lock_minutes):
    """SQL predicate for rows that are unprocessed, under the attempt limit, and unlocked or stale-locked."""
    return (
        f"processed_at IS NULL AND attempts < {sql_int(max_attempts)} "
        f"AND (locked_at IS NULL OR locked_at < UTC_TIMESTAMP() - INTERVAL {sql_int(stale_lock_minutes)} MINUTE)"
    )


def _select_events_sql(where_clause, limit):
    """Build an event SELECT filtered by the trusted SQL `where_clause`, oldest id first."""
    return f"""{_EVENT_SELECT_SQL}
    WHERE {where_clause}
    ORDER BY id
    LIMIT {sql_int(limit)};
    """


def pending_events(remote, limit, max_attempts, stale_lock_minutes):
    """Return up to `limit` claimable events without locking them (used by --dry-run)."""
    return remote.mysql_json_lines(_select_events_sql(_claimable_sql(max_attempts, stale_lock_minutes), limit))


def outbox_status(remote, max_attempts, stale_lock_minutes):
    """Return row counters and pending/processed bounds for the outbox table.

    `ready` uses the same predicate as claim_events(), so it is exactly the set a
    worker would claim next. Counter values that SQL reports as NULL become 0.
    """
    attempts_limit = sql_int(max_attempts)
    stale_minutes = sql_int(stale_lock_minutes)
    rows = remote.mysql_json_lines(
        f"""
        SELECT HEX(CAST(JSON_OBJECT(
            'total', COUNT(*),
            'pending', SUM(IF(processed_at IS NULL AND attempts < {attempts_limit}, 1, 0)),
            'ready', SUM(IF({_claimable_sql(max_attempts, stale_lock_minutes)}, 1, 0)),
            'locked', SUM(IF(processed_at IS NULL AND locked_at IS NOT NULL AND locked_at >= UTC_TIMESTAMP() - INTERVAL {stale_minutes} MINUTE, 1, 0)),
            'failed', SUM(IF(processed_at IS NULL AND attempts >= {attempts_limit}, 1, 0)),
            'processed', SUM(IF(processed_at IS NOT NULL, 1, 0)),
            'oldest_pending_id', MIN(IF(processed_at IS NULL, id, NULL)),
            'newest_pending_id', MAX(IF(processed_at IS NULL, id, NULL)),
            'oldest_processed_at', DATE_FORMAT(MIN(processed_at), '%Y-%m-%dT%H:%i:%sZ'),
            'newest_processed_at', DATE_FORMAT(MAX(processed_at), '%Y-%m-%dT%H:%i:%sZ')
        ) AS CHAR)) AS document
        FROM event_outbox_events;
        """
    )
    status = rows[0] if rows else {}
    return {
        key: (0 if value is None and key in _STATUS_COUNTER_KEYS else value)
        for key, value in status.items()
    }


def claim_events(
    remote,
    worker_id,
    limit,
    max_attempts,
    stale_lock_minutes,
):
    """Lock up to `limit` claimable rows for `worker_id` and return the rows it holds.

    The claim is a single UPDATE ... ORDER BY id LIMIT statement; the follow-up
    SELECT returns every unprocessed row currently locked by `worker_id`.
    """
    remote.mysql(
        f"""
        UPDATE event_outbox_events
        SET locked_at = UTC_TIMESTAMP(), locked_by = {sql_string(worker_id)}
        WHERE {_claimable_sql(max_attempts, stale_lock_minutes)}
        ORDER BY id
        LIMIT {sql_int(limit)};
        """
    )
    return remote.mysql_json_lines(
        _select_events_sql(f"processed_at IS NULL AND locked_by = {sql_string(worker_id)}", limit)
    )


def purge_processed(remote, days, apply):
    """Count (or, with `apply`, delete) processed rows older than `days` days.

    Returns:
        The number of rows matched (preview) or actually deleted (apply).

    Raises:
        OutboxWorkerError: `days` is negative.
    """
    if days < 0:
        raise OutboxWorkerError("--purge-processed-days must be zero or greater.")
    cutoff = f"processed_at IS NOT NULL AND processed_at < UTC_TIMESTAMP() - INTERVAL {sql_int(days)} DAY"
    if apply:
        # DELETE and ROW_COUNT() run in the same mysql session, so the reported
        # number is exactly what was removed (a separate COUNT could race).
        sql = f"""
        DELETE FROM event_outbox_events
        WHERE {cutoff};
        SELECT HEX(CAST(JSON_OBJECT('count', ROW_COUNT()) AS CHAR)) AS document;
        """
    else:
        sql = f"""
        SELECT HEX(CAST(JSON_OBJECT(
            'count', COUNT(*)
        ) AS CHAR)) AS document
        FROM event_outbox_events
        WHERE {cutoff};
        """
    rows = remote.mysql_json_lines(sql)
    return int((rows[0] if rows else {}).get("count") or 0)


def enrich_event(remote, event):
    """Return the event document plus related records, each tagged with event context.

    The related lookup is chosen by the `event_type` prefix; ids come from the
    parsed payload, falling back to the event's own columns. Unknown prefixes
    yield only the event document.
    """
    payload = parse_payload(event.get("payload"))
    documents = [event_document(event, payload)]
    event_type = str(event.get("event_type") or "")
    if event_type.startswith("helpdesk_ticket."):
        documents.extend(fetch_ticket_documents(remote, [payload.get("helpdesk_ticket_id") or event.get("source_id")]))
    elif event_type.startswith("journal_message."):
        documents.extend(fetch_message_documents(remote, [payload.get("journal_message_id") or event.get("source_id")]))
    elif event_type.startswith("issue.") and event.get("issue_id"):
        documents.extend(fetch_tickets_by_issue(remote, [event.get("issue_id")]))
    elif event_type.startswith("journal.") and event.get("journal_id"):
        documents.extend(fetch_messages_by_journal(remote, [event.get("journal_id")]))
    elif event_type.startswith("contact."):
        documents.extend(fetch_contact_documents(remote, [payload.get("contact_id") or event.get("source_id")]))
    return [with_event_context(document, event) for document in documents]


def event_document(event, payload):
    """Build the `doc_type: event` document from an outbox row and its parsed payload."""
    return {
        "doc_type": "event",
        "doc_id": f"event:{event.get('id')}",
        "event_id": event.get("id"),
        "event_type": event.get("event_type"),
        "source_type": event.get("source_type"),
        "source_id": event.get("source_id"),
        "project_id": event.get("project_id"),
        "issue_id": event.get("issue_id"),
        "journal_id": event.get("journal_id"),
        "user_id": event.get("user_id"),
        "occurred_at": event.get("occurred_at"),
        "payload": payload,
    }


def fetch_ticket_documents(remote, ids):
    """Return ticket documents for helpdesk ticket ids; invalid ids are ignored."""
    id_list = sql_id_list(ids)
    if not id_list:
        return []
    return remote.mysql_json_lines(ticket_sql(f"ht.id IN ({id_list})"))


def fetch_tickets_by_issue(remote, issue_ids):
    """Return ticket documents for the helpdesk tickets attached to the given issue ids."""
    id_list = sql_id_list(issue_ids)
    if not id_list:
        return []
    return remote.mysql_json_lines(ticket_sql(f"ht.issue_id IN ({id_list})"))


def fetch_message_documents(remote, ids):
    """Return message documents for journal message ids; invalid ids are ignored."""
    id_list = sql_id_list(ids)
    if not id_list:
        return []
    return remote.mysql_json_lines(message_sql(f"jm.id IN ({id_list})"))


def fetch_messages_by_journal(remote, journal_ids):
    """Return message documents for the journal messages attached to the given journal ids."""
    id_list = sql_id_list(journal_ids)
    if not id_list:
        return []
    return remote.mysql_json_lines(message_sql(f"jm.journal_id IN ({id_list})"))


def fetch_contact_documents(remote, ids):
    """Return contact documents for contact ids; invalid ids are ignored."""
    id_list = sql_id_list(ids)
    if not id_list:
        return []
    return remote.mysql_json_lines(
        f"""
        SELECT HEX(CAST(JSON_OBJECT(
            'doc_type', 'contact',
            'doc_id', CONCAT('contact:', c.id),
            'contact_id', c.id,
            'contact_name', TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
            'contact_company', c.company,
            'contact_email', c.email,
            'is_company', c.is_company,
            'created_on', DATE_FORMAT(c.created_on, '%Y-%m-%dT%H:%i:%sZ'),
            'updated_on', DATE_FORMAT(c.updated_on, '%Y-%m-%dT%H:%i:%sZ'),
            'text', CONCAT_WS('\\n',
                TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
                c.company,
                c.email
            )
        ) AS CHAR)) AS document
        FROM contacts c
        WHERE c.id IN ({id_list})
        ORDER BY c.id;
        """
    )


def ticket_sql(where_clause):
    """Build the helpdesk ticket document query.

    `where_clause` is interpolated verbatim, so callers must pass trusted SQL
    (here it is always built from sql_id_list()).
    """
    return f"""
    SELECT HEX(CAST(JSON_OBJECT(
        'doc_type', 'ticket',
        'doc_id', CONCAT('ticket:', ht.id),
        'helpdesk_ticket_id', ht.id,
        'issue_id', i.id,
        'project_id', i.project_id,
        'project_identifier', p.identifier,
        'contact_id', ht.contact_id,
        'contact_name', TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
        'contact_company', c.company,
        'contact_email', c.email,
        'from_address', ht.from_address,
        'to_address', ht.to_address,
        'cc_address', ht.cc_address,
        'message_id', ht.message_id,
        'source', ht.source,
        'is_incoming', ht.is_incoming,
        'issue_subject', i.subject,
        'status', s.name,
        'tracker', t.name,
        'assigned_to', TRIM(CONCAT_WS(' ', au.firstname, au.lastname)),
        'ticket_date', DATE_FORMAT(ht.ticket_date, '%Y-%m-%dT%H:%i:%sZ'),
        'issue_updated_on', DATE_FORMAT(i.updated_on, '%Y-%m-%dT%H:%i:%sZ'),
        'text', CONCAT_WS('\\n',
            i.subject,
            LEFT(i.description, 8000),
            TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
            c.company,
            c.email,
            ht.from_address,
            ht.to_address,
            ht.cc_address
        )
    ) AS CHAR)) AS document
    FROM helpdesk_tickets ht
    JOIN issues i ON i.id = ht.issue_id
    LEFT JOIN contacts c ON c.id = ht.contact_id
    LEFT JOIN projects p ON p.id = i.project_id
    LEFT JOIN issue_statuses s ON s.id = i.status_id
    LEFT JOIN trackers t ON t.id = i.tracker_id
    LEFT JOIN users au ON au.id = i.assigned_to_id
    WHERE {where_clause}
    ORDER BY ht.id;
    """


def message_sql(where_clause):
    """Build the journal message document query.

    Only journals on issues are joined. BCC addresses are not exported; only a
    `has_bcc_address` flag is. `where_clause` is interpolated verbatim and must
    be trusted SQL.
    """
    return f"""
    SELECT HEX(CAST(JSON_OBJECT(
        'doc_type', 'message',
        'doc_id', CONCAT('message:', jm.id),
        'journal_message_id', jm.id,
        'journal_id', j.id,
        'issue_id', i.id,
        'project_id', i.project_id,
        'project_identifier', p.identifier,
        'contact_id', jm.contact_id,
        'contact_name', TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
        'contact_company', c.company,
        'contact_email', c.email,
        'from_address', jm.from_address,
        'to_address', jm.to_address,
        'cc_address', jm.cc_address,
        'has_bcc_address', IF(jm.bcc_address IS NULL OR jm.bcc_address = '', false, true),
        'message_id', jm.message_id,
        'source', jm.source,
        'is_incoming', jm.is_incoming,
        'issue_subject', i.subject,
        'status', s.name,
        'tracker', t.name,
        'journal_user', TRIM(CONCAT_WS(' ', ju.firstname, ju.lastname)),
        'message_date', DATE_FORMAT(jm.message_date, '%Y-%m-%dT%H:%i:%sZ'),
        'journal_created_on', DATE_FORMAT(j.created_on, '%Y-%m-%dT%H:%i:%sZ'),
        'text', CONCAT_WS('\\n',
            i.subject,
            LEFT(j.notes, 8000),
            TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
            c.company,
            c.email,
            jm.from_address,
            jm.to_address,
            jm.cc_address
        )
    ) AS CHAR)) AS document
    FROM journal_messages jm
    JOIN journals j ON j.id = jm.journal_id
    JOIN issues i ON i.id = j.journalized_id AND j.journalized_type = 'Issue'
    LEFT JOIN contacts c ON c.id = jm.contact_id
    LEFT JOIN projects p ON p.id = i.project_id
    LEFT JOIN issue_statuses s ON s.id = i.status_id
    LEFT JOIN trackers t ON t.id = i.tracker_id
    LEFT JOIN users ju ON ju.id = j.user_id
    WHERE {where_clause}
    ORDER BY jm.id;
    """


def with_event_context(document, event):
    """Stamp `document` in place with event id/type/time and `derived_at` (Unix seconds); return it."""
    document["event_id"] = event.get("id")
    document["event_type"] = event.get("event_type")
    document["event_occurred_at"] = event.get("occurred_at")
    document["derived_at"] = int(time.time())
    return document


def append_jsonl(path, documents):
    """Append `documents` to `path` as JSON lines and make the write durable.

    All documents are serialized before the file is opened, so a serialization
    error cannot leave a partial batch behind. The data is flushed and fsynced
    before returning because callers mark the outbox row processed right after.
    """
    payload = "".join(json.dumps(document, ensure_ascii=False, sort_keys=True) + "\n" for document in documents)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as handle:
        handle.write(payload)
        handle.flush()
        os.fsync(handle.fileno())


def mark_processed(remote, event_id, worker_id):
    """Set `processed_at` and clear lock and error fields, only if `worker_id` still holds the row."""
    remote.mysql(
        f"""
        UPDATE event_outbox_events
        SET processed_at = UTC_TIMESTAMP(), locked_at = NULL, locked_by = NULL, last_error = NULL
        WHERE id = {sql_int(event_id)} AND locked_by = {sql_string(worker_id)};
        """
    )


def mark_failed(remote, event_id, worker_id, exc):
    """Increment `attempts`, record `exc` (truncated to 4000 chars) and release the lock held by `worker_id`."""
    message = f"{exc.__class__.__name__}: {exc}"
    remote.mysql(
        f"""
        UPDATE event_outbox_events
        SET attempts = attempts + 1,
            last_error = {sql_string(message[:4000])},
            locked_at = NULL,
            locked_by = NULL
        WHERE id = {sql_int(event_id)} AND locked_by = {sql_string(worker_id)};
        """
    )


def release_claims(remote, worker_id):
    """Clear the lock on every unprocessed row held by `worker_id` without counting an attempt."""
    remote.mysql(
        f"""
        UPDATE event_outbox_events
        SET locked_at = NULL, locked_by = NULL
        WHERE processed_at IS NULL AND locked_by = {sql_string(worker_id)};
        """
    )


def parse_payload(value: Any) -> dict:
    """Return `value` as a dict: dicts pass through, JSON-object strings are parsed, anything else is {}."""
    if isinstance(value, dict):
        return value
    if not value:
        return {}
    try:
        parsed = json.loads(str(value))
    except json.JSONDecodeError:
        return {}
    return parsed if isinstance(parsed, dict) else {}


def sql_id_list(values: Iterable[Any]) -> str:
    """Return a comma-separated, deduplicated, numerically sorted list of the positive integer ids in `values`."""
    ids = []
    for value in values:
        try:
            int_value = int(value)
        except (TypeError, ValueError):
            continue
        if int_value > 0:
            ids.append(str(int_value))
    return ",".join(sorted(set(ids), key=int))


def sql_int(value: Any) -> int:
    """Coerce `value` to a non-negative int for SQL interpolation (invalid input becomes 0)."""
    try:
        return max(0, int(value))
    except (TypeError, ValueError):
        return 0


def sql_string(value: Any) -> str:
    """Quote `value` as a MySQL string literal.

    Backslashes are doubled and single quotes are written as two single quotes.
    That quote form is valid with or without the NO_BACKSLASH_ESCAPES SQL mode,
    whereas a backslash-escaped quote would end the literal early under it.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "''")
    return f"'{escaped}'"


def shell_quote(value: str) -> str:
    """Quote `value` for a POSIX shell by wrapping it in single quotes."""
    return "'" + value.replace("'", "'\"'\"'") + "'"


def make_worker_id() -> str:
    """Return a `host:pid:random` identifier used as `locked_by` for this run."""
    return f"{socket.gethostname()}:{os.getpid()}:{uuid.uuid4().hex[:12]}"


if __name__ == "__main__":
    raise SystemExit(main())