from __future__ import annotations import hashlib import json import urllib.parse import urllib.request from typing import Any, Dict, Iterable, List, Optional from .chunking import chunk_text from .models import IndexDocument, Payload Issue = Dict[str, Any] class RedmineMapper: def __init__(self, redmine_url: str, chunk_chars: int = 3500, project_identifier: Optional[str] = None) -> None: self.redmine_url = redmine_url.rstrip("/") self.chunk_chars = chunk_chars self.project_identifier = project_identifier def issue_to_documents(self, issue: Issue) -> List[IndexDocument]: docs: List[IndexDocument] = [] docs.extend(self._issue_documents(issue)) docs.extend(self._journal_documents(issue)) docs.extend(self._message_documents(issue)) docs.extend(self._contact_documents(issue)) return docs def _issue_documents(self, issue: Issue) -> List[IndexDocument]: issue_id = int(issue["id"]) subject = issue.get("subject") or "" description = issue.get("description") or "" contact = self._issue_contact(issue) contact_text = self._contact_text(contact) text = f"Issue #{issue_id}: {subject}\n\n{description}\n\n{contact_text}".strip() return self._documents_for_record( base_id=f"redmine:issue:{issue_id}", text=text, issue=issue, doc_type="issue", source_record_id=f"issue:{issue_id}", record=issue, ) def _journal_documents(self, issue: Issue) -> List[IndexDocument]: docs: List[IndexDocument] = [] issue_id = int(issue["id"]) for journal in issue.get("journals") or []: notes = journal.get("notes") or "" if not notes.strip(): continue docs.extend( self._documents_for_record( base_id=f"redmine:issue:{issue_id}:journal:{journal['id']}", text=notes, issue=issue, doc_type="journal", source_record_id=f"journal:{journal['id']}", record=journal, extra={ "journal_id": journal.get("id"), "visibility": "private" if journal.get("private_notes") else "public", "created_on": journal.get("created_on") or issue.get("updated_on"), }, ) ) return docs def _message_documents(self, issue: Issue) -> List[IndexDocument]: docs: List[IndexDocument] = [] issue_id = int(issue["id"]) for message in issue.get("messages") or issue.get("journal_messages") or []: body = message.get("body") or message.get("content") or message.get("message") or "" if not body.strip(): continue docs.extend( self._documents_for_record( base_id=f"redmine:issue:{issue_id}:message:{message['id']}", text=body, issue=issue, doc_type="message", source_record_id=f"message:{message['id']}", record=message, extra={ "message_id": message.get("id"), "direction": message.get("direction"), "created_on": message.get("created_on") or issue.get("updated_on"), }, ) ) return docs def _contact_documents(self, issue: Issue) -> List[IndexDocument]: contact = self._issue_contact(issue) contact_id = contact.get("id") if not contact_id: return [] text = self._contact_text(contact) if not text.strip(): return [] return self._documents_for_record( base_id=f"redmine:contact:{contact_id}:issue:{issue['id']}", text=text, issue=issue, doc_type="contact", source_record_id=f"contact:{contact_id}", record=contact, ) def _documents_for_record( self, base_id: str, text: str, issue: Issue, doc_type: str, source_record_id: str, record: Dict[str, Any], extra: Optional[Payload] = None, ) -> List[IndexDocument]: chunks = chunk_text(text, max_chars=self.chunk_chars) payload = self._base_payload(issue, doc_type, source_record_id, record) if extra: payload.update({key: value for key, value in extra.items() if value is not None}) return [ IndexDocument(id=f"{base_id}:chunk:{index}", text=chunk, payload={**payload, "chunk_index": index}) for index, chunk in enumerate(chunks) ] def _base_payload(self, issue: Issue, doc_type: str, source_record_id: str, record: Dict[str, Any]) -> Payload: project = issue.get("project") or {} helpdesk_ticket = issue.get("helpdesk_ticket") or {} contact = self._issue_contact(issue) issue_id = int(issue["id"]) redmine_url = issue.get("url") or f"{self.redmine_url}/issues/{issue_id}" created_on = record.get("created_on") or issue.get("created_on") updated_on = record.get("updated_on") or issue.get("updated_on") return { "source": "redmine", "doc_type": doc_type, "issue_id": issue_id, "project_id": project.get("id"), "project_identifier": project.get("identifier") or self.project_identifier, "project_name": project.get("name"), "has_helpdesk_ticket": bool(helpdesk_ticket.get("id")), "helpdesk_ticket_id": helpdesk_ticket.get("id"), "contact_id": contact.get("id"), "contact_email": contact.get("email"), "contact_name": contact.get("name"), "contact_company": contact.get("company"), "created_on": created_on, "updated_on": updated_on, "visibility": "public", "redmine_url": redmine_url, "source_record_id": source_record_id, "source_hash": stable_hash(record), } def _issue_contact(self, issue: Issue) -> Payload: contact = issue.get("contact") or issue.get("customer") or {} helpdesk_ticket = issue.get("helpdesk_ticket") or {} helpdesk_contact = helpdesk_ticket.get("contact") or {} merged = {**helpdesk_contact, **contact} if not merged.get("id"): merged["id"] = helpdesk_ticket.get("contact_id") if not merged.get("email"): merged["email"] = helpdesk_ticket.get("contact_email") or helpdesk_ticket.get("from_address") if not merged.get("name"): merged["name"] = helpdesk_ticket.get("contact_name") if not merged.get("company"): merged["company"] = helpdesk_ticket.get("contact_company") return {key: value for key, value in merged.items() if value not in (None, "")} def _contact_text(self, contact: Payload) -> str: text_parts = [ contact.get("name"), contact.get("email"), contact.get("phone"), contact.get("company"), ] return "\n".join(str(part) for part in text_parts if part) class RedmineApiSource: def __init__(self, redmine_url: str, api_key: str, project_identifier: Optional[str] = None) -> None: self.redmine_url = redmine_url.rstrip("/") self.api_key = api_key self.project_identifier = project_identifier def recent_helpdesk_issues(self, limit: int) -> Iterable[Issue]: for issue in self.recent_issue_summaries(limit): yield self.issue_detail(int(issue["id"]), fallback=issue) def recent_issue_summaries(self, limit: int) -> Iterable[Issue]: yielded = 0 offset = 0 seen_issue_ids = set() page_size = 100 while yielded < limit: current_limit = min(page_size, limit - yielded) params = { "limit": str(current_limit), "offset": str(offset), "sort": "updated_on:desc,id:desc", "include": "journals", "status_id": "*", } if self.project_identifier: params["project_id"] = self.project_identifier params["subproject_id"] = "!*" path = f"{self.redmine_url}/issues.json?{urllib.parse.urlencode(params)}" payload = self._get_json(path) issues = payload.get("issues", []) if not issues: break for issue in issues: issue_id = issue["id"] if issue_id in seen_issue_ids: continue seen_issue_ids.add(issue_id) issue.setdefault("url", f"{self.redmine_url}/issues/{issue_id}") yield issue yielded += 1 if yielded >= limit: break offset += len(issues) def issue_detail(self, issue_id: int, fallback: Optional[Issue] = None) -> Issue: detail_params = urllib.parse.urlencode({"include": "journals,helpdesk"}) detail = self._get_json(f"{self.redmine_url}/issues/{issue_id}.json?{detail_params}") merged = {**(fallback or {}), **detail.get("issue", {})} merged.setdefault("url", f"{self.redmine_url}/issues/{issue_id}") return merged def _get_json(self, url: str) -> Dict[str, Any]: request = urllib.request.Request(url, headers={"X-Redmine-API-Key": self.api_key, "Accept": "application/json"}) with urllib.request.urlopen(request, timeout=30) as response: return json.loads(response.read().decode("utf-8")) def stable_hash(record: Dict[str, Any]) -> str: canonical = json.dumps(record, sort_keys=True, separators=(",", ":"), default=str) return hashlib.sha256(canonical.encode("utf-8")).hexdigest()