Skip to content

geo_runconfig

GeoRunConfig dataclass

Bases: RunConfig

dataclass containing GSLC runconfig

Source code in src/compass/utils/geo_runconfig.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
@dataclass(frozen=True)
class GeoRunConfig(RunConfig):
    '''dataclass containing GSLC runconfig'''
    # dict of geogrids associated to burst IDs
    geogrids: dict[str, GeoGridParameters]

    @classmethod
    def load_from_yaml(cls, yaml_runconfig: str, workflow_name: str) -> GeoRunConfig:
        """Initialize RunConfig class with options from given yaml file.

        Parameters
        ----------
        yaml_runconfig : str
            Path to yaml file containing the options to load or string contents
            of a runconfig
        workflow_name: str
            Name of the workflow for which uploading default options
        """
        error_channel = journal.error('runconfig.load_from_yaml')

        cfg = load_validate_yaml(yaml_runconfig, workflow_name)
        groups_cfg = cfg['runconfig']['groups']

        burst_database_file = groups_cfg['static_ancillary_file_group']['burst_database_file']
        if not os.path.isfile(burst_database_file):
            err_str = '{burst_database_file} not found'
            error_channel.log(err_str)
            raise FileNotFoundError(err_str)

        geocoding_dict = groups_cfg['processing']['geocoding']
        check_geocode_dict(geocoding_dict)

        # Check TEC file if not None.
        # The ionosphere correction will be applied only if
        # the TEC file is not None.
        tec_file_path = groups_cfg['dynamic_ancillary_file_group']['tec_file']
        if tec_file_path is not None:
            check_file_path(tec_file_path)
        # Check troposphere weather model file if not None. This
        # troposphere correction is applied only if this file is not None
        weather_model_path = groups_cfg['dynamic_ancillary_file_group'][
            'weather_model_file'
        ]
        if weather_model_path is not None:
            check_file_path(weather_model_path)

        # Convert runconfig dict to SimpleNamespace
        sns = wrap_namespace(groups_cfg)

        # Load bursts
        bursts = runconfig_to_bursts(sns)

        # Load geogrids
        dem_file = groups_cfg['dynamic_ancillary_file_group']['dem_file']
        if burst_database_file is None:
            geogrids = generate_geogrids(bursts, geocoding_dict, dem_file)
        else:
            geogrids = generate_geogrids_from_db(bursts, geocoding_dict,
                                                 dem_file, burst_database_file)

        # Empty reference dict for base runconfig class constructor
        empty_ref_dict = {}

        # For saving entire file with default fill-in as string to metadata.
        # Stop gap for writing dict to individual elements to HDF5 metadata
        user_plus_default_yaml_str = yaml.dump(cfg)

        # Get scratch and output paths
        output_paths = create_output_paths(sns, bursts)

        return cls(cfg['runconfig']['name'], sns, bursts, empty_ref_dict,
                   user_plus_default_yaml_str, output_paths, geogrids)

    @property
    def product_group(self) -> types.SimpleNamespace:
        return self.groups.product_path_group

    @property
    def product_type(self) -> dict:
        return self.groups.primary_executable.product_type

    @property
    def weather_model_file(self) -> str:
        return self.groups.dynamic_ancillary_file_group.weather_model_file

    @property
    def geocoding_params(self) -> types.SimpleNamespace:
        return self.groups.processing.geocoding

    @property
    def rdr2geo_params(self) -> types.SimpleNamespace:
        return self.groups.processing.rdr2geo

    @property
    def lut_params(self) -> types.SimpleNamespace:
        return self.groups.processing.correction_luts

    @property
    def quality_assurance_params(self) -> types.SimpleNamespace:
        return self.groups.quality_assurance

    @property
    def browse_image_params(self) -> types.SimpleNamespace:
        return self.groups.quality_assurance.browse_image

    @property
    def tropo_params(self) -> types.SimpleNamespace:
        return self.groups.processing.correction_luts.troposphere

    @property
    def output_params(self) -> types.SimpleNamespace:
        return self.groups.output

    def as_dict(self):
        ''' Convert self to dict for write to YAML/JSON

        Unable to dataclasses.asdict() because isce3 objects can not be pickled
        '''
        # convert to dict first then dump to yaml
        self_as_dict = super().as_dict()

        self_as_dict['geogrids']= {b_id:geogrid_as_dict(geogrid)
                                   for b_id, geogrid in self.geogrids.items()}

        return self_as_dict


    def to_file(self, dst, fmt:str):
        ''' Write self to file

        Parameter:
        ---------
        dst: file pointer
            File object to write metadata to
        fmt: ['yaml', 'json']
            Format of output
        '''
        self_as_dict = self.as_dict()

        self_as_dict['nodata'] = 'NO_DATA_VALUE'
        self_as_dict['input_data_ipf_version'] = '?'
        self_as_dict['isce3_version'] = isce3.__version__

        if fmt == 'yaml':
            yaml_obj = YAML(typ='safe')
            yaml_obj.dump(self_as_dict, dst)
        elif fmt == 'json':
            json.dumps(self_as_dict, dst, indent=4)
        else:
            raise ValueError(f'{fmt} unsupported. Only "json" or "yaml" supported')

as_dict()

Convert self to dict for write to YAML/JSON

Unable to dataclasses.asdict() because isce3 objects can not be pickled

Source code in src/compass/utils/geo_runconfig.py
160
161
162
163
164
165
166
167
168
169
170
171
def as_dict(self):
    ''' Convert self to dict for write to YAML/JSON

    Unable to dataclasses.asdict() because isce3 objects can not be pickled
    '''
    # convert to dict first then dump to yaml
    self_as_dict = super().as_dict()

    self_as_dict['geogrids']= {b_id:geogrid_as_dict(geogrid)
                               for b_id, geogrid in self.geogrids.items()}

    return self_as_dict

load_from_yaml(yaml_runconfig, workflow_name) classmethod

Initialize RunConfig class with options from given yaml file.

Parameters:

Name Type Description Default
yaml_runconfig str

Path to yaml file containing the options to load or string contents of a runconfig

required
workflow_name str

Name of the workflow for which uploading default options

required
Source code in src/compass/utils/geo_runconfig.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
@classmethod
def load_from_yaml(cls, yaml_runconfig: str, workflow_name: str) -> GeoRunConfig:
    """Initialize RunConfig class with options from given yaml file.

    Parameters
    ----------
    yaml_runconfig : str
        Path to yaml file containing the options to load or string contents
        of a runconfig
    workflow_name: str
        Name of the workflow for which uploading default options
    """
    error_channel = journal.error('runconfig.load_from_yaml')

    cfg = load_validate_yaml(yaml_runconfig, workflow_name)
    groups_cfg = cfg['runconfig']['groups']

    burst_database_file = groups_cfg['static_ancillary_file_group']['burst_database_file']
    if not os.path.isfile(burst_database_file):
        err_str = '{burst_database_file} not found'
        error_channel.log(err_str)
        raise FileNotFoundError(err_str)

    geocoding_dict = groups_cfg['processing']['geocoding']
    check_geocode_dict(geocoding_dict)

    # Check TEC file if not None.
    # The ionosphere correction will be applied only if
    # the TEC file is not None.
    tec_file_path = groups_cfg['dynamic_ancillary_file_group']['tec_file']
    if tec_file_path is not None:
        check_file_path(tec_file_path)
    # Check troposphere weather model file if not None. This
    # troposphere correction is applied only if this file is not None
    weather_model_path = groups_cfg['dynamic_ancillary_file_group'][
        'weather_model_file'
    ]
    if weather_model_path is not None:
        check_file_path(weather_model_path)

    # Convert runconfig dict to SimpleNamespace
    sns = wrap_namespace(groups_cfg)

    # Load bursts
    bursts = runconfig_to_bursts(sns)

    # Load geogrids
    dem_file = groups_cfg['dynamic_ancillary_file_group']['dem_file']
    if burst_database_file is None:
        geogrids = generate_geogrids(bursts, geocoding_dict, dem_file)
    else:
        geogrids = generate_geogrids_from_db(bursts, geocoding_dict,
                                             dem_file, burst_database_file)

    # Empty reference dict for base runconfig class constructor
    empty_ref_dict = {}

    # For saving entire file with default fill-in as string to metadata.
    # Stop gap for writing dict to individual elements to HDF5 metadata
    user_plus_default_yaml_str = yaml.dump(cfg)

    # Get scratch and output paths
    output_paths = create_output_paths(sns, bursts)

    return cls(cfg['runconfig']['name'], sns, bursts, empty_ref_dict,
               user_plus_default_yaml_str, output_paths, geogrids)

to_file(dst, fmt)

Write self to file

Parameter:

dst: file pointer File object to write metadata to fmt: ['yaml', 'json'] Format of output

Source code in src/compass/utils/geo_runconfig.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def to_file(self, dst, fmt:str):
    ''' Write self to file

    Parameter:
    ---------
    dst: file pointer
        File object to write metadata to
    fmt: ['yaml', 'json']
        Format of output
    '''
    self_as_dict = self.as_dict()

    self_as_dict['nodata'] = 'NO_DATA_VALUE'
    self_as_dict['input_data_ipf_version'] = '?'
    self_as_dict['isce3_version'] = isce3.__version__

    if fmt == 'yaml':
        yaml_obj = YAML(typ='safe')
        yaml_obj.dump(self_as_dict, dst)
    elif fmt == 'json':
        json.dumps(self_as_dict, dst, indent=4)
    else:
        raise ValueError(f'{fmt} unsupported. Only "json" or "yaml" supported')