#!/usr/bin/env python3
"""Small CLI for searching and lightly updating Redmine CRM contacts."""

from __future__ import annotations

import argparse
import json
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from difflib import SequenceMatcher
from pathlib import Path
from typing import Any

DEFAULT_BASE_URL = "http://192.168.50.170"
DEFAULT_PROJECT = "customer-service"
DEFAULT_CACHE_DIR = Path(".cache/redmine_contacts")

# Scalar contact fields that are searched directly. Emails, phones, address
# parts and custom fields are added separately by contact_haystacks().
SEARCH_FIELDS = (
    "first_name",
    "middle_name",
    "last_name",
    "company",
    "job_title",
    "background",
    "website",
    "skype_name",
    "tag_list",
)

# Whitelist of field names accepted by `update --set FIELD=VALUE`.
UPDATE_FIELDS = {
    "first_name",
    "middle_name",
    "last_name",
    "company",
    "website",
    "skype_name",
    "birthday",
    "job_title",
    "background",
    "phone",
    "email",
    "tag_list",
}


class RedmineError(RuntimeError):
    """Failure that main() reports as a one-line ``error: ...`` message."""


@dataclass
class RedmineClient:
    """Thin JSON-over-HTTP client for the Redmine endpoints this CLI uses."""

    base_url: str
    api_key: str
    project: str

    def request(
        self,
        method: str,
        path: str,
        params: dict[str, Any] | None = None,
        payload: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Send one request and return the decoded JSON body.

        Args:
            method: HTTP method, e.g. ``"GET"`` or ``"PUT"``.
            path: Path appended to ``base_url`` (starts with ``/``).
            params: Optional query-string parameters.
            payload: Optional JSON body; sets the ``Content-Type`` header.

        Returns:
            The parsed JSON document, or ``{}`` when the body is empty.

        Raises:
            RedmineError: On HTTP errors, connection/read failures, or a
                body that is not valid UTF-8 JSON.
        """
        url = self._url(path, params)
        body = None
        headers = {
            "Accept": "application/json",
            "X-Redmine-API-Key": self.api_key,
        }
        if payload is not None:
            body = json.dumps(payload).encode("utf-8")
            headers["Content-Type"] = "application/json"

        request = urllib.request.Request(url, data=body, headers=headers, method=method)
        try:
            with urllib.request.urlopen(request, timeout=30) as response:
                raw = response.read()
        except urllib.error.HTTPError as exc:
            # Must precede URLError: HTTPError is a URLError subclass.
            detail = exc.read().decode("utf-8", errors="replace")
            raise RedmineError(f"HTTP {exc.code} from Redmine: {detail[:500]}") from exc
        except urllib.error.URLError as exc:
            raise RedmineError(f"Could not reach Redmine: {exc.reason}") from exc
        except OSError as exc:
            # Errors raised while reading the body (e.g. a socket timeout)
            # are not wrapped in URLError, so report them the same way.
            raise RedmineError(f"Could not reach Redmine: {exc}") from exc

        if not raw:
            return {}
        try:
            return json.loads(raw.decode("utf-8"))
        except ValueError as exc:
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError (body that is not UTF-8).
            raise RedmineError(f"Redmine returned non-JSON response from {url}") from exc

    def fetch_contacts(self, limit: int = 100) -> list[dict[str, Any]]:
        """Fetch every contact of ``self.project``, following pagination.

        Pages are requested with ``limit``/``offset`` until the reported
        ``total_count`` is reached. If the response carries no
        ``total_count``, paging continues until an empty page is returned
        instead of stopping after the first page.

        Args:
            limit: Page size sent with each request.

        Returns:
            All contacts from all pages, in response order.
        """
        project = urllib.parse.quote(self.project, safe="")
        path = f"/projects/{project}/contacts.json"
        contacts: list[dict[str, Any]] = []
        offset = 0
        total: int | None = None
        while total is None or offset < total:
            data = self.request("GET", path, {"limit": limit, "offset": offset})
            page = data.get("contacts") or []
            contacts.extend(page)
            reported_total = data.get("total_count")
            total = int(reported_total) if reported_total is not None else None
            if not page:
                # An empty page always ends the loop, so a wrong or missing
                # total_count cannot make this spin forever.
                break
            offset += len(page)
        return contacts

    def update_contact(self, contact_id: int, fields: dict[str, Any]) -> None:
        """PUT ``fields`` (wrapped as ``{"contact": fields}``) to one contact."""
        self.request("PUT", f"/contacts/{contact_id}.json", payload={"contact": fields})

    def helpdesk_ticket_by_issue(self, issue_id: int) -> dict[str, Any]:
        """GET the helpdesk_search ticket document for an issue."""
        return self.request("GET", f"/helpdesk_search/issues/{issue_id}/ticket")

    def helpdesk_issues_by_contact(self, contact_id: int, limit: int) -> dict[str, Any]:
        """GET the helpdesk_search issue list for a contact."""
        return self.request(
            "GET",
            f"/helpdesk_search/contacts/{contact_id}/issues",
            {"limit": limit},
        )

    def helpdesk_messages_by_issue(self, issue_id: int, limit: int) -> dict[str, Any]:
        """GET the helpdesk_search message list for an issue."""
        return self.request(
            "GET",
            f"/helpdesk_search/issues/{issue_id}/messages",
            {"limit": limit},
        )

    def helpdesk_timeline(self, contact_id: int, limit: int) -> dict[str, Any]:
        """GET the helpdesk_search timeline for a contact."""
        return self.request(
            "GET",
            f"/helpdesk_search/contacts/{contact_id}/timeline",
            {"limit": limit},
        )

    def _url(self, path: str, params: dict[str, Any] | None = None) -> str:
        """Join ``base_url`` (trailing slashes removed) with ``path`` and query."""
        base = self.base_url.rstrip("/")
        url = f"{base}{path}"
        if params:
            url += "?" + urllib.parse.urlencode(params)
        return url


def _build_parser() -> argparse.ArgumentParser:
    """Build the argument parser with all subcommands."""
    parser = argparse.ArgumentParser(description="Search and update Redmine CRM contacts.")
    parser.add_argument("--base-url", default=os.getenv("REDMINE_BASE_URL", DEFAULT_BASE_URL))
    parser.add_argument("--project", default=os.getenv("REDMINE_PROJECT", DEFAULT_PROJECT))
    parser.add_argument("--cache", type=Path, help="Override the cache file path.")
    subparsers = parser.add_subparsers(dest="command", required=True)

    fetch_parser = subparsers.add_parser(
        "fetch", help="Fetch all visible contacts into the local cache."
    )
    fetch_parser.add_argument("--limit", type=int, default=100)

    search_parser = subparsers.add_parser("search", help="Search cached contacts.")
    search_parser.add_argument("query")
    search_parser.add_argument(
        "--refresh", action="store_true", help="Refresh cache before searching."
    )
    search_parser.add_argument(
        "--offline-cache",
        action="store_true",
        help="Search only the local cache without requiring REDMINE_API_KEY.",
    )
    search_parser.add_argument("--limit", type=int, default=20)
    search_parser.add_argument("--min-score", type=float, default=0.68)

    update_parser = subparsers.add_parser("update", help="Update safe fields on one contact.")
    update_parser.add_argument("contact_id", type=int)
    update_parser.add_argument(
        "--set",
        action="append",
        default=[],
        metavar="FIELD=VALUE",
        help="Set a contact field. Repeat for multiple fields.",
    )
    update_parser.add_argument("--apply", action="store_true", help="Actually send the update.")

    ticket_parser = subparsers.add_parser(
        "helpdesk-ticket",
        help="Fetch helpdesk ticket metadata for an issue.",
    )
    ticket_parser.add_argument("issue_id", type=int)

    contact_issues_parser = subparsers.add_parser(
        "helpdesk-issues",
        help="List helpdesk issues for a contact.",
    )
    contact_issues_parser.add_argument("contact_id", type=int)
    contact_issues_parser.add_argument("--limit", type=int, default=100)

    issue_messages_parser = subparsers.add_parser(
        "helpdesk-messages",
        help="List helpdesk journal message metadata for an issue.",
    )
    issue_messages_parser.add_argument("issue_id", type=int)
    issue_messages_parser.add_argument("--limit", type=int, default=100)

    timeline_parser = subparsers.add_parser(
        "helpdesk-timeline",
        help="Show a contact's helpdesk ticket/message timeline.",
    )
    timeline_parser.add_argument("contact_id", type=int)
    timeline_parser.add_argument("--limit", type=int, default=100)
    return parser


def _dispatch(args: argparse.Namespace) -> int:
    """Run the selected subcommand and return its exit status."""
    if args.command == "fetch":
        client = client_from_args(args)
        contacts = client.fetch_contacts(limit=args.limit)
        cache_path = resolve_cache_path(args)
        write_cache(cache_path, args, contacts)
        print(f"Cached {len(contacts)} contacts in {cache_path}")
        return 0

    if args.command == "search":
        contacts = load_or_refresh(args)
        matches = search_contacts(contacts, args.query, args.min_score)
        print_matches(matches[: args.limit])
        return 0

    if args.command == "update":
        fields = parse_update_fields(args.set)
        if not fields:
            raise RedmineError("No fields supplied. Use --set FIELD=VALUE.")
        # Always show the payload first so a dry run is a faithful preview.
        print(f"Contact {args.contact_id} update payload:")
        print(json.dumps({"contact": fields}, indent=2, sort_keys=True))
        if not args.apply:
            print("Dry run only. Re-run with --apply to write to Redmine.")
            return 0
        client = client_from_args(args)
        client.update_contact(args.contact_id, fields)
        print("Update sent.")
        return 0

    if args.command == "helpdesk-ticket":
        client = client_from_args(args)
        print_json(client.helpdesk_ticket_by_issue(args.issue_id))
        return 0

    if args.command == "helpdesk-issues":
        client = client_from_args(args)
        print_json(client.helpdesk_issues_by_contact(args.contact_id, args.limit))
        return 0

    if args.command == "helpdesk-messages":
        client = client_from_args(args)
        print_json(client.helpdesk_messages_by_issue(args.issue_id, args.limit))
        return 0

    if args.command == "helpdesk-timeline":
        client = client_from_args(args)
        print_json(client.helpdesk_timeline(args.contact_id, args.limit))
        return 0

    return 1


def main(argv: list[str] | None = None) -> int:
    """Parse the command line, run the subcommand, return the exit status.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv[1:]``.

    Returns:
        0 on success, 1 when a RedmineError was reported on stderr.
    """
    args = _build_parser().parse_args(argv)
    try:
        return _dispatch(args)
    except RedmineError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1


def client_from_args(args: argparse.Namespace) -> RedmineClient:
    """Build a client from CLI args and the REDMINE_API_KEY variable.

    Raises:
        RedmineError: If REDMINE_API_KEY is unset or empty.
    """
    api_key = os.getenv("REDMINE_API_KEY")
    if not api_key:
        raise RedmineError("Set REDMINE_API_KEY in the environment.")
    return RedmineClient(base_url=args.base_url, api_key=api_key, project=args.project)


def _read_cache(cache_path: Path, args: argparse.Namespace) -> list[dict[str, Any]]:
    """Load and validate the cache file, returning its contact list."""
    try:
        with cache_path.open("r", encoding="utf-8") as handle:
            data = json.load(handle)
    except (OSError, ValueError) as exc:
        # ValueError covers JSONDecodeError and undecodable bytes.
        raise RedmineError(f"Could not read cache {cache_path}: {exc}") from exc
    if not isinstance(data, dict):
        raise RedmineError(
            f"{cache_path} is not a valid contact cache. Refresh it before searching."
        )
    validate_cache_metadata(data, args, cache_path)
    contacts = data.get("contacts")
    if not isinstance(contacts, list):
        raise RedmineError(
            f"{cache_path} has no contact list. Refresh it before searching."
        )
    return contacts


def load_or_refresh(args: argparse.Namespace) -> list[dict[str, Any]]:
    """Return contacts for a search, from the cache or a fresh fetch.

    A fetch happens when ``--refresh`` is given or the cache file does not
    exist. With ``--offline-cache`` (and no ``--refresh``) a missing cache
    is an error rather than a reason to contact the server.

    Raises:
        RedmineError: On a missing cache in offline mode, a missing API key
            when one is required, or an unreadable/invalid/mismatched cache.
    """
    cache_path = resolve_cache_path(args)
    cache_exists = cache_path.exists()

    if args.offline_cache and not args.refresh and not cache_exists:
        raise RedmineError(
            f"{cache_path} does not exist. Run fetch first, or search without --offline-cache."
        )

    if args.refresh or not cache_exists:
        client = client_from_args(args)
        contacts = client.fetch_contacts()
        write_cache(cache_path, args, contacts)
        return contacts

    if not args.offline_cache:
        require_api_key_for_cached_search()
    return _read_cache(cache_path, args)


def write_cache(cache_path: Path, args: argparse.Namespace, contacts: list[dict[str, Any]]) -> None:
    """Write contacts plus base-url/project/timestamp metadata as JSON."""
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    payload = {
        "base_url": canonical_base_url(args.base_url),
        "project": args.project,
        "cached_at": int(time.time()),
        "contacts": contacts,
    }
    with cache_path.open("w", encoding="utf-8") as handle:
        json.dump(payload, handle, indent=2, sort_keys=True)
        handle.write("\n")


def resolve_cache_path(args: argparse.Namespace) -> Path:
    """Return ``--cache`` if given, else a per-host/per-project default path."""
    if args.cache:
        return args.cache
    parsed = urllib.parse.urlparse(args.base_url)
    # A base URL given without a scheme parses with an empty netloc and the
    # host in `path`, so fall back to that.
    host = parsed.netloc or parsed.path
    host_slug = re.sub(r"[^A-Za-z0-9_.-]+", "_", host.strip("/"))
    project_slug = re.sub(r"[^A-Za-z0-9_.-]+", "_", args.project)
    return DEFAULT_CACHE_DIR / f"{host_slug}_{project_slug}.json"


def canonical_base_url(base_url: str) -> str:
    """Return ``base_url`` without trailing slashes."""
    return base_url.rstrip("/")


def require_api_key_for_cached_search() -> None:
    """Raise RedmineError unless REDMINE_API_KEY is set to a non-empty value."""
    if not os.getenv("REDMINE_API_KEY"):
        raise RedmineError(
            "Set REDMINE_API_KEY, or pass --offline-cache to search only a local cache explicitly."
        )


def validate_cache_metadata(data: dict[str, Any], args: argparse.Namespace, cache_path: Path) -> None:
    """Check that a cache document was written for this base URL and project.

    Raises:
        RedmineError: If the metadata is missing or does not match ``args``.
    """
    expected_base_url = canonical_base_url(args.base_url)
    cached_base_url = data.get("base_url")
    cached_project = data.get("project")
    if cached_base_url is None or cached_project is None:
        raise RedmineError(
            f"{cache_path} is a legacy cache without base-url/project metadata. "
            "Refresh it before searching."
        )
    if cached_base_url != expected_base_url or cached_project != args.project:
        raise RedmineError(
            f"{cache_path} was created for {cached_base_url} project {cached_project}, "
            f"not {expected_base_url} project {args.project}."
        )


def parse_update_fields(assignments: list[str]) -> dict[str, str]:
    """Parse ``FIELD=VALUE`` strings into a dict of whitelisted fields.

    Only the first ``=`` splits, so values may contain ``=``. A repeated
    field keeps the last value.

    Raises:
        RedmineError: If an item has no ``=`` or names a field outside
            UPDATE_FIELDS.
    """
    fields: dict[str, str] = {}
    for assignment in assignments:
        if "=" not in assignment:
            raise RedmineError(f"Invalid --set value: {assignment!r}")
        field, value = assignment.split("=", 1)
        field = field.strip()
        if field not in UPDATE_FIELDS:
            allowed = ", ".join(sorted(UPDATE_FIELDS))
            raise RedmineError(f"Unsupported update field {field!r}. Allowed fields: {allowed}")
        fields[field] = value
    return fields


def search_contacts(
    contacts: list[dict[str, Any]],
    query: str,
    min_score: float,
) -> list[tuple[float, dict[str, Any], str]]:
    """Score every contact against ``query`` and return the matches.

    Returns:
        ``(score, contact, reason)`` tuples with ``score >= min_score``,
        ordered by descending score, then display name, then id.
    """
    normalized_query = normalize(query)
    query_digits = digits_only(query)
    scored: list[tuple[float, dict[str, Any], str]] = []
    for contact in contacts:
        haystacks = contact_haystacks(contact)
        score, reason = score_contact(normalized_query, query_digits, haystacks)
        if score >= min_score:
            scored.append((score, contact, reason))
    return sorted(
        scored,
        key=lambda item: (-item[0], display_name(item[1]).lower(), item[1].get("id", 0)),
    )


def score_contact(
    query: str,
    query_digits: str,
    haystacks: list[tuple[str, str]],
) -> tuple[float, str]:
    """Return the best ``(score, label)`` for a query over a contact's values.

    Args:
        query: Query text already passed through normalize().
        query_digits: Digits of the raw query (may be empty).
        haystacks: ``(label, value)`` pairs from contact_haystacks().

    Returns:
        The highest score found and the label of the value that produced
        it; ``(0.0, "")`` when nothing scored above zero.
    """
    best_score = 0.0
    best_reason = ""

    # Text pass: substring hit is a perfect score, otherwise fuzzy ratio on
    # the whole value and (slightly discounted) on each token.
    for label, value in haystacks:
        normalized_value = normalize(value)
        if not normalized_value:
            continue
        if query and query in normalized_value:
            score = 1.0
        else:
            score = SequenceMatcher(None, query, normalized_value).ratio()
            for token in normalized_value.split():
                score = max(score, SequenceMatcher(None, query, token).ratio() * 0.92)
        if score > best_score:
            best_score = score
            best_reason = label

    # Digit pass: lets "5550102" match "+1 (555) 010-2000" despite formatting.
    # NOTE(review): this runs for any query containing digits and against
    # every field, so a short digit run (e.g. "acme 2") matches any value
    # containing that digit - confirm whether that is intended.
    if query_digits:
        for label, value in haystacks:
            value_digits = digits_only(value)
            if not value_digits:
                continue
            if query_digits in value_digits:
                return 1.0, label
            score = SequenceMatcher(None, query_digits, value_digits).ratio() * 0.95
            if score > best_score:
                best_score = score
                best_reason = label

    return best_score, best_reason


def contact_haystacks(contact: dict[str, Any]) -> list[tuple[str, str]]:
    """Collect the ``(label, text)`` pairs of a contact that are searchable.

    The display name comes first, followed by non-empty SEARCH_FIELDS,
    emails, phones, non-empty address entries and non-empty custom fields.
    """
    haystacks: list[tuple[str, str]] = [("name", display_name(contact))]
    for field in SEARCH_FIELDS:
        value = contact.get(field)
        if value:
            haystacks.append((field, flatten(value)))
    for email in contact.get("emails", []) or []:
        haystacks.append(("email", flatten(email)))
    for phone in contact.get("phones", []) or []:
        haystacks.append(("phone", flatten(phone)))
    address = contact.get("address") or {}
    for key, value in address.items():
        if value:
            haystacks.append((f"address.{key}", flatten(value)))
    custom_fields = contact.get("custom_fields") or []
    for field in custom_fields:
        name = field.get("name", "custom_field")
        value = field.get("value")
        if value:
            haystacks.append((f"custom.{name}", flatten(value)))
    return haystacks


def display_name(contact: dict[str, Any]) -> str:
    """Return a printable name for a person or company contact.

    Companies use ``first_name`` then ``company``; people use the joined
    first/middle/last names and fall back to ``company``.
    """
    if contact.get("is_company"):
        return str(contact.get("first_name") or contact.get("company") or "").strip()
    parts = [contact.get("first_name"), contact.get("middle_name"), contact.get("last_name")]
    name = " ".join(str(part).strip() for part in parts if part)
    return name or str(contact.get("company") or "").strip()


def flatten(value: Any) -> str:
    """Turn nested dicts/lists into one space-joined string of leaf values."""
    if isinstance(value, dict):
        return " ".join(flatten(item) for item in value.values())
    if isinstance(value, list):
        return " ".join(flatten(item) for item in value)
    return str(value)


def normalize(value: str) -> str:
    """Lowercase and reduce to ``a-z0-9@.+`` tokens separated by one space."""
    value = value.lower()
    value = re.sub(r"[^a-z0-9@.+]+", " ", value)
    return re.sub(r"\s+", " ", value).strip()


def digits_only(value: str) -> str:
    """Return only the digit characters of ``value``."""
    return re.sub(r"\D+", "", value)


def print_matches(matches: list[tuple[float, dict[str, Any], str]]) -> None:
    """Print one ``id: name | company | emails | phones | score`` line per match."""
    if not matches:
        print("No contacts matched.")
        return
    for score, contact, reason in matches:
        emails = ", ".join(flatten(item) for item in contact.get("emails", []) or [])
        phones = ", ".join(flatten(item) for item in contact.get("phones", []) or [])
        company = contact.get("company") or ""
        line = f"{contact.get('id')}: {display_name(contact)}"
        # Skip the company column when it would just repeat the name.
        if company and company != display_name(contact):
            line += f" | {company}"
        if emails:
            line += f" | {emails}"
        if phones:
            line += f" | {phones}"
        line += f" | score={score:.2f} via {reason}"
        print(line)


def print_json(data: dict[str, Any]) -> None:
    """Print ``data`` as indented, key-sorted JSON."""
    # Helpdesk commands intentionally print raw JSON so later scripts/indexers
    # can pipe the metadata without parsing a human-oriented table format.
    print(json.dumps(data, indent=2, sort_keys=True))


if __name__ == "__main__":
    raise SystemExit(main())