Skip to content

iono

download_ionex(date_str, tec_dir, sol_code='jpl', product_type='FINAL', interval='02H', date_fmt='%Y%m%d')

Download IGS vertical TEC files in IONEX format

Parameters:

Name Type Description Default
date_str

Date in date_fmt format

required
tec_dir

Local directory where to save downloaded files

required
sol_code

IGS TEC analysis center code

'jpl'
product_code

Either 'FINAL' or 'RAPID'

required
date_fmt

Date format code

'%Y%m%d'

Returns:

Name Type Description
fname_dst_uncomp str

Path to local uncompressed IONEX file

Source code in src/compass/utils/iono.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
def download_ionex(date_str, tec_dir, sol_code='jpl', product_type='FINAL', interval='02H', date_fmt='%Y%m%d'):
    '''
    Download IGS vertical TEC files in IONEX format

    Parameters
    ----------
    date_str: str
        Date in date_fmt format
    tec_dir: str
        Local directory where to save downloaded files
    sol_code: str
        IGS TEC analysis center code
    product_code: str
        Either 'FINAL' or 'RAPID'
    date_fmt: str
        Date format code

    Returns
    -------
    fname_dst_uncomp: str
        Path to local uncompressed IONEX file
    '''
    info_channel = journal.info('download_ionex')
    err_channel = journal.info('download_ionex')

    # Approximate date from which the new IONEX file name format needs to be used
    # https://cddis.nasa.gov/Data_and_Derived_Products/GNSS/atmospheric_products.html
    new_ionex_filename_format_from = dt.datetime.fromisoformat('2023-10-18')

    # get the source (remote) and destination (local) file path/url
    date_tec = dt.datetime.strptime(date_str, date_fmt)

    # determine the initial format to try
    use_new_ionex_filename_format = date_tec >= new_ionex_filename_format_from

    kwargs = {
        "sol_code": sol_code,
        'product_type': product_type,
        "date_fmt": date_fmt,
        "intv": interval,
        "is_new_filename_format": None,
        "check_if_exists": True
        }

    # Iterate over both possible formats and break if file retrieved.
    for fname_fmt in [use_new_ionex_filename_format, not use_new_ionex_filename_format]:
        kwargs['is_new_filename_format'] = fname_fmt

        fname_src = get_ionex_filename(date_str, tec_dir=None, **kwargs)
        basename_dst_uncomp = os.path.basename(fname_src[:fname_src.rfind('.')])
        fname_dst_uncomp = os.path.join(tec_dir, basename_dst_uncomp)
        ionex_zip_extension = fname_src[fname_src.rfind('.'):]
        fname_dst = fname_dst_uncomp + ionex_zip_extension

        ionex_download_successful = download_url(fname_src, fname_dst)

        if ionex_download_successful:
            info_channel.log(f'Downloaded IONEX file: {fname_dst}')
            break

    if not ionex_download_successful:
        err_channel.log(f'Failed to download IONEX file: {fname_src}')
        raise RuntimeError

    # uncompress
    # if output file 1) does not exist or 2) smaller than 400k in size or 3) older
    if (not os.path.isfile(fname_dst_uncomp)
            or os.path.getsize(fname_dst_uncomp) < 400e3
            or os.path.getmtime(fname_dst_uncomp) < os.path.getmtime(
                fname_dst)):
        cmd = ["gzip",  "--force", "--decompress", fname_dst]
        cmd_str = ' '.join(cmd)
        info_channel.log(f'Execute command: {cmd_str}')

        try:
            subprocess.run(cmd, capture_output=True, text=True, check=True)

        except subprocess.CalledProcessError as e:
            err_channel.log(f'Failed to uncompress IONEX file: {fname_dst}')
            err_channel.log(f'Command: {cmd_str}')
            err_channel.log(f'Error message: {e.stderr}')
            raise RuntimeError

    return fname_dst_uncomp

get_ionex_filename(date_str, tec_dir=None, sol_code='jpl', product_type='FINAL', intv='02H', date_fmt='%Y%m%d', is_new_filename_format=True, check_if_exists=False)

Get the path to the IONEX file, or URL to the compressed IONEX. Supports both the old and new file name formats. To find IONEX file locally: provide the directory to tec_dir To find IONEX file remotely: Leave tec_dir as None

Parameters:

Name Type Description Default
date_str

Date in the 'date_fmt' format

required
tec_dir

Directory where to store downloaded TEC files, If None, the function will look for the file in the remote directory

None
sol_code

GIM analysis center code in 3 alphabetic characters (values: cod, esa, igs, jpl, upc, uqr)

'jpl'
product_code

Either 'FINAL' or 'RAPID'

required
date_fmt

Date format string

'%Y%m%d'
is_new_file_format

Flag whether not not to use the new IONEX TEC file format. Defaults to True

required
check_if_exists

Flag whether the IONEX file exists in the local directory or in the remote repository.

False

Returns:

Name Type Description
tec_file str

Path to the local uncompressed (or remote compressed) TEC file

Source code in src/compass/utils/iono.py
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
def get_ionex_filename(date_str, tec_dir=None, sol_code='jpl', product_type='FINAL', intv='02H',
                       date_fmt='%Y%m%d',
                       is_new_filename_format=True,
                       check_if_exists=False):
    '''
    Get the path to the IONEX file, or URL to the compressed IONEX.
    Supports both the old and new file name formats.
    To find IONEX file locally: provide the directory to `tec_dir`
    To find IONEX file remotely: Leave `tec_dir` as None

    Parameters
    ----------
    date_str: str
        Date in the 'date_fmt' format
    tec_dir: str
        Directory where to store downloaded TEC files, If None, the function
        will look for the file in the remote directory
    sol_code: str
        GIM analysis center code in 3 alphabetic characters
        (values: cod, esa, igs, jpl, upc, uqr)
    product_code: str
        Either 'FINAL' or 'RAPID'
    date_fmt: str
        Date format string
    is_new_file_format: bool
        Flag whether not not to use the new IONEX TEC file format. Defaults to True
    check_if_exists: bool
        Flag whether the IONEX file exists in the local directory or in the remote repository.

    Returns
    -------
    tec_file: str
        Path to the local uncompressed (or remote compressed) TEC file
    '''

    dd = dt.datetime.strptime(date_str, date_fmt)
    doy = f'{dd.timetuple().tm_yday:03d}'
    yy_full = str(dd.year)
    yy = yy_full[2:4]

    # file name base
    # Keep both the old- and new- formats of the file names
    fname_list = [f"{sol_code.lower()}g{doy}0.{yy}i.Z",
                  f'{sol_code.upper()}0OPSFIN_{yy_full}{doy}0000_01D_{intv}_GIM.INX.gz']

    # sol_code specific changes
    if sol_code.lower() == 'gfz':
        fname_list[1] = fname_list[1].replace('_GIM.INX.gz', '_ION.IOX.gz')

    if product_type.upper() == 'RAPID':
        rapid_sol_code_old = sol_code.lower()[:-1] + 'r'
        fname_list[0] = fname_list[0].replace(sol_code.lower(), rapid_sol_code_old)
        fname_list[1] = fname_list[1].replace('0OPSFIN', '0OPSRAP')

    # Decide which file name format to try first
    if is_new_filename_format:
        fname_list.reverse()

    for fname in fname_list:
        # This initial value in the flag will make the for loop to choose the
        # first format in the list when `check_if_exists` is turned off
        flag_ionex_exists = True
        # full path
        if tec_dir is not None:
            # local uncompressed file path
            tec_file = os.path.join(tec_dir, fname[:fname.rfind('.')])
            if check_if_exists:
                flag_ionex_exists = os.path.exists(tec_file)
        else:
            # remote compressed file path
            url_dir = "https://cddis.nasa.gov/archive/gnss/products/ionex"
            tec_file = os.path.join(url_dir, str(dd.year), doy, fname)
            if check_if_exists:
                flag_ionex_exists = check_url(tec_file)

        if flag_ionex_exists:
            break

    if flag_ionex_exists:
        return tec_file
    else:
        return None

get_ionex_height(tec_file)

Get the height of the thin-shell ionosphere from IONEX file

Parameters:

Name Type Description Default
tec_file

Path to the TEC file in IONEX format

required

Returns:

Name Type Description
iono_hgt float

Height above the surface in meters

Source code in src/compass/utils/iono.py
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
def get_ionex_height(tec_file):
    '''
    Get the height of the thin-shell ionosphere from IONEX file

    Parameters
    ----------
    tec_file: str
        Path to the TEC file in IONEX format

    Returns
    -------
    iono_hgt: float
        Height above the surface in meters
    '''

    with open(tec_file) as f:
        lines = f.readlines()
        for line in lines:
            if line.strip().endswith('DHGT'):
                ion_hgt = float(line.split()[0])
                break
    return ion_hgt

get_ionex_value(tec_file, utc_sec, lat, lon, interp_method='linear3d', rotate_tec_map=False)

Get the TEC value from input IONEX file for the input lat/lon/datetime. Reference: Schaer, S., Gurtner, W., & Feltens, J. (1998). IONEX: The ionosphere map exchange format version 1.1. Paper presented at the Proceedings of the IGS AC workshop, Darmstadt, Germany.

Parameters:

Name Type Description Default
tec_file

Path of local TEC file

required
utc_sec

UTC time of the day in seconds

required
lat

Latitude in degrees

required
lon

Longitude in degrees

required
interp_method

Interpolation method (nearest, linear, linear2d, bilinear, linear3d, trilinear)

'linear3d'
rotate_tec_map

Rotate the TEC map along the SUN direction (for "interp_method = linear3d" only)

False

Returns:

Name Type Description
tec_val float or 1D np.ndarray

Vertical TEC value in TECU

Source code in src/compass/utils/iono.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def get_ionex_value(tec_file, utc_sec, lat, lon,
                    interp_method='linear3d', rotate_tec_map=False):
    '''
    Get the TEC value from input IONEX file for the input lat/lon/datetime.
    Reference:
        Schaer, S., Gurtner, W., & Feltens, J. (1998). IONEX: The ionosphere map exchange format
        version 1.1. Paper presented at the Proceedings of the IGS AC workshop, Darmstadt, Germany.
    Parameters
    ----------
    tec_file: str
        Path of local TEC file
    utc_sec: float or 1D np.ndarray
        UTC time of the day in seconds
    lat: float or 1D np.ndarray
        Latitude in degrees
    lon: float or 1D np.ndarray
        Longitude in degrees
    interp_method: str
       Interpolation method (nearest, linear, linear2d, bilinear, linear3d, trilinear)
    rotate_tec_map: bool
        Rotate the TEC map along the SUN direction
        (for "interp_method = linear3d" only)

    Returns
    -------
    tec_val: float or 1D np.ndarray
        Vertical TEC value in TECU
    '''
    error_channel = journal.error('get_ionex_value')

    def interp_3d_rotate(interpfs, mins, lats, lons, utc_min, lat, lon):
        ind0 = np.where((mins - utc_min) <= 0)[0][-1]
        ind1 = ind0 + 1
        lon0 = lon + (utc_min - mins[ind0]) * 360. / (24. * 60.)
        lon1 = lon + (utc_min - mins[ind1]) * 360. / (24. * 60.)
        tec_val0 = interpfs[ind0](lon0, lat)
        tec_val1 = interpfs[ind1](lon1, lat)
        tec_val = ((mins[ind1] - utc_min) / (mins[ind1] - mins[ind0]) * tec_val0
                   + (utc_min - mins[ind0]) / (
                           mins[ind1] - mins[ind0]) * tec_val1)
        return tec_val

    # time info
    utc_min = utc_sec / 60.

    # read TEC file
    mins, lats, lons, tec_maps = read_ionex(tec_file)[:4]

    # resample
    if interp_method == 'nearest':
        lon_ind = np.abs(lons - lon).argmin()
        lat_ind = np.abs(lats - lat).argmin()
        time_ind = np.abs(mins - utc_min).argmin()
        tec_val = tec_maps[time_ind, lat_ind, lon_ind]

    elif interp_method in ['linear', 'linear2d', 'bilinear']:
        time_ind = np.abs(mins.reshape(-1, 1) - utc_min).argmin(axis=0)

        if isinstance(utc_min, np.ndarray):
            num_pts = len(utc_min)
            tec_val = np.zeros(num_pts, dtype=np.float32)
            for i in range(num_pts):
                tec_val[i] = interpolate.interp2d(
                    x=lons,
                    y=lats,
                    z=tec_maps[time_ind[i], :, :],
                    kind='linear',
                )(lon[i], lat[i])
        else:
            tec_val = interpolate.interp2d(
                x=lons,
                y=lats,
                z=tec_maps[time_ind[0], :, :],
                kind='linear',
            )(lon, lat)
    elif interp_method in ['linear3d', 'trilinear']:
        if not rotate_tec_map:
            # option 1: interpolate between consecutive TEC maps
            # testings shows better agreement with SAR obs than option 2.
            tec_val = interpolate.interpn(
                points=(mins, np.ascontiguousarray(np.flip(lats)), lons),
                values=np.flip(tec_maps, axis=1),
                xi=(utc_min, lat, lon),
                method='linear',
            )
        else:
            # option 2: interpolate between consecutive rotated TEC maps
            # reference: equation (3) in Schaer and Gurtner (1998)

            # prepare interpolation functions in advance to speed up
            interpfs = []
            for i in range(len(mins)):
                interpfs.append(
                    interpolate.interp2d(
                        x=lons,
                        y=lats,
                        z=tec_maps[i, :, :],
                        kind='linear',
                    ),
                )

            if isinstance(utc_min, np.ndarray):
                num_pts = len(utc_min)
                tec_val = np.zeros(num_pts, dtype=np.float32)
                for i in range(num_pts):
                    tec_val[i] = interp_3d_rotate(
                        interpfs,
                        mins, lats, lons,
                        utc_min[i], lat[i], lon[i],
                    )
            else:
                tec_val = interp_3d_rotate(
                    interpfs,
                    mins, lats, lons,
                    utc_min, lat, lon,
                )

    else:
        msg = f'Un-recognized interp_method input: {interp_method}!'
        msg += '\nSupported inputs: nearest, linear2d, linear3d.'
        error_channel.log(msg)
        raise ValueError(msg)

    return tec_val

ionosphere_delay(utc_time, wavelength, tec_file, lon_arr, lat_arr, inc_arr)

Calculate ionosphere delay for geolocation

Parameters:

Name Type Description Default
utc_time

UTC time to calculate the ionosphere delay

required
wavelength

Wavelength of the signal

required
tec_file

Path to the TEC file

required
lon_arr

array of longitude in radar grid. unit: degrees

required
lat_arr

array of latitude in radar grid. unit: degrees

required
inc_arr

array of incidence angle in radar grid. unit: degrees

required

Returns:

Name Type Description
los_iono_delay ndarray

Ionospheric delay in line of sight in meters

Source code in src/compass/utils/iono.py
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
def ionosphere_delay(utc_time, wavelength,
                     tec_file, lon_arr, lat_arr, inc_arr):
    '''
    Calculate ionosphere delay for geolocation

    Parameters
    ----------
    utc_time: datetime.datetime
        UTC time to calculate the ionosphere delay
    wavelength: float
        Wavelength of the signal
    tec_file: str
        Path to the TEC file
    lon_arr: numpy.ndarray
        array of longitude in radar grid. unit: degrees
    lat_arr: numpy.ndarray
        array of latitude in radar grid. unit: degrees
    inc_arr: numpy.ndarray
        array of incidence angle in radar grid. unit: degrees

    Returns
    -------
    los_iono_delay: np.ndarray
        Ionospheric delay in line of sight in meters
    '''
    warning_channel = journal.warning('ionosphere_delay')

    if not tec_file:
        warning_channel.log('"tec_file" was not provided. '
                            'Ionosphere correction will not be applied.')
        return np.zeros(lon_arr.shape)

    if not os.path.exists(tec_file):
        raise RuntimeError(f'IONEX file was not found: {tec_file}')

    utc_tod_sec = (utc_time.hour * 3600.0
                   + utc_time.minute * 60.0
                   + utc_time.second)

    ionex_val = get_ionex_value(tec_file,
                                utc_tod_sec,
                                lat_arr.flatten(),
                                lon_arr.flatten())

    ionex_val = ionex_val.reshape(lon_arr.shape)

    freq_sensor = isce3.core.speed_of_light / wavelength

    # Constant to convert the TEC product to electrons / m2
    ELECTRON_PER_SQM = ionex_val * 1e16

    # Constant in m3/s2
    K = 40.31

    los_iono_delay = (K * ELECTRON_PER_SQM / freq_sensor**2
                           / np.cos(np.deg2rad(inc_arr)))

    return los_iono_delay

read_ionex(tec_file)

Read Total Electron Content (TEC) file in IONEX format

Parameters:

Name Type Description Default
tec_file

Path to the TEC file in IONEX format

required

Returns:

Name Type Description
mins ndarray

1D array with time of the day in minutes

lats ndarray

1D array with latitude in degrees

lons ndarray

1D array with longitude in degrees

tec_maps ndarray

3D array with vertical TEC in TECU

rms_maps ndarray

3d array with vertical TEC RMS in TECU

Source code in src/compass/utils/iono.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def read_ionex(tec_file):
    '''
    Read Total Electron Content (TEC) file in
    IONEX format

    Parameters
    ----------
    tec_file: str
        Path to the TEC file in IONEX format

    Returns
    -------
    mins: np.ndarray
        1D array with time of the day in minutes
    lats: np.ndarray
        1D array with latitude in degrees
    lons: np.ndarray
        1D array with longitude in degrees
    tec_maps: np.ndarray
        3D array with vertical TEC in TECU
    rms_maps: np.ndarray
        3d array with vertical TEC RMS in TECU
    '''

    # functions for parsing strings from ionex file
    # link: https://github.com/daniestevez/jupyter_notebooks/blob/master/IONEX.ipynb
    def parse_map(tec_map_str, key='TEC', exponent=-1):
        tec_map_str = re.split(f'.*END OF {key} MAP', tec_map_str)[0]
        tec_map = [np.fromstring(x, sep=' ') for x in
                   re.split('.*LAT/LON1/LON2/DLON/H\\n', tec_map_str)[1:]]
        return np.stack(tec_map) * 10 ** exponent

    # read IONEX file
    with open(tec_file) as f:
        fc = f.read()

        # read header
        header = fc.split('END OF HEADER')[0].split('\n')
        for line in header:
            if line.strip().endswith('# OF MAPS IN FILE'):
                num_map = int(line.split()[0])
            elif line.strip().endswith('DLAT'):
                lat0, lat1, lat_step = (float(x) for x in line.split()[:3])
            elif line.strip().endswith('DLON'):
                lon0, lon1, lon_step = (float(x) for x in line.split()[:3])
            elif line.strip().endswith('EXPONENT'):
                exponent = float(line.split()[0])

        # spatial coordinates
        num_lat = (lat1 - lat0) // lat_step + 1
        num_lon = (lon1 - lon0) // lon_step + 1
        lats = np.arange(lat0, lat0 + num_lat * lat_step, lat_step)
        lons = np.arange(lon0, lon0 + num_lon * lon_step, lon_step)

        # time stamps in minutes
        min_step = 24 * 60 / (num_map - 1)
        mins = np.arange(0, num_map * min_step, min_step)

        # read TEC and its RMS maps
        tec_maps = np.array([parse_map(t, key='TEC', exponent=exponent)
                             for t in fc.split('START OF TEC MAP')[1:]],
                            dtype=np.float32)
        rms_maps = np.array([parse_map(t, key='RMS', exponent=exponent)
                             for t in fc.split('START OF RMS MAP')[1:]],
                            dtype=np.float32)

    return mins, lats, lons, tec_maps, rms_maps