Files
redmine/redmine_contacts.py
2026-04-24 22:01:18 +00:00

466 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
"""Small CLI for searching and lightly updating Redmine CRM contacts."""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from difflib import SequenceMatcher
from pathlib import Path
from typing import Any
DEFAULT_BASE_URL = "http://192.168.50.170"
DEFAULT_PROJECT = "customer-service"
DEFAULT_CACHE_DIR = Path(".cache/redmine_contacts")
SEARCH_FIELDS = (
"first_name",
"middle_name",
"last_name",
"company",
"job_title",
"background",
"website",
"skype_name",
"tag_list",
)
UPDATE_FIELDS = {
"first_name",
"middle_name",
"last_name",
"company",
"website",
"skype_name",
"birthday",
"job_title",
"background",
"phone",
"email",
"tag_list",
}
class RedmineError(RuntimeError):
pass
@dataclass
class RedmineClient:
base_url: str
api_key: str
project: str
def request(
self,
method: str,
path: str,
params: dict[str, Any] | None = None,
payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
url = self._url(path, params)
body = None
headers = {
"Accept": "application/json",
"X-Redmine-API-Key": self.api_key,
}
if payload is not None:
body = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
request = urllib.request.Request(url, data=body, headers=headers, method=method)
try:
with urllib.request.urlopen(request, timeout=30) as response:
raw = response.read()
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")
raise RedmineError(f"HTTP {exc.code} from Redmine: {detail[:500]}") from exc
except urllib.error.URLError as exc:
raise RedmineError(f"Could not reach Redmine: {exc.reason}") from exc
if not raw:
return {}
try:
return json.loads(raw.decode("utf-8"))
except json.JSONDecodeError as exc:
raise RedmineError(f"Redmine returned non-JSON response from {url}") from exc
def fetch_contacts(self, limit: int = 100) -> list[dict[str, Any]]:
contacts: list[dict[str, Any]] = []
offset = 0
total = None
while total is None or offset < total:
data = self.request(
"GET",
f"/projects/{self.project}/contacts.json",
{"limit": limit, "offset": offset},
)
page = data.get("contacts", [])
contacts.extend(page)
total = int(data.get("total_count", len(contacts)))
received = len(page)
if received == 0:
break
offset += received
return contacts
def update_contact(self, contact_id: int, fields: dict[str, Any]) -> None:
self.request("PUT", f"/contacts/{contact_id}.json", payload={"contact": fields})
def helpdesk_ticket_by_issue(self, issue_id: int) -> dict[str, Any]:
return self.request("GET", f"/helpdesk_search/issues/{issue_id}/ticket")
def helpdesk_issues_by_contact(self, contact_id: int, limit: int) -> dict[str, Any]:
return self.request("GET", f"/helpdesk_search/contacts/{contact_id}/issues", {"limit": limit})
def helpdesk_messages_by_issue(self, issue_id: int, limit: int) -> dict[str, Any]:
return self.request("GET", f"/helpdesk_search/issues/{issue_id}/messages", {"limit": limit})
def helpdesk_timeline(self, contact_id: int, limit: int) -> dict[str, Any]:
return self.request("GET", f"/helpdesk_search/contacts/{contact_id}/timeline", {"limit": limit})
def _url(self, path: str, params: dict[str, Any] | None = None) -> str:
base = self.base_url.rstrip("/")
url = f"{base}{path}"
if params:
url += "?" + urllib.parse.urlencode(params)
return url
def main() -> int:
parser = argparse.ArgumentParser(description="Search and update Redmine CRM contacts.")
parser.add_argument("--base-url", default=os.getenv("REDMINE_BASE_URL", DEFAULT_BASE_URL))
parser.add_argument("--project", default=os.getenv("REDMINE_PROJECT", DEFAULT_PROJECT))
parser.add_argument("--cache", type=Path, help="Override the cache file path.")
subparsers = parser.add_subparsers(dest="command", required=True)
fetch_parser = subparsers.add_parser("fetch", help="Fetch all visible contacts into the local cache.")
fetch_parser.add_argument("--limit", type=int, default=100)
search_parser = subparsers.add_parser("search", help="Search cached contacts.")
search_parser.add_argument("query")
search_parser.add_argument("--refresh", action="store_true", help="Refresh cache before searching.")
search_parser.add_argument(
"--offline-cache",
action="store_true",
help="Search only the local cache without requiring REDMINE_API_KEY.",
)
search_parser.add_argument("--limit", type=int, default=20)
search_parser.add_argument("--min-score", type=float, default=0.68)
update_parser = subparsers.add_parser("update", help="Update safe fields on one contact.")
update_parser.add_argument("contact_id", type=int)
update_parser.add_argument(
"--set",
action="append",
default=[],
metavar="FIELD=VALUE",
help="Set a contact field. Repeat for multiple fields.",
)
update_parser.add_argument("--apply", action="store_true", help="Actually send the update.")
ticket_parser = subparsers.add_parser(
"helpdesk-ticket",
help="Fetch helpdesk ticket metadata for an issue.",
)
ticket_parser.add_argument("issue_id", type=int)
contact_issues_parser = subparsers.add_parser(
"helpdesk-issues",
help="List helpdesk issues for a contact.",
)
contact_issues_parser.add_argument("contact_id", type=int)
contact_issues_parser.add_argument("--limit", type=int, default=100)
issue_messages_parser = subparsers.add_parser(
"helpdesk-messages",
help="List helpdesk journal message metadata for an issue.",
)
issue_messages_parser.add_argument("issue_id", type=int)
issue_messages_parser.add_argument("--limit", type=int, default=100)
timeline_parser = subparsers.add_parser(
"helpdesk-timeline",
help="Show a contact's helpdesk ticket/message timeline.",
)
timeline_parser.add_argument("contact_id", type=int)
timeline_parser.add_argument("--limit", type=int, default=100)
args = parser.parse_args()
try:
if args.command == "fetch":
client = client_from_args(args)
contacts = client.fetch_contacts(limit=args.limit)
cache_path = resolve_cache_path(args)
write_cache(cache_path, args, contacts)
print(f"Cached {len(contacts)} contacts in {cache_path}")
return 0
if args.command == "search":
contacts = load_or_refresh(args)
matches = search_contacts(contacts, args.query, args.min_score)
print_matches(matches[: args.limit])
return 0
if args.command == "update":
fields = parse_update_fields(args.set)
if not fields:
raise RedmineError("No fields supplied. Use --set FIELD=VALUE.")
print(f"Contact {args.contact_id} update payload:")
print(json.dumps({"contact": fields}, indent=2, sort_keys=True))
if not args.apply:
print("Dry run only. Re-run with --apply to write to Redmine.")
return 0
client = client_from_args(args)
client.update_contact(args.contact_id, fields)
print("Update sent.")
return 0
if args.command == "helpdesk-ticket":
client = client_from_args(args)
print_json(client.helpdesk_ticket_by_issue(args.issue_id))
return 0
if args.command == "helpdesk-issues":
client = client_from_args(args)
print_json(client.helpdesk_issues_by_contact(args.contact_id, args.limit))
return 0
if args.command == "helpdesk-messages":
client = client_from_args(args)
print_json(client.helpdesk_messages_by_issue(args.issue_id, args.limit))
return 0
if args.command == "helpdesk-timeline":
client = client_from_args(args)
print_json(client.helpdesk_timeline(args.contact_id, args.limit))
return 0
except RedmineError as exc:
print(f"error: {exc}", file=sys.stderr)
return 1
return 1
def client_from_args(args: argparse.Namespace) -> RedmineClient:
api_key = os.getenv("REDMINE_API_KEY")
if not api_key:
raise RedmineError("Set REDMINE_API_KEY in the environment.")
return RedmineClient(base_url=args.base_url, api_key=api_key, project=args.project)
def load_or_refresh(args: argparse.Namespace) -> list[dict[str, Any]]:
cache_path = resolve_cache_path(args)
if args.refresh or not cache_path.exists():
client = client_from_args(args)
contacts = client.fetch_contacts()
write_cache(cache_path, args, contacts)
return contacts
if not args.offline_cache:
require_api_key_for_cached_search()
with cache_path.open("r", encoding="utf-8") as handle:
data = json.load(handle)
validate_cache_metadata(data, args, cache_path)
return data["contacts"]
def write_cache(cache_path: Path, args: argparse.Namespace, contacts: list[dict[str, Any]]) -> None:
cache_path.parent.mkdir(parents=True, exist_ok=True)
payload = {
"base_url": canonical_base_url(args.base_url),
"project": args.project,
"cached_at": int(time.time()),
"contacts": contacts,
}
with cache_path.open("w", encoding="utf-8") as handle:
json.dump(payload, handle, indent=2, sort_keys=True)
handle.write("\n")
def resolve_cache_path(args: argparse.Namespace) -> Path:
if args.cache:
return args.cache
parsed = urllib.parse.urlparse(args.base_url)
host = parsed.netloc or parsed.path
host_slug = re.sub(r"[^A-Za-z0-9_.-]+", "_", host.strip("/"))
project_slug = re.sub(r"[^A-Za-z0-9_.-]+", "_", args.project)
return DEFAULT_CACHE_DIR / f"{host_slug}_{project_slug}.json"
def canonical_base_url(base_url: str) -> str:
return base_url.rstrip("/")
def require_api_key_for_cached_search() -> None:
if not os.getenv("REDMINE_API_KEY"):
raise RedmineError(
"Set REDMINE_API_KEY, or pass --offline-cache to search only a local cache explicitly."
)
def validate_cache_metadata(data: dict[str, Any], args: argparse.Namespace, cache_path: Path) -> None:
expected_base_url = canonical_base_url(args.base_url)
cached_base_url = data.get("base_url")
cached_project = data.get("project")
if cached_base_url is None or cached_project is None:
raise RedmineError(
f"{cache_path} is a legacy cache without base-url/project metadata. "
"Refresh it before searching."
)
if cached_base_url != expected_base_url or cached_project != args.project:
raise RedmineError(
f"{cache_path} was created for {cached_base_url} project {cached_project}, "
f"not {expected_base_url} project {args.project}."
)
def parse_update_fields(assignments: list[str]) -> dict[str, str]:
fields: dict[str, str] = {}
for assignment in assignments:
if "=" not in assignment:
raise RedmineError(f"Invalid --set value: {assignment!r}")
field, value = assignment.split("=", 1)
field = field.strip()
if field not in UPDATE_FIELDS:
allowed = ", ".join(sorted(UPDATE_FIELDS))
raise RedmineError(f"Unsupported update field {field!r}. Allowed fields: {allowed}")
fields[field] = value
return fields
def search_contacts(
contacts: list[dict[str, Any]],
query: str,
min_score: float,
) -> list[tuple[float, dict[str, Any], str]]:
normalized_query = normalize(query)
query_digits = digits_only(query)
scored: list[tuple[float, dict[str, Any], str]] = []
for contact in contacts:
haystacks = contact_haystacks(contact)
score, reason = score_contact(normalized_query, query_digits, haystacks)
if score >= min_score:
scored.append((score, contact, reason))
return sorted(scored, key=lambda item: (-item[0], display_name(item[1]).lower(), item[1].get("id", 0)))
def score_contact(
query: str,
query_digits: str,
haystacks: list[tuple[str, str]],
) -> tuple[float, str]:
best_score = 0.0
best_reason = ""
for label, value in haystacks:
normalized_value = normalize(value)
if not normalized_value:
continue
if query and query in normalized_value:
score = 1.0
else:
score = SequenceMatcher(None, query, normalized_value).ratio()
for token in normalized_value.split():
score = max(score, SequenceMatcher(None, query, token).ratio() * 0.92)
if score > best_score:
best_score = score
best_reason = label
if query_digits:
for label, value in haystacks:
value_digits = digits_only(value)
if value_digits and query_digits in value_digits:
return 1.0, label
if value_digits:
score = SequenceMatcher(None, query_digits, value_digits).ratio() * 0.95
if score > best_score:
best_score = score
best_reason = label
return best_score, best_reason
def contact_haystacks(contact: dict[str, Any]) -> list[tuple[str, str]]:
haystacks: list[tuple[str, str]] = [("name", display_name(contact))]
for field in SEARCH_FIELDS:
value = contact.get(field)
if value:
haystacks.append((field, flatten(value)))
for email in contact.get("emails", []) or []:
haystacks.append(("email", flatten(email)))
for phone in contact.get("phones", []) or []:
haystacks.append(("phone", flatten(phone)))
address = contact.get("address") or {}
for key, value in address.items():
if value:
haystacks.append((f"address.{key}", flatten(value)))
custom_fields = contact.get("custom_fields") or []
for field in custom_fields:
name = field.get("name", "custom_field")
value = field.get("value")
if value:
haystacks.append((f"custom.{name}", flatten(value)))
return haystacks
def display_name(contact: dict[str, Any]) -> str:
if contact.get("is_company"):
return str(contact.get("first_name") or contact.get("company") or "").strip()
parts = [contact.get("first_name"), contact.get("middle_name"), contact.get("last_name")]
name = " ".join(str(part).strip() for part in parts if part)
return name or str(contact.get("company") or "").strip()
def flatten(value: Any) -> str:
if isinstance(value, dict):
return " ".join(flatten(item) for item in value.values())
if isinstance(value, list):
return " ".join(flatten(item) for item in value)
return str(value)
def normalize(value: str) -> str:
value = value.lower()
value = re.sub(r"[^a-z0-9@.+]+", " ", value)
return re.sub(r"\s+", " ", value).strip()
def digits_only(value: str) -> str:
return re.sub(r"\D+", "", value)
def print_matches(matches: list[tuple[float, dict[str, Any], str]]) -> None:
if not matches:
print("No contacts matched.")
return
for score, contact, reason in matches:
emails = ", ".join(flatten(item) for item in contact.get("emails", []) or [])
phones = ", ".join(flatten(item) for item in contact.get("phones", []) or [])
company = contact.get("company") or ""
line = f"{contact.get('id')}: {display_name(contact)}"
if company and company != display_name(contact):
line += f" | {company}"
if emails:
line += f" | {emails}"
if phones:
line += f" | {phones}"
line += f" | score={score:.2f} via {reason}"
print(line)
def print_json(data: dict[str, Any]) -> None:
# Helpdesk commands intentionally print raw JSON so later scripts/indexers
# can pipe the metadata without parsing a human-oriented table format.
print(json.dumps(data, indent=2, sort_keys=True))
if __name__ == "__main__":
raise SystemExit(main())