Skip to content

Utilities

Utilities are sets of tools designed to support and streamline InSAR processing workflows.

Select Pairs

Select interferogram pairs from ASF search results based on temporal and perpendicular baseline criteria.

from insarhub import Downloader
from insarhub.utils import select_pairs 
s1 = Downloader.create('S1_SLC', 
                    intersectsWith=[-113.05, 37.74, -112.68, 38.00],
                    start='2020-01-01', 
                    end='2020-12-31',  
                    relativeOrbit=100, 
                    frame=466, 
                    workdir='path/to/dir')
results = s1.search()

pairs, baselines, scene_bperp = select_pairs(search_results=results)

Parameters:

Name Type Description Default
search_results list[ASFProduct] | dict[tuple[int, int], list[ASFProduct]]

Either a flat list (single stack) or a dictionary keyed by (path, frame).

required
dt_targets list[float]

Preferred temporal spacings in days. A candidate pair passes if |dt - target| <= dt_tol for at least one target.

(6, 12, 24, 36, 48, 72, 96)
dt_tol float

Tolerance in days added to each entry in dt_targets.

3
dt_max float

Maximum temporal baseline in days.

120
pb_max float

Maximum perpendicular baseline in meters.

150.0
min_degree int

Minimum interferogram connections per scene. Enforced when force_connect is True.

3
max_degree int

Maximum interferogram connections per scene.

999
force_connect bool

If a scene falls below min_degree after primary selection, add its nearest-time neighbors that satisfy pb_max and dt_max. May introduce lower-quality pairs; a warning is logged.

True
max_workers int

Number of threads for API fallback. Has no effect if all products have local baseline data (common for Sentinel-1 and ALOS). Set to 1 to disable threading (useful for debugging).

8
Source code in src/insarhub/utils/tool.py
def select_pairs(
    search_results: Union[dict[tuple[int, int], list[ASFProduct]], list[ASFProduct]],
    dt_targets: tuple[int, ...] = (6, 12, 24, 36, 48, 72, 96),
    dt_tol: int = 3,
    dt_max: int = 120,
    pb_max: float = 150.0,
    min_degree: int = 3,
    max_degree: int = 999,
    force_connect: bool = True,
    max_workers: int = 8
) -> tuple[
    Union[list[Pair], PairGroup],
    Union[BaselineTable, dict[tuple[int, int], BaselineTable]],
    Union[dict, dict[tuple[int, int], dict]],
]:

    """
    Select interferogram pairs based on temporal and perpendicular baseline.

    This function selects interferogram pairs according to temporal spacing 
    and perpendicular baseline constraints, optionally enforcing connectivity 
    rules per scene.

    Supported sensors:
    - Sentinel-1 (CALCULATED)  : stateVectors + ascendingNodeTime → local
    - ALOS / ERS / RADARSAT (PRE_CALCULATED) : insarBaseline scalar → local
    - Any product missing data : ref.stack() API call → fallback

    Args:
        search_results (list[ASFProduct] | dict[tuple[int,int], list[ASFProduct]]):
            Either a flat list (single stack) or a dictionary keyed by (path, frame).
        dt_targets (list[float], optional):
            Preferred temporal spacings in days. A candidate pair passes if 
            |dt - target| <= dt_tol for at least one target.
        dt_tol (float, optional):
            Tolerance in days added to each entry in dt_targets.
        dt_max (float, optional):
            Maximum temporal baseline in days.
        pb_max (float, optional):
            Maximum perpendicular baseline in meters.
        min_degree (int, optional):
            Minimum interferogram connections per scene. Enforced when force_connect is True.
        max_degree (int, optional):
            Maximum interferogram connections per scene.
        force_connect (bool, optional):
            If a scene falls below min_degree after primary selection, add its nearest-time 
            neighbors that satisfy pb_max and dt_max. May introduce lower-quality pairs; a warning is logged.
        max_workers (int, optional):
            Number of threads for API fallback. Has no effect if all products have local baseline 
            data (common for Sentinel-1 and ALOS). Set to 1 to disable threading (useful for debugging).

    Returns:
        tuple of three elements:

        pairs (list[Pair] | dict[tuple[int,int], list[Pair]]):
            A flat list of Pair tuples ``(earlier_scene, later_scene)`` if
            *search_results* was a list, or a dict keyed by ``(path, frame)``
            with lists of Pair tuples if *search_results* was a dict.

        baselines (BaselineTable | dict[tuple[int,int], BaselineTable]):
            Pairwise baseline table mapping ``(scene_a, scene_b)`` →
            ``(dt_days, bperp_m)`` where ``bperp_m = |bp_a − bp_b|`` (always
            positive). Mirrors the structure of *pairs* (flat or grouped).

        scene_bperp (dict[str, float] | dict[tuple[int,int], dict[str, float]]):
            Signed perpendicular baseline for each scene relative to the
            anchor (earliest) scene, as returned by
            ``calculate_perpendicular_baselines``. Values can be negative or
            positive. Use this for network plots to reproduce the MintPy-style
            y-axis (negative/positive spread around zero). Mirrors the
            structure of *pairs* (flat or grouped).
    """

    # ── normalise input ───────────────────────────────────────────────────
    input_is_list = isinstance(search_results, list)
    if input_is_list:
        # Wrap the flat list under a sentinel key so the grouped code path
        # below can be reused; results are un-wrapped before returning.
        working_dict: dict[tuple[int, int], list[ASFProduct]] = {
            (0, 0): search_results   # type: ignore[arg-type]
        }
    elif isinstance(search_results, dict):
        working_dict = search_results
    else:
        raise TypeError(
            f"search_results must be a list or dict of ASFProducts, "
            f"got {type(search_results)}"
        )

    # ── primary filter helpers (defined once, closed over threshold args) ─
    def _near_target(dt: float) -> bool:
        # A pair's temporal baseline must land within dt_tol of at least one
        # preferred spacing.
        return any(abs(dt - t) <= dt_tol for t in dt_targets)

    def _passes_primary(dt: float, bp: float) -> bool:
        return _near_target(dt) and dt <= dt_max and bp <= pb_max

    pairs_group: PairGroup = defaultdict(list)
    baseline_group: dict[tuple[int, int], BaselineTable] = {}
    scene_bperp_group: dict[tuple[int, int], dict] = {}

    # ── process each (path, frame) key ───────────────────────────────────
    for key, search_result in working_dict.items():
        if not input_is_list:
            logger.info(
                "%sSearching pairs for path %d frame %d …",
                Fore.GREEN, key[0], key[1],
            )

        # Sort by acquisition time so `names` is chronologically ordered
        prods = sorted(search_result, key=lambda p: p.properties["startTime"])

        if not prods:
            logger.warning("No products for key %s — skipping.", key)
            continue

        # Pre-parse acquisition datetimes to Unix timestamps (done once;
        # reused in sort keys, dt calculations, and pair ordering)
        id_time_raw: dict[SceneID, str] = {
            p.properties["sceneName"]: p.properties["startTime"] for p in prods
        }
        id_time_dt: dict[SceneID, DateFloat] = {
            sid: isoparse(t).timestamp() for sid, t in id_time_raw.items()
        }
        ids: set[SceneID] = set(id_time_raw)
        names: list[SceneID] = [p.properties["sceneName"] for p in prods]

        # ── 1. Build pairwise baseline table ─────────────────────────────
        B, scene_bp = _build_baseline_table(prods, ids, id_time_dt, max_workers=max_workers)
        baseline_group[key] = B
        scene_bperp_group[key] = scene_bp
        # ── 2. Primary pair selection ─────────────────────────────────────
        pairs: set[Pair] = {
            e for e, (dt, bp) in B.items() if _passes_primary(dt, bp)
        }
        logger.info(
            "Key %s — primary selection: %d / %d candidate pairs.",
            key, len(pairs), len(B),
        )

        # ── 3. Connectivity enforcement ───────────────────────────────────
        pairs = _enforce_connectivity(
            pairs,
            B,
            names,
            id_time_dt,
            min_degree=min_degree,
            max_degree=max_degree,
            pb_max=pb_max,
            dt_max=float(dt_max),
            force_connect=force_connect
        )

        pairs_group[key] = sorted(pairs)
        logger.info(
            "Key %s — final pair count: %d.", key, len(pairs_group[key])
        )

    # ── un-wrap the sentinel key for flat-list input ──────────────────────
    # All three outputs must mirror the structure of the input (flat vs
    # grouped); previously the baseline table was always returned grouped,
    # which broke flat-input callers such as plot_pair_network.
    if input_is_list:
        return (
            pairs_group[(0, 0)],
            baseline_group.get((0, 0), {}),
            scene_bperp_group.get((0, 0), {}),
        )
    return pairs_group, baseline_group, scene_bperp_group

Plot Pair Network

Plot selected interferogram pairs SBAS network from select_pairs based on temporal and perpendicular baseline criteria.

from insarhub.utils import plot_pair_network

fig = plot_pair_network(pairs=pairs, baselines=baselines)

Example:

networks

Parameters:

Name Type Description Default
pairs list[Pair] | PairGroup

A flat list of pairs or a dictionary keyed by (path, frame) with lists of pairs. Each pair is a tuple (earlier_scene, later_scene).

required
baselines BaselineTable

Table or mapping containing temporal and perpendicular baseline information for each interferogram pair.

required
title str

Main title of the network plot. Defaults to "Interferogram Network".

'Interferogram Network'
figsize tuple[int, int]

Figure size (width, height) in inches. Defaults to (18, 7).

(18, 7)
save_path str | Path | None

Path to save the generated figure. If None, figure is not saved. Defaults to None.

None

Raises:

Type Description
TypeError

If any scene name in pairs is not a string.

ValueError

If a scene name cannot be parsed into a valid date.

Source code in src/insarhub/utils/tool.py
def plot_pair_network(
    pairs: list[Pair] | PairGroup,
    baselines: BaselineTable,
    scene_baselines: dict | None = None,
    title: str = "Interferogram Network",
    figsize: tuple[int, int] = (18, 7),
    save_path: str | Path | None = None,
) -> plt.Figure | dict:

    """
    Plot an interferogram network along with per-scene connection statistics.

    This function visualizes the relationships between SAR acquisitions in
    terms of temporal and perpendicular baselines. The network graph is
    shown on the left, while a horizontal bar chart summarizes the number
    of connections per scene on the right.

    The layout is as follows:
        - Left  : Network graph (x-axis = days since first acquisition,
                  y-axis = perpendicular baseline [m])
        - Right : Horizontal bar chart showing the number of connections per SAR scene

    Args:
        pairs (list[Pair] | PairGroup):
            A flat list of pairs or a dictionary keyed by (path, frame)
            with lists of pairs. Each pair is a tuple `(earlier_scene, later_scene)`.
            When a dictionary is given, one figure is produced per key.
        baselines (BaselineTable):
            Table or mapping containing temporal and perpendicular baseline
            information for each interferogram pair. When *pairs* is a
            dictionary, this must be keyed by the same (path, frame) keys.
        scene_baselines (dict | None, optional):
            Signed per-scene perpendicular baseline relative to the anchor
            scene (third return value of ``select_pairs``). When provided,
            node y-positions reproduce the MintPy-style signed spread;
            otherwise y-positions are reconstructed from the pairwise table
            (sign information is lost). Defaults to None.
        title (str, optional):
            Main title of the network plot. Defaults to "Interferogram Network".
        figsize (tuple[int, int], optional):
            Figure size (width, height) in inches. Defaults to (18, 7).
        save_path (str | Path | None, optional):
            Path to save the generated figure. If None, figure is not saved.
            For grouped input, a directory (no suffix) or a file template
            (with suffix) may be given; one file is written per group.
            Defaults to None.

    Returns:
        matplotlib.figure.Figure | dict:
            The created matplotlib figure for flat input, or a dictionary
            mapping (path, frame) → figure for grouped input.

    Raises:
        TypeError:
            If any scene name in `pairs` is not a string.
        ValueError:
            If a scene name cannot be parsed into a valid date.

    Notes:
        - Node positions: x = days since first acquisition, y = perpendicular baseline.
        - Node color represents the node degree (number of connections).
        - Edge color and width represent temporal baseline.
        - Scenes with fewer than 2 connections are highlighted in red in the histogram.
        - Legends show node degree and temporal baseline.
        - The top axis of the network plot shows real acquisition dates for reference.
    """

    # ── 0. Grouped input: recurse once per (path, frame) key ─────────────

    if isinstance(pairs, dict):
        figures = {}

        save_path_obj = None
        save_is_dir = False

        if save_path is not None:
            save_path_obj = Path(save_path).expanduser()

            if save_path_obj.suffix == "":
                save_is_dir = True
                save_path_obj.mkdir(parents=True, exist_ok=True)
            else:
                # Has suffix → treat as file template
                save_path_obj.parent.mkdir(parents=True, exist_ok=True)

        for (path, frame), group_pairs in pairs.items():
            group_title = f"{title} — P{path}/F{frame}"
            group_save_path = None

            if save_path_obj is not None:
                if save_is_dir:
                    # Case 1: directory given
                    group_save_path = (
                        save_path_obj.joinpath(f"network_P{path}_F{frame}.png")
                    )
                else:
                    # Case 2: file with suffix given
                    group_save_path = (
                        save_path_obj.parent
                        / f"{save_path_obj.stem}_P{path}_F{frame}{save_path_obj.suffix}"
                    )

            fig = plot_pair_network(
                    pairs=group_pairs,
                    baselines=baselines[(path, frame)],
                    scene_baselines=scene_baselines.get((path, frame)) if isinstance(scene_baselines, dict) else scene_baselines,
                    title=group_title,
                    figsize=figsize,
                    save_path=group_save_path,
                )

            figures[(path, frame)] = fig

        return figures

    # From here on `pairs` is guaranteed to be a flat list — the dict case
    # returned above, so no dict-handling branches are needed below.
    flat_pairs = pairs
    subtitle = f"{len(flat_pairs)} pairs"
    if save_path is not None:
        save_path = Path(save_path).expanduser()

    # ── 1. Parse dates ────────────────────────────────────────────────────
    scenes: set[SceneID] = set()
    for a, b in flat_pairs:
        scenes.update([a, b])

    def _parse_date(scene_name: str) -> datetime:
        if not isinstance(scene_name, str):
            raise TypeError(
                f"Expected str, got {type(scene_name).__name__}: {scene_name!r}."
            )
        m = re.search(r"(\d{8})", scene_name)
        if m:
            return datetime.strptime(m.group(1), "%Y%m%d")
        m = re.search(r"(\d{4}-\d{2}-\d{2})", scene_name)
        if m:
            return datetime.strptime(m.group(1), "%Y-%m-%d")
        raise ValueError(f"Cannot parse date from scene name: {scene_name}")

    id_time: dict[SceneID, datetime] = {s: _parse_date(s) for s in scenes}
    t0      = min(id_time.values())
    id_days: dict[SceneID, float] = {
        s: (id_time[s] - t0).total_seconds() / 86_400.0 for s in scenes
    }

    # ── 2. Build graph ────────────────────────────────────────────────────
    G = nx.Graph()
    G.add_nodes_from(scenes)

    for a, b in flat_pairs:
        dt, bp = baselines.get((a, b), (_MISSING, _MISSING))
        G.add_edge(a, b, dt=dt, bp=bp)

    # ── 3. Node positions (x=days, y=bperp) ──────────────────────────────
    if scene_baselines:
        # Use signed per-scene bperp relative to anchor — same as MintPy display
        # (negative = scene orbited closer than anchor, positive = further)
        bperp_pos: dict[SceneID, float] = {
            s: float(scene_baselines.get(s, 0.0)) for s in scenes
        }
    else:
        # Fallback: reconstruct from pairwise table (loses sign info, may trend upward)
        bperp_accum: dict[SceneID, list[float]] = defaultdict(list)
        for (a, b), (dt, bp) in baselines.items():
            if bp >= _MISSING:
                continue
            bperp_accum[a].append(-bp / 2.0)
            bperp_accum[b].append(+bp / 2.0)
        bperp_pos = {
            s: float(np.mean(v)) if v else 0.0
            for s, v in bperp_accum.items()
        }
        # Anchor the earliest scene at y=0 so the spread is comparable
        # between figures.
        sorted_by_time = sorted(scenes, key=lambda s: id_days[s])
        offset = bperp_pos.get(sorted_by_time[0], 0.0)
        bperp_pos = {s: bperp_pos.get(s, 0.0) - offset for s in scenes}

    pos: dict[SceneID, tuple[float, float]] = {
        s: (id_days[s], bperp_pos[s]) for s in scenes
    }

    # ── 4. Visual attributes ──────────────────────────────────────────────
    degrees      = dict(G.degree())
    max_deg      = max(degrees.values(), default=1)
    node_colours = [plt.cm.RdYlGn(degrees[n] / max_deg) for n in G.nodes()]

    edge_dts     = [G[a][b]["dt"] for a, b in G.edges()]
    # Ignore missing-sentinel dts when scaling the colour map.
    max_dt       = max((d for d in edge_dts if d < _MISSING), default=1.0)
    edge_colours = [plt.cm.RdYlGn_r(min(dt, max_dt) / max_dt) for dt in edge_dts]
    edge_widths  = [0.5 + 2.5 * (1.0 - min(dt, max_dt) / max_dt) for dt in edge_dts]

    # ── 5. Figure layout ──────────────────────────────────────────────────
    fig = plt.figure(figsize=figsize)
    gs  = fig.add_gridspec(1, 2, width_ratios=[3, 1], wspace=0.35)
    ax_net  = fig.add_subplot(gs[0])
    ax_hist = fig.add_subplot(gs[1])

    # ── 6. Draw network ───────────────────────────────────────────────────
    nx.draw_networkx_edges(
        G, pos, ax=ax_net,
        edgelist=list(G.edges()),
        edge_color=edge_colours,
        width=edge_widths,
        alpha=0.7,
    )

    nx.draw_networkx_nodes(
        G, pos, ax=ax_net,
        node_color=node_colours,
        node_size=80,
        linewidths=0.5,
        edgecolors="black",
    )
    nx.draw_networkx_labels(
        G, pos,
        labels={s: s[-8:] for s in G.nodes()},
        ax=ax_net,
        font_size=5,
    )

    # ── 7. Network axes ───────────────────────────────────────────────────
    ax_net.set_xlabel("Days since first acquisition", fontsize=11)
    ax_net.set_ylabel("Perpendicular baseline [m]", fontsize=11)
    ax_net.set_title(
        f"{title}\n{subtitle}\n"
        f"{len(scenes)} scenes · {len(flat_pairs)} pairs · "
        f"mean degree {np.mean(list(degrees.values())):.1f}",
        fontsize=11,
    )
    ax_net.tick_params(left=True, bottom=True, labelleft=True, labelbottom=True)
    ax_net.set_frame_on(True)

    # real date ticks on top axis
    from datetime import timedelta  # local: module imports only the datetime class

    x_vals  = [p[0] for p in pos.values()]
    x_ticks = np.linspace(min(x_vals), max(x_vals), min(8, len(pos)))
    ax2 = ax_net.twiny()
    ax2.set_xlim(ax_net.get_xlim())
    ax2.set_xticks(x_ticks)
    ax2.set_xticklabels(
        [(t0 + timedelta(days=d)).strftime("%Y-%m-%d") for d in x_ticks],
        rotation=30, ha="left", fontsize=7,
    )
    ax2.set_xlabel("Acquisition date (UTC)", fontsize=9)

    # ── 8. Per-scene connection histogram ─────────────────────────────────
    # Sort scenes by date so the histogram reads chronologically top→bottom
    sorted_scene_names = sorted(scenes, key=lambda s: id_days[s])
    scene_degrees      = [degrees[s] for s in sorted_scene_names]
    short_names        = [s[-12:] for s in sorted_scene_names]   # trim for readability
    y_positions        = range(len(sorted_scene_names))

    bar_colours = [plt.cm.RdYlGn(degrees[s] / max_deg) for s in sorted_scene_names]

    bars = ax_hist.barh(
        y_positions,
        scene_degrees,
        color=bar_colours,
        edgecolor="white",
        linewidth=0.4,
        height=0.7,
    )

    # annotate each bar with connection count
    for bar, count in zip(bars, scene_degrees):
        ax_hist.text(
            bar.get_width() + 0.1,
            bar.get_y() + bar.get_height() / 2,
            str(count),
            va="center", fontsize=7,
        )

    # vertical line at mean degree
    mean_deg = np.mean(scene_degrees)
    ax_hist.axvline(
        mean_deg, color="steelblue", linestyle="--", linewidth=1.0, alpha=0.8
    )
    ax_hist.text(
        mean_deg + 0.1, len(sorted_scene_names) - 0.5,
        f"mean\n{mean_deg:.1f}",
        color="steelblue", fontsize=7, va="top",
    )

    # mark scenes below min connectivity in red.
    # NOTE: index the BarContainer directly — ax_hist.get_children() mixes
    # bars with other artists, so its order does not match the bar order.
    for bar, deg in zip(bars, scene_degrees):
        if deg < 2:
            bar.set_edgecolor("red")
            bar.set_linewidth(1.5)

    ax_hist.set_yticks(y_positions)
    ax_hist.set_yticklabels(short_names, fontsize=6)
    ax_hist.set_xlabel("Number of connections", fontsize=9)
    ax_hist.set_title("Connections\nper scene", fontsize=10)
    ax_hist.xaxis.set_major_locator(plt.MaxNLocator(integer=True))
    ax_hist.set_frame_on(True)
    # match vertical order to network: earliest at top
    ax_hist.invert_yaxis()

    # ── 9. Legends ────────────────────────────────────────────────────────
    deg_legend = ax_net.legend(
        handles=[
            mpatches.Patch(color=plt.cm.RdYlGn(v / max_deg), label=f"degree {v}")
            for v in sorted(set(degrees.values()))
        ],
        title="Node degree", loc="upper left", fontsize=7, title_fontsize=8,
    )
    ax_net.add_artist(deg_legend)

    ax_net.legend(
        handles=[
            mpatches.Patch(
                color=plt.cm.RdYlGn_r(v / max_dt), label=f"{v:.0f} days"
            )
            for v in [0, max_dt * 0.33, max_dt * 0.66, max_dt]
        ],
        title="Temporal baseline", loc="lower right", fontsize=7, title_fontsize=8,
    )

    if save_path:
        fig.savefig(save_path.as_posix(), dpi=300, bbox_inches="tight")
        print(f"Saved → {save_path}")

    return fig

Earth Credit Pool

If you have multiple Earthdata credentials, you may store them under ~/.credit_pool with the format

username1:password1
username2:password2

then read them with:

from insarhub.utils import earth_credit_pool
ec_pool = earth_credit_pool()

You may then pass this into a processor for seamless switching across multiple Earthdata credentials

from insarhub import Processor
processor= Processor.create('Hyp3_InSAR', earthdata_credentials_pool=ec_pool, ....)

Parameters:

Name Type Description Default
earthdata_credentials_pool_path Path

Path to the Earthdata credentials file. Defaults to ~/.credit_pool. The path is expanded and resolved to an absolute path.

joinpath('.credit_pool')

Raises:

Type Description
FileNotFoundError

If the specified credentials file does not exist.

ValueError

If any line in the file does not contain a single ':' separating key and value.

OSError

For any other I/O related errors while reading the file.

Source code in src/insarhub/utils/tool.py
def earth_credit_pool(earthdata_credentials_pool_path = Path.home().joinpath('.credit_pool')) -> dict:
    """
    Load Earthdata credentials from a local credit pool file.

    The function reads a simple key-value file where each line contains
    `username:password` (or `key:value`) pairs, and returns them as a dictionary.

    Args:
        earthdata_credentials_pool_path (Path, optional):
            Path to the Earthdata credentials file. Defaults to
            `~/.credit_pool`. The path is expanded and resolved to an absolute path.

    Returns:
        dict:
            Dictionary mapping credential keys to their corresponding values.
            Example:
            ```
            {
                "username1": "password1",
                "username2": "password2",
            }
            ```

    Raises:
        FileNotFoundError:
            If the specified credentials file does not exist.
        ValueError:
            If a non-empty line does not contain a ':' separating key and value.
        OSError:
            For any other I/O related errors while reading the file.

    Notes:
        - Each line of the file must be formatted as `key:value`.
        - Only the first ':' is treated as the separator, so passwords may
          themselves contain ':' characters.
        - Blank lines and lines starting with '#' are ignored.
        - Leading/trailing whitespace is stripped from both key and value.
        - Useful for managing multiple Earthdata credentials for automated downloads.
    """
    earthdata_credentials_pool_path = Path(earthdata_credentials_pool_path).expanduser().resolve()
    earthdata_credentials_pool: dict = {}
    with open(earthdata_credentials_pool_path, 'r') as f:
        for lineno, raw_line in enumerate(f, start=1):
            line = raw_line.strip()
            # Skip blank lines and comments rather than crashing on them.
            if not line or line.startswith('#'):
                continue
            # Split on the FIRST ':' only — str.split(':') would raise on
            # passwords containing a colon.
            key, sep, value = line.partition(':')
            if not sep or not key.strip():
                raise ValueError(
                    f"Malformed line {lineno} in {earthdata_credentials_pool_path}: "
                    f"expected 'key:value', got {line!r}"
                )
            earthdata_credentials_pool[key.strip()] = value.strip()
    return earthdata_credentials_pool

Slurm Job Config

This class encapsulates all parameters needed to generate a SLURM batch script, including resource allocation, job settings, environment configuration, and execution commands.

from insarhub.utils import Slurmjob_Config
config = Slurmjob_Config(
            job_name="my_analysis",
            time="02:00:00",
            command="python analyze.py"
        )
config.to_script("analysis.slurm")

Attributes:

Name Type Description
job_name str

Name of the SLURM job.

output_file str

Path for standard output. Use %j for job ID.

error_file str

Path for standard error. Use %j for job ID.

time str

Maximum wall time in HH:MM:SS format.

partition str

SLURM partition name to submit to.

nodes int

Number of nodes to allocate.

ntasks int

Number of tasks to run.

cpus_per_task int

CPUs per task.

mem str

Memory allocation per node (e.g., "4G", "500M").

nodelist Optional[str]

Specific nodes to use (e.g., "node[01-05]").

gpus Optional[str]

GPU allocation (e.g., "1", "2", "1g").

array Optional[str]

Array job specification (e.g., "0-9", "1-100%10").

dependency Optional[str]

Job dependency condition (e.g., "afterok:123456").

mail_user Optional[str]

Email address for job notifications.

mail_type str

When to send email notifications (BEGIN, END, FAIL, ALL).

account Optional[str]

Account to charge resources to.

qos Optional[str]

Quality of Service specification.

modules List[str]

List of environment modules to load.

conda_env Optional[str]

Name of conda environment to activate.

export_env Dict[str, str]

Dictionary of environment variables to export.

command str

Bash command(s) to execute.

Source code in src/insarhub/utils/tool.py
@dataclass
class Slurmjob_Config:
    """Configuration for a SLURM job submission script.

    This class encapsulates all parameters needed to generate a SLURM batch script,
    including resource allocation, job settings, environment configuration, and
    execution commands.

    Attributes:
        job_name: Name of the SLURM job.
        output_file: Path for standard output. Use %j for job ID.
        error_file: Path for standard error. Use %j for job ID.
        time: Maximum wall time in HH:MM:SS format.
        partition: SLURM partition name to submit to.
        nodes: Number of nodes to allocate.
        ntasks: Number of tasks to run.
        cpus_per_task: CPUs per task.
        mem: Memory allocation per node (e.g., "4G", "500M").
        nodelist: Specific nodes to use (e.g., "node[01-05]").
        gpus: GPU allocation (e.g., "1", "2", "1g").
        array: Array job specification (e.g., "0-9", "1-100%10").
        dependency: Job dependency condition (e.g., "afterok:123456").
        mail_user: Email address for job notifications.
        mail_type: When to send email notifications (BEGIN, END, FAIL, ALL).
        account: Account to charge resources to.
        qos: Quality of Service specification.
        modules: List of environment modules to load.
        conda_env: Name of conda environment to activate.
        export_env: Dictionary of environment variables to export.
        command: Bash command(s) to execute.

    Examples:
        Basic job configuration:

        >>> config = Slurmjob_Config(
        ...     job_name="my_analysis",
        ...     time="02:00:00",
        ...     command="python analyze.py"
        ... )
        >>> script = config.to_script("analysis.slurm")  # absolute Path to the script

        GPU job with conda environment:

        >>> config = Slurmjob_Config(
        ...     job_name="training",
        ...     time="12:00:00",
        ...     mem="32G",
        ...     gpus="2",
        ...     conda_env="pytorch",
        ...     modules=["cuda/11.8"],
        ...     command="python train.py --epochs 100"
        ... )
        >>> script = config.to_script("train.slurm")

        Array job with environment variables:

        >>> config = Slurmjob_Config(
        ...     job_name="param_sweep",
        ...     array="0-99",
        ...     export_env={"PARAM_ID": "$SLURM_ARRAY_TASK_ID"},
        ...     command="python run_experiment.py $PARAM_ID"
        ... )
        >>> script = config.to_script()  # defaults to "job.slurm"
    """
    job_name: str = "my_job"
    output_file: str = "job_%j.out"
    error_file: str = "job_%j.err"
    time: str = "04:00:00"
    partition: str = "all"
    nodes: int = 1
    ntasks: int = 1
    cpus_per_task: int = 1
    mem: str = "4G"

    # Optional parameters
    nodelist: Optional[str] = None
    gpus: Optional[str] = None
    array: Optional[str] = None
    dependency: Optional[str] = None
    mail_user: Optional[str] = None
    mail_type: str = "ALL"
    account: Optional[str] = None
    qos: Optional[str] = None

    # Environment
    modules: List[str] = field(default_factory=list)
    conda_env: Optional[str] = None
    export_env: Dict[str, str] = field(default_factory=dict)

    # Execution
    command: str = "echo Hello SLURM!"

    def to_script(self, filename: str = "job.slurm") -> Path:
        """Generate the SLURM script file.

        Args:
            filename: Destination path for the script; expanded and resolved
                to an absolute path before writing.

        Returns:
            Path: Absolute path of the written script.
        """
        lines = ["#!/bin/bash"]

        # Required directives
        lines.extend([
            f"#SBATCH --job-name={self.job_name}",
            f"#SBATCH --output={self.output_file}",
            f"#SBATCH --error={self.error_file}",
            f"#SBATCH --time={self.time}",
            f"#SBATCH --partition={self.partition}",
            f"#SBATCH --nodes={self.nodes}",
            f"#SBATCH --ntasks={self.ntasks}",
            f"#SBATCH --cpus-per-task={self.cpus_per_task}",
            f"#SBATCH --mem={self.mem}",
        ])

        # Optional directives — emitted only when set.
        if self.gpus:
            lines.append(f"#SBATCH --gres=gpu:{self.gpus}")
        if self.array:
            lines.append(f"#SBATCH --array={self.array}")
        if self.dependency:
            lines.append(f"#SBATCH --dependency={self.dependency}")
        if self.mail_user:
            # mail-type only makes sense alongside a recipient.
            lines.append(f"#SBATCH --mail-user={self.mail_user}")
            lines.append(f"#SBATCH --mail-type={self.mail_type}")
        if self.account:
            lines.append(f"#SBATCH --account={self.account}")
        if self.qos:
            lines.append(f"#SBATCH --qos={self.qos}")
        if self.nodelist:
            lines.append(f"#SBATCH --nodelist={self.nodelist}")

        lines.append("")

        # Environment setup
        lines.extend([f"module load {mod}" for mod in self.modules])
        if self.conda_env:
            lines.append(f"source activate {self.conda_env}")
        lines.extend([f"export {k}={v}" for k, v in self.export_env.items()])

        lines.append("")

        # Execution
        lines.extend([
            'echo "Starting job on $(date)"',
            self.command,
            'echo "Job finished on $(date)"'
        ])

        filepath = Path(filename).expanduser().resolve()
        filepath.write_text("\n".join(lines))

        return filepath

⚠️ Major Redesign

InSARScript v1.1.0 has changed its APIs; this documentation is not compatible with version v1.0.0.