Initial Redmine tooling and local plugin forks
This commit is contained in:
Executable
+465
@@ -0,0 +1,465 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Small CLI for searching and lightly updating Redmine CRM contacts."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from dataclasses import dataclass
|
||||
from difflib import SequenceMatcher
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
DEFAULT_BASE_URL = "http://192.168.50.170"
|
||||
DEFAULT_PROJECT = "customer-service"
|
||||
DEFAULT_CACHE_DIR = Path(".cache/redmine_contacts")
|
||||
SEARCH_FIELDS = (
|
||||
"first_name",
|
||||
"middle_name",
|
||||
"last_name",
|
||||
"company",
|
||||
"job_title",
|
||||
"background",
|
||||
"website",
|
||||
"skype_name",
|
||||
"tag_list",
|
||||
)
|
||||
UPDATE_FIELDS = {
|
||||
"first_name",
|
||||
"middle_name",
|
||||
"last_name",
|
||||
"company",
|
||||
"website",
|
||||
"skype_name",
|
||||
"birthday",
|
||||
"job_title",
|
||||
"background",
|
||||
"phone",
|
||||
"email",
|
||||
"tag_list",
|
||||
}
|
||||
|
||||
|
||||
class RedmineError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class RedmineClient:
|
||||
base_url: str
|
||||
api_key: str
|
||||
project: str
|
||||
|
||||
def request(
|
||||
self,
|
||||
method: str,
|
||||
path: str,
|
||||
params: dict[str, Any] | None = None,
|
||||
payload: dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
url = self._url(path, params)
|
||||
body = None
|
||||
headers = {
|
||||
"Accept": "application/json",
|
||||
"X-Redmine-API-Key": self.api_key,
|
||||
}
|
||||
if payload is not None:
|
||||
body = json.dumps(payload).encode("utf-8")
|
||||
headers["Content-Type"] = "application/json"
|
||||
|
||||
request = urllib.request.Request(url, data=body, headers=headers, method=method)
|
||||
try:
|
||||
with urllib.request.urlopen(request, timeout=30) as response:
|
||||
raw = response.read()
|
||||
except urllib.error.HTTPError as exc:
|
||||
detail = exc.read().decode("utf-8", errors="replace")
|
||||
raise RedmineError(f"HTTP {exc.code} from Redmine: {detail[:500]}") from exc
|
||||
except urllib.error.URLError as exc:
|
||||
raise RedmineError(f"Could not reach Redmine: {exc.reason}") from exc
|
||||
|
||||
if not raw:
|
||||
return {}
|
||||
try:
|
||||
return json.loads(raw.decode("utf-8"))
|
||||
except json.JSONDecodeError as exc:
|
||||
raise RedmineError(f"Redmine returned non-JSON response from {url}") from exc
|
||||
|
||||
def fetch_contacts(self, limit: int = 100) -> list[dict[str, Any]]:
|
||||
contacts: list[dict[str, Any]] = []
|
||||
offset = 0
|
||||
total = None
|
||||
while total is None or offset < total:
|
||||
data = self.request(
|
||||
"GET",
|
||||
f"/projects/{self.project}/contacts.json",
|
||||
{"limit": limit, "offset": offset},
|
||||
)
|
||||
page = data.get("contacts", [])
|
||||
contacts.extend(page)
|
||||
total = int(data.get("total_count", len(contacts)))
|
||||
received = len(page)
|
||||
if received == 0:
|
||||
break
|
||||
offset += received
|
||||
return contacts
|
||||
|
||||
def update_contact(self, contact_id: int, fields: dict[str, Any]) -> None:
|
||||
self.request("PUT", f"/contacts/{contact_id}.json", payload={"contact": fields})
|
||||
|
||||
def helpdesk_ticket_by_issue(self, issue_id: int) -> dict[str, Any]:
|
||||
return self.request("GET", f"/helpdesk_search/issues/{issue_id}/ticket")
|
||||
|
||||
def helpdesk_issues_by_contact(self, contact_id: int, limit: int) -> dict[str, Any]:
|
||||
return self.request("GET", f"/helpdesk_search/contacts/{contact_id}/issues", {"limit": limit})
|
||||
|
||||
def helpdesk_messages_by_issue(self, issue_id: int, limit: int) -> dict[str, Any]:
|
||||
return self.request("GET", f"/helpdesk_search/issues/{issue_id}/messages", {"limit": limit})
|
||||
|
||||
def helpdesk_timeline(self, contact_id: int, limit: int) -> dict[str, Any]:
|
||||
return self.request("GET", f"/helpdesk_search/contacts/{contact_id}/timeline", {"limit": limit})
|
||||
|
||||
def _url(self, path: str, params: dict[str, Any] | None = None) -> str:
|
||||
base = self.base_url.rstrip("/")
|
||||
url = f"{base}{path}"
|
||||
if params:
|
||||
url += "?" + urllib.parse.urlencode(params)
|
||||
return url
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description="Search and update Redmine CRM contacts.")
|
||||
parser.add_argument("--base-url", default=os.getenv("REDMINE_BASE_URL", DEFAULT_BASE_URL))
|
||||
parser.add_argument("--project", default=os.getenv("REDMINE_PROJECT", DEFAULT_PROJECT))
|
||||
parser.add_argument("--cache", type=Path, help="Override the cache file path.")
|
||||
|
||||
subparsers = parser.add_subparsers(dest="command", required=True)
|
||||
|
||||
fetch_parser = subparsers.add_parser("fetch", help="Fetch all visible contacts into the local cache.")
|
||||
fetch_parser.add_argument("--limit", type=int, default=100)
|
||||
|
||||
search_parser = subparsers.add_parser("search", help="Search cached contacts.")
|
||||
search_parser.add_argument("query")
|
||||
search_parser.add_argument("--refresh", action="store_true", help="Refresh cache before searching.")
|
||||
search_parser.add_argument(
|
||||
"--offline-cache",
|
||||
action="store_true",
|
||||
help="Search only the local cache without requiring REDMINE_API_KEY.",
|
||||
)
|
||||
search_parser.add_argument("--limit", type=int, default=20)
|
||||
search_parser.add_argument("--min-score", type=float, default=0.68)
|
||||
|
||||
update_parser = subparsers.add_parser("update", help="Update safe fields on one contact.")
|
||||
update_parser.add_argument("contact_id", type=int)
|
||||
update_parser.add_argument(
|
||||
"--set",
|
||||
action="append",
|
||||
default=[],
|
||||
metavar="FIELD=VALUE",
|
||||
help="Set a contact field. Repeat for multiple fields.",
|
||||
)
|
||||
update_parser.add_argument("--apply", action="store_true", help="Actually send the update.")
|
||||
|
||||
ticket_parser = subparsers.add_parser(
|
||||
"helpdesk-ticket",
|
||||
help="Fetch helpdesk ticket metadata for an issue.",
|
||||
)
|
||||
ticket_parser.add_argument("issue_id", type=int)
|
||||
|
||||
contact_issues_parser = subparsers.add_parser(
|
||||
"helpdesk-issues",
|
||||
help="List helpdesk issues for a contact.",
|
||||
)
|
||||
contact_issues_parser.add_argument("contact_id", type=int)
|
||||
contact_issues_parser.add_argument("--limit", type=int, default=100)
|
||||
|
||||
issue_messages_parser = subparsers.add_parser(
|
||||
"helpdesk-messages",
|
||||
help="List helpdesk journal message metadata for an issue.",
|
||||
)
|
||||
issue_messages_parser.add_argument("issue_id", type=int)
|
||||
issue_messages_parser.add_argument("--limit", type=int, default=100)
|
||||
|
||||
timeline_parser = subparsers.add_parser(
|
||||
"helpdesk-timeline",
|
||||
help="Show a contact's helpdesk ticket/message timeline.",
|
||||
)
|
||||
timeline_parser.add_argument("contact_id", type=int)
|
||||
timeline_parser.add_argument("--limit", type=int, default=100)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
if args.command == "fetch":
|
||||
client = client_from_args(args)
|
||||
contacts = client.fetch_contacts(limit=args.limit)
|
||||
cache_path = resolve_cache_path(args)
|
||||
write_cache(cache_path, args, contacts)
|
||||
print(f"Cached {len(contacts)} contacts in {cache_path}")
|
||||
return 0
|
||||
|
||||
if args.command == "search":
|
||||
contacts = load_or_refresh(args)
|
||||
matches = search_contacts(contacts, args.query, args.min_score)
|
||||
print_matches(matches[: args.limit])
|
||||
return 0
|
||||
|
||||
if args.command == "update":
|
||||
fields = parse_update_fields(args.set)
|
||||
if not fields:
|
||||
raise RedmineError("No fields supplied. Use --set FIELD=VALUE.")
|
||||
print(f"Contact {args.contact_id} update payload:")
|
||||
print(json.dumps({"contact": fields}, indent=2, sort_keys=True))
|
||||
if not args.apply:
|
||||
print("Dry run only. Re-run with --apply to write to Redmine.")
|
||||
return 0
|
||||
client = client_from_args(args)
|
||||
client.update_contact(args.contact_id, fields)
|
||||
print("Update sent.")
|
||||
return 0
|
||||
|
||||
if args.command == "helpdesk-ticket":
|
||||
client = client_from_args(args)
|
||||
print_json(client.helpdesk_ticket_by_issue(args.issue_id))
|
||||
return 0
|
||||
|
||||
if args.command == "helpdesk-issues":
|
||||
client = client_from_args(args)
|
||||
print_json(client.helpdesk_issues_by_contact(args.contact_id, args.limit))
|
||||
return 0
|
||||
|
||||
if args.command == "helpdesk-messages":
|
||||
client = client_from_args(args)
|
||||
print_json(client.helpdesk_messages_by_issue(args.issue_id, args.limit))
|
||||
return 0
|
||||
|
||||
if args.command == "helpdesk-timeline":
|
||||
client = client_from_args(args)
|
||||
print_json(client.helpdesk_timeline(args.contact_id, args.limit))
|
||||
return 0
|
||||
except RedmineError as exc:
|
||||
print(f"error: {exc}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
return 1
|
||||
|
||||
|
||||
def client_from_args(args: argparse.Namespace) -> RedmineClient:
|
||||
api_key = os.getenv("REDMINE_API_KEY")
|
||||
if not api_key:
|
||||
raise RedmineError("Set REDMINE_API_KEY in the environment.")
|
||||
return RedmineClient(base_url=args.base_url, api_key=api_key, project=args.project)
|
||||
|
||||
|
||||
def load_or_refresh(args: argparse.Namespace) -> list[dict[str, Any]]:
|
||||
cache_path = resolve_cache_path(args)
|
||||
if args.refresh or not cache_path.exists():
|
||||
client = client_from_args(args)
|
||||
contacts = client.fetch_contacts()
|
||||
write_cache(cache_path, args, contacts)
|
||||
return contacts
|
||||
if not args.offline_cache:
|
||||
require_api_key_for_cached_search()
|
||||
with cache_path.open("r", encoding="utf-8") as handle:
|
||||
data = json.load(handle)
|
||||
validate_cache_metadata(data, args, cache_path)
|
||||
return data["contacts"]
|
||||
|
||||
|
||||
def write_cache(cache_path: Path, args: argparse.Namespace, contacts: list[dict[str, Any]]) -> None:
|
||||
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
payload = {
|
||||
"base_url": canonical_base_url(args.base_url),
|
||||
"project": args.project,
|
||||
"cached_at": int(time.time()),
|
||||
"contacts": contacts,
|
||||
}
|
||||
with cache_path.open("w", encoding="utf-8") as handle:
|
||||
json.dump(payload, handle, indent=2, sort_keys=True)
|
||||
handle.write("\n")
|
||||
|
||||
|
||||
def resolve_cache_path(args: argparse.Namespace) -> Path:
|
||||
if args.cache:
|
||||
return args.cache
|
||||
parsed = urllib.parse.urlparse(args.base_url)
|
||||
host = parsed.netloc or parsed.path
|
||||
host_slug = re.sub(r"[^A-Za-z0-9_.-]+", "_", host.strip("/"))
|
||||
project_slug = re.sub(r"[^A-Za-z0-9_.-]+", "_", args.project)
|
||||
return DEFAULT_CACHE_DIR / f"{host_slug}_{project_slug}.json"
|
||||
|
||||
|
||||
def canonical_base_url(base_url: str) -> str:
|
||||
return base_url.rstrip("/")
|
||||
|
||||
|
||||
def require_api_key_for_cached_search() -> None:
|
||||
if not os.getenv("REDMINE_API_KEY"):
|
||||
raise RedmineError(
|
||||
"Set REDMINE_API_KEY, or pass --offline-cache to search only a local cache explicitly."
|
||||
)
|
||||
|
||||
|
||||
def validate_cache_metadata(data: dict[str, Any], args: argparse.Namespace, cache_path: Path) -> None:
|
||||
expected_base_url = canonical_base_url(args.base_url)
|
||||
cached_base_url = data.get("base_url")
|
||||
cached_project = data.get("project")
|
||||
if cached_base_url is None or cached_project is None:
|
||||
raise RedmineError(
|
||||
f"{cache_path} is a legacy cache without base-url/project metadata. "
|
||||
"Refresh it before searching."
|
||||
)
|
||||
if cached_base_url != expected_base_url or cached_project != args.project:
|
||||
raise RedmineError(
|
||||
f"{cache_path} was created for {cached_base_url} project {cached_project}, "
|
||||
f"not {expected_base_url} project {args.project}."
|
||||
)
|
||||
|
||||
|
||||
def parse_update_fields(assignments: list[str]) -> dict[str, str]:
|
||||
fields: dict[str, str] = {}
|
||||
for assignment in assignments:
|
||||
if "=" not in assignment:
|
||||
raise RedmineError(f"Invalid --set value: {assignment!r}")
|
||||
field, value = assignment.split("=", 1)
|
||||
field = field.strip()
|
||||
if field not in UPDATE_FIELDS:
|
||||
allowed = ", ".join(sorted(UPDATE_FIELDS))
|
||||
raise RedmineError(f"Unsupported update field {field!r}. Allowed fields: {allowed}")
|
||||
fields[field] = value
|
||||
return fields
|
||||
|
||||
|
||||
def search_contacts(
|
||||
contacts: list[dict[str, Any]],
|
||||
query: str,
|
||||
min_score: float,
|
||||
) -> list[tuple[float, dict[str, Any], str]]:
|
||||
normalized_query = normalize(query)
|
||||
query_digits = digits_only(query)
|
||||
scored: list[tuple[float, dict[str, Any], str]] = []
|
||||
for contact in contacts:
|
||||
haystacks = contact_haystacks(contact)
|
||||
score, reason = score_contact(normalized_query, query_digits, haystacks)
|
||||
if score >= min_score:
|
||||
scored.append((score, contact, reason))
|
||||
return sorted(scored, key=lambda item: (-item[0], display_name(item[1]).lower(), item[1].get("id", 0)))
|
||||
|
||||
|
||||
def score_contact(
|
||||
query: str,
|
||||
query_digits: str,
|
||||
haystacks: list[tuple[str, str]],
|
||||
) -> tuple[float, str]:
|
||||
best_score = 0.0
|
||||
best_reason = ""
|
||||
for label, value in haystacks:
|
||||
normalized_value = normalize(value)
|
||||
if not normalized_value:
|
||||
continue
|
||||
if query and query in normalized_value:
|
||||
score = 1.0
|
||||
else:
|
||||
score = SequenceMatcher(None, query, normalized_value).ratio()
|
||||
for token in normalized_value.split():
|
||||
score = max(score, SequenceMatcher(None, query, token).ratio() * 0.92)
|
||||
if score > best_score:
|
||||
best_score = score
|
||||
best_reason = label
|
||||
|
||||
if query_digits:
|
||||
for label, value in haystacks:
|
||||
value_digits = digits_only(value)
|
||||
if value_digits and query_digits in value_digits:
|
||||
return 1.0, label
|
||||
if value_digits:
|
||||
score = SequenceMatcher(None, query_digits, value_digits).ratio() * 0.95
|
||||
if score > best_score:
|
||||
best_score = score
|
||||
best_reason = label
|
||||
return best_score, best_reason
|
||||
|
||||
|
||||
def contact_haystacks(contact: dict[str, Any]) -> list[tuple[str, str]]:
|
||||
haystacks: list[tuple[str, str]] = [("name", display_name(contact))]
|
||||
for field in SEARCH_FIELDS:
|
||||
value = contact.get(field)
|
||||
if value:
|
||||
haystacks.append((field, flatten(value)))
|
||||
for email in contact.get("emails", []) or []:
|
||||
haystacks.append(("email", flatten(email)))
|
||||
for phone in contact.get("phones", []) or []:
|
||||
haystacks.append(("phone", flatten(phone)))
|
||||
address = contact.get("address") or {}
|
||||
for key, value in address.items():
|
||||
if value:
|
||||
haystacks.append((f"address.{key}", flatten(value)))
|
||||
custom_fields = contact.get("custom_fields") or []
|
||||
for field in custom_fields:
|
||||
name = field.get("name", "custom_field")
|
||||
value = field.get("value")
|
||||
if value:
|
||||
haystacks.append((f"custom.{name}", flatten(value)))
|
||||
return haystacks
|
||||
|
||||
|
||||
def display_name(contact: dict[str, Any]) -> str:
|
||||
if contact.get("is_company"):
|
||||
return str(contact.get("first_name") or contact.get("company") or "").strip()
|
||||
parts = [contact.get("first_name"), contact.get("middle_name"), contact.get("last_name")]
|
||||
name = " ".join(str(part).strip() for part in parts if part)
|
||||
return name or str(contact.get("company") or "").strip()
|
||||
|
||||
|
||||
def flatten(value: Any) -> str:
|
||||
if isinstance(value, dict):
|
||||
return " ".join(flatten(item) for item in value.values())
|
||||
if isinstance(value, list):
|
||||
return " ".join(flatten(item) for item in value)
|
||||
return str(value)
|
||||
|
||||
|
||||
def normalize(value: str) -> str:
|
||||
value = value.lower()
|
||||
value = re.sub(r"[^a-z0-9@.+]+", " ", value)
|
||||
return re.sub(r"\s+", " ", value).strip()
|
||||
|
||||
|
||||
def digits_only(value: str) -> str:
|
||||
return re.sub(r"\D+", "", value)
|
||||
|
||||
|
||||
def print_matches(matches: list[tuple[float, dict[str, Any], str]]) -> None:
|
||||
if not matches:
|
||||
print("No contacts matched.")
|
||||
return
|
||||
for score, contact, reason in matches:
|
||||
emails = ", ".join(flatten(item) for item in contact.get("emails", []) or [])
|
||||
phones = ", ".join(flatten(item) for item in contact.get("phones", []) or [])
|
||||
company = contact.get("company") or ""
|
||||
line = f"{contact.get('id')}: {display_name(contact)}"
|
||||
if company and company != display_name(contact):
|
||||
line += f" | {company}"
|
||||
if emails:
|
||||
line += f" | {emails}"
|
||||
if phones:
|
||||
line += f" | {phones}"
|
||||
line += f" | score={score:.2f} via {reason}"
|
||||
print(line)
|
||||
|
||||
|
||||
def print_json(data: dict[str, Any]) -> None:
|
||||
# Helpdesk commands intentionally print raw JSON so later scripts/indexers
|
||||
# can pipe the metadata without parsing a human-oriented table format.
|
||||
print(json.dumps(data, indent=2, sort_keys=True))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user