443 lines
16 KiB
Python
Executable File
443 lines
16 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""End-to-end Helpdesk smoke test for the LAN Redmine copy.
|
|
|
|
The test intentionally creates one Helpdesk issue in the controlled
|
|
``fud-helpdesk`` project and leaves it closed as audit evidence.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import smtplib
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from dataclasses import dataclass
|
|
from email.message import EmailMessage
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
DEFAULT_REDMINE_URL = "http://192.168.50.170"
|
|
DEFAULT_MAILPIT_HOST = "192.168.1.105"
|
|
DEFAULT_MAILPIT_HTTP_PORT = 8025
|
|
DEFAULT_MAILPIT_SMTP_PORT = 1025
|
|
DEFAULT_PROJECT = "fud-helpdesk"
|
|
DEFAULT_CONTROL_PROJECT = "fud-nohelpdesk"
|
|
DEFAULT_SSH_HOST = "reddev@192.168.50.170"
|
|
DEFAULT_SSH_KEY = Path("/tmp/reddev")
|
|
DEFAULT_REMOTE_REDMINE = "/usr/share/redmine"
|
|
|
|
|
|
class SmokeError(RuntimeError):
|
|
pass
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class SmokeConfig:
|
|
redmine_url: str
|
|
api_key: str
|
|
mailpit_host: str
|
|
mailpit_http_port: int
|
|
mailpit_smtp_port: int
|
|
project: str
|
|
control_project: str
|
|
ssh_host: str
|
|
ssh_key: Path
|
|
remote_redmine: str
|
|
skip_preflight: bool
|
|
keep_open: bool
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class SmokeResult:
|
|
token: str
|
|
subject: str
|
|
issue_id: int
|
|
from_address: str
|
|
reply_token: str
|
|
control_issue_id: int
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(description="Run a live Helpdesk/redMCP smoke test against the LAN Redmine copy.")
|
|
parser.add_argument("--redmine-url", default=os.getenv("REDMINE_URL", DEFAULT_REDMINE_URL))
|
|
parser.add_argument("--api-key", default=os.getenv("REDMINE_API_KEY") or os.getenv("REDMNINE_API_KEY"))
|
|
parser.add_argument("--env-file", type=Path, default=Path("redMCP/.env"))
|
|
parser.add_argument("--mailpit-host", default=DEFAULT_MAILPIT_HOST)
|
|
parser.add_argument("--mailpit-http-port", type=int, default=DEFAULT_MAILPIT_HTTP_PORT)
|
|
parser.add_argument("--mailpit-smtp-port", type=int, default=DEFAULT_MAILPIT_SMTP_PORT)
|
|
parser.add_argument("--project", default=DEFAULT_PROJECT)
|
|
parser.add_argument("--control-project", default=DEFAULT_CONTROL_PROJECT)
|
|
parser.add_argument("--ssh-host", default=os.getenv("REDMINE_SSH_HOST", DEFAULT_SSH_HOST))
|
|
parser.add_argument("--ssh-key", type=Path, default=Path(os.getenv("REDMINE_SSH_KEY", str(DEFAULT_SSH_KEY))))
|
|
parser.add_argument("--remote-redmine", default=os.getenv("REDMINE_REMOTE_PATH", DEFAULT_REMOTE_REDMINE))
|
|
parser.add_argument("--skip-preflight", action="store_true", help="Skip validate_test_instance.py.")
|
|
parser.add_argument("--keep-open", action="store_true", help="Do not close the created Helpdesk issue.")
|
|
args = parser.parse_args()
|
|
|
|
env = load_env_file(args.env_file)
|
|
redmine_url = args.redmine_url or env.get("REDMINE_URL") or DEFAULT_REDMINE_URL
|
|
api_key = args.api_key or env.get("REDMINE_API_KEY") or env.get("REDMNINE_API_KEY")
|
|
if not api_key:
|
|
print("error: missing Redmine API key. Set REDMINE_API_KEY or redMCP/.env.", file=sys.stderr)
|
|
return 1
|
|
|
|
config = SmokeConfig(
|
|
redmine_url=redmine_url.rstrip("/"),
|
|
api_key=api_key,
|
|
mailpit_host=args.mailpit_host,
|
|
mailpit_http_port=args.mailpit_http_port,
|
|
mailpit_smtp_port=args.mailpit_smtp_port,
|
|
project=args.project,
|
|
control_project=args.control_project,
|
|
ssh_host=args.ssh_host,
|
|
ssh_key=args.ssh_key,
|
|
remote_redmine=args.remote_redmine,
|
|
skip_preflight=args.skip_preflight,
|
|
keep_open=args.keep_open,
|
|
)
|
|
|
|
try:
|
|
run_smoke(config)
|
|
return 0
|
|
except SmokeError as exc:
|
|
print(f"error: {exc}", file=sys.stderr)
|
|
return 1
|
|
|
|
|
|
def run_smoke(config: SmokeConfig) -> SmokeResult:
|
|
token = time.strftime("%Y%m%d%H%M%S")
|
|
subject = f"[redMCP-smoke {token}] Helpdesk inbound validation"
|
|
from_address = f"redmcp-smoke-{token}@example.test"
|
|
reply_token = f"redMCP outbound smoke reply {token}"
|
|
|
|
if not config.skip_preflight:
|
|
run_checked(["./validate_test_instance.py"], "preflight validator")
|
|
|
|
before_ids = set(mailpit_message_ids(config))
|
|
send_inbound_email(config, from_address, subject, token)
|
|
print(f"[OK] Sent inbound Mailpit message: {subject}")
|
|
|
|
trigger_helpdesk_import(config)
|
|
issue_id = wait_for_issue(config, subject)
|
|
print(f"[OK] Helpdesk imported issue #{issue_id}")
|
|
|
|
context = redmcp_call(config, "issueWithHelpdesk", {"issue_id": issue_id, "limit": 20})
|
|
assert_helpdesk_context(context, issue_id, from_address, subject, require_messages=False)
|
|
print("[OK] redMCP issueWithHelpdesk returned Helpdesk ticket context")
|
|
|
|
silent_token = f"redMCP silent internal note {token}"
|
|
redmcp_call(config, "updateIssue", {"issue_id": issue_id, "fields": {"notes": silent_token}})
|
|
assert_mailpit_text_absent(config, silent_token)
|
|
print("[OK] redMCP default issue update did not send Helpdesk email")
|
|
|
|
control = redmcp_call(config, "createControlIssue", {"project": config.control_project, "token": token})
|
|
control_id = int(control["id"])
|
|
redmcp_call(config, "updateIssue", {"issue_id": control_id, "fields": {"notes": f"redMCP control update {token}"}})
|
|
redmcp_call(config, "deleteIssue", {"issue_id": control_id})
|
|
print(f"[OK] redMCP non-Helpdesk CRUD control passed with issue #{control_id}")
|
|
|
|
redmcp_call(
|
|
config,
|
|
"updateIssue",
|
|
{"issue_id": issue_id, "fields": {"notes": reply_token}, "options": {"send_helpdesk_email": True}},
|
|
)
|
|
wait_for_mailpit_text(config, before_ids, reply_token)
|
|
print("[OK] Mailpit received outbound Helpdesk/Redmine mail containing the reply token")
|
|
|
|
context = redmcp_call(config, "issueWithHelpdesk", {"issue_id": issue_id, "limit": 20})
|
|
assert_helpdesk_context(context, issue_id, from_address, subject, require_messages=True)
|
|
print("[OK] redMCP issueWithHelpdesk returned post-reply journal message context")
|
|
|
|
if not config.keep_open:
|
|
redmcp_call(
|
|
config,
|
|
"updateIssue",
|
|
{"issue_id": issue_id, "fields": {"status_id": 5, "notes": f"Closed by redMCP smoke test {token}"}},
|
|
)
|
|
print(f"[OK] Closed smoke-test Helpdesk issue #{issue_id}")
|
|
else:
|
|
print(f"[WARN] Left smoke-test Helpdesk issue #{issue_id} open because --keep-open was passed")
|
|
|
|
print("\nSmoke test passed.")
|
|
print(f" issue_id: {issue_id}")
|
|
print(f" subject: {subject}")
|
|
return SmokeResult(
|
|
token=token,
|
|
subject=subject,
|
|
issue_id=issue_id,
|
|
from_address=from_address,
|
|
reply_token=reply_token,
|
|
control_issue_id=control_id,
|
|
)
|
|
|
|
|
|
def load_env_file(path: Path) -> dict[str, str]:
|
|
values: dict[str, str] = {}
|
|
if not path.exists():
|
|
return values
|
|
for raw_line in path.read_text(encoding="utf-8").splitlines():
|
|
line = raw_line.strip()
|
|
if not line or line.startswith("#") or "=" not in line:
|
|
continue
|
|
key, value = line.split("=", 1)
|
|
values[key.strip()] = value.strip().strip('"').strip("'")
|
|
return values
|
|
|
|
|
|
def send_inbound_email(config: SmokeConfig, from_address: str, subject: str, token: str) -> None:
|
|
message = EmailMessage()
|
|
message["From"] = f"redMCP Smoke <{from_address}>"
|
|
message["To"] = f"helpdesk-{config.project}@example.test"
|
|
message["Subject"] = subject
|
|
message["Message-ID"] = f"<redmcp-smoke-{token}@example.test>"
|
|
message.set_content(
|
|
"\n".join(
|
|
[
|
|
"This is an automated redMCP Helpdesk smoke-test message.",
|
|
f"Smoke token: {token}",
|
|
"The test validates Helpdesk import, redMCP enrichment, and outbound delivery.",
|
|
]
|
|
)
|
|
)
|
|
with smtplib.SMTP(config.mailpit_host, config.mailpit_smtp_port, timeout=10) as smtp:
|
|
smtp.send_message(message)
|
|
|
|
|
|
def trigger_helpdesk_import(config: SmokeConfig) -> None:
|
|
ruby = (
|
|
f"project = Project.find_by_identifier({ruby_string(config.project)}); "
|
|
"raise 'project not found' unless project; "
|
|
"HelpdeskMailer.check_project(project.id); "
|
|
"puts 'ok'"
|
|
)
|
|
result = ssh_redmine(config, f"bin/rails runner -e production {shell_quote(ruby)}")
|
|
if result.returncode != 0:
|
|
raise SmokeError("Helpdesk import failed: " + (result.stderr or result.stdout).strip())
|
|
|
|
|
|
def wait_for_issue(config: SmokeConfig, subject: str, timeout: int = 60) -> int:
|
|
deadline = time.time() + timeout
|
|
while time.time() < deadline:
|
|
issue_id = find_issue_id(config, subject)
|
|
if issue_id is not None:
|
|
return issue_id
|
|
time.sleep(2)
|
|
raise SmokeError(f"Timed out waiting for Helpdesk issue with subject {subject!r}")
|
|
|
|
|
|
def find_issue_id(config: SmokeConfig, subject: str) -> int | None:
|
|
sql = (
|
|
"SELECT i.id "
|
|
"FROM issues i JOIN projects p ON p.id = i.project_id "
|
|
f"WHERE p.identifier = {sql_string(config.project)} "
|
|
f"AND i.subject = {sql_string(subject)} "
|
|
"ORDER BY i.id DESC LIMIT 1;"
|
|
)
|
|
output = remote_mysql(config, sql).strip()
|
|
if not output:
|
|
return None
|
|
return int(output.splitlines()[0])
|
|
|
|
|
|
def remote_mysql(config: SmokeConfig, sql: str) -> str:
|
|
ruby = (
|
|
"require 'yaml'; "
|
|
"c = YAML.load_file('config/database.yml')['production']; "
|
|
"ENV['MYSQL_PWD'] = c['password'].to_s; "
|
|
"args = ['--batch', '--raw', '--quick', '--skip-column-names', "
|
|
"'--default-character-set=utf8', '-h', c['host'].to_s, "
|
|
"'-P', (c['port'] || 3306).to_s, '-u', c['username'].to_s, c['database'].to_s]; "
|
|
"exec('mysql', *args)"
|
|
)
|
|
command = [
|
|
"ssh",
|
|
"-i",
|
|
str(config.ssh_key),
|
|
"-o",
|
|
"IdentitiesOnly=yes",
|
|
config.ssh_host,
|
|
f"cd {shell_quote(config.remote_redmine)} && ruby -e {shell_quote(ruby)}",
|
|
]
|
|
result = subprocess.run(command, input=sql, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False)
|
|
if result.returncode != 0:
|
|
raise SmokeError("Remote MySQL query failed: " + (result.stderr or result.stdout).strip())
|
|
return result.stdout
|
|
|
|
|
|
def redmcp_call(config: SmokeConfig, action: str, payload: dict[str, Any]) -> dict[str, Any]:
|
|
php = r'''
|
|
require getcwd() . '/redMCP/vendor/autoload.php';
|
|
|
|
use RedMCP\RedmineClient;
|
|
|
|
$input = json_decode(stream_get_contents(STDIN), true);
|
|
$client = RedmineClient::fromCredentials($input['redmine_url'], $input['api_key']);
|
|
$payload = $input['payload'];
|
|
|
|
switch ($input['action']) {
|
|
case 'issueWithHelpdesk':
|
|
$result = $client->issueWithHelpdesk((int) $payload['issue_id'], (int) $payload['limit']);
|
|
break;
|
|
case 'createControlIssue':
|
|
$result = $client->createIssue([
|
|
'project_id' => $payload['project'],
|
|
'subject' => '[redMCP-smoke ' . $payload['token'] . '] Non-Helpdesk CRUD control',
|
|
'description' => 'Created by the redMCP Helpdesk smoke test.',
|
|
'tracker_id' => 3,
|
|
]);
|
|
break;
|
|
case 'updateIssue':
|
|
$client->updateIssue((int) $payload['issue_id'], $payload['fields'], $payload['options'] ?? []);
|
|
$result = ['ok' => true];
|
|
break;
|
|
case 'deleteIssue':
|
|
$client->deleteIssue((int) $payload['issue_id']);
|
|
$result = ['ok' => true];
|
|
break;
|
|
default:
|
|
throw new RuntimeException('Unknown redMCP smoke action: ' . $input['action']);
|
|
}
|
|
|
|
echo json_encode($result, JSON_UNESCAPED_SLASHES);
|
|
'''
|
|
request = {
|
|
"redmine_url": config.redmine_url,
|
|
"api_key": config.api_key,
|
|
"action": action,
|
|
"payload": payload,
|
|
}
|
|
result = subprocess.run(
|
|
["php", "-r", php],
|
|
input=json.dumps(request),
|
|
text=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
check=False,
|
|
)
|
|
if result.returncode != 0:
|
|
raise SmokeError(f"redMCP {action} failed: " + (result.stderr or result.stdout).strip())
|
|
try:
|
|
decoded = json.loads(result.stdout)
|
|
except json.JSONDecodeError as exc:
|
|
raise SmokeError(f"redMCP {action} returned invalid JSON: {result.stdout[:500]}") from exc
|
|
if not isinstance(decoded, dict):
|
|
raise SmokeError(f"redMCP {action} returned unexpected JSON")
|
|
return decoded
|
|
|
|
|
|
def assert_helpdesk_context(
|
|
context: dict[str, Any],
|
|
issue_id: int,
|
|
from_address: str,
|
|
subject: str,
|
|
require_messages: bool,
|
|
) -> None:
|
|
issue = context.get("issue")
|
|
helpdesk = context.get("helpdesk")
|
|
if not isinstance(issue, dict) or int(issue.get("id", 0)) != issue_id:
|
|
raise SmokeError("redMCP context did not include the expected issue")
|
|
if issue.get("subject") != subject:
|
|
raise SmokeError("redMCP context issue subject did not match the smoke-test subject")
|
|
if not isinstance(helpdesk, dict) or helpdesk.get("available") is not True:
|
|
raise SmokeError("redMCP context did not mark Helpdesk data as available")
|
|
ticket = helpdesk.get("ticket")
|
|
if not isinstance(ticket, dict):
|
|
raise SmokeError("redMCP context did not include Helpdesk ticket metadata")
|
|
messages = helpdesk.get("journal_messages")
|
|
if not isinstance(messages, list):
|
|
raise SmokeError("redMCP context did not include a Helpdesk journal message list")
|
|
if require_messages and not messages:
|
|
raise SmokeError("redMCP context did not include post-reply Helpdesk journal messages")
|
|
text = json.dumps(helpdesk, sort_keys=True)
|
|
if from_address not in text:
|
|
raise SmokeError(f"redMCP Helpdesk context did not include sender {from_address}")
|
|
|
|
|
|
def mailpit_message_ids(config: SmokeConfig) -> list[str]:
|
|
data = mailpit_json(config, "/api/v1/messages", {"limit": "200"})
|
|
messages = data.get("messages", []) if isinstance(data, dict) else []
|
|
return [message["ID"] for message in messages if isinstance(message, dict) and "ID" in message]
|
|
|
|
|
|
def wait_for_mailpit_text(config: SmokeConfig, before_ids: set[str], needle: str, timeout: int = 60) -> None:
|
|
deadline = time.time() + timeout
|
|
while time.time() < deadline:
|
|
for message_id in mailpit_message_ids(config):
|
|
if message_id in before_ids:
|
|
continue
|
|
message = mailpit_json(config, f"/api/v1/message/{message_id}", {})
|
|
if needle in json.dumps(message):
|
|
return
|
|
time.sleep(2)
|
|
raise SmokeError(f"Timed out waiting for Mailpit message containing {needle!r}")
|
|
|
|
|
|
def assert_mailpit_text_absent(config: SmokeConfig, needle: str, timeout: int = 5) -> None:
|
|
deadline = time.time() + timeout
|
|
while time.time() < deadline:
|
|
for message_id in mailpit_message_ids(config):
|
|
message = mailpit_json(config, f"/api/v1/message/{message_id}", {})
|
|
if needle in json.dumps(message):
|
|
raise SmokeError(f"Mailpit unexpectedly received message containing {needle!r}")
|
|
time.sleep(1)
|
|
|
|
|
|
def mailpit_json(config: SmokeConfig, path: str, params: dict[str, str]) -> dict[str, Any]:
|
|
query = urllib.parse.urlencode(params)
|
|
url = f"http://{config.mailpit_host}:{config.mailpit_http_port}{path}"
|
|
if query:
|
|
url += "?" + query
|
|
try:
|
|
with urllib.request.urlopen(url, timeout=10) as response:
|
|
return json.loads(response.read().decode("utf-8"))
|
|
except (urllib.error.URLError, json.JSONDecodeError) as exc:
|
|
raise SmokeError(f"Mailpit API failed for {path}: {exc}") from exc
|
|
|
|
|
|
def ssh_redmine(config: SmokeConfig, command: str) -> subprocess.CompletedProcess[str]:
|
|
return subprocess.run(
|
|
[
|
|
"ssh",
|
|
"-i",
|
|
str(config.ssh_key),
|
|
"-o",
|
|
"IdentitiesOnly=yes",
|
|
config.ssh_host,
|
|
f"cd {shell_quote(config.remote_redmine)} && {command}",
|
|
],
|
|
text=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
check=False,
|
|
)
|
|
|
|
|
|
def run_checked(command: list[str], label: str) -> None:
|
|
result = subprocess.run(command, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False)
|
|
if result.returncode != 0:
|
|
raise SmokeError(f"{label} failed:\n{result.stdout}{result.stderr}")
|
|
print(f"[OK] {label}")
|
|
|
|
|
|
def shell_quote(value: str) -> str:
|
|
return "'" + value.replace("'", "'\"'\"'") + "'"
|
|
|
|
|
|
def ruby_string(value: str) -> str:
|
|
return json.dumps(value)
|
|
|
|
|
|
def sql_string(value: str) -> str:
|
|
return "'" + value.replace("\\", "\\\\").replace("'", "''") + "'"
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|