跳转至

处理器

InSARHub 处理器模块专门提供干涉图处理功能。

  • 导入处理器

    导入 Processor 类以访问所有处理器功能

    from insarhub import Processor
    

  • 查看可用处理器

    列出所有已注册的处理器

    Processor.available()
    

可用处理器

HyP3 InSAR 处理器是 ASF HyP3 系统提供的基于云端的处理服务,用于从 Sentinel-1 SAR 数据生成干涉图。 InSARHub 将 hyp3_sdk 封装为其处理后端之一。

Hyp3_S1 专门封装了 hyp3_sdk 中的 insar_job,提供 InSAR SLC 处理工作流。

Source code in src/insarhub/processor/hyp3_s1.py
class Hyp3_S1(Hyp3Base):
    name = "Hyp3_S1"
    description = "HyP3 InSAR GAMMA processing. Produces geocoded interferograms from Sentinel-1 SLC pairs."
    compatible_downloader = "S1_SLC"
    default_config = Hyp3_S1_Config
    def __init__(self, config: Hyp3_S1_Config | None = None):
        super().__init__(config)
        # Fetch InSAR specific cost table
        try:
            self.cost = self.client.costs()['INSAR_GAMMA']['cost_table'][f'{self.config.looks}']
        except Exception as e:
            print(f"{Fore.YELLOW}Warning: Could not fetch InSAR cost table ({e}). Using local cost table.{Fore.RESET}")
            if self.config.looks == "20x4":
                self.cost = 10
            elif self.config.looks == "10x2":
                self.cost = 15
            else:
                raise ValueError(f"{Fore.RED}Unsupported looks configuration: {self.config.looks}. Please provide a valid cost for this looks setting.{Fore.RESET}")

    def submit(self):
        """
        Submit InSAR jobs to HyP3 based on the current configuration.

        Prepares job payloads from the `pairs` in the configuration
        and submits them via `_submit_job_queue`, handling user rotation,
        batching, and credit checks.

        The job names are automatically generated using the `name_prefix`
        and scene IDs.

        Raises:
            ValueError: If `self.config.pairs` is not a tuple of two strings
                        or a list of tuples of two strings.

        Returns:
            dict:
                A dictionary mapping usernames to lists of submitted `Batch` objects.

        Example:
            ```python
            processor = Hyp3InSARProcessor(config)
            batches = processor.submit()
            for user, batch in batches.items():
                print(f"{user} submitted {len(batch)} jobs")
            ```
        """

        # Normalize pairs input
        if isinstance(self.config.pairs, (list, tuple)) and all(isinstance(p, str) for p in self.config.pairs):
                pairs = [(self.config.pairs[0], self.config.pairs[1])]
        elif isinstance(self.config.pairs, (list, tuple)) and all(isinstance(p, tuple) for p in self.config.pairs):
            pairs = self.config.pairs
        else:
            raise ValueError(f"{Fore.RED}Invalid pairs format. Provide a list of tuples or a tuple of two strings.\n")

        job_queue: list[dict] = []

        for (ref_id, sec_id) in pairs:
            # We use the client to help format the dict, but we don't submit yet.
            # We are preparing the payload for _submit_job_queue
            job = self.client.prepare_insar_job(
                granule1=ref_id,
                granule2=sec_id,
                name= f"{self.config.name_prefix}_{ref_id.split('_')[5]}_{sec_id.split('_')[5]}",
                include_look_vectors=self.config.include_look_vectors,
                include_inc_map = self.config.include_inc_map,
                looks = self.config.looks,
                include_dem=self.config.include_dem,
                include_wrapped_phase=self.config.include_wrapped_phase,
                apply_water_mask=self.config.apply_water_mask,
                include_displacement_maps=self.config.include_displacement_maps,
                phase_filter_parameter=self.config.phase_filter_parameter
            )
            job_queue.append(job)

        # Send to base class for batching and submission
        batchs = self._submit_job_queue(job_queue)
        self.batchs = batchs
        return batchs

使用方法

  • 使用参数创建处理器

    使用搜索条件初始化处理器实例

    processor = Processor.create('Hyp3_S1', workdir='/your/work/path', pairs=pairs)
    
    params = {
        "workdir": '/your/work/path',
        "pairs": pairs,
    }
    processor = Processor.create('Hyp3_S1', **params)
    
    from insarhub.config.defaultconfig import Hyp3_S1_Config
    cfg = Hyp3_S1_Config(workdir='/your/work/path', pairs=pairs)
    processor = Processor.create('Hyp3_S1', config=cfg)
    

    Attributes:

    Name Type Description
    workdir Path | str

    Directory where downloaded products will be stored. If provided as a string, it will be converted to a resolved Path object during initialization.

    saved_job_path Path | str | None

    Optional path to a saved job JSON file for reloading previously submitted jobs. If provided as a string, it will be converted to a resolved Path object.

    earthdata_credentials_pool dict[str, str] | None

    Dictionary mapping usernames to passwords for managing multiple Earthdata accounts. Used for parallel or quota-aware submissions.

    skip_existing bool

    If True, skip submission or download of products that already exist locally.

    submission_chunk_size int

    Number of jobs submitted per batch request to the API. Helps avoid request size limits and API throttling.

    max_workers int

    Maximum number of worker threads used for concurrent submissions or downloads. Recommended to keep below 8 to avoid overwhelming the API or triggering rate limits.

    Attributes:

    Name Type Description
    pairs list[tuple[str, str]] | None

    List of Sentinel-1 scene ID pairs in the form [(reference_scene, secondary_scene), ...]. If None, pairs must be provided during submission.

    name_prefix str | None

    Prefix added to generated HyP3 job names.

    include_look_vectors bool

    If True, include look vector layers in the output product.

    include_los_displacement bool

    If True, include line-of-sight (LOS) displacement maps.

    include_inc_map bool

    If True, include incidence angle maps.

    looks str

    Multi-looking factor in the format "range x azimuth" (e.g., "20x4").

    include_dem bool

    If True, include the DEM used during processing.

    include_wrapped_phase bool

    If True, include wrapped interferometric phase output.

    apply_water_mask bool

    If True, apply a water mask during processing.

    include_displacement_maps bool

    If True, include unwrapped displacement maps.

    phase_filter_parameter float

    Phase filtering strength parameter (typically between 0 and 1). Higher values apply stronger filtering.

    Source code in src/insarhub/config/defaultconfig.py
    @dataclass
    class Hyp3_S1_Config(Hyp3_Base_Config):
        """
        Configuration options for `hyp3_sdk` InSAR GAMMA processing jobs.
    
        This dataclass defines all parameters used when submitting
        InSAR jobs to the ASF HyP3 service using the GAMMA workflow.
    
        UI metadata is stored in ``_ui_groups`` / ``_ui_fields`` and consumed
        by the API layer to auto-generate the settings panel.
    
        Attributes:
            pairs (list[tuple[str, str]] | None):
                List of Sentinel-1 scene ID pairs in the form
                [(reference_scene, secondary_scene), ...].
                If None, pairs must be provided during submission.
    
            name_prefix (str | None):
                Prefix added to generated HyP3 job names.
    
            include_look_vectors (bool):
                If True, include look vector layers in the output product.
    
            include_los_displacement (bool):
                If True, include line-of-sight (LOS) displacement maps.
    
            include_inc_map (bool):
                If True, include incidence angle maps.
    
            looks (str):
                Multi-looking factor in the format "range x azimuth"
                (e.g., "20x4").
    
            include_dem (bool):
                If True, include the DEM used during processing.
    
            include_wrapped_phase (bool):
                If True, include wrapped interferometric phase output.
    
            apply_water_mask (bool):
                If True, apply a water mask during processing.
    
            include_displacement_maps (bool):
                If True, include unwrapped displacement maps.
    
            phase_filter_parameter (float):
                Phase filtering strength parameter (typically between 0 and 1).
                Higher values apply stronger filtering.
        """
    
        # ── UI metadata consumed by the API / settings panel ─────────────────────
        _ui_groups: ClassVar[list] = [
            {"label": "Processing",
             "fields": ["looks", "phase_filter_parameter", "name_prefix", "apply_water_mask"]},
            {"label": "Outputs",
             "fields": ["include_dem", "include_look_vectors", "include_inc_map",
                        "include_los_displacement", "include_wrapped_phase", "include_displacement_maps"]},
            {"label": "Job",
             "fields": ["skip_existing", "submission_chunk_size"]},
        ]
        _ui_fields: ClassVar[dict] = {
            "looks":                    {"type": "select", "options": ["20x4", "10x2"],
                                         "hint": "Range × azimuth looks (20x4 ≈ 80 m, 10x2 ≈ 40 m)"},
            "phase_filter_parameter":   {"type": "number", "min": 0, "max": 1, "step": 0.1,
                                         "default": 0.6,
                                         "hint": "Goldstein filter strength (0 = off, 1 = maximum)"},
            "name_prefix":              {"type": "text"},
            "apply_water_mask":         {"type": "bool"},
            "include_dem":              {"type": "bool"},
            "include_look_vectors":     {"type": "bool"},
            "include_inc_map":          {"type": "bool"},
            "include_los_displacement": {"type": "bool"},
            "include_wrapped_phase":    {"type": "bool"},
            "include_displacement_maps":{"type": "bool"},
            "skip_existing":            {"type": "bool",
                                         "hint": "Skip re-downloading already-completed jobs"},
            "submission_chunk_size":    {"type": "number", "min": 1, "max": 500, "step": 1,
                                         "default": 200,
                                         "hint": "Jobs per API batch request"},
        }
        # ─────────────────────────────────────────────────────────────────────────
    
        name: str = "Hyp3_S1_Config"
        pairs: list[tuple[str, str]] | None = None
        name_prefix: str | None = 'ifg'
        include_look_vectors:bool=True
        include_los_displacement:bool=False
        include_inc_map:bool=True
        looks:str='20x4'
        include_dem :bool=True
        include_wrapped_phase :bool=False
        apply_water_mask :bool=True
        include_displacement_maps:bool=True
        phase_filter_parameter :float=0.6
    
  • 提交任务

    根据当前配置向 HyP3 提交 InSAR 任务。

    jobs = processor.submit()
    

    Raises:

    Type Description
    ValueError

    If self.config.pairs is not a tuple of two strings or a list of tuples of two strings.

  • 刷新任务

    刷新所有任务的状态。

    jobs = processor.refresh()
    

    Raises:

    Type Description
    ValueError

    If no jobs are loaded in memory.

  • 重试失败任务

    通过重新提交来重试所有失败的任务。

    jobs = processor.retry()
    
  • 下载成功任务

    下载所有用户的已成功任务。

    processor.download()
    
  • 保存当前任务

    将当前任务批次信息保存到 JSON 文件。

    processor.save()
    

    Parameters:

    Name Type Description Default
    save_path Path | str | None

    Path to save the batch JSON file. If None, defaults to hyp3_jobs.json in self.output_dir.

    None

    Raises:

    Type Description
    ValueError

    If no job batches exist to save.

  • 监控任务

    持续监控任务并下载已完成的输出。

    processor.watch()
    

    Parameters:

    Name Type Description Default
    refresh_interval int

    Time interval (in seconds) between refreshes.

    300
  • 加载已保存任务

    加载之前保存的 JSON 文件并恢复工作。

    processor = Processor.create('Hyp3_S1', saved_job_path='path/to/your/json/file.json')
    

    加载后可恢复检查/下载提交至 HyP3 服务器的任务。

ISCE_S1 处理器在本地运行 ISCE2 stackSentinel,从下载的 SLC .SAFE 文件生成 Sentinel-1 干涉图。它生成一系列编号运行脚本并顺序执行,在每个步骤内并行运行独立命令。

  • 导入处理器

    from insarhub import Processor
    
  • 创建处理器

    from insarhub.config import ISCE_S1_Config
    
    cfg = ISCE_S1_Config(
        workdir='/data/p100_f466',
        bbox=[33.0, 38.0, -120.0, -115.0],   # [南, 北, 西, 东]
    )
    pairs = [('20200101', '20200113'), ('20200113', '20200125')]
    processor = Processor.create('ISCE_S1', pairs=pairs, config=cfg)
    

    Attributes:

    Name Type Description
    workdir Path | str

    Processing root. All outputs (run_files/, merged/, etc.) live here.

    slc_dir Path | str

    Directory containing all Sentinel-1 SLC .SAFE files (or .zips).

    orbit_dir Path | str | None

    Directory with .EOF orbit files. Created automatically if absent.

    aux_dir Path | str | None

    Directory with Sentinel-1 AUX_CAL files. Defaults to workdir/aux; ISCE2 downloads missing files there on first run.

    dem_path Path | str | None

    ISCE2-binary DEM (dem.wgs84 + .xml sidecar). When None, GLO-30 is pre-downloaded using bbox.

    isce_home Path | str | None

    ISCE2 installation root. Falls back to $ISCE_HOME env var.

    bbox list[float] | None

    Area of interest as [S, N, W, E] degrees. Required when dem_path is None.

    num_overlap_connections int

    Connections used for NESD azimuth coregistration.

    reference_date str | None

    Stack reference date YYYYMMDD. None = stackSentinel auto-selects.

    coregistration str

    'NESD' (default, more accurate) or 'geometry' (faster).

    max_workers int

    Parallel commands within each run step.

  • 提交(本地模式)

    生成运行脚本并在后台进程中开始顺序执行。立即返回;使用 refresh() 监控进度。

    jobs = processor.submit()
    
  • 提交(HPC / SLURM 模式)

    在配置中设置 hpc_mode=True,将每个步骤作为单独的 sbatch 任务提交。

    cfg = ISCE_S1_Config(
        workdir='/data/p100_f466',
        bbox=[33.0, 38.0, -120.0, -115.0],
        hpc_mode=True,
    )
    processor = Processor.create('ISCE_S1', pairs=pairs, config=cfg)
    processor.submit()
    
  • 试运行

    预览运行脚本和路径检查,不执行任何操作。

    cfg = ISCE_S1_Config(
        workdir='/data/p100_f466',
        bbox=[33.0, 38.0, -120.0, -115.0],
        dry_run=True,
    )
    processor = Processor.create('ISCE_S1', pairs=pairs, config=cfg)
    processor.submit()
    
  • 刷新

    从磁盘读取步骤和命令状态。

    jobs = processor.refresh()
    
  • 重试失败步骤

    重新运行所有状态为 FAILED 的步骤。

    processor.retry()
    
  • 取消

    终止正在运行的后台进程(本地模式)或对所有活动 SLURM 任务执行 scancel(HPC 模式)。

    processor.cancel()
    
  • 监控

    定期轮询步骤状态,直到所有步骤完成。

    processor.watch(refresh_interval=60)
    
  • 保存 / 加载

    任务状态在 submit() 后自动保存。从已保存的任务文件重新加载并恢复:

    cfg = ISCE_S1_Config(
        workdir='/data/p100_f466',
        saved_job_path='/data/p100_f466/isce/isce_jobs_<timestamp>.json',
    )
    processor = Processor.create('ISCE_S1', pairs=[], config=cfg)
    processor.refresh()   # 或 .retry()、.cancel()、.watch()