#!/usr/bin/python3
"""
Copyright (C) 2024 NIBIO <https://www.nibio.no/>.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
# Requirements:
# * CDO >= v 2.0
# * GDAL >= v 3.4.3 built with Python support
# * For Python: See requirements.txt
import configparser
import glob
import logging
import os
import re
import subprocess
import sys
import time
from datetime import datetime, timedelta
from dotenv import load_dotenv
from jinja2 import Environment, FileSystemLoader
load_dotenv()
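# The configuration below is read from the environment / a .env file.
# Illustrative example only - the variable names match the os.getenv calls
# below, the values are hypothetical (note the trailing-slash convention,
# since paths are concatenated without separators):
#   MODEL_ID=ADASMELIAE
#   HOME_DIR=/opt/ADASMELIAE/
#   WEATHER_DATA_DIR=/data/weather/
#   RESULT_TIF_DIR=/data/results/tif/
#   RESULT_MAPFILE_DIR=/data/results/mapfile/
#   TMP_DIR=/tmp/ADASMELIAE/
#   WEATHER_DATA_FILENAME_DATEFORMAT=%Y-%m-%d.nc
#   START_DATE_MM_DD=03-01
#   END_DATE_MM_DD=06-30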
# TODO Should start before the first European crops reach growth stage 51 and end when the northernmost regions have passed stage 59
DEBUG = os.getenv("DEBUG", "false").lower() != "false"
model_id = os.getenv("MODEL_ID")
home_dir = os.getenv("HOME_DIR")
weather_data_dir = os.getenv("WEATHER_DATA_DIR")
result_tif_dir = os.getenv("RESULT_TIF_DIR")
result_mapfile_dir = os.getenv("RESULT_MAPFILE_DIR")
tmp_dir = os.getenv("TMP_DIR")
template_dir = f"{home_dir}mapfile/"
weather_data_filename_pattern = os.getenv("WEATHER_DATA_FILENAME_DATEFORMAT")
start_MM_DD = os.getenv("START_DATE_MM_DD")
end_MM_DD = os.getenv("END_DATE_MM_DD")
config_file_path = f"{home_dir}ADASMELIAE.cfg"
mask_file_path = (
f"{home_dir}{os.getenv('MASK_FILE')}" if os.getenv("MASK_FILE") else None
)
template_file_name = "template.j2"
# Read i18n configuration for language-specific mapfile legends
config = configparser.ConfigParser()
config.read(config_file_path)
logging.basicConfig(
level=logging.DEBUG if DEBUG else logging.INFO,
format="%(asctime)s - %(levelname).4s - (%(filename)s:%(lineno)d) - %(message)s",
)
tm_max = "air_temperature_2m_max"  # Variable name for daily max air temperature
THRESHOLD = 15  # Threshold (deg C); at or above this the warning status is red
# Run given command using subprocess.run. Log output; exit on failure.
def run_command(command, stdout=subprocess.PIPE):
    logging.debug(f"{command}")
    try:
        result = subprocess.run(
            command,
            stdout=stdout,
            stderr=stdout,
            shell=True,
            text=True,
            check=True,
        )
        if result.stderr:
            for line in result.stderr.splitlines():
                logging.error(line.strip())
        if result.stdout:
            result_lines = result.stdout.splitlines()
            for line in result_lines:
                logging.debug(line.strip())
            return result_lines
    except subprocess.CalledProcessError as e:
        logging.error(f"{e}")
        sys.exit(1)
# Run given command with capture_output=True, return the CompletedProcess
def run_command_get_output(command):
    logging.debug(f"{command}")
    try:
        return subprocess.run(
            command,
            capture_output=True,
            shell=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        logging.error(f"{e}")
        sys.exit(1)
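# Example (illustrative): run_command_get_output("cdo showdate in.nc") returns
# a CompletedProcess whose .stdout is parsed by get_unique_dates below.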
# Remove all temporary/intermediary files
def remove_temporary_files():
logging.info("Remove temporary/intermediary files")
if glob.glob(f"{tmp_dir}*.nc"):
run_command(command=f"rm {tmp_dir}*.nc")
if glob.glob(f"{tmp_dir}*.tif"):
run_command(command=f"rm {tmp_dir}*.tif")
# Remove previously calculated results
def remove_old_results():
logging.info("Remove previously calculated results")
if glob.glob(f"{result_tif_dir}*.tif"):
run_command(command=f"rm {result_tif_dir}*.tif")
if glob.glob(f"rm {result_mapfile_dir}*.map"):
run_command(command=f"rm {result_mapfile_dir}*.map")
# Get unique dates in given file
def get_unique_dates(input_file):
result = run_command_get_output(f"cdo showdate {input_file}")
    dates = re.findall(r"\d{4}-\d{2}-\d{2}", result.stdout)
    unique_dates = sorted(set(dates))
logging.info(f"Found {len(unique_dates)} unique dates in file")
return unique_dates
# Split file into one file for each unique date
def split_by_date(input_file, prefix):
logging.info(f"Split {input_file} into individual files for each date")
unique_dates = get_unique_dates(input_file)
for date in unique_dates:
output_filename = f"{prefix}{date}.nc"
run_command(command=f"cdo seldate,{date} {input_file} {output_filename}")
logging.debug(f"Data for {date} extracted to {output_filename}")
if __name__ == "__main__":
start_time = time.time() # For logging execution time
today = datetime.now().date()
year = today.year
model_start_date = datetime.strptime(f"{year}-{start_MM_DD}", "%Y-%m-%d").date()
model_end_date = datetime.strptime(f"{year}-{end_MM_DD}", "%Y-%m-%d").date()
start_date = model_start_date
end_date = today + timedelta(days=2)
if start_date > end_date:
logging.error("Model period not started. Quit.")
sys.exit()
logging.info(
f"Start running model {model_id} for start date {start_date} and end date {end_date}"
)
weather_data_file = weather_data_dir + today.strftime(weather_data_filename_pattern)
max_temp_in_period_file = f"{tmp_dir}{year}_max_temp_for_period.nc"
unmasked_result_file = f"{tmp_dir}{year}_unmasked_result.nc"
final_result_file = f"{tmp_dir}{year}_result.nc"
logging.info(
f"Extract {tm_max} for the required period into {os.path.basename(max_temp_in_period_file)}"
)
run_command(
command=f"cdo -L seldate,{start_date},{end_date} -selname,{tm_max} {weather_data_file} {max_temp_in_period_file}"
)
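    # cdo applies chained operators right to left: -selname,{tm_max} extracts
    # the variable first, then seldate crops the result to the period. The -L
    # flag locks I/O (sequential access), which thread-unsafe netCDF/HDF5
    # builds require.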
# Classifying warning status for the model
# temperature < 15 --> 2 (Green)
# temperature >= 15 --> 4 (Red)
logging.info(
f"Calculate warning status by checking if {tm_max} is below or above {THRESHOLD}"
)
run_command(
command=f'cdo -s -aexpr,"WARNING_STATUS = {tm_max} < {THRESHOLD} ? 2 : 4" {max_temp_in_period_file} {unmasked_result_file}',
)
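    # -aexpr appends the computed WARNING_STATUS variable to the existing
    # variables, so the output keeps air_temperature_2m_max as well (unlike
    # -expr, which would keep only the computed variable).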
    # If the MASK_FILE env variable is set, mask the results with its region polygons; otherwise use the result as-is.
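    # cdo maskregion leaves grid cells inside the given polygons untouched and
    # sets cells outside them to the missing value.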
if mask_file_path is not None:
logging.info(f"Mask file with {os.path.basename(mask_file_path)}")
run_command(
command=f"cdo -P 6 -maskregion,{mask_file_path} {unmasked_result_file} {final_result_file}",
)
else:
os.rename(unmasked_result_file, final_result_file)
prefix = f"{tmp_dir}day_"
split_by_date(final_result_file, prefix)
    timestep_files = sorted(glob.glob(f"{prefix}*.nc"))
timestep_filenames = [os.path.basename(file) for file in timestep_files]
timestep_dates = [
filename.split("_")[1].split(".")[0] for filename in timestep_filenames
]
logging.info(f"Split into {len(timestep_filenames)} timestep files")
remove_old_results()
for file in timestep_files:
filename = os.path.basename(file)
date_str = filename.split("_")[1].split(".")[0]
with open("/dev/null", "w") as devnull:
warning_status_lcc = f"{tmp_dir}result_WARNING_STATUS_{date_str}_lcc.tif"
result_lcc = f"{tmp_dir}result_{date_str}_lcc.tif"
warning_status = f"{result_tif_dir}result_WARNING_STATUS_{date_str}.tif"
result = f"{result_tif_dir}result_{date_str}.tif"
logging.debug(f"Calculate result for {date_str}")
# For warning status:
run_command(
command=f"gdal_translate -ot Int16 -of GTiff NETCDF:'{file}':WARNING_STATUS {warning_status_lcc}",
stdout=devnull,
)
# For max temperature:
run_command(
command=f"gdal_translate -ot Float32 -of GTiff NETCDF:'{file}':{tm_max} {result_lcc}",
stdout=devnull,
)
            # Reproject the files to match the projection declared in the
            # generated mapfile (always EPSG:4326).
run_command(
command=f"gdalwarp -t_srs EPSG:4326 {warning_status_lcc} {warning_status}",
stdout=devnull,
)
run_command(
command=f"gdalwarp -t_srs EPSG:4326 {result_lcc} {result}",
stdout=devnull,
)
# Generate mapfile
    # Building data sets for language-specific legends
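    # Illustrative ADASMELIAE.cfg layout (the "languages" key is read below;
    # the per-language section keys are hypothetical and depend on the
    # deployed config):
    #   [i18n]
    #   languages = en,nb
    #   [i18n.en]
    #   legend_green = No risk of migration
    #   [i18n.nb]
    #   legend_green = Ingen risiko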
    languages = []
    language_codes = config["i18n"]["languages"].split(",")
    for language_code in language_codes:
        language = {"language_code": language_code}
        section = f"i18n.{language_code}"
        if section in config:
            for keyword in config[section]:
                language[keyword] = config[section][keyword]
        languages.append(language)
env = Environment(loader=FileSystemLoader(template_dir))
template = env.get_template(template_file_name)
output = template.render(
{
"model_id": model_id,
"timestep_dates": timestep_dates,
"mapserver_data_dir": os.getenv("MAPSERVER_RESULT_TIF_DIR"),
"mapserver_mapfile_dir": os.getenv("MAPSERVER_RESULT_MAPFILE_DIR"),
"mapserver_log_file": os.getenv("MAPSERVER_LOG_FILE"),
"mapserver_image_path": os.getenv("MAPSERVER_IMAGE_PATH"),
"mapserver_extent": os.getenv("MAPSERVER_EXTENT"),
"languages": languages,
"language_codes": language_codes,
}
)
with open(f"{result_mapfile_dir}/{model_id}.map", "w") as f:
f.write(output)
# remove_temporary_files()
end_time = time.time()
    logging.info(
        f"End model {model_id} - Execution time: {end_time - start_time:.2f} seconds"
    )