#!/usr/bin/python3
"""
Copyright (C) 2024 NIBIO <https://www.nibio.no/>.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
# Requirements:
# * CDO >= v 2.0
# * GDAL >= v 3.4.3 built with Python support
# * For Python: See requirements.txt
import os
import shutil
import time
import subprocess
import glob
from dotenv import load_dotenv
from datetime import datetime, timedelta
from jinja2 import Environment, FileSystemLoader
import logging
import re
import configparser
import sys
load_dotenv()
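# Configuration is read from environment variables (loaded from .env above)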
DEBUG = os.getenv("DEBUG", "false").lower() != "false"
model_id = os.getenv("MODEL_ID")
home_dir = os.getenv("HOME_DIR")
weather_data_dir = os.getenv("WEATHER_DATA_DIR")
archive_weather_data_dir = os.getenv("ARCHIVE_WEATHER_DATA_DIR")
result_tif_base_dir = os.getenv("RESULT_TIF_DIR")
result_mapfile_base_dir = os.getenv("RESULT_MAPFILE_DIR")
tmp_dir = os.getenv("TMP_DIR")
template_dir = f"{home_dir}mapfile/"
weather_data_filename_pattern = os.getenv("WEATHER_DATA_FILENAME_DATEFORMAT")
archive_weather_data_filename_pattern = os.getenv("ARCHIVE_WEATHER_DATA_FILENAME_DATEFORMAT")
start_MM_DD = os.getenv("START_DATE_MM_DD")
end_MM_DD = os.getenv("END_DATE_MM_DD")
config_file_path = f"{home_dir}ADASMELIAE.cfg"
mask_file_path = (
f"{home_dir}{os.getenv('MASK_FILE')}" if os.getenv("MASK_FILE") else None
)
template_file_name = "template.j2"
# Read the i18n configuration used for language-specific map legends
config = configparser.ConfigParser()
config.read(config_file_path)
logging.basicConfig(
level=logging.DEBUG if DEBUG else logging.INFO,
format="%(asctime)s - %(levelname).4s - (%(filename)s:%(lineno)d) - %(message)s",
)
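# Weather data parameter to analyse and the max temperature threshold (°C)
# separating green (below) from red (at or above) warning status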
tm_max = "air_temperature_2m_max"
THRESHOLD = 15
# Run given command using subprocess.run. Log output; exit on failure.
def run_command(command, stdout=subprocess.PIPE):
    logging.debug(f"{command}")
    try:
        result = subprocess.run(
            command,
            stdout=stdout,
            stderr=stdout,
            shell=True,
            text=True,
            check=True,
        )
        if result.stderr:
            for line in result.stderr.splitlines():
                logging.error(line.strip())
        if result.stdout:
            result_lines = result.stdout.splitlines()
            for line in result_lines:
                logging.debug(line.strip())
            return result_lines
    except subprocess.CalledProcessError as e:
        logging.error(f"{e}")
        if e.stderr:
            for line in e.stderr.splitlines():
                logging.error(line.strip())
        sys.exit(1)
# Run given command with capture_output=True, return result
def run_command_get_output(command):
logging.debug(f"{command}")
try:
return subprocess.run(
command,
capture_output=True,
shell=True,
text=True,
check=True,
)
except subprocess.CalledProcessError as e:
logging.error(f"{e}")
        sys.exit(1)
# Remove all temporary/intermediary files
def remove_temporary_files():
logging.info("Remove temporary/intermediary files")
if glob.glob(f"{tmp_dir}*.nc"):
run_command(command=f"rm {tmp_dir}*.nc")
if glob.glob(f"{tmp_dir}*.tif"):
run_command(command=f"rm {tmp_dir}*.tif")
# Remove previously calculated results
def remove_old_results(tif_dir, mapfile_dir):
logging.info(f"Remove previously calculated results from {tif_dir}")
logging.info(f"Remove previously calculated results from {mapfile_dir}")
if glob.glob(f"{tif_dir}*.tif"):
run_command(command=f"rm {tif_dir}*.tif")
if glob.glob(f"rm {mapfile_dir}*.map"):
run_command(command=f"rm {mapfile_dir}*.map")
# Get unique dates in given file
def get_unique_dates(input_file):
result = run_command_get_output(f"cdo showdate {input_file}")
    dates = re.findall(r"\d{4}-\d{2}-\d{2}", result.stdout)
    unique_dates = sorted(set(dates))
logging.info(f"Found {len(unique_dates)} unique dates in file")
return unique_dates
# Split file into one file for each unique date
def split_by_date(input_file, prefix):
logging.info(f"Split {input_file} into individual files for each date")
unique_dates = get_unique_dates(input_file)
for date in unique_dates:
output_filename = f"{prefix}{date}.nc"
run_command(command=f"cdo seldate,{date} {input_file} {output_filename}")
logging.debug(f"Data for {date} extracted to {output_filename}")
if __name__ == "__main__":
start_time = time.time() # For logging execution time
today = datetime.now().date()
cron_run = True
    # Check if a year argument was provided (e.g. ./ADASMELIAE.py 2023)
if len(sys.argv) > 1:
year = int(sys.argv[1])
cron_run = False
    else:
        year = today.year
# Ensure that model is not run for future year
if year > today.year:
logging.error(f"Cannot run model for future year ({year}). Quit.")
sys.exit()
# Ensure that output year dirs exist
result_tif_year_dir = f"{result_tif_base_dir}{year}/"
result_mapfile_year_dir = f"{result_mapfile_base_dir}{year}/"
os.makedirs(result_tif_year_dir, exist_ok=True)
os.makedirs(result_mapfile_year_dir, exist_ok=True)
model_start_date = datetime.strptime(f"{year}-{start_MM_DD}", "%Y-%m-%d").date()
model_end_date = datetime.strptime(f"{year}-{end_MM_DD}", "%Y-%m-%d").date()
# Ensure that model is not run after model period (for cron runs)
if cron_run and today > model_end_date:
logging.error(f"Cron run and today is after model period end {model_end_date}. Quit.")
sys.exit()
start_date = model_start_date
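    # For current-year runs, include up to two days of forecast data, but never beyond the model period end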
if year == today.year and today + timedelta(days=2) <= model_end_date:
end_date = today + timedelta(days=2)
else:
end_date = model_end_date
logging.info(
f"Model start date {model_start_date}. Model end date {model_end_date}."
)
logging.info(
f"Attempt to run model {model_id} for year {year}, start date {start_date} and end date {end_date}"
)
# Ensure that model is not run before model run period
if today < start_date:
logging.error("Model period not started. Quit.")
sys.exit()
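    # Read current-year data from the live weather data directory, previous years from the per-year archive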
if year == today.year:
weather_data_file = f"{weather_data_dir}{weather_data_filename_pattern.replace('%Y', str(year))}"
else:
weather_data_file = f"{archive_weather_data_dir}{year}/{archive_weather_data_filename_pattern.replace('%Y', str(year))}"
if not os.path.exists(weather_data_file):
logging.error(f"Weather data file {weather_data_file} not found. Quit.")
sys.exit()
    logging.info(f"Use weather data from file {weather_data_file}")
max_temp_in_period_file = f"{tmp_dir}{year}_max_temp_for_period.nc"
unmasked_result_file = f"{tmp_dir}{year}_unmasked_result.nc"
final_result_file = f"{tmp_dir}{year}_result.nc"
logging.info(
f"Extract {tm_max} for the required period into {os.path.basename(max_temp_in_period_file)}"
)
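    # cdo applies chained operators right to left: first select the variable,
    # then restrict to the model period. -L serializes netCDF I/O access.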
run_command(
command=f"cdo -L seldate,{start_date},{end_date} -selname,{tm_max} {weather_data_file} {max_temp_in_period_file}"
)
# Classifying warning status for the model
# temperature < 15 --> 2 (Green)
# temperature >= 15 --> 4 (Red)
logging.info(
f"Calculate warning status by checking if {tm_max} is below or above {THRESHOLD}"
)
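    # aexpr evaluates the expression and appends WARNING_STATUS to the output
    # while keeping the original temperature variable (-s runs cdo silently)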
run_command(
command=f'cdo -s -aexpr,"WARNING_STATUS = {tm_max} < {THRESHOLD} ? 2 : 4" {max_temp_in_period_file} {unmasked_result_file}',
)
    # Mask results using a CSV file with polygons if the env variable MASK_FILE is set; otherwise use the result as-is.
if mask_file_path is not None:
logging.info(f"Mask file with {os.path.basename(mask_file_path)}")
run_command(
command=f"cdo -P 6 -maskregion,{mask_file_path} {unmasked_result_file} {final_result_file}",
)
else:
os.rename(unmasked_result_file, final_result_file)
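    # Split the result into one netCDF file per date, named day_<YYYY-MM-DD>.nc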
prefix = f"{tmp_dir}day_"
split_by_date(final_result_file, prefix)
    timestep_files = sorted(glob.glob(f"{prefix}*.nc"))
timestep_filenames = [os.path.basename(file) for file in timestep_files]
timestep_dates = [
filename.split("_")[1].split(".")[0] for filename in timestep_filenames
]
logging.info(f"Split into {len(timestep_filenames)} timestep files")
remove_old_results(result_tif_year_dir, result_mapfile_year_dir)
for file in timestep_files:
filename = os.path.basename(file)
current_file_date_str = filename.split("_")[1].split(".")[0]
current_file_date = datetime.strptime(current_file_date_str, "%Y-%m-%d").date()
if current_file_date > model_end_date:
logging.debug(f"Should not calculate for {current_file_date} (after model period).")
break
with open("/dev/null", "w") as devnull:
warning_status_lcc = f"{tmp_dir}result_WARNING_STATUS_{current_file_date_str}_lcc.tif"
result_lcc = f"{tmp_dir}result_{current_file_date_str}_lcc.tif"
warning_status = f"{result_tif_year_dir}/result_WARNING_STATUS_{current_file_date_str}.tif"
result = f"{result_tif_year_dir}/result_{current_file_date_str}.tif"
logging.debug(f"Calculate result for {current_file_date_str}")
            # Warning status: extract the WARNING_STATUS variable as an Int16 GeoTIFF
run_command(
command=f"gdal_translate -ot Int16 -of GTiff NETCDF:'{file}':WARNING_STATUS {warning_status_lcc}",
stdout=devnull,
)
            # Max temperature: extract the temperature variable as a Float32 GeoTIFF
run_command(
command=f"gdal_translate -ot Float32 -of GTiff NETCDF:'{file}':{tm_max} {result_lcc}",
stdout=devnull,
)
            # Reproject the files to EPSG:4326 to match the projection declared in the generated mapfile
run_command(
command=f"gdalwarp -t_srs EPSG:4326 {warning_status_lcc} {warning_status}",
stdout=devnull,
)
run_command(
command=f"gdalwarp -t_srs EPSG:4326 {result_lcc} {result}",
stdout=devnull,
)
    # Generate mapfile
    # Build data sets for language-specific legends
    languages = []
    language_codes = config["i18n"]["languages"].split(",")
    for language_code in language_codes:
        language = {"language_code": language_code}
        section = f"i18n.{language_code}"
        if section in config:
            for keyword in config[section]:
                language[keyword] = config[section][keyword]
        languages.append(language)
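    # Render the mapfile from the Jinja2 template with timestep dates, paths, and per-language legends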
env = Environment(loader=FileSystemLoader(template_dir))
template = env.get_template(template_file_name)
output = template.render(
{
"model_id": model_id,
"timestep_dates": timestep_dates,
"mapserver_data_dir": f"{os.getenv('MAPSERVER_RESULT_TIF_DIR')}{year}/",
"mapserver_mapfile_dir": f"{os.getenv('MAPSERVER_RESULT_MAPFILE_DIR')}{year}/",
"mapserver_log_file": os.getenv("MAPSERVER_LOG_FILE"),
"mapserver_image_path": os.getenv("MAPSERVER_IMAGE_PATH"),
"mapserver_extent": os.getenv("MAPSERVER_EXTENT"),
"languages": languages,
"language_codes": language_codes,
}
)
with open(f"{result_mapfile_year_dir}/{model_id}.map", "w") as f:
f.write(output)
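    # Ship the XML query templates alongside the generated mapfile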
query_template = os.path.join(home_dir, "mapfile/query_template.xml")
query_template_temperature = os.path.join(home_dir, "mapfile/query_template_temperature.xml")
shutil.copy(query_template, result_mapfile_year_dir)
shutil.copy(query_template_temperature, result_mapfile_year_dir)
remove_temporary_files()
end_time = time.time()
    logging.info(
        f"End model {model_id} - Execution time: {end_time - start_time:.2f} seconds"
    )