Automate post-import refresh and validation workflow

This commit is contained in:
Jason Thistlethwaite
2026-05-04 09:49:47 -04:00
parent fba494dada
commit faad70872b
13 changed files with 995 additions and 136 deletions
+57 -53
View File
@@ -7,8 +7,6 @@ writes deterministic JSONL records, and marks rows processed only after the
write succeeds.
"""
from __future__ import annotations
import argparse
import json
import os
@@ -17,7 +15,6 @@ import subprocess
import sys
import time
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable
@@ -32,15 +29,16 @@ class OutboxWorkerError(RuntimeError):
pass
@dataclass(frozen=True)
class RemoteRedmine:
ssh_host: str
ssh_key: Path
remote_redmine: str
def __init__(self, ssh_host, ssh_key, remote_redmine, local=False):
self.ssh_host = ssh_host
self.ssh_key = ssh_key
self.remote_redmine = remote_redmine
self.local = local
def mysql_json_lines(self, sql: str) -> list[dict[str, Any]]:
def mysql_json_lines(self, sql):
stdout = self.mysql(sql)
rows: list[dict[str, Any]] = []
rows = []
for line in stdout.splitlines():
if not line.strip():
continue
@@ -52,24 +50,29 @@ class RemoteRedmine:
raise OutboxWorkerError(f"Remote query returned non-hex row: {line[:200]}") from exc
return rows
def mysql(self, sql: str) -> str:
command = [
"ssh",
"-i",
str(self.ssh_key),
"-o",
"IdentitiesOnly=yes",
self.ssh_host,
self._mysql_runner_command(),
]
def mysql(self, sql):
command = self._mysql_runner_command()
shell = True
if not self.local:
command = [
"ssh",
"-i",
str(self.ssh_key),
"-o",
"IdentitiesOnly=yes",
self.ssh_host,
self._mysql_runner_command(),
]
shell = False
try:
result = subprocess.run(
command,
input=sql,
text=True,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
shell=shell,
)
except OSError as exc:
raise OutboxWorkerError(f"Could not run ssh: {exc}") from exc
@@ -78,7 +81,7 @@ class RemoteRedmine:
raise OutboxWorkerError(result.stderr.strip() or "Remote MySQL command failed.")
return result.stdout
def _mysql_runner_command(self) -> str:
def _mysql_runner_command(self):
ruby = (
"require 'yaml'; "
"c = YAML.load_file('config/database.yml')['production']; "
@@ -91,10 +94,11 @@ class RemoteRedmine:
return f"cd {shell_quote(self.remote_redmine)} && ruby -e {shell_quote(ruby)}"
def main() -> int:
def main():
parser = argparse.ArgumentParser(description="Process Redmine event outbox rows into enriched JSONL documents.")
parser.add_argument("--ssh-host", default=os.getenv("REDMINE_SSH_HOST", DEFAULT_SSH_HOST))
parser.add_argument("--ssh-key", type=Path, default=Path(os.getenv("REDMINE_SSH_KEY", str(DEFAULT_SSH_KEY))))
parser.add_argument("--local", action="store_true", help="Read the Redmine database locally instead of over SSH.")
parser.add_argument("--remote-redmine", default=os.getenv("REDMINE_REMOTE_PATH", DEFAULT_REMOTE_REDMINE))
parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT)
parser.add_argument("--batch-size", type=int, default=20)
@@ -111,7 +115,7 @@ def main() -> int:
parser.add_argument("--apply-purge", action="store_true", help="Actually delete rows selected by --purge-processed-days.")
args = parser.parse_args()
remote = RemoteRedmine(args.ssh_host, args.ssh_key, args.remote_redmine)
remote = RemoteRedmine(args.ssh_host, args.ssh_key, args.remote_redmine, local=args.local)
worker_id = make_worker_id()
try:
@@ -164,7 +168,7 @@ def main() -> int:
return 1
def pending_events(remote: RemoteRedmine, limit: int, max_attempts: int, stale_lock_minutes: int) -> list[dict[str, Any]]:
def pending_events(remote, limit, max_attempts, stale_lock_minutes):
return remote.mysql_json_lines(
f"""
SELECT HEX(CAST(JSON_OBJECT(
@@ -190,7 +194,7 @@ LIMIT {sql_int(limit)};
)
def outbox_status(remote: RemoteRedmine, max_attempts: int, stale_lock_minutes: int) -> dict[str, Any]:
def outbox_status(remote, max_attempts, stale_lock_minutes):
rows = remote.mysql_json_lines(
f"""
SELECT HEX(CAST(JSON_OBJECT(
@@ -217,12 +221,12 @@ FROM event_outbox_events;
def claim_events(
remote: RemoteRedmine,
worker_id: str,
limit: int,
max_attempts: int,
stale_lock_minutes: int,
) -> list[dict[str, Any]]:
remote,
worker_id,
limit,
max_attempts,
stale_lock_minutes,
):
remote.mysql(
f"""
UPDATE event_outbox_events
@@ -258,7 +262,7 @@ LIMIT {sql_int(limit)};
)
def purge_processed(remote: RemoteRedmine, days: int, apply: bool) -> int:
def purge_processed(remote, days, apply):
if days < 0:
raise OutboxWorkerError("--purge-processed-days must be zero or greater.")
count_sql = f"""
@@ -282,9 +286,9 @@ WHERE processed_at IS NOT NULL
return count
def enrich_event(remote: RemoteRedmine, event: dict[str, Any]) -> list[dict[str, Any]]:
def enrich_event(remote, event):
payload = parse_payload(event.get("payload"))
documents: list[dict[str, Any]] = [event_document(event, payload)]
documents = [event_document(event, payload)]
event_type = str(event.get("event_type") or "")
if event_type.startswith("helpdesk_ticket."):
@@ -301,7 +305,7 @@ def enrich_event(remote: RemoteRedmine, event: dict[str, Any]) -> list[dict[str,
return [with_event_context(document, event) for document in documents]
def event_document(event: dict[str, Any], payload: dict[str, Any]) -> dict[str, Any]:
def event_document(event, payload):
return {
"doc_type": "event",
"doc_id": f"event:{event.get('id')}",
@@ -318,35 +322,35 @@ def event_document(event: dict[str, Any], payload: dict[str, Any]) -> dict[str,
}
def fetch_ticket_documents(remote, ids):
    """Return the ticket documents whose ``ht.id`` is one of *ids*.

    *ids* is normalised by ``sql_id_list``.  When that produces an empty
    string no query is sent and ``[]`` is returned, which also keeps an
    invalid ``IN ()`` clause from ever reaching the database.
    """
    id_list = sql_id_list(ids)
    if not id_list:
        return []
    where_clause = f"ht.id IN ({id_list})"
    return remote.mysql_json_lines(ticket_sql(where_clause))
def fetch_tickets_by_issue(remote, issue_ids):
    """Return the ticket documents whose ``ht.issue_id`` is one of *issue_ids*.

    *issue_ids* is normalised by ``sql_id_list``.  When that produces an
    empty string no query is sent and ``[]`` is returned.
    """
    id_list = sql_id_list(issue_ids)
    if not id_list:
        return []
    where_clause = f"ht.issue_id IN ({id_list})"
    return remote.mysql_json_lines(ticket_sql(where_clause))
def fetch_message_documents(remote, ids):
    """Return the message documents whose ``jm.id`` is one of *ids*.

    *ids* is normalised by ``sql_id_list``.  When that produces an empty
    string no query is sent and ``[]`` is returned.
    """
    id_list = sql_id_list(ids)
    if not id_list:
        return []
    where_clause = f"jm.id IN ({id_list})"
    return remote.mysql_json_lines(message_sql(where_clause))
def fetch_messages_by_journal(remote, journal_ids):
    """Return the message documents whose ``jm.journal_id`` is in *journal_ids*.

    *journal_ids* is normalised by ``sql_id_list``.  When that produces an
    empty string no query is sent and ``[]`` is returned.
    """
    id_list = sql_id_list(journal_ids)
    if not id_list:
        return []
    where_clause = f"jm.journal_id IN ({id_list})"
    return remote.mysql_json_lines(message_sql(where_clause))
def fetch_contact_documents(remote: RemoteRedmine, ids: Iterable[Any]) -> list[dict[str, Any]]:
def fetch_contact_documents(remote, ids):
id_list = sql_id_list(ids)
if not id_list:
return []
@@ -375,7 +379,7 @@ ORDER BY c.id;
)
def ticket_sql(where_clause: str) -> str:
def ticket_sql(where_clause):
return f"""
SELECT HEX(CAST(JSON_OBJECT(
'doc_type', 'ticket',
@@ -423,7 +427,7 @@ ORDER BY ht.id;
"""
def message_sql(where_clause: str) -> str:
def message_sql(where_clause):
return f"""
SELECT HEX(CAST(JSON_OBJECT(
'doc_type', 'message',
@@ -474,7 +478,7 @@ ORDER BY jm.id;
"""
def with_event_context(document: dict[str, Any], event: dict[str, Any]) -> dict[str, Any]:
def with_event_context(document, event):
document["event_id"] = event.get("id")
document["event_type"] = event.get("event_type")
document["event_occurred_at"] = event.get("occurred_at")
@@ -482,7 +486,7 @@ def with_event_context(document: dict[str, Any], event: dict[str, Any]) -> dict[
return document
def append_jsonl(path: Path, documents: Iterable[dict[str, Any]]) -> None:
def append_jsonl(path, documents):
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("a", encoding="utf-8") as handle:
for document in documents:
@@ -490,7 +494,7 @@ def append_jsonl(path: Path, documents: Iterable[dict[str, Any]]) -> None:
handle.write("\n")
def mark_processed(remote: RemoteRedmine, event_id: Any, worker_id: str) -> None:
def mark_processed(remote, event_id, worker_id):
remote.mysql(
f"""
UPDATE event_outbox_events
@@ -501,7 +505,7 @@ WHERE id = {sql_int(event_id)}
)
def mark_failed(remote: RemoteRedmine, event_id: Any, worker_id: str, exc: Exception) -> None:
def mark_failed(remote, event_id, worker_id, exc):
message = f"{exc.__class__.__name__}: {exc}"
remote.mysql(
f"""
@@ -516,7 +520,7 @@ WHERE id = {sql_int(event_id)}
)
def release_claims(remote: RemoteRedmine, worker_id: str) -> None:
def release_claims(remote, worker_id):
remote.mysql(
f"""
UPDATE event_outbox_events
@@ -527,7 +531,7 @@ WHERE processed_at IS NULL
)
def parse_payload(value: Any) -> dict[str, Any]:
def parse_payload(value):
if isinstance(value, dict):
return value
if not value:
@@ -539,7 +543,7 @@ def parse_payload(value: Any) -> dict[str, Any]:
return parsed if isinstance(parsed, dict) else {}
def sql_id_list(values: Iterable[Any]) -> str:
def sql_id_list(values):
ids = []
for value in values:
try:
@@ -551,22 +555,22 @@ def sql_id_list(values: Iterable[Any]) -> str:
return ",".join(sorted(set(ids), key=int))
def sql_int(value):
    """Coerce *value* to a non-negative int that can be inlined into SQL.

    Negative numbers are clamped to 0.  Anything ``int()`` cannot convert
    (``None``, non-numeric strings, NaN, infinities) also yields 0, so the
    result is always a plain integer.
    """
    try:
        number = int(value)
    except (TypeError, ValueError, OverflowError):
        # int(float("inf")) raises OverflowError rather than ValueError;
        # treat it like every other unusable input.
        return 0
    return max(0, number)
def sql_string(value):
    """Return *value* as a single-quoted SQL string literal.

    The value is converted with ``str()``; every backslash is doubled and
    every single quote is prefixed with a backslash before the result is
    wrapped in single quotes.
    """
    # NOTE(review): backslash escaping presumes the server's sql_mode does
    # not include NO_BACKSLASH_ESCAPES -- confirm against the deployment.
    escapes = {ord("\\"): "\\\\", ord("'"): "\\'"}
    return "'" + str(value).translate(escapes) + "'"
def shell_quote(value):
    """Wrap *value* in single quotes for use on a POSIX shell command line.

    Each embedded single quote is rewritten as ``'"'"'`` (close the quoted
    run, emit a double-quoted ``'``, reopen), so the shell sees the text
    literally.  The value is converted with ``str()`` first, matching
    ``sql_string``, so path objects are accepted as well as strings.
    """
    text = str(value)
    return "'" + text.replace("'", "'\"'\"'") + "'"
def make_worker_id():
    """Build an identifier of the form ``<hostname>:<pid>:<12 hex chars>``.

    The hostname and process id say where the worker runs; the trailing
    slice of a random UUID4 makes each call's result different.
    """
    host = socket.gethostname()
    pid = os.getpid()
    nonce = uuid.uuid4().hex[:12]
    return f"{host}:{pid}:{nonce}"