Files
redmine/redmine_outbox_worker.py
2026-05-04 09:49:47 -04:00

579 lines
19 KiB
Python
Executable File

#!/usr/bin/env python3
"""External worker for Redmine event outbox rows.
This worker runs outside Redmine. It claims pending `event_outbox_events` rows
from the Redmine database over SSH/MySQL, enriches them with read-only joins,
writes deterministic JSONL records, and marks rows processed only after the
write succeeds.
"""
import argparse
import json
import os
import socket
import subprocess
import sys
import time
import uuid
from pathlib import Path
from typing import Any, Iterable
# Default SSH target for --ssh-host; overridable with the REDMINE_SSH_HOST environment variable.
DEFAULT_SSH_HOST = "reddev@192.168.50.170"
# Default private key handed to `ssh -i`; overridable with REDMINE_SSH_KEY.
DEFAULT_SSH_KEY = Path("/tmp/reddev")
# Default directory the mysql runner changes into before reading config/database.yml;
# overridable with REDMINE_REMOTE_PATH.
DEFAULT_REMOTE_REDMINE = "/usr/share/redmine"
# Default JSONL file that derived documents are appended to (--output).
DEFAULT_OUTPUT = Path("/tmp/redmine-outbox/derived_documents.jsonl")
class OutboxWorkerError(RuntimeError):
    """Worker-level failure; main() reports it on stderr and exits with status 1."""
class RemoteRedmine:
    """Run SQL against the Redmine database through the `mysql` command-line client.

    The client is started either over SSH (default) or in a local shell
    (`local=True`). In both cases the command changes into `remote_redmine`,
    reads the `production` section of `config/database.yml` with a Ruby
    one-liner and execs `mysql` with those credentials.
    """

    def __init__(self, ssh_host, ssh_key, remote_redmine, local=False):
        """Store connection settings; nothing is executed until a query is run."""
        self.ssh_host = ssh_host
        self.ssh_key = ssh_key
        self.remote_redmine = remote_redmine
        self.local = local

    def mysql_json_lines(self, sql):
        """Run `sql` and decode every non-blank output line as hex-encoded UTF-8 JSON.

        Queries are expected to emit one `HEX(...)` value per row so that
        arbitrary text survives the tab/newline based batch output.

        Returns:
            A list with one decoded JSON value per output row.

        Raises:
            OutboxWorkerError: if a row is not hex, not UTF-8, or not JSON, or
                if running the query itself fails.
        """
        rows = []
        for line in self.mysql(sql).splitlines():
            encoded = line.strip()
            if not encoded:
                continue
            try:
                raw = bytes.fromhex(encoded)
            except ValueError as exc:
                raise OutboxWorkerError(f"Remote query returned non-hex row: {line[:200]}") from exc
            try:
                rows.append(json.loads(raw.decode("utf-8")))
            except UnicodeDecodeError as exc:
                # Previously misreported as "non-hex" because UnicodeDecodeError is a ValueError.
                raise OutboxWorkerError(f"Remote query returned non-UTF-8 row: {line[:200]}") from exc
            except json.JSONDecodeError as exc:
                raise OutboxWorkerError(f"Remote query returned non-JSON row: {line[:200]}") from exc
        return rows

    def mysql(self, sql):
        """Feed `sql` to the mysql client on stdin and return its stdout as text.

        Raises:
            OutboxWorkerError: if the process cannot be started or exits non-zero
                (the message is the client's stderr when available).
        """
        runner = self._mysql_runner_command()
        if self.local:
            # The runner is a shell snippet ("cd ... && ruby -e ..."), so it needs a shell.
            command = runner
            use_shell = True
            launcher = "local shell"
        else:
            command = [
                "ssh",
                "-i",
                str(self.ssh_key),
                "-o",
                "IdentitiesOnly=yes",
                self.ssh_host,
                runner,
            ]
            use_shell = False
            launcher = "ssh"
        try:
            result = subprocess.run(
                command,
                input=sql,
                universal_newlines=True,
                # Pin the pipe encoding: the locale default may not be UTF-8 (e.g. under
                # cron), which would break non-ASCII SQL such as stored error messages.
                encoding="utf-8",
                # Undecodable stderr bytes must not crash error reporting.
                errors="replace",
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=False,
                shell=use_shell,
            )
        except OSError as exc:
            raise OutboxWorkerError(f"Could not run {launcher}: {exc}") from exc
        if result.returncode != 0:
            raise OutboxWorkerError(result.stderr.strip() or "Remote MySQL command failed.")
        return result.stdout

    def _mysql_runner_command(self):
        """Build the shell command that starts `mysql` with Redmine's own credentials."""
        ruby = (
            "require 'yaml'; "
            "c = YAML.load_file('config/database.yml')['production']; "
            # The password travels via the environment, not the argument list.
            "ENV['MYSQL_PWD'] = c['password'].to_s; "
            "args = ['--batch', '--raw', '--quick', '--skip-column-names', "
            # utf8mb4, not utf8 (= utf8mb3): otherwise 4-byte characters such as
            # emoji are lost when the server converts to the connection charset.
            "'--default-character-set=utf8mb4', '-h', c['host'].to_s, "
            "'-P', (c['port'] || 3306).to_s, '-u', c['username'].to_s, c['database'].to_s]; "
            "exec('mysql', *args)"
        )
        return f"cd {shell_quote(self.remote_redmine)} && ruby -e {shell_quote(ruby)}"
def _release_leftover_claims(remote, worker_id):
    """Best-effort release of rows still locked by this worker after an aborted run."""
    try:
        release_claims(remote, worker_id)
    except OutboxWorkerError as exc:
        # Never mask the error that aborted the run; stale-lock expiry remains the fallback.
        print(f"warning: could not release claims held by {worker_id}: {exc}", file=sys.stderr)


def main(argv=None):
    """Command-line entry point for the outbox worker.

    Modes, checked in this order: `--status` (print counts), `--purge-processed-days`
    (preview or delete processed rows), `--claim-only` (print the batch and release
    it), otherwise process one batch: enrich each event, append its documents to
    `--output` and mark the row processed, or mark it failed on error. With
    `--dry-run` rows are only read and the documents are printed.

    Args:
        argv: Argument list to parse; `None` (the default) uses `sys.argv[1:]`.

    Returns:
        0 on success (including runs where individual events failed and were
        recorded), 1 when an `OutboxWorkerError` aborted the run.
    """
    parser = argparse.ArgumentParser(description="Process Redmine event outbox rows into enriched JSONL documents.")
    parser.add_argument("--ssh-host", default=os.getenv("REDMINE_SSH_HOST", DEFAULT_SSH_HOST))
    parser.add_argument("--ssh-key", type=Path, default=Path(os.getenv("REDMINE_SSH_KEY", str(DEFAULT_SSH_KEY))))
    parser.add_argument("--local", action="store_true", help="Read the Redmine database locally instead of over SSH.")
    parser.add_argument("--remote-redmine", default=os.getenv("REDMINE_REMOTE_PATH", DEFAULT_REMOTE_REDMINE))
    parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT)
    parser.add_argument("--batch-size", type=int, default=20)
    parser.add_argument("--max-attempts", type=int, default=5)
    parser.add_argument("--stale-lock-minutes", type=int, default=30)
    parser.add_argument("--dry-run", action="store_true", help="Fetch and enrich pending rows without locking or marking them.")
    parser.add_argument("--claim-only", action="store_true", help="Claim rows, print them, then release the claim without marking processed.")
    parser.add_argument("--status", action="store_true", help="Report outbox row counts and exit.")
    parser.add_argument(
        "--purge-processed-days",
        type=int,
        help="Preview purging processed rows older than this many days, or delete with --apply-purge.",
    )
    parser.add_argument("--apply-purge", action="store_true", help="Actually delete rows selected by --purge-processed-days.")
    args = parser.parse_args(argv)

    remote = RemoteRedmine(args.ssh_host, args.ssh_key, args.remote_redmine, local=args.local)
    worker_id = make_worker_id()
    try:
        if args.apply_purge and args.purge_processed_days is None:
            raise OutboxWorkerError("--apply-purge requires --purge-processed-days.")

        if args.status:
            print(json.dumps(outbox_status(remote, args.max_attempts, args.stale_lock_minutes), indent=2, sort_keys=True))
            return 0

        if args.purge_processed_days is not None:
            purge_count = purge_processed(remote, args.purge_processed_days, apply=args.apply_purge)
            action = "purged" if args.apply_purge else "would purge"
            print(f"{action} {purge_count} processed outbox row(s)")
            return 0

        if args.dry_run:
            events = pending_events(remote, args.batch_size, args.max_attempts, args.stale_lock_minutes)
        else:
            events = claim_events(remote, worker_id, args.batch_size, args.max_attempts, args.stale_lock_minutes)

        if args.claim_only:
            try:
                print(json.dumps(events, indent=2, sort_keys=True))
            finally:
                # Release even if printing fails, so the rows are not stuck until the stale timeout.
                if not args.dry_run:
                    release_claims(remote, worker_id)
            return 0

        processed = 0
        finished = False
        try:
            for event in events:
                try:
                    documents = enrich_event(remote, event)
                    if args.dry_run:
                        for document in documents:
                            print(json.dumps(document, ensure_ascii=False, sort_keys=True))
                    else:
                        # Mark processed only after the documents are on disk.
                        append_jsonl(args.output, documents)
                        mark_processed(remote, event["id"], worker_id)
                    processed += 1
                except Exception as exc:
                    if args.dry_run:
                        raise
                    mark_failed(remote, event["id"], worker_id, exc)
                    print(f"event #{event.get('id')} failed: {exc}", file=sys.stderr)
            finished = True
        finally:
            # The worker id is unique to this process, so rows left locked by an
            # aborted loop would otherwise wait for the stale-lock timeout.
            if not finished and not args.dry_run:
                _release_leftover_claims(remote, worker_id)

        action = "previewed" if args.dry_run else "processed"
        print(f"{action} {processed} outbox event(s)")
        if processed and not args.dry_run:
            print(f"output: {args.output}")
        return 0
    except OutboxWorkerError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1
def pending_events(remote, limit, max_attempts, stale_lock_minutes):
    """Read (without locking) up to `limit` rows that are ready to be processed.

    A row is ready when it is unprocessed, below `max_attempts`, and either
    unlocked or locked longer ago than `stale_lock_minutes`.
    """
    attempts_cap = sql_int(max_attempts)
    stale_minutes = sql_int(stale_lock_minutes)
    row_limit = sql_int(limit)
    query = f"""
        SELECT HEX(CAST(JSON_OBJECT(
        'id', id,
        'event_type', event_type,
        'source_type', source_type,
        'source_id', source_id,
        'project_id', project_id,
        'issue_id', issue_id,
        'journal_id', journal_id,
        'user_id', user_id,
        'occurred_at', DATE_FORMAT(occurred_at, '%Y-%m-%dT%H:%i:%sZ'),
        'attempts', attempts,
        'payload', payload
        ) AS CHAR)) AS document
        FROM event_outbox_events
        WHERE processed_at IS NULL
        AND attempts < {attempts_cap}
        AND (locked_at IS NULL OR locked_at < UTC_TIMESTAMP() - INTERVAL {stale_minutes} MINUTE)
        ORDER BY id
        LIMIT {row_limit};
        """
    return remote.mysql_json_lines(query)
def outbox_status(remote, max_attempts, stale_lock_minutes):
    """Return aggregate outbox counters as a dict.

    Counter fields that come back as NULL (SUM over no rows) are reported as 0;
    the id and timestamp fields keep `None`. An empty result yields `{}`.
    """
    attempts_cap = sql_int(max_attempts)
    stale_minutes = sql_int(stale_lock_minutes)
    rows = remote.mysql_json_lines(
        f"""
        SELECT HEX(CAST(JSON_OBJECT(
        'total', COUNT(*),
        'pending', SUM(IF(processed_at IS NULL AND attempts < {attempts_cap}, 1, 0)),
        'ready', SUM(IF(processed_at IS NULL
        AND attempts < {attempts_cap}
        AND (locked_at IS NULL OR locked_at < UTC_TIMESTAMP() - INTERVAL {stale_minutes} MINUTE), 1, 0)),
        'locked', SUM(IF(processed_at IS NULL
        AND locked_at IS NOT NULL
        AND locked_at >= UTC_TIMESTAMP() - INTERVAL {stale_minutes} MINUTE, 1, 0)),
        'failed', SUM(IF(processed_at IS NULL AND attempts >= {attempts_cap}, 1, 0)),
        'processed', SUM(IF(processed_at IS NOT NULL, 1, 0)),
        'oldest_pending_id', MIN(IF(processed_at IS NULL, id, NULL)),
        'newest_pending_id', MAX(IF(processed_at IS NULL, id, NULL)),
        'oldest_processed_at', DATE_FORMAT(MIN(processed_at), '%Y-%m-%dT%H:%i:%sZ'),
        'newest_processed_at', DATE_FORMAT(MAX(processed_at), '%Y-%m-%dT%H:%i:%sZ')
        ) AS CHAR)) AS document
        FROM event_outbox_events;
        """
    )
    if not rows:
        return {}
    counter_fields = ("total", "pending", "ready", "locked", "failed", "processed")
    report = {}
    for name, value in rows[0].items():
        if value is None and name in counter_fields:
            value = 0
        report[name] = value
    return report
def claim_events(
    remote,
    worker_id,
    limit,
    max_attempts,
    stale_lock_minutes,
):
    """Lock up to `limit` ready rows for `worker_id`, then return the rows it holds.

    Two statements are issued: an UPDATE that stamps `locked_at`/`locked_by` on
    ready rows, followed by a SELECT of the unprocessed rows locked by this worker.
    """
    owner = sql_string(worker_id)
    row_limit = sql_int(limit)
    lock_statement = f"""
        UPDATE event_outbox_events
        SET locked_at = UTC_TIMESTAMP(), locked_by = {owner}
        WHERE processed_at IS NULL
        AND attempts < {sql_int(max_attempts)}
        AND (locked_at IS NULL OR locked_at < UTC_TIMESTAMP() - INTERVAL {sql_int(stale_lock_minutes)} MINUTE)
        ORDER BY id
        LIMIT {row_limit};
        """
    remote.mysql(lock_statement)

    fetch_statement = f"""
        SELECT HEX(CAST(JSON_OBJECT(
        'id', id,
        'event_type', event_type,
        'source_type', source_type,
        'source_id', source_id,
        'project_id', project_id,
        'issue_id', issue_id,
        'journal_id', journal_id,
        'user_id', user_id,
        'occurred_at', DATE_FORMAT(occurred_at, '%Y-%m-%dT%H:%i:%sZ'),
        'attempts', attempts,
        'payload', payload
        ) AS CHAR)) AS document
        FROM event_outbox_events
        WHERE processed_at IS NULL
        AND locked_by = {owner}
        ORDER BY id
        LIMIT {row_limit};
        """
    return remote.mysql_json_lines(fetch_statement)
def purge_processed(remote, days, apply):
    """Count processed rows older than `days` days and, if `apply` is set, delete them.

    Returns the number of rows matched by the count query, which runs before any delete.

    Raises:
        OutboxWorkerError: if `days` is negative.
    """
    if days < 0:
        raise OutboxWorkerError("--purge-processed-days must be zero or greater.")

    age_days = sql_int(days)
    rows = remote.mysql_json_lines(
        f"""
        SELECT HEX(CAST(JSON_OBJECT(
        'count', COUNT(*)
        ) AS CHAR)) AS document
        FROM event_outbox_events
        WHERE processed_at IS NOT NULL
        AND processed_at < UTC_TIMESTAMP() - INTERVAL {age_days} DAY;
        """
    )
    first_row = rows[0] if rows else {}
    matched = int(first_row.get("count") or 0)

    # Preview mode, or nothing to delete: report the count only.
    if not (apply and matched):
        return matched

    remote.mysql(
        f"""
        DELETE FROM event_outbox_events
        WHERE processed_at IS NOT NULL
        AND processed_at < UTC_TIMESTAMP() - INTERVAL {age_days} DAY;
        """
    )
    return matched
def enrich_event(remote, event):
    """Build the documents for one outbox event.

    The first document always describes the event itself. Depending on the
    event type prefix, related ticket, message or contact documents are looked
    up and appended. Every document is then stamped with the event context.
    """
    payload = parse_payload(event.get("payload"))
    kind = str(event.get("event_type") or "")

    related = []
    if kind.startswith("helpdesk_ticket."):
        ticket_id = payload.get("helpdesk_ticket_id") or event.get("source_id")
        related = fetch_ticket_documents(remote, [ticket_id])
    elif kind.startswith("journal_message."):
        message_id = payload.get("journal_message_id") or event.get("source_id")
        related = fetch_message_documents(remote, [message_id])
    elif kind.startswith("issue.") and event.get("issue_id"):
        related = fetch_tickets_by_issue(remote, [event.get("issue_id")])
    elif kind.startswith("journal.") and event.get("journal_id"):
        related = fetch_messages_by_journal(remote, [event.get("journal_id")])
    elif kind.startswith("contact."):
        contact_id = payload.get("contact_id") or event.get("source_id")
        related = fetch_contact_documents(remote, [contact_id])

    documents = [event_document(event, payload), *related]
    return [with_event_context(document, event) for document in documents]
def event_document(event, payload):
    """Return the `doc_type: event` document for an outbox row and its parsed payload."""
    event_id = event.get("id")
    document = {
        "doc_type": "event",
        "doc_id": f"event:{event_id}",
        "event_id": event_id,
    }
    # These fields are copied verbatim from the row (missing keys become None).
    copied_fields = (
        "event_type",
        "source_type",
        "source_id",
        "project_id",
        "issue_id",
        "journal_id",
        "user_id",
        "occurred_at",
    )
    for field in copied_fields:
        document[field] = event.get(field)
    document["payload"] = payload
    return document
def fetch_ticket_documents(remote, ids):
    """Return ticket documents for the given helpdesk ticket ids ([] if none are usable)."""
    wanted = sql_id_list(ids)
    return remote.mysql_json_lines(ticket_sql(f"ht.id IN ({wanted})")) if wanted else []
def fetch_tickets_by_issue(remote, issue_ids):
    """Return ticket documents whose ticket belongs to one of the given issue ids."""
    wanted = sql_id_list(issue_ids)
    return remote.mysql_json_lines(ticket_sql(f"ht.issue_id IN ({wanted})")) if wanted else []
def fetch_message_documents(remote, ids):
    """Return message documents for the given journal message ids ([] if none are usable)."""
    wanted = sql_id_list(ids)
    return remote.mysql_json_lines(message_sql(f"jm.id IN ({wanted})")) if wanted else []
def fetch_messages_by_journal(remote, journal_ids):
    """Return message documents attached to one of the given journal ids."""
    wanted = sql_id_list(journal_ids)
    return remote.mysql_json_lines(message_sql(f"jm.journal_id IN ({wanted})")) if wanted else []
def fetch_contact_documents(remote, ids):
    """Return contact documents for the given contact ids ([] if none are usable)."""
    wanted = sql_id_list(ids)
    if wanted:
        query = f"""
        SELECT HEX(CAST(JSON_OBJECT(
        'doc_type', 'contact',
        'doc_id', CONCAT('contact:', c.id),
        'contact_id', c.id,
        'contact_name', TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
        'contact_company', c.company,
        'contact_email', c.email,
        'is_company', c.is_company,
        'created_on', DATE_FORMAT(c.created_on, '%Y-%m-%dT%H:%i:%sZ'),
        'updated_on', DATE_FORMAT(c.updated_on, '%Y-%m-%dT%H:%i:%sZ'),
        'text', CONCAT_WS('\\n',
        TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
        c.company,
        c.email
        )
        ) AS CHAR)) AS document
        FROM contacts c
        WHERE c.id IN ({wanted})
        ORDER BY c.id;
        """
        return remote.mysql_json_lines(query)
    return []
def ticket_sql(where_clause):
    """Return the SELECT that renders helpdesk tickets as hex-encoded JSON documents.

    `where_clause` is inserted verbatim after WHERE, so callers must pass
    trusted SQL built from sanitized ids.
    """
    query = f"""
        SELECT HEX(CAST(JSON_OBJECT(
        'doc_type', 'ticket',
        'doc_id', CONCAT('ticket:', ht.id),
        'helpdesk_ticket_id', ht.id,
        'issue_id', i.id,
        'project_id', i.project_id,
        'project_identifier', p.identifier,
        'contact_id', ht.contact_id,
        'contact_name', TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
        'contact_company', c.company,
        'contact_email', c.email,
        'from_address', ht.from_address,
        'to_address', ht.to_address,
        'cc_address', ht.cc_address,
        'message_id', ht.message_id,
        'source', ht.source,
        'is_incoming', ht.is_incoming,
        'issue_subject', i.subject,
        'status', s.name,
        'tracker', t.name,
        'assigned_to', TRIM(CONCAT_WS(' ', au.firstname, au.lastname)),
        'ticket_date', DATE_FORMAT(ht.ticket_date, '%Y-%m-%dT%H:%i:%sZ'),
        'issue_updated_on', DATE_FORMAT(i.updated_on, '%Y-%m-%dT%H:%i:%sZ'),
        'text', CONCAT_WS('\\n',
        i.subject,
        LEFT(i.description, 8000),
        TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
        c.company,
        c.email,
        ht.from_address,
        ht.to_address,
        ht.cc_address
        )
        ) AS CHAR)) AS document
        FROM helpdesk_tickets ht
        JOIN issues i ON i.id = ht.issue_id
        LEFT JOIN contacts c ON c.id = ht.contact_id
        LEFT JOIN projects p ON p.id = i.project_id
        LEFT JOIN issue_statuses s ON s.id = i.status_id
        LEFT JOIN trackers t ON t.id = i.tracker_id
        LEFT JOIN users au ON au.id = i.assigned_to_id
        WHERE {where_clause}
        ORDER BY ht.id;
        """
    return query
def message_sql(where_clause):
    """Return the SELECT that renders journal messages as hex-encoded JSON documents.

    `where_clause` is inserted verbatim after WHERE, so callers must pass
    trusted SQL built from sanitized ids. Only the presence of a BCC address
    is exported (`has_bcc_address`), not the address itself.
    """
    query = f"""
        SELECT HEX(CAST(JSON_OBJECT(
        'doc_type', 'message',
        'doc_id', CONCAT('message:', jm.id),
        'journal_message_id', jm.id,
        'journal_id', j.id,
        'issue_id', i.id,
        'project_id', i.project_id,
        'project_identifier', p.identifier,
        'contact_id', jm.contact_id,
        'contact_name', TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
        'contact_company', c.company,
        'contact_email', c.email,
        'from_address', jm.from_address,
        'to_address', jm.to_address,
        'cc_address', jm.cc_address,
        'has_bcc_address', IF(jm.bcc_address IS NULL OR jm.bcc_address = '', false, true),
        'message_id', jm.message_id,
        'source', jm.source,
        'is_incoming', jm.is_incoming,
        'issue_subject', i.subject,
        'status', s.name,
        'tracker', t.name,
        'journal_user', TRIM(CONCAT_WS(' ', ju.firstname, ju.lastname)),
        'message_date', DATE_FORMAT(jm.message_date, '%Y-%m-%dT%H:%i:%sZ'),
        'journal_created_on', DATE_FORMAT(j.created_on, '%Y-%m-%dT%H:%i:%sZ'),
        'text', CONCAT_WS('\\n',
        i.subject,
        LEFT(j.notes, 8000),
        TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
        c.company,
        c.email,
        jm.from_address,
        jm.to_address,
        jm.cc_address
        )
        ) AS CHAR)) AS document
        FROM journal_messages jm
        JOIN journals j ON j.id = jm.journal_id
        JOIN issues i ON i.id = j.journalized_id AND j.journalized_type = 'Issue'
        LEFT JOIN contacts c ON c.id = jm.contact_id
        LEFT JOIN projects p ON p.id = i.project_id
        LEFT JOIN issue_statuses s ON s.id = i.status_id
        LEFT JOIN trackers t ON t.id = i.tracker_id
        LEFT JOIN users ju ON ju.id = j.user_id
        WHERE {where_clause}
        ORDER BY jm.id;
        """
    return query
def with_event_context(document, event):
    """Stamp `document` in place with the originating event and derivation time, then return it."""
    document.update(
        event_id=event.get("id"),
        event_type=event.get("event_type"),
        event_occurred_at=event.get("occurred_at"),
        # Whole seconds since the epoch at the moment of enrichment.
        derived_at=int(time.time()),
    )
    return document
def append_jsonl(path, documents):
    """Append `documents` to the JSONL file at `path`, one sorted-key JSON object per line.

    The parent directory is created if needed. The whole batch is serialized
    before the file is opened, so a document that cannot be serialized raises
    without leaving part of the batch on disk (which would be written again
    when the event is retried).

    Raises:
        TypeError / ValueError: from `json.dumps` for unserializable documents.
        OSError: if the directory or file cannot be written.
    """
    lines = [json.dumps(document, ensure_ascii=False, sort_keys=True) + "\n" for document in documents]
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as handle:
        handle.write("".join(lines))
def mark_processed(remote, event_id, worker_id):
    """Stamp the row as processed and clear its lock and last error.

    The update only applies while the row is still locked by `worker_id`.
    """
    statement = f"""
        UPDATE event_outbox_events
        SET processed_at = UTC_TIMESTAMP(), locked_at = NULL, locked_by = NULL, last_error = NULL
        WHERE id = {sql_int(event_id)}
        AND locked_by = {sql_string(worker_id)};
        """
    remote.mysql(statement)
def mark_failed(remote, event_id, worker_id, exc):
    """Record a failed attempt: bump `attempts`, store the error text and unlock the row.

    The stored message is "<ExceptionClass>: <message>", cut to 4000 characters.
    The update only applies while the row is still locked by `worker_id`.
    """
    summary = f"{exc.__class__.__name__}: {exc}"[:4000]
    statement = f"""
        UPDATE event_outbox_events
        SET attempts = attempts + 1,
        last_error = {sql_string(summary)},
        locked_at = NULL,
        locked_by = NULL
        WHERE id = {sql_int(event_id)}
        AND locked_by = {sql_string(worker_id)};
        """
    remote.mysql(statement)
def release_claims(remote, worker_id):
    """Unlock every unprocessed row currently locked by `worker_id`."""
    statement = f"""
        UPDATE event_outbox_events
        SET locked_at = NULL, locked_by = NULL
        WHERE processed_at IS NULL
        AND locked_by = {sql_string(worker_id)};
        """
    remote.mysql(statement)
def parse_payload(value):
    """Coerce an outbox payload to a dict.

    Dicts are returned as-is; any other truthy value is stringified and parsed
    as JSON. Anything that is empty, unparsable, or not a JSON object yields `{}`.
    """
    if isinstance(value, dict):
        return value
    if value:
        try:
            decoded = json.loads(str(value))
        except json.JSONDecodeError:
            decoded = None
        if isinstance(decoded, dict):
            return decoded
    return {}
def sql_id_list(values):
    """Return the positive integer ids in `values` as a sorted, de-duplicated "1,2,3" string.

    Values that cannot be converted with `int()` (including `None`, free text
    and infinite floats) and values <= 0 are dropped, so the result is always
    safe to splice into an `IN (...)` clause. Returns "" when nothing usable remains.
    """
    ids = set()
    for value in values:
        try:
            candidate = int(value)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: int(float("inf")), which JSON "Infinity" can produce.
            continue
        if candidate > 0:
            ids.add(candidate)
    return ",".join(str(candidate) for candidate in sorted(ids))
def sql_int(value):
    """Return `value` as a non-negative int that is safe to splice into SQL.

    Negative numbers are clamped to 0; anything `int()` cannot convert
    (including `None`, free text and infinite floats) becomes 0.
    """
    try:
        return max(0, int(value))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")), which JSON "Infinity" can produce.
        return 0
def sql_string(value):
    """Return `str(value)` as a single-quoted SQL literal with backslashes and quotes escaped."""
    escapes = str.maketrans({"\\": "\\\\", "'": "\\'"})
    return "'{}'".format(str(value).translate(escapes))
def shell_quote(value):
    """Wrap `value` in single quotes for a POSIX shell, splicing embedded quotes as '"'"'."""
    pieces = value.split("'")
    return "'" + "'\"'\"'".join(pieces) + "'"
def make_worker_id():
    """Return a "<hostname>:<pid>:<12 hex chars>" id identifying this worker process."""
    host = socket.gethostname()
    pid = os.getpid()
    nonce = uuid.uuid4().hex[:12]
    return ":".join((host, str(pid), nonce))
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())