Skip to content

Data

data

Downloaders and preprocessors for third-party data.

airports

List of airports, from ourairports

Requires extras:

  • httpx, polars

SCHEMA_AIRPORTS

# Column name -> dtype mapping for the airports table. It is passed as the
# schema by both `scan_airports` (parquet) and `fetch_airports` (CSV).
SCHEMA_AIRPORTS = {
    "id": Int32(),
    "ident": String(),
    "type": String(),
    "name": String(),
    "latitude_deg": Float32(),
    "longitude_deg": Float32(),
    "elevation_ft": Int16(),
    "continent": String(),
    "iso_country": String(),
    "iso_region": String(),
    "municipality": String(),
    "scheduled_service": String(),
    "gps_code": String(),
    "iata_code": String(),
    "local_code": String(),
    "home_link": String(),
    "wikipedia_link": String(),
    "keywords": String(),
}

Schema for airports dataset.

scan_airports

scan_airports(fp: Path) -> LazyFrame

Lazily load list of airports from parquet file.

Schema: aerocore.data.airports.SCHEMA_AIRPORTS

Source code in src/aerocore/data/airports.py
42
43
44
45
46
47
48
49
50
51
52
def scan_airports(fp: Path) -> pl.LazyFrame:
    """
    Open the airports parquet file as a lazy frame.

    Schema: [aerocore.data.airports.SCHEMA_AIRPORTS][]

    :param fp: location of the parquet file
    :return: a lazy scan over `fp` using the airports schema
    :raises FileNotFoundError: if `fp` does not exist
    """
    if fp.exists():
        return pl.scan_parquet(fp, schema=SCHEMA_AIRPORTS)

    message = "cannot find airports\nhelp: download it first."
    raise FileNotFoundError(message)

URL_BASE

# Base URL of the ourairports data; `fetch_airports` requests
# `{URL_BASE}/airports.csv`.
URL_BASE = (
    "https://davidmegginson.github.io/ourairports-data"
)

fetch_airports

fetch_airports(client: AsyncClient) -> DataFrame

Download all airports from ourairports.

Schema: aerocore.data.airports.SCHEMA_AIRPORTS

Source code in src/aerocore/data/airports.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
async def fetch_airports(client: httpx.AsyncClient) -> pl.DataFrame:
    """
    Download all airports from ourairports.

    Schema: [aerocore.data.airports.SCHEMA_AIRPORTS][]

    :param client: async HTTP client used to request
        `{URL_BASE}/airports.csv`
    :return: the parsed CSV, cast to the airports schema
    :raises httpx.HTTPStatusError: if the server does not answer with a
        success status
    """
    response = await client.get(f"{URL_BASE}/airports.csv")
    # fail loudly on an error response instead of handing an error page to
    # the CSV parser
    response.raise_for_status()

    airports = pl.read_csv(
        BytesIO(response.content),
        schema=SCHEMA_AIRPORTS,
        truncate_ragged_lines=True,
    ).cast(SCHEMA_AIRPORTS)  # type: ignore
    return airports

engine_emissions

ICAO Aircraft Engine Emissions Databank

Requires extras:

  • httpx, polars

URL_EMISSIONS

# Download URL for the engine emissions databank.
# NOTE(review): presumably requested by `_fetch_data` (not shown here) - confirm.
URL_EMISSIONS = (
    "https://www.easa.europa.eu/en/downloads/131424/en"
)

EmissionsData

Bases: NamedTuple

data
data: DataFrame
schema
schema: DataFrame

fetch_emissions_data

fetch_emissions_data(client: AsyncClient) -> EmissionsData
Source code in src/aerocore/data/engine_emissions.py
43
44
45
async def fetch_emissions_data(client: httpx.AsyncClient) -> EmissionsData:
    """
    Download the emissions databank and parse it.

    The raw payload is obtained with `_fetch_data` and handed unchanged to
    `_parse_data`.

    :param client: async HTTP client passed through to `_fetch_data`
    :return: whatever `_parse_data` produces for the downloaded payload
    """
    return _parse_data(await _fetch_data(client))

era5

Google Research's Analysis-Ready & Cloud Optimized (ARCO) ERA5 dataset

Format: netcdf, indexed by the specific date and pressure level.

See:

Data License: Copernicus license

Requires extras:

  • httpx, polars
  • gcloud CLI to be installed and authenticated

GOOGLE_STORAGE_URI

# Default `gs_base` of `fetch_weather`; objects are addressed below it as
# `{YYYY}/{MM}/{DD}/{variable}/{pressure_level}.nc`.
GOOGLE_STORAGE_URI = "gs://gcp-public-data-arco-era5/raw/date-variable-pressure_level"

PRESSURE_LEVELS

# Pressure levels requested by default: every 25 from 100 to 250, every 50
# from 300 to 700 and every 25 from 750 to 1000 (27 levels in total).
PRESSURE_LEVELS: PressureHPA[tuple[int, ...]] = tuple(
    level
    for first, stop, step in (
        (100, 275, 25),
        (300, 750, 50),
        (750, 1025, 25),
    )
    for level in range(first, stop, step)
)

logger

# Module-level logger; `fetch_weather` and `concat_dataset` report progress
# through it.
logger = getLogger(__name__)

EcmwfParameter

Bases: NamedTuple

id_
id_: int
name
name: str
short_name
short_name: str
quantity
quantity: str | object

VARIABLES

# Each entry is `EcmwfParameter(id_, name, short_name, quantity)`.
# `name` doubles as the per-variable directory name used by `fetch_weather`,
# and `short_name` is the key used to read the data variable from the
# dataset in `get_data_for_trajectory`.
VARIABLES: list[EcmwfParameter] = [
    EcmwfParameter(
        248,
        "fraction_of_cloud_cover",
        "cc",
        Dimensionless("fraction"),
    ),
    EcmwfParameter(
        129, "geopotential", "z", M**2 * S**-2
    ),
    EcmwfParameter(
        203,
        "ozone_mass_mixing_ratio",
        "o3",
        Dimensionless("mass_mixing_ratio"),
    ),
    EcmwfParameter(60, "potential_vorticity", "pv", S**-1),
    EcmwfParameter(
        247,
        "specific_cloud_ice_water_content",
        "ciwc",
        Dimensionless("mass_mixing_ratio"),
    ),
    EcmwfParameter(
        246,
        "specific_cloud_liquid_water_content",
        "clwc",
        Dimensionless("mass_mixing_ratio"),
    ),
    EcmwfParameter(
        133,
        "specific_humidity",
        "q",
        Dimensionless("mass_mixing_ratio"),
    ),
    EcmwfParameter(
        130, "temperature", "t", STATIC_TEMPERATURE(K)
    ),
    EcmwfParameter(
        131, "u_component_of_wind", "u", WIND_SPEED(M_PERS)
    ),
    EcmwfParameter(
        132, "v_component_of_wind", "v", WIND_SPEED(M_PERS)
    ),
    EcmwfParameter(
        135, "vertical_velocity", "w", PA * S**-1
    ),
]

Available variables under the raw bucket.

VARIABLES_MAP

# Maps each variable's long name (also its directory name) to its short name
# (the data variable key inside the dataset).
VARIABLES_MAP = {v.name: v.short_name for v in VARIABLES}

dates

dates(
    start: datetime, end: datetime
) -> Generator[str, None, None]

Generate dates in the format YYYY/MM/DD from start to end, inclusive.

Source code in src/aerocore/data/era5.py
102
103
104
105
106
107
108
109
def dates(start: datetime, end: datetime) -> Generator[str, None, None]:
    """
    Yield one `YYYY/MM/DD` string per day, stepping from `start` in whole
    days for as long as the current value does not exceed `end`.

    Nothing is yielded when `start` is later than `end`.
    """
    one_day = timedelta(days=1)
    day = start
    while not day > end:
        yield f"{day:%Y/%m/%d}"
        day = day + one_day

fetch_weather

fetch_weather(
    date_start: datetime = datetime(2023, 2, 1, tzinfo=utc),
    date_end: datetime = datetime(2023, 2, 1, tzinfo=utc),
    *,
    base_dir: Path,
    variables: list[str] = list(VARIABLES_MAP.keys()),
    pressure_levels: tuple[int, ...] = PRESSURE_LEVELS,
    gs_base: str = GOOGLE_STORAGE_URI,
) -> None

Recursively download all global ERA5 data for the specified date interval, pressure levels and variables as NetCDF files.

The directory structure will be mirrored as: {base_dir}/{YYYY}/{MM}/{DD}/{variable_name}/{pressure_level}.nc.

Source code in src/aerocore/data/era5.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def fetch_weather(
    date_start: datetime = datetime(2023, 2, 1, tzinfo=pytz.utc),
    date_end: datetime = datetime(2023, 2, 1, tzinfo=pytz.utc),
    *,
    base_dir: Path,
    variables: list[str] = list(VARIABLES_MAP.keys()),
    pressure_levels: tuple[int, ...] = PRESSURE_LEVELS,
    gs_base: str = GOOGLE_STORAGE_URI,
) -> None:
    """
    Recursively download all global ERA5 data for the specified date
    interval, pressure levels and variables as NetCDF files.

    The directory structure will be mirrored as:
    `{base_dir}/{YYYY}/{MM}/{DD}/{variable_name}/{pressure_level}.nc`.

    Files that already exist locally are not downloaded again. The missing
    files of one date/variable pair are fetched with a single
    `gcloud storage cp -I` call that reads the object URIs from stdin.

    :param date_start: first day to download (inclusive)
    :param date_end: last day to download (inclusive)
    :param base_dir: local root directory of the mirror
    :param variables: variable names, used as directory names
    :param pressure_levels: pressure levels, used as file stems
    :param gs_base: remote prefix under which the objects are stored
    :raises subprocess.CalledProcessError: if `gcloud` exits with a non-zero
        status
    """
    for date in dates(date_start, date_end):
        for variable in variables:
            path_out = base_dir / date / variable
            path_out.mkdir(parents=True, exist_ok=True)

            # NOTE: remote object names always use forward slashes, so the
            # URI is assembled from strings instead of an OS-dependent `Path`
            # (which would render backslashes on Windows)
            queue = [
                f"{gs_base}/{date}/{variable}/{level}.nc".encode()
                for level in pressure_levels
                if not (path_out / f"{level}.nc").is_file()
            ]
            if not queue:
                logger.info("%s: skipping, all exists", path_out)
                continue
            logger.info("%s: downloading %d", path_out, len(queue))
            subprocess.check_output(
                ["gcloud", "storage", "cp", "-I", str(path_out)],
                input=b"\n".join(queue),
            )

concat_dataset

concat_dataset(
    variable: str, base_dir_date: Path
) -> Dataset

Concatenates all pressure levels for a given variable and date into a single dataset.

Example:

<xarray.Dataset> Size: 5GB
Dimensions:    (isobaricInhPa: 27, time: 24, latitude: 721, longitude: 1440)
Coordinates:
* longitude    (longitude) float32 6kB 0.0 0.25 0.5 ... 359.2 359.5 359.8
* latitude     (latitude) float32 3kB 90.0 89.75 89.5 ... -89.5 -89.75 -90.0
* time         (time) datetime64[ns] 192B 2023-02-01 ... 2023-02-01T23:00:00
* isobaricInhPa  (isobaricInhPa) int64 216B 100 1000 125 150 ... 925 950 975
Data variables:
    z      (isobaricInhPa, time, latitude, longitude) float64 5GB dask.array
    <chunksize=(1, 24, 721, 1440), meta=np.ndarray>
Attributes:
    Conventions:  CF-1.6
    history:  2023-06-24 08:54:57 GMT by grib_to_netcdf-2.25.1: /opt/ecmw...
Source code in src/aerocore/data/era5.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def concat_dataset(
    variable: str,
    base_dir_date: Path,
) -> xr.Dataset:
    """
    Concatenates all pressure levels for a given variable and date
    into a single dataset.

    Every `{base_dir_date}/{variable}/{pressure_level}.nc` file is opened and
    stacked along a new `isobaricInhPa` dimension whose coordinate values are
    taken from the file stems, in descending order.

    Example:

    ```txt
    <xarray.Dataset> Size: 5GB
    Dimensions:    (isobaricInhPa: 27, time: 24, latitude: 721, longitude: 1440)
    Coordinates:
    * longitude    (longitude) float32 6kB 0.0 0.25 0.5 ... 359.2 359.5 359.8
    * latitude     (latitude) float32 3kB 90.0 89.75 89.5 ... -89.5 -89.75 -90.0
    * time         (time) datetime64[ns] 192B 2023-02-01 ... 2023-02-01T23:00:00
    * isobaricInhPa  (isobaricInhPa) int64 216B 100 1000 125 150 ... 925 950 975
    Data variables:
        z      (isobaricInhPa, time, latitude, longitude) float64 5GB dask.array
        <chunksize=(1, 24, 721, 1440), meta=np.ndarray>
    Attributes:
        Conventions:  CF-1.6
        history:  2023-06-24 08:54:57 GMT by grib_to_netcdf-2.25.1: /opt/ecmw...
    ```

    :param variable: variable (directory) name below `base_dir_date`
    :param base_dir_date: directory holding one sub-directory per variable
    :return: the combined dataset
    :raises FileNotFoundError: if no `.nc` file exists for the variable
    :raises ValueError: if a file stem is not an integer
    """

    weather_variable = base_dir_date / variable
    weather_variable_fps = sorted(
        weather_variable.glob("*.nc"),
        key=lambda x: int(x.stem),
        reverse=True,
    )
    if not weather_variable_fps:
        raise FileNotFoundError(
            f"no nc files found for {variable=} in {weather_variable}"
            "\nhelp: download it first."
        )
    logger.debug(
        f"reading {variable=}, found {len(weather_variable_fps)} nc files"
    )

    # NOTE: the pressure dimension is not included in each file - we generate
    # placeholders to be later overwritten
    def add_dummy_pressure_dim(ds: xr.Dataset) -> xr.Dataset:
        ds = ds.expand_dims(isobaricInhPa=[random.uniform(100, 1000)])
        return ds

    ds = xr.open_mfdataset(
        weather_variable_fps,
        engine="netcdf4",
        concat_dim="isobaricInhPa",
        combine="nested",
        preprocess=add_dummy_pressure_dim,
    )
    # `assign_coords` returns a new object rather than modifying in place, so
    # the result must be kept - otherwise the random placeholders survive
    ds = ds.assign_coords(
        isobaricInhPa=[int(fp.stem) for fp in weather_variable_fps]
    )

    return ds

build_path

build_path(
    base_dir: Path, year: int, month: int, day: int
) -> Path
Source code in src/aerocore/data/era5.py
209
210
def build_path(base_dir: Path, year: int, month: int, day: int) -> Path:
    """
    Return `{base_dir}/{YYYY}/{MM}/{DD}`, with the year zero-padded to four
    digits and the month and day to two.
    """
    parts = (f"{year:04d}", f"{month:02d}", f"{day:02d}")
    return base_dir.joinpath(*parts)

get_data_for_trajectory

get_data_for_trajectory(
    trajectory: LazyFrame,
    *,
    base_dir: Path,
    year: int,
    month: int,
    day: int,
) -> LazyFrame

Extract weather data for the given trajectory

Returns:

Type Description
LazyFrame

a lazyframe with the weather data

Source code in src/aerocore/data/era5.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def get_data_for_trajectory(
    trajectory: pl.LazyFrame,
    *,
    base_dir: Path,
    year: int,
    month: int,
    day: int,
) -> pl.LazyFrame:
    """
    Extract weather data for the given trajectory

    The trajectory must provide the columns `timestamp`, `longitude`,
    `latitude` and `altitude`. Altitude is converted to a pressure with
    `atmosphere`, and every variable in `VARIABLES_MAP` is interpolated at
    each (time, latitude, longitude, pressure) point.

    NOTE(review): `timestamp` is read as epoch seconds, `longitude` and
    `latitude` are converted with `.degrees()` (so presumably radians), and
    `altitude` is presumably in meters - confirm against the caller.

    :param trajectory: points at which to sample the weather
    :param base_dir: root directory of the local weather mirror
    :param year: year of the day to read
    :param month: month of the day to read
    :param day: day of the month to read
    :return: a lazyframe with the weather data
    """
    base_dir_date = build_path(base_dir, year, month, day)
    time, longitude, latitude, alt = (
        trajectory.select(
            pl.from_epoch(pl.col("timestamp"), time_unit="s"),  # datetime64
            # wrap (rather than shift) into [0, 360): the grid's 0 degrees
            # is the prime meridian, so e.g. -10 degrees must become 350
            (pl.col("longitude").degrees() + 360) % 360,
            pl.col("latitude").degrees(),  # [-90, 90]
            pl.col("altitude"),  # meters
        )
        .collect()
        .get_columns()
    )

    atmos = atmosphere(alt.to_numpy(), delta_temperature=0, xp=np)

    times_ = xr.DataArray(time.to_numpy(), dims=["points"])
    lons_ = xr.DataArray(longitude.to_numpy(), dims=["points"])
    lats_ = xr.DataArray(latitude.to_numpy(), dims=["points"])
    pressures_ = xr.DataArray(atmos.pressure / 100, dims=["points"])  # hPa

    weather = {}
    for variable_name, variable_key in VARIABLES_MAP.items():
        # close the underlying files once the values have been materialised
        with concat_dataset(variable_name, base_dir_date) as ds:
            values = ds.interp(
                time=times_,
                latitude=lats_,
                longitude=lons_,
                isobaricInhPa=pressures_,
                kwargs={"fill_value": None},
            )
            weather[variable_name] = values[variable_key].values

    lf = pl.LazyFrame(weather)
    return lf

aircraft_types

List of aircraft types, from ICAO DOC8643

Requires extras:

  • httpx, polars

SCHEMA_AIRCRAFT_TYPES

# Column name -> dtype mapping that `fetch_aircraft_types` casts the
# downloaded table to.
SCHEMA_AIRCRAFT_TYPES = {
    "ModelFullName": String(),
    "Description": String(),
    "WTC": Enum(["H", "M", "L", "J", "L/M"]),
    "WTG": Enum(["E", "Z", "F", "C", "D", "G", "A", "B"]),
    "Designator": String(),
    "ManufacturerCode": String(),
    "ShowInPart3Only": Boolean(),
    "AircraftDescription": Enum(
        [
            "Helicopter",
            "SeaPlane",
            "LandPlane",
            "Tiltrotor",
            "Gyrocopter",
            "Amphibian",
        ]
    ),
    "EngineCount": String(),
    "EngineType": Enum(
        [
            "Piston",
            "Turboprop/Turboshaft",
            "Jet",
            "Rocket",
            "Electric",
        ]
    ),
}

Schema for aircraft types dataset.

SCHEMA_MANUFACTURERS

# Column name -> dtype mapping that `fetch_manufacturers` casts the
# downloaded table to.
SCHEMA_MANUFACTURERS = {
    "Code": String(),
    "Names": List(String()),
    "StateName": String(),
}

Schema for manufacturers dataset.

URL_BASE_DOC8643

# Base URL of the DOC8643 endpoints; `/AircraftTypes` and `/Manufacturers`
# are appended by the fetch functions.
URL_BASE_DOC8643 = 'https://doc8643.icao.int/External'

fetch_aircraft_types

fetch_aircraft_types(client: AsyncClient) -> DataFrame
Source code in src/aerocore/data/aircraft_types.py
63
64
65
async def fetch_aircraft_types(client: httpx.AsyncClient) -> pl.DataFrame:
    """
    Download the aircraft types table from the `AircraftTypes` endpoint.

    Schema: [aerocore.data.aircraft_types.SCHEMA_AIRCRAFT_TYPES][]

    :param client: async HTTP client handed to `_post_and_parse_json`
    :return: the parsed response, cast to the aircraft types schema
    """
    endpoint = f"{URL_BASE_DOC8643}/AircraftTypes"
    raw = await _post_and_parse_json(client, endpoint)
    return raw.cast(SCHEMA_AIRCRAFT_TYPES)  # type: ignore

fetch_manufacturers

fetch_manufacturers(client: AsyncClient) -> DataFrame
Source code in src/aerocore/data/aircraft_types.py
68
69
70
async def fetch_manufacturers(client: httpx.AsyncClient) -> pl.DataFrame:
    """
    Download the manufacturers table from the `Manufacturers` endpoint.

    Schema: [aerocore.data.aircraft_types.SCHEMA_MANUFACTURERS][]

    :param client: async HTTP client handed to `_post_and_parse_json`
    :return: the parsed response, cast to the manufacturers schema
    """
    endpoint = f"{URL_BASE_DOC8643}/Manufacturers"
    raw = await _post_and_parse_json(client, endpoint)
    return raw.cast(SCHEMA_MANUFACTURERS)  # type: ignore