Skip to content

Commit 2afe4a4

Browse files
committed
fix: harden live cpanel mutation paths
1 parent 5128953 commit 2afe4a4

14 files changed

Lines changed: 464 additions & 191 deletions

‎.gitignore‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
.venv/
2+
.venv-dev/
23
__pycache__/
34
.pytest_cache/
45
.ruff_cache/
56
.mypy_cache/
67
.coverage
78
*.egg-info/
89
.env
9-

‎o2switch_cli/cli/helpers.py‎

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,36 @@
1313
T = TypeVar("T")
1414

1515

16-
def run_guarded(ctx: typer.Context, action: Callable[[AppContext], T]) -> T:
17-
app_context = get_app_context(ctx)
16+
def _execute_guarded(app_context: AppContext, action: Callable[[AppContext], T]) -> T:
1817
try:
1918
return action(app_context)
2019
except CliAppError as error:
21-
raise_for_error(app_context, error)
20+
raise error
2221
except typer.Exit:
2322
raise
2423
except Exception as error: # pragma: no cover
25-
raise_for_error(app_context, TransportAppError("runtime", str(error)))
24+
raise TransportAppError("runtime", str(error)) from error
2625
raise RuntimeError("unreachable")
2726

2827

28+
def run_guarded(ctx: typer.Context, action: Callable[[AppContext], T]) -> T:
29+
app_context = get_app_context(ctx)
30+
try:
31+
return _execute_guarded(app_context, action)
32+
except CliAppError as error:
33+
raise_for_error(app_context, error)
34+
raise RuntimeError("unreachable")
35+
36+
37+
def run_guarded_interactive(app_context: AppContext, action: Callable[[AppContext], T]) -> T | None:
38+
ui = TerminalUI(app_context.console, app_context.output_format)
39+
try:
40+
return _execute_guarded(app_context, action)
41+
except CliAppError as error:
42+
ui.print_error(error.to_envelope())
43+
return None
44+
45+
2946
def confirm_plan(app_context: AppContext, ui: TerminalUI, plan: MutationPlan, *, zone: str | None = None) -> bool:
3047
if ui.output_format != "json":
3148
ui.print_plan(plan, zone=zone)

‎o2switch_cli/cli/interactive.py‎

Lines changed: 148 additions & 142 deletions
Large diffs are not rendered by default.

‎o2switch_cli/cli/main.py‎

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from o2switch_cli.cli.context import build_context
1010
from o2switch_cli.cli.dns import app as dns_app
1111
from o2switch_cli.cli.domains import app as domains_app
12+
from o2switch_cli.cli.helpers import run_guarded
1213
from o2switch_cli.cli.interactive import run_interactive_menu
1314
from o2switch_cli.cli.subdomains import app as subdomains_app
1415
from o2switch_cli.config.settings import load_settings
@@ -52,16 +53,15 @@ def main(
5253

5354
if ctx.invoked_subcommand is None and not ctx.resilient_parsing:
5455
if sys.stdin.isatty():
55-
run_interactive_menu(app_context)
56+
run_guarded(ctx, lambda active_context: run_interactive_menu(active_context))
5657
raise typer.Exit()
5758
typer.echo(ctx.get_help())
5859
raise typer.Exit()
5960

6061

6162
@app.command("interactive")
6263
def interactive(ctx: typer.Context) -> None:
63-
app_context = ctx.obj
64-
run_interactive_menu(app_context)
64+
run_guarded(ctx, run_interactive_menu)
6565

6666

6767
def run() -> None:

‎o2switch_cli/core/cpanel_client.py‎

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,34 @@ def _parse_api2(self, payload: dict[str, Any], operation: str) -> ApiResult:
7171
event = result.get("event", {})
7272
if event and event.get("result") not in (1, "1"):
7373
raise TransportAppError(operation, event.get("reason", f"{operation} failed."))
74+
data = result.get("data")
75+
if isinstance(data, dict):
76+
detail = self._api2_failure_detail(data, operation)
77+
if detail:
78+
raise TransportAppError(operation, detail)
79+
elif isinstance(data, list):
80+
for item in data:
81+
if not isinstance(item, dict):
82+
continue
83+
detail = self._api2_failure_detail(item, operation)
84+
if detail:
85+
raise TransportAppError(operation, detail)
7486
return ApiResult(
75-
data=result.get("data"),
87+
data=data,
7688
metadata=result.get("metadata", {}) or {},
7789
warnings=result.get("warnings", []) or [],
7890
messages=result.get("messages", []) or [],
7991
)
8092

93+
@staticmethod
94+
def _api2_failure_detail(payload: dict[str, Any], operation: str) -> str | None:
95+
if "result" not in payload:
96+
return None
97+
if payload.get("result") in (1, "1", True):
98+
return None
99+
detail = payload.get("reason") or payload.get("statusmsg") or payload.get("error") or f"{operation} failed."
100+
return str(detail)
101+
81102
def uapi(self, module: str, function: str, **params: Any) -> ApiResult:
82103
payload = self._request("GET", f"/execute/{module}/{function}", params=params)
83104
return self._parse_uapi(payload, f"{module}/{function}")
@@ -128,8 +149,8 @@ def add_subdomain(self, *, domain: str, rootdomain: str, directory: str) -> ApiR
128149
def list_subdomains(self) -> ApiResult:
129150
return self.api2("SubDomain", "listsubdomains")
130151

131-
def delete_subdomain(self, *, domain: str, rootdomain: str) -> ApiResult:
132-
return self.api2("SubDomain", "delsubdomain", domain=domain, rootdomain=rootdomain)
152+
def delete_subdomain(self, *, domain: str) -> ApiResult:
153+
return self.api2("SubDomain", "delsubdomain", domain=domain)
133154

134155
def test_access(self) -> ApiResult:
135156
return self.list_domains()

‎o2switch_cli/core/dns_service.py‎

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
from __future__ import annotations
22

3+
import base64
4+
import binascii
5+
36
from o2switch_cli.core.audit import AuditService
47
from o2switch_cli.core.cpanel_client import CpanelClient
58
from o2switch_cli.core.domain_service import DomainService
@@ -45,6 +48,50 @@ def _line_index(record: DNSRecord) -> int:
4548
raw_value = record.record_id or record.raw.get("line_index") or record.raw.get("line")
4649
return int(raw_value)
4750

51+
@staticmethod
52+
def _decode_b64_text(value: object) -> str | None:
53+
if not isinstance(value, str) or not value:
54+
return None
55+
try:
56+
decoded = base64.b64decode(value, validate=True)
57+
except (binascii.Error, ValueError):
58+
return None
59+
try:
60+
return decoded.decode("utf-8")
61+
except UnicodeDecodeError:
62+
return decoded.decode("utf-8", errors="replace")
63+
64+
@classmethod
65+
def _record_values(cls, item: dict[str, object]) -> list[str]:
66+
data = item.get("data")
67+
if isinstance(data, list):
68+
return [str(value) for value in data]
69+
if data not in (None, ""):
70+
return [str(data)]
71+
72+
data_b64 = item.get("data_b64")
73+
if isinstance(data_b64, list):
74+
return [decoded for value in data_b64 if (decoded := cls._decode_b64_text(value)) is not None]
75+
if decoded := cls._decode_b64_text(data_b64):
76+
return [decoded]
77+
return []
78+
79+
@classmethod
80+
def _record_name(cls, item: dict[str, object], root_domain: str) -> str:
81+
raw_name = item.get("dname") or item.get("name") or item.get("domain")
82+
if raw_name in (None, ""):
83+
raw_name = cls._decode_b64_text(item.get("dname_b64")) or root_domain
84+
return canonical_record_name(str(raw_name), root_domain)
85+
86+
@staticmethod
87+
def _require_serial(operation: str, root_domain: str, serial: int | None) -> int:
88+
if serial is None:
89+
raise TransportAppError(
90+
operation,
91+
f"Could not determine the current DNS serial for zone {root_domain}.",
92+
)
93+
return serial
94+
4895
def _zone_state(self, root_domain: str) -> tuple[list[DNSRecord], int | None]:
4996
result = self._client.parse_zone(root_domain)
5097
payload = result.data or {}
@@ -64,13 +111,7 @@ def _zone_state(self, root_domain: str) -> tuple[list[DNSRecord], int | None]:
64111
if not isinstance(item, dict):
65112
continue
66113
record_type = str(item.get("record_type") or item.get("type") or "").upper()
67-
data = item.get("data")
68-
if isinstance(data, list):
69-
values = [str(value) for value in data]
70-
elif data is None:
71-
values = []
72-
else:
73-
values = [str(data)]
114+
values = self._record_values(item)
74115
value = (
75116
item.get("address")
76117
or item.get("target")
@@ -80,10 +121,7 @@ def _zone_state(self, root_domain: str) -> tuple[list[DNSRecord], int | None]:
80121
)
81122
ttl = item.get("ttl")
82123
record_id = item.get("line_index") or item.get("line") or item.get("record_id") or item.get("id")
83-
name = canonical_record_name(
84-
str(item.get("dname") or item.get("name") or item.get("domain") or root_domain),
85-
root_domain,
86-
)
124+
name = self._record_name(item, root_domain)
87125
records.append(
88126
DNSRecord(
89127
name=name,
@@ -106,15 +144,10 @@ def _extract_serial(records: list[DNSRecord]) -> int | None:
106144
for record in records:
107145
if record.type != "SOA":
108146
continue
109-
values = record.raw.get("data", [])
110-
if isinstance(values, list):
111-
for value in values:
112-
if str(value).isdigit() and len(str(value)) >= 8:
113-
return int(value)
114-
elif isinstance(values, str):
115-
for part in values.split():
116-
if part.isdigit() and len(part) >= 8:
117-
return int(part)
147+
values = DNSService._record_values(record.raw)
148+
for value in values:
149+
if str(value).isdigit() and len(str(value)) >= 8:
150+
return int(value)
118151
return None
119152

120153
def get_zone_state(self, root_domain: str) -> list[DNSRecord]:
@@ -318,6 +351,7 @@ def upsert_a_record(
318351
applied = False
319352
action = "dry-run" if dry_run else "created"
320353
if not dry_run:
354+
serial = self._require_serial("dns_upsert", root_domain, serial)
321355
add = None
322356
edit = None
323357
remove = None
@@ -407,6 +441,7 @@ def delete_a_record(
407441
hostname = normalize_hostname(fqdn)
408442
root_domain, serial, matches, plan = self.plan_delete_a_record(hostname, force=force)
409443
if not dry_run:
444+
serial = self._require_serial("dns_delete", root_domain, serial)
410445
remove = [self._line_index(record) for record in matches]
411446
self._client.mass_edit_zone(zone=root_domain, serial=serial, remove=remove)
412447
verification = VerificationStatus.SKIPPED

‎o2switch_cli/core/domain_service.py‎

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
from o2switch_cli.core.cpanel_client import CpanelClient
4+
from o2switch_cli.core.errors import NotFoundAppError
45
from o2switch_cli.core.models import DomainDescriptor, DomainType
56
from o2switch_cli.core.validators import normalize_hostname, select_root_domain
67

@@ -48,5 +49,12 @@ def search(self, term: str) -> list[DomainDescriptor]:
4849
needle = term.strip().lower()
4950
return [item for item in self.list_domains() if needle in item.domain]
5051

52+
def get_domain_descriptor(self, domain: str, operation: str) -> DomainDescriptor:
53+
target = normalize_hostname(domain)
54+
for item in self.list_domains():
55+
if item.domain == target:
56+
return item
57+
raise NotFoundAppError(operation, "Root domain was not found on the account.", target)
58+
5159
def resolve_root_domain(self, hostname: str, operation: str) -> str:
5260
return select_root_domain(hostname, self.root_domains(), operation)

‎o2switch_cli/core/subdomain_service.py‎

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
TransportAppError,
1212
)
1313
from o2switch_cli.core.models import (
14+
DomainType,
1415
MutationPlan,
1516
OperationMode,
1617
OperationResult,
@@ -40,6 +41,13 @@ def __init__(
4041
self._audit = audit
4142
self._reserved_labels = reserved_labels
4243

44+
def _delete_domain_argument(self, hostname: str, root_domain: str) -> str:
45+
descriptor = self._domains.get_domain_descriptor(root_domain, "subdomain_delete")
46+
if descriptor.type is DomainType.ADDON:
47+
label = hostname[: -(len(root_domain) + 1)]
48+
return f"{label}_{root_domain}"
49+
return hostname
50+
4351
def search(self, term: str) -> list[SubdomainDescriptor]:
4452
needle = term.strip().lower()
4553
try:
@@ -188,7 +196,8 @@ def delete(self, fqdn: str, *, dry_run: bool) -> OperationResult:
188196
root_domain, label, plan = self.plan_delete(hostname)
189197
if not dry_run:
190198
try:
191-
self._client.delete_subdomain(domain=label, rootdomain=root_domain)
199+
delete_domain = self._delete_domain_argument(hostname, root_domain)
200+
self._client.delete_subdomain(domain=delete_domain)
192201
except TransportAppError as exc:
193202
if self._looks_like_unsupported_delete(str(exc)):
194203
raise NotSupportedAppError(

‎o2switch_cli/core/validators.py‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,9 @@ def validate_reserved_hostname(hostname: str, root_domain: str, reserved_labels:
111111

112112

113113
def canonical_record_name(name: str, zone: str) -> str:
114-
candidate = normalize_hostname(name)
114+
candidate = name.strip().lower().rstrip(".")
115115
normalised_zone = normalize_hostname(zone)
116-
if candidate == "@":
116+
if not candidate or candidate == "@":
117117
return normalised_zone
118118
if candidate == normalised_zone or candidate.endswith(f".{normalised_zone}"):
119119
return candidate

‎tests/test_cli.py‎

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from typer.testing import CliRunner
66

77
from o2switch_cli.cli.main import app
8+
from o2switch_cli.core.errors import TransportAppError
89
from o2switch_cli.core.models import (
910
DomainDescriptor,
1011
DomainType,
@@ -221,3 +222,23 @@ class FakeRuntime:
221222
monkeypatch.setattr("o2switch_cli.cli.context.AppContext.runtime", lambda self: FakeRuntime())
222223
result = runner.invoke(app, ["--json", "dns", "verify", "--host", "odoo.ginutech.com"])
223224
assert result.exit_code == 7
225+
226+
227+
def test_interactive_command_renders_guarded_error(monkeypatch) -> None:
228+
def boom(app_context):
229+
raise TransportAppError("runtime", "interactive exploded")
230+
231+
monkeypatch.setattr("o2switch_cli.cli.main.run_interactive_menu", boom)
232+
result = runner.invoke(
233+
app,
234+
["--json", "interactive"],
235+
env={
236+
"O2SWITCH_CLI_CPANEL_HOST": "cpanel.example.test",
237+
"O2SWITCH_CLI_CPANEL_USER": "demo",
238+
"O2SWITCH_CLI_CPANEL_TOKEN": "secret-token",
239+
},
240+
)
241+
payload = json.loads(result.output)
242+
assert result.exit_code == 6
243+
assert payload["error_class"] == "transport"
244+
assert payload["message"] == "interactive exploded"

0 commit comments

Comments
 (0)