Utilities

Utilities are a collection of tools that support and streamline InSAR processing workflows.

Select Pairs

Select interferogram pairs from ASF search results based on temporal and perpendicular baseline criteria.

from insarhub import Downloader
from insarhub.utils import select_pairs

s1 = Downloader.create(
    'S1_SLC',
    intersectsWith=[-113.05, 37.74, -112.68, 38.00],
    start='2020-01-01',
    end='2020-12-31',
    relativeOrbit=100,
    frame=466,
    workdir='path/to/dir',
)
results = s1.search()

pairs, baselines, scene_bperp, _ = select_pairs(search_results=results)

Parameters:

search_results : list[ASFProduct] | dict[tuple[int, int], list[ASFProduct]] (required)
    Either a flat list (single stack) or a dictionary keyed by (path, frame).
dt_targets : list[float] (default: SELECT_PAIRS_DEFAULTS['dt_targets'])
    Preferred temporal spacings in days. A candidate pair passes if |dt - target| <= dt_tol for at least one target.
dt_tol : float (default: SELECT_PAIRS_DEFAULTS['dt_tol'])
    Tolerance in days added to each entry in dt_targets.
dt_max : float (default: SELECT_PAIRS_DEFAULTS['dt_max'])
    Maximum temporal baseline in days.
pb_max : float (default: SELECT_PAIRS_DEFAULTS['pb_max'])
    Maximum perpendicular baseline in meters.
min_degree : int (default: SELECT_PAIRS_DEFAULTS['min_degree'])
    Minimum interferogram connections per scene. Enforced when force_connect is True.
max_degree : int (default: SELECT_PAIRS_DEFAULTS['max_degree'])
    Maximum interferogram connections per scene.
force_connect : bool (default: SELECT_PAIRS_DEFAULTS['force_connect'])
    If a scene falls below min_degree after primary selection, add its nearest-time neighbors that satisfy pb_max and dt_max. May introduce lower-quality pairs; a warning is logged.
max_workers : int (default: SELECT_PAIRS_DEFAULTS['max_workers'])
    Number of threads for the API fallback. Has no effect if all products have local baseline data (common for Sentinel-1 and ALOS). Set to 1 to disable threading (useful for debugging).
avoid_low_quality_days : bool (default: SELECT_PAIRS_DEFAULTS['avoid_low_quality_days'])
    If True, fetch weather and snow cover for every acquisition date and remove scenes whose date fails quality thresholds before building the network. Removed scenes are logged as warnings. Defaults to False.
snow_threshold : float (default: SELECT_PAIRS_DEFAULTS['snow_threshold'])
    Snow-cover fraction [0–1] above which a scene is considered unusable. Also triggers a wet-snow hard kill (snow_frac > 0.3 AND temp > 0 °C). Defaults to 0.5.
precip_mm_threshold : float (default: SELECT_PAIRS_DEFAULTS['precip_mm_threshold'])
    3-day accumulated precipitation (mm) above which a scene is considered unusable. Defaults to 20.0 mm.
aoi_wkt : str (default: None)
    WKT geometry of the area of interest. When provided, the centroid of this geometry is used for weather/snow lookups instead of the union centroid of the scene footprints, ensuring the same location is used here as in FeatureAssembler. Defaults to None.
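
To tighten or loosen the network, override the defaults in the call; a minimal sketch (the threshold values below are illustrative, not recommendations):

pairs, baselines, scene_bperp, _ = select_pairs(
    search_results=results,
    dt_targets=[12, 24, 36],  # prefer 12/24/36-day pairs
    dt_tol=1,                 # accept +/- 1 day around each target
    dt_max=60,                # hard cap on temporal baseline [days]
    pb_max=150.0,             # hard cap on perpendicular baseline [m]
    force_connect=True,       # patch in nearest-time neighbors for isolated scenes
)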
Source code in src/insarhub/utils/tool.py
def select_pairs(
    search_results: Union[dict[tuple[int, int], list[ASFProduct]], list[ASFProduct]],
    dt_targets: tuple[int, ...]  = _SP["dt_targets"],
    dt_tol: int                  = _SP["dt_tol"],
    dt_max: int                  = _SP["dt_max"],
    pb_max: float                = _SP["pb_max"],
    min_degree: int              = _SP["min_degree"],
    max_degree: int              = _SP["max_degree"],
    force_connect: bool          = _SP["force_connect"],
    max_workers: int             = _SP["max_workers"],
    avoid_low_quality_days: bool = _SP["avoid_low_quality_days"],
    snow_threshold: float        = _SP["snow_threshold"],
    precip_mm_threshold: float   = _SP["precip_mm_threshold"],
    aoi_wkt: str | None = None,
) -> tuple[Union[PairGroup, list[Pair]], Union[BaselineTable, dict], dict, dict]:

    """
    Select interferogram pairs based on temporal and perpendicular baseline.

    This function selects interferogram pairs according to temporal spacing 
    and perpendicular baseline constraints, optionally enforcing connectivity 
    rules per scene.

    Supported sensors:
    - Sentinel-1 (CALCULATED)  : stateVectors + ascendingNodeTime → local
    - ALOS / ERS / RADARSAT (PRE_CALCULATED) : insarBaseline scalar → local
    - Any product missing data : ref.stack() API call → fallback

    Args:
        search_results (list[ASFProduct] | dict[tuple[int,int], list[ASFProduct]]):
            Either a flat list (single stack) or a dictionary keyed by (path, frame).
        dt_targets (list[float], optional):
            Preferred temporal spacings in days. A candidate pair passes if 
            |dt - target| <= dt_tol for at least one target.
        dt_tol (float, optional):
            Tolerance in days added to each entry in dt_targets.
        dt_max (float, optional):
            Maximum temporal baseline in days.
        pb_max (float, optional):
            Maximum perpendicular baseline in meters.
        min_degree (int, optional):
            Minimum interferogram connections per scene. Enforced when force_connect is True.
        max_degree (int, optional):
            Maximum interferogram connections per scene.
        force_connect (bool, optional):
            If a scene falls below min_degree after primary selection, add its nearest-time 
            neighbors that satisfy pb_max and dt_max. May introduce lower-quality pairs; a warning is logged.
        max_workers (int, optional):
            Number of threads for API fallback. Has no effect if all products have local baseline
            data (common for Sentinel-1 and ALOS). Set to 1 to disable threading (useful for debugging).
        avoid_low_quality_days (bool, optional):
            If True, fetch weather and snow cover for every acquisition date and
            remove scenes whose date fails quality thresholds before building the
            network. Removed scenes are logged as warnings. Defaults to False.
        snow_threshold (float, optional):
            Snow-cover fraction [0–1] above which a scene is considered unusable.
            Also triggers a wet-snow hard-kill (snow_frac > 0.3 AND temp > 0 °C).
            Defaults to 0.5.
        precip_mm_threshold (float, optional):
            3-day accumulated precipitation (mm) above which a scene is considered
            unusable. Defaults to 20.0 mm.
        aoi_wkt (str, optional):
            WKT geometry of the area of interest.  When provided the centroid of
            this geometry is used for weather/snow lookups instead of the union
            centroid of the scene footprints, ensuring the same location is used
            here as in FeatureAssembler.  Defaults to None.

    Returns:
        tuple of four elements:

        pairs (list[Pair] | dict[tuple[int,int], list[Pair]]):
            A flat list of Pair tuples ``(earlier_scene, later_scene)`` if
            *search_results* was a list, or a dict keyed by ``(path, frame)``
            with lists of Pair tuples if *search_results* was a dict.

        baselines (BaselineTable | dict[tuple[int,int], BaselineTable]):
            Pairwise baseline table mapping ``(scene_a, scene_b)`` →
            ``(dt_days, bperp_m)`` where ``bperp_m = |bp_a − bp_b|`` (always
            positive). Mirrors the structure of *pairs* (flat or grouped).

        scene_bperp (dict[str, float] | dict[tuple[int,int], dict[str, float]]):
            Signed perpendicular baseline for each scene relative to the
            anchor (earliest) scene, as returned by
            ``calculate_perpendicular_baselines``. Values can be negative or
            positive. Use this for network plots to reproduce the MintPy-style
            y-axis (negative/positive spread around zero). Mirrors the
            structure of *pairs* (flat or grouped).

        prefetch (dict | dict[tuple[int, int], dict]):
            Weather/snow features prefetched per acquisition date (plus the
            ``lat``/``lon`` used for the lookup) when
            ``avoid_low_quality_days`` is True; empty otherwise. Mirrors the
            structure of *pairs* (flat or grouped).
    """

    # ── bad-day pre-filter ────────────────────────────────────────────────
    def _bad_scene_names(
        prods: list,
        snow_thr: float,
        precip_thr: float,
    ) -> tuple[set[str], dict[str, dict], dict[str, dict], float, float]:
        """Return (bad_names, weather_dict, snow_dict, lat, lon).

        weather_dict / snow_dict are {date: feats} for ALL unique dates — not
        just the bad ones — so callers can seed FeatureAssembler's cache and
        avoid a second fetch during scoring.

        Flags a date bad when:
          - wet snow  : snow_frac > 0.3 AND temp_max > 0 °C  (C-band hard kill)
          - heavy snow: snow_frac >= snow_thr
          - heavy rain: precip_3day (or daily precip fallback) >= precip_thr mm
        """
        from shapely.geometry import shape as _shape
        from shapely.ops import unary_union
        from insarhub.utils.pair_quality._weather import fetch_weather_batch
        from insarhub.utils.pair_quality._snow_modis import fetch_snow_features_batch

        # Prefer the explicit AOI centroid (same geometry FeatureAssembler uses).
        # Fall back to the union centroid of all scene footprints.
        try:
            from shapely import wkt as _wkt
            if aoi_wkt:
                c = _wkt.loads(aoi_wkt).centroid
            else:
                union = unary_union([_shape(p.geometry) for p in prods])
                c = union.centroid
            lat, lon = c.y, c.x
        except Exception as exc:
            logger.warning("avoid_low_quality_days: could not extract centroid (%s) — skipping filter", exc)
            return set(), {}, {}, 0.0, 0.0

        # Use startTime property (already ISO-8601) rather than parsing the scene
        # name at fixed character positions — more reliable across sensor types.
        date_of: dict[str, str] = {
            p.properties["sceneName"]: (p.properties.get("startTime") or "")[:10]
            for p in prods
        }
        unique_dates = list({d for d in date_of.values() if len(d) == 10})

        if not unique_dates:
            logger.warning("avoid_low_quality_days: no valid dates extracted — skipping filter")
            return set(), {}, {}, lat, lon

        logger.info("avoid_low_quality_days: fetching weather/snow for %d dates …", len(unique_dates))
        try:
            weather = fetch_weather_batch(lat, lon, unique_dates)
        except Exception as exc:
            logger.warning("avoid_low_quality_days: weather fetch failed (%s) — skipping filter", exc)
            weather = {}
        try:
            snow = fetch_snow_features_batch(lat, lon, unique_dates)
        except Exception as exc:
            logger.warning("avoid_low_quality_days: snow fetch failed (%s) — skipping filter", exc)
            snow = {}

        bad_dates: set[str] = set()
        for date in unique_dates:
            w = weather.get(date, {})
            s = snow.get(date, {})
            temp      = w.get("temp_max")
            # precip_3day can be None when the API returns null for precipitation_sum.
            # Fall back to the daily precip so the check is never silently bypassed.
            precip3   = w.get("precip_3day")
            precip1   = w.get("precip")
            precip_mm = (precip3 if precip3 is not None else precip1) or 0.0
            snow_frac = s.get("snow_cover_frac") or 0.0

            wet_snow   = (temp is not None and temp > 0 and snow_frac > 0.30)
            heavy_snow = snow_frac >= snow_thr
            heavy_rain = precip_mm >= precip_thr

            if wet_snow or heavy_snow or heavy_rain:
                reasons = []
                if wet_snow:   reasons.append(f"wet snow (frac={snow_frac:.2f}, temp={temp:.1f}°C)")
                if heavy_snow: reasons.append(f"heavy snow (frac={snow_frac:.2f} ≥ {snow_thr})")
                if heavy_rain: reasons.append(f"heavy rain (precip={precip_mm:.1f} mm ≥ {precip_thr} mm)")
                logger.warning("avoid_low_quality_days: dropping date %s%s", date, ", ".join(reasons))
                bad_dates.add(date)

        bad_names = {name for name, d in date_of.items() if d in bad_dates}
        return bad_names, weather, snow, lat, lon

    # ── normalise input ───────────────────────────────────────────────────
    input_is_list = isinstance(search_results, list)
    if input_is_list:
        working_dict: dict[tuple[int, int], list[ASFProduct]] = {
            (0, 0): search_results   # type: ignore[arg-type]
        }
    elif isinstance(search_results, dict):
        working_dict = search_results
    else:
        raise TypeError(
            f"search_results must be a list or dict of ASFProducts, "
            f"got {type(search_results)}"
        )

    # ── primary filter helpers (defined once, closed over threshold args) ─
    def _near_target(dt: float) -> bool:
        return any(abs(dt - t) <= dt_tol for t in dt_targets)

    def _passes_primary(dt: float, bp: float) -> bool:
        return _near_target(dt) and dt <= dt_max and bp <= pb_max

    pairs_group: PairGroup = defaultdict(list)
    baseline_group: dict[tuple[int, int], BaselineTable] = {}
    scene_bperp_group: dict[tuple[int, int], dict] = {}
    # Keyed by (path, frame): {"weather": {date: feats}, "snow": {date: feats}, "lat": float, "lon": float}
    prefetch_cache: dict[tuple[int, int], dict] = {}

    # ── process each (path, frame) key ───────────────────────────────────
    for key, search_result in working_dict.items():
        if not input_is_list:
            logger.info(
                "%sSearching pairs for path %d frame %d …",
                Fore.GREEN, key[0], key[1],
            )

        # Sort by acquisition time so `names` is chronologically ordered
        prods = sorted(search_result, key=lambda p: p.properties["startTime"])

        if not prods:
            logger.warning("No products for key %s — skipping.", key)
            continue

        # Pre-parse acquisition datetimes to Unix timestamps (done once;
        # reused in sort keys, dt calculations, and pair ordering)
        id_time_raw: dict[SceneID, str] = {
            p.properties["sceneName"]: p.properties["startTime"] for p in prods
        }
        id_time_dt: dict[SceneID, DateFloat] = {
            sid: isoparse(t).timestamp() for sid, t in id_time_raw.items()
        }
        ids: set[SceneID] = set(id_time_raw)
        names: list[SceneID] = [p.properties["sceneName"] for p in prods]

        # ── 0. Drop bad-weather/snow acquisition dates ────────────────────
        if avoid_low_quality_days:
            bad, w_cache, s_cache, pc_lat, pc_lon = _bad_scene_names(
                prods, snow_threshold, precip_mm_threshold
            )
            prefetch_cache[key] = {
                "weather": w_cache,
                "snow":    s_cache,
                "lat":     pc_lat,
                "lon":     pc_lon,
            }
            if bad:
                before = len(prods)
                prods  = [p for p in prods if p.properties["sceneName"] not in bad]
                names  = [n for n in names if n not in bad]
                ids    = set(names)
                id_time_raw = {k: v for k, v in id_time_raw.items() if k not in bad}
                id_time_dt  = {k: v for k, v in id_time_dt.items()  if k not in bad}
                logger.warning(
                    "Key %s — avoid_low_quality_days: removed %d / %d scenes.",
                    key, before - len(prods), before,
                )

        # ── 1. Build pairwise baseline table ─────────────────────────────
        B, scene_bp = _build_baseline_table(prods, ids, id_time_dt, max_workers=max_workers)
        baseline_group[key] = B
        scene_bperp_group[key] = scene_bp
        # ── 2. Primary pair selection ─────────────────────────────────────
        pairs: set[Pair] = {
            e for e, (dt, bp) in B.items() if _passes_primary(dt, bp)
        }
        logger.info(
            "Key %s — primary selection: %d / %d candidate pairs.",
            key, len(pairs), len(B),
        )

        # ── 3. Connectivity enforcement ───────────────────────────────────
        pairs = _enforce_connectivity(
            pairs,
            B,
            names,
            id_time_dt,
            min_degree=min_degree,
            max_degree=max_degree,
            pb_max=pb_max,
            dt_max=float(dt_max),
            force_connect=force_connect
        )

        pairs_group[key] = sorted(pairs)
        logger.info(
            "Key %s — final pair count: %d.", key, len(pairs_group[key])
        )
    # Un-group every output for flat-list input so baselines mirrors pairs,
    # as documented above.
    pairs = pairs_group[(0, 0)] if input_is_list else pairs_group
    baselines = baseline_group.get((0, 0), {}) if input_is_list else baseline_group
    scene_bperp = scene_bperp_group.get((0, 0), {}) if input_is_list else scene_bperp_group
    prefetch = prefetch_cache.get((0, 0), {}) if input_is_list else prefetch_cache

    return pairs, baselines, scene_bperp, prefetch
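
When search_results is a dictionary keyed by (path, frame), every output mirrors that grouping; a sketch (the (100, 466) key is illustrative, matching the search example above):

grouped = {(100, 466): results}
pairs, baselines, scene_bperp, _ = select_pairs(search_results=grouped)
for (path, frame), pair_list in pairs.items():
    print(f"P{path}/F{frame}: {len(pair_list)} pairs")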

Plot Pair Network

Plot the SBAS interferogram network returned by select_pairs.

from insarhub.utils import plot_pair_network

fig = plot_pair_network(pairs=pairs, baselines=baselines, scene_baselines=scene_bperp)
fig.show()

Example:

[figure: interferogram network plot (x = days since first acquisition, y = perpendicular baseline) alongside a per-scene connection histogram]

Parameters:

pairs : list[Pair] | PairGroup (required)
    A flat list of pairs or a dictionary keyed by (path, frame) with lists of pairs. Each pair is a tuple (earlier_scene, later_scene).
baselines : BaselineTable (required)
    Table or mapping containing temporal and perpendicular baseline information for each interferogram pair.
scene_baselines : dict | None (default: None)
    Signed per-scene perpendicular baseline relative to the anchor scene, as returned by select_pairs. When provided, node y-positions use these signed values (MintPy-style); otherwise they are reconstructed from the pairwise table.
title : str (default: 'Interferogram Network')
    Main title of the network plot.
figsize : tuple[int, int] (default: (18, 7))
    Figure size (width, height) in inches.
save_path : str | Path | None (default: None)
    Path to save the generated figure. If None, the figure is not saved.
quality_scores : dict[str, float] | None (default: None)
    Optional per-pair quality scores keyed by "sceneA:sceneB". When given, edge color and width encode quality class instead of temporal baseline.
quality_factors : dict[str, dict] | None (default: None)
    Optional per-pair quality factors. Entries with a "coherence_by_class" mapping produce additional per-land-cover-class figures when save_path is set.

Raises:

TypeError
    If any scene name in pairs is not a string.
ValueError
    If a scene name cannot be parsed into a valid date.
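
When pairs is grouped by (path, frame), plot_pair_network returns one figure per group; if save_path has no file suffix it is treated as a directory and each figure is written as network_P{path}_F{frame}.png. A sketch using the grouped outputs from select_pairs (the (100, 466) key is illustrative):

figs = plot_pair_network(
    pairs=pairs,                   # dict keyed by (path, frame)
    baselines=baselines,
    scene_baselines=scene_bperp,
    save_path='plots',             # no suffix: treated as a directory
)
figs[(100, 466)].show()            # one Figure per (path, frame) key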

Source code in src/insarhub/utils/tool.py
def plot_pair_network(
    pairs: list[Pair] | PairGroup,
    baselines: BaselineTable,
    scene_baselines: dict | None = None,
    title: str = "Interferogram Network",
    figsize: tuple[int, int] = (18, 7),
    save_path: str | Path | None = None,
    quality_scores: dict[str, float] | None = None,
    quality_factors: dict[str, dict] | None = None,
) -> plt.Figure | dict:

    """
    Plot an interferogram network along with per-scene connection statistics.

    This function visualizes the relationships between SAR acquisitions in
    terms of temporal and perpendicular baselines. The network graph is
    shown on the left, while a horizontal bar chart summarizes the number
    of connections per scene on the right.

    The layout is as follows:
        - Left  : Network graph (x-axis = days since first acquisition,
                  y-axis = perpendicular baseline [m])
        - Right : Horizontal bar chart showing the number of connections per SAR scene

    Args:
        pairs (list[Pair] | PairGroup):
            A flat list of pairs or a dictionary keyed by (path, frame)
            with lists of pairs. Each pair is a tuple `(earlier_scene, later_scene)`.
        baselines (BaselineTable):
            Table or mapping containing temporal and perpendicular baseline
            information for each interferogram pair.
        scene_baselines (dict | None, optional):
            Signed per-scene perpendicular baseline relative to the anchor
            scene, as returned by ``select_pairs``. Used for node y-positions
            when provided. Defaults to None.
        title (str, optional):
            Main title of the network plot. Defaults to "Interferogram Network".
        figsize (tuple[int, int], optional):
            Figure size (width, height) in inches. Defaults to (18, 7).
        save_path (str | Path | None, optional):
            Path to save the generated figure. If None, figure is not saved.
            Defaults to None.
        quality_scores (dict[str, float] | None, optional):
            Per-pair quality scores keyed by ``"sceneA:sceneB"``. When given,
            edge color and width encode quality class instead of temporal
            baseline. Defaults to None.
        quality_factors (dict[str, dict] | None, optional):
            Per-pair quality factors; entries with a ``coherence_by_class``
            mapping produce extra per-land-cover-class figures when
            *save_path* is set. Defaults to None.

    Returns:
        matplotlib.figure.Figure | dict:
            The created matplotlib figure containing the network and
            per-scene connection histogram, or a dict mapping
            ``(path, frame)`` to figures when *pairs* is grouped.

    Raises:
        TypeError:
            If any scene name in `pairs` is not a string.
        ValueError:
            If a scene name cannot be parsed into a valid date.

    Notes:
        - Node positions: x = days since first acquisition, y = perpendicular baseline.
        - Node color represents the node degree (number of connections).
        - Edge color and width represent temporal baseline.
        - Scenes with fewer than 2 connections are highlighted in red in the histogram.
        - Legends show node degree, temporal baseline, and path/frame grouping.
        - The top axis of the network plot shows real acquisition dates for reference.
    """

    # ── 0. Normalise input ────────────────────────────────────────────────

    if isinstance(pairs, dict):
        figures = {}

        save_path_obj = None
        save_is_dir = False

        if save_path is not None:
            save_path_obj = Path(save_path).expanduser()

            if save_path_obj.suffix == "":
                save_is_dir = True
                save_path_obj.mkdir(parents=True, exist_ok=True)
            else:
                # Has suffix → treat as file template
                save_path_obj.parent.mkdir(parents=True, exist_ok=True)

        for (path, frame), group_pairs in pairs.items():
            group_title = f"{title} — P{path}/F{frame}"
            group_save_path = None

            if save_path_obj is not None:
                if save_is_dir:
                    # Case 1: directory given
                    group_save_path = (
                        save_path_obj.joinpath(f"network_P{path}_F{frame}.png")
                    )
                else:
                    # Case 2: file with suffix given
                    group_save_path = (
                        save_path_obj.parent
                        / f"{save_path_obj.stem}_P{path}_F{frame}{save_path_obj.suffix}"
                    )

            fig = plot_pair_network(
                    pairs=group_pairs,
                    baselines=baselines[(path, frame)],
                    scene_baselines=scene_baselines.get((path, frame)) if isinstance(scene_baselines, dict) else scene_baselines,
                    title=group_title,
                    figsize=figsize,
                    save_path=group_save_path,
                    quality_scores=quality_scores,
                    quality_factors=quality_factors,
                )

            figures[(path, frame)] = fig

        return figures



    flat_pairs = pairs
    subtitle = f"{len(flat_pairs)} pairs"
    if save_path is not None:
        save_path = Path(save_path).expanduser()
    # ── 1. Parse dates ────────────────────────────────────────────────────
    scenes: set[SceneID] = set()
    for a, b in flat_pairs:
        scenes.update([a, b])

    def _parse_date(scene_name: str) -> datetime:
        if not isinstance(scene_name, str):
            raise TypeError(
                f"Expected str, got {type(scene_name).__name__}: {scene_name!r}."
            )
        m = re.search(r"(\d{8})", scene_name)
        if m:
            return datetime.strptime(m.group(1), "%Y%m%d")
        m = re.search(r"(\d{4}-\d{2}-\d{2})", scene_name)
        if m:
            return datetime.strptime(m.group(1), "%Y-%m-%d")
        raise ValueError(f"Cannot parse date from scene name: {scene_name}")

    id_time: dict[SceneID, datetime] = {s: _parse_date(s) for s in scenes}
    t0      = min(id_time.values())
    id_days: dict[SceneID, float] = {
        s: (id_time[s] - t0).total_seconds() / 86_400.0 for s in scenes
    }

    # ── 2. Build graph ────────────────────────────────────────────────────
    G = nx.Graph()
    G.add_nodes_from(scenes)

    if isinstance(pairs, dict):
        for (path, frame), pair_list in pairs.items():
            for a, b in pair_list:
                dt, bp = baselines.get((a, b), (_MISSING, _MISSING))
                G.add_edge(a, b, dt=dt, bp=bp, path=path, frame=frame)
    else:
        for a, b in flat_pairs:
            dt, bp = baselines.get((a, b), (_MISSING, _MISSING))
            G.add_edge(a, b, dt=dt, bp=bp, path=0, frame=0)

    # ── 3. Node positions (x=days, y=bperp) ──────────────────────────────
    if scene_baselines:
        # Use signed per-scene bperp relative to anchor — same as MintPy display
        # (negative = scene orbited closer than anchor, positive = further)
        bperp_pos: dict[SceneID, float] = {
            s: float(scene_baselines.get(s, 0.0)) for s in scenes
        }
    else:
        # Fallback: reconstruct from pairwise table (loses sign info, may trend upward)
        bperp_accum: dict[SceneID, list[float]] = defaultdict(list)
        for (a, b), (dt, bp) in baselines.items():
            if bp >= _MISSING:
                continue
            bperp_accum[a].append(-bp / 2.0)
            bperp_accum[b].append(+bp / 2.0)
        bperp_pos = {
            s: float(np.mean(v)) if v else 0.0
            for s, v in bperp_accum.items()
        }
        sorted_by_time = sorted(scenes, key=lambda s: id_days[s])
        offset = bperp_pos.get(sorted_by_time[0], 0.0)
        bperp_pos = {s: bperp_pos.get(s, 0.0) - offset for s in scenes}

    pos: dict[SceneID, tuple[float, float]] = {
        s: (id_days[s], bperp_pos[s]) for s in scenes
    }

    # ── 4. Visual attributes ──────────────────────────────────────────────
    # 3-category quality colours matching the GUI thresholds (Hanssen 2001)
    _Q_GOOD  = '#4caf50'
    _Q_RISKY = '#ffc107'
    _Q_BAD   = '#f44336'
    _Q_NONE  = '#888888'  # unscored

    def _quality_colour(sc: float | None) -> str:
        if sc is None:  return _Q_NONE
        # support both 0-1 (coherence) and 0-100 (pair quality) scales
        v = sc if sc <= 1 else sc / 100.0
        if v >= 0.60:  return _Q_GOOD
        if v >= 0.30:  return _Q_RISKY
        return _Q_BAD

    # ── Extract per-class scores from quality_factors ─────────────────────
    _LC_CLASSES = [
        ("stable",     "🏗 Stable"),
        ("vegetation", "🌾 Vegetation"),
        ("forest",     "🌲 Forest"),
    ]
    _class_scores: dict[str, dict[str, float]] = {}   # {class: {pair_key: coh}}
    if quality_factors:
        for pair_key, fct in quality_factors.items():
            by_cls = fct.get("coherence_by_class") or {}
            for cls_name, coh_val in by_cls.items():
                _class_scores.setdefault(cls_name, {})[pair_key] = float(coh_val)
                # Also store reverse key
                parts = pair_key.split(":", 1)
                if len(parts) == 2:
                    _class_scores[cls_name][f"{parts[1]}:{parts[0]}"] = float(coh_val)

    degrees      = dict(G.degree())
    max_deg      = max(degrees.values(), default=1)
    node_colours = [plt.cm.RdYlGn(degrees[n] / max_deg) for n in G.nodes()]

    edge_dts     = [G[a][b]["dt"] for a, b in G.edges()]
    max_dt       = max((d for d in edge_dts if d < _MISSING), default=1.0)

    if quality_scores:
        edge_colours = []
        edge_widths  = []
        for a, b in G.edges():
            sc = quality_scores.get(f"{a}:{b}") or quality_scores.get(f"{b}:{a}")
            edge_colours.append(_quality_colour(sc))
            sv = (sc / 100.0 if sc is not None and sc > 1 else sc)
            edge_widths.append(2.0 if sv is not None and sv >= 0.60 else
                               1.2 if sv is not None and sv >= 0.30 else 0.7)
    else:
        edge_colours = [plt.cm.RdYlGn_r(min(dt, max_dt) / max_dt) for dt in edge_dts]
        edge_widths  = [0.5 + 2.5 * (1.0 - min(dt, max_dt) / max_dt) for dt in edge_dts]

    if isinstance(pairs, dict):
        group_keys  = list(pairs.keys())
        linestyles  = ["-", "--", "-.", ":"] * (len(group_keys) // 4 + 1)
        key_style   = {k: linestyles[i] for i, k in enumerate(group_keys)}
        edge_styles = [
            key_style[(G[a][b]["path"], G[a][b]["frame"])] for a, b in G.edges()
        ]
    else:
        edge_styles = ["-"] * len(G.edges())

    # ── 5. Figure layout ─────────────────────────────────────────────────
    # Main figure: network + histogram.
    # Per-class figures are saved separately when class data is available.
    _has_class_data = bool(_class_scores)
    fig = plt.figure(figsize=figsize)
    gs  = fig.add_gridspec(1, 2, width_ratios=[3, 1], wspace=0.35)
    ax_net  = fig.add_subplot(gs[0])
    ax_hist = fig.add_subplot(gs[1])

    # ── 6. Draw network ───────────────────────────────────────────────────
    edges_by_style: dict[str, list] = defaultdict(list)
    for (a, b), style, colour, width in zip(
        G.edges(), edge_styles, edge_colours, edge_widths
    ):
        edges_by_style[style].append((a, b, colour, width))

    for style, edge_data in edges_by_style.items():
        nx.draw_networkx_edges(
            G, pos, ax=ax_net,
            edgelist=[(a, b) for a, b, _, _ in edge_data],
            edge_color=[c for _, _, c, _ in edge_data],
            width=[w for _, _, _, w in edge_data],
            style=style,
            alpha=0.7,
        )

    nx.draw_networkx_nodes(
        G, pos, ax=ax_net,
        node_color=node_colours,
        node_size=80,
        linewidths=0.5,
        edgecolors="black",
    )
    nx.draw_networkx_labels(
        G, pos,
        labels={s: s[-8:] for s in G.nodes()},
        ax=ax_net,
        font_size=5,
    )

    # ── 7. Network axes ───────────────────────────────────────────────────
    ax_net.set_xlabel("Days since first acquisition", fontsize=11)
    ax_net.set_ylabel("Perpendicular baseline [m]", fontsize=11)    # ✅ real unit
    ax_net.set_title(
        f"{title}\n{subtitle}\n"
        f"{len(scenes)} scenes · {len(flat_pairs)} pairs · "
        f"mean degree {np.mean(list(degrees.values())):.1f}",
        fontsize=11,
    )
    ax_net.tick_params(left=True, bottom=True, labelleft=True, labelbottom=True)
    ax_net.set_frame_on(True)

    # real date ticks on top axis
    x_vals  = [p[0] for p in pos.values()]
    x_ticks = np.linspace(min(x_vals), max(x_vals), min(8, len(pos)))
    ax2 = ax_net.twiny()
    ax2.set_xlim(ax_net.get_xlim())
    ax2.set_xticks(x_ticks)
    ax2.set_xticklabels(
        [
            (t0 + __import__("datetime").timedelta(days=d)).strftime("%Y-%m-%d")
            for d in x_ticks
        ],
        rotation=30, ha="left", fontsize=7,
    )
    ax2.set_xlabel("Acquisition date (UTC)", fontsize=9)

    # ── 8. Per-scene connection histogram ─────────────────────────────────
    # Sort scenes by date so the histogram reads chronologically top→bottom
    sorted_scene_names = sorted(scenes, key=lambda s: id_days[s])
    scene_degrees      = [degrees[s] for s in sorted_scene_names]
    short_names        = [s[-12:] for s in sorted_scene_names]   # trim for readability
    y_positions        = range(len(sorted_scene_names))

    bar_colours = [plt.cm.RdYlGn(degrees[s] / max_deg) for s in sorted_scene_names]

    bars = ax_hist.barh(
        y_positions,
        scene_degrees,
        color=bar_colours,
        edgecolor="white",
        linewidth=0.4,
        height=0.7,
    )

    # annotate each bar with connection count
    for bar, count in zip(bars, scene_degrees):
        ax_hist.text(
            bar.get_width() + 0.1,
            bar.get_y() + bar.get_height() / 2,
            str(count),
            va="center", fontsize=7,
        )

    # vertical line at mean degree
    mean_deg = np.mean(scene_degrees)
    ax_hist.axvline(
        mean_deg, color="steelblue", linestyle="--", linewidth=1.0, alpha=0.8
    )
    ax_hist.text(
        mean_deg + 0.1, len(sorted_scene_names) - 0.5,
        f"mean\n{mean_deg:.1f}",
        color="steelblue", fontsize=7, va="top",
    )

    # outline scenes below min connectivity (degree < 2) in red
    for bar, deg in zip(bars, scene_degrees):
        if deg < 2:
            bar.set_edgecolor("red")
            bar.set_linewidth(1.5)

    ax_hist.set_yticks(y_positions)
    ax_hist.set_yticklabels(short_names, fontsize=6)
    ax_hist.set_xlabel("Number of connections", fontsize=9)
    ax_hist.set_title("Connections\nper scene", fontsize=10)
    ax_hist.xaxis.set_major_locator(plt.MaxNLocator(integer=True))
    ax_hist.set_frame_on(True)
    # match vertical order to network: earliest at top
    ax_hist.invert_yaxis()

    # ── 9. Legends ────────────────────────────────────────────────────────
    deg_legend = ax_net.legend(
        handles=[
            mpatches.Patch(color=plt.cm.RdYlGn(v / max_deg), label=f"degree {v}")
            for v in sorted(set(degrees.values()))
        ],
        title="Node degree", loc="upper left", fontsize=7, title_fontsize=8,
    )
    ax_net.add_artist(deg_legend)

    if quality_scores:
        ax_net.legend(
            handles=[
                mpatches.Patch(color=_Q_GOOD,  label="Good"),
                mpatches.Patch(color=_Q_RISKY, label="Risky"),
                mpatches.Patch(color=_Q_BAD,   label="Bad"),
                mpatches.Patch(color=_Q_NONE,  label="Unscored"),
            ],
            title="Pair quality", loc="lower right", fontsize=7, title_fontsize=8,
        )
    else:
        ax_net.legend(
            handles=[
                mpatches.Patch(
                    color=plt.cm.RdYlGn_r(v / max_dt), label=f"{v:.0f} days"
                )
                for v in [0, max_dt * 0.33, max_dt * 0.66, max_dt]
            ],
            title="Temporal baseline", loc="lower right", fontsize=7, title_fontsize=8,
        )

    if isinstance(pairs, dict):
        ax_net.add_artist(
            ax_net.legend(
                handles=[
                    mpatches.Patch(
                        linestyle=key_style[k], fill=False,
                        edgecolor="grey", label=f"P{k[0]}/F{k[1]}",
                    )
                    for k in group_keys
                ],
                title="Path / Frame", loc="upper right", fontsize=7, title_fontsize=8,
            )
        )

    if save_path:
        fig.savefig(save_path.as_posix(), dpi=300, bbox_inches="tight")
        print(f"Saved → {save_path}")

    # ── 10. Per-LC-class figures (separate PNGs) ──────────────────────────
    if _has_class_data and save_path:
        sp   = Path(save_path)
        stem = sp.stem
        ext  = sp.suffix

        for cls_name, cls_label in _LC_CLASSES:
            cls_map = _class_scores.get(cls_name, {})
            if not cls_map:
                continue

            c_colours, c_widths = [], []
            for a, b in G.edges():
                sc = cls_map.get(f"{a}:{b}") or cls_map.get(f"{b}:{a}")
                c_colours.append(_quality_colour(sc))
                sv = (sc / 100.0 if sc is not None and sc > 1 else sc)
                c_widths.append(2.0 if sv is not None and sv >= 0.60 else
                                1.2 if sv is not None and sv >= 0.30 else 0.7)

            fig_c = plt.figure(figsize=figsize)
            gs_c  = fig_c.add_gridspec(1, 2, width_ratios=[3, 1], wspace=0.35)
            ax_c  = fig_c.add_subplot(gs_c[0])
            ax_h  = fig_c.add_subplot(gs_c[1])

            # Network
            nx.draw_networkx_edges(
                G, pos, ax=ax_c,
                edgelist=list(G.edges()),
                edge_color=c_colours,
                width=c_widths,
                alpha=0.7,
            )
            nx.draw_networkx_nodes(
                G, pos, ax=ax_c,
                node_color=node_colours,
                node_size=80,
                linewidths=0.5,
                edgecolors="black",
            )
            nx.draw_networkx_labels(
                G, pos,
                labels={s: s[-8:] for s in G.nodes()},
                ax=ax_c,
                font_size=5,
            )

            ax_c.set_xlabel("Days since first acquisition", fontsize=11)
            ax_c.set_ylabel("Perpendicular baseline [m]", fontsize=11)
            ax_c.set_title(
                f"{title}{cls_label}\n{subtitle}\n"
                f"{len(scenes)} scenes · {len(flat_pairs)} pairs",
                fontsize=11,
            )
            ax_c.tick_params(left=True, bottom=True, labelleft=True, labelbottom=True)
            ax_c.set_frame_on(True)

            ax2_c = ax_c.twiny()
            ax2_c.set_xlim(ax_c.get_xlim())
            ax2_c.set_xticks(x_ticks)
            ax2_c.set_xticklabels(
                [(t0 + __import__("datetime").timedelta(days=d)).strftime("%Y-%m-%d")
                 for d in x_ticks],
                rotation=30, ha="left", fontsize=7,
            )
            ax2_c.set_xlabel("Acquisition date (UTC)", fontsize=9)

            ax_c.legend(
                handles=[
                    mpatches.Patch(color=_Q_GOOD,  label="Good  (≥0.60)"),
                    mpatches.Patch(color=_Q_RISKY, label="Risky (0.30–0.60)"),
                    mpatches.Patch(color=_Q_BAD,   label="Bad   (<0.30)"),
                    mpatches.Patch(color=_Q_NONE,  label="Unscored"),
                ],
                title=f"{cls_label} coherence", loc="lower right",
                fontsize=7, title_fontsize=8,
            )

            # Histogram (same degree info — identical across class figures)
            ax_h.barh(
                y_positions, scene_degrees,
                color=bar_colours, edgecolor="white", linewidth=0.4, height=0.7,
            )
            for bar, count in zip(ax_h.patches, scene_degrees):
                ax_h.text(bar.get_width() + 0.1,
                          bar.get_y() + bar.get_height() / 2,
                          str(count), va="center", fontsize=7)
            ax_h.axvline(mean_deg, color="steelblue", linestyle="--",
                         linewidth=1.0, alpha=0.8)
            ax_h.set_yticks(y_positions)
            ax_h.set_yticklabels(short_names, fontsize=6)
            ax_h.set_xlabel("Number of connections", fontsize=9)
            ax_h.set_title("Connections\nper scene", fontsize=10)
            ax_h.xaxis.set_major_locator(plt.MaxNLocator(integer=True))
            ax_h.set_frame_on(True)
            ax_h.invert_yaxis()

            cls_path = sp.parent / f"{stem}_{cls_name}{ext}"
            fig_c.savefig(cls_path.as_posix(), dpi=300, bbox_inches="tight")
            plt.close(fig_c)
            print(f"Saved → {cls_path}")

    return fig

ERA5 Downloader

Download ERA5 pressure-level weather data for MintPy tropospheric correction. Automatically determines required acquisition dates and spatial extents from HyP3 zip files and saves files using MintPy-compatible naming (ERA5_S*_N*_W*_E*_YYYYMMDD_HH.grb). Requires a ~/.cdsapirc file with your CDS API credentials.

from insarhub.utils import ERA5Downloader

era5 = ERA5Downloader(output_dir='path/to/era5', num_processes=3, max_retries=3)
era5.download_batch(batch_dir='path/to/hyp3/outputs')

Parameters:

output_dir : str (default: None)
    Directory to save ERA5 .grb files. Created if it does not exist.
num_processes : int (default: 3)
    Number of parallel download workers.
max_retries : int (default: 3)
    Retry attempts per file on download failure.
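
The ~/.cdsapirc file uses the standard cdsapi format; a minimal sketch (both values are placeholders, and the exact URL/key format depends on your CDS account and cdsapi version):

url: https://cds.climate.copernicus.eu/api
key: <your-cds-api-key>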
Source code in src/insarhub/utils/batch.py
class ERA5Downloader:
    """A class to handle batch downloading of ERA5 weather data for InSAR processing,
    formatted specifically for MintPy compatibility.

    Args:
        output_dir (str, optional): Directory to save ERA5 `.grb` files. Created if it does not exist. Defaults to None.
        num_processes (int, optional): Number of parallel download workers. Defaults to 3.
        max_retries (int, optional): Retry attempts per file on download failure. Defaults to 3.
    """

    PRESSURE_LEVELS = [
        '1', '2', '3', '5', '7', '10', '20', '30', '50', '70', '100', '125', '150', 
        '175', '200', '225', '250', '300', '350', '400', '450', '500', '550', '600', 
        '650', '700', '750', '775', '800', '825', '850', '875', '900', '925', '950', 
        '975', '1000'
    ]

    def __init__(self, output_dir=None, num_processes=3, max_retries=3):
        self.output_dir = Path(output_dir).expanduser().resolve() if output_dir else None
        if self.output_dir:
            self.output_dir.mkdir(parents=True, exist_ok=True)
        self.num_processes = num_processes
        self.max_retries = max_retries

        # Internal client holder for workers
        self._worker_client = None

    @staticmethod
    def _get_round_hour(time_str):
        """Rounds HHMMSS to the nearest whole hour."""
        h = int(time_str[0:2])
        m = int(time_str[2:4])
        s = int(time_str[4:6])

        if m > 30 or (m == 30 and s > 0):
            h += 1
        if h == 24:
            h = 0
        return f"{h:02d}"

    @staticmethod
    def _calculate_snwe(snwe, min_buffer=2, step=10):
        """Calculates buffered bounding box in multiples of 'step'."""
        def ceil2multiple(x, s):
            return x if x % s == 0 else x + (s - x % s)

        def floor2multiple(x, s):
            return x - x % s

        s_orig, n_orig, w_orig, e_orig = snwe
        S = np.floor(min(s_orig, n_orig) - min_buffer)
        N = np.ceil(max(s_orig, n_orig) + min_buffer)
        W = np.floor(min(w_orig, e_orig) - min_buffer)
        E = np.ceil(max(w_orig, e_orig) + min_buffer)

        if step > 1:
            S, W = floor2multiple(S, step), floor2multiple(W, step)
            N, E = ceil2multiple(N, step), ceil2multiple(E, step)

        return (int(S), int(N), int(W), int(E))

    @staticmethod
    def _get_mintpy_filename(output_dir, day, hr, snwe):
        """Generates MintPy-compliant filename: ERA5_S10_N20_W120_E110_YYYYMMDD_HH.grb"""
        s, n, w, e = snwe
        def fmt(val):
            return f"{'S' if val < 0 else 'N'}{abs(val)}" if val == s or val == n \
                   else f"{'W' if val < 0 else 'E'}{abs(val)}"

        # Note: MintPy logic often uses N/S for lat and W/E for lon
        area_str = f"_{fmt(s)}_{fmt(n)}_{fmt(w)}_{fmt(e)}"
        return Path(output_dir) / f"ERA5{area_str}_{day}_{hr}.grb"

    def _prepare_cds_payload(self, day, hr, snwe_tuple):
        """Formats the dictionary for the CDS API request."""
        S, N, W, E = snwe_tuple
        return {
            'product_type': ['reanalysis'],
            'variable': ['geopotential', 'temperature', 'specific_humidity'],
            'year': [day[0:4]],
            'month': [day[4:6]],
            'day': [day[6:8]],
            'time': [f'{hr}:00'],
            'pressure_level': self.PRESSURE_LEVELS,
            'data_format': 'grib',
            'area': [N, W, S, E],  # CDS format: North, West, South, East
        }

    @classmethod
    def _worker_init(cls):
        """Initializer for multiprocessing pool to create a per-process CDS client."""
        global _client
        _client = cdsapi.Client(progress=False, quiet=True)
        logger = logging.getLogger('cdsapi')
        logger.setLevel(logging.WARNING)

    @staticmethod
    def _download_worker(task_info):
        """The actual download function executed by the worker process."""
        global _client
        dataset = task_info['dataset']
        dest_path = task_info['dest_path']
        max_retries = task_info['max_retries']

        for attempt in range(1, max_retries + 1):
            try:
                result = _client.retrieve('reanalysis-era5-pressure-levels', dataset)
                result.download(dest_path)
                return Path(dest_path).name
            except Exception as e:
                if attempt == max_retries:
                    return f"ERROR: {dest_path} failed after {max_retries} attempts: {str(e)}"
                time.sleep(min(60, 5 * attempt))

    def download_batch(self, batch_dir):
        """Scan a directory of HyP3 zip files, determine required ERA5 dates and extents, and download missing files.

        Already-downloaded files are skipped automatically, so the method is safe to re-run after an interrupted download.

        Args:
            batch_dir (str): Directory containing HyP3 `.zip` files. Subdirectories are scanned if no zips are found directly.

        Note:
            Folders yielding no valid geometry are skipped with a printed warning; no exception is raised.
        """
        batch_path = Path(batch_dir).expanduser().resolve()

        # If zips are directly in batch_dir, treat it as a single group
        direct_zips = list(batch_path.glob('*.zip'))
        folders_to_scan = [batch_path] if direct_zips else [
            s for s in batch_path.iterdir() if s.is_dir()
        ]

        for subfolder in tqdm(folders_to_scan, desc="Folders", position=0):
            zip_files = list(subfolder.glob('*.zip'))
            if not zip_files:
                continue

            W, E, N, S = 180, -180, -90, 90
            dates = set()
            valid_files_count = 0

            # 1. Scan Metadata from Zips
            for zip_path in tqdm(zip_files, desc=f"Scanning {subfolder.name[:10]}...", leave=False, position=1):
                try:
                    with zipfile.ZipFile(zip_path, 'r') as z:
                        namelist = z.namelist()

                        # Extract Dates
                        date_match = re.findall(r'(\d{8})T(\d{6})', zip_path.name)
                        for d, t in date_match:
                            dates.add(f'{d}_{self._get_round_hour(t)}')

                        # Extract Spatial Bounds using GDAL Virtual File System
                        dem_file = next((f for f in namelist if '_dem.tif' in f or '_unw_phase.tif' in f), None)
                        if dem_file:
                            vsi_path = f"/vsizip/{zip_path.as_posix()}/{dem_file}"
                            with rasterio.open(vsi_path) as src:
                                l, b, r, t = src.bounds
                                wgs = transform_bounds(src.crs, 'EPSG:4326', l, b, r, t)
                                W, S, E, N = min(W, wgs[0]), min(S, wgs[1]), max(E, wgs[2]), max(N, wgs[3])
                                valid_files_count += 1
                except Exception:
                    continue

            if valid_files_count == 0:
                print(f"{Fore.RED}No geometry found in {subfolder.name}")
                continue

            # 2. Prepare Download Tasks
            snwe_tuple = self._calculate_snwe((S, N, W, E))
            era5_out = self.output_dir if self.output_dir else subfolder
            era5_out.mkdir(parents=True, exist_ok=True)
            tasks = []
            for date_str in sorted(dates):
                day, hr = date_str.split('_')
                output_path = self._get_mintpy_filename(era5_out, day, hr, snwe_tuple)

                if output_path.exists():
                    continue

                tasks.append({
                    'dataset': self._prepare_cds_payload(day, hr, snwe_tuple),
                    'dest_path': output_path.as_posix(),
                    'max_retries': self.max_retries
                })

            # 3. Execute Parallel Downloads
            if not tasks:
                print(f"{Fore.GREEN}All files exist for {subfolder.name}")
                continue

            tqdm.write(f"{Fore.CYAN}Downloading {len(tasks)} files for {subfolder.name}...")
            with multiprocessing.Pool(processes=self.num_processes, initializer=self._worker_init) as pool:
                with tqdm(total=len(tasks), desc="Progress", unit="file", leave=False) as pbar:
                    for result in pool.imap_unordered(self._download_worker, tasks):
                        if result.startswith("ERROR"):
                            pbar.write(f"{Fore.RED}{result}")
                        else:
                            pbar.set_postfix_str(f"Finished: {result}")
                        pbar.update(1)

        print(f"{Fore.MAGENTA}Batch Processing Complete.")
download_batch(batch_dir)

Parameters:

batch_dir : str (required)
    Directory containing HyP3 .zip files. Subdirectories are scanned if no zips are found directly.

Note: a folder that yields no valid geometry is skipped with a printed warning; no exception is raised.
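
For reference, the buffered bounding box is padded by min_buffer (2 degrees) and snapped outward to multiples of step (10 degrees), so all scenes in a folder share one set of ERA5 files. A worked sketch with illustrative bounds:

# extent scanned from the zips: S=37.7, N=38.0, W=-113.1, E=-112.7
# +/- 2 degree buffer, floor/ceil:  (35, 40, -116, -110)
# snapped to multiples of 10:       (30, 40, -120, -110)
# resulting filename pattern:       ERA5_N30_N40_W120_W110_YYYYMMDD_HH.grb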

Source code in src/insarhub/utils/batch.py
def download_batch(self, batch_dir):
    """Scan a directory of HyP3 zip files, determine required ERA5 dates and extents, and download missing files.

    Already-downloaded files are skipped automatically, so the method is safe to re-run after an interrupted download.

    Args:
        batch_dir (str): Directory containing HyP3 `.zip` files. Subdirectories are scanned if no zips are found directly.

    Note:
        Folders yielding no valid geometry are skipped with a printed warning; no exception is raised.
    """
    batch_path = Path(batch_dir).expanduser().resolve()

    # If zips are directly in batch_dir, treat it as a single group
    direct_zips = list(batch_path.glob('*.zip'))
    folders_to_scan = [batch_path] if direct_zips else [
        s for s in batch_path.iterdir() if s.is_dir()
    ]

    for subfolder in tqdm(folders_to_scan, desc="Folders", position=0):
        zip_files = list(subfolder.glob('*.zip'))
        if not zip_files:
            continue

        W, E, N, S = 180, -180, -90, 90
        dates = set()
        valid_files_count = 0

        # 1. Scan Metadata from Zips
        for zip_path in tqdm(zip_files, desc=f"Scanning {subfolder.name[:10]}...", leave=False, position=1):
            try:
                with zipfile.ZipFile(zip_path, 'r') as z:
                    namelist = z.namelist()

                    # Extract Dates
                    date_match = re.findall(r'(\d{8})T(\d{6})', zip_path.name)
                    for d, t in date_match:
                        dates.add(f'{d}_{self._get_round_hour(t)}')

                    # Extract Spatial Bounds using GDAL Virtual File System
                    dem_file = next((f for f in namelist if '_dem.tif' in f or '_unw_phase.tif' in f), None)
                    if dem_file:
                        vsi_path = f"/vsizip/{zip_path.as_posix()}/{dem_file}"
                        with rasterio.open(vsi_path) as src:
                            l, b, r, t = src.bounds
                            wgs = transform_bounds(src.crs, 'EPSG:4326', l, b, r, t)
                            W, S, E, N = min(W, wgs[0]), min(S, wgs[1]), max(E, wgs[2]), max(N, wgs[3])
                            valid_files_count += 1
            except Exception:
                continue

        if valid_files_count == 0:
            print(f"{Fore.RED}No geometry found in {subfolder.name}")
            continue

        # 2. Prepare Download Tasks
        snwe_tuple = self._calculate_snwe((S, N, W, E))
        era5_out = self.output_dir if self.output_dir else subfolder
        era5_out.mkdir(parents=True, exist_ok=True)
        tasks = []
        for date_str in sorted(dates):
            day, hr = date_str.split('_')
            output_path = self._get_mintpy_filename(era5_out, day, hr, snwe_tuple)

            if output_path.exists():
                continue

            tasks.append({
                'dataset': self._prepare_cds_payload(day, hr, snwe_tuple),
                'dest_path': output_path.as_posix(),
                'max_retries': self.max_retries
            })

        # 3. Execute Parallel Downloads
        if not tasks:
            print(f"{Fore.GREEN}All files exist for {subfolder.name}")
            continue

        tqdm.write(f"{Fore.CYAN}Downloading {len(tasks)} files for {subfolder.name}...")
        with multiprocessing.Pool(processes=self.num_processes, initializer=self._worker_init) as pool:
            with tqdm(total=len(tasks), desc="Progress", unit="file", leave=False) as pbar:
                for result in pool.imap_unordered(self._download_worker, tasks):
                    if result.startswith("ERROR"):
                        pbar.write(f"{Fore.RED}{result}")
                    else:
                        pbar.set_postfix_str(f"Finished: {result}")
                    pbar.update(1)

    print(f"{Fore.MAGENTA}Batch Processing Complete.")

Earth Credit Pool

If you have multiple Earthdata credentials, you can store them in ~/.credit_pool, one per line:

username1:password1
username2:password2

then load them with:

from insarhub.utils import earth_credit_pool
ec_pool = earth_credit_pool()

You can then pass the pool to a processor for seamless switching across your Earthdata credentials:

from insarhub import Processor
processor = Processor.create('Hyp3_InSAR', earthdata_credentials_pool=ec_pool, ...)

Parameters:

earthdata_credentials_pool_path : Path (default: Path.home().joinpath('.credit_pool'))
    Path to the Earthdata credentials file. The path is expanded and resolved to an absolute path.

Raises:

FileNotFoundError
    If the specified credentials file does not exist.
ValueError
    If any line in the file does not contain a single ':' separating key and value.
OSError
    For any other I/O-related errors while reading the file.

Source code in src/insarhub/utils/tool.py
def earth_credit_pool(earthdata_credentials_pool_path = Path.home().joinpath('.credit_pool')) -> dict:
    """
    Load Earthdata credentials from a local credit pool file.

    The function reads a simple key-value file where each line contains
    `username:password` (or `key:value`) pairs, and returns them as a dictionary.

    Args:
        earthdata_credentials_pool_path (Path, optional):
            Path to the Earthdata credentials file. Defaults to
            `~/.credit_pool`. The path is expanded and resolved to an absolute path.

    Returns:
        dict:
            Dictionary mapping credential keys to their corresponding values.
            Example:
            ```
            {
                "username1": "password1",
                "username2": "password2",
            }
            ```

    Raises:
        FileNotFoundError:
            If the specified credentials file does not exist.
        ValueError:
            If any line in the file does not contain a single ':' separating key and value.
        OSError:
            For any other I/O related errors while reading the file.

    Notes:
        - Each line of the file must be formatted as `key:value`.
        - Leading/trailing whitespace is stripped from both key and value.
        - Useful for managing multiple Earthdata credentials for automated downloads.
    """
    earthdata_credentials_pool_path = Path(earthdata_credentials_pool_path).expanduser().resolve()
    earthdata_credentials_pool = {}
    with open(earthdata_credentials_pool_path, 'r') as f:
        for line in f:
            key, value = line.strip().split(':')
            earthdata_credentials_pool[key] = value
    return earthdata_credentials_pool
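
Since the function returns a plain dict, you can also hand out credentials round-robin yourself. The itertools.cycle wrapper below is an illustrative assumption, not part of insarhub:

from itertools import cycle

from insarhub.utils import earth_credit_pool

# Load the pool: {"username1": "password1", "username2": "password2", ...}
ec_pool = earth_credit_pool()

# Hypothetical round-robin iterator over (username, password) pairs.
credentials = cycle(ec_pool.items())
username, password = next(credentials)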

Slurm Job Config

This class encapsulates all parameters needed to generate a SLURM batch script, including resource allocation, job settings, environment configuration, and execution commands.

from insarhub.utils import Slurmjob_Config

config = Slurmjob_Config(
    job_name="my_analysis",
    time="02:00:00",
    command="python analyze.py"
)
config.to_script("analysis.slurm")

Attributes:

Name Type Description
job_name str

Name of the SLURM job.

output_file str

Path for standard output. Use %j for job ID.

error_file str

Path for standard error. Use %j for job ID.

time str

Maximum wall time in HH:MM:SS format.

partition str

SLURM partition name to submit to.

nodes int

Number of nodes to allocate.

ntasks int

Number of tasks to run.

cpus_per_task int

CPUs per task.

mem str

Memory allocation per node (e.g., "4G", "500M").

nodelist Optional[str]

Specific nodes to use (e.g., "node[01-05]").

gpus Optional[str]

GPU allocation (e.g., "1", "2", "1g").

array Optional[str]

Array job specification (e.g., "0-9", "1-100%10").

dependency Optional[str]

Job dependency condition (e.g., "afterok:123456").

mail_user Optional[str]

Email address for job notifications.

mail_type str

When to send email notifications (BEGIN, END, FAIL, ALL).

account Optional[str]

Account to charge resources to.

qos Optional[str]

Quality of Service specification.

modules List[str]

List of environment modules to load.

conda_env Optional[str]

Name of conda environment to activate.

export_env Dict[str, str]

Dictionary of environment variables to export.

command str

Bash command(s) to execute.
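
For reference, the basic example above produces an analysis.slurm equivalent to the following, traced from the to_script implementation shown below (unset optional directives are omitted; blank lines separate the directive, environment, and execution sections):

#!/bin/bash
#SBATCH --job-name=my_analysis
#SBATCH --output=job_%j.out
#SBATCH --error=job_%j.err
#SBATCH --time=02:00:00
#SBATCH --partition=all
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=1
#SBATCH --mem=4G


echo "Starting job on $(date)"
python analyze.py
echo "Job finished on $(date)"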

Source code in src/insarhub/utils/tool.py
@dataclass
class Slurmjob_Config:
    """Configuration for a SLURM job submission script.

    This class encapsulates all parameters needed to generate a SLURM batch script,
    including resource allocation, job settings, environment configuration, and
    execution commands.

    Attributes:
        job_name: Name of the SLURM job.
        output_file: Path for standard output. Use %j for job ID.
        error_file: Path for standard error. Use %j for job ID.
        time: Maximum wall time in HH:MM:SS format.
        partition: SLURM partition name to submit to.
        nodes: Number of nodes to allocate.
        ntasks: Number of tasks to run.
        cpus_per_task: CPUs per task.
        mem: Memory allocation per node (e.g., "4G", "500M").
        nodelist: Specific nodes to use (e.g., "node[01-05]").
        gpus: GPU allocation (e.g., "1", "2", "1g").
        array: Array job specification (e.g., "0-9", "1-100%10").
        dependency: Job dependency condition (e.g., "afterok:123456").
        mail_user: Email address for job notifications.
        mail_type: When to send email notifications (BEGIN, END, FAIL, ALL).
        account: Account to charge resources to.
        qos: Quality of Service specification.
        modules: List of environment modules to load.
        conda_env: Name of conda environment to activate.
        export_env: Dictionary of environment variables to export.
        command: Bash command(s) to execute.

    Examples:
        Basic job configuration:

        >>> config = Slurmjob_Config(
        ...     job_name="my_analysis",
        ...     time="02:00:00",
        ...     command="python analyze.py"
        ... )
        >>> config.to_script("analysis.slurm")
        PosixPath('analysis.slurm')

        GPU job with conda environment:

        >>> config = Slurmjob_Config(
        ...     job_name="training",
        ...     time="12:00:00",
        ...     mem="32G",
        ...     gpus="2",
        ...     conda_env="pytorch",
        ...     modules=["cuda/11.8"],
        ...     command="python train.py --epochs 100"
        ... )
        >>> config.to_script("train.slurm")
        PosixPath('train.slurm')

        Array job with environment variables:

        >>> config = Slurmjob_Config(
        ...     job_name="param_sweep",
        ...     array="0-99",
        ...     export_env={"PARAM_ID": "$SLURM_ARRAY_TASK_ID"},
        ...     command="python run_experiment.py $PARAM_ID"
        ... )
        >>> config.to_script()
        PosixPath('job.slurm')
    """
    job_name: str = "my_job"
    output_file: str = "job_%j.out"
    error_file: str = "job_%j.err"
    time: str = "04:00:00"
    partition: str = "all"
    nodes: int = 1
    ntasks: int = 1
    cpus_per_task: int = 1
    mem: str = "4G"

    # Optional parameters
    nodelist: Optional[str] = None
    gpus: Optional[str] = None
    array: Optional[str] = None
    dependency: Optional[str] = None
    mail_user: Optional[str] = None
    mail_type: str = "ALL"
    account: Optional[str] = None
    qos: Optional[str] = None

    # Environment
    modules: List[str] = field(default_factory=list)
    conda_env: Optional[str] = None
    export_env: Dict[str, str] = field(default_factory=dict)

    # Execution
    command: str = "echo Hello SLURM!"

    def to_script(self, filename: str = "job.slurm") -> Path:
        """Generate the SLURM script file."""
        lines = ["#!/bin/bash"]

        # Required directives
        lines.extend([
            f"#SBATCH --job-name={self.job_name}",
            f"#SBATCH --output={self.output_file}",
            f"#SBATCH --error={self.error_file}",
            f"#SBATCH --time={self.time}",
            f"#SBATCH --partition={self.partition}",
            f"#SBATCH --nodes={self.nodes}",
            f"#SBATCH --ntasks={self.ntasks}",
            f"#SBATCH --cpus-per-task={self.cpus_per_task}",
            f"#SBATCH --mem={self.mem}",
        ])

        # Optional directives
        if self.gpus:
            lines.append(f"#SBATCH --gres=gpu:{self.gpus}")
        if self.array:
            lines.append(f"#SBATCH --array={self.array}")
        if self.dependency:
            lines.append(f"#SBATCH --dependency={self.dependency}")
        if self.mail_user:
            lines.append(f"#SBATCH --mail-user={self.mail_user}")
            lines.append(f"#SBATCH --mail-type={self.mail_type}")
        if self.account:
            lines.append(f"#SBATCH --account={self.account}")
        if self.qos:
            lines.append(f"#SBATCH --qos={self.qos}")
        if self.nodelist:
            lines.append(f"#SBATCH --nodelist={self.nodelist}")

        lines.append("")

        # Environment setup
        lines.extend([f"module load {mod}" for mod in self.modules])
        if self.conda_env:
            lines.append(f"source activate {self.conda_env}")
        lines.extend([f"export {k}={v}" for k, v in self.export_env.items()])

        lines.append("")

        # Execution
        lines.extend([
            'echo "Starting job on $(date)"',
            self.command,
            'echo "Job finished on $(date)"'
        ])

        filepath = Path(filename).expanduser().resolve()
        filepath.write_text("\n".join(lines))

        return filepath
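
A usage sketch for generating and submitting a script. The subprocess call to sbatch is an assumption and requires SLURM on the host; insarhub itself only writes the file, and the job name, conda environment, and command below are hypothetical:

import subprocess

from insarhub.utils import Slurmjob_Config

config = Slurmjob_Config(
    job_name="ifg_stack",               # hypothetical job name
    time="06:00:00",
    cpus_per_task=8,
    mem="16G",
    conda_env="insar",                  # hypothetical conda environment
    command="python process_stack.py",  # hypothetical processing script
)
script = config.to_script("ifg_stack.slurm")

# Submit the generated script; assumes the sbatch CLI is available.
subprocess.run(["sbatch", str(script)], check=True)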

⚠️ Major Redesign

InSARScript v1.1.0 introduces breaking API changes; this documentation is not compatible with version v1.0.0.