First-Look Transform
The transformation layer: build_base_table(...) turns the raw ingested frames into the joined, account-master-anchored base table — one row per account in the near-complete universe, the charging outcome left-joined, the full-waiver pool flagged (accounts absent from the charging file), management type classified, and join keys normalised. The reusable substrate every downstream analysis builds on; behaviour pinned by a synthetic-fixture test suite.
GET
/v2/hypothesis-testing/first-look-transform — Requires authentication. Code example:
python
# transform.py — custody-fee transformation layer (raw ingestion frames → analysis base table)
# ---------------------------------------------------------------------------------------------
# The ONE place the raw ingested frames become a clean, joined, account-level base table. EVERY
# downstream piece of work in this project — first-look exhibits, modelling, the weekly-loop
# analyses — builds on `build_base_table(...)`, so the custom fixes and joins live here once
# instead of being copy-pasted into each analysis.
#
# Layer boundary:
#   ingestion (see ingestion.md)  →  TRANSFORM (this file)    →  analysis / exhibits
#   reads workbooks, renames,        joins + custom fixes,       consume `acct`
#   normalises keys                  produces the base table
#
# Custom logic encoded here (all stakeholder-confirmed — see ingestion.md / plan.md decision log):
# • client-map UNIVERSE anchor (not the charging file) — includes the CDT099 accounts the
# charging file omits;
# • every join key cast to a clean STRING (Excel int/float vs comma-split string ids);
# • SG charging-file '82…' prefix re-based to the canonical '80…';
# • CDT099 = absent-from-charging-file (full waiver) — flagged, not dropped;
# • management type → mgmt_exempt + waived_class (legit-exempt vs genuine-gap).
#
# Usage (file mode): from transform import build_base_table
# acct = build_base_table(custody_2h25_hk, custody_2h25_sg, mkgid_clientid_mapping,
# clientdomicile_df, acctmgmt_df, trustacct_df, mkg_info)
# Usage (notebook paste): run this file's cells first, then call build_base_table(...).
#
# Behaviour is pinned by test_transform.py — extend a test there in the same commit as any change here.
import numpy as np
import pandas as pd
def _num(s):
    """Coerce `s` to numeric; anything that cannot be parsed becomes NaN."""
    numeric = pd.to_numeric(s, errors="coerce")
    return numeric
def _key(s):
    """Normalise a join-key Series to clean strings, with blanks as NaN.

    Every join key must be a clean STRING — Excel reads numeric ids as int/float while bridge ids
    are strings from a comma-split, so a dtype mismatch silently yields zero matches.

    Steps: cast to str, strip surrounding whitespace, drop a trailing ".0" left by a float read,
    then map the string spellings of a missing value back to NaN so `dropna` can see them.
    """
    text = s.astype(str).str.strip()
    text = text.str.replace(r"\.0$", "", regex=True)  # float-read ids: "12345.0" -> "12345"
    # "<NA>" is how a nullable-dtype missing value stringifies; without it a blank id would
    # survive as the literal key "<NA>".
    return text.replace({"nan": np.nan, "None": np.nan, "<NA>": np.nan, "": np.nan})
def _pick(df, cols):
    """Return a copy of `df` limited to those of `cols` it actually has, in `cols` order.

    Columns the ingest did not keep are skipped rather than raising.
    """
    kept = []
    for name in cols:
        if name in df.columns:
            kept.append(name)
    subset = df[kept]
    return subset.copy()
def _site_from_id(cid):
    """Derive the booking centre from the id prefix: '88…' -> "HK", '80…' -> "SG", else None.

    `cid` is a string Series; the result is a Series on the same index.
    """
    # na=False: for a missing id `str.startswith` can return NaN, which `np.where` treats as
    # truthy — that would label a missing id "HK". Force missing ids to "no match" instead.
    is_hk = cid.str.startswith("88", na=False)
    is_sg = cid.str.startswith("80", na=False)
    site = np.where(is_hk, "HK", np.where(is_sg, "SG", None))
    return pd.Series(site, index=cid.index)
def _explode_bridge(df, *cols):
    """Dashboard bridge MG→clients: split the comma-delimited active/closed id strings to long.

    `df` must carry `MKG_ID`; each name in `cols` is a column of comma-delimited client ids and is
    skipped if absent. Returns one row per CLIENT_ID with columns `MKG_ID_bridge` and `CLIENT_ID`.
    When an id appears more than once, the first occurrence wins (columns are processed in the
    order given).
    """
    parts = []
    for c in cols:
        if c in df.columns:
            t = df[["MKG_ID", c]].dropna(subset=[c]).copy()
            t["CLIENT_ID"] = t[c].astype(str).str.split(",")
            parts.append(t[["MKG_ID", "CLIENT_ID"]].explode("CLIENT_ID"))
    if not parts:
        # None of the id-list columns exist: pd.concat([]) would raise, so hand back an empty
        # bridge with the expected columns and let the downstream left-join be a no-op.
        return pd.DataFrame({"MKG_ID_bridge": pd.Series(dtype=object),
                             "CLIENT_ID": pd.Series(dtype=object)})
    out = pd.concat(parts, ignore_index=True)
    out["CLIENT_ID"] = _key(out["CLIENT_ID"])
    out["MKG_ID"] = _key(out["MKG_ID"])
    # A row with no MG carries nothing to backfill; drop it BEFORE de-duplicating so it cannot
    # shadow a later row that maps the same client to a real MG.
    out = out.dropna(subset=["CLIENT_ID", "MKG_ID"]).drop_duplicates("CLIENT_ID")
    return out.rename(columns={"MKG_ID": "MKG_ID_bridge"})
def build_base_table(custody_2h25_hk, custody_2h25_sg, mkgid_clientid_mapping,
                     clientdomicile_df, acctmgmt_df, trustacct_df, mkg_info, *, verbose=True):
    """Raw ingested frames → the joined, account-level base table (`acct`).

    One row per CLIENT_ID in the client-map universe, with the charging outcome left-joined and
    the project's flags derived. See the header comment of this file for the custom logic encoded.

    Parameters (columns listed are the ones this function reads):
        custody_2h25_hk, custody_2h25_sg: charging files per site — `CLIENT_ID`,
            `custody_eligible_aum`, `custody_fee_computed`, `custody_fee_charged`; `cdt_code` is
            used if present.
        mkgid_clientid_mapping: dashboard bridge — `MKG_ID`, plus `clientids_active` /
            `clientids_closed` where present.
        clientdomicile_df: client map — `CLIENT_ID`; `MKG_ID`, `domicile_country` optional.
        acctmgmt_df: client map — `CLIENT_ID`; `MKG_ID`, `account_type` optional.
        trustacct_df: `CLIENT_ID` of trust accounts.
        mkg_info: MG attributes — `MKG_ID`, `segment_current`, `market`.
        verbose: when True, print a coverage/summary report to stdout.

    Returns:
        DataFrame `acct` with the joined columns plus the derived flags: `in_charging_file`,
        `site`, `is_trust`, `is_cdt099`, `mandate`, `mgmt_exempt`, `charged`, `eligible`,
        `realized_bps`, `waived_class`, `discount_depth`, `fee_outcome`, `deliberate_waiver`,
        `mg_cross_booking`, `segment_min_roa`, `no_waiver_segment`, `pb_waiver_anomaly`,
        `under_min_charge`.
    """
    # --- account universe + its own attributes (account→MG, domicile, management type) ---
    dom = _pick(clientdomicile_df, ["CLIENT_ID", "MKG_ID", "domicile_country"])
    mgmt = _pick(acctmgmt_df, ["CLIENT_ID", "MKG_ID", "account_type"])
    for d in (dom, mgmt):
        d["CLIENT_ID"] = _key(d["CLIENT_ID"])
        if "MKG_ID" in d:
            d["MKG_ID"] = _key(d["MKG_ID"])
    # Drop blank / repeated ids BEFORE the outer merge: pandas matches NaN keys to each other, so
    # blank ids on both sides would cross-multiply only to be thrown away afterwards. The surviving
    # row per id (first of each side) is the same one the post-merge de-dup would have kept.
    dom = dom.dropna(subset=["CLIENT_ID"]).drop_duplicates("CLIENT_ID")
    mgmt = mgmt.dropna(subset=["CLIENT_ID"]).drop_duplicates("CLIENT_ID")
    universe = dom.merge(mgmt, on="CLIENT_ID", how="outer", suffixes=("", "_m"))
    if "MKG_ID_m" in universe:  # coalesce the two client-map MG columns
        universe["MKG_ID"] = universe["MKG_ID"].where(universe["MKG_ID"].notna(), universe["MKG_ID_m"])
        universe = universe.drop(columns="MKG_ID_m")
    # `_pick` tolerates columns the ingest didn't keep, but everything below indexes these by name;
    # materialise any that are missing as all-NaN so the flags degrade to "unknown" instead of KeyError.
    for opt in ("MKG_ID", "domicile_country", "account_type"):
        if opt not in universe:
            universe[opt] = np.nan
    universe = universe.dropna(subset=["CLIENT_ID"]).drop_duplicates("CLIENT_ID")

    # --- charging file = the OUTCOME, non-CDT099 accounts only (eligible-AUM blank elsewhere) ---
    # NOTE(review): assumes the charging frames share no column name with the universe other than
    # CLIENT_ID (an overlap would be suffixed by the merge and break the lookups below) — confirm.
    _parts = [df.assign(site=s) for df, s in [(custody_2h25_hk, "HK"), (custody_2h25_sg, "SG")] if len(df) > 0]
    cust = pd.concat(_parts, ignore_index=True) if _parts else custody_2h25_hk.assign(site="HK")  # robust to an empty site file
    for c in ["custody_eligible_aum", "custody_fee_computed", "custody_fee_charged"]:
        cust[c] = _num(cust[c])
    cust["CLIENT_ID"] = _key(cust["CLIENT_ID"])
    # SG charging ids carry an alternate '82…' prefix; canonical SG ids start '80…' (HK '88…' already ok).
    # Re-base leading 82→80 for SG rows ONLY (this was the ~45% MG-match cause). Stakeholder-confirmed 2026-06-17.
    _sg = cust["site"].eq("SG")
    cust.loc[_sg, "CLIENT_ID"] = cust.loc[_sg, "CLIENT_ID"].str.replace(r"^82", "80", regex=True)
    cust = cust.dropna(subset=["CLIENT_ID"]).drop_duplicates("CLIENT_ID")  # blank ids can never join
    cust["in_charging_file"] = True

    # --- dashboard bridge: CROSS-VALIDATE / backfill MKG_ID where the client map is blank ---
    bridge = _explode_bridge(mkgid_clientid_mapping, "clientids_active", "clientids_closed")
    trust = set(_key(trustacct_df["CLIENT_ID"]).dropna())
    # dropna on the key: pandas merges NaN keys with each other, so a blank-MG row in mkg_info would
    # otherwise hand its segment/market to EVERY account that has no MG.
    mgatt = (mkg_info[["MKG_ID", "segment_current", "market"]]
             .assign(MKG_ID=lambda d: _key(d["MKG_ID"]))
             .dropna(subset=["MKG_ID"])
             .drop_duplicates("MKG_ID"))

    # --- assemble: universe (anchor) ← charging outcome ← bridge MG backfill ← MG attributes ---
    acct = (universe.merge(cust, on="CLIENT_ID", how="left")
                    .merge(bridge, on="CLIENT_ID", how="left"))
    acct["MKG_ID"] = acct["MKG_ID"].where(acct["MKG_ID"].notna(), acct["MKG_ID_bridge"])  # client map first
    acct = acct.drop(columns="MKG_ID_bridge").merge(mgatt, on="MKG_ID", how="left")
    acct["in_charging_file"] = acct["in_charging_file"].eq(True)  # clean bool: NaN (unmatched) = absent = False
    acct["site"] = acct["site"].where(acct["site"].notna(), _site_from_id(acct["CLIENT_ID"]))

    # --- flags ---
    acct["is_trust"] = acct["CLIENT_ID"].isin(trust).map({True: "Trust", False: "Non-trust"})
    _code = acct["cdt_code"].astype(str).str.upper() if "cdt_code" in acct else pd.Series("", index=acct.index)
    acct["is_cdt099"] = (~acct["in_charging_file"]) | _code.str.contains("099", na=False)  # full-waiver pool
    # mandate (account_type, 3 clean categories — ingestion decode): Free=DPM · Advisory=MyAdvisory ·
    # Restricted=Self-Directed. Free/Advisory are not custody-charged by design (legit-exempt); RESTRICTED
    # is the genuine custody-fee population — the one to analyse, incl. within the CDT099 pool.
    _at = acct["account_type"].astype(str).str.upper()
    acct["mandate"] = np.select(
        [_at.str.contains("DPM|FREE", na=False), _at.str.contains("ADVISOR", na=False),
         _at.str.contains("SELF|RESTRICT", na=False)],
        ["Free (DPM)", "Advisory (MyAdvisory)", "Restricted (Self-Directed)"], default="unknown")
    acct["mgmt_exempt"] = acct["mandate"].isin(["Free (DPM)", "Advisory (MyAdvisory)"])  # legit-exempt
    acct["charged"] = acct["custody_fee_charged"].fillna(0) > 0
    acct["eligible"] = acct["custody_eligible_aum"].fillna(0) > 0  # known only for non-CDT099 (backend blank elsewhere)
    _pos = acct["custody_eligible_aum"] > 0  # guard divide-by-zero
    acct["realized_bps"] = np.nan
    acct.loc[_pos, "realized_bps"] = (acct.loc[_pos, "custody_fee_charged"]
                                      / acct.loc[_pos, "custody_eligible_aum"] * 1e4)
    # legitimately-exempt (DPM/MyAdvisory) vs genuine-gap candidate — the core waived-pool cut.
    # NB CDT099 eligible-AUM is PENDING the backend request → the gap pool is countable, not yet $-sizable.
    acct["waived_class"] = np.where(acct["charged"], "charged",
                                    np.where(acct["mgmt_exempt"], "legit-exempt (DPM/MyAdv)", "genuine-gap candidate"))
    # waiver depth + outcome (DEF, partial waivers first-class). discount_depth = 1 − charged/computed on
    # the THEORETICAL charge (custody_fee_computed); defined only where a fee was computed (non-CDT099).
    _comp = acct["custody_fee_computed"]
    _chg = acct["custody_fee_charged"].fillna(0)
    acct["discount_depth"] = np.where(_comp > 0, 1 - _chg / _comp, np.nan)  # 0 = full pay … 1 = full waiver
    # fee_outcome (5% full-pay tolerance absorbs GST/rounding):
    #   default (CDT099, never computed) · full-waiver (computed, zero charged) ·
    #   partial-discount (0.05 ≤ depth < 1) · full-pay (depth < 0.05, charged ≈ computed)
    _default = acct["is_cdt099"]
    _fullwaive = (~_default) & (_comp.fillna(0) > 0) & (_chg == 0)
    _partial = (~_default) & (_chg > 0) & (acct["discount_depth"] >= 0.05)
    acct["fee_outcome"] = np.select([_default, _fullwaive, _partial],
                                    ["default (CDT099)", "full-waiver", "partial-discount"], default="full-pay")
    # both partial and full non-CDT099 waivers are INTENTIONAL give-aways → group as deliberate_waiver
    # (degree kept in discount_depth). NB sizable NOW (computed + eligible AUM in hand), unlike CDT099.
    acct["deliberate_waiver"] = acct["fee_outcome"].isin(["partial-discount", "full-waiver"])
    # cross-booking (Q2›BC1): MGs whose accounts span BOTH booking centres (HK 88… + SG 80…) — the
    # HK-client diversification signature; lets us test "SG leg waived while HK leg pays" within one MG.
    _sites_per_mg = acct.dropna(subset=["MKG_ID"]).groupby("MKG_ID")["site"].nunique()
    _xmg = set(_sites_per_mg[_sites_per_mg > 1].index)
    acct["mg_cross_booking"] = acct["MKG_ID"].isin(_xmg)
    # policy segment floors (policy-and-segments.md): Min Segment RoA + the no-waiver segments. Used by
    # E10 (low return = below the account's segment floor, not a global median) and E14 (PB/PB Ex anomalies).
    _SEG_MIN_ROA = {"Strategic": 40, "Key": 65, "Core": 80, "Private Banking": 100, "PB": 100}  # bps; PB Ex / Small = N.A.
    _NO_WAIVER_SEGS = {"Private Banking", "PB", "PB Ex"}  # waivers not permitted
    acct["segment_min_roa"] = acct["segment_current"].map(_SEG_MIN_ROA)  # NaN where no policy floor
    acct["no_waiver_segment"] = acct["segment_current"].isin(_NO_WAIVER_SEGS)  # PB / PB Ex: no waivers permitted
    # PB/PB Ex policy anomaly: the "no waivers" rule bites on the COMPUTED (post-code) amount → an IN-FILE
    # (non-099) PB/PB Ex account charged BELOW its computed fee is a violation. A CDT099 account computes
    # zero (the code waives) → NOT this anomaly; and CDT099 on PB/PB Ex is not anomalous on its own (many
    # are legit-exempt DPM/MyAdvisory) — those sit in the normal 099 pool, sized via the CDT099 re-exam.
    acct["pb_waiver_anomaly"] = (acct["no_waiver_segment"] & acct["in_charging_file"]
                                 & (acct["custody_fee_charged"].fillna(0) < acct["custody_fee_computed"].fillna(0)))
    # min-charge floor (USD 150 semi-annual, policy): an in-file account whose computed fee CLEARS the floor
    # (≥ 150) but is charged BELOW it is a min-charge breach. (Currency = the fee currency — confirm USD.)
    MIN_CHARGE = 150.0
    acct["under_min_charge"] = (acct["in_charging_file"] & (acct["custody_fee_computed"].fillna(0) >= MIN_CHARGE)
                                & (acct["custody_fee_charged"].fillna(0) < MIN_CHARGE))

    if verbose:
        print(f"\nBASE TABLE (universe = client map): {len(acct):,} accounts")
        print(f" in charging file (non-CDT099) {acct['in_charging_file'].mean():.0%} "
              f"| CDT099/full-waiver pool {acct['is_cdt099'].sum():,} ({acct['is_cdt099'].mean():.0%})")
        print(f" MG match {acct['MKG_ID'].notna().mean():.0%} | domicile {acct['domicile_country'].notna().mean():.0%} "
              f"| mgmt-type {acct['account_type'].notna().mean():.0%} "
              f"| eligible-AUM known {acct['custody_eligible_aum'].notna().mean():.0%} (CDT099 blank till backend)")
        print(f" waived split → charged {acct['charged'].sum():,} | "
              f"legit-exempt {(acct['waived_class']=='legit-exempt (DPM/MyAdv)').sum():,} | "
              f"genuine-gap candidate {(acct['waived_class']=='genuine-gap candidate').sum():,}")
        print(f" fee outcome → " + " | ".join(f"{k} {v:,}" for k, v in acct['fee_outcome'].value_counts().items()))
        print(f" deliberate waivers (partial+full, non-CDT099): {acct['deliberate_waiver'].sum():,}")
        print(f" cross-booking MGs (span HK+SG): {len(_xmg):,} | accounts under them: {acct['mg_cross_booking'].sum():,}")
        print(f" PB/PB Ex (no-waiver): {acct['no_waiver_segment'].sum():,} | in-file waiver anomaly (charged<computed): {acct['pb_waiver_anomaly'].sum():,}")
        _c99 = acct["is_cdt099"]
        print(f" CDT099 by mandate → Restricted/candidate {(_c99 & acct['mandate'].eq('Restricted (Self-Directed)')).sum():,} "
              f"| legit-exempt Free/Advisory {(_c99 & acct['mgmt_exempt']).sum():,}")
    return acct
Last updated: June 19, 2026