Skip to content

Analyzer

The InSARHub analyzer module provides workflows for InSAR time-series analysis.

  • Import analyzer

    Import the Analyzer class to access all time-series analysis functionality

    from insarhub import Analyzer
    

  • View Available Analyzers

    List all registered analyzers

    Analyzer.available()
    

Available Analyzers

InSARHub wraps MintPy as one of its analysis backends. The Mintpy_SBAS_Base_Analyzer is implemented on top of a reusable base configuration class, which exposes the full smallbaselineApp logic of MintPy. It gives users an experience similar to using MintPy directly, allowing full customization of processing parameters and steps.

Source code in src/insarhub/analyzer/mintpy_base.py
class Mintpy_SBAS_Base_Analyzer(BaseAnalyzer):
    '''
    Base class for Mintpy SBAS analysis. This class provides a template for implementing
    specific analysis methods using the Mintpy software package.
    '''
    # NOTE: the docstring must be the first statement in the class body;
    # placed after the attributes it is a discarded expression and
    # ``__doc__`` stays ``None``.

    # Short human-readable summary of this analyzer.
    description = "Generic MintPy SBAS analyzer, fully customizable configs."
    # Processor compatibility tag — presumably 'all' means no restriction;
    # confirm against the code that reads this attribute.
    compatible_processor = 'all'
    # Configuration class associated with this analyzer.
    default_config = Mintpy_SBAS_Base_Config

    def __init__(self, config: Mintpy_SBAS_Base_Config | None = None):
        """Derive the working paths from the config and record a workflow marker.

        Args:
            config: Analyzer configuration, forwarded unchanged to the
                base-class initializer. After that call ``self.config`` is
                read for ``workdir``.
        """
        super().__init__(config)

        root = self.config.workdir
        self.workdir = root
        # Scratch locations and the MintPy config file all live under workdir.
        self.tmp_dir = root / 'tmp'
        self.clip_dir = root / 'clip'
        self.cfg_path = root / '.mintpy.cfg'
        write_workflow_marker(root, analyzer=type(self).name)

    def prep_data(self):
        """Have the config object write the MintPy config file to ``self.cfg_path``."""
        target = self.cfg_path
        self.config.write_mintpy_config(target)

    def _validate_cds_token(self, key: str) -> bool:
        """Validate a CDS API token via a lightweight HTTP request (no download).

        Each endpoint is queried in turn with the token in the
        ``PRIVATE-TOKEN`` header; the first conclusive answer wins.

        Args:
            key: CDS API token to check.

        Returns:
            ``True`` when an endpoint answers 200; ``False`` when an endpoint
            answers 401/403 or when ``key`` is empty/blank. When no endpoint
            gives a conclusive answer (request error, timeout, or any other
            status code) the token is assumed valid and ``True`` is returned
            so that an unreachable service does not block the user.
        """
        # A blank token can never authenticate. Reject it before any network
        # access so the "assume valid" fallback below cannot accept it.
        if not key or not key.strip():
            return False

        import requests as _requests
        endpoints = [
            # Fast profile endpoint (new CDS API)
            ("GET", "https://cds.climate.copernicus.eu/api/account/me",
             {"PRIVATE-TOKEN": key}),
            # Fallback: jobs list
            ("GET", "https://cds.climate.copernicus.eu/api/retrieve/v1/jobs",
             {"PRIVATE-TOKEN": key}),
        ]
        for method, url, headers in endpoints:
            try:
                resp = _requests.request(method, url, headers=headers,
                                         params={"limit": 1}, timeout=30)
            except Exception:
                # Best effort: timeouts, connection errors and anything else
                # raised while sending simply move on to the next endpoint.
                continue
            if resp.status_code == 200:
                return True
            if resp.status_code in (401, 403):
                return False
            # Any other status is inconclusive; try the next endpoint.

        # No endpoint gave a conclusive answer: assume valid to avoid blocking the user
        print(f"{Fore.YELLOW}CDS API unreachable (timeout) — assuming token is valid.{Fore.RESET}")
        return True

    def _cds_authorize(self):
        """Ensure valid CDS credentials exist, prompting the user if needed.

        The ``key:`` entry of ``~/.cdsapirc`` is tried first. If the file is
        missing, has no key, or the key fails validation, the user is prompted
        (without echo) until a token passes ``_validate_cds_token``; that token
        is then written back to ``~/.cdsapirc``.

        Returns:
            ``True`` once a token has been validated. The method does not
            return a falsy value; it keeps prompting instead.
        """
        cdsapirc_path = Path.home() / ".cdsapirc"
        # Try existing .cdsapirc first
        if cdsapirc_path.is_file():
            key = None
            for line in cdsapirc_path.read_text().splitlines():
                if line.strip().startswith("key:"):
                    key = line.split(":", 1)[1].strip()
                    break
            if key and self._validate_cds_token(key):
                return True
            # Reset the colour so later output is not tinted yellow.
            print(f"{Fore.YELLOW}CDS token in .cdsapirc is invalid or expired. Will prompt login.{Fore.RESET}\n")

        # Prompt user for a valid token
        while True:
            # Strip stray whitespace/newlines that come along with a pasted token.
            self._cds_token = getpass.getpass(
                "Enter your CDS api token at https://cds.climate.copernicus.eu/profile: "
            ).strip()
            if not self._validate_cds_token(self._cds_token):
                print(f"{Fore.RED}Authentication failed. Please check your token and try again.{Fore.RESET}\n")
                continue

            # The file holds a secret: make it owner-only *before* the token is
            # written. touch() only applies the mode to a new file, so chmod
            # covers the case where the file already exists.
            try:
                cdsapirc_path.touch(mode=0o600, exist_ok=True)
                cdsapirc_path.chmod(0o600)
            except OSError as e:
                print(f"{Fore.YELLOW}Warning: could not restrict permissions on {cdsapirc_path}: {e}{Fore.RESET}")
            cdsapirc_path.write_text(f"url: https://cds.climate.copernicus.eu/api\nkey: {self._cds_token}\n")
            print(f"{Fore.GREEN}Credentials saved to {cdsapirc_path}.{Fore.RESET}\n")
            return True

    def submit_hpc(self, steps: list[str] | None = None) -> str:
        """Generate a sbatch script for the full MintPy run and submit it.

        The script is written to ``<workdir>/mintpy/mintpy_sbas.sbatch``. It
        exports the current ``PATH`` and invokes the ``insarhub`` command-line
        tool for this analyzer and workdir. A ``mintpy_job.json`` file with
        the job id, script path and log path is written next to it.

        Args:
            steps: Accepted for interface compatibility; currently not used,
                so the generated job always runs ``insarhub analyzer ... run``
                without a step selection.

        Returns:
            The SLURM job ID string.

        Raises:
            RuntimeError: If ``sbatch`` exits with a non-zero status.
        """
        import os
        import shlex
        import shutil

        from insarhub.utils.tool import Slurmjob_Config

        mintpy_dir = self.workdir / "mintpy"
        mintpy_dir.mkdir(parents=True, exist_ok=True)

        # Merge user opts over defaults
        _defaults = {"time": "24:00:00", "ntasks": 1, "cpus_per_task": 16, "mem": "128G", "partition": "all"}
        opts = {**_defaults, **(self.config.hpc_sbatch_opts or {})}

        # Forward only options that Slurmjob_Config declares, excluding the
        # fields this method sets itself or deliberately leaves untouched.
        _slurm_fields = {f.name for f in dataclasses.fields(Slurmjob_Config)}
        _skip = {"job_name", "output_file", "error_file", "command",
                 "modules", "conda_env", "export_env", "array", "dependency"}
        slurm_kwargs = {k: v for k, v in opts.items()
                        if k in _slurm_fields and k not in _skip}

        slurm_cfg = Slurmjob_Config(
            job_name="mintpy_sbas",
            output_file=str(mintpy_dir / "mintpy_slurm_%j.out"),
            error_file=str(mintpy_dir / "mintpy_slurm_%j.err"),
            **slurm_kwargs,
        )

        # Fall back to the interpreter's bin directory when `insarhub` is not on PATH.
        insarhub_bin = shutil.which("insarhub") or f"{Path(sys.executable).parent}/insarhub"
        analyzer_name = type(self).name
        current_path = os.environ.get("PATH", "")

        # Quote every interpolated value: directories containing spaces or
        # shell metacharacters would otherwise split or be expanded when the
        # script runs. shlex.quote leaves plain values unchanged.
        run_cmd = " ".join([
            shlex.quote(str(insarhub_bin)),
            "analyzer",
            "-N", shlex.quote(str(analyzer_name)),
            "-w", shlex.quote(str(self.workdir)),
            "run",
        ])
        body = "\n".join([
            f"export PATH={shlex.quote(current_path)}",
            run_cmd,
        ])

        lines = ["#!/bin/bash"] + slurm_cfg.to_header_lines() + ["", body, ""]
        sbatch_script = mintpy_dir / "mintpy_sbas.sbatch"
        sbatch_script.write_text("\n".join(lines) + "\n")
        sbatch_script.chmod(0o755)

        result = subprocess.run(
            ["sbatch", "--parsable", str(sbatch_script)],
            capture_output=True, text=True,
        )
        if result.returncode != 0:
            raise RuntimeError(f"sbatch failed: {result.stderr.strip()}")

        # `--parsable` prints "jobid[;cluster]"; keep only the job id.
        job_id = result.stdout.strip().split(";")[0]

        job_file = mintpy_dir / "mintpy_job.json"
        job_file.write_text(json.dumps({
            "job_id":  job_id,
            "status":  "PENDING",
            "script":  str(sbatch_script),
            "log":     str(mintpy_dir / f"mintpy_slurm_{job_id}.out"),
        }, indent=2))

        print(f"{Fore.GREEN}MintPy SBAS job submitted: {job_id}{Style.RESET_ALL}")
        print(f"  script : {sbatch_script}")
        print(f"  log    : {mintpy_dir}/mintpy_slurm_{job_id}.out")
        return job_id

    def run(self, steps=None):
        """
        Execute the MintPy SBAS time-series workflow.

        A ``TimeSeriesAnalysis`` object is created from ``self.cfg_path`` and
        ``self.workdir``, opened, and run with the selected steps. This method
        does not write the config file itself; ``prep_data`` is the method
        that writes ``self.cfg_path``.

        Args:
            steps (list[str] | None, optional):
                MintPy processing steps to execute. ``None`` (or any other
                falsy value, such as an empty list) selects the full default
                sequence:
                    [
                        'load_data', 'modify_network', 'reference_point', 'quick_overview',
                        'invert_network', 'correct_LOD', 'correct_SET',
                        'correct_ionosphere', 'correct_troposphere',
                        'deramp', 'correct_topography', 'residual_RMS',
                        'reference_date', 'velocity', 'geocode',
                        'google_earth', 'hdfeos5'
                    ]

        Raises:
            Exception: Anything raised by the MintPy run is propagated;
                nothing is caught here.

        Notes:
            - CDS authorization is performed first when
              `troposphericDelay_method` is 'pyaps' and 'correct_troposphere'
              is among the steps to run.
            - When 'geocode' is among the steps, the extra diagnostic files
              are geocoded afterwards via `_geocode_diagnostic_files`.
        """
        if steps:
            run_steps = steps
        else:
            run_steps = [
                'load_data', 'modify_network', 'reference_point', 'quick_overview', 'invert_network',
                'correct_LOD', 'correct_SET', 'correct_ionosphere', 'correct_troposphere',
                'deramp', 'correct_topography', 'residual_RMS', 'reference_date',
                'velocity', 'geocode', 'google_earth', 'hdfeos5'
            ]

        uses_pyaps = self.config.troposphericDelay_method == 'pyaps'
        if uses_pyaps and 'correct_troposphere' in run_steps:
            self._cds_authorize()

        print(f'{Style.BRIGHT}{Fore.MAGENTA}Running MintPy Analysis...{Fore.RESET}')
        cfg_file = self.cfg_path.as_posix()
        work_dir = self.workdir.as_posix()
        app = TimeSeriesAnalysis(cfg_file, work_dir)
        app.open()
        app.run(steps=run_steps)

        if 'geocode' not in run_steps:
            return
        self._geocode_diagnostic_files(self.cfg_path.parent)

    def _geocode_diagnostic_files(self, mintpy_work: Path) -> None:
        """Geocode the diagnostic products that MintPy's geocode step leaves out.

        MintPy's own geocode step covers temporalCoherence, avgSpatialCoh,
        timeseries and velocity only; avgPhaseVelocity,
        numTriNonzeroIntAmbiguity and maskConnComp stay in radar coordinates.
        When a lookup table is available (radar-coordinate inputs) this method
        geocodes those files into ``geo/``. For already-geocoded inputs it
        does nothing.

        Args:
            mintpy_work: MintPy working directory holding the ``.h5`` products
                and the ``geo`` sub-directory.
        """
        out_dir = mintpy_work / 'geo'
        if not out_dir.exists():
            # MintPy skipped its geocode step (inputs already geocoded).
            return

        try:
            from mintpy.utils import utils as _mut
            loaded = _mut.check_loaded_dataset(str(mintpy_work), print_msg=False)
            _, _, lut_path = loaded[:3]
        except Exception:
            # Best effort: without the dataset check there is nothing to do.
            return

        if not lut_path:
            # Geocoded inputs have no lookup table.
            return

        # Keep only files that exist and have no geo_* counterpart yet.
        pending = []
        for fname in ('avgPhaseVelocity.h5', 'numTriNonzeroIntAmbiguity.h5', 'maskConnComp.h5'):
            source = mintpy_work / fname
            if not source.exists():
                continue
            if (out_dir / f'geo_{fname}').exists():
                continue
            pending.append(str(source))

        if not pending:
            return

        try:
            import mintpy.cli.geocode as _geo_cli
            cli_args = pending + ['-l', lut_path, '--outdir', str(out_dir), '--update']
            print(f'{Fore.CYAN}Geocoding diagnostic files: {[Path(p).name for p in pending]}{Fore.RESET}')
            _geo_cli.main(cli_args)
        except Exception as e:
            print(f'{Fore.YELLOW}Warning: could not geocode diagnostic files: {e}{Fore.RESET}')

    def cleanup(self):
        """
        Delete scratch directories and zip archives left in the work directory.

        Behavior:
            - With `self.config.debug` set, nothing is deleted; a message
              pointing at `self.workdir` is printed and the method returns.
            - Otherwise `self.tmp_dir` and `self.clip_dir` are removed
              recursively when they are existing directories.
            - Every `*.zip` file directly inside `self.workdir` is then
              removed.
            - Each removal prints a confirmation line. A failed removal is
              caught and reported in red, and processing continues with the
              next item, so removal errors are not raised to the caller.

        Notes:
            - The scratch directories are deleted wholesale, so they should
              hold nothing that needs to be kept.
        """
        if self.config.debug:
            print(f"{Fore.YELLOW}Debug mode is enabled. Keeping temporary files at: {self.workdir}{Fore.RESET}")
            return

        print(f"{Fore.CYAN}Step: Cleaning up temporary directories...{Fore.RESET}")
        for scratch in (self.tmp_dir, self.clip_dir):
            # is_dir() is False for a missing path, so one check covers both cases.
            if not scratch.is_dir():
                continue
            try:
                shutil.rmtree(scratch)
                print(f"  Removed: {scratch.relative_to(self.workdir)}")
            except Exception as e:
                print(f"{Fore.RED}  Failed to remove {scratch}: {e}{Fore.RESET}")

        archives = list(self.workdir.glob('*.zip'))
        if archives:
            print(f"{Fore.CYAN}Step: Removing zip archives...{Fore.RESET}")
        for archive in archives:
            try:
                archive.unlink()
                print(f"  Removed: {archive.name}")
            except Exception as e:
                print(f"{Fore.RED}  Failed to remove {archive.name}: {e}{Fore.RESET}")

        print(f"{Fore.GREEN}Cleanup complete.{Fore.RESET}")

Usage

  • Create Analyzer with Parameters

    Initialize an analyzer instance

    analyzer = Analyzer.create('Mintpy_SBAS_Base_Analyzer',
                                workdir="/your/work/dir",
                                load_processor="hyp3", ....)
    
    OR
    params = {"workdir": "/your/work/dir", "load_processor": "hyp3" ....}
    analyzer = Analyzer.create('Mintpy_SBAS_Base_Analyzer', **params)
    
    OR
    from insarhub.config import Mintpy_SBAS_Base_Config
    cfg = Mintpy_SBAS_Base_Config(workdir="/your/work/dir",
                                  load_processor="hyp3",
                                  ....)
    analyzer = Analyzer.create('Mintpy_SBAS_Base_Analyzer', config=cfg)
    

    The base config Mintpy_SBAS_Base_Config contains all parameters from MintPy's smallbaselineApp.cfg. For detailed descriptions, refer to the official MintPy config documentation.

    Source code in src/insarhub/config/defaultconfig.py
    355
    356
    357
    358
    359
    360
    361
    362
    363
    364
    365
    366
    367
    368
    369
    370
    371
    372
    373
    374
    375
    376
    377
    378
    379
    380
    381
    382
    383
    384
    385
    386
    387
    388
    389
    390
    391
    392
    393
    394
    395
    396
    397
    398
    399
    400
    401
    402
    403
    404
    405
    406
    407
    408
    409
    410
    411
    412
    413
    414
    415
    416
    417
    418
    419
    420
    421
    422
    423
    424
    425
    426
    427
    428
    429
    430
    431
    432
    433
    434
    435
    436
    437
    438
    439
    440
    441
    442
    443
    444
    445
    446
    447
    448
    449
    450
    451
    452
    453
    454
    455
    456
    457
    458
    459
    460
    461
    462
    463
    464
    465
    466
    467
    468
    469
    470
    471
    472
    473
    474
    475
    476
    477
    478
    479
    480
    481
    482
    483
    484
    485
    486
    487
    488
    489
    490
    491
    492
    493
    494
    495
    496
    497
    498
    499
    500
    501
    502
    503
    504
    505
    506
    507
    508
    509
    510
    511
    512
    513
    514
    515
    516
    517
    518
    519
    520
    521
    522
    523
    524
    525
    526
    527
    528
    529
    530
    531
    532
    533
    534
    535
    536
    537
    538
    539
    540
    541
    542
    543
    544
    545
    546
    547
    548
    549
    550
    551
    552
    553
    554
    555
    556
    557
    558
    559
    560
    561
    562
    563
    564
    565
    566
    567
    568
    569
    570
    571
    572
    573
    574
    575
    576
    577
    578
    579
    580
    581
    582
    583
    584
    585
    586
    587
    588
    589
    590
    591
    592
    593
    594
    595
    596
    597
    598
    599
    600
    601
    602
    603
    604
    605
    606
    607
    608
    609
    610
    611
    612
    613
    614
    615
    616
    617
    618
    619
    620
    621
    622
    623
    624
    625
    626
    627
    628
    629
    630
    631
    632
    633
    634
    635
    636
    637
    638
    639
    640
    641
    642
    643
    644
    645
    646
    647
    648
    649
    650
    651
    652
    653
    654
    655
    656
    657
    658
    659
    660
    661
    662
    663
    664
    665
    666
    667
    668
    669
    670
    671
    672
    673
    674
    675
    676
    677
    678
    679
    680
    681
    682
    683
    684
    685
    686
    687
    688
    689
    690
    691
    692
    693
    694
    695
    696
    697
    698
    699
    700
    701
    702
    703
    704
    705
    706
    707
    708
    709
    710
    711
    712
    713
    714
    715
    716
    717
    718
    719
    720
    721
    722
    723
    724
    725
    726
    727
    728
    729
    730
    731
    732
    733
    734
    735
    736
    737
    738
    739
    740
    741
    742
    743
    744
    745
    746
    747
    748
    749
    750
    751
    752
    753
    754
    755
    756
    757
    758
    759
    760
    761
    762
    763
    764
    765
    766
    767
    768
    769
    770
    771
    772
    773
    774
    775
    776
    777
    778
    779
    780
    781
    782
    783
    784
    785
    786
    787
    788
    789
    790
    791
    792
    793
    794
    795
    796
    797
    798
    799
    800
    801
    802
    803
    804
    805
    806
    807
    808
    809
    810
    811
    812
    813
    814
    815
    816
    817
    818
    819
    820
    821
    822
    823
    824
    825
    826
    827
    828
    829
    830
    831
    832
    833
    834
    835
    836
    837
    838
    839
    840
    841
    842
    843
    844
    845
    846
    847
    848
    849
    850
    851
    852
    853
    854
    855
    856
    857
    858
    859
    860
    861
    862
    863
    864
    865
    866
    867
    868
    869
    870
    871
    872
    873
    874
    875
    @dataclass
    class Mintpy_SBAS_Base_Config:
        '''
        Dataclass containing all configuration options for Mintpy SBAS jobs.
    
        UI metadata is stored in ``_ui_groups`` / ``_ui_fields`` and consumed
        by the API layer to auto-generate the settings panel.
        '''
    
        # ── UI metadata consumed by the API / settings panel ─────────────────────
        _ui_groups: ClassVar[list] = [
            {"label": "Compute Resources",
             "fields": ["compute_maxMemory", "compute_cluster", "compute_numWorker", "compute_config"]},
            {"label": "Load Data",
             "fields": ["load_processor", "load_autoPath", "load_updateMode", "load_compression",
                        "load_metaFile", "load_baselineDir",
                        "load_unwFile", "load_corFile", "load_connCompFile", "load_intFile", "load_magFile",
                        "load_ionUnwFile", "load_ionCorFile", "load_ionConnCompFile",
                        "load_azOffFile", "load_rgOffFile", "load_azOffStdFile", "load_rgOffStdFile", "load_offSnrFile",
                        "load_demFile", "load_lookupYFile", "load_lookupXFile",
                        "load_incAngleFile", "load_azAngleFile", "load_shadowMaskFile", "load_waterMaskFile", "load_bperpFile",
                        "subset_yx", "subset_lalo",
                        "multilook_method", "multilook_ystep", "multilook_xstep"]},
            {"label": "Modify Network",
             "fields": ["network_tempBaseMax", "network_perpBaseMax", "network_connNumMax",
                        "network_startDate", "network_endDate", "network_excludeDate", "network_excludeDate12",
                        "network_excludeIfgIndex", "network_referenceFile",
                        "network_coherenceBased", "network_minCoherence",
                        "network_areaRatioBased", "network_minAreaRatio",
                        "network_keepMinSpanTree", "network_maskFile", "network_aoiYX", "network_aoiLALO"]},
            {"label": "Reference Point",
             "fields": ["reference_yx", "reference_lalo", "reference_maskFile",
                        "reference_coherenceFile", "reference_minCoherence"]},
            {"label": "Unwrap Error Correction",
             "fields": ["unwrapError_method", "unwrapError_waterMaskFile", "unwrapError_connCompMinArea",
                        "unwrapError_numSample", "unwrapError_ramp", "unwrapError_bridgePtsRadius"]},
            {"label": "Network Inversion",
             "fields": ["networkInversion_weightFunc", "networkInversion_waterMaskFile",
                        "networkInversion_minNormVelocity", "networkInversion_maskDataset",
                        "networkInversion_maskThreshold", "networkInversion_minRedundancy",
                        "networkInversion_minTempCoh", "networkInversion_minNumPixel", "networkInversion_shadowMask"]},
            {"label": "Solid Earth Tides",
             "fields": ["solidEarthTides"]},
            {"label": "Ionosphere Correction",
             "fields": ["ionosphericDelay_method", "ionosphericDelay_excludeDate", "ionosphericDelay_excludeDate12"]},
            {"label": "Troposphere Correction",
             "fields": ["troposphericDelay_method", "troposphericDelay_weatherModel", "troposphericDelay_weatherDir",
                        "troposphericDelay_polyOrder", "troposphericDelay_looks", "troposphericDelay_minCorrelation",
                        "troposphericDelay_gacosDir"]},
            {"label": "Deramp",
             "fields": ["deramp", "deramp_maskFile"]},
            {"label": "Topography Correction",
             "fields": ["topographicResidual", "topographicResidual_polyOrder", "topographicResidual_phaseVelocity",
                        "topographicResidual_stepDate", "topographicResidual_excludeDate",
                        "topographicResidual_pixelwiseGeometry"]},
            {"label": "Residual RMS",
             "fields": ["residualRMS_maskFile", "residualRMS_deramp", "residualRMS_cutoff"]},
            {"label": "Reference Date",
             "fields": ["reference_date"]},
            {"label": "Velocity",
             "fields": ["timeFunc_startDate", "timeFunc_endDate", "timeFunc_excludeDate",
                        "timeFunc_polynomial", "timeFunc_periodic", "timeFunc_stepDate",
                        "timeFunc_exp", "timeFunc_log",
                        "timeFunc_uncertaintyQuantification", "timeFunc_timeSeriesCovFile",
                        "timeFunc_bootstrapCount"]},
            {"label": "Geocode",
             "fields": ["geocode", "geocode_SNWE", "geocode_laloStep", "geocode_interpMethod", "geocode_fillValue"]},
            {"label": "Google earth",
             "fields": ["save_kmz"]},
            {"label": "Hdfeos5",
             "fields": ["save_hdfEos5", "save_hdfEos5_update", "save_hdfEos5_subset"]},
            {"label": "Plot",
             "fields": ["plot", "plot_dpi", "plot_maxMemory"]},
            {"label": "HPC (SLURM)",
             "fields": ["hpc_mode", "hpc_sbatch_opts"]},
        ]
        _ui_fields: ClassVar[dict] = {
            # Compute Resources
            "compute_maxMemory":   {"type": "number", "min": 1, "max": 512, "step": 1,
                                    "default": _env['memory'],
                                    "hint": "Maximum memory size in GB for each dask worker"},
            "compute_cluster":     {"type": "select",
                                    "options": ["local", "slurm", "pbs", "lsf", "oar", "sge", "none"],
                                    "hint": "Cluster type for parallel processing (local = dask LocalCluster)"},
            "compute_numWorker":   {"type": "number", "min": 1, "max": 64, "step": 1,
                                    "default": _env['cpu'],
                                    "hint": "Number of workers for parallel processing"},
            "compute_config":      {"type": "text",
                                    "hint": "Configuration file for dask distributed cluster"},
            # Load Data
            "load_processor":      {"type": "select",
                                    "options": ["auto", "isce", "aria", "hyp3", "gmtsar", "snap", "gamma", "roipac"],
                                    "hint": "SAR processor of the input dataset"},
            "load_autoPath":       {"type": "text",
                                    "hint": "Auto-detect input file paths based on processor type (auto)"},
            "load_updateMode":     {"type": "select", "options": ["auto", "yes", "no"],
                                    "hint": "Skip re-loading if file already exists with same dataset and metadata"},
            "load_compression":    {"type": "select", "options": ["auto", "lzf", "gzip", "no"],
                                    "hint": "Data compression for HDF5 files"},
            "load_metaFile":       {"type": "text",
                                    "hint": "Metadata file path (ISCE only), e.g. reference/IW1.xml"},
            "load_baselineDir":    {"type": "text",
                                    "hint": "Baseline directory (ISCE only), e.g. baselines"},
            "load_unwFile":        {"type": "text",
                                    "hint": "Unwrapped interferogram file(s), e.g. ./../pairs/*/filt*.unw"},
            "load_corFile":        {"type": "text",
                                    "hint": "Coherence file(s), e.g. ./../pairs/*/filt*.cor"},
            "load_connCompFile":   {"type": "text",
                                    "hint": "Connected components file(s), e.g. ./../pairs/*/filt*.unw.conncomp"},
            "load_intFile":        {"type": "text",
                                    "hint": "Wrapped interferogram file(s), e.g. ./../pairs/*/filt*.int"},
            "load_magFile":        {"type": "text",
                                    "hint": "Interferogram magnitude file(s), e.g. ./../pairs/*/filt*.int"},
            "load_ionUnwFile":     {"type": "text", "hint": "Unwrapped ionospheric phase file(s)"},
            "load_ionCorFile":     {"type": "text", "hint": "Ionospheric coherence file(s)"},
            "load_ionConnCompFile":{"type": "text", "hint": "Ionospheric connected component file(s)"},
            "load_azOffFile":      {"type": "text", "hint": "Azimuth offset file(s)"},
            "load_rgOffFile":      {"type": "text", "hint": "Range offset file(s)"},
            "load_azOffStdFile":   {"type": "text", "hint": "Azimuth offset standard deviation file(s)"},
            "load_rgOffStdFile":   {"type": "text", "hint": "Range offset standard deviation file(s)"},
            "load_offSnrFile":     {"type": "text", "hint": "Offset SNR file(s)"},
            "load_demFile":        {"type": "text",
                                    "hint": "DEM file in radar/geo coordinates, e.g. ./inputs/geometryRadar.h5"},
            "load_lookupYFile":    {"type": "text",
                                    "hint": "Lookup table lat/y file, e.g. ./inputs/geometryGeo.h5"},
            "load_lookupXFile":    {"type": "text", "hint": "Lookup table lon/x file"},
            "load_incAngleFile":   {"type": "text", "hint": "Incidence angle file"},
            "load_azAngleFile":    {"type": "text", "hint": "Azimuth angle file"},
            "load_shadowMaskFile": {"type": "text", "hint": "Shadow/layover mask file"},
            "load_waterMaskFile":  {"type": "text", "hint": "Water mask file"},
            "load_bperpFile":      {"type": "text", "hint": "Perpendicular baseline file"},
            "subset_yx":           {"type": "text", "hint": "Subset in row/column, e.g. 1200:2000,0:2000"},
            "subset_lalo":         {"type": "text", "hint": "Subset in lat/lon, e.g. 37.5:38.5,-118.5:-117.5"},
            "multilook_method":    {"type": "select", "options": ["auto", "mean", "nearest", "no"],
                                    "hint": "Multilook method: mean, nearest, or no for skip"},
            "multilook_ystep":     {"type": "auto_number", "hint": "Multilook factor in y/azimuth direction"},
            "multilook_xstep":     {"type": "auto_number", "hint": "Multilook factor in x/range direction"},
            # Modify Network
            "network_tempBaseMax":     {"type": "auto_number", "hint": "Maximum temporal baseline in days"},
            "network_perpBaseMax":     {"type": "auto_number", "hint": "Maximum perpendicular baseline in meters"},
            "network_connNumMax":      {"type": "auto_number", "hint": "Maximum number of nearest-neighbor connections"},
            "network_startDate":       {"type": "text", "hint": "Start date in YYYYMMDD format"},
            "network_endDate":         {"type": "text", "hint": "End date in YYYYMMDD format"},
            "network_excludeDate":     {"type": "text", "hint": "Date(s) to exclude in YYYYMMDD, separated by space"},
            "network_excludeDate12":   {"type": "text",
                                        "hint": "Interferogram date pairs to exclude, e.g. 20150115_20150127"},
            "network_excludeIfgIndex": {"type": "text",
                                        "hint": "Index(es) of interferograms to exclude, e.g. 2 8 230"},
            "network_referenceFile":   {"type": "text",
                                        "hint": "Reference network file (pairs in date12_list.txt format)"},
            "network_coherenceBased":  {"type": "select", "options": ["auto", "yes", "no"],
                                        "hint": "Enable coherence-based network modification"},
            "network_minCoherence":    {"type": "number", "min": 0, "max": 1, "step": 0.05,
                                        "hint": "Minimum coherence threshold for coherence-based modification"},
            "network_areaRatioBased":  {"type": "select", "options": ["auto", "yes", "no"],
                                        "hint": "Enable area-ratio-based network modification (ECR method)"},
            "network_minAreaRatio":    {"type": "auto_number",
                                        "hint": "Minimum area ratio for area-ratio-based modification"},
            "network_keepMinSpanTree": {"type": "select", "options": ["auto", "yes", "no"],
                                        "hint": "Keep the minimum spanning tree of the network"},
            "network_maskFile":        {"type": "text",
                                        "hint": "Mask file for coherence-based network modification"},
            "network_aoiYX":           {"type": "text",
                                        "hint": "AOI in row/column for coherence calculation, e.g. 100:200,300:400"},
            "network_aoiLALO":         {"type": "text",
                                        "hint": "AOI in lat/lon for coherence calculation, e.g. 37.5:38.0,-118.0:-117.5"},
            # Reference Point
            "reference_yx":            {"type": "text", "hint": "Reference point in row/column, e.g. 257 151"},
            "reference_lalo":          {"type": "text", "hint": "Reference point in lat/lon, e.g. 37.65 -118.45"},
            "reference_maskFile":      {"type": "text", "hint": "Mask file for reference point selection"},
            "reference_coherenceFile": {"type": "text", "hint": "Coherence file for reference point selection"},
            "reference_minCoherence":  {"type": "auto_number",
                                        "hint": "Minimum coherence for reference point selection"},
            # Unwrap Error
            "unwrapError_method":          {"type": "select",
                                            "options": ["auto", "bridging", "phase_closure",
                                                        "bridging+phase_closure", "no"],
                                            "hint": "Phase unwrapping error correction method"},
            "unwrapError_waterMaskFile":   {"type": "text", "hint": "Water mask file for bridging method"},
            "unwrapError_connCompMinArea": {"type": "auto_number",
                                            "hint": "Minimum area in pixels for a connected component"},
            "unwrapError_numSample":       {"type": "auto_number",
                                            "hint": "Number of randomly sampled triplets for phase_closure method"},
            "unwrapError_ramp":            {"type": "select", "options": ["auto", "linear", "quadratic", "no"],
                                            "hint": "Remove ramp before bridging"},
            "unwrapError_bridgePtsRadius": {"type": "auto_number",
                                            "hint": "Radius in pixels to search for bridge points"},
            # Network Inversion
            "networkInversion_weightFunc":      {"type": "select", "options": ["auto", "var", "fim", "no"],
                                                 "hint": "var = spatial variance, fim = Fisher info matrix, no = uniform"},
            "networkInversion_waterMaskFile":   {"type": "text", "hint": "Water mask file applied before inversion"},
            "networkInversion_minNormVelocity": {"type": "select", "options": ["auto", "yes", "no"],
                                                 "hint": "Minimize L2-norm of velocity (vs. timeseries) in SBAS inversion"},
            "networkInversion_maskDataset":     {"type": "text",
                                                 "hint": "Dataset for masking, e.g. coherence or connectComponent"},
            "networkInversion_maskThreshold":   {"type": "number", "min": 0, "max": 1, "step": 0.05,
                                                 "hint": "Threshold for maskDataset to mask unwrapped phase"},
            "networkInversion_minRedundancy":   {"type": "auto_number",
                                                 "hint": "Minimum redundancy of interferograms per pixel"},
            "networkInversion_minTempCoh":      {"type": "auto_number",
                                                 "hint": "Minimum temporal coherence for pixel masking"},
            "networkInversion_minNumPixel":     {"type": "auto_number",
                                                 "hint": "Minimum number of coherent pixels to proceed"},
            "networkInversion_shadowMask":      {"type": "select", "options": ["auto", "yes", "no"],
                                                 "hint": "Use shadow mask from geometry"},
            # Solid Earth Tides
            "solidEarthTides":  {"type": "select", "options": ["auto", "yes", "no"],
                                 "hint": "Correct for solid earth tides using pysolid"},
            # Ionosphere
            "ionosphericDelay_method":       {"type": "select", "options": ["auto", "split_spectrum", "no"],
                                              "hint": "Ionospheric delay correction method"},
            "ionosphericDelay_excludeDate":  {"type": "text",
                                              "hint": "Dates to exclude from ionospheric correction, e.g. 20180202 20180414"},
            "ionosphericDelay_excludeDate12":{"type": "text",
                                              "hint": "Interferogram date pairs to exclude from ionospheric correction"},
            # Troposphere
            "troposphericDelay_method":         {"type": "select",
                                                 "options": ["auto", "pyaps", "gacos", "height_correlation", "no"],
                                                 "hint": "Tropospheric delay correction method"},
            "troposphericDelay_weatherModel":   {"type": "select",
                                                 "options": ["auto", "ERA5", "ERA5T", "MERRA", "NARR"],
                                                 "hint": "Weather model for pyaps (ERA5 recommended)"},
            "troposphericDelay_weatherDir":     {"type": "text",
                                                 "hint": "Directory of downloaded weather data files for pyaps"},
            "troposphericDelay_polyOrder":      {"type": "auto_number",
                                                 "hint": "Polynomial order for height-correlation method"},
            "troposphericDelay_looks":          {"type": "auto_number",
                                                 "hint": "Extra multilook factor for height-correlation estimation"},
            "troposphericDelay_minCorrelation": {"type": "auto_number",
                                                 "hint": "Minimum correlation between height and phase"},
            "troposphericDelay_gacosDir":       {"type": "text", "hint": "Directory of GACOS delay files"},
            # Deramp
            "deramp":          {"type": "select", "options": ["auto", "linear", "quadratic", "no"],
                                "hint": "Remove phase ramp in x/y direction"},
            "deramp_maskFile": {"type": "text", "hint": "Mask file for ramp estimation"},
            # Topography
            "topographicResidual":                 {"type": "select", "options": ["auto", "yes", "no"],
                                                    "hint": "Correct topographic residuals (DEM error)"},
            "topographicResidual_polyOrder":       {"type": "auto_number",
                                                    "hint": "Polynomial order for DEM error estimation"},
            "topographicResidual_phaseVelocity":   {"type": "select", "options": ["auto", "yes", "no"],
                                                    "hint": "Minimize phase velocity (not phase) in DEM error inversion"},
            "topographicResidual_stepDate":        {"type": "text",
                                                    "hint": "Step function date(s) for co-seismic jumps, e.g. 20140911"},
            "topographicResidual_excludeDate":     {"type": "text",
                                                    "hint": "Dates to exclude in DEM error inversion"},
            "topographicResidual_pixelwiseGeometry":{"type": "select", "options": ["auto", "yes", "no"],
                                                     "hint": "Use pixel-wise geometry in DEM error estimation"},
            # Residual RMS
            "residualRMS_maskFile": {"type": "text", "hint": "Mask file for residual phase quality assessment"},
            "residualRMS_deramp":   {"type": "select", "options": ["auto", "linear", "quadratic", "no"],
                                     "hint": "Remove ramp before RMS calculation"},
            "residualRMS_cutoff":   {"type": "auto_number",
                                     "hint": "Cutoff value in RMS threshold for outlier date detection"},
            # Reference Date
            "reference_date": {"type": "text",
                               "hint": "Reference date in YYYYMMDD; 'auto' = first date with full coherence"},
            # Velocity
            "timeFunc_startDate":                {"type": "text", "hint": "Start date of the time function fit"},
            "timeFunc_endDate":                  {"type": "text", "hint": "End date of the time function fit"},
            "timeFunc_excludeDate":              {"type": "text",
                                                  "hint": "Date(s) to exclude from time function fitting"},
            "timeFunc_polynomial":               {"type": "auto_number",
                                                  "hint": "Polynomial order: 1 = linear velocity, 2 = acceleration"},
            "timeFunc_periodic":                 {"type": "text",
                                                  "hint": "Periodic periods in years, e.g. 1.0 0.5 for annual+semi-annual"},
            "timeFunc_stepDate":                 {"type": "text",
                                                  "hint": "Step function date(s), e.g. 20161231 for co-seismic jump"},
            "timeFunc_exp":                      {"type": "text",
                                                  "hint": "Exponential decay: onset_date char_time, e.g. 20181026 60"},
            "timeFunc_log":                      {"type": "text",
                                                  "hint": "Logarithmic relaxation: onset_date char_time, e.g. 20181026 60"},
            "timeFunc_uncertaintyQuantification":{"type": "select", "options": ["auto", "bootstrap", "residue"],
                                                  "hint": "Method for velocity uncertainty quantification"},
            "timeFunc_timeSeriesCovFile":        {"type": "text",
                                                  "hint": "Time-series covariance file for uncertainty propagation"},
            "timeFunc_bootstrapCount":           {"type": "auto_number",
                                                  "hint": "Number of bootstrap iterations"},
            # Geocode
            "geocode":              {"type": "select", "options": ["auto", "yes", "no"],
                                     "hint": "Geocode datasets in radar coordinates to geo coordinates"},
            "geocode_SNWE":         {"type": "text",
                                     "hint": "Bounding box: south north west east, e.g. 31 40 -115 -100"},
            "geocode_laloStep":     {"type": "text",
                                     "hint": "Output pixel size in lat/lon, e.g. -0.000833 0.000833 (≈90 m)"},
            "geocode_interpMethod": {"type": "select", "options": ["auto", "nearest", "linear"],
                                     "hint": "Interpolation method for geocoding"},
            "geocode_fillValue":    {"type": "text",
                                     "hint": "Fill value for pixels outside coverage, e.g. nan or 0"},
            # Google Earth
            "save_kmz":            {"type": "select", "options": ["auto", "yes", "no"],
                                    "hint": "Save geocoded velocity to Google Earth KMZ file"},
            # HDF-EOS5
            "save_hdfEos5":        {"type": "select", "options": ["auto", "yes", "no"],
                                    "hint": "Save time-series to HDF-EOS5 format"},
            "save_hdfEos5_update": {"type": "select", "options": ["auto", "yes", "no"],
                                    "hint": "Update HDF-EOS5 file if already exists"},
            "save_hdfEos5_subset": {"type": "select", "options": ["auto", "yes", "no"],
                                    "hint": "Save subset of HDF-EOS5 file"},
            # Plot
            "plot":                {"type": "select", "options": ["auto", "yes", "no"],
                                    "hint": "Plot results during processing"},
            "plot_dpi":            {"type": "auto_number", "hint": "Figure DPI for saved plots"},
            "plot_maxMemory":      {"type": "auto_number",
                                    "hint": "Maximum memory in GB for plot_smallbaseline.py"},
            "hpc_mode":            {"type": "bool",
                                    "hint": "Submit the full MintPy run as a single sbatch job"},
            "hpc_sbatch_opts":     {"type": "text",
                                    "hint": "SLURM resource overrides, e.g. {\"time\": \"24:00:00\", \"mem\": \"256G\", \"cpus_per_task\": 32}. Defaults: ntasks=1, cpus_per_task=16, mem=128G, time=12:00:00"},
        }
        # ─────────────────────────────────────────────────────────────────────────
    
        # InSARHub-side settings. `name`, `workdir` and `debug` are left out of the
        # generated MintPy config file by write_mintpy_config(); a string
        # `workdir` is converted to an absolute Path in __post_init__().
        name: str = "Mintpy_SBAS_Base_Config"
        workdir: Path | str = field(default_factory=lambda: Path.cwd())
        debug: bool = False
        # Submit the full MintPy run as a single sbatch job.
        hpc_mode: bool = False
        # SLURM resource overrides, e.g. {"time": "24:00:00", "mem": "256G"}.
        hpc_sbatch_opts: dict = field(default_factory=dict)
    
        ## computing resource configuration
        # Defaults for memory and worker count come from the module-level `_env` mapping.
        compute_maxMemory : float | int = _env['memory']
        compute_cluster : str = 'local' # MintPy's own SLURM parallel processing is buggy, so parallel processing is handled with dask instead. Switch to none to turn off parallel processing and save memory.
        compute_numWorker : int = _env['cpu']
        compute_config: str = 'none'
    
        ## Load data
        load_processor: str = 'auto'
        load_autoPath: str = 'auto' 
        load_updateMode: str = 'no'
        load_compression: str = 'auto'
        ##---------for ISCE only:
        load_metaFile: str = 'auto'
        load_baselineDir: str = 'auto'
        ##---------interferogram stack:
        load_unwFile: str = 'auto'
        load_corFile: str = 'auto'
        load_connCompFile: str = 'auto'
        load_intFile: str = 'auto'
        load_magFile: str = 'auto'
        ##---------ionosphere stack (optional):
        load_ionUnwFile: str = 'auto'
        load_ionCorFile: str = 'auto'
        load_ionConnCompFile: str = 'auto'
        ##---------offset stack (optional):
        load_azOffFile: str = 'auto'
        load_rgOffFile: str = 'auto'
        load_azOffStdFile: str = 'auto'
        load_rgOffStdFile: str = 'auto'
        load_offSnrFile: str = 'auto'
        ##---------geometry:
        load_demFile: str = 'auto'
        load_lookupYFile: str = 'auto'
        load_lookupXFile: str = 'auto'
        load_incAngleFile: str = 'auto'
        load_azAngleFile: str = 'auto'
        load_shadowMaskFile: str = 'auto'
        load_waterMaskFile: str = 'auto'
        load_bperpFile: str = 'auto'
        ##---------subset (optional):
        subset_yx: str = 'auto'
        subset_lalo: str = 'auto'
        ##---------multilook (optional):
        multilook_method: str = 'auto'
        multilook_ystep: str | int = 'auto'
        multilook_xstep: str | int= 'auto'
    
        # 2. Modify Network
        network_tempBaseMax: str | float = 'auto'
        network_perpBaseMax: str | float = 'auto'
        network_connNumMax: str | int = 'auto'
        network_startDate: str = 'auto'
        network_endDate: str = 'auto'
        network_excludeDate: str = 'auto'
        network_excludeDate12: str = 'auto'
        network_excludeIfgIndex: str = 'auto'
        network_referenceFile: str = 'auto'
        ## 2) Data-driven network modification
        network_coherenceBased: str = 'auto'
        network_minCoherence: str |float = 'auto'
        ## b - Effective Coherence Ratio network modification = (threshold + MST) by default
        network_areaRatioBased: str = 'auto'
        network_minAreaRatio: str |float= 'auto'
        ## Additional common parameters for the 2) data-driven network modification
        network_keepMinSpanTree: str = 'auto'
        network_maskFile: str = 'auto'
        network_aoiYX: str = 'auto'
        network_aoiLALO: str = 'auto'
    
        # 3. Reference Point
        reference_yx: str = 'auto'
        reference_lalo: str = 'auto'
        reference_maskFile: str = 'auto'
        reference_coherenceFile: str = 'auto'
        reference_minCoherence: str |float = 'auto'
    
        # 4. Correct Unwrap Error
        unwrapError_method: str = 'auto'
        unwrapError_waterMaskFile: str = 'auto'
        unwrapError_connCompMinArea: str |float = 'auto'
        ## phase_closure options:
        unwrapError_numSample: str | int= 'auto'
        ## bridging options:
        unwrapError_ramp: str = 'auto'
        unwrapError_bridgePtsRadius: str | int= 'auto'
    
        # 5. Invert Network
        networkInversion_weightFunc: str = 'auto'
        networkInversion_waterMaskFile: str = 'auto'
        networkInversion_minNormVelocity: str = 'auto'
        ## mask options for unwrapPhase of each interferogram before inversion (recommend if weightFunct=no):
        networkInversion_maskDataset: str = 'auto'
        networkInversion_maskThreshold: str | float = 'auto'
        networkInversion_minRedundancy: str | float = 'auto'
        ## Temporal coherence is calculated and used to generate the mask as the reliability measure
        networkInversion_minTempCoh: str | float = 'auto'
        networkInversion_minNumPixel: str | int = 'auto'
        networkInversion_shadowMask: str = 'auto'
    
        # 6. Correct SET (Solid Earth Tides)
        solidEarthTides: str = 'auto'
    
        # 7. Correct Ionosphere
        ionosphericDelay_method: str = 'auto'
        ionosphericDelay_excludeDate: str = 'auto'
        ionosphericDelay_excludeDate12: str = 'auto'
    
        # 8. Correct Troposphere
        troposphericDelay_method: str = 'auto'
        ## Notes for pyaps:
        troposphericDelay_weatherModel: str = 'auto'
        troposphericDelay_weatherDir: str = 'auto'
    
        ## Notes for height_correlation:
        troposphericDelay_polyOrder: str | int = 'auto'
        troposphericDelay_looks: str | int = 'auto'
        troposphericDelay_minCorrelation: str | float = 'auto'
        ## Notes for gacos:
        troposphericDelay_gacosDir: str = 'auto'
    
        # 9. Deramp
        deramp: str = 'auto'
        deramp_maskFile: str = 'auto'
    
        # 10. Correct Topography (DEM error); 'auto' keeps the MintPy default for each option
        topographicResidual: str = 'auto'
        # Polynomial order: 'auto' or a number, like troposphericDelay_polyOrder.
        topographicResidual_polyOrder: str | int = 'auto'
        topographicResidual_phaseVelocity: str = 'auto'
        # Step function date(s) for co-seismic jumps, e.g. 20140911
        topographicResidual_stepDate: str = 'auto'
        topographicResidual_excludeDate: str = 'auto'
        topographicResidual_pixelwiseGeometry: str = 'auto'
    
        # 11.1 Residual RMS
        residualRMS_maskFile: str = 'auto'
        residualRMS_deramp: str = 'auto'
        residualRMS_cutoff: str | float = 'auto'
    
        # 11.2 Reference Date
        reference_date: str = 'auto'
    
        # 12. Velocity
        timeFunc_startDate: str = 'auto'
        timeFunc_endDate: str = 'auto'
        timeFunc_excludeDate: str = 'auto'
        ## Fit a suite of time functions
        timeFunc_polynomial: str | int = 'auto'
        timeFunc_periodic: str = 'auto'
        timeFunc_stepDate: str = 'auto'
        timeFunc_exp: str = 'auto'
        timeFunc_log: str = 'auto'
        ## Uncertainty quantification methods:
        timeFunc_uncertaintyQuantification: str = 'auto'
        timeFunc_timeSeriesCovFile: str = 'auto'
        timeFunc_bootstrapCount: str | int = 'auto'
    
        # 13.1 Geocode
        geocode: str = 'auto'
        geocode_SNWE: str = 'auto'
        geocode_laloStep: str = 'auto'
        geocode_interpMethod: str = 'auto'
        geocode_fillValue: str | float = 'auto'
    
        # 13.2 Google Earth
        save_kmz: str = 'auto'
    
        # 13.3 HDFEOS5
        save_hdfEos5: str = 'auto'
        save_hdfEos5_update: str = 'auto'
        save_hdfEos5_subset: str = 'auto'
    
        # 13.4 Plot
        plot: str = 'auto'
        plot_dpi: str | int = 'auto'
        plot_maxMemory: str | int = 'auto'
    
        def __post_init__(self):
            """Normalize ``workdir`` to an absolute ``Path``.

            The field accepts both ``str`` and ``Path``. Both are expanded
            (``~``) and resolved, so a relative or ``~``-prefixed ``Path``
            is handled the same way as the equivalent string. Any other
            value is left untouched.
            """
            if isinstance(self.workdir, (str, Path)):
                self.workdir = Path(self.workdir).expanduser().resolve()
    
        def write_mintpy_config(self, outpath: Union[Path, str]):
            """
            Write this configuration to a MintPy ``.cfg`` template file.

            Every dataclass field becomes one ``mintpy.<...> = <value>`` line.
            The field name is split on underscores and re-joined with dots,
            e.g. ``load_unwFile`` -> ``mintpy.load.unwFile`` and ``deramp`` ->
            ``mintpy.deramp``. InSARHub-only operational fields (``name``,
            ``workdir``, ``debug``, ``hpc_mode``, ``hpc_sbatch_opts``) are not
            MintPy options and are left out.

            Args:
                outpath: Destination file. ``~`` is expanded and the path is
                    resolved; an existing file is overwritten.

            Returns:
                Path: The absolute path of the file that was written.
            """
            outpath = Path(outpath).expanduser().resolve()
            # Settings consumed by InSARHub itself; MintPy has no such options.
            exclude_fields = {'name', 'workdir', 'debug', 'hpc_mode', 'hpc_sbatch_opts'}

            # Build the content first so a failure in asdict() cannot leave a
            # truncated config file behind.
            lines = ["## MintPy Config File Generated via InSARHub\n"]
            for key, value in asdict(self).items():
                if key in exclude_fields:
                    continue
                mintpy_key = '.'.join(('mintpy', *key.split('_')))
                lines.append(f"{mintpy_key:<40} = {value}\n")

            with open(outpath, 'w', encoding='utf-8') as f:
                f.writelines(lines)

            return outpath
    
  • Run

    Run the MintPy time-series analysis using the provided configuration.

    analyzer.run()
    

    Parameters:

    Name Type Description Default
    steps list[str] | None

    List of MintPy processing steps to execute. If None, the default full workflow is executed: [ 'load_data', 'modify_network', 'reference_point', 'quick_overview', 'invert_network', 'correct_LOD', 'correct_SET', 'correct_ionosphere', 'correct_troposphere', 'deramp', 'correct_topography', 'residual_RMS', 'reference_date', 'velocity', 'geocode', 'google_earth', 'hdfeos5' ]

    None

    Raises:

    Type Description
    RuntimeError

    If tropospheric delay method requires CDS authorization and authorization fails.

    Exception

    Propagates exceptions raised during MintPy execution.

  • Submit (HPC / SLURM mode)

    Generate a single sbatch script covering all selected steps and submit it to SLURM. Inherited by Hyp3_SBAS and ISCE_SBAS.

    # Submit full pipeline as one SLURM job
    analyzer.submit_hpc()
    
    # Submit only specific steps
    analyzer.submit_hpc(steps=["velocity", "geocode"])
    

    The script is written to <workdir>/mintpy/mintpy_sbas.sbatch and job state to mintpy/mintpy_job.json. Default resources: time=24:00:00, ntasks=1, cpus_per_task=16, mem=128G, partition=all. Override via hpc_sbatch_opts in the config:

    cfg = Mintpy_SBAS_Base_Config(
        workdir="/your/work/dir",
        load_processor="hyp3",
        hpc_mode=True,
        hpc_sbatch_opts={"time": "48:00:00", "mem": "256G", "partition": "gpu"},
    )
    analyzer = Analyzer.create('Hyp3_SBAS', config=cfg)
    analyzer.submit_hpc()
    
  • Clean up

    Remove intermediate files generated during time-series processing.

    analyzer.cleanup()
    

    Raises:

    Type Description
    Exception

    Propagates any unexpected errors raised during removal.

Hyp3_SBAS is a specialized analyzer that extends Mintpy_SBAS_Base_Analyzer and is preconfigured for time-series processing of HyP3 InSAR products.

Source code in src/insarhub/analyzer/hyp3_sbas.py
class Hyp3_SBAS(Mintpy_SBAS_Base_Analyzer):
    """SBAS time-series analyzer for HyP3 InSAR products, backed by MintPy.

    Adds the HyP3-specific data preparation on top of the generic MintPy
    analyzer: unzip the products found under ``workdir``, clip every raster
    to the extent shared by all DEMs, and point the MintPy ``load_*`` options
    at the clipped files.
    """

    name = 'Hyp3_SBAS'
    description = "SBAS time-series analysis of HyP3 InSAR outputs using MintPy."
    compatible_processor = "Hyp3_S1"
    default_config = Hyp3_SBAS_Config
    # Raster suffixes every product must provide; the metadata (.txt) files are
    # needed as well to get the date and other info.
    required = ['unw_phase.tif', 'corr.tif',  'dem.tif']
    # Raster suffixes that are clipped and loaded only when present.
    optional = ['lv_theta.tif', 'lv_phi.tif', 'water_mask.tif']

    def __init__(self, config: Hyp3_SBAS_Config | None = None):
        super().__init__(config)

    def prep_data(self):
        """
        Prepare input data for analysis by performing unzipping, collection, clipping, and parameter setup.

        This method orchestrates the preprocessing steps required before running the analysis workflow.

        Steps performed:
            1. `_unzip_hyp3()`: Extracts any compressed HyP3 output files.
            2. `_collect_files()`: Gathers relevant input files (e.g., DEMs, interferograms).
            3. `_get_common_overlap(files['dem'])`: Computes the extent shared by all DEM rasters.
            4. `_clip_rasters(files, overlap_extent)`: Clips input rasters to the common overlapping area.
            5. `_set_load_parameters()`: Points the MintPy load options at the clipped files.
            6. The parent `prep_data()` is called last.

        Raises:
            FileNotFoundError: If required rasters or metadata files are missing.
            ValueError: If no common overlap region can be determined among the DEM rasters.
            RuntimeError: If GDAL cannot open one of the DEM rasters.
            Exception: Propagates any unexpected errors during preprocessing.

        Notes:
            - This method must be called before running the analysis workflow.
            - Designed for workflows using HyP3-derived Sentinel-1 products.
        """
        self._unzip_hyp3()
        files = self._collect_files()
        overlap_extent = self._get_common_overlap(files['dem'])
        self._clip_rasters(files, overlap_extent)
        self._set_load_parameters()
        super().prep_data()

    def _unzip_hyp3(self):
        """Extract every ``*.zip`` found under ``workdir`` into ``tmp_dir``.

        An archive is skipped when ``tmp_dir/<zip stem>`` already exists and
        holds a file for every file name listed in the archive. Otherwise any
        existing folder of that name is removed and the archive is extracted
        again.
        """
        print(f'{Fore.CYAN}Unzipping HyP3 Products...{Fore.RESET}')

        hyp3_results = list(self.workdir.rglob('*.zip'))
        self.tmp_dir.mkdir(parents=True, exist_ok=True)

        with tqdm(hyp3_results, desc="Processing", unit="file") as pbar:
            for zip_file in pbar:
                extract_target = self.tmp_dir / zip_file.stem
                with zipfile.ZipFile(zip_file, 'r') as zf:
                    needs_extraction = True
                    if extract_target.is_dir():
                        # Compare by bare file name: the folder counts as
                        # complete when nothing listed in the archive is missing.
                        files_in_zip = {Path(f).name for f in zf.namelist() if not f.endswith('/')}
                        folder_files = {f.name for f in extract_target.iterdir() if f.is_file()}
                        if files_in_zip.issubset(folder_files):
                            needs_extraction = False
                            pbar.set_description(f"File Exist: {zip_file.stem[:30]}...")
                    if needs_extraction:
                        pbar.set_description(f"Extracting: {zip_file.stem[:30]}...")
                        # Drop an incomplete earlier extraction before redoing it.
                        if extract_target.is_dir():
                            shutil.rmtree(extract_target)

                        zf.extractall(self.tmp_dir)
        print(f'\n{Fore.GREEN}Unzipping complete.{Fore.RESET}')

    def _collect_files(self):
        """Map the extracted rasters and metadata files by category.

        Returns:
            A ``defaultdict(list)`` keyed by ``'meta'`` plus one key per entry
            of ``required``/``optional`` (the suffix without its extension,
            e.g. ``'unw_phase'``), each holding the matching paths found
            under ``tmp_dir``.

        Raises:
            FileNotFoundError: If any required raster category or the
                metadata (.txt) files are missing.
        """
        print(f'{Fore.CYAN}Mapping file paths...{Fore.RESET}')
        all_required = {ext.split('.')[0]: ext for ext in self.required}
        all_optional = {ext.split('.')[0]: ext for ext in self.optional}
        files = defaultdict(list)
        files['meta'] = [m for m in self.tmp_dir.rglob('*.txt') if 'README' not in m.name]
        for cat_name, ext in {**all_required, **all_optional}.items():
            files[cat_name] = list(self.tmp_dir.rglob(f"*_{ext}"))

        missing_req = [name for name in all_required if not files[name]]
        if missing_req or not files['meta']:
            print("\033[K", end="\r") # Clear current line
            msg = []
            if missing_req:
                msg.append(f"Missing rasters: {missing_req}")
            if not files['meta']:
                msg.append("Missing metadata (.txt) files")

            error_report = f"{Fore.RED}CRITICAL ERROR: {'. '.join(msg)}.{Fore.RESET}\n" \
                           "MintPy requires these files to extract dates and baselines."
            raise FileNotFoundError(error_report)
        missing_opt = [name for name in all_optional if not files[name]]

        total_pairs = len(files['unw_phase'])
        status_msg = f"{Fore.GREEN}Found {total_pairs} pairs | Metadata: OK"
        if missing_opt:
            status_msg += f" | {Fore.YELLOW}Missing optional: {missing_opt}"

        print(f"\r\033[K{status_msg}{Fore.RESET}")
        return files

    def _get_common_overlap(self, dem_files):
        """Return the extent shared by all DEM rasters.

        Args:
            dem_files: Paths of the DEM rasters to intersect.

        Returns:
            ``(ulx, uly, lrx, lry)`` of the intersection, in the order
            expected by the ``projWin`` option of ``gdal.Translate``.

        Raises:
            ValueError: If ``dem_files`` is empty or the rasters share no
                common area.
            RuntimeError: If GDAL cannot open one of the rasters.
        """
        if not dem_files:
            raise ValueError("No DEM rasters found; cannot compute a common overlap.")

        ulx_l, uly_l, lrx_l, lry_l = [], [], [], []
        for f in dem_files:
            ds = gdal.Open(f.as_posix())
            if ds is None:
                # Without gdal.UseExceptions() a failed open returns None.
                raise RuntimeError(f"GDAL could not open raster: {f}")
            try:
                gt = ds.GetGeoTransform() # (ulx, xres, xrot, uly, yrot, yres)
                ulx_l.append(gt[0])
                uly_l.append(gt[3])
                lrx_l.append(gt[0] + gt[1] * ds.RasterXSize)
                lry_l.append(gt[3] + gt[5] * ds.RasterYSize)
            finally:
                ds = None  # release the GDAL dataset handle

        ulx, uly, lrx, lry = max(ulx_l), min(uly_l), min(lrx_l), max(lry_l)
        # For north-up rasters a valid window has ulx < lrx and uly > lry.
        if ulx >= lrx or uly <= lry:
            raise ValueError(
                "No common overlap region among DEM rasters "
                f"(ulx={ulx}, uly={uly}, lrx={lrx}, lry={lry})."
            )
        return (ulx, uly, lrx, lry)

    def _clip_rasters(self, files, overlap_extent):
        """Clip every collected raster to ``overlap_extent`` and copy the metadata.

        Outputs are written to ``clip_dir`` as ``<stem>_clip.tif``; rasters
        whose output already exists are skipped. Clipping is best-effort: a
        failure is reported and the remaining rasters are still processed.
        The output of a failed clip is removed so that a later run retries
        it instead of skipping it.

        Args:
            files: Mapping of category name to raster paths, with the
                metadata files under ``'meta'``.
            overlap_extent: ``(ulx, uly, lrx, lry)`` window passed to
                ``gdal.Translate`` as ``projWin``.
        """
        print(f'{Fore.CYAN}Clipping rasters to common overlap...{Fore.RESET}')
        self.clip_dir.mkdir(parents=True, exist_ok=True)
        categories = [k for k in files if k != 'meta']

        with tqdm(categories, desc="Progress", position=0, dynamic_ncols=True) as pbar_out:
            for key in pbar_out:
                file_list = files[key]
                pbar_out.set_description(f"Group: {key}")

                # Inner progress bar for individual files in this group
                # leave=False ensures the inner bar disappears when the group is done
                with tqdm(file_list, desc="  -> Clipping", leave=False, position=1, unit="file", dynamic_ncols=True) as pbar_in:
                    for f in pbar_in:
                        out = self.clip_dir / f"{f.stem}_clip.tif"

                        if out.exists():
                            # Update postfix instead of printing to avoid creating new lines
                            pbar_in.set_postfix_str(f"Skip: {f.name[:15]}...")
                            continue

                        pbar_in.set_postfix_str(f"File: {f.name[:15]}...")

                        try:
                            clipped = gdal.Translate(
                                destName=out.as_posix(),
                                srcDS=f.as_posix(),
                                projWin=overlap_extent
                            )
                        except Exception as e:
                            failure = e
                        else:
                            # Without gdal.UseExceptions() a failure is
                            # signalled by a None return value.
                            failure = None if clipped is not None else "gdal.Translate returned no dataset"
                            clipped = None  # close the dataset so it is flushed to disk

                        if failure is not None:
                            out.unlink(missing_ok=True)
                            tqdm.write(f"{Fore.RED}Error clipping {f.name}: {failure}{Fore.RESET}")

        # Handle metadata separately as it's just a file copy (no progress bar needed)
        if 'meta' in files:
            print(f"\r{Fore.CYAN}Step: Copying metadata files... \033[K", end="", flush=True)
            for f in files['meta']:
                shutil.copy(f, self.clip_dir / f.name)

        print(f'\n{Fore.GREEN}Clipping complete.{Fore.RESET}')

    def _set_load_parameters(self):
        """Point the MintPy ``load_*`` options at the clipped rasters.

        The unwrapped phase, coherence and DEM patterns are always set.
        The incidence angle, azimuth angle and water mask patterns are set
        only when matching clipped files exist in ``clip_dir``.
        """
        self.config.load_unwFile = (self.clip_dir / '*_unw_phase_clip.tif').as_posix()
        self.config.load_corFile = (self.clip_dir / '*_corr_clip.tif').as_posix()
        self.config.load_demFile = (self.clip_dir / '*_dem_clip.tif').as_posix()
        opt_map = {
            'lv_theta': 'load_incAngleFile',
            'lv_phi': 'load_azAngleFile',
            'water_mask': 'load_waterMaskFile'
        }
        for k, cfg_attr in opt_map.items():
            if list(self.clip_dir.glob(f"*_{k}_clip.tif")):
                setattr(self.config, cfg_attr, (self.clip_dir / f"*_{k}_clip.tif").as_posix())

Usage

  • Create Analyzer with Parameters

    Initialize an analyzer instance

    analyzer = Analyzer.create('Hyp3_SBAS',
                                workdir="/your/work/dir")
    
    OR
    params = {"workdir": "/your/work/dir"}
    analyzer = Analyzer.create('Hyp3_SBAS', **params)
    
    OR
    from insarhub.config import Mintpy_SBAS_Base_Config
    cfg = Mintpy_SBAS_Base_Config(workdir="/your/work/dir")
    analyzer = Analyzer.create('Hyp3_SBAS', config=cfg)
    

  • Prepare data

    Prepare interferogram data downloaded from HyP3 server for MintPy

    analyzer.prep_data()
    

    Raises:

    Type Description
    FileNotFoundError

    If required input files are missing.

    ValueError

    If no common overlap region can be determined among rasters.

    Exception

    Propagates any unexpected errors during preprocessing.

    Source code in src/insarhub/analyzer/hyp3_sbas.py
    def prep_data(self):
        """
        Stage HyP3 products so that MintPy can load them.

        The steps run in a fixed order, each one feeding the next:
            1. `_unzip_hyp3()` extracts the downloaded HyP3 archives.
            2. `_collect_files()` maps the extracted rasters and metadata files.
            3. `_get_common_overlap(...)` derives the extent shared by the DEM rasters.
            4. `_clip_rasters(...)` clips the collected rasters to that extent.
            5. `_set_load_parameters()` points the MintPy load options at the clipped files.
            6. The parent `prep_data()` runs last and writes the MintPy config file.

        Raises:
            FileNotFoundError: If required rasters or metadata files are missing
                (raised by `_collect_files()`).
            Exception: Any other error raised by one of the steps propagates unchanged.

        Notes:
            - Call this before running the analysis workflow.
            - Intended for HyP3-derived Sentinel-1 products.
        """
        self._unzip_hyp3()
        collected = self._collect_files()
        # The DEM rasters define the window every other raster is clipped to.
        self._clip_rasters(collected, self._get_common_overlap(collected['dem']))
        self._set_load_parameters()
        super().prep_data()
    
  • Run

    Run the MintPy time-series analysis based on the provided configuration.

    analyzer.run()
    

    Parameters:

    Name Type Description Default
    steps list[str] | None

    List of MintPy processing steps to execute. If None, the default full workflow is executed: [ 'load_data', 'modify_network', 'reference_point', 'quick_overview', 'invert_network', 'correct_LOD', 'correct_SET', 'correct_ionosphere', 'correct_troposphere', 'deramp', 'correct_topography', 'residual_RMS', 'reference_date', 'velocity', 'geocode', 'google_earth', 'hdfeos5' ]

    None

    Raises:

    Type Description
    RuntimeError

    If tropospheric delay method requires CDS authorization and authorization fails.

    Exception

    Propagates exceptions raised during MintPy execution.

    Source code in src/insarhub/analyzer/mintpy_base.py
    def run(self, steps=None):
        """
        Run the MintPy SBAS time-series analysis workflow.

        The method resolves the list of steps to execute, authorizes CDS access
        when tropospheric correction with 'pyaps' is part of that list, and then
        executes the steps with MintPy's TimeSeriesAnalysis using the
        configuration file at `self.cfg_path`. The configuration file itself is
        NOT written here; it must already exist (see `prep_data()`).

        Args:
            steps (list[str] | None, optional):
                List of MintPy processing steps to execute. If None (or empty),
                the default full workflow is executed:
                    [
                        'load_data', 'modify_network', 'reference_point', 'quick_overview',
                        'invert_network', 'correct_LOD', 'correct_SET',
                        'correct_ionosphere', 'correct_troposphere',
                        'deramp', 'correct_topography', 'residual_RMS',
                        'reference_date', 'velocity', 'geocode',
                        'google_earth', 'hdfeos5'
                    ]

        Raises:
            RuntimeError: If tropospheric delay method requires CDS authorization
                and authorization fails.
            Exception: Propagates exceptions raised during MintPy execution.

        Notes:
            - If `troposphericDelay_method` is set to 'pyaps' and
              'correct_troposphere' is among the steps, CDS authorization is
              performed before running MintPy.
            - The configuration file is read from `self.cfg_path`.
            - Processing is executed inside `self.workdir`.
            - When 'geocode' is among the steps, `_geocode_diagnostic_files()`
              is called afterwards on the directory containing the config file.
        """
        default_steps = [
            'load_data', 'modify_network', 'reference_point', 'quick_overview', 'invert_network',
            'correct_LOD', 'correct_SET', 'correct_ionosphere', 'correct_troposphere',
            'deramp', 'correct_topography', 'residual_RMS', 'reference_date',
            'velocity', 'geocode', 'google_earth', 'hdfeos5'
        ]
        # A falsy `steps` (None or empty) selects the full workflow.
        run_steps = steps or default_steps

        if self.config.troposphericDelay_method == 'pyaps' and 'correct_troposphere' in run_steps:
            self._cds_authorize()

        # Style.RESET_ALL (rather than Fore.RESET) is required here: Fore.RESET
        # restores only the colour and would leave Style.BRIGHT active for all
        # output that MintPy prints afterwards.
        print(f'{Style.BRIGHT}{Fore.MAGENTA}Running MintPy Analysis...{Style.RESET_ALL}')
        app = TimeSeriesAnalysis(self.cfg_path.as_posix(), self.workdir.as_posix())
        app.open()
        app.run(steps=run_steps)
        if 'geocode' in run_steps:
            self._geocode_diagnostic_files(self.cfg_path.parent)
    
  • Submit (HPC / SLURM mode)

    Inherited from Mintpy_SBAS_Base_Analyzer. Submits the full MintPy run as a single sbatch job.

    analyzer.submit_hpc()
    
  • Clean up

    Remove intermediate processing files generated during time-series processing.

    analyzer.cleanup()
    

    Raises:

    Type Description
    Exception

    Propagates any unexpected errors raised during removal.

The ISCE_SBAS analyzer extends Mintpy_SBAS_Base_Analyzer and is preconfigured for ISCE2 stackSentinel outputs. prep_data() auto-discovers interferograms and geometry from the isce/ directory and writes the MintPy config to mintpy/.mintpy.cfg. All MintPy outputs are written to workdir/mintpy/.

Source code in src/insarhub/analyzer/isce_sbas.py
class ISCE_SBAS(Mintpy_SBAS_Base_Analyzer):
    """SBAS time-series analysis of ISCE2 stackSentinel outputs using MintPy.

    The analyzer expects the ISCE2 products under ``workdir/isce`` and keeps
    the MintPy configuration file and all MintPy outputs under
    ``workdir/mintpy``.

    Usage::

        from insarhub import Analyzer

        az = Analyzer.create('ISCE_SBAS', workdir='/data/p64_f468')
        az.prep_data()   # auto-wires MintPy paths, writes mintpy/.mintpy.cfg
        az.run()         # writes all output to workdir/mintpy/
    """

    name                 = "ISCE_SBAS"
    description          = "SBAS time-series analysis of ISCE2 stackSentinel outputs using MintPy."
    compatible_processor = "ISCE_S1"
    default_config       = ISCE_SBAS_Config

    def __init__(self, config: ISCE_SBAS_Config | None = None):
        """Initialise the analyzer and derive the ISCE / MintPy directories.

        Args:
            config: Analyzer configuration, or None; passed unchanged to the
                parent constructor.
        """
        super().__init__(config)
        self.isce_dir   = self.workdir / "isce"
        self.mintpy_dir = self.workdir / "mintpy"
        # Write config inside mintpy/ so MintPy finds it next to its outputs
        self.cfg_path   = self.mintpy_dir / ".mintpy.cfg"

    # ── Public entry points ───────────────────────────────────────────────────

    def prep_data(self) -> None:
        """Auto-discover stackSentinel outputs and write the MintPy config.

        Checks that ``isce/`` exists and that ``isce/merged/interferograms``
        contains at least one sub-directory, creates ``mintpy/``, fills in the
        MintPy ``load_*`` configuration fields and finally delegates to the
        parent ``prep_data()``.

        Raises:
            FileNotFoundError: If the ISCE directory is missing, or if no
                interferogram directory is found.
        """
        if not self.isce_dir.exists():
            raise FileNotFoundError(
                f"ISCE processing directory not found: {self.isce_dir}. "
                "Run ISCE_S1 and wait for all steps to complete."
            )
        ifg_dir = self.isce_dir / "merged" / "interferograms"
        # Each sub-directory of merged/interferograms counts as one pair.
        pairs = sorted(d for d in ifg_dir.iterdir() if d.is_dir()) if ifg_dir.exists() else []
        if not pairs:
            raise FileNotFoundError(
                f"No interferogram directories in {ifg_dir}. "
                "ISCE_S1 processing must reach the interferogram stage first."
            )
        print(f"{Fore.CYAN}Found {len(pairs)} interferogram pair(s). "
              f"Configuring MintPy load paths…{Fore.RESET}")
        self.mintpy_dir.mkdir(parents=True, exist_ok=True)
        self._set_load_parameters()
        super().prep_data()   # writes self.cfg_path

    def run(self, steps=None):
        """Run MintPy, writing all output to workdir/mintpy/.

        Args:
            steps (list[str] | None, optional): MintPy processing steps to
                execute. If None (or empty), the full default workflow listed
                in the body is executed.

        Notes:
            - CDS authorization is performed first when
              ``troposphericDelay_method`` is "pyaps" and
              "correct_troposphere" is among the steps that will actually run.
            - When "geocode" is among the steps,
              ``_geocode_diagnostic_files()`` is called on ``mintpy/``
              afterwards.
        """
        self.mintpy_dir.mkdir(parents=True, exist_ok=True)
        # Resolve the effective step list BEFORE the CDS check, so that a falsy
        # `steps` (e.g. an empty list), which falls back to the full workflow
        # including "correct_troposphere", still triggers authorization.
        run_steps = steps or [
            "load_data", "modify_network", "reference_point", "quick_overview",
            "invert_network", "correct_LOD", "correct_SET", "correct_ionosphere",
            "correct_troposphere", "deramp", "correct_topography", "residual_RMS",
            "reference_date", "velocity", "geocode", "google_earth", "hdfeos5",
        ]
        if self.config.troposphericDelay_method == "pyaps" and "correct_troposphere" in run_steps:
            self._cds_authorize()
        from colorama import Style
        # Style.RESET_ALL (rather than Fore.RESET) so that Style.BRIGHT does not
        # stay active for everything MintPy prints afterwards.
        print(f"{Style.BRIGHT}{Fore.MAGENTA}Running MintPy Analysis…{Style.RESET_ALL}")
        app = TimeSeriesAnalysis(self.cfg_path.as_posix(), str(self.mintpy_dir))
        app.open()
        app.run(steps=run_steps)
        if 'geocode' in run_steps:
            self._geocode_diagnostic_files(self.mintpy_dir)

    def cleanup(self) -> None:
        """Remove large ISCE2 intermediate directories and input data no longer needed
        after MintPy has loaded all data into HDF5.

        Removes under ``isce/``:
          - ``coarse_interferograms/``
          - ``ESD/``
          - ``coreg_secondarys/``
          - ``interferograms/``

        Removes at workdir level:
          - ``slc/``
          - ``dem/``

        Nothing is removed when ``config.debug`` is set. Removal is best
        effort: a failure on one directory is reported and the remaining
        directories are still processed.
        """
        if self.config.debug:
            print(f"{Fore.YELLOW}Debug mode enabled. Skipping cleanup.{Fore.RESET}")
            return

        isce_subdirs = [
            self.isce_dir / "coarse_interferograms",
            self.isce_dir / "ESD",
            self.isce_dir / "coreg_secondarys",
            self.isce_dir / "interferograms",
        ]
        workdir_subdirs = [
            self.workdir / "slc",
            self.workdir / "dem",
        ]

        print(f"{Fore.CYAN}Cleaning up ISCE2 intermediate directories…{Fore.RESET}")
        for folder in isce_subdirs + workdir_subdirs:
            if folder.exists() and folder.is_dir():
                try:
                    shutil.rmtree(folder)
                    print(f"  Removed: {folder.relative_to(self.workdir)}")
                except Exception as e:
                    # Deliberately broad: report the failure and keep going.
                    print(f"{Fore.RED}  Failed to remove {folder}: {e}{Fore.RESET}")
            else:
                print(f"  Skipped (not found): {folder.relative_to(self.workdir)}")
        print(f"{Fore.GREEN}Cleanup complete.{Fore.RESET}")

    # ── Path discovery ────────────────────────────────────────────────────────

    def _set_load_parameters(self) -> None:
        """Wire all MintPy load_* fields from the stackSentinel output layout.

        Handles both geocoded (.geo suffix) and radar-coordinate (no suffix)
        variants, and both merged/geometry/ and merged/geom_reference/ layouts.
        The chosen paths are stored on ``self.config`` and a summary is printed.
        """
        isce   = self.isce_dir
        merged = isce / "merged"

        # ── interferogram files: prefer geocoded, fall back to radar-coordinate ──
        ifg_base = merged / "interferograms"
        # One pair directory is probed as a sample; the resulting pattern is a
        # wildcard over all pair directories.
        sample_pair = next((d for d in ifg_base.iterdir() if d.is_dir()), None) if ifg_base.exists() else None

        def _ifg_file(stem: str) -> str:
            # Wildcard path for `stem`, with the .geo suffix if the sample pair has it.
            if sample_pair and (sample_pair / f"{stem}.geo").exists():
                return str(ifg_base / "*" / f"{stem}.geo")
            return str(ifg_base / "*" / stem)

        self.config.load_unwFile      = _ifg_file("filt_fine.unw")
        self.config.load_corFile      = _ifg_file("filt_fine.cor")
        self.config.load_connCompFile = _ifg_file("filt_fine.unw.conncomp")

        # ── geometry: prefer merged/geometry/, fall back to merged/geom_reference/ ──
        geo = merged / "geometry" if (merged / "geometry").exists() else merged / "geom_reference"

        def _geo(name_geo: str, name_rdr: str) -> str:
            # Geocoded file if present, otherwise the radar-coordinate name
            # (returned without checking that it exists).
            f = geo / name_geo
            return str(f) if f.exists() else str(geo / name_rdr)

        self.config.load_demFile      = _geo("hgt.rdr.geo",       "hgt.rdr")
        self.config.load_incAngleFile = _geo("incLocal.rdr.geo",   "incLocal.rdr")
        self.config.load_lookupYFile  = _geo("lat.rdr.geo",        "lat.rdr")
        self.config.load_lookupXFile  = _geo("lon.rdr.geo",        "lon.rdr")

        # azimuth angle: dedicated file or fall back to los.rdr (band 2)
        az_file = geo / "azimuthAngle.rdr.geo"
        if not az_file.exists():
            az_file = geo / "azimuthAngle.rdr"
        if not az_file.exists():
            az_file = geo / "los.rdr"   # band 2 = azimuth angle
        self.config.load_azAngleFile = str(az_file)

        # shadow/layover mask: set only when a file exists, otherwise the
        # configured value is left untouched
        for sn in ("shadowMask.rdr.geo", "shadowMask.rdr"):
            sf = geo / sn
            if sf.exists():
                self.config.load_shadowMaskFile = str(sf)
                break

        # water mask (pre-existing only); "no" when none of the candidates exists
        for wn in ("waterMask.geo", "waterMask.rdr.geo", "waterMask.rdr"):
            wf = geo / wn
            if wf.exists():
                self.config.load_waterMaskFile = str(wf)
                break
        else:
            self.config.load_waterMaskFile = "no"

        self.config.load_baselineDir = str(isce / "baselines")
        self.config.load_metaFile    = self._find_meta_file()

        # The colour is switched on by the first line and reset by the last.
        print(f"{Fore.GREEN}  unwFile      : {self.config.load_unwFile}")
        print(f"  corFile      : {self.config.load_corFile}")
        print(f"  demFile      : {self.config.load_demFile}")
        print(f"  geometry dir : {geo}")
        print(f"  metaFile     : {self.config.load_metaFile}")
        print(f"  baselineDir  : {self.config.load_baselineDir}{Fore.RESET}")

    def _find_meta_file(self) -> str:
        """Locate the reference IW*.xml metadata file in the stackSentinel tree.

        Search order (first match wins, candidates sorted by name):
          1. ``isce/reference/IW*.xml``
          2. ``isce/reference/<subdir>/IW*.xml``
          3. ``isce/merged/SLC/<subdir>/*.slc.full.xml``, then ``*.xml`` in
             the same sub-directory

        Returns:
            The path of the first match as a string. If nothing matches, the
            path of ``isce/reference`` is returned, whether or not that
            directory exists.
        """
        isce = self.isce_dir

        # isce/reference/IW*.xml  (most common stackSentinel layout)
        ref_dir = isce / "reference"
        if ref_dir.exists():
            xmls = sorted(ref_dir.glob("IW*.xml"))
            if xmls:
                return str(xmls[0])
            # isce/reference/{date}/IW*.xml
            for sub in sorted(ref_dir.iterdir()):
                if sub.is_dir():
                    xmls = sorted(sub.glob("IW*.xml"))
                    if xmls:
                        return str(xmls[0])

        # isce/merged/SLC/{date}/*.xml  (stackSentinel merged SLC layout)
        slc_merged = isce / "merged" / "SLC"
        if slc_merged.exists():
            for date_dir in sorted(slc_merged.iterdir()):
                if date_dir.is_dir():
                    xmls = sorted(date_dir.glob("*.slc.full.xml"))
                    if not xmls:
                        xmls = sorted(date_dir.glob("*.xml"))
                    if xmls:
                        return str(xmls[0])

        # fall back: pass the reference directory and let MintPy scan it
        return str(ref_dir)

Usage

  • Create Analyzer

    from insarhub import Analyzer
    
    analyzer = Analyzer.create('ISCE_SBAS', workdir='/your/work/dir')
    

    OR with explicit config:

    from insarhub.config.defaultconfig import ISCE_SBAS_Config
    
    cfg = ISCE_SBAS_Config(workdir='/your/work/dir')
    analyzer = Analyzer.create('ISCE_SBAS', config=cfg)
    
  • Prepare data

    Auto-discover ISCE2 outputs and write mintpy/.mintpy.cfg.

    analyzer.prep_data()
    
  • Run

    Run MintPy SBAS time-series analysis. All output written to workdir/mintpy/.

    analyzer.run()
    
  • Submit (HPC / SLURM mode)

    Inherited from Mintpy_SBAS_Base_Analyzer. Submits the full MintPy run as a single sbatch job.

    analyzer.submit_hpc()
    
  • Clean up

    Remove large ISCE2 intermediate directories and input data no longer needed after load_data. Removes isce/coarse_interferograms/, isce/ESD/, isce/coreg_secondarys/, isce/interferograms/, slc/, and dem/.

    analyzer.cleanup()