472 lines
18 KiB
Python
Executable File
472 lines
18 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Local CLI for searching RedmineUP helpdesk tickets and email messages.
|
|
|
|
This tool intentionally reads the helpdesk/contact tables directly through SSH
|
|
and MySQL. Helpdesk tickets often have Anonymous Redmine issue authors, while
|
|
the real customer identity lives in helpdesk_tickets, journal_messages, and
|
|
contacts.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass
|
|
from difflib import SequenceMatcher
|
|
from pathlib import Path
|
|
from typing import Any, Iterable
|
|
|
|
|
|
# Connection defaults. main() lets each one be overridden by a command-line
# flag, and reads REDMINE_SSH_HOST / REDMINE_SSH_KEY / REDMINE_REMOTE_PATH
# from the environment before falling back to these values.
DEFAULT_SSH_HOST = "reddev@192.168.50.170"
DEFAULT_SSH_KEY = Path("/tmp/reddev")
DEFAULT_REMOTE_REDMINE = "/usr/share/redmine"

# Location of the local JSONL document cache (relative to the working directory).
DEFAULT_CACHE_DIR = Path(".cache", "redmine_helpdesk")
DEFAULT_CACHE_FILE = DEFAULT_CACHE_DIR.joinpath("helpdesk_documents.jsonl")
|
|
|
|
|
|
class HelpdeskSearchError(RuntimeError):
    """Failure that main() reports on stderr as a one-line ``error: ...`` message."""
|
|
|
|
|
|
@dataclass(frozen=True)
class RemoteRedmine:
    """Connection details for a Redmine host whose MySQL database is queried over SSH."""

    ssh_host: str  # ssh destination, passed to ssh unchanged
    ssh_key: Path  # identity file handed to ``ssh -i``
    remote_redmine: str  # remote directory that contains config/database.yml

    def mysql_json_lines(self, sql: str) -> list[dict[str, Any]]:
        """Pipe ``sql`` to the remote mysql client and decode its output rows.

        The statement is written to the remote command's stdin. Every
        non-blank output line must be a hex string whose bytes decode to one
        UTF-8 JSON value.

        Raises:
            HelpdeskSearchError: if ssh cannot be started, the remote command
                exits with a non-zero status, or an output line cannot be
                decoded.
        """
        ssh_argv = ["ssh", "-i", str(self.ssh_key), "-o", "IdentitiesOnly=yes"]
        ssh_argv += [self.ssh_host, self._mysql_runner_command()]
        try:
            completed = subprocess.run(
                ssh_argv,
                input=sql,
                text=True,
                capture_output=True,
                check=False,
            )
        except OSError as exc:
            raise HelpdeskSearchError(f"Could not run ssh: {exc}") from exc

        if completed.returncode != 0:
            raise HelpdeskSearchError(completed.stderr.strip() or "Remote MySQL query failed.")

        decoded_rows: list[dict[str, Any]] = []
        for raw_line in completed.stdout.splitlines():
            hex_text = raw_line.strip()
            if not hex_text:
                continue
            try:
                decoded_rows.append(json.loads(bytes.fromhex(hex_text).decode("utf-8")))
            except json.JSONDecodeError as exc:
                raise HelpdeskSearchError(f"Remote query returned non-JSON row: {raw_line[:200]}") from exc
            except ValueError as exc:
                # Covers invalid hex as well as bytes that are not valid UTF-8
                # (UnicodeDecodeError is a ValueError subclass).
                raise HelpdeskSearchError(f"Remote query returned non-hex row: {raw_line[:200]}") from exc
        return decoded_rows

    def _mysql_runner_command(self) -> str:
        """Build the remote shell command line that starts the mysql client."""
        # The Ruby one-liner loads config/database.yml and hands the password
        # to mysql through MYSQL_PWD, which keeps the DB password out of the
        # ssh command arguments.
        ruby_script = (
            "require 'yaml'; "
            "c = YAML.load_file('config/database.yml')['production']; "
            "ENV['MYSQL_PWD'] = c['password'].to_s; "
            "args = ['--batch', '--raw', '--quick', '--skip-column-names', "
            "'--default-character-set=utf8', '-h', c['host'].to_s, "
            "'-P', (c['port'] || 3306).to_s, '-u', c['username'].to_s, c['database'].to_s]; "
            "exec('mysql', *args)"
        )
        workdir = shell_quote(self.remote_redmine)
        return f"cd {workdir} && ruby -e {shell_quote(ruby_script)}"
|
|
|
|
|
|
def main() -> int:
    """Parse the command line, run the selected subcommand, and return an exit status.

    Returns:
        0 when the subcommand completed; 1 when a HelpdeskSearchError or an
        OSError (for example an unwritable cache path) was reported on stderr.
    """
    parser = argparse.ArgumentParser(description="Fetch and search Redmine helpdesk communications.")
    parser.add_argument("--ssh-host", default=os.getenv("REDMINE_SSH_HOST", DEFAULT_SSH_HOST))
    parser.add_argument("--ssh-key", type=Path, default=Path(os.getenv("REDMINE_SSH_KEY", str(DEFAULT_SSH_KEY))))
    parser.add_argument("--remote-redmine", default=os.getenv("REDMINE_REMOTE_PATH", DEFAULT_REMOTE_REDMINE))
    parser.add_argument("--cache", type=Path, default=DEFAULT_CACHE_FILE)

    subparsers = parser.add_subparsers(dest="command", required=True)

    fetch_parser = subparsers.add_parser("fetch", help="Fetch helpdesk ticket/message docs into JSONL cache.")
    fetch_parser.add_argument("--limit", type=int, help="Limit each document type for a quick test fetch.")

    search_parser = subparsers.add_parser("search", help="Search cached helpdesk tickets/messages.")
    search_parser.add_argument("query")
    search_parser.add_argument("--type", choices=["all", "ticket", "message"], default="all")
    search_parser.add_argument("--limit", type=int, default=20)
    search_parser.add_argument("--min-score", type=float, default=0.35)
    search_parser.add_argument("--refresh", action="store_true", help="Fetch before searching.")

    timeline_parser = subparsers.add_parser("timeline", help="Show cached helpdesk timeline for a contact id.")
    timeline_parser.add_argument("contact_id", type=int)
    timeline_parser.add_argument("--limit", type=int, default=50)
    timeline_parser.add_argument("--refresh", action="store_true", help="Fetch before showing timeline.")

    issues_parser = subparsers.add_parser("issues-by-contact", help="List cached helpdesk issues for a contact id.")
    issues_parser.add_argument("contact_id", type=int)
    issues_parser.add_argument("--limit", type=int, default=50)
    issues_parser.add_argument("--refresh", action="store_true", help="Fetch before listing issues.")

    # parse_args() exits the process itself on invalid input, and the returned
    # Namespace is always truthy, so there is nothing to test here.
    args = parser.parse_args()
    remote = RemoteRedmine(args.ssh_host, args.ssh_key, args.remote_redmine)

    try:
        if args.command == "fetch":
            documents = fetch_documents(remote, args.limit)
            write_jsonl(args.cache, documents)
            print(f"Cached {len(documents)} documents in {args.cache}")
            return 0

        if args.command == "search":
            # Same cache-or-refresh policy as the timeline and issues commands.
            documents = load_cached_or_refresh(args, remote)
            matches = search_documents(documents, args.query, args.type, args.min_score)
            print_matches(matches[: args.limit])
            return 0

        if args.command == "timeline":
            documents = load_cached_or_refresh(args, remote)
            print_timeline(documents, args.contact_id, args.limit)
            return 0

        if args.command == "issues-by-contact":
            documents = load_cached_or_refresh(args, remote)
            print_issues_by_contact(documents, args.contact_id, args.limit)
            return 0
    except (HelpdeskSearchError, OSError) as exc:
        # OSError covers local cache I/O problems, which would otherwise
        # surface as a traceback instead of a one-line error.
        print(f"error: {exc}", file=sys.stderr)
        return 1

    # Defensive fallback for a command name none of the branches handled.
    return 1
|
|
|
|
|
|
def fetch_documents(remote: RemoteRedmine, limit: int | None) -> list[dict[str, Any]]:
    """Fetch ticket documents followed by message documents from ``remote``.

    Both batches are stamped with the same fetch time. ``limit`` is applied to
    each of the two queries separately; ``None`` leaves them uncapped.
    """
    stamp = int(time.time())
    ticket_docs = add_fetch_metadata(remote.mysql_json_lines(ticket_sql(limit)), stamp)
    message_docs = add_fetch_metadata(remote.mysql_json_lines(message_sql(limit)), stamp)
    return [*ticket_docs, *message_docs]
|
|
|
|
|
|
def add_fetch_metadata(documents: list[dict[str, Any]], fetched_at: int) -> list[dict[str, Any]]:
    """Annotate every document in place and return the same list.

    Each document gets ``fetched_at``, a whitespace-cleaned ``text``, and a
    normalized ``search_text`` built from its identifying fields plus the
    cleaned text.
    """
    searchable_keys = (
        "issue_id",
        "issue_subject",
        "contact_name",
        "contact_company",
        "contact_email",
        "from_address",
        "to_address",
        "cc_address",
        "message_id",
        "text",
    )
    for doc in documents:
        doc["fetched_at"] = fetched_at
        # Clean the body first so search_text is built from the cleaned form.
        doc["text"] = clean_body_text(doc.get("text"))
        parts = [flatten(doc.get(key)) for key in searchable_keys]
        doc["search_text"] = normalize(" ".join(parts))
    return documents
|
|
|
|
|
|
def ticket_sql(limit: int | None) -> str:
    """Return the SQL that emits one hex-encoded JSON document per helpdesk ticket.

    Every result row is ``HEX(CAST(JSON_OBJECT(...) AS CHAR))``. Rows are
    ordered by ticket date, newest first, and ``limit`` is turned into an
    optional ``LIMIT`` clause by ``sql_limit``.
    """
    limit_clause = sql_limit(limit)
    # The 'text' entry joins the subject, the first 8000 characters of the
    # issue description, and the contact/address fields; the Python-level
    # '\\n' reaches MySQL as backslash-n inside the SQL string literal.
    return f"""
        SELECT HEX(CAST(JSON_OBJECT(
          'doc_type', 'ticket',
          'doc_id', CONCAT('ticket:', ht.id),
          'helpdesk_ticket_id', ht.id,
          'issue_id', i.id,
          'project_id', i.project_id,
          'project_identifier', p.identifier,
          'contact_id', ht.contact_id,
          'contact_name', TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
          'contact_company', c.company,
          'contact_email', c.email,
          'from_address', ht.from_address,
          'to_address', ht.to_address,
          'cc_address', ht.cc_address,
          'message_id', ht.message_id,
          'source', ht.source,
          'is_incoming', ht.is_incoming,
          'issue_subject', i.subject,
          'status', s.name,
          'tracker', t.name,
          'assigned_to', TRIM(CONCAT_WS(' ', au.firstname, au.lastname)),
          'ticket_date', DATE_FORMAT(ht.ticket_date, '%Y-%m-%dT%H:%i:%sZ'),
          'issue_updated_on', DATE_FORMAT(i.updated_on, '%Y-%m-%dT%H:%i:%sZ'),
          'text', CONCAT_WS('\\n',
            i.subject,
            LEFT(i.description, 8000),
            TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
            c.company,
            c.email,
            ht.from_address,
            ht.to_address,
            ht.cc_address
          )
        ) AS CHAR)) AS document
        FROM helpdesk_tickets ht
        JOIN issues i ON i.id = ht.issue_id
        LEFT JOIN contacts c ON c.id = ht.contact_id
        LEFT JOIN projects p ON p.id = i.project_id
        LEFT JOIN issue_statuses s ON s.id = i.status_id
        LEFT JOIN trackers t ON t.id = i.tracker_id
        LEFT JOIN users au ON au.id = i.assigned_to_id
        ORDER BY ht.ticket_date DESC, ht.id DESC
        {limit_clause};
    """
|
|
|
|
|
|
def message_sql(limit: int | None) -> str:
    """Return the SQL that emits one hex-encoded JSON document per journal message.

    Every result row is ``HEX(CAST(JSON_OBJECT(...) AS CHAR))``. Only journals
    attached to issues are included. Rows are ordered by message date, newest
    first, and ``limit`` is turned into an optional ``LIMIT`` clause by
    ``sql_limit``.
    """
    limit_clause = sql_limit(limit)
    # The BCC address itself is not exported; only a has_bcc_address flag is.
    # The 'text' entry joins the subject, the first 8000 characters of the
    # journal notes, and the contact/address fields.
    return f"""
        SELECT HEX(CAST(JSON_OBJECT(
          'doc_type', 'message',
          'doc_id', CONCAT('message:', jm.id),
          'journal_message_id', jm.id,
          'journal_id', j.id,
          'issue_id', i.id,
          'project_id', i.project_id,
          'project_identifier', p.identifier,
          'contact_id', jm.contact_id,
          'contact_name', TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
          'contact_company', c.company,
          'contact_email', c.email,
          'from_address', jm.from_address,
          'to_address', jm.to_address,
          'cc_address', jm.cc_address,
          'has_bcc_address', IF(jm.bcc_address IS NULL OR jm.bcc_address = '', false, true),
          'message_id', jm.message_id,
          'source', jm.source,
          'is_incoming', jm.is_incoming,
          'issue_subject', i.subject,
          'status', s.name,
          'tracker', t.name,
          'journal_user', TRIM(CONCAT_WS(' ', ju.firstname, ju.lastname)),
          'message_date', DATE_FORMAT(jm.message_date, '%Y-%m-%dT%H:%i:%sZ'),
          'journal_created_on', DATE_FORMAT(j.created_on, '%Y-%m-%dT%H:%i:%sZ'),
          'text', CONCAT_WS('\\n',
            i.subject,
            LEFT(j.notes, 8000),
            TRIM(CONCAT_WS(' ', c.first_name, c.middle_name, c.last_name)),
            c.company,
            c.email,
            jm.from_address,
            jm.to_address,
            jm.cc_address
          )
        ) AS CHAR)) AS document
        FROM journal_messages jm
        JOIN journals j ON j.id = jm.journal_id
        JOIN issues i ON i.id = j.journalized_id AND j.journalized_type = 'Issue'
        LEFT JOIN contacts c ON c.id = jm.contact_id
        LEFT JOIN projects p ON p.id = i.project_id
        LEFT JOIN issue_statuses s ON s.id = i.status_id
        LEFT JOIN trackers t ON t.id = i.tracker_id
        LEFT JOIN users ju ON ju.id = j.user_id
        ORDER BY jm.message_date DESC, jm.id DESC
        {limit_clause};
    """
|
|
|
|
|
|
def sql_limit(limit: int | None) -> str:
    """Render an optional SQL ``LIMIT`` clause.

    ``None`` yields an empty string; any value below 1 is raised to 1.
    """
    if limit is not None:
        row_cap = int(limit)
        if row_cap < 1:
            row_cap = 1
        return f"LIMIT {row_cap}"
    return ""
|
|
|
|
|
|
def load_cached_or_refresh(args: argparse.Namespace, remote: RemoteRedmine) -> list[dict[str, Any]]:
    """Return the cached documents, fetching and rewriting the cache when needed.

    A fetch happens when ``args.refresh`` is set or the cache file is missing;
    in that case the freshly fetched documents are returned directly.
    """
    cache_path = args.cache
    if not args.refresh and cache_path.exists():
        return read_jsonl(cache_path)
    fresh = fetch_documents(remote, None)
    write_jsonl(cache_path, fresh)
    return fresh
|
|
|
|
|
|
def write_jsonl(path: Path, documents: Iterable[dict[str, Any]]) -> None:
    """Write ``documents`` to ``path`` as UTF-8 JSON Lines, one object per line.

    The data is written to a sibling ``<name>.tmp`` file and then renamed over
    ``path``. If serialization or the iterable fails part-way, any existing
    cache is left untouched instead of being truncated.

    Raises:
        OSError: if the directory or file cannot be created or written.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(f"{path.name}.tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as handle:
            for document in documents:
                handle.write(json.dumps(document, ensure_ascii=False, sort_keys=True))
                handle.write("\n")
        # Same-directory rename, so readers never observe a half-written file.
        tmp_path.replace(path)
    finally:
        # After a successful replace the temp file is already gone; after a
        # failure this removes the partial output.
        tmp_path.unlink(missing_ok=True)
|
|
|
|
|
|
def read_jsonl(path: Path) -> list[dict[str, Any]]:
    """Load every non-blank line of the JSON Lines file at ``path``.

    Raises:
        HelpdeskSearchError: if the file does not exist, or a line is not
            valid JSON (for example a cache truncated by an interrupted
            fetch). The message names the offending line number.
    """
    documents: list[dict[str, Any]] = []
    try:
        # Open directly rather than checking exists() first, which could race
        # with the file being removed.
        with path.open("r", encoding="utf-8") as handle:
            for line_number, line in enumerate(handle, start=1):
                if not line.strip():
                    continue
                try:
                    documents.append(json.loads(line))
                except json.JSONDecodeError as exc:
                    raise HelpdeskSearchError(
                        f"Cache file {path} is corrupt at line {line_number}: {exc.msg}. Run fetch again."
                    ) from exc
    except FileNotFoundError as exc:
        raise HelpdeskSearchError(f"Cache does not exist: {path}. Run fetch first.") from exc
    return documents
|
|
|
|
|
|
def search_documents(
    documents: list[dict[str, Any]],
    query: str,
    doc_type: str,
    min_score: float,
) -> list[tuple[float, dict[str, Any], str]]:
    """Score ``documents`` against ``query`` and return the matches, best first.

    Args:
        documents: Cached helpdesk documents.
        query: Free-text query; it is normalized before matching.
        doc_type: ``"all"`` or the ``doc_type`` value a document must have.
        min_score: Matches scoring below this value are dropped.

    Returns:
        ``(score, document, matched_field)`` tuples ordered by score
        descending. Equal scores are ordered newest first, matching the other
        listings in this tool, and then by ``doc_id``.
    """
    normalized_query = normalize(query)
    # str.split() without arguments never yields empty tokens.
    query_tokens = normalized_query.split()
    scored: list[tuple[float, dict[str, Any], str]] = []
    for document in documents:
        if doc_type != "all" and document.get("doc_type") != doc_type:
            continue
        score, reason = score_document(document, normalized_query, query_tokens)
        if score >= min_score:
            scored.append((score, document, reason))

    # Stable multi-pass sort, least significant key first. The date key is a
    # string and so cannot be negated; reverse=True keeps ties in their
    # existing order. str(... or "") keeps a null doc_id comparable.
    scored.sort(key=lambda item: str(item[1].get("doc_id") or ""))
    scored.sort(key=lambda item: sort_date(item[1]), reverse=True)
    scored.sort(key=lambda item: item[0], reverse=True)
    return scored
|
|
|
|
|
|
def score_document(document: dict[str, Any], query: str, query_tokens: list[str]) -> tuple[float, str]:
    """Return the best weighted score for ``document`` and the field that produced it.

    For each weighted field: a substring hit of the whole query scores
    ``1.0 * weight``; otherwise the score is the larger of the token-coverage
    score (factor 0.85) and, for short values, the fuzzy ratio (factor 0.65).
    The first field reaching the highest score wins; ``(0.0, "")`` means
    nothing matched.
    """
    top_score = 0.0
    top_field = ""
    for name, raw, weight in weighted_fields(document):
        # A string "text" value is used as-is; everything else is normalized here.
        if name == "text" and isinstance(raw, str):
            haystack = raw
        else:
            haystack = normalize(raw)
        if not haystack:
            continue

        if query and query in haystack:
            candidate = 1.0 * weight
        elif query_tokens:
            hits = sum(1 for token in query_tokens if token in haystack)
            candidate = max(0.0, (hits / len(query_tokens)) * 0.85 * weight)
            # Fuzzy matching is skipped for long body text.
            if name != "text" or len(haystack) < 500:
                fuzzy = SequenceMatcher(None, query, haystack[:500]).ratio() * 0.65 * weight
                candidate = max(candidate, fuzzy)
        else:
            candidate = 0.0

        if candidate > top_score:
            top_score, top_field = candidate, name
    return top_score, top_field
|
|
|
|
|
|
def weighted_fields(document: dict[str, Any]) -> list[tuple[str, str, float]]:
    """Return ``(field_name, text, weight)`` triples used to score ``document``.

    Every value goes through ``flatten``, so a missing or null field
    contributes an empty string rather than the literal text ``"None"``.
    """

    def joined(*keys: str) -> str:
        # Space-join the flattened values of the given document keys.
        return " ".join(flatten(document.get(key)) for key in keys)

    return [
        ("issue", joined("issue_id", "issue_subject"), 1.3),
        ("contact", joined("contact_name", "contact_company", "contact_email"), 1.2),
        ("addresses", joined("from_address", "to_address", "cc_address"), 1.1),
        ("message_id", flatten(document.get("message_id")), 1.0),
        # Prefer the precomputed normalized text; fall back to the raw body.
        ("text", flatten(document.get("search_text") or document.get("text")), 1.0),
    ]
|
|
|
|
|
|
def print_matches(matches: list[tuple[float, dict[str, Any], str]]) -> None:
    """Print search results to stdout, or a notice when there are none.

    Each match produces a header line followed by the issue subject, and then
    the contact line and a body snippet when those are non-empty.
    """
    if not matches:
        print("No helpdesk documents matched.")
        return

    for score, doc, reason in matches:
        when = doc.get("message_date") or doc.get("ticket_date") or doc.get("issue_updated_on") or ""
        arrow = "in" if doc.get("is_incoming") else "out"
        who = display_contact(doc)
        header = (
            f"{doc.get('doc_id')} issue=#{doc.get('issue_id')} "
            f"contact=#{doc.get('contact_id')} {arrow} {when} "
            f"score={score:.2f} via {reason}"
        )
        print(header)
        print(f" {doc.get('issue_subject') or ''}")
        if who:
            print(f" {who}")
        snippet = make_snippet(doc.get("text") or "")
        if snippet:
            print(f" {snippet}")
|
|
|
|
|
|
def print_timeline(documents: list[dict[str, Any]], contact_id: int, limit: int) -> None:
|
|
rows = [doc for doc in documents if int(doc.get("contact_id") or 0) == contact_id]
|
|
rows.sort(key=sort_date, reverse=True)
|
|
for document in rows[:limit]:
|
|
date = document.get("message_date") or document.get("ticket_date") or ""
|
|
direction = "in" if document.get("is_incoming") else "out"
|
|
print(f"{date} {document.get('doc_type')} {direction} issue=#{document.get('issue_id')} {document.get('issue_subject')}")
|
|
|
|
|
|
def print_issues_by_contact(documents: list[dict[str, Any]], contact_id: int, limit: int) -> None:
|
|
tickets = [doc for doc in documents if doc.get("doc_type") == "ticket" and int(doc.get("contact_id") or 0) == contact_id]
|
|
tickets.sort(key=sort_date, reverse=True)
|
|
seen: set[int] = set()
|
|
count = 0
|
|
for ticket in tickets:
|
|
issue_id = int(ticket.get("issue_id") or 0)
|
|
if issue_id in seen:
|
|
continue
|
|
seen.add(issue_id)
|
|
print(f"{ticket.get('ticket_date')} issue=#{issue_id} {ticket.get('status') or ''} {ticket.get('issue_subject')}")
|
|
count += 1
|
|
if count >= limit:
|
|
break
|
|
|
|
|
|
def sort_date(document: dict[str, Any]) -> str:
    """Return the document's date as a string for sorting.

    The first truthy value among ``message_date``, ``ticket_date`` and
    ``issue_updated_on`` is used; an empty string means none is set.
    """
    for key in ("message_date", "ticket_date", "issue_updated_on"):
        stamp = document.get(key)
        if stamp:
            return str(stamp)
    return ""
|
|
|
|
|
|
def display_contact(document: dict[str, Any]) -> str:
    """Join the non-empty contact name, company and email with ``" | "``."""
    shown: list[str] = []
    for key in ("contact_name", "contact_company", "contact_email"):
        text = flatten(document.get(key))
        if text:
            shown.append(text)
    return " | ".join(shown)
|
|
|
|
|
|
def make_snippet(value: str, length: int = 220) -> str:
    """Collapse whitespace in ``value`` and shorten it to at most ``length`` characters.

    Text longer than ``length`` is cut to ``length - 3`` characters, stripped
    of trailing whitespace, and suffixed with ``"..."``.
    """
    collapsed = re.sub(r"\s+", " ", value).strip()
    if len(collapsed) > length:
        return collapsed[: length - 3].rstrip() + "..."
    return collapsed
|
|
|
|
|
|
def clean_body_text(value: Any) -> str:
    """Flatten ``value`` to text and tidy it for display and indexing.

    Zero-width non-joiner, zero-width joiner and BOM characters become spaces,
    then every whitespace run collapses to a single space.
    """
    text = flatten(value)
    for invisible in ("\u200c", "\u200d", "\ufeff"):
        text = text.replace(invisible, " ")
    return re.sub(r"\s+", " ", text).strip()
|
|
|
|
|
|
def normalize(value: Any) -> str:
    """Lower-case ``value`` and reduce it to space-separated search tokens.

    Letters and digits of any script are kept, together with ``@ . + # -``.
    Every other character, including ``_``, becomes a separator. ASCII input
    therefore normalizes exactly as it did with the previous ASCII-only
    pattern, while accented or non-Latin text is no longer stripped away.

    Note: ``search_text`` values stored by an earlier fetch keep their old
    form until the cache is fetched again.
    """
    value = flatten(value).lower()
    # \w is Unicode-aware for str patterns but also matches "_", which has
    # always been treated as a separator here, so it is excluded explicitly.
    value = re.sub(r"(?:[^\w@.+#-]|_)+", " ", value)
    return re.sub(r"\s+", " ", value).strip()
|
|
|
|
|
|
def flatten(value: Any) -> str:
    """Render ``value`` as one string.

    ``None`` becomes ``""``; lists and dict values are flattened recursively
    and joined with single spaces; anything else goes through ``str()``.
    """
    if value is None:
        return ""
    if isinstance(value, dict):
        # Only the values of a mapping contribute text, never the keys.
        value = list(value.values())
    if isinstance(value, list):
        return " ".join(map(flatten, value))
    return str(value)
|
|
|
|
|
|
def shell_quote(value: str) -> str:
    """Wrap ``value`` in single quotes for a POSIX shell.

    Each embedded single quote is emitted as ``'"'"'``: close the quoted
    string, add a double-quoted ``'``, and reopen it.
    """
    escaped = "'\"'\"'".join(value.split("'"))
    return f"'{escaped}'"
|
|
|
|
|
|
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the process exit status.
    sys.exit(main())
|