Files
redmine/validate_test_instance.py
T
2026-04-25 01:26:24 +00:00

329 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""Read-only validation for the LAN Redmine test instance.
This script is intended to be run after a database import and the documented
post-import reset steps. It reports whether the test instance looks ready for
Helpdesk and redMCP testing without changing remote state.
"""
from __future__ import annotations

import argparse
import datetime
import json
import os
import shutil
import socket
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Any
# Fallback values for the command-line options declared in main().
# Default for --ssh-host; main() lets REDMINE_SSH_HOST override it.
DEFAULT_SSH_HOST = "reddev@192.168.50.170"
# Default for --ssh-key (passed to `ssh -i`); main() lets REDMINE_SSH_KEY override it.
DEFAULT_SSH_KEY = Path("/tmp/reddev")
# Default for --remote-redmine; main() lets REDMINE_REMOTE_PATH override it.
DEFAULT_REMOTE_REDMINE = "/usr/share/redmine"
# Default for --mailpit-host; also the host/SMTP server the Helpdesk settings are compared against.
DEFAULT_MAILPIT_HOST = "192.168.1.105"
# Default for --files-root, the directory whose permissions check_files_permissions() inspects.
DEFAULT_FILES_ROOT = "/var/lib/redmine/default/files"
@dataclass(frozen=True)
class CheckResult:
    """Immutable outcome of one validation check.

    ``main`` prints each result as ``[status] name: detail`` and counts the
    ``"FAIL"`` and ``"WARN"`` statuses for the summary and exit code.
    """

    # Severity label; the checks in this file use "OK", "WARN" or "FAIL".
    status: str
    # Short human-readable title of the check.
    name: str
    # Free-form explanation shown after the title.
    detail: str
@dataclass(frozen=True)
class RemoteRedmine:
    """SSH and MySQL access to the remote Redmine host.

    All remote work goes through a single ``ssh`` invocation built by
    ``_run_remote`` so that plain commands and MySQL queries always share
    the same identity options.
    """

    # SSH destination, passed to ``ssh`` as-is.
    ssh_host: str
    # Identity file handed to ``ssh -i``.
    ssh_key: Path
    # Directory on the remote host that contains ``config/database.yml``.
    remote_redmine: str

    def ssh(self, remote_command: str) -> subprocess.CompletedProcess[str]:
        """Run ``remote_command`` on the remote host and return the completed process.

        The call never raises on a non-zero exit status; callers inspect
        ``returncode``, ``stdout`` and ``stderr`` themselves.
        """
        return self._run_remote(remote_command)

    def mysql_json_lines(self, sql: str) -> list[dict[str, Any]]:
        """Run ``sql`` and decode each non-blank output line into a JSON value.

        Every line is expected to be a hex-encoded UTF-8 JSON document.

        Raises:
            RuntimeError: if the remote MySQL command exits non-zero.
            ValueError: if a line is not valid hex, UTF-8 or JSON; the
                message quotes the offending line so stray remote output is
                easy to recognise.
        """
        rows: list[dict[str, Any]] = []
        for line in self.mysql(sql).splitlines():
            encoded = line.strip()
            if not encoded:
                continue
            try:
                rows.append(json.loads(bytes.fromhex(encoded).decode("utf-8")))
            except ValueError as exc:
                # fromhex, decode and json.loads all raise ValueError subclasses.
                raise ValueError(f"Unexpected MySQL output line {encoded[:80]!r}: {exc}") from exc
        return rows

    def mysql(self, sql: str) -> str:
        """Feed ``sql`` to the remote ``mysql`` client on stdin and return its stdout.

        Raises:
            RuntimeError: if the remote command exits non-zero; the message
                is the remote stderr when there is any.
        """
        result = self._run_remote(self._mysql_runner_command(), stdin_text=sql)
        if result.returncode != 0:
            raise RuntimeError(result.stderr.strip() or "Remote MySQL command failed.")
        return result.stdout

    def _run_remote(
        self,
        remote_command: str,
        stdin_text: str | None = None,
    ) -> subprocess.CompletedProcess[str]:
        """Invoke ``ssh`` with the configured key and host, capturing text output.

        ``stdin_text`` is sent to the remote command's stdin when given; with
        ``None`` the child's stdin is left untouched.
        """
        return subprocess.run(
            [
                "ssh",
                "-i",
                str(self.ssh_key),
                "-o",
                "IdentitiesOnly=yes",
                self.ssh_host,
                remote_command,
            ],
            input=stdin_text,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
        )

    def _mysql_runner_command(self) -> str:
        """Build the remote shell command that execs ``mysql`` with Redmine's credentials.

        A Ruby one-liner reads the ``production`` section of
        ``config/database.yml`` and passes the password through the
        ``MYSQL_PWD`` environment variable instead of the command line.
        """
        ruby = (
            "require 'yaml'; "
            "c = YAML.load_file('config/database.yml')['production']; "
            "ENV['MYSQL_PWD'] = c['password'].to_s; "
            "args = ['--batch', '--raw', '--quick', '--skip-column-names', "
            "'--default-character-set=utf8', '-h', c['host'].to_s, "
            "'-P', (c['port'] || 3306).to_s, '-u', c['username'].to_s, c['database'].to_s]; "
            "exec('mysql', *args)"
        )
        return f"cd {shell_quote(self.remote_redmine)} && ruby -e {shell_quote(ruby)}"
def main() -> int:
    """Parse the command line, run every check, print a report and return the exit code.

    Returns:
        ``1`` when at least one check reported ``FAIL``, otherwise ``0``.
    """
    env = os.environ.get
    parser = argparse.ArgumentParser(description="Read-only checks for the Redmine LAN test instance.")
    parser.add_argument("--ssh-host", default=env("REDMINE_SSH_HOST", DEFAULT_SSH_HOST))
    parser.add_argument("--ssh-key", type=Path, default=Path(env("REDMINE_SSH_KEY", str(DEFAULT_SSH_KEY))))
    parser.add_argument("--remote-redmine", default=env("REDMINE_REMOTE_PATH", DEFAULT_REMOTE_REDMINE))
    parser.add_argument("--mailpit-host", default=DEFAULT_MAILPIT_HOST)
    parser.add_argument("--files-root", default=DEFAULT_FILES_ROOT)
    parser.add_argument("--composer-bin", default=env("COMPOSER_BIN", "composer"))
    options = parser.parse_args()

    remote = RemoteRedmine(options.ssh_host, options.ssh_key, options.remote_redmine)

    # List displays evaluate left to right, so the checks run in this order.
    checks: list[CheckResult] = [
        *check_remote_basics(remote),
        *check_mailpit_connectivity(remote, options.mailpit_host),
        *check_files_permissions(remote, options.files_root),
        *check_database_state(remote, options.mailpit_host),
        check_composer(options.composer_bin),
    ]

    for check in checks:
        print(f"[{check.status}] {check.name}: {check.detail}")

    fail_count = sum(1 for check in checks if check.status == "FAIL")
    warn_count = sum(1 for check in checks if check.status == "WARN")
    ok_count = len(checks) - fail_count - warn_count
    print(f"\nSummary: {ok_count} OK, {warn_count} WARN, {fail_count} FAIL")

    if fail_count:
        return 1
    return 0
def check_remote_basics(remote: RemoteRedmine) -> list[CheckResult]:
    """Check SSH reachability, the Redmine directory and its ``tmp`` directory.

    The checks depend on each other, so the function stops at the first one
    whose failure would make the following ones meaningless:

    * SSH unreachable -> a single ``FAIL`` result.
    * Redmine directory missing -> the ``tmp`` check is skipped, because it
      would fail on ``cd`` and be misreported as a permissions problem.

    Returns:
        The results gathered up to the point where checking stopped.
    """
    probe = remote.ssh("printf ok")
    if probe.returncode != 0 or probe.stdout != "ok":
        return [CheckResult("FAIL", "SSH", (probe.stderr or probe.stdout).strip() or "connection failed")]
    results: list[CheckResult] = [CheckResult("OK", "SSH", f"connected to {remote.ssh_host}")]

    path_check = remote.ssh(f"test -d {shell_quote(remote.remote_redmine)} && printf ok")
    if path_check.returncode != 0 or path_check.stdout != "ok":
        results.append(CheckResult("FAIL", "Remote Redmine path", f"missing: {remote.remote_redmine}"))
        return results
    results.append(CheckResult("OK", "Remote Redmine path", remote.remote_redmine))

    # Exit codes of the remote snippet:
    #   0 = tmp exists and is writable, 1 = tmp exists but `test -w` failed,
    #   2 = tmp missing, 3 = could not cd into the Redmine directory.
    # Anything else (e.g. 255 from ssh itself) is reported verbatim.
    tmp_check = remote.ssh(
        f"cd {shell_quote(remote.remote_redmine)} || exit 3; "
        "if [ -d tmp ]; then stat -c '%U %G %a %n' tmp; test -w tmp; else exit 2; fi"
    )
    if tmp_check.returncode == 0:
        results.append(CheckResult("OK", "Passenger restart directory", tmp_check.stdout.strip()))
    elif tmp_check.returncode == 1:
        results.append(CheckResult("WARN", "Passenger restart directory", "tmp exists but is not writable by SSH user"))
    elif tmp_check.returncode == 2:
        results.append(CheckResult("FAIL", "Passenger restart directory", "tmp directory missing"))
    else:
        detail = (tmp_check.stderr or tmp_check.stdout).strip() or f"unexpected exit status {tmp_check.returncode}"
        results.append(CheckResult("FAIL", "Passenger restart directory", detail))
    return results
def check_mailpit_connectivity(remote: RemoteRedmine, host: str) -> list[CheckResult]:
    """Probe the Mailpit ports from this machine and from the Redmine host.

    Locally, ports 8025 (HTTP), 1025 (SMTP) and 1110 (POP3) are tried with
    ``tcp_check``. Remotely, a Ruby one-liner opens TCP connections to the
    SMTP and POP3 ports and exits with status 1 on the first failure.

    Returns:
        Three local results followed by one combined remote result.
    """
    local_ports = (("HTTP", 8025), ("SMTP", 1025), ("POP3", 1110))
    results = [tcp_check(f"Mailpit {label} from local", host, port) for label, port in local_ports]

    probe_script = (
        "host = ARGV[0]; "
        "{1025 => 'SMTP', 1110 => 'POP3'}.each do |port, name| "
        "begin; s = TCPSocket.new(host, port); s.close; puts \"#{name}=ok\"; "
        "rescue => e; puts \"#{name}=#{e.class}: #{e.message}\"; exit 1; end; "
        "end"
    )
    remote_command = " ".join(["ruby -rsocket -e", shell_quote(probe_script), shell_quote(host)])

    outcome = remote.ssh(remote_command)
    if outcome.returncode == 0:
        status = "OK"
        detail = outcome.stdout.strip().replace("\n", ", ")
    else:
        status = "FAIL"
        # The script reports connection errors on stdout; stderr is the fallback.
        detail = (outcome.stdout or outcome.stderr).strip()
    results.append(CheckResult(status, "Mailpit from Redmine host", detail))
    return results
def check_files_permissions(
    remote: RemoteRedmine,
    files_root: str,
    today: datetime.date | None = None,
) -> list[CheckResult]:
    """Verify that the attachment directories exist and are group-writable.

    Three directories are inspected on the remote host: ``files_root`` and
    its ``YYYY`` and ``YYYY/MM`` subdirectories for the reference date, so
    the check follows the calendar instead of being pinned to one month.

    Args:
        remote: SSH helper used to run the remote probe.
        files_root: Attachment root directory on the remote host.
        today: Reference date for the dated subdirectories; defaults to the
            local current date. NOTE(review): this uses the local clock, so
            it may differ from the remote host's date around a month boundary.

    Returns:
        A single-element list: ``OK`` with owner/group/mode per directory,
        or ``FAIL`` naming the missing or non-group-writable directories,
        the remote error output, or unparseable remote output.
    """
    reference_day = today if today is not None else datetime.date.today()
    year_dir = f"{files_root}/{reference_day:%Y}"
    month_dir = f"{year_dir}/{reference_day:%m}"
    targets = [files_root, year_dir, month_dir]

    command = (
        "ruby -e "
        + shell_quote(
            "require 'json'; require 'etc'; "
            "paths = ARGV; "
            "rows = paths.map do |p| "
            "if File.directory?(p); st = File.stat(p); "
            "{path: p, mode: sprintf('%o', st.mode & 07777), group_writable: (st.mode & 0020) != 0, owner: Etc.getpwuid(st.uid).name, group: Etc.getgrgid(st.gid).name}; "
            "else; {path: p, missing: true}; end; "
            "end; puts JSON.generate(rows)"
        )
        + " "
        + " ".join(shell_quote(path) for path in targets)
    )
    result = remote.ssh(command)
    if result.returncode != 0:
        return [CheckResult("FAIL", "Attachment directory permissions", (result.stderr or result.stdout).strip())]

    try:
        rows = json.loads(result.stdout)
    except ValueError as exc:
        # Stray remote output must become a FAIL result, not a traceback.
        return [CheckResult("FAIL", "Attachment directory permissions", f"unparseable remote output: {exc}")]

    failures = [row for row in rows if row.get("missing") or not row.get("group_writable")]
    if failures:
        detail = "; ".join(f"{row['path']} mode={row.get('mode', 'missing')}" for row in failures)
        return [CheckResult("FAIL", "Attachment directory permissions", detail)]
    detail = "; ".join(f"{row['path']} {row['owner']}:{row['group']} {row['mode']}" for row in rows)
    return [CheckResult("OK", "Attachment directory permissions", detail)]
def check_database_state(remote: RemoteRedmine, mailpit_host: str) -> list[CheckResult]:
    """Check the controlled test projects and the Helpdesk settings in the database.

    Two queries are run through ``remote.mysql_json_lines``. Any exception
    raised along the way is turned into a ``FAIL`` "Database checks" result,
    appended after whatever results were already collected.

    Returns:
        The collected results, in the order the checks ran.
    """
    wanted_projects = {"fud-helpdesk", "fud-nohelpdesk"}
    projects_sql = (
        "\n"
        "SELECT HEX(CAST(JSON_OBJECT('identifier', identifier, 'id', id, 'name', name) AS CHAR)) AS document\n"
        "FROM projects\n"
        "WHERE identifier IN ('fud-helpdesk', 'fud-nohelpdesk')\n"
        "ORDER BY identifier;\n"
    )

    collected: list[CheckResult] = []
    try:
        present = {row["identifier"] for row in remote.mysql_json_lines(projects_sql)}
        absent = wanted_projects - present
        collected.append(
            CheckResult("FAIL", "Controlled test projects", "missing " + ", ".join(sorted(absent)))
            if absent
            else CheckResult("OK", "Controlled test projects", ", ".join(sorted(present)))
        )

        settings_rows = remote.mysql_json_lines(helpdesk_settings_sql())
        mismatches = helpdesk_setting_failures(settings_rows, mailpit_host)
        collected.append(
            # Only the first eight mismatches are shown to keep the line readable.
            CheckResult("FAIL", "Helpdesk Mailpit settings", "; ".join(mismatches[:8]))
            if mismatches
            else CheckResult("OK", "Helpdesk Mailpit settings", f"{len(settings_rows)} active project(s) match")
        )
    except Exception as exc:
        collected.append(CheckResult("FAIL", "Database checks", f"{exc.__class__.__name__}: {exc}"))
    return collected
def helpdesk_settings_sql() -> str:
    """Return the SQL that emits one hex-encoded JSON document per active project.

    Each document holds the project identifier plus the ``contacts_settings``
    values for that project, pivoted into named keys. ``has_password`` is 1
    when a non-empty ``helpdesk_password`` row exists and 0 otherwise.
    """

    def setting(name: str) -> str:
        # Pivot one contacts_settings row into a column of the grouped result.
        return f"MAX(CASE WHEN cs.name = '{name}' THEN cs.value END)"

    columns = [
        ("identifier", "p.identifier"),
        ("protocol", setting("helpdesk_protocol")),
        ("host", setting("helpdesk_host")),
        ("port", setting("helpdesk_port")),
        ("username", setting("helpdesk_username")),
        (
            "has_password",
            "MAX(CASE WHEN cs.name = 'helpdesk_password' AND cs.value <> '' THEN 1 ELSE 0 END)",
        ),
        ("apop", setting("helpdesk_apop")),
        ("delete_unprocessed", setting("helpdesk_delete_unprocessed")),
        ("smtp_custom", setting("helpdesk_smtp_use_default_settings")),
        ("smtp_server", setting("helpdesk_smtp_server")),
        ("smtp_port", setting("helpdesk_smtp_port")),
        ("smtp_auth", setting("helpdesk_smtp_authentication")),
        ("smtp_username", setting("helpdesk_smtp_username")),
        ("smtp_ssl", setting("helpdesk_smtp_ssl")),
        ("smtp_tls", setting("helpdesk_smtp_tls")),
    ]
    json_fields = ",\n".join(f"'{key}', {expression}" for key, expression in columns)
    return (
        "\n"
        "SELECT HEX(CAST(JSON_OBJECT(\n"
        f"{json_fields}\n"
        ") AS CHAR)) AS document\n"
        "FROM projects p\n"
        "LEFT JOIN contacts_settings cs ON cs.project_id = p.id\n"
        "WHERE p.status = 1\n"
        "GROUP BY p.id, p.identifier\n"
        "ORDER BY p.identifier;\n"
    )
def helpdesk_setting_failures(rows: list[dict[str, Any]], mailpit_host: str) -> list[str]:
    """List every Helpdesk setting that differs from the expected Mailpit configuration.

    Args:
        rows: One mapping per project, holding an ``identifier`` key and the
            setting keys compared below.
        mailpit_host: Host name expected for both ``host`` and ``smtp_server``.

    Returns:
        One ``"<identifier> <key>=<actual> expected <expected>"`` message per
        mismatch, ordered by row and then by the key order below; empty when
        everything matches.
    """
    expected = dict(
        protocol="pop3",
        host=mailpit_host,
        port="1110",
        username="test",
        has_password=1,
        apop="0",
        delete_unprocessed="0",
        smtp_custom="1",
        smtp_server=mailpit_host,
        smtp_port="1025",
        smtp_auth="",
        smtp_username="",
        smtp_ssl="0",
        smtp_tls="0",
    )
    return [
        f"{row['identifier']} {key}={row.get(key)!r} expected {wanted!r}"
        for row in rows
        for key, wanted in expected.items()
        if row.get(key) != wanted
    ]
def check_composer(composer_bin: str) -> CheckResult:
    """Run ``composer validate`` against the ``redMCP`` directory.

    ``--working-dir=redMCP`` is a relative path, so it is resolved against
    the directory the script is started from.

    Args:
        composer_bin: Composer executable name or path. A value ending in
            ``.phar`` is run through ``php``.

    Returns:
        ``WARN`` when Composer (or ``php`` for a ``.phar``) cannot be found,
        ``OK`` when validation succeeds, and ``FAIL`` when validation fails
        or the command cannot be started.
    """
    composer_path = Path(composer_bin)
    composer_on_path = shutil.which(composer_bin)
    if composer_on_path is None and not composer_path.exists():
        return CheckResult("WARN", "Composer validation", f"{composer_bin!r} not found; skipped")

    validate_args = ["validate", "--working-dir=redMCP"]
    if composer_path.suffix == ".phar":
        php = shutil.which("php")
        if php is None:
            return CheckResult("WARN", "Composer validation", "php not found; composer.phar skipped")
        # php resolves a script path against the working directory, not PATH,
        # so a phar found only via PATH must be passed by its resolved location.
        phar_script = composer_bin if composer_path.exists() else composer_on_path
        command = [php, phar_script, *validate_args]
    else:
        command = [composer_on_path or composer_bin, *validate_args]

    try:
        result = subprocess.run(
            command,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
        )
    except OSError as exc:
        # E.g. the file exists but is not executable or not reachable via PATH.
        return CheckResult(
            "FAIL",
            "Composer validation",
            f"could not run {command[0]!r}: {exc.__class__.__name__}: {exc}",
        )

    if result.returncode == 0:
        return CheckResult("OK", "Composer validation", "redMCP/composer.json is valid")
    return CheckResult("FAIL", "Composer validation", (result.stdout + result.stderr).strip())
def tcp_check(name: str, host: str, port: int) -> CheckResult:
    """Try to open a TCP connection to ``host:port`` with a 5 second timeout.

    Returns:
        ``OK`` with ``host:port`` as detail when the connection opens, or
        ``FAIL`` with the endpoint and the ``OSError`` class and message.
    """
    endpoint = f"{host}:{port}"
    try:
        # The connection is only a reachability probe; close it immediately.
        with socket.create_connection((host, port), timeout=5):
            pass
    except OSError as exc:
        return CheckResult("FAIL", name, f"{endpoint} {exc.__class__.__name__}: {exc}")
    return CheckResult("OK", name, endpoint)
def shell_quote(value: str) -> str:
    """Wrap ``value`` in single quotes for use as one POSIX shell word.

    Each embedded single quote is replaced by ``'"'"'``: close the quoted
    string, emit a double-quoted single quote, then reopen the quoted string.
    """
    escaped_quote = "'\"'\"'"
    pieces = value.split("'")
    return "'" + escaped_quote.join(pieces) + "'"
# Run the checks only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)