#!/usr/bin/python3
"""
Copyright (C) 2024 NIBIO <https://www.nibio.no/>.

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""

# Requirements:
# * CDO >= v 2.0
# * GDAL >= v 3.4.3 built with Python support
# * For Python: See requirements.txt

import configparser
import glob
import logging
import os
import re
import shutil
import subprocess
import sys
import time
from datetime import datetime, timedelta

from dotenv import load_dotenv
from jinja2 import Environment, FileSystemLoader

load_dotenv()

# Debug logging is enabled unless DEBUG is unset or set to "false"
DEBUG = os.getenv("DEBUG", "false").lower() != "false"

model_id = os.getenv("MODEL_ID")
home_dir = os.getenv("HOME_DIR")
weather_data_dir = os.getenv("WEATHER_DATA_DIR")
archive_weather_data_dir = os.getenv("ARCHIVE_WEATHER_DATA_DIR")
result_tif_base_dir = os.getenv("RESULT_TIF_DIR")
result_mapfile_base_dir = os.getenv("RESULT_MAPFILE_DIR")
tmp_dir = os.getenv("TMP_DIR")
template_dir = f"{home_dir}mapfile/"
weather_data_filename_pattern = os.getenv("WEATHER_DATA_FILENAME_DATEFORMAT")
archive_weather_data_filename_pattern = os.getenv(
    "ARCHIVE_WEATHER_DATA_FILENAME_DATEFORMAT"
)
start_MM_DD = os.getenv("START_DATE_MM_DD")
end_MM_DD = os.getenv("END_DATE_MM_DD")
config_file_path = f"{home_dir}ADASMELIAE.cfg"
mask_file_path = (
    f"{home_dir}{os.getenv('MASK_FILE')}" if os.getenv("MASK_FILE") else None
)
template_file_name = "template.j2"

# Read i18n configuration (language codes and legend texts)
config = configparser.ConfigParser()
config.read(config_file_path)

logging.basicConfig(
    level=logging.DEBUG if DEBUG else logging.INFO,
    format="%(asctime)s - %(levelname).4s - (%(filename)s:%(lineno)d) - %(message)s",
)

tm_max = "air_temperature_2m_max"
THRESHOLD = 15
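
# For reference, a minimal sketch of the .env file this script expects. The
# values are hypothetical examples, not defaults shipped with the project.
# Note that directory paths are concatenated without separators above, so they
# must end with a trailing slash:
#
#   MODEL_ID=ADASMELIAE
#   HOME_DIR=/opt/ADASMELIAE/
#   WEATHER_DATA_DIR=/data/weather/
#   ARCHIVE_WEATHER_DATA_DIR=/data/weather/archive/
#   RESULT_TIF_DIR=/data/results/tif/
#   RESULT_MAPFILE_DIR=/data/results/mapfile/
#   TMP_DIR=/tmp/ADASMELIAE/
#   WEATHER_DATA_FILENAME_DATEFORMAT=weather_%Y.nc
#   ARCHIVE_WEATHER_DATA_FILENAME_DATEFORMAT=weather_%Y.nc
#   START_DATE_MM_DD=03-01
#   END_DATE_MM_DD=06-30
#   MASK_FILE=mask.csv   # optional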

# Run the given command using subprocess.run. Handle logging.
def run_command(command, stdout=subprocess.PIPE):
    logging.debug(f"{command}")
    try:
        result = subprocess.run(
            command,
            stdout=stdout,
            stderr=stdout,
            shell=True,
            text=True,
            check=True,
        )
        if result.stdout:
            result_lines = result.stdout.splitlines()
            for line in result_lines:
                logging.debug(line.strip())
            return result_lines
        if result.stderr:
            for line in result.stderr.splitlines():
                logging.error(line.strip())
    except subprocess.CalledProcessError as e:
        logging.error(f"{e}")
        sys.exit(1)


# Run the given command with capture_output=True, return the result
def run_command_get_output(command):
    logging.debug(f"{command}")
    try:
        return subprocess.run(
            command,
            capture_output=True,
            shell=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        logging.error(f"{e}")
        sys.exit(1)


# Remove all temporary/intermediary files
def remove_temporary_files():
    logging.info("Remove temporary/intermediary files")
    if glob.glob(f"{tmp_dir}*.nc"):
        run_command(command=f"rm {tmp_dir}*.nc")
    if glob.glob(f"{tmp_dir}*.tif"):
        run_command(command=f"rm {tmp_dir}*.tif")


# Remove previously calculated results
def remove_old_results(tif_dir, mapfile_dir):
    logging.info(
        f"Remove previously calculated results from {tif_dir} and {mapfile_dir}"
    )
    if glob.glob(f"{tif_dir}*.tif"):
        run_command(command=f"rm {tif_dir}*.tif")
    if glob.glob(f"{mapfile_dir}*.map"):
        run_command(command=f"rm {mapfile_dir}*.map")


# Get the unique dates in the given file, sorted ascending
def get_unique_dates(input_file):
    result = run_command_get_output(f"cdo showdate {input_file}")
    dates = re.findall(r"\d{4}-\d{2}-\d{2}", result.stdout)
    unique_dates = sorted(set(dates))
    logging.info(f"Found {len(unique_dates)} unique dates in file")
    return unique_dates


# Split the file into one file per unique date
def split_by_date(input_file, prefix):
    logging.info(f"Split {input_file} into individual files for each date")
    unique_dates = get_unique_dates(input_file)
    for date in unique_dates:
        output_filename = f"{prefix}{date}.nc"
        run_command(command=f"cdo seldate,{date} {input_file} {output_filename}")
        logging.debug(f"Data for {date} extracted to {output_filename}")
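
# The helpers above shell out to CDO. As a rough sketch of what they run
# (dates are hypothetical):
#
#   cdo showdate input.nc
#       prints the date of every timestep, e.g. "2024-05-01  2024-05-02 ..."
#   cdo seldate,2024-05-01 input.nc day_2024-05-01.nc
#       extracts all timesteps falling on the given date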

if __name__ == "__main__":
    start_time = time.time()  # For logging execution time
    today = datetime.now().date()

    # Check if a year argument is provided
    if len(sys.argv) > 1:
        year = int(sys.argv[1])
    else:
        year = today.year

    # Ensure that the model is not run for a future year
    if year > today.year:
        logging.error(f"Cannot run model for future year ({year}). Quit.")
        sys.exit()

    # Ensure that the output year dirs exist
    result_tif_year_dir = f"{result_tif_base_dir}{year}"
    result_mapfile_year_dir = f"{result_mapfile_base_dir}{year}"
    os.makedirs(result_tif_year_dir, exist_ok=True)
    os.makedirs(result_mapfile_year_dir, exist_ok=True)

    model_start_date = datetime.strptime(f"{year}-{start_MM_DD}", "%Y-%m-%d").date()
    model_end_date = datetime.strptime(f"{year}-{end_MM_DD}", "%Y-%m-%d").date()
    start_date = model_start_date
    if year == today.year:
        # Run for the current year
        end_date = today + timedelta(days=2)
    else:
        # Run for a previous year
        end_date = model_end_date + timedelta(days=2)
    logging.info(
        f"Model start date {model_start_date}. Model end date {model_end_date}."
    )
    logging.info(
        f"Attempt to run model {model_id} for year {year}, start date {start_date} and end date {end_date}"
    )

    # Ensure that the model is not run after the model run period (include 2
    # extra days at the end to replace forecast data with reanalysed data)
    if model_end_date + timedelta(days=2) < end_date:
        logging.error(
            f"{end_date} is after model period end (+ 2 days) "
            f"{model_end_date + timedelta(days=2)}. Quit."
        )
        sys.exit()
    # Ensure that the model is not run before the model run period
    if today < start_date:
        logging.error("Model period not started. Quit.")
        sys.exit()

    if year == today.year:
        weather_data_file = f"{weather_data_dir}{weather_data_filename_pattern.replace('%Y', str(year))}"
    else:
        weather_data_file = f"{archive_weather_data_dir}{year}/{archive_weather_data_filename_pattern.replace('%Y', str(year))}"
    if not os.path.exists(weather_data_file):
        logging.error(f"Weather data file {weather_data_file} not found. Quit.")
        sys.exit()
    logging.info(f"Use weather data from file {weather_data_file}")

    max_temp_in_period_file = f"{tmp_dir}{year}_max_temp_for_period.nc"
    unmasked_result_file = f"{tmp_dir}{year}_unmasked_result.nc"
    final_result_file = f"{tmp_dir}{year}_result.nc"

    logging.info(
        f"Extract {tm_max} for the required period into {os.path.basename(max_temp_in_period_file)}"
    )
    run_command(
        command=f"cdo -L seldate,{start_date},{end_date} -selname,{tm_max} {weather_data_file} {max_temp_in_period_file}"
    )

    # Classify warning status for the model:
    # temperature < 15  --> 2 (Green)
    # temperature >= 15 --> 4 (Red)
    logging.info(
        f"Calculate warning status by checking if {tm_max} is below or above {THRESHOLD}"
    )
    run_command(
        command=f'cdo -s -aexpr,"WARNING_STATUS = {tm_max} < {THRESHOLD} ? 2 : 4" {max_temp_in_period_file} {unmasked_result_file}',
    )
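    # A note on the cdo call above: -aexpr evaluates the expression and appends
    # the derived WARNING_STATUS variable while keeping the input variables, so
    # the result file carries both the max temperature and the warning status.
    # With plain -expr, only the computed variable would be kept.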

    # Mask results using a CSV file with polygons. If the env variable
    # MASK_FILE is not set, use the result file as it is.
    if mask_file_path is not None:
        logging.info(f"Mask file with {os.path.basename(mask_file_path)}")
        run_command(
            command=f"cdo -P 6 -maskregion,{mask_file_path} {unmasked_result_file} {final_result_file}",
        )
    else:
        os.rename(unmasked_result_file, final_result_file)

    # Split the result file into separate files for each date
    prefix = f"{tmp_dir}day_"
    split_by_date(final_result_file, prefix)
    timestep_files = sorted(glob.glob(f"{prefix}*.nc"))
    timestep_filenames = [os.path.basename(file) for file in timestep_files]
    timestep_dates = [
        filename.split("_")[1].split(".")[0] for filename in timestep_filenames
    ]
    logging.info(f"Split into {len(timestep_filenames)} timestep files")

    remove_old_results(result_tif_year_dir, result_mapfile_year_dir)

    for file in timestep_files:
        filename = os.path.basename(file)
        current_file_date_str = filename.split("_")[1].split(".")[0]
        current_file_date = datetime.strptime(current_file_date_str, "%Y-%m-%d").date()
        with open("/dev/null", "w") as devnull:
            warning_status_lcc = (
                f"{tmp_dir}result_WARNING_STATUS_{current_file_date_str}_lcc.tif"
            )
            result_lcc = f"{tmp_dir}result_{current_file_date_str}_lcc.tif"
            warning_status = (
                f"{result_tif_year_dir}/result_WARNING_STATUS_{current_file_date_str}.tif"
            )
            result = f"{result_tif_year_dir}/result_{current_file_date_str}.tif"
            logging.debug(f"Calculate result for {current_file_date_str}")

            # Ensure warning status 0 for all points on the first day after
            # the model end date
            if current_file_date > model_end_date:
                tmp_file = file + ".tmp"
                os.rename(file, tmp_file)
                run_command(
                    command=f'cdo -s -aexpr,"WARNING_STATUS = 0" {tmp_file} {file}',
                )

            # For warning status:
            run_command(
                command=f"gdal_translate -ot Int16 -of GTiff NETCDF:'{file}':WARNING_STATUS {warning_status_lcc}",
                stdout=devnull,
            )
            # For max temperature:
            run_command(
                command=f"gdal_translate -ot Float32 -of GTiff NETCDF:'{file}':{tm_max} {result_lcc}",
                stdout=devnull,
            )
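            # A note on the gdal_translate calls above: GDAL's NetCDF driver
            # addresses a single variable of a multi-variable file as a
            # subdataset named NETCDF:'<path>':<variable>, which is what lets
            # WARNING_STATUS and the temperature band be exported separately.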
            # Reproject the files to ensure we have the projection given in
            # the generated mapfile. We always use EPSG:4326 for this.
            run_command(
                command=f"gdalwarp -t_srs EPSG:4326 {warning_status_lcc} {warning_status}",
                stdout=devnull,
            )
            run_command(
                command=f"gdalwarp -t_srs EPSG:4326 {result_lcc} {result}",
                stdout=devnull,
            )

        # Should not generate grey files for more than one day after the
        # model end date
        if current_file_date > model_end_date:
            break

    # Generate mapfile
    # Build data sets for language-specific legends
    languages = []
    language_codes = config["i18n"]["languages"].split(",")
    for language_code in language_codes:
        language = {"language_code": language_code}
        if ("i18n.%s" % language_code) in config:
            for keyword in config["i18n.%s" % language_code]:
                language[keyword] = config["i18n.%s" % language_code][keyword]
        languages.append(language)

    env = Environment(loader=FileSystemLoader(template_dir))
    template = env.get_template(template_file_name)
    output = template.render(
        {
            "model_id": model_id,
            "timestep_dates": timestep_dates,
            "mapserver_data_dir": f"{os.getenv('MAPSERVER_RESULT_TIF_DIR')}{year}/",
            "mapserver_mapfile_dir": f"{os.getenv('MAPSERVER_RESULT_MAPFILE_DIR')}{year}/",
            "mapserver_log_file": os.getenv("MAPSERVER_LOG_FILE"),
            "mapserver_image_path": os.getenv("MAPSERVER_IMAGE_PATH"),
            "mapserver_extent": os.getenv("MAPSERVER_EXTENT"),
            "languages": languages,
            "language_codes": language_codes,
        }
    )
    with open(f"{result_mapfile_year_dir}/{model_id}.map", "w") as f:
        f.write(output)

    query_template = os.path.join(home_dir, "mapfile/query_template.xml")
    query_template_temperature = os.path.join(
        home_dir, "mapfile/query_template_temperature.xml"
    )
    shutil.copy(query_template, result_mapfile_year_dir)
    shutil.copy(query_template_temperature, result_mapfile_year_dir)

    remove_temporary_files()

    end_time = time.time()
    logging.info(
        f"End model {model_id} - Execution time: {end_time - start_time:.2f} seconds"
    )
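
# Example invocation (script name assumed; adjust to the actual file name):
#   ./ADASMELIAE.py        -> run for the current year
#   ./ADASMELIAE.py 2023   -> re-run a previous season from archived weather data
# A produced GeoTIFF can be sanity-checked with e.g. (hypothetical date)
#   gdalinfo <RESULT_TIF_DIR>/2024/result_WARNING_STATUS_2024-05-01.tif
# which should report EPSG:4326 and band type Int16.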