#!/usr/bin/env python3
"""External worker for Redmine event outbox rows.

This worker runs outside Redmine. It claims pending `event_outbox_events` rows
from the Redmine database over SSH/MySQL, enriches them with read-only joins,
writes deterministic JSONL records, and marks rows processed only after the
write succeeds.
"""

from __future__ import annotations

import argparse
import json
import os
import socket
import subprocess
import sys
import time
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable

DEFAULT_SSH_HOST = "reddev@192.168.50.170"
DEFAULT_SSH_KEY = Path("/tmp/reddev")
DEFAULT_REMOTE_REDMINE = "/usr/share/redmine"
DEFAULT_OUTPUT = Path(".cache/redmine_outbox/derived_documents.jsonl")


class OutboxWorkerError(RuntimeError):
    """Raised when a remote command fails or returns output that cannot be parsed."""


@dataclass(frozen=True)
class RemoteRedmine:
    """Settings for running MySQL statements on the Redmine host through ssh."""

    ssh_host: str
    ssh_key: Path
    remote_redmine: str

    def mysql_json_lines(self, sql: str) -> list[dict[str, Any]]:
        """Run *sql* and decode each non-blank output line as hex-encoded JSON.

        Every query in this file wraps its result in ``HEX(CAST(JSON_OBJECT(...)
        AS CHAR))`` so one output line is exactly one JSON document.

        Raises:
            OutboxWorkerError: if a line is not valid hex, or the decoded bytes
                are not valid UTF-8 JSON.
        """
        rows: list[dict[str, Any]] = []
        for line in self.mysql(sql).splitlines():
            text = line.strip()
            if not text:
                continue
            try:
                raw = bytes.fromhex(text)
            except ValueError as exc:
                raise OutboxWorkerError(f"Remote query returned non-hex row: {line[:200]}") from exc
            try:
                # JSONDecodeError and UnicodeDecodeError are both ValueError
                # subclasses; either one means the payload is not usable JSON.
                rows.append(json.loads(raw.decode("utf-8")))
            except ValueError as exc:
                raise OutboxWorkerError(f"Remote query returned non-JSON row: {line[:200]}") from exc
        return rows

    def mysql(self, sql: str) -> str:
        """Pipe *sql* into the remote ``mysql`` client and return its stdout.

        Raises:
            OutboxWorkerError: if ssh cannot be started or the remote command
                exits with a non-zero status.
        """
        command = [
            "ssh",
            "-i",
            str(self.ssh_key),
            "-o",
            "IdentitiesOnly=yes",
            self.ssh_host,
            self._mysql_runner_command(),
        ]
        try:
            result = subprocess.run(
                command,
                input=sql,
                text=True,
                # The remote client runs with --default-character-set=utf8, so
                # send UTF-8 regardless of the local locale. errors="replace"
                # keeps odd bytes in stderr from raising during decoding.
                encoding="utf-8",
                errors="replace",
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=False,
            )
        except OSError as exc:
            raise OutboxWorkerError(f"Could not run ssh: {exc}") from exc
        if result.returncode != 0:
            raise OutboxWorkerError(result.stderr.strip() or "Remote MySQL command failed.")
        return result.stdout

    def _mysql_runner_command(self) -> str:
        """Build the remote shell command that execs ``mysql``.

        The Ruby one-liner reads the ``production`` entry of
        ``config/database.yml`` and passes the password through ``MYSQL_PWD``
        rather than on the command line.
        """
        ruby = (
            "require 'yaml'; "
            "c = YAML.load_file('config/database.yml')['production']; "
            "ENV['MYSQL_PWD'] = c['password'].to_s; "
            "args = ['--batch', '--raw', '--quick', '--skip-column-names', "
            "'--default-character-set=utf8', '-h', c['host'].to_s, "
            "'-P', (c['port'] || 3306).to_s, '-u', c['username'].to_s, c['database'].to_s]; "
            "exec('mysql', *args)"
        )
        return f"cd {shell_quote(self.remote_redmine)} && ruby -e {shell_quote(ruby)}"


def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser; defaults fall back to environment variables."""
    parser = argparse.ArgumentParser(description="Process Redmine event outbox rows into enriched JSONL documents.")
    parser.add_argument("--ssh-host", default=os.getenv("REDMINE_SSH_HOST", DEFAULT_SSH_HOST))
    parser.add_argument("--ssh-key", type=Path, default=Path(os.getenv("REDMINE_SSH_KEY", str(DEFAULT_SSH_KEY))))
    parser.add_argument("--remote-redmine", default=os.getenv("REDMINE_REMOTE_PATH", DEFAULT_REMOTE_REDMINE))
    parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT)
    parser.add_argument("--batch-size", type=int, default=20)
    parser.add_argument("--max-attempts", type=int, default=5)
    parser.add_argument("--stale-lock-minutes", type=int, default=30)
    parser.add_argument("--dry-run", action="store_true", help="Fetch and enrich pending rows without locking or marking them.")
    parser.add_argument("--claim-only", action="store_true", help="Claim rows, print them, then release the claim without marking processed.")
    return parser


def _release_claims_quietly(remote: RemoteRedmine, worker_id: str) -> None:
    """Best-effort unlock of rows still held by *worker_id*; failures only warn."""
    try:
        release_claims(remote, worker_id)
    except OutboxWorkerError as exc:
        print(f"warning: could not release claims for {worker_id}: {exc}", file=sys.stderr)


def main() -> int:
    """Run one batch and return the process exit status.

    Returns 0 when the batch ran (even if individual events were marked
    failed) and 1 when a remote command failed fatally.
    """
    args = _build_parser().parse_args()
    remote = RemoteRedmine(args.ssh_host, args.ssh_key, args.remote_redmine)
    worker_id = make_worker_id()

    try:
        if args.dry_run:
            events = pending_events(remote, args.batch_size, args.max_attempts, args.stale_lock_minutes)
        else:
            events = claim_events(remote, worker_id, args.batch_size, args.max_attempts, args.stale_lock_minutes)

        if args.claim_only:
            print(json.dumps(events, indent=2, sort_keys=True))
            if not args.dry_run:
                release_claims(remote, worker_id)
            return 0

        processed = 0
        for event in events:
            try:
                documents = enrich_event(remote, event)
                if args.dry_run:
                    for document in documents:
                        print(json.dumps(document, ensure_ascii=False, sort_keys=True))
                else:
                    # Write first, mark second: a row is only flagged as
                    # processed once its documents are in the output file.
                    append_jsonl(args.output, documents)
                    mark_processed(remote, event["id"], worker_id)
                processed += 1
            except Exception as exc:
                # Per-event boundary: one bad event must not stop the batch.
                # In dry-run nothing is locked, so just surface the error.
                if args.dry_run:
                    raise
                mark_failed(remote, event["id"], worker_id, exc)
                print(f"event #{event.get('id')} failed: {exc}", file=sys.stderr)

        action = "previewed" if args.dry_run else "processed"
        print(f"{action} {processed} outbox event(s)")
        return 0
    except OutboxWorkerError as exc:
        print(f"error: {exc}", file=sys.stderr)
        if not args.dry_run:
            # Rows claimed by this run would otherwise stay locked until the
            # stale-lock timeout. The worker id is unique per run, so this can
            # only touch our own unprocessed rows.
            _release_claims_quietly(remote, worker_id)
        return 1


def _claimable_where(max_attempts: int, stale_lock_minutes: int) -> str:
    """Return the predicate for rows that are unprocessed, retryable and not freshly locked."""
    return (
        "processed_at IS NULL "
        f"AND attempts < {sql_int(max_attempts)} "
        "AND (locked_at IS NULL OR locked_at < UTC_TIMESTAMP() - INTERVAL "
        f"{sql_int(stale_lock_minutes)} MINUTE)"
    )


def _event_select_sql(where_clause: str, limit: int) -> str:
    """Return the SELECT that emits outbox rows as hex-encoded JSON, ordered by id."""
    return f"""
        SELECT HEX(CAST(JSON_OBJECT(
            'id', id,
            'event_type', event_type,
            'source_type', source_type,
            'source_id', source_id,
            'project_id', project_id,
            'issue_id', issue_id,
            'journal_id', journal_id,
            'user_id', user_id,
            'occurred_at', DATE_FORMAT(occurred_at, '%Y-%m-%dT%H:%i:%sZ'),
            'attempts', attempts,
            'payload', payload
        ) AS CHAR)) AS document
        FROM event_outbox_events
        WHERE {where_clause}
        ORDER BY id
        LIMIT {sql_int(limit)};
    """


def pending_events(remote: RemoteRedmine, limit: int, max_attempts: int, stale_lock_minutes: int) -> list[dict[str, Any]]:
    """Fetch up to *limit* claimable rows without locking them (used by --dry-run)."""
    return remote.mysql_json_lines(
        _event_select_sql(_claimable_where(max_attempts, stale_lock_minutes), limit)
    )


def claim_events(
    remote: RemoteRedmine,
    worker_id: str,
    limit: int,
    max_attempts: int,
    stale_lock_minutes: int,
) -> list[dict[str, Any]]:
    """Lock up to *limit* claimable rows for *worker_id* and return them.

    The claim is a single UPDATE that stamps ``locked_at``/``locked_by``; the
    rows are then read back by ``locked_by``.
    """
    remote.mysql(
        f"""
        UPDATE event_outbox_events
        SET locked_at = UTC_TIMESTAMP(), locked_by = {sql_string(worker_id)}
        WHERE {_claimable_where(max_attempts, stale_lock_minutes)}
        ORDER BY id
        LIMIT {sql_int(limit)};
        """
    )
    return remote.mysql_json_lines(
        _event_select_sql(f"processed_at IS NULL AND locked_by = {sql_string(worker_id)}", limit)
    )


def enrich_event(remote: RemoteRedmine, event: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the event document plus any related documents selected by event type.

    The related id is taken from the payload when present, otherwise from the
    event row itself. Every returned document is stamped with event context.
    """
    payload = parse_payload(event.get("payload"))
    documents: list[dict[str, Any]] = [event_document(event, payload)]
    event_type = str(event.get("event_type") or "")

    if event_type.startswith("helpdesk_ticket."):
        documents.extend(fetch_ticket_documents(remote, [payload.get("helpdesk_ticket_id") or event.get("source_id")]))
    elif event_type.startswith("journal_message."):
        documents.extend(fetch_message_documents(remote, [payload.get("journal_message_id") or event.get("source_id")]))
    elif event_type.startswith("issue.") and event.get("issue_id"):
        documents.extend(fetch_tickets_by_issue(remote, [event.get("issue_id")]))
    elif event_type.startswith("journal.") and event.get("journal_id"):
        documents.extend(fetch_messages_by_journal(remote, [event.get("journal_id")]))
    elif event_type.startswith("contact."):
        documents.extend(fetch_contact_documents(remote, [payload.get("contact_id") or event.get("source_id")]))

    return [with_event_context(document, event) for document in documents]


def event_document(event: dict[str, Any], payload: dict[str, Any]) -> dict[str, Any]:
    """Build the ``doc_type: event`` document from an outbox row and its parsed payload."""
    return {
        "doc_type": "event",
        "doc_id": f"event:{event.get('id')}",
        "event_id": event.get("id"),
        "event_type": event.get("event_type"),
        "source_type": event.get("source_type"),
        "source_id": event.get("source_id"),
        "project_id": event.get("project_id"),
        "issue_id": event.get("issue_id"),
        "journal_id": event.get("journal_id"),
        "user_id": event.get("user_id"),
        "occurred_at": event.get("occurred_at"),
        "payload": payload,
    }


def fetch_ticket_documents(remote: RemoteRedmine, ids: Iterable[Any]) -> list[dict[str, Any]]:
    """Fetch ticket documents by helpdesk ticket id; no usable ids means no query."""
    id_list = sql_id_list(ids)
    if not id_list:
        return []
    return remote.mysql_json_lines(ticket_sql(f"ht.id IN ({id_list})"))


def fetch_tickets_by_issue(remote: RemoteRedmine, issue_ids: Iterable[Any]) -> list[dict[str, Any]]:
    """Fetch ticket documents by issue id; no usable ids means no query."""
    id_list = sql_id_list(issue_ids)
    if not id_list:
        return []
    return remote.mysql_json_lines(ticket_sql(f"ht.issue_id IN ({id_list})"))


def fetch_message_documents(remote: RemoteRedmine, ids: Iterable[Any]) -> list[dict[str, Any]]:
    """Fetch message documents by journal message id; no usable ids means no query."""
    id_list = sql_id_list(ids)
    if not id_list:
        return []
    return remote.mysql_json_lines(message_sql(f"jm.id IN ({id_list})"))


def fetch_messages_by_journal(remote: RemoteRedmine, journal_ids: Iterable[Any]) -> list[dict[str, Any]]:
    """Fetch message documents by journal id; no usable ids means no query."""
    id_list = sql_id_list(journal_ids)
    if not id_list:
        return []
    return remote.mysql_json_lines(message_sql(f"jm.journal_id IN ({id_list})"))


def fetch_contact_documents(remote: RemoteRedmine, ids: Iterable[Any]) -> list[dict[str, Any]]:
    """Fetch contact documents by contact id; no usable ids means no query."""
    id_list = sql_id_list(ids)
    if not id_list:
        return []
    return remote.mysql_json_lines(
        f"""
        SELECT HEX(CAST(JSON_OBJECT(
            'doc_type', 'contact',
            'doc_id', CONCAT('contact:', c.id),
            'contact_id', c.id,
            'contact_name', TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
            'contact_company', c.company,
            'contact_email', c.email,
            'is_company', c.is_company,
            'created_on', DATE_FORMAT(c.created_on, '%Y-%m-%dT%H:%i:%sZ'),
            'updated_on', DATE_FORMAT(c.updated_on, '%Y-%m-%dT%H:%i:%sZ'),
            'text', CONCAT_WS('\\n',
                TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
                c.company,
                c.email
            )
        ) AS CHAR)) AS document
        FROM contacts c
        WHERE c.id IN ({id_list})
        ORDER BY c.id;
        """
    )


def ticket_sql(where_clause: str) -> str:
    """Return the ticket-document SELECT filtered by *where_clause*.

    *where_clause* is interpolated verbatim, so callers must build it only
    from values already sanitized (see ``sql_id_list``).
    """
    return f"""
        SELECT HEX(CAST(JSON_OBJECT(
            'doc_type', 'ticket',
            'doc_id', CONCAT('ticket:', ht.id),
            'helpdesk_ticket_id', ht.id,
            'issue_id', i.id,
            'project_id', i.project_id,
            'project_identifier', p.identifier,
            'contact_id', ht.contact_id,
            'contact_name', TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
            'contact_company', c.company,
            'contact_email', c.email,
            'from_address', ht.from_address,
            'to_address', ht.to_address,
            'cc_address', ht.cc_address,
            'message_id', ht.message_id,
            'source', ht.source,
            'is_incoming', ht.is_incoming,
            'issue_subject', i.subject,
            'status', s.name,
            'tracker', t.name,
            'assigned_to', TRIM(CONCAT_WS(' ', au.firstname, au.lastname)),
            'ticket_date', DATE_FORMAT(ht.ticket_date, '%Y-%m-%dT%H:%i:%sZ'),
            'issue_updated_on', DATE_FORMAT(i.updated_on, '%Y-%m-%dT%H:%i:%sZ'),
            'text', CONCAT_WS('\\n',
                i.subject,
                LEFT(i.description, 8000),
                TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
                c.company,
                c.email,
                ht.from_address,
                ht.to_address,
                ht.cc_address
            )
        ) AS CHAR)) AS document
        FROM helpdesk_tickets ht
        JOIN issues i ON i.id = ht.issue_id
        LEFT JOIN contacts c ON c.id = ht.contact_id
        LEFT JOIN projects p ON p.id = i.project_id
        LEFT JOIN issue_statuses s ON s.id = i.status_id
        LEFT JOIN trackers t ON t.id = i.tracker_id
        LEFT JOIN users au ON au.id = i.assigned_to_id
        WHERE {where_clause}
        ORDER BY ht.id;
    """


def message_sql(where_clause: str) -> str:
    """Return the message-document SELECT filtered by *where_clause*.

    Only the presence of a BCC address is exported (``has_bcc_address``), not
    the address itself. *where_clause* is interpolated verbatim, so callers
    must build it only from values already sanitized (see ``sql_id_list``).
    """
    return f"""
        SELECT HEX(CAST(JSON_OBJECT(
            'doc_type', 'message',
            'doc_id', CONCAT('message:', jm.id),
            'journal_message_id', jm.id,
            'journal_id', j.id,
            'issue_id', i.id,
            'project_id', i.project_id,
            'project_identifier', p.identifier,
            'contact_id', jm.contact_id,
            'contact_name', TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
            'contact_company', c.company,
            'contact_email', c.email,
            'from_address', jm.from_address,
            'to_address', jm.to_address,
            'cc_address', jm.cc_address,
            'has_bcc_address', IF(jm.bcc_address IS NULL OR jm.bcc_address = '', false, true),
            'message_id', jm.message_id,
            'source', jm.source,
            'is_incoming', jm.is_incoming,
            'issue_subject', i.subject,
            'status', s.name,
            'tracker', t.name,
            'journal_user', TRIM(CONCAT_WS(' ', ju.firstname, ju.lastname)),
            'message_date', DATE_FORMAT(jm.message_date, '%Y-%m-%dT%H:%i:%sZ'),
            'journal_created_on', DATE_FORMAT(j.created_on, '%Y-%m-%dT%H:%i:%sZ'),
            'text', CONCAT_WS('\\n',
                i.subject,
                LEFT(j.notes, 8000),
                TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
                c.company,
                c.email,
                jm.from_address,
                jm.to_address,
                jm.cc_address
            )
        ) AS CHAR)) AS document
        FROM journal_messages jm
        JOIN journals j ON j.id = jm.journal_id
        JOIN issues i ON i.id = j.journalized_id AND j.journalized_type = 'Issue'
        LEFT JOIN contacts c ON c.id = jm.contact_id
        LEFT JOIN projects p ON p.id = i.project_id
        LEFT JOIN issue_statuses s ON s.id = i.status_id
        LEFT JOIN trackers t ON t.id = i.tracker_id
        LEFT JOIN users ju ON ju.id = j.user_id
        WHERE {where_clause}
        ORDER BY jm.id;
    """


def with_event_context(document: dict[str, Any], event: dict[str, Any]) -> dict[str, Any]:
    """Stamp *document* in place with the originating event's fields and return it.

    ``derived_at`` is the current Unix time in whole seconds, so it differs
    between runs.
    """
    document["event_id"] = event.get("id")
    document["event_type"] = event.get("event_type")
    document["event_occurred_at"] = event.get("occurred_at")
    document["derived_at"] = int(time.time())
    return document


def append_jsonl(path: Path, documents: Iterable[dict[str, Any]]) -> None:
    """Append *documents* to *path* as one JSON object per line.

    Everything is serialized before the file is touched and written with a
    single call, so a serialization or encoding error cannot leave only part
    of an event's documents in the file. Parent directories are created, and
    the file is created even when *documents* is empty.
    """
    lines = [json.dumps(document, ensure_ascii=False, sort_keys=True) + "\n" for document in documents]
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as handle:
        handle.write("".join(lines))


def mark_processed(remote: RemoteRedmine, event_id: Any, worker_id: str) -> None:
    """Mark the row processed and clear its lock, only if *worker_id* still holds it."""
    remote.mysql(
        f"""
        UPDATE event_outbox_events
        SET processed_at = UTC_TIMESTAMP(), locked_at = NULL, locked_by = NULL, last_error = NULL
        WHERE id = {sql_int(event_id)} AND locked_by = {sql_string(worker_id)};
        """
    )


def mark_failed(remote: RemoteRedmine, event_id: Any, worker_id: str, exc: Exception) -> None:
    """Record a failed attempt: bump ``attempts``, store the error, clear the lock.

    The stored message is ``ClassName: text`` truncated to 4000 characters
    before SQL escaping.
    """
    message = f"{exc.__class__.__name__}: {exc}"
    remote.mysql(
        f"""
        UPDATE event_outbox_events
        SET attempts = attempts + 1,
            last_error = {sql_string(message[:4000])},
            locked_at = NULL,
            locked_by = NULL
        WHERE id = {sql_int(event_id)} AND locked_by = {sql_string(worker_id)};
        """
    )


def release_claims(remote: RemoteRedmine, worker_id: str) -> None:
    """Unlock every unprocessed row still held by *worker_id* without touching attempts."""
    remote.mysql(
        f"""
        UPDATE event_outbox_events
        SET locked_at = NULL, locked_by = NULL
        WHERE processed_at IS NULL AND locked_by = {sql_string(worker_id)};
        """
    )


def parse_payload(value: Any) -> dict[str, Any]:
    """Coerce an outbox payload to a dict.

    Dicts pass through unchanged; other values are stringified and parsed as
    JSON. Anything empty, unparsable, or not a JSON object yields ``{}``.
    """
    if isinstance(value, dict):
        return value
    if not value:
        return {}
    try:
        parsed = json.loads(str(value))
    except json.JSONDecodeError:
        return {}
    return parsed if isinstance(parsed, dict) else {}


def sql_id_list(values: Iterable[Any]) -> str:
    """Return the positive integer ids in *values* as a sorted, de-duplicated CSV string.

    Values that cannot be converted to an integer (including infinities, which
    ``json.loads`` can produce from ``Infinity``) and non-positive values are
    dropped. The result is safe to interpolate into an ``IN (...)`` list and
    is empty when nothing usable remains.
    """
    ids: set[int] = set()
    for value in values:
        try:
            int_value = int(value)
        except (TypeError, ValueError, OverflowError):
            continue
        if int_value > 0:
            ids.add(int_value)
    return ",".join(str(int_value) for int_value in sorted(ids))


def sql_int(value: Any) -> int:
    """Coerce *value* to a non-negative int for SQL interpolation; invalid input gives 0."""
    try:
        return max(0, int(value))
    except (TypeError, ValueError, OverflowError):
        return 0


def sql_string(value: str) -> str:
    """Return *value* as a single-quoted MySQL string literal.

    Backslashes are escaped first so the escapes added afterwards are not
    doubled. NUL is written as ``\\0`` because the statement is piped to the
    ``mysql`` client, which rejects raw NUL bytes in its input.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'").replace("\0", "\\0")
    return "'" + escaped + "'"


def shell_quote(value: str) -> str:
    """Single-quote *value* for a POSIX shell, splicing embedded quotes as ``'"'"'``."""
    return "'" + value.replace("'", "'\"'\"'") + "'"


def make_worker_id() -> str:
    """Return a ``hostname:pid:random`` identifier that is unique to this run."""
    return f"{socket.gethostname()}:{os.getpid()}:{uuid.uuid4().hex[:12]}"


if __name__ == "__main__":
    raise SystemExit(main())