Skip to content

runconfig

RunConfig dataclass

dataclass containing CSLC runconfig

Source code in src/compass/utils/runconfig.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
@dataclass(frozen=True)
class RunConfig:
    '''Immutable container for a CSLC runconfig and objects derived from it'''
    # workflow name
    name: str
    # runconfig options converted from dict
    groups: SimpleNamespace
    # flat list of bursts selected from the input SAFE files
    bursts: list[Sentinel1BurstSlc]
    # reference radar path and grid; None for a reference run (rdr2geo)
    reference_radar_info: ReferenceRadarInfo | None
    # entirety of yaml (user options plus defaults) as string
    yaml_string: str
    # output paths:
    # key = tuple[burst ID, date str]
    # val = SimpleNamespace output directory path, HDF5 name, scratch directory path
    output_paths: dict[tuple[str, str], SimpleNamespace]

    @classmethod
    def load_from_yaml(cls, yaml_runconfig: str,
                       workflow_name: str) -> RunConfig:
        """Initialize RunConfig class with options from given yaml file.

        Parameters
        ----------
        yaml_runconfig : str
            Path to yaml file containing the options to load or string contents
            of a runconfig
        workflow_name: str
            Name of the workflow whose schema and default options are used

        Returns
        -------
        RunConfig
            Runconfig populated from the validated yaml with defaults filled in
        """
        cfg = load_validate_yaml(yaml_runconfig, workflow_name)

        # Convert runconfig dict to SimpleNamespace
        sns = wrap_namespace(cfg['runconfig']['groups'])

        bursts = runconfig_to_bursts(sns)

        # Load reference grids if not reference run i.e. not running rdr2geo
        ref_rdr_grid_info = None
        if not sns.input_file_group.reference_burst.is_reference:
            ref_rdr_grid_info = get_ref_radar_grid_info(
                sns.input_file_group.reference_burst.file_path)

        # For saving entire file with defaults filled-in as string to metadata.
        # Stop gap for writing dict to individual elements to HDF5 metadata
        user_plus_default_yaml_str = yaml.dump(cfg)

        output_paths = create_output_paths(sns, bursts)

        return cls(cfg['runconfig']['name'], sns, bursts, ref_rdr_grid_info,
                   user_plus_default_yaml_str, output_paths)

    @property
    def burst_id(self) -> list[str] | None:
        '''Burst IDs requested in the runconfig (None when not given)'''
        return self.groups.input_file_group.burst_id

    @property
    def dem(self) -> str:
        '''DEM file from the dynamic ancillary file group'''
        return self.groups.dynamic_ancillary_file_group.dem_file

    @property
    def tec_file(self) -> str:
        '''TEC file from the dynamic ancillary file group'''
        return self.groups.dynamic_ancillary_file_group.tec_file

    @property
    def is_reference(self) -> bool:
        '''Reference-burst flag from the input file group'''
        return self.groups.input_file_group.reference_burst.is_reference

    @property
    def orbit_path(self) -> list[str]:
        '''Orbit file paths from the input file group'''
        return self.groups.input_file_group.orbit_file_path

    @property
    def polarization(self) -> str:
        '''Polarization option from the processing group (e.g. 'co-pol')'''
        return self.groups.processing.polarization

    @property
    def product_path(self):
        '''Product path from the product path group'''
        return self.groups.product_path_group.product_path

    @property
    def reference_path(self) -> str:
        '''Reference burst file path from the input file group'''
        return self.groups.input_file_group.reference_burst.file_path

    @property
    def rdr2geo_params(self) -> dict:
        '''rdr2geo options from the processing group'''
        return self.groups.processing.rdr2geo

    @property
    def geo2rdr_params(self) -> dict:
        '''geo2rdr options from the processing group'''
        return self.groups.processing.geo2rdr

    @property
    def resample_params(self) -> dict:
        '''Resample options from the processing group'''
        return self.groups.processing.resample

    @property
    def lut_params(self) -> dict:
        '''Correction LUT options from the processing group'''
        return self.groups.processing.correction_luts

    @property
    def safe_files(self) -> list[str]:
        '''SAFE file paths from the input file group'''
        return self.groups.input_file_group.safe_file_path

    @property
    def sas_output_file(self):
        '''SAS output file from the product path group'''
        return self.groups.product_path_group.sas_output_file

    @property
    def scratch_path(self):
        '''Scratch path from the product path group'''
        return self.groups.product_path_group.scratch_path

    @property
    def gpu_enabled(self):
        '''GPU-enabled option from the worker group'''
        return self.groups.worker.gpu_enabled

    @property
    def gpu_id(self):
        '''GPU ID option from the worker group'''
        return self.groups.worker.gpu_id

    def as_dict(self):
        '''Convert self to dict for write to YAML/JSON

        Unable to use dataclasses.asdict() because isce3 objects can not be
        pickled

        Returns
        -------
        dict
            Fields of self keyed by field name; 'groups' is unwrapped to a
            dict and 'bursts' becomes a dict of burst dicts keyed on
            burst ID, date string, and polarization
        '''
        # Convenience functions
        def date_str(burst):
            '''Burst datetime sensing_start to str conversion
            '''
            return burst.sensing_start.date().strftime('%Y%m%d')

        def burst_as_key(burst):
            '''Create an unique key of burst ID, date string, and polarization
            '''
            return '_'.join([str(burst.burst_id), date_str(burst), burst.polarization])

        self_as_dict = {}
        for key, val in self.__dict__.items():
            if key == 'groups':
                val = unwrap_to_dict(val)
            elif key == 'bursts':
                val = {burst_as_key(burst): burst.as_dict() for burst in val}
            # all remaining fields are copied over unchanged
            self_as_dict[key] = val
        return self_as_dict

    def to_yaml(self):
        '''Dump runconfig as YAML to sys.stdout
        '''
        self_as_dict = self.as_dict()
        yaml_obj = YAML(typ='safe')
        yaml_obj.dump(self_as_dict, sys.stdout)

as_dict()

Convert self to dict for write to YAML/JSON

Unable to use dataclasses.asdict() because isce3 objects cannot be pickled

Source code in src/compass/utils/runconfig.py
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
def as_dict(self):
    '''Build a plain-dict view of self suitable for YAML/JSON output

    dataclasses.asdict() is not usable here because isce3 objects can not
    be pickled
    '''
    def _key_for(burst):
        '''Unique key made of burst ID, sensing date (YYYYMMDD), and
        polarization joined by underscores
        '''
        key_parts = (str(burst.burst_id),
                     burst.sensing_start.date().strftime('%Y%m%d'),
                     burst.polarization)
        return '_'.join(key_parts)

    converted = {}
    for field_name, field_val in self.__dict__.items():
        if field_name == 'groups':
            # namespace of runconfig options back to nested dict
            converted[field_name] = unwrap_to_dict(field_val)
        elif field_name == 'bursts':
            # each burst serializes itself; keyed so entries stay unique
            converted[field_name] = {_key_for(item): item.as_dict()
                                     for item in field_val}
        else:
            # every other field is passed through untouched
            converted[field_name] = field_val
    return converted

load_from_yaml(yaml_runconfig, workflow_name) classmethod

Initialize RunConfig class with options from given yaml file.

Parameters:

Name Type Description Default
yaml_runconfig str

Path to yaml file containing the options to load or string contents of a runconfig

required
workflow_name str

Name of the workflow whose default options are loaded

required
Source code in src/compass/utils/runconfig.py
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
@classmethod
def load_from_yaml(cls, yaml_runconfig: str,
                   workflow_name: str) -> RunConfig:
    """Build a RunConfig from a runconfig yaml file or yaml string.

    Parameters
    ----------
    yaml_runconfig : str
        Either a path to the yaml file holding the options, or the
        runconfig contents themselves as a string
    workflow_name: str
        Workflow name passed on to the yaml loading and validation step
    """
    validated_cfg = load_validate_yaml(yaml_runconfig, workflow_name)
    runconfig_section = validated_cfg['runconfig']

    # Attribute-style access to the runconfig groups
    groups_ns = wrap_namespace(runconfig_section['groups'])

    burst_list = runconfig_to_bursts(groups_ns)

    # Reference grids are only read for a non-reference run, i.e. one that
    # does not run rdr2geo
    ref_burst_cfg = groups_ns.input_file_group.reference_burst
    if ref_burst_cfg.is_reference:
        ref_info = None
    else:
        ref_info = get_ref_radar_grid_info(ref_burst_cfg.file_path)

    # Whole config (user options + defaults) kept as one string for the
    # metadata; stop gap until the dict is written element-wise to HDF5
    yaml_with_defaults = yaml.dump(validated_cfg)

    out_paths = create_output_paths(groups_ns, burst_list)

    return cls(runconfig_section['name'], groups_ns, burst_list, ref_info,
               yaml_with_defaults, out_paths)

to_yaml()

Dump runconfig as string to sys.stdout

Source code in src/compass/utils/runconfig.py
481
482
483
484
485
486
def to_yaml(self):
    '''Write the dict form of this runconfig to sys.stdout as YAML
    '''
    serializable = self.as_dict()
    YAML(typ='safe').dump(serializable, sys.stdout)

get_ref_radar_grid_info(ref_path)

Load the reference radar grid info stored under a reference path

Parameters:

Name Type Description Default
ref_path

Path where reference radar grids processing is stored

required

Returns:

Name Type Description
ref_radar_grids

reference radar path and grid values found associated with burst ID keys

Source code in src/compass/utils/runconfig.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
def get_ref_radar_grid_info(ref_path):
    ''' Load the reference radar grid info stored under a reference path

    Parameters
    ----------
    ref_path: str
        Path where reference radar grids processing is stored. It must
        contain a file named radar_grid.txt

    Returns
    -------
    ReferenceRadarInfo
        Directory of the radar grid file together with the radar grid
        read from it

    Raises
    ------
    FileNotFoundError
        If radar_grid.txt does not exist in ref_path
    '''
    rdr_grid_file = f'{ref_path}/radar_grid.txt'

    if not os.path.isfile(rdr_grid_file):
        raise FileNotFoundError(
            f'No reference radar grid file found in {ref_path}')

    ref_rdr_path = os.path.dirname(rdr_grid_file)
    ref_rdr_grid = file_to_rdr_grid(rdr_grid_file)

    return ReferenceRadarInfo(ref_rdr_path, ref_rdr_grid)

load_validate_yaml(yaml_runconfig, workflow_name)

Load a runconfig from a yaml file or string, validate it against the workflow schema, and merge it into the workflow defaults.

Parameters:

Name Type Description Default
yaml_runconfig str

Path to yaml file containing the options to load or string contents of a runconfig

required
workflow_name str

Name of the workflow whose schema and default options are loaded

required

Returns:

Type Description
dict

Validated user runconfig dict with defaults inserted

Source code in src/compass/utils/runconfig.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def load_validate_yaml(yaml_runconfig: str, workflow_name: str) -> dict:
    """Load a runconfig, validate it, and merge it into the defaults.

    The runconfig is validated against the yamale schema selected by
    `workflow_name`, copied on top of the matching default runconfig, and
    the resulting groups are checked with `validate_group_dict`.

    Parameters
    ----------
    yaml_runconfig : str
        Path to yaml file containing the options to load or string contents of
        a runconfig. A value containing a newline is treated as contents,
        anything else as a path.
    workflow_name: str
        Name of the workflow whose schema and default options are loaded.
        Any name other than 's1_cslc_geo' selects 's1_cslc_radar'.

    Returns
    -------
    dict
        Validated user runconfig dict with defaults inserted

    Raises
    ------
    ValueError
        If the schema for the workflow can not be loaded
    FileNotFoundError
        If `yaml_runconfig` is treated as a path and no such file exists
    yamale.YamaleError
        If the runconfig can not be loaded by yamale or fails validation
    """
    error_channel = journal.error('runconfig.load_validate_yaml')

    try:
        # Load schema corresponding to 'workflow_name' and to validate against
        schema_name = workflow_name if workflow_name == 's1_cslc_geo' \
            else 's1_cslc_radar'
        schema = yamale.make_schema(
            f'{helpers.WORKFLOW_SCRIPTS_DIR}/schemas/{schema_name}.yaml',
            parser='ruamel')
    except Exception as schema_err:
        # Exception (not a bare except) so KeyboardInterrupt/SystemExit still
        # propagate; the original cause is chained for debugging
        err_str = f'unable to load schema for workflow {workflow_name}.'
        error_channel.log(err_str)
        raise ValueError(err_str) from schema_err

    # Determine run config type based on existence of newlines
    run_config_is_txt = '\n' in yaml_runconfig

    # Prepare part of load and validation error message just in case
    what_is_broken = 'from runconfig string' if run_config_is_txt \
        else yaml_runconfig

    if not run_config_is_txt and not os.path.isfile(yaml_runconfig):
        err_str = f'Yaml file {yaml_runconfig} not found.'
        error_channel.log(err_str)
        raise FileNotFoundError(err_str)

    # load yaml file or string from command line
    try:
        if run_config_is_txt:
            data = yamale.make_data(content=yaml_runconfig,
                                    parser='ruamel')
        else:
            data = yamale.make_data(yaml_runconfig, parser='ruamel')
    except yamale.YamaleError as yamale_err:
        err_str = (f'Yamale unable to load {workflow_name} runconfig yaml '
                   f'{what_is_broken} for validation.')
        error_channel.log(err_str)
        raise yamale.YamaleError(err_str) from yamale_err

    # validate yaml file taken from command line
    try:
        yamale.validate(schema, data)
    except yamale.YamaleError as yamale_err:
        err_str = (f'Validation fail for {workflow_name} runconfig yaml '
                   f'{what_is_broken}.')
        error_channel.log(err_str)
        raise yamale.YamaleError(err_str) from yamale_err

    # load default runconfig
    parser = YAML(typ='safe')
    default_cfg_path = f'{helpers.WORKFLOW_SCRIPTS_DIR}/defaults/{schema_name}.yaml'
    with open(default_cfg_path, 'r') as f_default:
        default_cfg = parser.load(f_default)

    # load user config based on input type
    if run_config_is_txt:
        user_cfg = parser.load(yaml_runconfig)
    else:
        with open(yaml_runconfig, 'r') as f_yaml:
            user_cfg = parser.load(f_yaml)

    # Copy user-supplied configuration options into default runconfig
    helpers.deep_update(default_cfg, user_cfg)

    # Validate YAML values under groups dict
    validate_group_dict(default_cfg['runconfig']['groups'], workflow_name)

    return default_cfg

runconfig_to_bursts(cfg)

Return bursts based on parameters in given runconfig

Parameters:

Name Type Description Default
cfg SimpleNamespace

Configuration of bursts to be loaded.

required

Returns:

Name Type Description
_ list[Sentinel1BurstSlc]

List of bursts loaded according to given configuration.

Source code in src/compass/utils/runconfig.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
def runconfig_to_bursts(cfg: SimpleNamespace) -> list[Sentinel1BurstSlc]:
    '''Return bursts based on parameters in given runconfig

    Every SAFE file in cfg.input_file_group.safe_file_path is matched to an
    orbit file, then its bursts are loaded for each requested polarization
    and each IW subswath (1, 2, 3). Bursts whose ID is not listed in
    cfg.input_file_group.burst_id are skipped; all bursts are kept when that
    option is None.

    Parameters
    ----------
    cfg : SimpleNamespace
        Configuration of bursts to be loaded.

    Returns
    -------
    _ : list[Sentinel1BurstSlc]
        List of bursts loaded according to given configuration.

    Raises
    ------
    ValueError
        If no orbit file correlates to a SAFE file, or if no burst was
        found in any of the SAFE files
    '''
    error_channel = journal.error('runconfig.runconfig_to_bursts')

    # flat list of the selected bursts from all SAFE files
    bursts = []

    # extract given SAFE zips to find bursts identified in cfg.burst_id
    for safe_file in cfg.input_file_group.safe_file_path:
        # get orbit file from directory of first orbit file
        orbit_path = get_orbit_file_from_dir(
            safe_file,
            Path(cfg.input_file_group.orbit_file_path[0]).parent)

        if not orbit_path:
            err_str = f"No orbit file correlates to safe file: {os.path.basename(safe_file)}"
            error_channel.log(err_str)
            raise ValueError(err_str)

        # from SAFE file mode, create dict of runconfig pol mode to polarization(s)
        safe_pol_mode = helpers.get_file_polarization_mode(safe_file)
        if safe_pol_mode == 'SV':
            mode_to_pols = {'co-pol': ['VV']}
        elif safe_pol_mode == 'DV':
            mode_to_pols = {'co-pol': ['VV'], 'cross-pol': ['VH'],
                            'dual-pol': ['VV', 'VH']}
        elif safe_pol_mode == 'SH':
            mode_to_pols = {'co-pol': ['HH']}
        else:
            # any other mode is treated as 'DH'
            mode_to_pols = {'co-pol': ['HH'], 'cross-pol': ['HV'],
                            'dual-pol': ['HH', 'HV']}
        pols = mode_to_pols[cfg.processing.polarization]

        # burst IDs requested by the user; None means keep every burst
        wanted_burst_ids = cfg.input_file_group.burst_id

        # (burst ID, polarization) pairs already taken from this SAFE file,
        # used to prevent repeats
        # NOTE(review): this is reset for every SAFE file, so the same pair
        # coming from two different SAFE files is kept twice. The original
        # code also had an `is_reference` condition here that was always
        # true after the duplicate check, so it was dropped without changing
        # behavior - confirm whether cross-file de-duplication was intended.
        id_pols_found = set()

        # loop over pol and IW subswath index combinations
        for pol in pols:
            for i_subswath in (1, 2, 3):

                # loop over burst objs extracted from SAFE zip
                for burst in load_bursts(safe_file, orbit_path, i_subswath,
                                         pol):
                    burst_id = str(burst.burst_id)

                    # is burst_id wanted? skip if not given in config
                    if (wanted_burst_ids is not None and
                            burst_id not in wanted_burst_ids):
                        continue

                    # skip burst ID + polarization combos already found
                    id_pol = (burst_id, burst.polarization)
                    if id_pol in id_pols_found:
                        continue
                    id_pols_found.add(id_pol)

                    bursts.append(burst)

    # check if no bursts were found
    if not bursts:
        err_str = "Could not find any of the burst IDs in the provided safe files"
        error_channel.log(err_str)
        raise ValueError(err_str)

    return bursts

validate_group_dict(group_cfg, workflow_name)

Check and validate runconfig entries.

Parameters:

Name Type Description Default
group_cfg dict

Dictionary storing runconfig options to validate

required
Source code in src/compass/utils/runconfig.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def validate_group_dict(group_cfg: dict, workflow_name: str) -> None:
    """Check and validate runconfig entries.

    Parameters
    ----------
    group_cfg : dict
        Dictionary storing runconfig options to validate
    workflow_name : str
        Name of the workflow; the reference burst check is only done for
        's1_cslc_radar'

    Raises
    ------
    ValueError
        If cross-pol or dual-pol is requested from a single-pol SAFE file, or
        if the SAFE files do not all share the same H/V polarization
    """
    error_channel = journal.error('runconfig.validate_group_dict')

    # Check 'input_file_group' section of runconfig
    input_group = group_cfg['input_file_group']
    # If is_reference flag is False, check that file path to reference
    # burst is assigned and valid (required by geo2rdr and resample)
    if workflow_name == 's1_cslc_radar':
        is_reference = input_group['reference_burst']['is_reference']
        if not is_reference:
            helpers.check_directory(input_group['reference_burst']['file_path'])

    # Check SAFE files
    run_pol_mode = group_cfg['processing']['polarization']
    safe_pol_modes = []
    for safe_file in input_group['safe_file_path']:
        # Check if files exists
        helpers.check_file_path(safe_file)

        # Get and save safe pol mode ('SV', 'SH', 'DH', 'DV')
        safe_pol_mode = helpers.get_file_polarization_mode(safe_file)
        safe_pol_modes.append(safe_pol_mode)

        # Raise error if given co-pol file and expecting cross-pol or dual-pol
        if run_pol_mode != 'co-pol' and safe_pol_mode in ['SV', 'SH']:
            err_str = f'{run_pol_mode} polarization lacks cross-pol in {safe_file}'
            error_channel.log(err_str)
            raise ValueError(err_str)

    # Check SAFE file pols consistency. i.e. no *H/*V with *V/*H respectively
    # The second character of the mode is the H/V part being compared
    if len({safe_pol_mode[1] for safe_pol_mode in safe_pol_modes}) > 1:
        err_str = 'SH/DH SAFE file mixed with SV/DV'
        error_channel.log(err_str)
        raise ValueError(err_str)

    for orbit_file in input_group['orbit_file_path']:
        helpers.check_file_path(orbit_file)

    # Check 'dynamic_ancillary_file_groups' section of runconfig
    # Check that DEM file exists and is GDAL-compatible
    dem_path = group_cfg['dynamic_ancillary_file_group']['dem_file']
    helpers.check_file_path(dem_path)
    helpers.check_dem(dem_path)

    # Check 'product_path_group' section of runconfig.
    # Check that directories herein have writing permissions
    product_path_group = group_cfg['product_path_group']
    helpers.check_write_dir(product_path_group['product_path'])
    helpers.check_write_dir(product_path_group['scratch_path'])
    # NOTE(review): 'sas_output_file' is named like a file yet is checked as
    # a directory, same as the two paths above - confirm this is intended
    helpers.check_write_dir(product_path_group['sas_output_file'])