
Downloader

The InSARHub Downloader module provides a streamlined interface for searching and downloading satellite data.

  • Import downloader

    Import the Downloader class to access all downloader functionality

    from insarhub import Downloader
    

  • View available downloaders

    List all registered downloaders

    Downloader.available()
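
A registry like this is often built with a class-level dictionary and a registration decorator. The sketch below is only an illustration of that pattern, not InSARHub's actual implementation: `DownloaderRegistry`, `register`, and `ExampleDownloader` are hypothetical names, and the real `Downloader.available()` may return or print its listing differently.

```python
class DownloaderRegistry:
    """Hypothetical registry; NOT InSARHub's actual implementation."""

    _registry: dict[str, type] = {}

    @classmethod
    def register(cls, name: str):
        """Return a decorator that stores the decorated class under `name`."""
        def decorator(downloader_cls: type) -> type:
            cls._registry[name] = downloader_cls
            return downloader_cls
        return decorator

    @classmethod
    def available(cls) -> list[str]:
        """List the names of all registered downloaders, sorted alphabetically."""
        return sorted(cls._registry)


# Hypothetical downloader class registered under an illustrative name.
@DownloaderRegistry.register("ASF_Base_Downloader")
class ExampleDownloader:
    description = "Generic ASF Search API downloader."


print(DownloaderRegistry.available())
```

With this design, a new backend becomes discoverable simply by being decorated at import time, so the listing never has to be maintained by hand.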
    

Available Downloaders

InSARHub wraps asf_search as one of its download backends. ASF_Base_Downloader is built on a reusable base configuration class (ASF_Base_Config) and exposes the full searching, filtering, and downloading logic of asf_search.
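
The downloader organizes search results into stacks keyed by a (path, frame) tuple. The following is a minimal, self-contained sketch of that grouping step. `FakeResult` is a hypothetical stand-in for an asf_search product (only its `properties` dict is modelled), the sample values are made up, and `group_by_path_frame` is an illustrative helper rather than part of InSARHub.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class FakeResult:
    """Hypothetical stand-in for an asf_search product; only .properties is modelled."""
    properties: dict = field(default_factory=dict)


def group_by_path_frame(results):
    """Group results into stacks keyed by (pathNumber, frameNumber)."""
    grouped = defaultdict(list)
    for result in results:
        props = result.properties
        key = (props.get("pathNumber"), props.get("frameNumber"))
        grouped[key].append(result)
    return dict(grouped)


# Made-up sample data: two scenes on one stack, one scene on another.
results = [
    FakeResult({"pathNumber": 100, "frameNumber": 466, "startTime": "2021-01-05T13:35:00Z"}),
    FakeResult({"pathNumber": 100, "frameNumber": 466, "startTime": "2021-01-17T13:35:00Z"}),
    FakeResult({"pathNumber": 27, "frameNumber": 470, "startTime": "2021-01-08T01:10:00Z"}),
]

stacks = group_by_path_frame(results)
for (path, frame), scenes in sorted(stacks.items()):
    print(f"path {path} frame {frame}: {len(scenes)} scene(s)")
```

Keying by tuple keeps each stack addressable for later filtering, which is why the filter step can accept one or more (path, frame) pairs directly.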

Source code in src/insarhub/downloader/asf_base.py
class ASF_Base_Downloader(BaseDownloader):
    """
    Simplify searching and downloading satellite data using ASF Search API.
    """
    description = "Generic ASF Search API downloader. Supports Sentinel-1, ALOS, NISAR, and more."
    default_config = ASF_Base_Config
    _DATASET_GROUP_KEYS = {
        'SENTINEL-1': ('pathNumber', 'frameNumber'),
        'ALOS':       ('pathNumber', 'frameNumber'),
        'NISAR':      ('pathNumber', 'frameID'),
        'BURST':      ('pathNumber', 'burstID'),
    }
    _DATASET_PROPERTY_KEYS = {
        'SENTINEL-1': {
            'relativeOrbit': 'pathNumber',
            'absoluteOrbit': 'absoluteOrbit',
            'polarization':  'polarization',
            'flightDirection': 'flightDirection',
        },
        'ALOS': {
            'relativeOrbit': 'pathNumber',
            'absoluteOrbit': 'absoluteOrbit',
            'polarization':  'polarization',
            'flightDirection': 'flightDirection',
        },
        'NISAR': {
            'relativeOrbit': 'relativeOrbit',
            'absoluteOrbit': 'absoluteOrbit',
            'polarization':  'polarization',
            'flightDirection': 'flightDirection',
        },
    }

    def __init__(self, config: ASF_Base_Config | None = None): 

        """
        Initialize the downloader with search parameters. The options are adapted from the asf_search search API;
        see https://docs.asf.alaska.edu/asf_search/searching/ for the full list. Only customized parameters are documented here.
        """
        print("""
This downloader relies on the ASF API. Please make sure you have created an account at https://search.asf.alaska.edu/.
If no .netrc file is found in your home directory, you will be prompted to enter your ASF username and password.
See the documentation for how to set up a .netrc file.\n""")
        super().__init__(config)

        if self.config.dataset is None and self.config.platform is None and not getattr(self.config, 'granule_names', None):
            raise ValueError(f"{Fore.RED}Dataset or platform must be specified for ASF search (or provide granule_names).")

        self.config.intersectsWith = _to_wkt(self.config.intersectsWith)


    def _asf_authorize(self):
        self._has_asf_netrc = self._check_netrc(keyword='machine urs.earthdata.nasa.gov')
        if not self._has_asf_netrc:
            while True:
                _username = input("Enter your ASF username: ")
                _password = getpass.getpass("Enter your ASF password: ")
                try:
                    self._session = asf.ASFSession().auth_with_creds(_username, _password)
                except ASFAuthenticationError:
                    print(f"{Fore.RED}Authentication failed. Please check your credentials and try again.\n")
                    continue
                print(f"{Fore.GREEN}Authentication successful.\n")
                netrc_path = Path.home().joinpath(".netrc")
                asf_entry = f"\nmachine urs.earthdata.nasa.gov\n    login {_username}\n    password {_password}\n"
                with open(netrc_path, 'a') as f:
                    f.write(asf_entry)
                print(f"{Fore.GREEN}Credentials saved to {netrc_path}. You can now use the downloader without entering credentials again.\n")
                break
        else:
            self._session = asf.ASFSession()

    def _check_netrc(self, keyword: str) -> bool:
        """Check if .netrc file exists in the home directory with the specified keyword.

        Args:
            keyword (str): The machine name to search for in .netrc file.

        Returns:
            bool: True if .netrc file exists and contains the keyword, False otherwise.
        """
        netrc_path = Path.home().joinpath('.netrc')
        if not netrc_path.is_file():            
            print(f"{Fore.RED}No .netrc file found in your home directory. Will prompt login.\n")
            return False
        else: 
            with netrc_path.open() as f:
                content = f.read()
                if keyword in content:
                    return True
                else:
                    print(f"{Fore.RED}No '{keyword}' entry found in .netrc file. Will prompt login.\n")
                    return False


    def _get_group_key(self, result) -> tuple:
        """Derive grouping key based on available properties, with fallback.

        Args:
            result: Search result object containing properties.

        Returns:
            tuple: A tuple of (path_number, frame_identifier) used for grouping results.
        """
        props = result.properties
        # Burst product — any burst ID field set in config takes highest priority
        if any([
            self.config.absoluteBurstID,
            self.config.fullBurstID,
            self.config.operaBurstID,
            self.config.relativeBurstID,
        ]):
            return (props.get('pathNumber'), props.get('burstID'))

        if self.config.asfFrame is not None:
            # 'asfFrame' is a search filter parameter, not a scene property name.
            # Use 'frameNumber' (the actual returned property) for consistent grouping.
            return (props.get('pathNumber'), props.get('frameNumber'))

        if self.config.frame is not None:
            return (props.get('pathNumber'), props.get('frameNumber'))

        # Dataset-level mapping
        if self.config.dataset:
            datasets = [self.config.dataset] if isinstance(self.config.dataset, str) else self.config.dataset
            for ds in datasets:
                ds_upper = ds.upper()
                if ds_upper in self._DATASET_GROUP_KEYS:
                    pk, fk = self._DATASET_GROUP_KEYS[ds_upper]
                    return (props.get(pk), props.get(fk))
        # Platform-level fallback mapping      
        if self.config.platform:
            platforms = [self.config.platform] if isinstance(self.config.platform, str) else self.config.platform
            for pl in platforms:
                pl_upper = pl.upper()
                if 'SENTINEL' in pl_upper:
                    return (props.get('pathNumber'), props.get('frameNumber'))
                if 'ALOS' in pl_upper:
                    return (props.get('pathNumber'), props.get('frameNumber'))
                if 'NISAR' in pl_upper:
                    return (props.get('pathNumber'), props.get('frameID'))
        # Last resort: fall back to the generic pathNumber/frameNumber pair
        return (props.get('pathNumber'), props.get('frameNumber'))

    def _get_property_keys(self) -> dict:
        """Return the correct result.properties key mapping based on config.

        Returns:
            dict: Mapping of property names to their corresponding keys in search results.
        """
        if self.config.dataset:
            datasets = [self.config.dataset] if isinstance(self.config.dataset, str) else self.config.dataset
            for ds in datasets:
                ds_upper = ds.upper()
                if ds_upper in self._DATASET_PROPERTY_KEYS:
                    return self._DATASET_PROPERTY_KEYS[ds_upper]

        if self.config.platform:
            platforms = [self.config.platform] if isinstance(self.config.platform, str) else self.config.platform
            for pl in platforms:
                if 'SENTINEL' in pl.upper():
                    return self._DATASET_PROPERTY_KEYS['SENTINEL-1']
                if 'ALOS' in pl.upper():
                    return self._DATASET_PROPERTY_KEYS['ALOS']
                if 'NISAR' in pl.upper():
                    return self._DATASET_PROPERTY_KEYS['NISAR']

        # Default to Sentinel-1 keys as they are most common
        return self._DATASET_PROPERTY_KEYS['SENTINEL-1']

    @property
    def session(self):
        """Get or create an authenticated ASF session.

        Returns:
            ASFSession: Authenticated session for ASF downloads.
        """
        if not hasattr(self, '_session') or self._session is None:
            self._asf_authorize()
        return self._session

    @property
    def active_results(self):
        """Get the currently active results (filtered or full search results).

        Returns the subset of results if a filter/pick is active, 
        otherwise returns the full search results.

        Returns:
            dict: Dictionary of active search results grouped by (path, frame).

        Raises:
            ValueError: If no search results are available.
        """
        if not hasattr(self, 'results'):
             raise ValueError(f"{Fore.RED}No search results found. Please run search() first.")
        return self._subset if self._subset is not None else self.results

    def search(self) -> dict:
        """Search for data using the ASF Search API with the provided parameters.

        When ``config.granule_names`` is set the search is performed by granule
        name instead of the normal parameter search.  ``granule_names`` may be:

        * A ``list[str]`` of scene/granule names (with or without extensions).
        * A ``str`` containing a single name, a comma-separated list of names,
          or a path to a CSV / XLSX / TXT file on disk.

        Returns:
            dict: Dictionary of search results grouped by (path, frame) tuples.

        Raises:
            ValueError: If search returns no results.
            Exception: If search fails after 10 retry attempts.
        """
        self._subset = None

        granule_names = getattr(self.config, 'granule_names', None)
        if granule_names:
            print(f"{Fore.GREEN}granule_names provided; searching by granule name(s) from {self.config.granule_names}...{Fore.RESET}")
            from insarhub.utils.tool import parse_scene_names_from_file
            raw_inputs = granule_names if isinstance(granule_names, list) else [n.strip() for n in granule_names.split(',') if n.strip()]
            names: list[str] = []
            for item in raw_inputs:
                p = Path(item)
                if p.exists():
                    names.extend(parse_scene_names_from_file(str(p)))
                else:
                    names.append(item)
            return self._search_by_name(names)

        print("Searching for SLCs...")
        search_opts = {k: v for k, v in asdict(self.config).items()
                       if v is not None and k not in ['workdir', 'name', 'bbox', 'granule_names']}

        for attempt in range(1, 11):
            try:
                self.results = asf.search(**search_opts)
                break
            except Exception as e:
                print(f"{Fore.RED}Search failed: {e}")
                if attempt == 10:
                    raise
                time.sleep(2 ** attempt)

        if not self.results:
            raise ValueError(f'{Fore.RED}Search returned no results. Please check the input parameters and your Internet connection.')
        else:
            print(f"{Fore.GREEN} -- A total of {len(self.results)} results found. \n")

        grouped = defaultdict(list)
        for result in self.results:
            key = self._get_group_key(result)
            grouped[key].append(result)
        self.results = grouped
        if len(grouped) > 1:
            print(f"{Fore.YELLOW}The AOI crosses {len(grouped)} stacks")
        return grouped

    def _search_by_name(self, scene_names: list[str]) -> dict:
        """Populate results from a list of scene/granule names or filenames.

        Accepts names with or without file extensions (e.g. ``.zip``).
        Uses ``asf_search.granule_search()`` so no config parameters are needed
        and works for any ASF-supported dataset (S1 SLC, S1 Burst, ALOS, etc.).

        Args:
            scene_names: Scene or filename strings, e.g.
                ``["S1A_IW_SLC__1SDV_20201227T133500_..._5DB4",
                   "S1A_IW_SLC__1SDV_20201227T133500_..._5DB4.zip"]``

        Returns:
            Grouped results dict keyed by ``(relativeOrbit, frame)``.
        """
        # Strip common file extensions so granule_search can find them
        clean = [Path(n).stem if '.' in n else n for n in scene_names]
        raw = asf.granule_search(clean)
        if not raw:
            raise ValueError(f"No ASF results found for the {len(clean)} provided scene name(s).")

        # granule_search returns all product types per granule (SLC + METADATA_SLC, etc.).
        # Exclude metadata-only products, then deduplicate by sceneName.
        _EXCLUDE_LEVELS = {'METADATA_SLC', 'METADATA'}
        seen: set[str] = set()
        deduped = []
        for result in raw:
            if result.properties.get('processingLevel', '') in _EXCLUDE_LEVELS:
                continue
            sname = result.properties.get('sceneName', '')
            if sname not in seen:
                seen.add(sname)
                deduped.append(result)

        grouped: dict = defaultdict(list)
        for result in deduped:
            key = self._get_group_key(result)
            grouped[key].append(result)
        self.results = grouped
        print(f"{Fore.GREEN} -- Found {len(deduped)} scenes across {len(grouped)} stack(s).\n")
        if len(deduped) < len(clean):
            missing = len(clean) - len(deduped)
            print(f"{Fore.YELLOW} -- {missing} scene(s) not found on ASF.\n")
        return grouped

    def reset(self):
        """Reset the view to include all search results.

        Clears any active filters and restores the full result set.
        """
        self._subset = None
        print(f"{Fore.GREEN}Selection reset. Now viewing all {len(self.results)} stacks.")

    def summary(self, ls=False):
        """Summarize the active results, separated by flight direction.

        Args:
            ls (bool, optional): If True, list individual scene names and dates. 
                Defaults to False.
        """
        if not hasattr(self, 'results'):
            self.search()

        active_results = self.active_results

        if not active_results:
            print(f"{Fore.YELLOW}No results to summarize.")
            return

        ascending_stacks = {}
        descending_stacks = {}

        for key, items in active_results.items():
            if not items: continue
            direction = items[0].properties.get('flightDirection', 'UNKNOWN').upper()

            if direction == 'ASCENDING':
                ascending_stacks[key] = items
            elif direction == 'DESCENDING':
                descending_stacks[key] = items

        def _print_group(label, data_dict, color_code):
            if not data_dict:
                return
            print(f"\n{color_code}=== {label} ORBITS ({len(data_dict)} Stacks) ==={Fore.RESET}")
            sorted_keys = sorted(data_dict.keys())

            for key in sorted_keys:
                    items = data_dict[key]
                    count = len(items)

                    # Calculate time range
                    dates = [isoparse(i.properties['startTime']) for i in items]
                    start_date = min(dates).date()
                    end_date = max(dates).date()

                    print(f"relativeOrbit {key[0]} frame {key[1]} | Count: {count} | {start_date} --> {end_date}")

                    if ls:
                        # Sort scenes by date
                        items_sorted = sorted(items, key=lambda x: isoparse(x.properties['startTime']))
                        for scene in items_sorted:
                            scene_date = isoparse(scene.properties['startTime']).date()
                            print(f"    {Fore.LIGHTBLACK_EX}{scene.properties['sceneName']} ({scene_date}){Fore.RESET}")
        if ascending_stacks:
            _print_group("ASCENDING", ascending_stacks, Fore.MAGENTA)

        if descending_stacks:
            _print_group("DESCENDING", descending_stacks, Fore.CYAN)

        print("") # Final newline


    def footprint(self, save_path: str | None = None):
        """Display or save the search result footprints and AOI using matplotlib.

        Args:
            save_path (str, optional): Path to save the figure. If None, displays interactively.
                Defaults to None.
        """
        results_to_plot = self.active_results
        if not results_to_plot:
            print(f"{Fore.RED}No results to plot.")
            return

        transformer = Transformer.from_crs("EPSG:4326", "EPSG:3857", always_xy=True)
        N = len(results_to_plot)
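        # plt.cm.get_cmap was removed in recent Matplotlib releases; pyplot.get_cmap accepts the same arguments
        cmap = plt.get_cmap('hsv', N+1)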

        fig, ax = plt.subplots(1, 1, figsize=(10,10), dpi=150)

        geom_aoi = transform(transformer.transform, wkt.loads(self.config.intersectsWith))
        global_minx, global_miny, global_maxx, global_maxy = geom_aoi.bounds
        plotting.plot_polygon(geom_aoi, ax=ax, edgecolor='red', facecolor='none', linewidth=2, linestyle='--')

        label_x_aoi = global_maxx - 0.01 * (global_maxx - global_minx)
        label_y_aoi = global_maxy - 0.01 * (global_maxy - global_miny)
        plt.text(label_x_aoi, label_y_aoi,
             f"AOI",
             horizontalalignment='right', verticalalignment='top',
             fontsize=12, color='red', fontweight='bold',
             bbox=dict(facecolor='white', alpha=0.7, edgecolor='none', boxstyle='round,pad=0.3'))

        for i, (key, results) in enumerate(results_to_plot.items()):
            geom = transform(transformer.transform, shape(results[0].geometry))
            minx, miny, maxx, maxy = geom.bounds

            global_minx = min(global_minx, minx)
            global_miny = min(global_miny, miny)
            global_maxx = max(global_maxx, maxx)
            global_maxy = max(global_maxy, maxy)

            label_x = maxx - 0.01 * (maxx - minx)
            label_y = maxy - 0.01 * (maxy - miny)

            plt.text(label_x, label_y,
             f"Path: {key[0]}\nFrame: {key[1]}\nStack: {len(results)}",
             horizontalalignment='right', verticalalignment='top',
             fontsize=12, color=cmap(i), fontweight='bold',
             bbox=dict(facecolor='white', alpha=0.7, edgecolor='none', boxstyle='round,pad=0.3'))

            for result in results:
                geom = transform(transformer.transform, shape(result.geometry))
                x, y = geom.exterior.xy
                ax.plot(x, y, color=cmap(i))

        ctx.add_basemap(ax, source=ctx.providers.OpenStreetMap.Mapnik)

        ax.set_xlim(global_minx, global_maxx)
        ax.set_ylim(global_miny, global_maxy)

        ax.set_axis_off()
        if save_path is not None:
            save_path = Path(save_path).expanduser().resolve()
            plt.savefig(save_path.as_posix(), dpi=300, bbox_inches='tight')
            print(f"Footprint figure saved to {save_path}")
        else:
            plt.subplots_adjust(top = 1, bottom = 0, right = 1, left = 0, hspace = 0, wspace = 0)
            plt.show()

    def filter(self, 
                path_frame : tuple | list[tuple] | None = None,
                start: str | None = None,
                end: str | None = None,
                frame: int | list[int] | None = None, 
                asfFrame: int | list[int] | None = None, 
                flightDirection: str | None = None,
                relativeOrbit: int | list[int] | None = None,
                absoluteOrbit: int | list[int] | None = None,
                lookDirection: str | None = None,
                polarization: str | list[str] | None = None,
                processingLevel: str | None = None,
                beamMode: str | None = None,
                season: list[int] | None = None,
                min_coverage: float | None = None,
                min_count: int | None = None,
                max_count: int | None = None,
                latest_n: int | None = None,
                earliest_n: int | None = None
               ) -> dict:
        """Filter active results by various properties after search.

        Args:
            path_frame (tuple | list[tuple], optional): A single (path, frame) tuple or list of tuples.
                Defaults to None.
            start (str, optional): Start date string, e.g. '2021-01-01'. Defaults to None.
            end (str, optional): End date string, e.g. '2023-12-31'. Defaults to None.
            frame (int | list[int], optional): Sensor native frame number(s), e.g. 50. Defaults to None.
            asfFrame (int | list[int], optional): ASF internal frame number(s), e.g. 50. Defaults to None.
            flightDirection (str, optional): 'ASCENDING' or 'DESCENDING'. Defaults to None.
            relativeOrbit (int | list[int], optional): Relative orbit number(s) to keep. Defaults to None.
            absoluteOrbit (int | list[int], optional): Absolute orbit number(s) to keep. Defaults to None.
            lookDirection (str, optional): 'LEFT' or 'RIGHT'. Defaults to None.
            polarization (str | list[str], optional): Polarization(s) to keep, e.g. 'VV' or ['VV', 'VH']. 
                Defaults to None.
            processingLevel (str, optional): Processing level to keep, e.g. 'SLC'. Defaults to None.
            beamMode (str, optional): Beam mode to keep, e.g. 'IW'. Defaults to None.
            season (list[int], optional): List of months (1-12) to keep, e.g. [6, 7, 8] for summer. 
                Defaults to None.
            min_coverage (float, optional): Minimum fractional overlap (0-1) between scene and AOI. 
                Defaults to None.
            min_count (int, optional): Drop stacks with fewer than this many scenes after filtering. 
                Defaults to None.
            max_count (int, optional): Keep at most this many scenes per stack (from earliest). 
                Defaults to None.
            latest_n (int, optional): Keep the N most recent scenes per stack. Defaults to None.
            earliest_n (int, optional): Keep the N earliest scenes per stack. Defaults to None.

        Returns:
            dict: Filtered results grouped by (path, frame).

        Raises:
            ValueError: If no search results are available.
        """

        if not hasattr(self, 'results'):
            raise ValueError(f"{Fore.RED}No search results found. Please run search() first.")

        source = self.active_results
        filtered = defaultdict(list)
        prop_keys = self._get_property_keys()

        # --- Pre-process filter values ---
        if path_frame is not None:
            targets = {path_frame} if isinstance(path_frame, tuple) else set(path_frame)
        else:
            targets = None

        start_dt = isoparse(start).replace(tzinfo=None) if start else None
        end_dt   = isoparse(end).replace(tzinfo=None)   if end   else None
        frames     = {frame}    if isinstance(frame, int)    else set(frame)    if frame    else None
        asf_frames = {asfFrame} if isinstance(asfFrame, int) else set(asfFrame) if asfFrame else None
        relative_orbits  = {relativeOrbit}  if isinstance(relativeOrbit, int)  else set(relativeOrbit)  if relativeOrbit  else None
        absolute_orbits  = {absoluteOrbit}  if isinstance(absoluteOrbit, int)  else set(absoluteOrbit)  if absoluteOrbit  else None
        polarizations    = {polarization}   if isinstance(polarization, str)   else set(polarization)   if polarization   else None
        season_months    = set(season) if season else None

        if min_coverage is not None:
            aoi_geom = wkt.loads(self.config.intersectsWith)

        for key, items in source.items():
            if targets is not None and key not in targets:
                continue

            if flightDirection:
                stack_dir = items[0].properties.get('flightDirection', '').upper()
                if stack_dir != flightDirection.upper():
                    continue

            if lookDirection:
                stack_look = items[0].properties.get('lookDirection', '').upper()
                if stack_look != lookDirection.upper():
                    continue

            if beamMode:
                stack_beam = items[0].properties.get('beamMode', '').upper()
                if stack_beam != beamMode.upper():
                    continue

            if processingLevel:
                stack_proc = items[0].properties.get('processingLevel', '').upper()
                if stack_proc != processingLevel.upper():
                    continue
            # --- Scene-level filters ---
            filtered_items = []
            for item in items:
                props = item.properties

                scene_dt = isoparse(props['startTime']).replace(tzinfo=None)
                # Date range
                if start_dt and scene_dt < start_dt:
                    continue
                if end_dt and scene_dt > end_dt:
                    continue

                # Native frame filter
                if frames is not None:
                    if props.get('frameNumber') not in frames:
                        continue

                # ASF frame filter
                if asf_frames is not None:
                    if props.get('asfFrame') not in asf_frames:
                        continue
                # Season (month filter)
                if season_months and scene_dt.month not in season_months:
                    continue

                # Relative orbit
                if relative_orbits and props.get(prop_keys['relativeOrbit']) not in relative_orbits:
                    continue

                # Absolute orbit
                if absolute_orbits and props.get(prop_keys['absoluteOrbit']) not in absolute_orbits:
                    continue

                # Polarization — props value may be a string like 'VV+VH'
                if polarizations:
                    scene_pols = set(props.get(prop_keys['polarization'], '').replace('+', ' ').split())
                    if not polarizations.intersection(scene_pols):
                        continue

                if min_coverage is not None:
                    scene_geom = shape(item.geometry)
                    intersection = aoi_geom.intersection(scene_geom)
                    coverage = intersection.area / aoi_geom.area
                    if coverage < min_coverage:
                        continue

                filtered_items.append(item)
            if not filtered_items:
                continue


            filtered_items = sorted(filtered_items, key=lambda x: isoparse(x.properties['startTime']))

            if earliest_n is not None:
                filtered_items = filtered_items[:earliest_n]
            elif latest_n is not None:
                filtered_items = filtered_items[-latest_n:]
            elif max_count is not None:
                filtered_items = filtered_items[:max_count]

            if min_count is not None and len(filtered_items) < min_count:
                print(f"{Fore.YELLOW}Stack Path {key[0]} Frame {key[1]} dropped: only {len(filtered_items)} scenes (min_count={min_count}).")
                continue

            filtered[key] = filtered_items

        if not filtered:
            print(f"{Fore.YELLOW}Warning: No results matched the given filters.")
        else:
            self._subset = filtered
            total_scenes = sum(len(v) for v in filtered.values())
            print(f"{Fore.GREEN}Filter applied. {len(filtered)} stacks, {total_scenes} total scenes remaining.")

        return filtered

    def dem(self, save_path: str | None = None):
        """Download a DEM for each active stack, for use in co-registration.

        Args:
            save_path (str, optional): Directory to save DEM files. If None, uses config.workdir.
                Defaults to None.

        Returns:
            tuple: (X, p) where X is the DEM array and p is the rasterio profile
                of the last stack processed.
        """
        output_dir = Path(save_path).expanduser().resolve() if save_path else self.config.workdir
        _dem_is_stack = (output_dir / "insarhub_config.json").exists()

        for key, results in self.active_results.items():
            _dem_sub = Path() if _dem_is_stack else Path(f'p{key[0]}_f{key[1]}')
            download_path = output_dir.joinpath('dem', _dem_sub)
            download_path.mkdir(exist_ok=True, parents=True)
            geom = shape(results[0].geometry)
            west_lon, south_lat, east_lon, north_lat = geom.bounds
            bbox = [west_lon, south_lat, east_lon, north_lat]
            X, p = dem_stitcher.stitch_dem(
                bbox,
                dem_name='glo_30',
                dst_area_or_point='Point',
                dst_ellipsoidal_height=True
            )

            with rio.open(download_path.joinpath(f'dem_p{key[0]}_f{key[1]}.tif'), 'w', **p) as ds:
                ds.write(X, 1)
                ds.update_tags(AREA_OR_POINT='Point')
        return X, p

    def select_pairs(
        self,
        dt_targets: tuple        = None,
        dt_tol: int              = None,
        dt_max: int              = None,
        pb_max: float            = None,
        min_degree: int          = None,
        max_degree: int          = None,
        force_connect: bool      = None,
        max_workers: int         = None,
        avoid_low_quality_days: bool = None,
        snow_threshold: float    = None,
        precip_mm_threshold: float = None,
        aoi_wkt: str | None = None,
    ) -> tuple:
        """Compute interferogram pairs for all active stacks.

        Args:
            dt_targets (tuple, optional): Target temporal spacings in days. Defaults to (6, 12, 24, 36, 48, 72, 96).
            dt_tol (int, optional): Tolerance in days around each target spacing. Defaults to 3.
            dt_max (int, optional): Maximum temporal baseline in days. Defaults to 120.
            pb_max (float, optional): Maximum perpendicular baseline in meters. Defaults to 150.0.
            min_degree (int, optional): Minimum number of connections per scene. Defaults to 3.
            max_degree (int, optional): Maximum number of connections per scene. Defaults to 5.
            force_connect (bool, optional): Force connectivity for isolated scenes. Defaults to True.
            max_workers (int, optional): Threads for API baseline fallback. Defaults to 4.
            avoid_low_quality_days (bool, optional): Skip scenes with heavy snow or rain. Defaults to True.
            snow_threshold (float, optional): MODIS snow fraction threshold to exclude a scene. Defaults to 0.5.
            precip_mm_threshold (float, optional): 3-day precipitation threshold in mm to exclude a scene. Defaults to 25.0.
            aoi_wkt (str, optional): AOI geometry in WKT for quality scoring. Defaults to search AOI.

        Returns:
            tuple: (pairs, baselines, scene_bperp, prefetch_cache)
                - pairs: dict keyed by (path, frame) for multi-stack, or list for single stack
                - baselines: temporal baselines
                - scene_bperp: perpendicular baselines per scene
                - prefetch_cache: coherence/weather cache dict for downstream use
        """
        # None → pull from the single source of truth
        from insarhub.utils.defaults import SELECT_PAIRS_DEFAULTS as _SP
        if dt_targets             is None: dt_targets             = _SP["dt_targets"]
        if dt_tol                 is None: dt_tol                 = _SP["dt_tol"]
        if dt_max                 is None: dt_max                 = _SP["dt_max"]
        if pb_max                 is None: pb_max                 = _SP["pb_max"]
        if min_degree             is None: min_degree             = _SP["min_degree"]
        if max_degree             is None: max_degree             = _SP["max_degree"]
        if force_connect          is None: force_connect          = _SP["force_connect"]
        if max_workers            is None: max_workers            = _SP["max_workers"]
        if avoid_low_quality_days is None: avoid_low_quality_days = _SP["avoid_low_quality_days"]
        if snow_threshold         is None: snow_threshold         = _SP["snow_threshold"]
        if precip_mm_threshold    is None: precip_mm_threshold    = _SP["precip_mm_threshold"]
        from insarhub.utils.tool import select_pairs as _select_pairs

        if not hasattr(self, 'results'):
            raise ValueError("No search results found. Please run search() first.")

        _aoi_wkt = aoi_wkt or getattr(self.config, "intersectsWith", None)
        _sp_result = _select_pairs(
            self.active_results,
            dt_targets=dt_targets,
            dt_tol=dt_tol,
            dt_max=dt_max,
            pb_max=pb_max,
            min_degree=min_degree,
            max_degree=max_degree,
            force_connect=force_connect,
            max_workers=max_workers,
            avoid_low_quality_days=avoid_low_quality_days,
            snow_threshold=snow_threshold,
            precip_mm_threshold=precip_mm_threshold,
            aoi_wkt=_aoi_wkt,
        )
        pairs      = _sp_result[0]
        baselines  = _sp_result[1]
        scene_bperp: dict = _sp_result[2] if len(_sp_result) > 2 else {}
        prefetch:   dict  = _sp_result[3] if len(_sp_result) > 3 else {}
        return pairs, baselines, scene_bperp, prefetch

    def download(self, save_path: str | None = None, max_workers: int = None,
                 stop_event=None, on_progress=None,
                 scenes=None):
        """Download search results to the specified output directory.

        Args:
            save_path (str, optional): Download path. Defaults to config.workdir.
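            max_workers (int, optional): Concurrent downloads (3–5 recommended). Defaults to 3.
            stop_event (threading.Event, optional): When set, in-flight downloads abort
                at the next chunk and pending jobs are skipped. Defaults to a new Event.
            on_progress (callable, optional): Called as ``on_progress(message, percent)``
                after each scene succeeds, is skipped, or fails. Defaults to None.
            scenes: Restrict download to a subset of scenes. Accepts any of:
                - ``list | set`` of scene name strings
                - The direct output of ``select_pairs()``: either a
                  ``list[[ref, sec], ...]`` (single-stack) or a
                  ``dict{(path, frame): [[ref, sec], ...]}`` (multi-stack).
                  Unique scene names are extracted automatically.
                When ``None`` (default) all search results are downloaded.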

        Raises:
            ValueError: If no search results are available.
        """
        from insarhub.utils.defaults import DOWNLOAD_DEFAULTS as _DL
        if max_workers is None: max_workers = _DL["max_workers"]
        import json as _json
        from concurrent.futures import ThreadPoolExecutor, as_completed
        from insarhub.utils.tool import write_workflow_marker
        from insarhub.utils.config_io import write_insarhub_config as _wic
        output_dir = Path(save_path).expanduser().resolve() if save_path else self.config.workdir
        output_dir.mkdir(exist_ok=True, parents=True)

        self.download_dir = output_dir

        if not hasattr(self, 'results'):
            raise ValueError(f"{Fore.RED}No search results found. Please run search() first.")

        if stop_event is None:
            stop_event = threading.Event()
        _cfg_base = {k: v for k, v in asdict(self.config).items() if k != 'workdir'}

        scene_filter = _parse_scene_filter(scenes)

        jobs = []
        stack_paths: dict = {}
        _dir_is_stack = (self.download_dir / "insarhub_config.json").exists()
        for key, results in self.active_results.items():
            stack_path = self.download_dir if _dir_is_stack else self.download_dir / f'p{key[0]}_f{key[1]}'
            download_path = stack_path / 'slc'
            download_path.mkdir(parents=True, exist_ok=True)
            stack_paths[key] = download_path
            write_workflow_marker(stack_path, downloader=type(self).name)
            _cfg = {**_cfg_base, 'relativeOrbit': key[0], 'frame': key[1]}
            _wic(stack_path, {"downloader": {"type": type(self).name, "config": _cfg}})
            for result in results:
                if scene_filter is None or result.properties['sceneName'] in scene_filter:
                    jobs.append((key, result, download_path))

        total_jobs   = len(jobs)
        success_count = 0
        failure_count = 0
        failed_files  = []

        active_files: dict[int, Path] = {}
        active_files_lock = threading.Lock()

        total_scenes = sum(len(v) for v in self.active_results.values())
        filter_note  = (f" (filtered to {total_jobs} of {total_scenes})"
                        if scene_filter is not None and total_jobs != total_scenes else "")
        print(f"Downloading {total_jobs} scene(s) across "
              f"{len(self.active_results)} stack(s)"
              f"{filter_note} ({max_workers} concurrent)...\n")

        def _stream_download_interruptible(url, file_path, expected_bytes,
                                           pbar_position, scene_name):
            """Stream download that checks stop_event on every chunk."""
            from tqdm import tqdm
            from asf_search.download.download import _try_get_response

            thread_session = asf.ASFSession()
            thread_session.cookies.update(self.session.cookies)
            thread_session.headers.update(self.session.headers)

            for attempt in range(1, 4):
                if stop_event.is_set():
                    raise InterruptedError("Download cancelled by user.")
                try:
                    response = _try_get_response(session=thread_session, url=url)
                    total_bytes = int(response.headers.get('content-length', expected_bytes))

                    with tqdm(
                        total=total_bytes,
                        unit='B',
                        unit_scale=True,
                        unit_divisor=1024,
                        desc=f"[Worker {pbar_position+1}] {scene_name}",
                        bar_format='{desc:<60}{percentage:3.0f}%|{bar:25}{r_bar}',
                        colour='green',
                        position=pbar_position,
                        leave=True,
                    ) as pbar:
                        with open(file_path, 'wb') as f:
                            for chunk in response.iter_content(chunk_size=65536):
                                # Check stop event on EVERY chunk — this is the key
                                if stop_event.is_set():
                                    response.close()  # abort the connection immediately
                                    raise InterruptedError("Download cancelled by user.")
                                if chunk:
                                    f.write(chunk)
                                    pbar.update(len(chunk))
                    return  # success

                except InterruptedError:
                    raise  # propagate immediately, don't retry
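                except Exception:
                    # Discard the partial file, then retry with exponential
                    # backoff (2 s, then 4 s); the third failure is re-raised.
                    if file_path.exists():
                        file_path.unlink()
                    if attempt == 3:
                        raise
                    time.sleep(2 ** attempt)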

        def _download_job(args):
            key, result, download_path, position = args
            file_id   = result.properties['fileID']
            size_b    = result.properties['bytes']
            size_mb   = size_b / (1024 * 1024)
            filename  = result.properties.get('fileName', f"{file_id}.zip")
            file_path = download_path / filename

            scene_name = result.properties.get('sceneName', file_id)

            if stop_event.is_set():
                return file_id, 'cancelled', 0, None

            # Skip if already complete
            if file_path.exists() and file_path.stat().st_size == size_b:
                return file_id, 'skipped', size_mb, None

            # Remove incomplete file
            if file_path.exists():
                file_path.unlink()

            with active_files_lock:
                active_files[position] = file_path

            try:
                start_time = time.time()
                _stream_download_interruptible(
                    url=result.properties['url'],
                    file_path=file_path,
                    expected_bytes=size_b,
                    pbar_position=position,
                    scene_name=scene_name,
                )

                actual_size = file_path.stat().st_size
                if actual_size != size_b:
                    raise IOError(f"Size mismatch: expected {size_b}, got {actual_size} bytes.")

                elapsed = time.time() - start_time
                speed   = size_mb / elapsed if elapsed > 0 else 0
                return file_id, 'success', speed, None

            except InterruptedError:
                return file_id, 'cancelled', 0, None

            except Exception as e:
                if file_path.exists():
                    file_path.unlink()
                return file_id, 'failed', 0, str(e)
            finally:
                with active_files_lock:
                    active_files.pop(position, None)

        job_args = [
            (key, result, download_path, i % max_workers) 
            for i, (key, result, download_path) in enumerate(jobs)
        ]

        executor = ThreadPoolExecutor(max_workers=max_workers)
        futures  = {executor.submit(_download_job, args): args for args in job_args}

        completed_count = 0
        try:
            for future in as_completed(futures):
                file_id, status, value, error = future.result()
                completed_count += 1
                pct = int(completed_count / total_jobs * 100) if total_jobs else 100

                if status == 'success':
                    print(f"  {Fore.GREEN}{file_id} ({value:.1f} MB/s)")
                    success_count += 1
                    if on_progress:
                        on_progress(f"[{completed_count}/{total_jobs}] ✔ {file_id}", pct)
                elif status == 'skipped':
                    print(f"  {Fore.YELLOW}{file_id} ({value:.1f} MB, already exists)")
                    success_count += 1
                    if on_progress:
                        on_progress(f"[{completed_count}/{total_jobs}] ⏭ {file_id} (exists)", pct)
                elif status == 'cancelled':
                    pass  # silently skip cancelled jobs
                else:
                    print(f"  {Fore.RED}{file_id}: {error}")
                    failure_count += 1
                    failed_files.append(file_id)
                    if on_progress:
                        on_progress(f"[{completed_count}/{total_jobs}] ✘ {file_id}", pct)
        except KeyboardInterrupt:
            print(f"\n{Fore.YELLOW}⚠ Download interrupted by user. Cancelling pending jobs...")
            stop_event.set()
            # Cancel all pending futures that haven't started yet
            for future in futures:
                future.cancel()

            # Shut down without waiting for running threads to finish
            executor.shutdown(wait=False, cancel_futures=True)

            # Clean up any partial files being actively written
            with active_files_lock:
                for position, file_path in active_files.items():
                    if file_path.exists():
                        print(f"  {Fore.RED}Removing partial file: {file_path.name}")
                        file_path.unlink()

            print(f"{Fore.YELLOW}Download cancelled. "
                    f"{success_count} scenes completed before interrupt.")
            return

        else:
            executor.shutdown(wait=True)

        # Final summary
        print("\n" + "─" * 60)
        print(f"Download complete: {Fore.GREEN}{success_count}/{total_jobs} succeeded{Fore.RESET}", end="")
        if failure_count:
            print(f", {Fore.RED}{failure_count}/{total_jobs} failed{Fore.RESET}")
            print(f"\nFailed files:")
            for f in failed_files:
                print(f"  {Fore.RED}- {f}")
        if len(stack_paths) == 1:
            print(f"\nFiles saved to: {next(iter(stack_paths.values()))}")
        else:
            print(f"\nFiles saved to:")
            for key, p in stack_paths.items():
                print(f"  path={key[0]} frame={key[1]}: {p}")

Usage

  • Create downloader with parameters

    Initialize a downloader instance with search criteria

    dl = Downloader.create('ASF_Base_Downloader',
                           intersectsWith=[-113.05, 37.74, -112.68, 38.00],
                           dataset='SENTINEL-1',
                           instrument='C-SAR',
                           beamMode='IW',
                           polarization=['VV', 'VV+VH'],
                           processingLevel='SLC',
                           start='2020-01-01',
                           end='2020-12-31',
                           relativeOrbit=100,
                           frame=466,
                           workdir='path/to/dir')
    
    Alternatively, unpack a dictionary of the same parameters:
    params = {
        "intersectsWith": [-113.05, 37.74, -112.68, 38.00],
        "dataset": "SENTINEL-1",
        "instrument": "C-SAR",
        "beamMode": "IW",
        "polarization": ["VV", "VV+VH"],
        "processingLevel": "SLC",
        "start": "2020-01-01",
        "end": "2020-12-31",
        "relativeOrbit": 100,
        "frame": 466,
        "workdir": "path/to/dir"
    }
    dl = Downloader.create('ASF_Base_Downloader', **params)
    
    Or build a config object first and pass it in:
    from insarhub.config import ASF_Base_Config
    cfg = ASF_Base_Config(intersectsWith=[-113.05, 37.74, -112.68, 38.00],
                          dataset='SENTINEL-1',
                          instrument='C-SAR',
                          beamMode='IW',
                          polarization=['VV', 'VV+VH'],
                          processingLevel='SLC',
                          start='2020-01-01',
                          end='2020-12-31',
                          relativeOrbit=100,
                          frame=466,
                          workdir='path/to/dir')
    dl = Downloader.create('ASF_Base_Downloader', config=cfg)
    

    The base config, ASF_Base_Config, mirrors the asf_search keyword parameters as dataclass fields. For detailed descriptions of each keyword, refer to the official ASF Search documentation.

    Source code in src/insarhub/config/defaultconfig.py
    @dataclass
    class ASF_Base_Config:
        '''
        Dataclass containing all configuration options for asf_search.
    
        This class provides a unified interface for configuring ASF (Alaska Satellite Facility) 
        search parameters.
        '''
        name: str = "ASF_Base_Config"
        dataset: str | list[str] | None = None
        platform: str | list[str] | None = None
        instrument: str | None = None
        absoluteBurstID: int | list[int] | None = None
        absoluteOrbit: int | list[int] | None = None
        asfFrame: int | list[int] | None = None
        beamMode: str | None = None
        beamSwath: str | list[str] | None = None
        campaign: str | None = None
        maxDoppler: float | None = None
        minDoppler: float | None = None
        maxFaradayRotation: float | None = None
        minFaradayRotation: float | None = None
        flightDirection: str | None = None
        flightLine: str | None = None
        frame: int | list[int] | None = None
        frameCoverage: str | None = None
        fullBurstID: str | list[str] | None = None
        groupID: str | None = None
        jointObservation: bool | None = None
        lookDirection: str | None = None
        offNadirAngle: float | list[float] | None = None
        operaBurstID: str | list[str] | None = None
        polarization: str | list[str] | None = None
        mainBandPolarization: str | list[str] | None = None
        sideBandPolarization: str | list[str] | None = None
        processingLevel: str | None = None
        productionConfiguration: str | list[str] | None = None
        rangeBandwidth: str | list[str] | None = None
        relativeBurstID: str | list[str] | None = None
        relativeOrbit: int | list[int] | None = None
        intersectsWith: str | None = None  
        processingDate: str | None = None
        start: str | None = None
        end: str | None = None
        season: list[int] | None = None
        stack_from_id: str | None = None
        maxResults: int | None = None
        granule_names: str | list[str] | None = None
        workdir: Path | str = field(default_factory=lambda: Path.cwd())
    
        # ── UI metadata consumed by the API / settings panel ─────────────────────
        _ui_groups: ClassVar[list] = [
            {"label": "Dataset",
             "fields": ["dataset", "platform", "instrument"]},
            {"label": "SAR Parameters",
             "fields": ["beamMode", "beamSwath", "processingLevel",
                        "polarization", "mainBandPolarization", "sideBandPolarization",
                        "lookDirection", "flightDirection", "flightLine"]},
            {"label": "Orbit & Frame",
             "fields": ["relativeOrbit", "absoluteOrbit", "frame", "asfFrame", "frameCoverage"]},
            {"label": "Burst IDs",
             "fields": ["absoluteBurstID", "relativeBurstID", "fullBurstID", "operaBurstID"]},
            {"label": "Temporal & Location",
             "fields": ["start", "end", "processingDate", "season",
                        "intersectsWith", "stack_from_id", "maxResults"]},
            {"label": "By Granule Name",
             "fields": ["granule_names"]},
            {"label": "Advanced",
             "fields": ["campaign", "groupID",
                        "maxDoppler", "minDoppler", "maxFaradayRotation", "minFaradayRotation",
                        "offNadirAngle", "jointObservation",
                        "productionConfiguration", "rangeBandwidth"]},
        ]
        _ui_fields: ClassVar[dict] = {
            # Dataset
            "dataset":         {"type": "text",
                                "hint": "Dataset to search (e.g. SENTINEL-1, ALOS, NISAR)"},
            "platform":        {"type": "text",
                                "hint": "Platform name (e.g. S1A, ALOS)"},
            "instrument":      {"type": "text",
                                "hint": "Instrument name (e.g. C-SAR)"},
            # SAR Parameters
            "beamMode":        {"type": "select", "options": ["", "IW", "EW", "SM", "WV"],
                                "hint": "SAR acquisition mode"},
            "beamSwath":       {"type": "text",
                                "hint": "Beam swath identifier"},
            "processingLevel": {"type": "select",
                                "options": ["", "SLC", "GRD", "GRD_HD", "GRD_MS",
                                            "BURST", "RTC_HI_RES", "RTC_LOW_RES"],
                                "hint": "Processing level"},
            "polarization":    {"type": "text",
                                "hint": "Polarization(s), e.g. VV or VV+VH"},
            "mainBandPolarization": {"type": "text",
                                "hint": "Main band polarization (NISAR dual-band)"},
            "sideBandPolarization": {"type": "text",
                                "hint": "Side band polarization (NISAR dual-band)"},
            "lookDirection":   {"type": "select", "options": ["", "LEFT", "RIGHT"],
                                "hint": "Radar look direction"},
            "flightDirection": {"type": "select", "options": ["", "ASCENDING", "DESCENDING"],
                                "hint": "Orbit direction (empty = both)"},
            "flightLine":      {"type": "text",
                                "hint": "Flight line identifier"},
            # Orbit & Frame
            "relativeOrbit":   {"type": "text",
                                "hint": "Relative orbit (path) number(s), e.g. 64 or 64,65"},
            "absoluteOrbit":   {"type": "text",
                                "hint": "Absolute orbit number(s)"},
            "frame":           {"type": "text",
                                "hint": "Sensor native frame number(s)"},
            "asfFrame":        {"type": "text",
                                "hint": "ASF internal frame number(s)"},
            "frameCoverage":   {"type": "text",
                                "hint": "Frame coverage filter"},
            # Burst IDs
            "absoluteBurstID": {"type": "text",
                                "hint": "Absolute burst ID(s)"},
            "relativeBurstID": {"type": "text",
                                "hint": "Relative burst ID(s)"},
            "fullBurstID":     {"type": "text",
                                "hint": "Full burst ID, e.g. T064_135524_IW1"},
            "operaBurstID":    {"type": "text",
                                "hint": "OPERA burst ID(s)"},
            # Temporal & Location
            "start":           {"type": "text",
                                "hint": "Default start date (ISO 8601, e.g. 2020-01-01)"},
            "end":             {"type": "text",
                                "hint": "Default end date (ISO 8601, e.g. 2022-12-31)"},
            "processingDate":  {"type": "text",
                                "hint": "Processing date filter (ISO 8601)"},
            "season":          {"type": "text",
                                "hint": "Day-of-year range for seasonal filtering, e.g. 1,90"},
            "intersectsWith":  {"type": "text",
                                "hint": "WKT geometry for spatial intersection"},
            "stack_from_id":   {"type": "text",
                                "hint": "Build stack from a reference scene ID"},
            "maxResults":      {"type": "auto_number", "min": 1, "max": 50000, "step": 100,
                                "hint": "Maximum number of search results returned"},
            "granule_names":   {"type": "text",
                                "hint": "Granule/scene names (comma-separated), or a path to a CSV/XLSX/TXT file. "
                                        "When set, overrides normal parameter-based search."},
            # Advanced
            "campaign":        {"type": "text",
                                "hint": "Campaign name filter (UAVSAR / airborne datasets)"},
            "groupID":         {"type": "text",
                                "hint": "Group ID filter"},
            "maxDoppler":      {"type": "auto_number",
                                "hint": "Maximum Doppler centroid frequency (Hz)"},
            "minDoppler":      {"type": "auto_number",
                                "hint": "Minimum Doppler centroid frequency (Hz)"},
            "maxFaradayRotation": {"type": "auto_number",
                                "hint": "Maximum Faraday rotation angle (degrees)"},
            "minFaradayRotation": {"type": "auto_number",
                                "hint": "Minimum Faraday rotation angle (degrees)"},
            "offNadirAngle":   {"type": "text",
                                "hint": "Off-nadir angle(s), e.g. 34.3 or 21.5,26.2"},
            "jointObservation":{"type": "bool",
                                "hint": "Filter for joint ALOS PALSAR/AVNIR-2 observations"},
            "productionConfiguration": {"type": "text",
                                "hint": "Production configuration identifier"},
            "rangeBandwidth":  {"type": "text",
                                "hint": "Range bandwidth filter"},
        }
        # ─────────────────────────────────────────────────────────────────────────
    
        def __post_init__(self):
            if isinstance(self.workdir, str):
                self.workdir = Path(self.workdir).expanduser().resolve()
    
  • Search

    Query the satellite archive and retrieve available scenes matching your criteria

    results = dl.search()
    

    Raises:

    Type        Description
    ValueError  If search returns no results.
    Exception   If search fails after 10 retry attempts.

  • Filter

    Refine existing search results by applying additional constraints

    filter_result = dl.filter(start='2020-02-01')
    

    Parameters:

    Name             Type                 Default  Description
    path_frame       tuple | list[tuple]  None     A single (path, frame) tuple or list of tuples. Defaults to None.
    start            str                  None     Start date string, e.g. '2021-01-01'. Defaults to None.
    end              str                  None     End date string, e.g. '2023-12-31'. Defaults to None.
    frame            int | list[int]      None     Sensor native frame number(s), e.g. 50. Defaults to None.
    asfFrame         int | list[int]      None     ASF internal frame number(s), e.g. 50. Defaults to None.
    flightDirection  str                  None     'ASCENDING' or 'DESCENDING'. Defaults to None.
    relativeOrbit    int | list[int]      None     Relative orbit number(s) to keep. Defaults to None.
    absoluteOrbit    int | list[int]      None     Absolute orbit number(s) to keep. Defaults to None.
    lookDirection    str                  None     'LEFT' or 'RIGHT'. Defaults to None.
    polarization     str | list[str]      None     Polarization(s) to keep, e.g. 'VV' or ['VV', 'VH']. Defaults to None.
    processingLevel  str                  None     Processing level to keep, e.g. 'SLC'. Defaults to None.
    beamMode         str                  None     Beam mode to keep, e.g. 'IW'. Defaults to None.
    season           list[int]            None     List of months (1-12) to keep, e.g. [6, 7, 8] for summer. Defaults to None.
    min_coverage     float                None     Minimum fractional overlap (0-1) between scene and AOI. Defaults to None.
    min_count        int                  None     Drop stacks with fewer than this many scenes after filtering. Defaults to None.
    max_count        int                  None     Keep at most this many scenes per stack (from earliest). Defaults to None.
    latest_n         int                  None     Keep the N most recent scenes per stack. Defaults to None.
    earliest_n       int                  None     Keep the N earliest scenes per stack. Defaults to None.

    Raises:

    Type        Description
    ValueError  If no search results are available.
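
In the filter source listed earlier, the per-stack count options are applied in a fixed order: scenes are sorted by start time, then at most one of `earliest_n`, `latest_n`, or `max_count` takes effect (in that priority), and `min_count` is checked last. Polarization values such as `'VV+VH'` are split on `+` before matching. The sketch below mirrors both behaviours on plain strings; `trim_stack` and `matches_polarization` are illustrative names, not InSARHub functions.

```python
def matches_polarization(scene_pol: str, wanted: set[str]) -> bool:
    """True if any wanted polarization occurs in a value such as 'VV+VH'."""
    scene_pols = set(scene_pol.replace('+', ' ').split())
    return bool(wanted & scene_pols)


def trim_stack(dates, earliest_n=None, latest_n=None, max_count=None, min_count=None):
    """Sort ISO dates, apply at most one count limit, then enforce min_count."""
    items = sorted(dates)  # ISO 8601 date strings sort chronologically
    if earliest_n is not None:
        items = items[:earliest_n]
    elif latest_n is not None:
        items = items[-latest_n:]
    elif max_count is not None:
        items = items[:max_count]
    if min_count is not None and len(items) < min_count:
        return None  # the stack would be dropped
    return items


dates = ["2020-03-01", "2020-01-01", "2020-02-01", "2020-04-01"]
print(trim_stack(dates, latest_n=2))
print(trim_stack(dates, earliest_n=1, latest_n=2))  # earliest_n wins
print(matches_polarization("VV+VH", {"VH"}))
```

One practical consequence: passing `earliest_n` together with `latest_n` does not combine them; only `earliest_n` is applied.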

  • Reset filter

    Restore search results to the original unfiltered state

    dl.reset()
    
  • Summary

    Display statistics and overview of current search results

    dl.summary()
    

    Parameters:

    Name  Type  Default  Description
    ls    bool  False    If True, list individual scene names and dates. Defaults to False.
  • View Footprint

    Visualize geographic coverage of search results on an interactive map

    dl.footprint()
    

    Parameters:

    Name       Type  Default  Description
    save_path  str   None     Path to save the figure. If None, displays interactively. Defaults to None.
  • Download

    Download all scenes from current search results to local storage

    dl.download()
    

    Parameters:

    Name Type Description Default
    save_path str

    Download path. Defaults to config.workdir.

    None
    max_workers int

    Concurrent downloads (3–5 recommended). Defaults to 3.

    None
    scenes

    Restrict download to a subset of scenes. Accepts any of:

      - a list or set of scene name strings
      - the direct output of select_pairs(): either a list [[ref, sec], ...] (single-stack) or a dict {(path, frame): [[ref, sec], ...]} (multi-stack)

    Unique scene names are extracted automatically. When None (default), all search results are downloaded.

    None

    Raises:

    Type Description
    ValueError

    If no search results are available.
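
    The accepted shapes of scenes can be reduced to a set of unique scene names along the following lines. This is a minimal sketch: unique_scene_names is a hypothetical helper, and the scene names "A" to "E" are placeholders rather than real Sentinel-1 identifiers.

```python
def unique_scene_names(scenes):
    """Flatten a scene list/set or select_pairs()-style output into a set of names."""
    if scenes is None:
        return None  # None means "no restriction": download everything
    # A dict maps (path, frame) to a list of [ref, sec] pairs (multi-stack case).
    groups = list(scenes.values()) if isinstance(scenes, dict) else [scenes]
    names = set()
    for group in groups:
        for item in group:
            if isinstance(item, str):
                names.add(item)     # plain scene name
            else:
                names.update(item)  # a [ref, sec] pair
    return names

single_stack = [["A", "B"], ["B", "C"]]
multi_stack = {(100, 466): [["A", "B"]], (27, 120): [["D", "E"]]}
print(sorted(unique_scene_names(single_stack)))
print(sorted(unique_scene_names(multi_stack)))
```

    In practice this means the pairs returned by dl.select_pairs() can be passed directly, e.g. dl.download(scenes=pairs).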

  • DEM Download

    Download DEM covering all scenes from current search results

    dl.dem()
    

    Parameters:

    Name Type Description Default
    save_path str

    Directory to save DEM files. If None, uses config.workdir. Defaults to None.

    None
  • Select Pairs

    Compute interferogram pairs for all active stacks based on temporal and perpendicular baseline constraints. Scenes with poor acquisition conditions (heavy rain, snow) are excluded automatically when avoid_low_quality_days=True (default).

    from insarhub.utils import plot_pair_network
    pairs, baselines, scene_bperp, _ = dl.select_pairs(
        dt_targets=(6, 12, 24, 36, 48, 72, 96),
        dt_tol=3,
        dt_max=120,
        pb_max=150.0,
        min_degree=3,
        max_degree=5,
        force_connect=True,
        avoid_low_quality_days=True,
        precip_mm_threshold=25.0,
        snow_threshold=0.5,
    )
    fig = plot_pair_network(pairs, baselines, scene_bperp)
    fig.show()
    

    Parameters:

    Name Type Description Default
    dt_targets tuple

    Target temporal spacings in days. Defaults to (6, 12, 24, 36, 48, 72, 96).

    None
    dt_tol int

    Tolerance in days around each target spacing. Defaults to 3.

    None
    dt_max int

    Maximum temporal baseline in days. Defaults to 120.

    None
    pb_max float

    Maximum perpendicular baseline in meters. Defaults to 150.0.

    None
    min_degree int

    Minimum number of connections per scene. Defaults to 3.

    None
    max_degree int

    Maximum number of connections per scene. Defaults to 5.

    None
    force_connect bool

    Force connectivity for isolated scenes. Defaults to True.

    None
    max_workers int

    Threads for API baseline fallback. Defaults to 4.

    None
    avoid_low_quality_days bool

    Skip scenes with heavy snow or rain. Defaults to True.

    None
    snow_threshold float

    MODIS snow fraction threshold to exclude a scene. Defaults to 0.5.

    None
    precip_mm_threshold float

    3-day precipitation threshold in mm to exclude a scene. Defaults to 25.0.

    None
    aoi_wkt str

    AOI geometry in WKT for quality scoring. Defaults to search AOI.

    None
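
    To make the roles of dt_targets, dt_tol, and dt_max concrete, the sketch below selects candidate pairs from a short list of made-up acquisition dates using the temporal criteria only. It is an assumption-laden illustration: candidate_pairs is hypothetical, and the perpendicular-baseline limit, degree constraints, forced connectivity, and quality screening performed by select_pairs() are all omitted.

```python
from datetime import date
from itertools import combinations

def candidate_pairs(dates, dt_targets=(6, 12, 24), dt_tol=3, dt_max=120):
    """Return (ref, sec) pairs whose spacing is within dt_tol of a target and <= dt_max."""
    pairs = []
    for ref, sec in combinations(sorted(dates), 2):
        dt = (sec - ref).days
        if dt > dt_max:
            continue  # temporal baseline too long
        if any(abs(dt - target) <= dt_tol for target in dt_targets):
            pairs.append((ref, sec))
    return pairs

# Hypothetical acquisition dates (12-day repeat, then a gap).
dates = [date(2020, 1, 1), date(2020, 1, 13), date(2020, 1, 25), date(2020, 3, 1)]
pairs = candidate_pairs(dates)
for ref, sec in pairs:
    print(ref.isoformat(), sec.isoformat(), (sec - ref).days)
```

    Here the 12-day and 24-day spacings match a target, while the pairs involving 2020-03-01 (36, 48, and 60 days) match none of the three targets used in this example.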

S1_SLC is a specialized downloader that extends ASF_Base_Downloader, preconfigured specifically for downloading Sentinel-1 SLC data.

Source code in src/insarhub/downloader/s1_slc.py
class S1_SLC(ASF_Base_Downloader):
    """Search and download Sentinel-1 SLC data using the ASF Search API."""

    name = "S1_SLC"
    description = "Sentinel-1 SLC scene search and download via ASF."
    default_config = S1_SLC_Config

    def download(self, save_path: str | None = None, max_workers: int | None = None, force_cdse: bool = False, download_orbit: bool = False, stop_event=None, on_progress=None):
        """Download SLC data and optionally associated orbit files.

        Args:
            save_path (str | None): Optional path to save the downloaded files. Defaults to None.
            max_workers (int | None): Parallel download workers. When None, falls back to DOWNLOAD_DEFAULTS["max_workers"] (3).
            force_cdse (bool): If True, forces downloading orbit files from CDSE instead of ASF. Defaults to False.
            download_orbit (bool): If True, also downloads orbit files after scenes. Defaults to False.
            stop_event: Optional threading.Event to cancel the download.
            on_progress: Optional callback(message, pct) called after each file completes.
        """
        # Resolve the default worker count only when the caller did not supply one.
        from insarhub.utils.defaults import DOWNLOAD_DEFAULTS as _DL
        if max_workers is None:
            max_workers = _DL["max_workers"]
        super().download(save_path=save_path, max_workers=max_workers, stop_event=stop_event, on_progress=on_progress)
        if download_orbit:
            self.download_orbit(force_cdse=force_cdse)

    def download_orbit(self, force_cdse: bool = False, save_dir: str | None = None,
                       stop_event=None, scenes=None):
        """Download orbit files for the current search results.

        Downloads from ASF by default (no credentials required).  Pass
        ``force_cdse=True`` to use the Copernicus Data Space Ecosystem (CDSE)
        server instead — CDSE typically publishes precise orbits a few hours
        earlier but requires an account at https://dataspace.copernicus.eu/
        configured in your ``.netrc`` file.

        Args:
            force_cdse (bool): Use CDSE instead of ASF. Defaults to False.
            save_dir (str | None): Directory to save orbit files. Defaults to workdir if not specified.
            stop_event: Optional threading.Event; when set, the loop stops before the next scene.
            scenes: Restrict to a subset of scenes. Accepts scene name strings, or the
                direct output of ``select_pairs()`` (list or dict). Same format as
                ``download(scenes=...)``. When ``None`` all scenes get orbit files.
        """
        use_asf = not force_cdse
        print(f"Downloading orbit files from {'ASF' if use_asf else 'CDSE'}…")

        if force_cdse:
            self._has_cdse_netrc = self._check_netrc(keyword='machine dataspace.copernicus.eu')
            if self._has_cdse_netrc:
                print(f"{Fore.GREEN}CDSE credentials found in .netrc.\n")
            else:
                while True:
                    self._cdse_username = input("Enter your CDSE username: ")
                    self._cdse_password = getpass.getpass("Enter your CDSE password: ")
                    if not self._check_cdse_credentials(self._cdse_username, self._cdse_password):
                        print(f"{Fore.RED}Authentication failed. Please check your credentials and try again.\n")
                        continue
                    netrc_path = Path.home().joinpath(".netrc")
                    cdse_entry = f"\nmachine dataspace.copernicus.eu\n    login {self._cdse_username}\n    password {self._cdse_password}\n"
                    with open(netrc_path, 'a') as f:
                        f.write(cdse_entry)
                    print(f"{Fore.GREEN}Credentials saved to {netrc_path}.\n")
                    break

        from insarhub.downloader.asf_base import _parse_scene_filter
        scene_filter = _parse_scene_filter(scenes)

        base_dir = Path(save_dir) if save_dir else (getattr(self, 'download_dir', None) or Path(getattr(self.config, 'workdir', None) or Path.cwd()))
        all_items = [
            (key, result)
            for key, results in self.results.items()  # type: ignore[union-attr]
            for result in results
            if scene_filter is None or result.properties['sceneName'] in scene_filter
        ]
        with tqdm(all_items, desc="Orbit files", unit="scene", bar_format="{l_bar}{bar:20}{r_bar}") as pbar:
            for key, result in pbar:
                if stop_event is not None and stop_event.is_set():
                    tqdm.write("Orbit download stopped.")
                    break
                _base = Path(base_dir)
                _is_stack = (_base / "insarhub_config.json").exists()
                download_path = (Path(save_dir) / 'slc') if save_dir else ((_base / 'slc') if _is_stack else (_base / f'p{key[0]}_f{key[1]}' / 'slc'))
                download_path.mkdir(parents=True, exist_ok=True)
                scene_name = result.properties['sceneName']
                short_name = scene_name[:40] + "..."
                acq_time = scene_name.replace("__", "_").split("_")[4]
                already_have = False
                for eof in download_path.glob("*.EOF"):
                    parts = eof.stem.split("_V")
                    if len(parts) == 2:
                        validity = parts[1].split("_")
                        if len(validity) == 2 and validity[0] <= acq_time <= validity[1]:
                            pbar.set_postfix_str(f"skip {short_name}")
                            already_have = True
                            break
                if already_have:
                    continue
                pbar.set_postfix_str(f"fetch {short_name}")
                _save = download_path.as_posix()
                try:
                    info = download_eofs(sentinel_file=scene_name, save_dir=_save, force_asf=use_asf)
                except Exception as e:
                    if use_asf:
                        pbar.set_postfix_str(f"ASF fail, try CDSE {short_name}")
                        try:
                            info = download_eofs(sentinel_file=scene_name, save_dir=_save, force_asf=False)
                        except Exception as e2:
                            tqdm.write(f"{Fore.RED}[ERROR] {scene_name}: {e2}")
                            info = []
                    else:
                        tqdm.write(f"{Fore.RED}[ERROR] {scene_name}: {e}")
                        info = []
                if info:
                    pbar.set_postfix_str(f"ok {short_name}")
                else:
                    tqdm.write(f"{Fore.YELLOW}[WARN] No orbit file found for: {scene_name}")

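    def _check_cdse_credentials(self, username: str, password: str) -> bool:
        """Return True if CDSE issues an access token for the given credentials."""
        url = "https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token"
        data = {
            "grant_type": "password",
            "client_id": "cdse-public",
            "username": username,
            "password": password
        }
        try:
            # A timeout keeps the credential prompt from hanging on network problems.
            resp = requests.post(url, data=data, timeout=30)
            return resp.status_code == 200 and "access_token" in resp.json()
        except (requests.RequestException, ValueError):
            # A network failure or a non-JSON body counts as a failed check.
            return False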
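
The skip logic in download_orbit compares the acquisition time embedded in the scene name with the validity window embedded in each existing .EOF file name. The standalone sketch below isolates that comparison. The helper names acquisition_time and orbit_covers are hypothetical, and the scene and orbit file names are made-up examples that follow the Sentinel-1 naming pattern.

```python
def acquisition_time(scene_name):
    """Return the acquisition start timestamp embedded in a Sentinel-1 scene name."""
    # Collapse the double underscore so the timestamp is always the fifth field.
    return scene_name.replace("__", "_").split("_")[4]

def orbit_covers(eof_stem, acq_time):
    """True if the EOF validity window (the part after '_V') brackets acq_time."""
    parts = eof_stem.split("_V")
    if len(parts) != 2:
        return False
    validity = parts[1].split("_")
    # Timestamps are fixed-width YYYYMMDDTHHMMSS, so string comparison is chronological.
    return len(validity) == 2 and validity[0] <= acq_time <= validity[1]

# Made-up names that follow the usual naming pattern.
scene = "S1A_IW_SLC__1SDV_20200105T133000_20200105T133027_030661_038385_1234"
eof = "S1A_OPER_AUX_POEORB_OPOD_20200125T120000_V20200104T225942_20200106T005942"

acq = acquisition_time(scene)
print(acq, orbit_covers(eof, acq))
```

Because the check relies only on file names, an orbit file already present in the slc directory is reused without opening it.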

Usage

  • Create downloader with parameters

    Initialize a downloader instance with search criteria

    dl = Downloader.create('S1_SLC',
                            intersectsWith=[-113.05, 37.74, -112.68, 38.00],
                            start='2020-01-01',
                            end='2020-12-31',
                            relativeOrbit=100,
                            frame=466,
                            workdir='path/to/dir')
    
    OR
    params = {
        "intersectsWith": [-113.05, 37.74, -112.68, 38.00],
        "start": "2020-01-01",
        "end": "2020-12-31",
        "relativeOrbit": 100,
        "frame": 466,
        "workdir": "path/to/dir"
    }
    dl = Downloader.create('S1_SLC', **params)
    
    OR
    from insarhub.config import S1_SLC_Config
    cfg = S1_SLC_Config(intersectsWith=[-113.05, 37.74, -112.68, 38.00],
                        start="2020-01-01",
                        end="2020-12-31",
                        relativeOrbit=100,
                        frame=466,
                        workdir="path/to/dir")
    dl = Downloader.create('S1_SLC', config=cfg)
    

    S1_SLC_Config presets the search parameters for Sentinel-1 SLC data: dataset, instrument, beam mode, polarization, and processing level. For detailed descriptions of each parameter, refer to the official ASF Search documentation.

    Source code in src/insarhub/config/defaultconfig.py
    @dataclass
    class S1_SLC_Config(ASF_Base_Config):
        name:str = "S1_SLC_Config"
        dataset: str | list[str] | None =  constants.DATASET.SENTINEL1
        instrument: str | None = constants.INSTRUMENT.C_SAR
        beamMode:str | None = constants.BEAMMODE.IW
        polarization: str|list[str] | None = field(default_factory=lambda: [constants.POLARIZATION.VV, constants.POLARIZATION.VV_VH])
        processingLevel: str | None = constants.PRODUCT_TYPE.SLC
    
  • Search

    results = dl.search()
    

    Raises:

    Type Description
    ValueError

    If search returns no results.

    Exception

    If search fails after 10 retry attempts.
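
    The retry behavior described above can be pictured with a generic retry wrapper. This is a sketch under stated assumptions: with_retries is a hypothetical helper, and the delay between attempts and the exception types InSARHub actually retries on are not documented here.

```python
import time

def with_retries(func, attempts=10, delay=0.0):
    """Call func until it succeeds; after `attempts` failures, raise with the last error chained."""
    last_error = None
    for _ in range(attempts):
        try:
            return func()
        except Exception as exc:  # illustrative: a real client would catch narrower errors
            last_error = exc
            time.sleep(delay)
    raise Exception(f"search failed after {attempts} attempts") from last_error

# Hypothetical flaky call that fails twice before succeeding.
calls = {"count": 0}

def flaky_search():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return ["scene-1", "scene-2"]

result = with_retries(flaky_search)
print(result, calls["count"])
```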

  • Filter

    filter_result = dl.filter(start='2020-02-01')
    

    Parameters:

    Name Type Description Default
    path_frame tuple | list[tuple]

    A single (path, frame) tuple or list of tuples. Defaults to None.

    None
    start str

    Start date string, e.g. '2021-01-01'. Defaults to None.

    None
    end str

    End date string, e.g. '2023-12-31'. Defaults to None.

    None
    frame int | list[int]

    Sensor native frame number(s), e.g. 50. Defaults to None.

    None
    asfFrame int | list[int]

    ASF internal frame number(s), e.g. 50. Defaults to None.

    None
    flightDirection str

    'ASCENDING' or 'DESCENDING'. Defaults to None.

    None
    relativeOrbit int | list[int]

    Relative orbit number(s) to keep. Defaults to None.

    None
    absoluteOrbit int | list[int]

    Absolute orbit number(s) to keep. Defaults to None.

    None
    lookDirection str

    'LEFT' or 'RIGHT'. Defaults to None.

    None
    polarization str | list[str]

    Polarization(s) to keep, e.g. 'VV' or ['VV', 'VH']. Defaults to None.

    None
    processingLevel str

    Processing level to keep, e.g. 'SLC'. Defaults to None.

    None
    beamMode str

    Beam mode to keep, e.g. 'IW'. Defaults to None.

    None
    season list[int]

    List of months (1-12) to keep, e.g. [6, 7, 8] for summer. Defaults to None.

    None
    min_coverage float

    Minimum fractional overlap (0-1) between scene and AOI. Defaults to None.

    None
    min_count int

    Drop stacks with fewer than this many scenes after filtering. Defaults to None.

    None
    max_count int

    Keep at most this many scenes per stack (from earliest). Defaults to None.

    None
    latest_n int

    Keep the N most recent scenes per stack. Defaults to None.

    None
    earliest_n int

    Keep the N earliest scenes per stack. Defaults to None.

    None

    Raises:

    Type Description
    ValueError

    If no search results are available.
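
    For min_coverage, the sketch below computes a fractional overlap using axis-aligned bounding boxes. It assumes that "fractional overlap" means the share of the AOI area covered by the scene, and it simplifies real scene footprints (polygons) to boxes; coverage_fraction is a hypothetical helper, not part of InSARHub.

```python
def coverage_fraction(scene_bbox, aoi_bbox):
    """Fraction of the AOI box covered by the scene box.

    Boxes are (west, south, east, north) tuples.
    """
    width = min(scene_bbox[2], aoi_bbox[2]) - max(scene_bbox[0], aoi_bbox[0])
    height = min(scene_bbox[3], aoi_bbox[3]) - max(scene_bbox[1], aoi_bbox[1])
    if width <= 0 or height <= 0:
        return 0.0  # the boxes do not intersect
    aoi_area = (aoi_bbox[2] - aoi_bbox[0]) * (aoi_bbox[3] - aoi_bbox[1])
    return (width * height) / aoi_area

aoi = (0.0, 0.0, 2.0, 2.0)
half_scene = (1.0, -1.0, 5.0, 5.0)  # covers the eastern half of the AOI
far_scene = (10.0, 10.0, 12.0, 12.0)  # does not touch the AOI

print(coverage_fraction(half_scene, aoi))
print(coverage_fraction(far_scene, aoi))
```

    Under this reading, dl.filter(min_coverage=0.8) would drop the first scene (coverage 0.5) as well as the second.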

  • Reset filter

    dl.reset()
    
  • Summary

    dl.summary()
    

    Parameters:

    Name Type Description Default
    ls bool

    If True, list individual scene names and dates. Defaults to False.

    False
  • View Footprint

    dl.footprint()
    

    Parameters:

    Name Type Description Default
    save_path str

    Path to save the figure. If None, displays interactively. Defaults to None.

    None
  • Download

    dl.download()
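
    The S1_SLC.download() docstring mentions max_workers, stop_event, and on_progress. How these three typically interact in a thread pool can be sketched as follows. This is not InSARHub's code: run_downloads and fetch are hypothetical stand-ins, and the actual cancellation and progress semantics may differ.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_downloads(names, fetch, max_workers=3, stop_event=None, on_progress=None):
    """Fetch each name in a thread pool, honoring stop_event and reporting progress."""
    def task(name):
        if stop_event is not None and stop_event.is_set():
            return False  # cancellation requested before this item started
        fetch(name)
        return True

    completed = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(task, name): name for name in names}
        for future in as_completed(futures):
            if future.result():
                completed.append(futures[future])
                if on_progress is not None:
                    on_progress(f"finished {futures[future]}", 100 * len(completed) / len(names))
    return completed

# Hypothetical usage: fetch just records the name instead of downloading anything.
fetched, progress = [], []
done = run_downloads(["a", "b", "c"], fetched.append,
                     on_progress=lambda msg, pct: progress.append(pct))
print(sorted(done), progress[-1])

# Setting the event before starting skips every item.
stop = threading.Event()
stop.set()
skipped = run_downloads(["a", "b"], fetched.append, stop_event=stop)
print(skipped)
```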
    
  • DEM Download

    dl.dem()
    

    Parameters:

    Name Type Description Default
    save_path str

    Directory to save DEM files. If None, uses config.workdir. Defaults to None.

    None
  • Select Pairs

    from insarhub.utils import plot_pair_network
    pairs, baselines, scene_bperp, _ = dl.select_pairs(
        dt_targets=(6, 12, 24, 36, 48, 72, 96),
        dt_tol=3,
        dt_max=120,
        pb_max=150.0,
        min_degree=3,
        max_degree=5,
        force_connect=True,
        avoid_low_quality_days=True,
        precip_mm_threshold=25.0,
        snow_threshold=0.5,
    )
    fig = plot_pair_network(pairs, baselines, scene_bperp)
    fig.show()
    

    Parameters:

    Name Type Description Default
    dt_targets tuple

    Target temporal spacings in days. Defaults to (6, 12, 24, 36, 48, 72, 96).

    None
    dt_tol int

    Tolerance in days around each target spacing. Defaults to 3.

    None
    dt_max int

    Maximum temporal baseline in days. Defaults to 120.

    None
    pb_max float

    Maximum perpendicular baseline in meters. Defaults to 150.0.

    None
    min_degree int

    Minimum number of connections per scene. Defaults to 3.

    None
    max_degree int

    Maximum number of connections per scene. Defaults to 5.

    None
    force_connect bool

    Force connectivity for isolated scenes. Defaults to True.

    None
    max_workers int

    Threads for API baseline fallback. Defaults to 4.

    None
    avoid_low_quality_days bool

    Skip scenes with heavy snow or rain. Defaults to True.

    None
    snow_threshold float

    MODIS snow fraction threshold to exclude a scene. Defaults to 0.5.

    None
    precip_mm_threshold float

    3-day precipitation threshold in mm to exclude a scene. Defaults to 25.0.

    None
    aoi_wkt str

    AOI geometry in WKT for quality scoring. Defaults to search AOI.

    None
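
    The quality screening controlled by avoid_low_quality_days, precip_mm_threshold, and snow_threshold amounts to a per-scene threshold test. The sketch below shows one plausible form of it. The function is_low_quality, the use of >= rather than >, and the weather values are assumptions for illustration; how InSARHub obtains the precipitation and MODIS snow values is not shown.

```python
def is_low_quality(precip_3day_mm, snow_fraction,
                   precip_mm_threshold=25.0, snow_threshold=0.5):
    """True if either the 3-day precipitation or the snow fraction reaches its threshold."""
    return precip_3day_mm >= precip_mm_threshold or snow_fraction >= snow_threshold

# Hypothetical per-scene conditions: (3-day precipitation in mm, snow fraction).
conditions = {
    "scene_dry": (2.0, 0.0),
    "scene_rain": (40.0, 0.0),
    "scene_snow": (0.0, 0.8),
}

kept = [name for name, (rain, snow) in conditions.items()
        if not is_low_quality(rain, snow)]
print(kept)
```

    Passing avoid_low_quality_days=False to select_pairs() disables this screening.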