#!/usr/bin/env python3
"""Local CLI for searching RedmineUP helpdesk tickets and email messages.

This tool intentionally reads the helpdesk/contact tables directly through SSH
and MySQL. Helpdesk tickets often have Anonymous Redmine issue authors, while
the real customer identity lives in helpdesk_tickets, journal_messages, and
contacts.
"""

from __future__ import annotations

import argparse
import json
import os
import re
import subprocess
import sys
import time
from dataclasses import dataclass
from difflib import SequenceMatcher
from pathlib import Path
from typing import Any, Iterable

DEFAULT_SSH_HOST = "reddev@192.168.50.170"
DEFAULT_SSH_KEY = Path("/tmp/reddev")
DEFAULT_REMOTE_REDMINE = "/usr/share/redmine"
DEFAULT_CACHE_DIR = Path(".cache/redmine_helpdesk")
DEFAULT_CACHE_FILE = DEFAULT_CACHE_DIR / "helpdesk_documents.jsonl"

# Runs of characters that are not part of a search token. ``\w`` is
# Unicode-aware for ``str`` patterns, so letters such as "ü" or "и" survive;
# "_" is listed separately because ``\w`` would otherwise keep it.
_NON_TOKEN_RE = re.compile(r"(?:[^\w@.+#-]|_)+")
_WHITESPACE_RE = re.compile(r"\s+")
# Zero-width non-joiner, zero-width joiner and BOM, each mapped to a space.
_INVISIBLE_TO_SPACE = str.maketrans({"\u200c": " ", "\u200d": " ", "\ufeff": " "})


class HelpdeskSearchError(RuntimeError):
    """Expected failure that ``main`` reports as ``error: ...`` with exit code 1."""


@dataclass(frozen=True)
class RemoteRedmine:
    """Connection settings for running MySQL on the Redmine host through ssh."""

    ssh_host: str  # passed to ssh as the destination argument
    ssh_key: Path  # passed to ssh via ``-i``
    remote_redmine: str  # remote directory that contains config/database.yml

    def mysql_json_lines(self, sql: str) -> list[dict[str, Any]]:
        """Run a SQL statement remotely and parse one JSON object per row.

        The statement is sent on stdin. Every non-blank output line must be
        the hex encoding of a UTF-8 JSON document (the queries in this file
        wrap their result in ``HEX(CAST(JSON_OBJECT(...) AS CHAR))``). Nothing
        here enforces that the SQL is read-only; that is up to the caller.

        Raises:
            HelpdeskSearchError: ssh could not be started, the remote command
                exited non-zero, or a row was not valid hex / UTF-8 JSON.
        """
        command = [
            "ssh",
            "-i",
            str(self.ssh_key),
            "-o",
            "IdentitiesOnly=yes",
            self.ssh_host,
            self._mysql_runner_command(),
        ]
        try:
            result = subprocess.run(
                command,
                input=sql,
                text=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=False,
            )
        except OSError as exc:
            raise HelpdeskSearchError(f"Could not run ssh: {exc}") from exc
        if result.returncode != 0:
            raise HelpdeskSearchError(result.stderr.strip() or "Remote MySQL query failed.")

        rows: list[dict[str, Any]] = []
        for line in result.stdout.splitlines():
            encoded = line.strip()
            if not encoded:
                continue
            # Decode in two steps so that a bad UTF-8/JSON payload is not
            # misreported as a hex problem (UnicodeDecodeError is a ValueError).
            try:
                payload = bytes.fromhex(encoded)
            except ValueError as exc:
                raise HelpdeskSearchError(f"Remote query returned non-hex row: {line[:200]}") from exc
            try:
                rows.append(json.loads(payload.decode("utf-8")))
            except (UnicodeDecodeError, json.JSONDecodeError) as exc:
                raise HelpdeskSearchError(f"Remote query returned non-JSON row: {line[:200]}") from exc
        return rows

    def _mysql_runner_command(self) -> str:
        """Return the remote shell command that starts the mysql client."""
        # Ruby reads database.yml and execs mysql with MYSQL_PWD in the child
        # environment. That avoids putting the DB password in ssh command args.
        # The connection charset is utf8mb4 because MySQL's "utf8" is the
        # 3-byte subset, which cannot carry 4-byte characters such as emoji.
        ruby = (
            "require 'yaml'; "
            "c = YAML.load_file('config/database.yml')['production']; "
            "ENV['MYSQL_PWD'] = c['password'].to_s; "
            "args = ['--batch', '--raw', '--quick', '--skip-column-names', "
            "'--default-character-set=utf8mb4', '-h', c['host'].to_s, "
            "'-P', (c['port'] || 3306).to_s, '-u', c['username'].to_s, c['database'].to_s]; "
            "exec('mysql', *args)"
        )
        return f"cd {shell_quote(self.remote_redmine)} && ruby -e {shell_quote(ruby)}"


def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with its four sub-commands."""
    parser = argparse.ArgumentParser(description="Fetch and search Redmine helpdesk communications.")
    parser.add_argument("--ssh-host", default=os.getenv("REDMINE_SSH_HOST", DEFAULT_SSH_HOST))
    parser.add_argument("--ssh-key", type=Path, default=Path(os.getenv("REDMINE_SSH_KEY", str(DEFAULT_SSH_KEY))))
    parser.add_argument("--remote-redmine", default=os.getenv("REDMINE_REMOTE_PATH", DEFAULT_REMOTE_REDMINE))
    parser.add_argument("--cache", type=Path, default=DEFAULT_CACHE_FILE)
    subparsers = parser.add_subparsers(dest="command", required=True)

    fetch_parser = subparsers.add_parser("fetch", help="Fetch helpdesk ticket/message docs into JSONL cache.")
    fetch_parser.add_argument("--limit", type=int, help="Limit each document type for a quick test fetch.")

    search_parser = subparsers.add_parser("search", help="Search cached helpdesk tickets/messages.")
    search_parser.add_argument("query")
    search_parser.add_argument("--type", choices=["all", "ticket", "message"], default="all")
    search_parser.add_argument("--limit", type=int, default=20)
    search_parser.add_argument("--min-score", type=float, default=0.35)
    search_parser.add_argument("--refresh", action="store_true", help="Fetch before searching.")

    timeline_parser = subparsers.add_parser("timeline", help="Show cached helpdesk timeline for a contact id.")
    timeline_parser.add_argument("contact_id", type=int)
    timeline_parser.add_argument("--limit", type=int, default=50)
    timeline_parser.add_argument("--refresh", action="store_true", help="Fetch before showing timeline.")

    issues_parser = subparsers.add_parser("issues-by-contact", help="List cached helpdesk issues for a contact id.")
    issues_parser.add_argument("contact_id", type=int)
    issues_parser.add_argument("--limit", type=int, default=50)
    issues_parser.add_argument("--refresh", action="store_true", help="Fetch before listing issues.")
    return parser


def main() -> int:
    """Parse the command line, run the chosen sub-command, return an exit code.

    Returns 0 on success and 1 when a ``HelpdeskSearchError`` was reported on
    stderr (or, defensively, when no known sub-command matched).
    """
    args = _build_parser().parse_args()
    remote = RemoteRedmine(args.ssh_host, args.ssh_key, args.remote_redmine)
    try:
        if args.command == "fetch":
            documents = fetch_documents(remote, args.limit)
            write_jsonl(args.cache, documents)
            print(f"Cached {len(documents)} documents in {args.cache}")
            return 0
        if args.command == "search":
            documents = load_cached_or_refresh(args, remote)
            matches = search_documents(documents, args.query, args.type, args.min_score)
            # Clamp so a negative --limit shows nothing instead of slicing
            # results off the end of the list.
            print_matches(matches[: max(args.limit, 0)])
            return 0
        if args.command == "timeline":
            documents = load_cached_or_refresh(args, remote)
            print_timeline(documents, args.contact_id, args.limit)
            return 0
        if args.command == "issues-by-contact":
            documents = load_cached_or_refresh(args, remote)
            print_issues_by_contact(documents, args.contact_id, args.limit)
            return 0
    except HelpdeskSearchError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1
    return 1


def fetch_documents(remote: RemoteRedmine, limit: int | None) -> list[dict[str, Any]]:
    """Fetch ticket documents, then message documents, and annotate them.

    ``limit`` caps each of the two queries separately; ``None`` means no cap.
    Both batches share one ``fetched_at`` timestamp.
    """
    fetched_at = int(time.time())
    documents: list[dict[str, Any]] = []
    for sql in (ticket_sql(limit), message_sql(limit)):
        documents.extend(add_fetch_metadata(remote.mysql_json_lines(sql), fetched_at))
    return documents


def add_fetch_metadata(documents: list[dict[str, Any]], fetched_at: int) -> list[dict[str, Any]]:
    """Add ``fetched_at`` and ``search_text`` to each document, clean ``text``.

    The documents are modified in place and the same list is returned.
    """
    search_keys = (
        "issue_id",
        "issue_subject",
        "contact_name",
        "contact_company",
        "contact_email",
        "from_address",
        "to_address",
        "cc_address",
        "message_id",
        "text",
    )
    for document in documents:
        document["fetched_at"] = fetched_at
        # "text" is cleaned first because it is also one of the search keys.
        document["text"] = clean_body_text(document.get("text"))
        document["search_text"] = normalize(" ".join(flatten(document.get(key)) for key in search_keys))
    return documents


def ticket_sql(limit: int | None) -> str:
    """Return the SQL that emits one hex-encoded JSON document per helpdesk ticket."""
    limit_clause = sql_limit(limit)
    return f"""
SELECT HEX(CAST(JSON_OBJECT(
    'doc_type', 'ticket',
    'doc_id', CONCAT('ticket:', ht.id),
    'helpdesk_ticket_id', ht.id,
    'issue_id', i.id,
    'project_id', i.project_id,
    'project_identifier', p.identifier,
    'contact_id', ht.contact_id,
    'contact_name', TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
    'contact_company', c.company,
    'contact_email', c.email,
    'from_address', ht.from_address,
    'to_address', ht.to_address,
    'cc_address', ht.cc_address,
    'message_id', ht.message_id,
    'source', ht.source,
    'is_incoming', ht.is_incoming,
    'issue_subject', i.subject,
    'status', s.name,
    'tracker', t.name,
    'assigned_to', TRIM(CONCAT_WS(' ', au.firstname, au.lastname)),
    'ticket_date', DATE_FORMAT(ht.ticket_date, '%Y-%m-%dT%H:%i:%sZ'),
    'issue_updated_on', DATE_FORMAT(i.updated_on, '%Y-%m-%dT%H:%i:%sZ'),
    'text', CONCAT_WS('\\n',
        i.subject,
        LEFT(i.description, 8000),
        TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
        c.company,
        c.email,
        ht.from_address,
        ht.to_address,
        ht.cc_address
    )
) AS CHAR)) AS document
FROM helpdesk_tickets ht
JOIN issues i ON i.id = ht.issue_id
LEFT JOIN contacts c ON c.id = ht.contact_id
LEFT JOIN projects p ON p.id = i.project_id
LEFT JOIN issue_statuses s ON s.id = i.status_id
LEFT JOIN trackers t ON t.id = i.tracker_id
LEFT JOIN users au ON au.id = i.assigned_to_id
ORDER BY ht.ticket_date DESC, ht.id DESC
{limit_clause};
"""


def message_sql(limit: int | None) -> str:
    """Return the SQL that emits one hex-encoded JSON document per journal message.

    The BCC list itself is not exported; only a ``has_bcc_address`` flag is.
    """
    limit_clause = sql_limit(limit)
    return f"""
SELECT HEX(CAST(JSON_OBJECT(
    'doc_type', 'message',
    'doc_id', CONCAT('message:', jm.id),
    'journal_message_id', jm.id,
    'journal_id', j.id,
    'issue_id', i.id,
    'project_id', i.project_id,
    'project_identifier', p.identifier,
    'contact_id', jm.contact_id,
    'contact_name', TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
    'contact_company', c.company,
    'contact_email', c.email,
    'from_address', jm.from_address,
    'to_address', jm.to_address,
    'cc_address', jm.cc_address,
    'has_bcc_address', IF(jm.bcc_address IS NULL OR jm.bcc_address = '', false, true),
    'message_id', jm.message_id,
    'source', jm.source,
    'is_incoming', jm.is_incoming,
    'issue_subject', i.subject,
    'status', s.name,
    'tracker', t.name,
    'journal_user', TRIM(CONCAT_WS(' ', ju.firstname, ju.lastname)),
    'message_date', DATE_FORMAT(jm.message_date, '%Y-%m-%dT%H:%i:%sZ'),
    'journal_created_on', DATE_FORMAT(j.created_on, '%Y-%m-%dT%H:%i:%sZ'),
    'text', CONCAT_WS('\\n',
        i.subject,
        LEFT(j.notes, 8000),
        TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
        c.company,
        c.email,
        jm.from_address,
        jm.to_address,
        jm.cc_address
    )
) AS CHAR)) AS document
FROM journal_messages jm
JOIN journals j ON j.id = jm.journal_id
JOIN issues i ON i.id = j.journalized_id AND j.journalized_type = 'Issue'
LEFT JOIN contacts c ON c.id = jm.contact_id
LEFT JOIN projects p ON p.id = i.project_id
LEFT JOIN issue_statuses s ON s.id = i.status_id
LEFT JOIN trackers t ON t.id = i.tracker_id
LEFT JOIN users ju ON ju.id = j.user_id
ORDER BY jm.message_date DESC, jm.id DESC
{limit_clause};
"""


def sql_limit(limit: int | None) -> str:
    """Return a ``LIMIT n`` clause, or an empty string when ``limit`` is None.

    Values below 1 are raised to 1; ``int()`` keeps anything but a plain
    integer out of the generated SQL.
    """
    if limit is None:
        return ""
    return f"LIMIT {max(1, int(limit))}"


def load_cached_or_refresh(args: argparse.Namespace, remote: RemoteRedmine) -> list[dict[str, Any]]:
    """Return cached documents, fetching first if asked to or if no cache exists.

    Reads ``args.refresh`` and ``args.cache``. A fresh fetch is unlimited and
    is written to the cache before being returned.
    """
    if args.refresh or not args.cache.exists():
        documents = fetch_documents(remote, None)
        write_jsonl(args.cache, documents)
        return documents
    return read_jsonl(args.cache)


def write_jsonl(path: Path, documents: Iterable[dict[str, Any]]) -> None:
    """Write one JSON object per line to ``path``, creating parent directories."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        for document in documents:
            handle.write(json.dumps(document, ensure_ascii=False, sort_keys=True))
            handle.write("\n")


def read_jsonl(path: Path) -> list[dict[str, Any]]:
    """Read the JSONL cache at ``path``; blank lines are skipped.

    Raises:
        HelpdeskSearchError: the file does not exist, or a line is not valid
            JSON (reported with its line number so ``main`` can print a
            message instead of a traceback).
    """
    documents: list[dict[str, Any]] = []
    try:
        with path.open("r", encoding="utf-8") as handle:
            for number, line in enumerate(handle, start=1):
                if not line.strip():
                    continue
                try:
                    documents.append(json.loads(line))
                except json.JSONDecodeError as exc:
                    raise HelpdeskSearchError(
                        f"Cache {path} has invalid JSON on line {number}: {exc}. Run fetch to rebuild it."
                    ) from exc
    except FileNotFoundError as exc:
        raise HelpdeskSearchError(f"Cache does not exist: {path}. Run fetch first.") from exc
    return documents


def search_documents(
    documents: list[dict[str, Any]],
    query: str,
    doc_type: str,
    min_score: float,
) -> list[tuple[float, dict[str, Any], str]]:
    """Score documents against ``query`` and return the ones at or above ``min_score``.

    ``doc_type`` is ``"all"`` or a value to match against each document's
    ``doc_type``. Each result is ``(score, document, matched_field)``, ordered
    by descending score.
    """
    normalized_query = normalize(query)
    query_tokens = normalized_query.split()
    scored: list[tuple[float, dict[str, Any], str]] = []
    for document in documents:
        if doc_type != "all" and document.get("doc_type") != doc_type:
            continue
        score, reason = score_document(document, normalized_query, query_tokens)
        if score >= min_score:
            scored.append((score, document, reason))
    # NOTE(review): equal scores are ordered by ascending date (oldest first),
    # then doc_id, while the timeline views show newest first -- confirm that
    # this tie-break is intended.
    return sorted(scored, key=lambda item: (-item[0], sort_date(item[1]), item[1].get("doc_id", "")))


def score_document(document: dict[str, Any], query: str, query_tokens: list[str]) -> tuple[float, str]:
    """Return the best ``(score, field_name)`` over the document's weighted fields.

    Per field: a substring hit of the whole query scores ``weight``; otherwise
    the fraction of query tokens found scores up to ``0.85 * weight``. A fuzzy
    ``SequenceMatcher`` ratio (up to ``0.65 * weight``) is also considered,
    except for a ``text`` field of 500 characters or more. ``query`` and
    ``query_tokens`` are expected to be already normalized.
    """
    best_score = 0.0
    best_reason = ""
    for field, value, weight in weighted_fields(document):
        # The "text" field is used as stored (``search_text`` is normalized at
        # fetch time); every other field is normalized here.
        normalized_value = value if field == "text" and isinstance(value, str) else normalize(value)
        if not normalized_value:
            continue
        score = 0.0
        if query and query in normalized_value:
            score = 1.0 * weight
        elif query_tokens:
            matched = sum(1 for token in query_tokens if token in normalized_value)
            score = max(score, (matched / len(query_tokens)) * 0.85 * weight)
        if field != "text" or len(normalized_value) < 500:
            ratio = SequenceMatcher(None, query, normalized_value[:500]).ratio()
            score = max(score, ratio * 0.65 * weight)
        if score > best_score:
            best_score = score
            best_reason = field
    return best_score, best_reason


def weighted_fields(document: dict[str, Any]) -> list[tuple[str, str, float]]:
    """Return ``(field_name, raw_text, weight)`` triples used for scoring."""

    def joined(*keys: str) -> str:
        # flatten() maps None to "", so a JSON null never shows up as the
        # searchable literal "None".
        return " ".join(flatten(document.get(key)) for key in keys)

    return [
        ("issue", joined("issue_id", "issue_subject"), 1.3),
        ("contact", joined("contact_name", "contact_company", "contact_email"), 1.2),
        ("addresses", joined("from_address", "to_address", "cc_address"), 1.1),
        ("message_id", flatten(document.get("message_id")), 1.0),
        ("text", flatten(document.get("search_text") or document.get("text")), 1.0),
    ]


def print_matches(matches: list[tuple[float, dict[str, Any], str]]) -> None:
    """Print search results: a header line, then subject, contact and snippet."""
    if not matches:
        print("No helpdesk documents matched.")
        return
    for score, document, reason in matches:
        date = sort_date(document)
        direction = "in" if document.get("is_incoming") else "out"
        contact = display_contact(document)
        print(
            f"{document.get('doc_id')} issue=#{document.get('issue_id')} "
            f"contact=#{document.get('contact_id')} {direction} {date} "
            f"score={score:.2f} via {reason}"
        )
        print(f"  {document.get('issue_subject') or ''}")
        if contact:
            print(f"  {contact}")
        snippet = make_snippet(document.get("text") or "")
        if snippet:
            print(f"  {snippet}")


def print_timeline(documents: list[dict[str, Any]], contact_id: int, limit: int) -> None:
    """Print up to ``limit`` documents for ``contact_id``, newest first.

    A ``limit`` of zero or less prints nothing.
    """
    rows = [doc for doc in documents if int(doc.get("contact_id") or 0) == contact_id]
    rows.sort(key=sort_date, reverse=True)
    for document in rows[: max(limit, 0)]:
        # Show the same date the rows were sorted by.
        date = sort_date(document)
        direction = "in" if document.get("is_incoming") else "out"
        print(
            f"{date} {document.get('doc_type')} {direction} "
            f"issue=#{document.get('issue_id')} {document.get('issue_subject') or ''}"
        )


def print_issues_by_contact(documents: list[dict[str, Any]], contact_id: int, limit: int) -> None:
    """Print up to ``limit`` distinct issues from ``contact_id``'s ticket documents.

    Tickets are visited newest first and each issue id is printed once.
    A ``limit`` of zero or less prints nothing.
    """
    if limit <= 0:
        return
    tickets = [
        doc
        for doc in documents
        if doc.get("doc_type") == "ticket" and int(doc.get("contact_id") or 0) == contact_id
    ]
    tickets.sort(key=sort_date, reverse=True)
    seen: set[int] = set()
    for ticket in tickets:
        issue_id = int(ticket.get("issue_id") or 0)
        if issue_id in seen:
            continue
        seen.add(issue_id)
        print(
            f"{ticket.get('ticket_date') or ''} issue=#{issue_id} "
            f"{ticket.get('status') or ''} {ticket.get('issue_subject') or ''}"
        )
        if len(seen) >= limit:
            break


def sort_date(document: dict[str, Any]) -> str:
    """Return the first non-empty of message_date, ticket_date, issue_updated_on."""
    return str(document.get("message_date") or document.get("ticket_date") or document.get("issue_updated_on") or "")


def display_contact(document: dict[str, Any]) -> str:
    """Join the non-empty contact name, company and email with `` | ``."""
    parts = (flatten(document.get(key)) for key in ("contact_name", "contact_company", "contact_email"))
    return " | ".join(part for part in parts if part)


def make_snippet(value: str, length: int = 220) -> str:
    """Collapse whitespace and shorten ``value`` to ``length`` characters, ending in ``...``."""
    value = _WHITESPACE_RE.sub(" ", value).strip()
    if len(value) <= length:
        return value
    return value[: length - 3].rstrip() + "..."


def clean_body_text(value: Any) -> str:
    """Flatten ``value``, turn zero-width characters into spaces, collapse whitespace."""
    text = flatten(value).translate(_INVISIBLE_TO_SPACE)
    return _WHITESPACE_RE.sub(" ", text).strip()


def normalize(value: Any) -> str:
    """Lower-case ``value`` and reduce it to space-separated search tokens.

    Letters and digits of any script are kept, along with ``@ . + # -``.
    Everything else, including ``_`` and all whitespace, becomes a single
    space. For ASCII input this matches the previous ASCII-only pattern. Note
    that a cache written before this change still holds ``search_text``
    without non-ASCII letters until it is fetched again.
    """
    lowered = flatten(value).lower()
    # Whitespace is itself a non-token character, so each run is already
    # reduced to one space; only the ends need trimming.
    return _NON_TOKEN_RE.sub(" ", lowered).strip()


def flatten(value: Any) -> str:
    """Render ``value`` as text: None -> "", lists and dict values joined by spaces."""
    if value is None:
        return ""
    if isinstance(value, list):
        return " ".join(flatten(item) for item in value)
    if isinstance(value, dict):
        return " ".join(flatten(item) for item in value.values())
    return str(value)


def shell_quote(value: str) -> str:
    """Single-quote ``value`` for a POSIX shell, escaping embedded single quotes."""
    return "'" + value.replace("'", "'\"'\"'") + "'"


if __name__ == "__main__":
    raise SystemExit(main())