Processor

The InSARHub Processor module provides functionality specifically for interferogram processing.

  • Import processor

    Import the Processor class to access all processor functionality

    from insarhub import Processor
    

  • View available processors

    List all registered processors

    Processor.available()
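
For readers unfamiliar with the pattern, a name-based registry behind calls such as available() and create() could look roughly like the sketch below. This is an illustration only, not InSARHub's implementation: `ProcessorRegistry`, `register`, and `DemoProcessor` are hypothetical names.

```python
class ProcessorRegistry:
    """Toy name -> class registry (hypothetical, for illustration only)."""

    _registry: dict[str, type] = {}

    @classmethod
    def register(cls, processor_cls: type) -> type:
        # Use the class-level `name` attribute as the lookup key.
        cls._registry[processor_cls.name] = processor_cls
        return processor_cls

    @classmethod
    def available(cls) -> list[str]:
        # Sorted so the listing is stable between runs.
        return sorted(cls._registry)

    @classmethod
    def create(cls, name: str, **kwargs):
        try:
            processor_cls = cls._registry[name]
        except KeyError:
            raise ValueError(
                f"Unknown processor {name!r}. Available: {cls.available()}"
            ) from None
        # Keyword arguments are forwarded to the processor constructor.
        return processor_cls(**kwargs)


@ProcessorRegistry.register
class DemoProcessor:
    name = "Demo"

    def __init__(self, workdir: str = "."):
        self.workdir = workdir


print(ProcessorRegistry.available())
demo = ProcessorRegistry.create("Demo", workdir="/tmp/demo")
print(type(demo).__name__, demo.workdir)
```

Looking processors up by a string name is what lets the same create() call accept either keyword parameters or a config object, as the usage sections on this page do.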
    

Available Processors

Hyp3_S1

The HyP3 InSAR processor is a cloud-based processing service, provided by the ASF HyP3 system, that generates interferograms from Sentinel-1 SAR data. InSARHub wraps hyp3_sdk as one of its processing backends.

The Hyp3_S1 processor wraps the InSAR job functionality of hyp3_sdk (prepare_insar_job in the source below) to provide InSAR SLC processing workflows.

Source code in src/insarhub/processor/hyp3_s1.py
class Hyp3_S1(Hyp3Base):
    name = "Hyp3_S1"
    description = "HyP3 InSAR GAMMA processing. Produces geocoded interferograms from Sentinel-1 SLC pairs."
    compatible_downloader = "S1_SLC"
    default_config = Hyp3_S1_Config
    def __init__(self, config: Hyp3_S1_Config | None = None):
        super().__init__(config)
        # Fetch InSAR specific cost table
        try:
            self.cost = self.client.costs()['INSAR_GAMMA']['cost_table'][f'{self.config.looks}']
        except Exception as e:
            print(f"{Fore.YELLOW}Warning: Could not fetch InSAR cost table ({e}). Using local cost table.{Fore.RESET}")
            if self.config.looks == "20x4":
                self.cost = 10
            elif self.config.looks == "10x2":
                self.cost = 15
            else:
                raise ValueError(f"{Fore.RED}Unsupported looks configuration: {self.config.looks}. Please provide a valid cost for this looks setting.{Fore.RESET}")

    def submit(self):
        """
        Submit InSAR jobs to HyP3 based on the current configuration.

        Prepares job payloads from the `pairs` in the configuration
        and submits them via `_submit_job_queue`, handling user rotation,
        batching, and credit checks.

        The job names are automatically generated using the `name_prefix`
        and scene IDs.

        Raises:
            ValueError: If `self.config.pairs` is not a tuple of two strings
                        or a list of tuples of two strings.

        Returns:
            dict:
                A dictionary mapping usernames to lists of submitted `Batch` objects.

        Example:
            ```python
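            processor = Hyp3_S1(config)
            batches = processor.submit()
            for user, user_batches in batches.items():
                # Each value is a list of Batch objects; sum their job counts.
                n_jobs = sum(len(batch) for batch in user_batches)
                print(f"{user} submitted {n_jobs} jobs")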
            ```
        """

        # Normalize pairs input
        if isinstance(self.config.pairs, (list, tuple)) and all(isinstance(p, str) for p in self.config.pairs):
            pairs = [(self.config.pairs[0], self.config.pairs[1])]
        elif isinstance(self.config.pairs, (list, tuple)) and all(isinstance(p, tuple) for p in self.config.pairs):
            pairs = self.config.pairs
        else:
            raise ValueError(f"{Fore.RED}Invalid pairs format. Provide a list of tuples or a tuple of two strings.\n")

        job_queue: list[dict] = []

        for (ref_id, sec_id) in pairs:
            # We use the client to help format the dict, but we don't submit yet.
            # We are preparing the payload for _submit_job_queue
            job = self.client.prepare_insar_job(
                granule1=ref_id,
                granule2=sec_id,
                name= f"{self.config.name_prefix}_{ref_id.split('_')[5]}_{sec_id.split('_')[5]}",
                include_look_vectors=self.config.include_look_vectors,
                include_inc_map = self.config.include_inc_map,
                looks = self.config.looks,
                include_dem=self.config.include_dem,
                include_wrapped_phase=self.config.include_wrapped_phase,
                apply_water_mask=self.config.apply_water_mask,
                include_displacement_maps=self.config.include_displacement_maps,
                phase_filter_parameter=self.config.phase_filter_parameter
            )
            job_queue.append(job)

        # Send to base class for batching and submission
        batchs = self._submit_job_queue(job_queue)
        self.batchs = batchs
        return batchs
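
To make the pair handling in submit() concrete, the sketch below reimplements the normalization step and the job-name pattern as standalone functions, with an added length check that the listed source does not perform. `normalize_pairs` and `job_name` are hypothetical helpers, and the scene IDs are made-up values that only follow the Sentinel-1 naming layout. With that layout, field 5 after splitting on underscores is the acquisition start time.

```python
def normalize_pairs(pairs):
    """Return a list of (reference, secondary) tuples.

    Accepts either one pair given as two strings or a sequence of
    2-tuples, mirroring the two input forms accepted by submit().
    """
    is_sequence = isinstance(pairs, (list, tuple))
    # Form 1: a single pair given as two scene-ID strings.
    if is_sequence and len(pairs) == 2 and all(isinstance(p, str) for p in pairs):
        return [(pairs[0], pairs[1])]
    # Form 2: a non-empty sequence of (reference, secondary) tuples.
    if is_sequence and pairs and all(
        isinstance(p, tuple) and len(p) == 2 and all(isinstance(s, str) for s in p)
        for p in pairs
    ):
        return list(pairs)
    raise ValueError(
        "Invalid pairs format. Provide a list of tuples or a tuple of two strings."
    )


def job_name(prefix, ref_id, sec_id):
    # Same pattern as the source: prefix plus field 5 of each scene ID.
    return f"{prefix}_{ref_id.split('_')[5]}_{sec_id.split('_')[5]}"


# Illustrative scene IDs (not real products).
REF = "S1A_IW_SLC__1SDV_20200101T135912_20200101T135939_030612_0381C1_AAAA"
SEC = "S1A_IW_SLC__1SDV_20200113T135911_20200113T135938_030787_0387D2_BBBB"

print(normalize_pairs((REF, SEC)) == normalize_pairs([(REF, SEC)]))  # True
print(job_name("ifg", REF, SEC))  # ifg_20200101T135912_20200113T135911
```

The double underscore after `SLC` produces an empty field when splitting, which is why the start time sits at index 5 rather than index 4.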

Usage

  • Create Processor with Parameters

    Initialize a processor instance with processing parameters, passed as keyword arguments, as an unpacked dict, or as a config object.

    processor = Processor.create('Hyp3_S1', workdir='/your/work/path', pairs=pairs)
    
    OR
    params = {
        "workdir": '/your/work/path',
        "pairs": pairs,
    }
    processor = Processor.create('Hyp3_S1', **params)
    
    OR
    from insarhub.config.defaultconfig import Hyp3_S1_Config
    cfg = Hyp3_S1_Config(workdir='/your/work/path', pairs=pairs)
    processor = Processor.create('Hyp3_S1', config=cfg)
    

    Attributes:

    workdir (Path | str):
        Directory where downloaded products will be stored. If provided as a string, it will be converted to a resolved Path object during initialization.

    saved_job_path (Path | str | None):
        Optional path to a saved job JSON file for reloading previously submitted jobs. If provided as a string, it will be converted to a resolved Path object.

    earthdata_credentials_pool (dict[str, str] | None):
        Dictionary mapping usernames to passwords for managing multiple Earthdata accounts. Used for parallel or quota-aware submissions.

    skip_existing (bool):
        If True, skip submission or download of products that already exist locally.

    submission_chunk_size (int):
        Number of jobs submitted per batch request to the API. Helps avoid request size limits and API throttling.

    max_workers (int):
        Maximum number of worker threads used for concurrent submissions or downloads. Recommended to keep below 8 to avoid overwhelming the API or triggering rate limits.

    Attributes:

    pairs (list[tuple[str, str]] | None):
        List of Sentinel-1 scene ID pairs in the form [(reference_scene, secondary_scene), ...]. If None, pairs must be provided during submission.

    name_prefix (str | None):
        Prefix added to generated HyP3 job names.

    include_look_vectors (bool):
        If True, include look vector layers in the output product.

    include_los_displacement (bool):
        If True, include line-of-sight (LOS) displacement maps.

    include_inc_map (bool):
        If True, include incidence angle maps.

    looks (str):
        Multi-looking factor in the format "range x azimuth" (e.g., "20x4").

    include_dem (bool):
        If True, include the DEM used during processing.

    include_wrapped_phase (bool):
        If True, include wrapped interferometric phase output.

    apply_water_mask (bool):
        If True, apply a water mask during processing.

    include_displacement_maps (bool):
        If True, include unwrapped displacement maps.

    phase_filter_parameter (float):
        Phase filtering strength parameter (typically between 0 and 1). Higher values apply stronger filtering.

    Source code in src/insarhub/config/defaultconfig.py
    @dataclass
    class Hyp3_S1_Config(Hyp3_Base_Config):
        """
        Configuration options for `hyp3_sdk` InSAR GAMMA processing jobs.
    
        This dataclass defines all parameters used when submitting
        InSAR jobs to the ASF HyP3 service using the GAMMA workflow.
    
        UI metadata is stored in ``_ui_groups`` / ``_ui_fields`` and consumed
        by the API layer to auto-generate the settings panel.
    
        Attributes:
            pairs (list[tuple[str, str]] | None):
                List of Sentinel-1 scene ID pairs in the form
                [(reference_scene, secondary_scene), ...].
                If None, pairs must be provided during submission.
    
            name_prefix (str | None):
                Prefix added to generated HyP3 job names.
    
            include_look_vectors (bool):
                If True, include look vector layers in the output product.
    
            include_los_displacement (bool):
                If True, include line-of-sight (LOS) displacement maps.
    
            include_inc_map (bool):
                If True, include incidence angle maps.
    
            looks (str):
                Multi-looking factor in the format "range x azimuth"
                (e.g., "20x4").
    
            include_dem (bool):
                If True, include the DEM used during processing.
    
            include_wrapped_phase (bool):
                If True, include wrapped interferometric phase output.
    
            apply_water_mask (bool):
                If True, apply a water mask during processing.
    
            include_displacement_maps (bool):
                If True, include unwrapped displacement maps.
    
            phase_filter_parameter (float):
                Phase filtering strength parameter (typically between 0 and 1).
                Higher values apply stronger filtering.
        """
    
        # ── UI metadata consumed by the API / settings panel ─────────────────────
        _ui_groups: ClassVar[list] = [
            {"label": "Processing",
             "fields": ["looks", "phase_filter_parameter", "name_prefix", "apply_water_mask"]},
            {"label": "Outputs",
             "fields": ["include_dem", "include_look_vectors", "include_inc_map",
                        "include_los_displacement", "include_wrapped_phase", "include_displacement_maps"]},
            {"label": "Job",
             "fields": ["skip_existing", "submission_chunk_size"]},
        ]
        _ui_fields: ClassVar[dict] = {
            "looks":                    {"type": "select", "options": ["20x4", "10x2"],
                                         "hint": "Range × azimuth looks (20x4 ≈ 80 m, 10x2 ≈ 40 m)"},
            "phase_filter_parameter":   {"type": "number", "min": 0, "max": 1, "step": 0.1,
                                         "default": 0.6,
                                         "hint": "Goldstein filter strength (0 = off, 1 = maximum)"},
            "name_prefix":              {"type": "text"},
            "apply_water_mask":         {"type": "bool"},
            "include_dem":              {"type": "bool"},
            "include_look_vectors":     {"type": "bool"},
            "include_inc_map":          {"type": "bool"},
            "include_los_displacement": {"type": "bool"},
            "include_wrapped_phase":    {"type": "bool"},
            "include_displacement_maps":{"type": "bool"},
            "skip_existing":            {"type": "bool",
                                         "hint": "Skip re-downloading already-completed jobs"},
            "submission_chunk_size":    {"type": "number", "min": 1, "max": 500, "step": 1,
                                         "default": 200,
                                         "hint": "Jobs per API batch request"},
        }
        # ─────────────────────────────────────────────────────────────────────────
    
        name: str = "Hyp3_S1_Config"
        pairs: list[tuple[str, str]] | None = None
        name_prefix: str | None = 'ifg'
        include_look_vectors:bool=True
        include_los_displacement:bool=False
        include_inc_map:bool=True
        looks:str='20x4'
        include_dem :bool=True
        include_wrapped_phase :bool=False
        apply_water_mask :bool=True
        include_displacement_maps:bool=True
        phase_filter_parameter :float=0.6
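
The submission_chunk_size and looks settings both matter at submission time: the job queue is sent in chunks, and each job has a looks-dependent cost. The sketch below shows that arithmetic on plain lists. `chunk_jobs` and `estimate_cost` are hypothetical helpers, and the per-job costs are the fallback values from the Hyp3_S1 source (10 for "20x4", 15 for "10x2"); the live HyP3 cost table may differ.

```python
# Fallback per-job costs copied from the Hyp3_S1 source listing.
FALLBACK_COST = {"20x4": 10, "10x2": 15}


def chunk_jobs(job_queue, chunk_size=200):
    """Split a job queue into consecutive chunks of at most chunk_size jobs."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    return [
        job_queue[start:start + chunk_size]
        for start in range(0, len(job_queue), chunk_size)
    ]


def estimate_cost(n_jobs, looks="20x4"):
    """Estimate the total cost of n_jobs using the fallback cost table."""
    try:
        return n_jobs * FALLBACK_COST[looks]
    except KeyError:
        raise ValueError(f"Unsupported looks configuration: {looks}") from None


# 450 placeholder job payloads (real payloads come from prepare_insar_job).
jobs = [{"name": f"ifg_{i:03d}"} for i in range(450)]
chunks = chunk_jobs(jobs, chunk_size=200)
print([len(chunk) for chunk in chunks])    # [200, 200, 50]
print(estimate_cost(len(jobs), "10x2"))    # 6750
```

Estimating the total before submitting is a cheap way to see whether one account's quota is likely to be enough or whether earthdata_credentials_pool should list more than one account.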
    
  • Submit Jobs

    Submit InSAR jobs to HyP3 based on the current configuration.

    jobs = processor.submit()
    

    Raises:

    ValueError:
        If self.config.pairs is not a tuple of two strings or a list of tuples of two strings.

  • Refresh Jobs

    Refresh the status of all jobs.

    jobs = processor.refresh()
    

    Raises:

    ValueError:
        If no jobs are loaded in memory.

  • Retry Failed Jobs

    Retry all failed jobs by re-submitting them.

    jobs = processor.retry()
    
  • Download Succeeded Jobs

    Download all succeeded jobs for all users.

    processor.download()
    
  • Save Current Jobs

    Save the current job batch information to a JSON file.

    processor.save()
    

    Parameters:

    save_path (Path | str | None, default: None):
        Path to save the batch JSON file. If None, defaults to hyp3_jobs.json in self.output_dir.

    Raises:

    ValueError:
        If no job batches exist to save.

  • Watch Jobs

    Continuously monitor jobs and download completed outputs.

    processor.watch()
    

    Parameters:

    refresh_interval (int, default: 300):
        Time interval (in seconds) between refreshes.

  • Load Saved Job

    Load a previously saved JSON file and resume work.

    processor = Processor.create('Hyp3_S1', saved_job_path='path/to/your/json/file.json')
    

    When loaded, you can resume checking or downloading jobs submitted to the HyP3 server.
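
The refresh, download, and watch steps above amount to a polling loop. The sketch below shows one way such a loop could be structured, using a stub in place of a real processor: `FakeProcessor`, its `pending` counter, and the `watch` function are all hypothetical and make no network calls. Only the method names refresh() and download() and the refresh_interval default of 300 seconds come from this page.

```python
import time


class FakeProcessor:
    """Stand-in processor whose jobs finish after a fixed number of polls."""

    def __init__(self, polls_until_done):
        self.pending = polls_until_done
        self.downloads = 0

    def refresh(self):
        # Pretend one more round of jobs finished; return how many polls remain.
        self.pending = max(0, self.pending - 1)
        return self.pending

    def download(self):
        # A real processor would fetch newly succeeded products here.
        self.downloads += 1


def watch(processor, refresh_interval=300, sleep=time.sleep):
    """Refresh and download repeatedly until nothing is pending.

    Returns the number of refresh cycles performed.
    """
    polls = 0
    while True:
        remaining = processor.refresh()
        polls += 1
        processor.download()
        if remaining == 0:
            return polls
        sleep(refresh_interval)


demo = FakeProcessor(polls_until_done=3)
# Replace the sleep function so the demonstration finishes instantly.
polls = watch(demo, refresh_interval=300, sleep=lambda seconds: None)
print(polls, demo.downloads)  # 3 3
```

Passing the sleep function as a parameter keeps the loop testable without waiting for real intervals.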

ISCE_S1

The ISCE_S1 processor runs ISCE2 stackSentinel locally to generate Sentinel-1 interferograms from downloaded SLC .SAFE files. It generates a numbered sequence of run scripts and executes them sequentially, parallelising independent commands within each step.

  • Import processor

    from insarhub import Processor
    
  • Create processor

    from insarhub.config import ISCE_S1_Config
    
    cfg = ISCE_S1_Config(
        workdir='/data/p100_f466',
        bbox=[33.0, 38.0, -120.0, -115.0],   # [S, N, W, E]
    )
    pairs = [('20200101', '20200113'), ('20200113', '20200125')]
    processor = Processor.create('ISCE_S1', pairs=pairs, config=cfg)
    

    Attributes:

    workdir (Path | str):
        Processing root. All outputs (run_files/, merged/, etc.) live here.

    slc_dir (Path | str):
        Directory containing all Sentinel-1 SLC .SAFE files (or .zips).

    orbit_dir (Path | str | None):
        Directory with .EOF orbit files. Created automatically if absent.

    aux_dir (Path | str | None):
        Directory with Sentinel-1 AUX_CAL files. Defaults to workdir/aux; ISCE2 downloads missing files there on first run.

    dem_path (Path | str | None):
        ISCE2-binary DEM (dem.wgs84 + .xml sidecar). When None, GLO-30 is pre-downloaded using bbox.

    isce_home (Path | str | None):
        ISCE2 installation root. Falls back to $ISCE_HOME env var.

    bbox (list[float] | None):
        Area of interest as [S, N, W, E] degrees. Required when dem_path is None.

    num_overlap_connections (int):
        Connections used for NESD azimuth coregistration.

    reference_date (str | None):
        Stack reference date YYYYMMDD. None = stackSentinel auto-selects.

    coregistration (str):
        'NESD' (default, more accurate) or 'geometry' (faster).

    max_workers (int):
        Parallel commands within each run step.
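
Because bbox uses [S, N, W, E] order rather than the [W, S, E, N] order common in GIS tools, a small check before building the config can catch swapped values. `validate_bbox` is a hypothetical helper, not part of InSARHub, and it assumes the area of interest does not cross the antimeridian.

```python
def validate_bbox(bbox):
    """Check a [S, N, W, E] bounding box in degrees and return it as floats."""
    if len(bbox) != 4:
        raise ValueError("bbox must have four values: [S, N, W, E]")
    south, north, west, east = (float(value) for value in bbox)
    # Latitudes must be ordered south < north and lie within [-90, 90].
    if not -90.0 <= south < north <= 90.0:
        raise ValueError(f"Invalid latitudes: S={south}, N={north}")
    # Longitudes must be ordered west < east and lie within [-180, 180].
    # Assumption: boxes that cross the antimeridian are not supported here.
    if not -180.0 <= west < east <= 180.0:
        raise ValueError(f"Invalid longitudes: W={west}, E={east}")
    return [south, north, west, east]


print(validate_bbox([33.0, 38.0, -120.0, -115.0]))
# [33.0, 38.0, -120.0, -115.0]

try:
    # Same area written in [W, E, S, N] order by mistake.
    validate_bbox([-120.0, -115.0, 33.0, 38.0])
except ValueError as error:
    print(f"rejected: {error}")
```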

  • Submit (local mode)

    Generate run scripts and start sequential execution in a background process. Returns immediately; use refresh() to monitor progress.

    jobs = processor.submit()
    
  • Submit (HPC / SLURM mode)

    Set hpc_mode=True in the config to submit each step as a separate sbatch job instead of running locally.

    cfg = ISCE_S1_Config(
        workdir='/data/p100_f466',
        bbox=[33.0, 38.0, -120.0, -115.0],
        hpc_mode=True,
    )
    processor = Processor.create('ISCE_S1', pairs=pairs, config=cfg)
    processor.submit()
    
  • Dry run

    Preview the run scripts and path checks without executing anything.

    cfg = ISCE_S1_Config(
        workdir='/data/p100_f466',
        bbox=[33.0, 38.0, -120.0, -115.0],
        dry_run=True,
    )
    processor = Processor.create('ISCE_S1', pairs=pairs, config=cfg)
    processor.submit()
    
  • Refresh

    Read step and command statuses from disk.

    jobs = processor.refresh()
    
  • Retry failed steps

    Re-run all steps that have FAILED status.

    processor.retry()
    
  • Cancel

    Terminate the running background process (local mode) or scancel all active SLURM jobs (HPC mode).

    processor.cancel()
    
  • Watch

    Poll step statuses at regular intervals until all steps complete.

    processor.watch(refresh_interval=60)
    
  • Save / Load

    Job state is saved automatically after submit(). To reload and resume from a saved job file:

    cfg = ISCE_S1_Config(
        workdir='/data/p100_f466',
        saved_job_path='/data/p100_f466/isce/isce_jobs_<timestamp>.json',
    )
    processor = Processor.create('ISCE_S1', pairs=[], config=cfg)
    processor.refresh()   # or .retry(), .cancel(), .watch()
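
The execution model described for ISCE_S1 (numbered run scripts executed in order, with the independent commands inside each step run in parallel) can be sketched with a thread pool. This is an illustration under stated assumptions rather than InSARHub's code: `run_steps`, `shell_runner`, the step names, and the "DONE" status are hypothetical, stopping at the first failed step is an assumption, and `max_workers` only mirrors the config attribute of the same name.

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor


def shell_runner(command):
    """Run one shell command and return its exit code."""
    return subprocess.run(command, shell=True).returncode


def run_steps(steps, max_workers=4, runner=shell_runner):
    """Run steps in order; commands within a step run concurrently.

    steps is a list of (step_name, [command, ...]) tuples.
    Returns a dict mapping each attempted step name to "DONE" or "FAILED".
    """
    statuses = {}
    for name, commands in steps:
        # Leaving the with-block waits for every command in this step,
        # so the next step never starts early.
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            return_codes = list(pool.map(runner, commands))
        if any(code != 0 for code in return_codes):
            statuses[name] = "FAILED"
            break  # Assumption: later steps depend on earlier ones.
        statuses[name] = "DONE"
    return statuses


# Placeholder commands; a fake runner avoids executing anything for real.
steps = [
    ("step_01", ["cmd a", "cmd b"]),
    ("step_02", ["cmd c"]),
    ("step_03", ["cmd d"]),
]


def fake_runner(command):
    # Simulate a failure in the second step.
    return 1 if command == "cmd c" else 0


print(run_steps(steps, max_workers=2, runner=fake_runner))
# {'step_01': 'DONE', 'step_02': 'FAILED'}
```

In this sketch a retry would only need to re-run the steps marked FAILED and whatever follows them, which is consistent with the retry() description above.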