# -*- coding: utf-8 -*-
import os
import pyarts
import numpy as np
def all_cia_files(path):
out = []
for fil in os.listdir(path):
fil = os.path.abspath(os.path.join(path, fil))
if fil.endswith('.cia'):
out.append(fil)
return out
class header:
def __init__(self, line):
assert len(line) >= 20+10+10+7+7+10+6+27+3, \
f"This is not a header:\n'{line}'"
self.chem_sym = line[:20]
self.low_lim = line[20:20+10]
self.upp_lim = line[20+10:20+10+10]
self.num = line[20+10+10:20+10+10+7]
self.T = line[20+10+10+7:20+10+10+7+7]
self.max_cia = line[20+10+10+7+7:20+10+10+7+7+10]
self.res = line[20+10+10+7+7+10:20+10+10+7+7+10+6]
self.comment = line[20+10+10+7+7+10+6:20+10+10+7+7+10+6+27]
self.ref = line[20+10+10+7+7+10+6+27:20+10+10+7+7+10+6+27+3]
self.chem_sym = self.chem_sym.strip()
self.comment = self.comment.strip()
self.low_lim = float(self.low_lim)
self.upp_lim = float(self.upp_lim)
self.T = float(self.T)
self.max_cia = float(self.max_cia)
self.res = float(self.res)
self.num = int(self.num)
self.ref = int(self.ref)
def __repr__(self):
return f"{self.chem_sym} {self.low_lim}-{self.upp_lim} cm-1 at {self.T} K"
class data:
def __init__(self, num):
self.i = 0
self.n = num
self.v = np.zeros(shape=num) * np.nan
self.k = np.zeros(shape=num) * np.nan
def __call__(self, line):
s = line.split()
self.v[self.i] = float(s[0])
self.k[self.i] = float(s[1])
self.i += 1
return self.i < self.n
def __repr__(self):
return f"{self.n} datapoints"
def parse_cia_file(path):
a = open(path, 'r')
out = []
line = a.readline()
while line:
h = header(line)
d = data(h.num)
while d(a.readline()):
pass
out.append((h, d))
line = a.readline()
a.close()
return out
def parse_cia_files(path):
out = []
for fil in all_cia_files(path):
out.append(parse_cia_file(fil))
return out
class cia_record:
def __init__(self, h, d):
self.t = [h.T]
self.k = [d.k]
self.v = d.v
self.h = [h]
def __call__(self, h, d):
if len(d.v) != len(self.v) or np.any(d.v != self.v):
return False
self.t.append(h.T)
self.k.append(d.k)
self.h.append(h)
return True
def any(self): return len(self.t)
def fix(self):
x = np.argsort(self.t)
self.t = np.array(self.t)[x]
self.k = np.array(self.k)[x]
self.h = np.array(self.h)[x]
return self
def try_merge(self, other):
count = 0
for n in self.v:
count += n in other.v
# Must be complete overlap
if count != len(self.v) and count != len(other.v):
return other, False
print(f"Merge by extending zeros {self.h} and {other.h}")
k = np.zeros((len(self.t)+len(other.t),
max(len(self.v), len(other.v))))
v = self.v if count == len(self.v) else other.v
for i in range(len(self.t)):
for j in range(len(self.v)):
if self.v[j] in v:
k[i, j] = self.k[i, j]
for i in range(len(other.t)):
for j in range(len(other.v)):
if other.v[j] in v:
k[i, j] = other.k[i, j]
self.v = self.v if len(self.v) > len(other.v) else other.v
self.k = k
self.t = np.append(self.t, other.t)
self.h = np.append(self.h, other.h)
return self.fix(), True
def try_interp_extrap_merge(self, other):
sm = np.median(self.v)
om = np.median(other.v)
overlap = (sm > other.v[0] and sm < other.v[-1]
) or (om > self.v[0] and om < self.v[-1])
if not overlap:
return other, False
print(
f"Merge by interpolation and zero extension {self.h} and {other.h}")
x = np.unique(np.append(self.v, other.v))
k = []
for i in range(len(self.t)):
k.append(np.interp(x, self.v, self.k[i], left=0, right=0))
for i in range(len(other.t)):
k.append(np.interp(x, other.v, other.k[i], left=0, right=0))
self.v = x
self.k = k
self.t = np.append(self.t, other.t)
self.h = np.append(self.h, other.h)
return self.fix(), True
def __repr__(self):
return f"{self.h[0].chem_sym} ({len(self.t)}x{len(self.v)}) {self.v[0]}-{self.v[-1]} cm-1; {self.t[0]}-{self.t[-1]} K"
class false_cia_record:
def __init__(self): pass
def __call__(*args): return False
def any(self): return False
def sort_cia_data(datalist):
out = []
data = false_cia_record()
for x in datalist:
if not data(x[0], x[1]):
if data.any():
out.append(data.fix())
data = cia_record(x[0], x[1])
if data.any():
out.append(data.fix())
return out
def count(data):
out = {}
for spec in data:
for f in spec:
out[f.spec] = out.get(f.spec, 0) + 1
return out
def fix_missing(data, first=True):
n = len(data)
out = [data.pop(0)]
while len(data):
next = data.pop(0)
x, v = out[-1].try_merge(next)
if not v:
x, v = out[-1].try_interp_extrap_merge(next)
if v:
out[-1] = x
else:
out.append(x)
if first or n != len(out):
out = fix_missing(out[::-1], False)[::-1]
return out
def remove_negatives(data):
had = 0
for x in data:
count = (x.k < 0).sum()
if count:
f"Remove {count} negative values {x.h}"
x.k[x.k < 0] = 0
return data
[docs]
def hitran2arts(path):
""" Reads all Hitran .cia files in a folder and tries to convert them to
Arts format
When the Hitran file data contains two datasets and one data set has a
pure subset of the frequency grid of the other, these datasets are merged
with the missing data set to zero. A print() expression is called
informing that this has happened, starting with the words
"Merge by extending zeros" and finishing with a copy of the two datasets'
names/headers
When the Hitran file data contains two data sets that clearly overlap but
where one data set is not a complete subset of the other, a new frequency
grid as the combination of these two datasets' grids is formed. The
two sets of data are then interpolated onto the new frequency grid by
linear means. Extrapolation is not allowed so any data outside of the
range of the old frequency grid is set to zero. A print() expression is
called informing that this has happened, starting with the words
"Merge by interpolation and zero extension" and finishing with a copy of
the two datasets' names/headers
If the Hitran data contains negative values, these are set to zero.
print() informs how many negative values are removed.
If generating the CIARecord fails, a warning about this is printed.
Parameters
----------
path : Path line
A path to a folder with Hitran cia data
Returns
-------
aoc : dict
A list of all CIARecord that could be generated
"""
x = [sort_cia_data(x) for x in parse_cia_files(path)]
for i in range(len(x)):
x[i] = remove_negatives(fix_missing(x[i]))
aoc = {}
for specs in x:
out = []
spec = specs[0].h[0].chem_sym.replace("Air", "Bath")
for data in specs:
out.append(pyarts.arts.GriddedField2([pyarts.arts.convert.kaycm2freq(data.v), data.t],
100**-5 * data.k.T, ["Frequency", "Temperature"]))
try:
aoc[spec.replace('-', '-CIA-')] = pyarts.arts.CIARecord(
out, spec.split('-')[0], spec.split('-')[1])
except:
print("Fail for", spec)
return aoc