"""Console reporting helpers for stored documents, search results and Redmine previews.

The ``print_*`` functions write human-readable (or JSON) output to stdout.
The remaining public functions build the plain-dict reports those printers
render, so they can also be used without printing anything.
"""

from __future__ import annotations

import json
from collections import Counter
from typing import Any, Dict, Iterable, List, Optional

from .models import SearchQuery, SearchResult
from .redmine import RedmineMapper

# Document types that are listed as "missing contact metadata" when they
# belong to a helpdesk ticket but carry no contact id/email.
_CONTACT_DOC_TYPES = frozenset({"issue", "journal", "message", "contact"})

# Payload keys compared between a freshly mapped document and its indexed copy.
_CONTACT_FIELDS = ("contact_id", "contact_name", "contact_email", "contact_company")


def print_count(store: Any, source: Optional[str], project: Optional[str], doc_type: Optional[str]) -> None:
    """Print the value returned by ``store.count_documents`` for the given filters."""
    print(store.count_documents(source=source, project_identifier=project, doc_type=doc_type))


def print_list(
    store: Any,
    limit: int,
    source: Optional[str],
    project: Optional[str],
    doc_type: Optional[str],
    full_text: bool,
) -> None:
    """Print every document returned by ``store.list_documents`` for the given filters.

    Args:
        store: Object exposing ``list_documents(limit=, source=, project_identifier=, doc_type=)``.
        limit: Forwarded to the store as ``limit``.
        source: Forwarded to the store as ``source``.
        project: Forwarded to the store as ``project_identifier``.
        doc_type: Forwarded to the store as ``doc_type``.
        full_text: Print the whole text when true, otherwise a shortened snippet.
    """
    documents = store.list_documents(limit=limit, source=source, project_identifier=project, doc_type=doc_type)
    for document in documents:
        print_document(document, full_text=full_text)


def print_search(
    search_service: Any,
    query_text: str,
    limit: int,
    source: Optional[str],
    project: Optional[str],
    doc_type: Optional[str],
    full_text: bool,
) -> None:
    """Run a search and print each result.

    Snippets are requested from the search service only when the full text is
    not going to be printed (``include_snippets=not full_text``).
    """
    query = SearchQuery(
        text=query_text,
        source=source,
        project_identifier=project,
        doc_type=doc_type,
        limit=limit,
        include_snippets=not full_text,
    )
    for result in search_service.search(query):
        print_result(result, full_text=full_text)


def print_show(search_service: Any, document_id: str) -> None:
    """Print one document in full, or ``not found: <id>`` when the service returns ``None``."""
    document = search_service.get_document(document_id)
    if document is None:
        print(f"not found: {document_id}")
        return
    print_document(document, full_text=True)


def print_preview_redmine(source: Any, redmine_url: str, project: Optional[str], limit: int, full_text: bool) -> None:
    """Map recent helpdesk issues to documents and print them.

    The mapping, including the temporary ``project_identifier`` override on
    ``source``, is delegated to :func:`preview_redmine_documents` so both entry
    points share a single implementation.
    """
    for document in preview_redmine_documents(source, redmine_url, project, limit):
        print_document(document, full_text=full_text)


def print_audit(
    store: Any,
    limit: int,
    source: Optional[str],
    project: Optional[str],
    doc_type: Optional[str],
    as_json: bool,
) -> None:
    """Print an audit report for the documents returned by ``store.list_documents``.

    With ``as_json`` the report from :func:`audit_documents` is dumped as one
    JSON object with sorted keys; otherwise it is printed line by line.
    """
    documents = store.list_documents(limit=limit, source=source, project_identifier=project, doc_type=doc_type)
    report = audit_documents(documents)
    if as_json:
        print(json.dumps(report, sort_keys=True))
        return

    total = report["total_documents"]
    print(f"documents={total}")
    for name, count in sorted(report["doc_type_counts"].items()):
        print(f"doc_type {name}={count}")
    for name, count in sorted(report["project_counts"].items()):
        print(f"project {name}={count}")
    print(f"contact_metadata {report['contact_metadata_count']}/{total}")
    print(f"helpdesk_contact_metadata {report['helpdesk_contact_metadata_count']}/{report['helpdesk_documents']}")
    print(f"attachments={report['attachment_documents']}")
    for document_id in report["missing_helpdesk_contact_metadata"]:
        print(f"missing_contact {document_id}")
    for document_id in report["unexpected_attachment_documents"]:
        print(f"unexpected_attachment {document_id}")


def print_compare_redmine(
    store: Any,
    source: Any,
    redmine_url: str,
    project: Optional[str],
    limit: int,
    as_json: bool,
) -> None:
    """Compare freshly mapped Redmine documents with the indexed ones and print the result.

    With ``as_json`` the report from :func:`compare_documents` is dumped as one
    JSON object with sorted keys; otherwise it is printed line by line.
    """
    preview_documents = preview_redmine_documents(source, redmine_url, project, limit)
    # The index is read with a much larger window than the preview. Preview
    # documents whose indexed copy falls outside this window are reported as
    # "missing" by compare_documents.
    indexed_documents = store.list_documents(
        limit=max(5000, limit * 100),
        source="redmine",
        project_identifier=project,
    )
    report = compare_documents(preview_documents, indexed_documents)
    if as_json:
        print(json.dumps(report, sort_keys=True))
        return

    print(f"preview_documents={report['preview_documents']}")
    print(f"indexed_documents={report['indexed_documents']}")
    for document_id in report["missing"]:
        print(f"missing {document_id}")
    for document_id in report["stale"]:
        print(f"stale {document_id}")
    for mismatch in report["contact_mismatches"]:
        print(f"contact_mismatch {mismatch['id']}")


def print_smoke_search(
    search_service: Any,
    project: Optional[str],
    email: str,
    issue_id: Optional[int],
    order_token: Optional[str],
    natural_query: str,
    as_json: bool,
) -> None:
    """Run the smoke-search checks and print one PASS/FAIL line per check plus its results.

    With ``as_json`` a single JSON object holding ``project_identifier`` and
    ``checks`` is printed instead.
    """
    checks = smoke_search(search_service, project, email, issue_id, order_token, natural_query)
    if as_json:
        print(json.dumps({"project_identifier": project, "checks": checks}, sort_keys=True))
        return

    for check in checks:
        status = "PASS" if check["passed"] else "FAIL"
        print(f"{status} {check['kind']} {check['query']}")
        for result in check["results"]:
            # Treat a missing/None payload as empty, as run_smoke_query does.
            payload = result["payload"] or {}
            print(
                f" {result['id']} score={result['score']:.4f} "
                f"doc_type={payload.get('doc_type')} issue={payload.get('issue_id')} "
                f"contact={contact_display(payload)} url={result['citation'].get('url')}"
            )


def audit_documents(documents: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Summarise document payloads into a plain-dict audit report.

    A document "has contact metadata" when its payload has truthy
    ``contact_id`` and ``contact_email`` values. A document is a helpdesk
    document when its payload has a truthy ``has_helpdesk_ticket``.

    Args:
        documents: Dicts with an ``id`` and an optional ``payload`` dict.

    Returns:
        A dict with the keys ``total_documents``, ``doc_type_counts``,
        ``project_counts``, ``contact_metadata_count``, ``helpdesk_documents``,
        ``helpdesk_contact_metadata_count``, ``missing_contact_metadata``,
        ``missing_helpdesk_contact_metadata``, ``attachment_documents`` and
        ``unexpected_attachment_documents``.
    """
    # A missing or None payload is treated as empty throughout.
    payloads = [document.get("payload") or {} for document in documents]
    doc_type_counts = Counter(str(payload.get("doc_type") or "unknown") for payload in payloads)
    project_counts = Counter(str(payload.get("project_identifier") or "unknown") for payload in payloads)

    contact_metadata_count = 0
    helpdesk_documents = 0
    helpdesk_contact_metadata_count = 0
    missing_helpdesk_contact: List[str] = []
    unexpected_attachments: List[str] = []

    for document, payload in zip(documents, payloads):
        document_id = str(document.get("id"))
        doc_type = str(payload.get("doc_type") or "")
        has_contact = bool(payload.get("contact_id") and payload.get("contact_email"))

        if has_contact:
            contact_metadata_count += 1
        if payload.get("has_helpdesk_ticket"):
            helpdesk_documents += 1
            if has_contact:
                helpdesk_contact_metadata_count += 1
            elif doc_type in _CONTACT_DOC_TYPES:
                missing_helpdesk_contact.append(document_id)
        if doc_type == "attachment":
            unexpected_attachments.append(document_id)

    return {
        "total_documents": len(documents),
        "doc_type_counts": dict(doc_type_counts),
        "project_counts": dict(project_counts),
        "contact_metadata_count": contact_metadata_count,
        "helpdesk_documents": helpdesk_documents,
        "helpdesk_contact_metadata_count": helpdesk_contact_metadata_count,
        # Both "missing" keys are populated under the same condition (helpdesk
        # document, audited doc type, no contact metadata). Both are kept so
        # existing consumers of either key keep working; the copy keeps them
        # independent list objects.
        "missing_contact_metadata": list(missing_helpdesk_contact),
        "missing_helpdesk_contact_metadata": missing_helpdesk_contact,
        "attachment_documents": len(unexpected_attachments),
        "unexpected_attachment_documents": unexpected_attachments,
    }


def preview_redmine_documents(
    source: Any,
    redmine_url: str,
    project: Optional[str],
    limit: int,
) -> List[Dict[str, Any]]:
    """Map recent helpdesk issues from ``source`` into ``{"id", "text", "payload"}`` dicts.

    If ``project`` is truthy and ``source`` has a ``project_identifier``
    attribute, that attribute is set to ``project`` for the duration of the
    call. The previous value is restored afterwards, even when fetching or
    mapping raises.

    Args:
        source: Object exposing ``recent_helpdesk_issues(limit)``.
        redmine_url: Passed to ``RedmineMapper``.
        project: Project identifier passed to ``RedmineMapper`` and, when
            truthy, temporarily set on ``source``.
        limit: Passed to ``source.recent_helpdesk_issues``.
    """
    previous_project = getattr(source, "project_identifier", None)
    if project and hasattr(source, "project_identifier"):
        source.project_identifier = project
    try:
        mapper = RedmineMapper(redmine_url=redmine_url, project_identifier=project)
        return [
            {"id": document.id, "text": document.text, "payload": document.payload}
            for issue in source.recent_helpdesk_issues(limit)
            for document in mapper.issue_to_documents(issue)
        ]
    finally:
        if hasattr(source, "project_identifier"):
            source.project_identifier = previous_project


def compare_documents(
    preview_documents: List[Dict[str, Any]],
    indexed_documents: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Compare preview documents with indexed documents by id.

    For every preview document:

    * no indexed document with the same (stringified) id -> listed in ``missing``;
    * different ``source_hash`` payload value -> listed in ``stale``;
    * any differing contact field -> ``{"id": ...}`` appended to
      ``contact_mismatches`` (a stale document may appear here too).

    Returns:
        A dict with ``preview_documents`` and ``indexed_documents`` (counts),
        ``missing``, ``stale`` and ``contact_mismatches``.
    """
    indexed_by_id = {str(document.get("id")): document for document in indexed_documents}
    missing: List[str] = []
    stale: List[str] = []
    contact_mismatches: List[Dict[str, str]] = []

    for preview in preview_documents:
        document_id = str(preview.get("id"))
        indexed = indexed_by_id.get(document_id)
        if indexed is None:
            missing.append(document_id)
            continue

        preview_payload = preview.get("payload") or {}
        indexed_payload = indexed.get("payload") or {}
        if preview_payload.get("source_hash") != indexed_payload.get("source_hash"):
            stale.append(document_id)
        if any(preview_payload.get(field) != indexed_payload.get(field) for field in _CONTACT_FIELDS):
            contact_mismatches.append({"id": document_id})

    return {
        "preview_documents": len(preview_documents),
        "indexed_documents": len(indexed_documents),
        "missing": missing,
        "stale": stale,
        "contact_mismatches": contact_mismatches,
    }


def smoke_search(
    search_service: Any,
    project: Optional[str],
    email: str,
    issue_id: Optional[int],
    order_token: Optional[str],
    natural_query: str,
) -> List[Dict[str, Any]]:
    """Run the smoke queries and return one check dict per query.

    The email check always runs. The issue check runs when ``issue_id`` is not
    ``None``; the order and natural-language checks run when their text is
    non-empty.
    """
    checks = [run_smoke_query(search_service, "email", email, project, expected_email=email)]
    if issue_id is not None:
        checks.append(run_smoke_query(search_service, "issue", str(issue_id), project, expected_issue_id=issue_id))
    if order_token:
        checks.append(run_smoke_query(search_service, "order", order_token, project))
    if natural_query:
        checks.append(run_smoke_query(search_service, "natural", natural_query, project))
    return checks


def run_smoke_query(
    search_service: Any,
    kind: str,
    text: str,
    project: Optional[str],
    expected_email: Optional[str] = None,
    expected_issue_id: Optional[int] = None,
) -> Dict[str, Any]:
    """Run one smoke query against the ``redmine`` source and evaluate it.

    The check passes when at least one result is returned and, if given, at
    least one result payload has ``contact_email == expected_email`` and at
    least one has ``issue_id == expected_issue_id``. Both comparisons are exact.

    Returns:
        ``{"kind", "query", "passed", "results"}`` where ``results`` holds
        ``result.to_dict(include_snippet=True)`` for each result.
    """
    query = SearchQuery(
        text=text,
        source="redmine",
        project_identifier=project,
        issue_id=expected_issue_id,
        limit=5,
    )
    result_dicts = [result.to_dict(include_snippet=True) for result in search_service.search(query)]

    passed = bool(result_dicts)
    if expected_email:
        passed = passed and any(
            (result["payload"] or {}).get("contact_email") == expected_email for result in result_dicts
        )
    if expected_issue_id is not None:
        passed = passed and any(
            (result["payload"] or {}).get("issue_id") == expected_issue_id for result in result_dicts
        )
    return {"kind": kind, "query": text, "passed": passed, "results": result_dicts}


def print_result(result: SearchResult, full_text: bool) -> None:
    """Print one search result: id and score, metadata line, citation URL, text, blank line."""
    print(f"{result.id} score={result.score:.4f}")
    print_metadata(result.payload)
    print(f"url={result.citation.get('url')}")
    print(result.text if full_text else snippet(result.text))
    print()


def print_document(document: Dict[str, Any], full_text: bool) -> None:
    """Print one document dict: id, metadata line, optional URL, text, blank line.

    The URL line is printed only when the payload has a truthy ``redmine_url``.
    """
    payload = document.get("payload") or {}
    # ``.get("text", "")`` still yields None when the key exists with a None
    # value, so normalise explicitly before printing or shortening.
    text = document.get("text") or ""

    print(document.get("id"))
    print_metadata(payload)
    url = payload.get("redmine_url")
    if url:
        print(f"url={url}")
    print(text if full_text else snippet(text))
    print()


def print_metadata(payload: Dict[str, Any]) -> None:
    """Print the non-``None`` metadata fields of ``payload`` as ``name=value`` pairs on one line."""
    fields = [
        ("source", payload.get("source")),
        ("doc_type", payload.get("doc_type")),
        ("issue", payload.get("issue_id")),
        ("project", payload.get("project_identifier")),
        ("contact", contact_display(payload)),
        ("created", payload.get("created_on")),
        ("updated", payload.get("updated_on")),
    ]
    print(" ".join(f"{name}={value}" for name, value in fields if value is not None))


def contact_display(payload: Dict[str, Any]) -> Optional[str]:
    """Format the contact fields of ``payload`` as ``#id | name | email | company``.

    ``contact_id`` is included whenever it is not ``None``; the other fields
    are included when truthy. Returns ``None`` when nothing is available.
    """
    pieces: List[str] = []
    contact_id = payload.get("contact_id")
    if contact_id is not None:
        pieces.append(f"#{contact_id}")
    for key in ("contact_name", "contact_email", "contact_company"):
        value = payload.get(key)
        if value:
            pieces.append(str(value))
    return " | ".join(pieces) if pieces else None


def snippet(text: Optional[str], max_chars: int = 240) -> str:
    """Collapse whitespace in ``text`` and shorten it to at most ``max_chars`` characters.

    Text longer than ``max_chars`` is cut and suffixed with ``...``. ``None``
    is treated as an empty string.
    """
    compact = " ".join((text or "").split())
    if len(compact) <= max_chars:
        return compact
    if max_chars < 3:
        # No room for the ellipsis. A negative slice bound would trim from the
        # end instead and return more than ``max_chars`` characters.
        return compact[: max(max_chars, 0)]
    return compact[: max_chars - 3].rstrip() + "..."