101 lines
4.2 KiB
Python
101 lines
4.2 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any, Dict, Iterable, List, Protocol, Sequence
|
|
|
|
from .models import IndexDocument
|
|
from .redmine import RedmineMapper
|
|
|
|
|
|
class RedmineSource(Protocol):
    """Structural type for the issue provider that ``BackfillService`` reads from."""

    # Project the source is currently pointed at, or None when no project is
    # selected. BackfillService.backfill_redmine_project_limits overwrites this
    # attribute per project and restores the previous value afterwards.
    project_identifier: str | None

    def recent_helpdesk_issues(
        self,
        limit: int,
    ) -> Iterable[Dict[str, Any]]:
        """Return an iterable of issue dictionaries.

        Args:
            limit: Requested number of issues. Presumably an upper bound on
                how many issues are returned -- confirm against the concrete
                implementation.
        """
        ...
|
|
|
|
|
|
class DocumentEmbedder(Protocol):
    """Structural type for the component that turns documents into vectors."""

    def embed_documents(
        self,
        docs: Sequence[IndexDocument],
    ) -> List[List[float]]:
        """Return a list of float vectors for ``docs``.

        NOTE(review): callers pass the result straight to
        ``RebuildStore.rebuild_source`` next to ``docs``, so the vectors are
        presumably positionally aligned with the documents -- confirm against
        the concrete embedder.
        """
        ...
|
|
|
|
|
|
class RebuildStore(Protocol):
    """Structural type for the index store that ``BackfillService`` writes to."""

    def rebuild_source(
        self,
        source: str,
        docs: Sequence[IndexDocument],
        vectors: Sequence[Sequence[float]],
        project_identifier: str | None = None,
    ) -> None:
        """Accept the documents and vectors for one source.

        Args:
            source: Name of the source being rebuilt (BackfillService always
                passes ``"redmine"``).
            docs: Documents to store.
            vectors: Embedding vectors passed alongside ``docs``.
            project_identifier: Optional project the rebuild is scoped to.
                NOTE(review): whether ``None`` means "all projects" or
                "unscoped" is decided by the implementation -- confirm.
        """
        ...
|
|
|
|
|
|
class BackfillService:
    """Rebuild the ``"redmine"`` index source from recent helpdesk issues.

    The service connects four collaborators: ``source`` supplies raw issue
    dicts, ``mapper`` converts each issue into index documents, ``embedder``
    computes vectors for those documents, and ``store`` receives documents and
    vectors through ``rebuild_source``.
    """

    def __init__(
        self,
        source: RedmineSource,
        embedder: DocumentEmbedder,
        store: RebuildStore,
        mapper: RedmineMapper | None = None,
    ) -> None:
        """Store the collaborators used by the backfill methods.

        Args:
            source: Provider of recent helpdesk issues.
            embedder: Component that embeds the mapped documents.
            store: Destination whose ``rebuild_source`` is called.
            mapper: Issue-to-document mapper. When ``None``, a
                ``RedmineMapper(redmine_url="")`` is created.
        """
        self.source = source
        self.embedder = embedder
        self.store = store
        # Test against None instead of truthiness: a caller-supplied mapper
        # that happens to be falsy (defines ``__bool__``/``__len__``) must not
        # be silently swapped for the default mapper.
        self.mapper = mapper if mapper is not None else RedmineMapper(redmine_url="")

    def backfill_redmine_sample(self, limit: int = 500) -> Dict[str, int | str]:
        """Rebuild the store from one batch of recent issues.

        Args:
            limit: Value forwarded to ``source.recent_helpdesk_issues``.

        Returns:
            A summary dict with keys ``"source"``, ``"issues"`` (number of
            issues fetched) and ``"documents"`` (number of deduplicated
            documents handed to the store).
        """
        issue_count, documents, vectors = self._fetch_and_embed(limit)
        self.store.rebuild_source(
            "redmine",
            documents,
            vectors,
            project_identifier=self._project_identifier(),
        )
        return {"source": "redmine", "issues": issue_count, "documents": len(documents)}

    def backfill_redmine_projects(
        self,
        projects: Sequence[str],
        per_project_limit: int = 500,
    ) -> Dict[str, object]:
        """Rebuild every project in ``projects`` using the same limit for each.

        Repeated project names collapse into one entry because the projects
        are turned into the keys of a dict before delegating to
        ``backfill_redmine_project_limits``.
        """
        return self.backfill_redmine_project_limits(
            {project: per_project_limit for project in projects}
        )

    def backfill_redmine_project_limits(self, project_limits: Dict[str, int]) -> Dict[str, object]:
        """Rebuild the store project by project, each with its own limit.

        While a project is processed, ``project_identifier`` is set on the
        source and on the mapper (only on those that expose the attribute).
        The values seen before the call are restored afterwards, including
        when an exception propagates out of the loop.

        Args:
            project_limits: Mapping of project identifier to the limit passed
                to ``source.recent_helpdesk_issues`` for that project.

        Returns:
            A summary dict with keys ``"source"``, ``"projects"``,
            ``"issues"``, ``"documents"`` (totals) and ``"project_results"``
            (one dict per processed project).
        """
        previous_source_project = getattr(self.source, "project_identifier", None)
        previous_mapper_project = getattr(self.mapper, "project_identifier", None)
        project_results: List[Dict[str, int | str]] = []
        total_issues = 0
        total_documents = 0
        try:
            for project, project_limit in project_limits.items():
                self._select_project(project, project)
                issue_count, documents, vectors = self._fetch_and_embed(project_limit)
                self.store.rebuild_source(
                    "redmine", documents, vectors, project_identifier=project
                )
                project_results.append(
                    {
                        "project_identifier": project,
                        "issues": issue_count,
                        "documents": len(documents),
                    }
                )
                total_issues += issue_count
                total_documents += len(documents)
        finally:
            # Always put the collaborators back the way they were found.
            self._select_project(previous_source_project, previous_mapper_project)
        return {
            "source": "redmine",
            "projects": len(project_limits),
            "issues": total_issues,
            "documents": total_documents,
            "project_results": project_results,
        }

    def _fetch_and_embed(self, limit: int) -> tuple[int, List[IndexDocument], List[List[float]]]:
        """Fetch issues, map and deduplicate documents, then embed them.

        Returns:
            ``(issue_count, documents, vectors)``. The embedder is not called
            when there are no documents; ``vectors`` is then an empty list.
        """
        issues = list(self.source.recent_helpdesk_issues(limit))
        documents: List[IndexDocument] = []
        for issue in issues:
            documents.extend(self.mapper.issue_to_documents(issue))
        documents = deduplicate_documents(documents)
        vectors = self.embedder.embed_documents(documents) if documents else []
        return len(issues), documents, vectors

    def _select_project(self, source_project: str | None, mapper_project: str | None) -> None:
        """Assign ``project_identifier`` on source and mapper where it exists."""
        if hasattr(self.source, "project_identifier"):
            self.source.project_identifier = source_project
        if hasattr(self.mapper, "project_identifier"):
            self.mapper.project_identifier = mapper_project

    def _project_identifier(self) -> str | None:
        """Return the mapper's project identifier, falling back to the source's.

        A falsy mapper value (``None`` or ``""``) triggers the fallback.
        """
        return getattr(self.mapper, "project_identifier", None) or getattr(
            self.source, "project_identifier", None
        )
|
|
|
|
|
|
def deduplicate_documents(documents: Sequence[IndexDocument]) -> List[IndexDocument]:
    """Collapse documents sharing an ``id`` into a single entry.

    When several documents carry the same ``id``, the last one wins, but it
    occupies the position at which that ``id`` first appeared.
    """
    # Dict keys keep first-insertion order while later values overwrite
    # earlier ones, which gives exactly the "last wins, first position" rule.
    latest_by_id: Dict[str, IndexDocument] = {doc.id: doc for doc in documents}
    return [*latest_by_id.values()]
|