from __future__ import annotations

from typing import Any, Dict, Iterable, List, Protocol, Sequence

from .models import IndexDocument
from .redmine import RedmineMapper


class RedmineSource(Protocol):
    """Structural type for the issue source consumed by ``BackfillService``."""

    # ``BackfillService.backfill_redmine_project_limits`` temporarily overwrites
    # this attribute (when present) while it iterates over projects.
    project_identifier: str | None

    def recent_helpdesk_issues(self, limit: int) -> Iterable[Dict[str, Any]]:
        """Return an iterable of issue payloads for the given ``limit``."""
        ...


class DocumentEmbedder(Protocol):
    """Structural type for the component that turns documents into vectors."""

    def embed_documents(self, docs: Sequence[IndexDocument]) -> List[List[float]]:
        """Return the embedding vectors for ``docs``."""
        ...


class RebuildStore(Protocol):
    """Structural type for the store that receives rebuilt documents."""

    def rebuild_source(
        self,
        source: str,
        docs: Sequence[IndexDocument],
        vectors: Sequence[Sequence[float]],
        project_identifier: str | None = None,
    ) -> None:
        """Rebuild ``source`` from ``docs`` and ``vectors``."""
        ...


class BackfillService:
    """Rebuild the stored ``"redmine"`` source from recent helpdesk issues."""

    def __init__(
        self,
        source: RedmineSource,
        embedder: DocumentEmbedder,
        store: RebuildStore,
        mapper: RedmineMapper | None = None,
    ) -> None:
        """Store the collaborators used by the backfill methods.

        Args:
            source: Provider of recent helpdesk issues.
            embedder: Produces vectors for the mapped documents.
            store: Receives the documents and vectors via ``rebuild_source``.
            mapper: Converts one issue into index documents. When omitted
                (or falsy), ``RedmineMapper(redmine_url="")`` is used.
        """
        self.source = source
        self.embedder = embedder
        self.store = store
        self.mapper = mapper or RedmineMapper(redmine_url="")

    def backfill_redmine_sample(self, limit: int = 500) -> Dict[str, int | str]:
        """Rebuild the ``"redmine"`` source from one batch of recent issues.

        Args:
            limit: Value passed to ``source.recent_helpdesk_issues``.

        Returns:
            A summary with the keys ``"source"``, ``"issues"`` (number of
            issues fetched) and ``"documents"`` (number of de-duplicated
            documents handed to the store).
        """
        issue_count, documents, vectors = self._load_documents(limit)
        # The project identifier is resolved only now, after fetching and
        # embedding, i.e. at the moment the store is called.
        self.store.rebuild_source(
            "redmine",
            documents,
            vectors,
            project_identifier=self._project_identifier(),
        )
        return {"source": "redmine", "issues": issue_count, "documents": len(documents)}

    def backfill_redmine_projects(
        self, projects: Sequence[str], per_project_limit: int = 500
    ) -> Dict[str, object]:
        """Backfill every project in ``projects`` using the same limit.

        Delegates to ``backfill_redmine_project_limits``; repeated project
        names collapse into a single entry because they become dict keys.
        """
        return self.backfill_redmine_project_limits(
            {project: per_project_limit for project in projects}
        )

    def backfill_redmine_project_limits(self, project_limits: Dict[str, int]) -> Dict[str, object]:
        """Backfill each project with its own issue limit.

        For every ``project -> limit`` entry, the source and the mapper are
        pointed at that project (only if they expose ``project_identifier``),
        the project's documents are loaded, and the store is rebuilt with
        ``project_identifier=project``. The original ``project_identifier``
        values are restored afterwards, even if a step raises.

        Args:
            project_limits: Mapping of project identifier to issue limit.

        Returns:
            A summary with the keys ``"source"``, ``"projects"``,
            ``"issues"``, ``"documents"`` and ``"project_results"`` (one
            per-project summary dict for each project processed).
        """
        previous_source_project = getattr(self.source, "project_identifier", None)
        previous_mapper_project = getattr(self.mapper, "project_identifier", None)

        project_results: List[Dict[str, int | str]] = []
        total_issues = 0
        total_documents = 0
        try:
            for project, project_limit in project_limits.items():
                # Only override the attribute on collaborators that have it.
                if hasattr(self.source, "project_identifier"):
                    self.source.project_identifier = project
                if hasattr(self.mapper, "project_identifier"):
                    self.mapper.project_identifier = project

                issue_count, documents, vectors = self._load_documents(project_limit)
                self.store.rebuild_source(
                    "redmine", documents, vectors, project_identifier=project
                )

                project_results.append(
                    {
                        "project_identifier": project,
                        "issues": issue_count,
                        "documents": len(documents),
                    }
                )
                total_issues += issue_count
                total_documents += len(documents)
        finally:
            # Undo the temporary per-project overrides.
            if hasattr(self.source, "project_identifier"):
                self.source.project_identifier = previous_source_project
            if hasattr(self.mapper, "project_identifier"):
                self.mapper.project_identifier = previous_mapper_project

        return {
            "source": "redmine",
            "projects": len(project_limits),
            "issues": total_issues,
            "documents": total_documents,
            "project_results": project_results,
        }

    def _load_documents(
        self, limit: int
    ) -> tuple[int, List[IndexDocument], List[List[float]]]:
        """Fetch issues, map them to de-duplicated documents and embed them.

        Returns:
            ``(issue_count, documents, vectors)`` where ``issue_count`` is the
            number of issues returned by the source.
        """
        # Materialise first so the issues can be both counted and mapped.
        issues = list(self.source.recent_helpdesk_issues(limit))
        documents = deduplicate_documents(
            [doc for issue in issues for doc in self.mapper.issue_to_documents(issue)]
        )
        # The embedder is not called at all when there is nothing to embed.
        vectors = self.embedder.embed_documents(documents) if documents else []
        return len(issues), documents, vectors

    def _project_identifier(self) -> str | None:
        """Return the mapper's project identifier if truthy, else the source's."""
        mapper_project = getattr(self.mapper, "project_identifier", None)
        if mapper_project:
            return mapper_project
        return getattr(self.source, "project_identifier", None)


def deduplicate_documents(documents: Sequence[IndexDocument]) -> List[IndexDocument]:
    """Return ``documents`` with at most one entry per ``document.id``.

    When several documents share an id, the last one is kept, placed at the
    position where that id first appeared.
    """
    unique: Dict[str, IndexDocument] = {document.id: document for document in documents}
    return list(unique.values())