Skip to content
Snippets Groups Projects
ADASMELIAE.py 10.28 KiB
#!/usr/bin/python3

"""
    Copyright (C) 2024 NIBIO <https://www.nibio.no/>. 

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as
    published by the Free Software Foundation, either version 3 of the
    License, or (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""

# Requirements:
# * CDO >= v 2.0
# * GDAL >= v 3.4.3 built with Python support
# * For Python: See requirements.txt

import os
import time
import subprocess
import glob
from dotenv import load_dotenv
from datetime import datetime, timedelta
from jinja2 import Environment, FileSystemLoader
import logging
import re
import configparser
import sys

load_dotenv()

# TODO Should start before the first European crops reach growth stage 51 and end when the northernmost regions have passed stage 59

DEBUG = (
    False
    if os.getenv("DEBUG") is None or os.getenv("DEBUG").lower() == "false"
    else True
)

model_id = os.getenv("MODEL_ID")
home_dir = os.getenv("HOME_DIR")
weather_data_dir = os.getenv("WEATHER_DATA_DIR")
result_tif_dir = os.getenv("RESULT_TIF_DIR")
result_mapfile_dir = os.getenv("RESULT_MAPFILE_DIR")
tmp_dir = os.getenv("TMP_DIR")
template_dir = f"{home_dir}mapfile/"

weather_data_filename_pattern = os.getenv("WEATHER_DATA_FILENAME_DATEFORMAT")
start_MM_DD = os.getenv("START_DATE_MM_DD")
end_MM_DD = os.getenv("END_DATE_MM_DD")


config_file_path = f"{home_dir}ADASMELIAE.cfg"
mask_file_path = (
    f"{home_dir}{os.getenv('MASK_FILE')}" if os.getenv("MASK_FILE") else None
)
template_file_name = "template.j2"

# Get language stuff
config = configparser.ConfigParser()
config.read(config_file_path)

logging.basicConfig(
    level=logging.DEBUG if DEBUG else logging.INFO,
    format="%(asctime)s - %(levelname).4s - (%(filename)s:%(lineno)d) - %(message)s",
)

tm_max = "air_temperature_2m_max"

THRESHOLD = 15


# Run given command using subprocess.run. Handle logging.
def run_command(command, stdout=subprocess.PIPE):
    logging.debug(f"{command}")
    try:
        result = subprocess.run(
            command,
            stdout=stdout,
            stderr=stdout,
            shell=True,
            text=True,
            check=True,
        )
        if result.stdout:
            result_lines = result.stdout.splitlines()
            for line in result_lines:
                logging.debug(line.strip())
            return result_lines
        if result.stderr:
            for line in result.stderr.splitlines():
                logging.error(line.strip())
    except subprocess.CalledProcessError as e:
        logging.error(f"{e}")
        quit()


# Run given command with capture_output=True, return result
def run_command_get_output(command):
    logging.debug(f"{command}")
    try:
        return subprocess.run(
            command,
            capture_output=True,
            shell=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        logging.error(f"{e}")
        quit()


# Remove all temporary/intermediary files
def remove_temporary_files():
    logging.info("Remove temporary/intermediary files")
    if glob.glob(f"{tmp_dir}*.nc"):
        run_command(command=f"rm {tmp_dir}*.nc")
    if glob.glob(f"{tmp_dir}*.tif"):
        run_command(command=f"rm {tmp_dir}*.tif")


# Remove previously calculated results
def remove_old_results():
    logging.info("Remove previously calculated results")
    if glob.glob(f"{result_tif_dir}*.tif"):
        run_command(command=f"rm {result_tif_dir}*.tif")
    if glob.glob(f"rm {result_mapfile_dir}*.map"):
        run_command(command=f"rm {result_mapfile_dir}*.map")


# Get unique dates in given file
def get_unique_dates(input_file):
    result = run_command_get_output(f"cdo showdate {input_file}")
    dates_line = re.findall(r"\d{4}-\d{2}-\d{2}", result.stdout)
    dates = " ".join(dates_line).split()
    unique_dates = list(set(dates))
    unique_dates.sort()
    logging.info(f"Found {len(unique_dates)} unique dates in file")
    return unique_dates


# Split file into one file for each unique date
def split_by_date(input_file, prefix):
    logging.info(f"Split {input_file} into individual files for each date")
    unique_dates = get_unique_dates(input_file)
    for date in unique_dates:
        output_filename = f"{prefix}{date}.nc"
        run_command(command=f"cdo seldate,{date} {input_file} {output_filename}")
        logging.debug(f"Data for {date} extracted to {output_filename}")


if __name__ == "__main__":
    start_time = time.time()  # For logging execution time
    today = datetime.now().date()

    year = today.year
    model_start_date = datetime.strptime(f"{year}-{start_MM_DD}", "%Y-%m-%d").date()
    model_end_date = datetime.strptime(f"{year}-{end_MM_DD}", "%Y-%m-%d").date()

    start_date = model_start_date
    end_date = today + timedelta(days=2)

    if start_date > end_date:
        logging.error("Model period not started. Quit.")
        sys.exit()

    logging.info(
        f"Start running model {model_id} for start date {start_date} and end date {end_date}"
    )

    weather_data_file = weather_data_dir + today.strftime(weather_data_filename_pattern)
    max_temp_in_period_file = f"{tmp_dir}{year}_max_temp_for_period.nc"
    unmasked_result_file = f"{tmp_dir}{year}_unmasked_result.nc"
    final_result_file = f"{tmp_dir}{year}_result.nc"

    logging.info(
        f"Extract {tm_max} for the required period into {os.path.basename(max_temp_in_period_file)}"
    )
    run_command(
        command=f"cdo -L seldate,{start_date},{end_date} -selname,{tm_max} {weather_data_file} {max_temp_in_period_file}"
    )

    # Classifying warning status for the model
    # temperature < 15     --> 2 (Green)
    # temperature >= 15    --> 4 (Red)
    logging.info(
        f"Calculate warning status by checking if {tm_max} is below or above {THRESHOLD}"
    )
    run_command(
        command=f'cdo -s -aexpr,"WARNING_STATUS = {tm_max} < {THRESHOLD} ? 2 : 4" {max_temp_in_period_file} {unmasked_result_file}',
    )

    # Mask results using a CSV file with polygons. Env variable MASK_FILE must be set, or else we use the file as it is.
    if mask_file_path is not None:
        logging.info(f"Mask file with {os.path.basename(mask_file_path)}")
        run_command(
            command=f"cdo -P 6 -maskregion,{mask_file_path} {unmasked_result_file} {final_result_file}",
        )
    else:
        os.rename(unmasked_result_file, final_result_file)

    prefix = f"{tmp_dir}day_"
    split_by_date(final_result_file, prefix)
    timestep_files = sorted(glob.glob(f"{prefix}*nc"))

    # Split result file into separate files for each timestep
    # prefix = f"{tmp_dir}day_"
    # run_command(command=f"cdo -s splitday {final_result_file} {prefix}")

    # timestep_file_pattern = f"{prefix}*.nc"
    # add_dates_to_filenames(timestep_file_pattern)

    # timestep_files = sorted(glob.glob(timestep_file_pattern))

    timestep_filenames = [os.path.basename(file) for file in timestep_files]
    timestep_dates = [
        filename.split("_")[1].split(".")[0] for filename in timestep_filenames
    ]
    logging.info(f"Split into {len(timestep_filenames)} timestep files")

    remove_old_results()

    for file in timestep_files:
        filename = os.path.basename(file)
        date_str = filename.split("_")[1].split(".")[0]
        with open("/dev/null", "w") as devnull:
            warning_status_lcc = f"{tmp_dir}result_WARNING_STATUS_{date_str}_lcc.tif"
            result_lcc = f"{tmp_dir}result_{date_str}_lcc.tif"
            warning_status = f"{result_tif_dir}result_WARNING_STATUS_{date_str}.tif"
            result = f"{result_tif_dir}result_{date_str}.tif"

            logging.debug(f"Calculate result for {date_str}")

            # For warning status:
            run_command(
                command=f"gdal_translate -ot Int16 -of GTiff  NETCDF:'{file}':WARNING_STATUS {warning_status_lcc}",
                stdout=devnull,
            )
            # For max temperature:
            run_command(
                command=f"gdal_translate -ot Float32 -of GTiff  NETCDF:'{file}':{tm_max} {result_lcc}",
                stdout=devnull,
            )
            # Need to reproject the files, to ensure we have the projection given in the generted mapfile. We always use EPSG:4326 for this
            run_command(
                command=f"gdalwarp -t_srs EPSG:4326 {warning_status_lcc} {warning_status}",
                stdout=devnull,
            )
            run_command(
                command=f"gdalwarp -t_srs EPSG:4326 {result_lcc} {result}",
                stdout=devnull,
            )

    # Generate mapfile
    # Building data sets for language specific legends
    languages = []
    language_codes = config["i18n"]["languages"].split(",")
    for language_code in language_codes:
        language = {"language_code": language_code}
        if ("i18n.%s" % language_code) in config:
            for keyword in config["i18n.%s" % language_code]:
                language[keyword] = config["i18n.%s" % language_code][keyword]
            languages.append(language)

    env = Environment(loader=FileSystemLoader(template_dir))
    template = env.get_template(template_file_name)
    output = template.render(
        {
            "model_id": model_id,
            "timestep_dates": timestep_dates,
            "mapserver_data_dir": os.getenv("MAPSERVER_RESULT_TIF_DIR"),
            "mapserver_mapfile_dir": os.getenv("MAPSERVER_RESULT_MAPFILE_DIR"),
            "mapserver_log_file": os.getenv("MAPSERVER_LOG_FILE"),
            "mapserver_image_path": os.getenv("MAPSERVER_IMAGE_PATH"),
            "mapserver_extent": os.getenv("MAPSERVER_EXTENT"),
            "languages": languages,
            "language_codes": language_codes,
        }
    )
    with open(f"{result_mapfile_dir}/{model_id}.map", "w") as f:
        f.write(output)
    # remove_temporary_files()

    end_time = time.time()
    logging.info(
        f"End model {model_id} - Execution time: {'{:.2f} seconds'.format(end_time - start_time)}"
    )