#! /usr/bin/env python
# -*- coding: utf-8 -*-
import os
import pathlib
import re
from collections import OrderedDict
from functools import partial
import click
import matplotlib.pyplot as plt
import numpy as np
import yaml
from ._version import get_versions
from .riverbmi import BmiRiverModule
__version__ = get_versions()["version"]
out = partial(click.secho, bold=True, err=True)
err = partial(click.secho, fg="red", err=True)
[docs]def setup_yaml_with_canonical_dict():
""" https://stackoverflow.com/a/8661021 """
yaml.add_representer(
OrderedDict,
lambda self, data: self.represent_mapping(
"tag:yaml.org,2002:map", data.items()
),
Dumper=yaml.SafeDumper,
)
def repr_ordered_dict(self, data):
return self.represent_mapping("tag:yaml.org,2002:map", data.items())
yaml.add_representer(dict, repr_ordered_dict, Dumper=yaml.SafeDumper)
def repr_dict(self, data):
return self.represent_mapping(
"tag:yaml.org,2002:map", sorted(data.items(), key=lambda t: t[0])
)
yaml.add_representer(dict, repr_dict, Dumper=yaml.SafeDumper)
# https://stackoverflow.com/a/45004464
def repr_str(dumper, data):
if "\n" in data:
return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|")
return dumper.represent_str(data)
yaml.add_representer(str, repr_str, Dumper=yaml.SafeDumper)
def repr_tuple(dumper, data):
return dumper.represent_sequence("tag:yaml.org,2002:seq", list(data))
yaml.add_representer(tuple, repr_tuple, Dumper=yaml.SafeDumper)
yaml.add_implicit_resolver(
"tag:yaml.org,2002:float",
re.compile(
r"""^(?:
[-+]?(?:[0-9][0-9_]*)\.[0-9_]*(?:[eE][-+]?[0-9]+)?
|[-+]?(?:[0-9][0-9_]*)(?:[eE][-+]?[0-9]+)
|[-+]?\.[0-9_]+(?:[eE][-+]?[0-9]+)
|[-+]?[0-9][0-9_]*(?::[0-5]?[0-9])+\.[0-9_]*
|[-+]?\.(?:inf|Inf|INF)
|\.(?:nan|NaN|NAN))$""",
re.X,
),
list("-+0123456789."),
Loader=yaml.SafeLoader,
)
setup_yaml_with_canonical_dict()
[docs]def empty_bmi_var_array(bmi, name):
return np.empty(bmi.get_var_nbytes(name), dtype=np.uint8).view(
dtype=bmi.get_var_type(name)
)
[docs]class RafemOutputWriter:
def __init__(self, bmi, output_interval=1):
self._bmi = bmi
# self._run_id = run_id
self._output_interval = output_interval
self._steps = 0
self._prefix = pathlib.Path("output")
self._outputs = {
"elevation": self._prefix / "elevation",
"river": self._prefix / "river",
"profile": self._prefix / "profile",
}
self._make_dirs()
self._make_buffers()
def _make_dirs(self):
os.mkdir(self._prefix)
for dir_ in self._outputs.values():
os.mkdir(dir_)
def _make_buffers(self):
self._z = empty_bmi_var_array(self._bmi, "land_surface__elevation")
self._x = empty_bmi_var_array(self._bmi, "channel_centerline__x_coordinate")
self._y = empty_bmi_var_array(self._bmi, "channel_centerline__y_coordinate")
self._prof = empty_bmi_var_array(self._bmi, "channel_centerline__elevation")
def _update_buffers(self):
self._bmi.get_value("land_surface__elevation", self._z)
self._bmi.get_value("channel_centerline__x_coordinate", self._x)
self._bmi.get_value("channel_centerline__y_coordinate", self._y)
self._bmi.get_value("channel_centerline__elevation", self._prof)
[docs] def time_stamp(self):
return str(self._steps)
def _save_output(self):
self._update_buffers()
np.savetxt(
self._outputs["elevation"] / "elev_{0}.out".format(self.time_stamp()),
self._z,
fmt="%.5f",
header=os.linesep.join(
["version: {0}".format(__version__), "Elevation [m]"]
),
)
np.savetxt(
self._outputs["river"] / "riv_{0}.out".format(self.time_stamp()),
list(zip(self._x, self._y)),
fmt="%.5f",
header=os.linesep.join(
["version: {0}".format(__version__), "X [m], Y [m]"]
),
)
np.savetxt(
self._outputs["profile"] / "prof_{0}.out".format(self.time_stamp()),
self._prof,
fmt="%.5f",
header=os.linesep.join(
["version: {0}".format(__version__), "Elevation [m]"]
),
)
@property
def prefix(self):
return str(self._prefix)
[docs] def update(self, n_steps):
self._steps += n_steps
if self._steps % self._output_interval == 0:
self._save_output()
@click.group(chain=True)
@click.version_option()
@click.option(
"--cd",
default=".",
type=click.Path(exists=True, file_okay=False, dir_okay=True, readable=True),
help="chage to directory, then execute",
)
def rafem(cd) -> None:
"""The River Avulsion and Floodplain Evolution Model (RAFEM)
\b
Examples:
Create a folder with example input files,
$ mkdir example && rafem setup
Run a simulation using the examples input files,
$ cd example && rafem run
Commands can also be chained together. The folling will setup
a simulation, run it, and then plot elevations.
$ mkdir example && rafem setup run plot elevation
"""
os.chdir(cd)
@rafem.command()
@click.option("-v", "--verbose", is_flag=True, help="Emit status messages to stderr.")
@click.option("--dry-run", is_flag=True, help="Do not actually run the model")
def run(dry_run: bool, verbose: bool) -> None:
"""Run a simulation."""
run_dir = pathlib.Path.cwd()
config_file = run_dir / "rafem.yaml"
message = []
if not config_file.is_file():
message.append("missing RAFEM configuration file: {0}".format(config_file))
if (run_dir / "output").exists():
message.append(
"RAFEM output directory already exists: {0}".format(run_dir / "output")
)
if message:
err(os.linesep.join(message))
raise click.Abort(os.linesep.join(message))
if dry_run:
out("Nothing to do. 😴")
else:
avulsion = BmiRiverModule()
avulsion.initialize("rafem.yaml")
with open("rafem.yaml", "r") as fp:
params = yaml.safe_load(fp)
spacing = 10
n_days = params["days"] # + years * 365
n_steps = int(n_days / avulsion.get_time_step())
if dry_run:
out(avulsion._model.to_yaml())
elif n_steps == 0:
out("Nothing to do (years == 0). 😴")
else:
output = RafemOutputWriter(avulsion, output_interval=spacing)
with click.progressbar(
range(n_steps),
label=" ".join(["🚀", str(run_dir)]),
item_show_func=lambda step: "day {0} of {1}".format(
int(0 if step is None else step * avulsion.get_time_step()), n_days
),
) as bar:
for step in bar:
avulsion.update()
output.update(1)
out("💥 Finished! 💥")
out("Output written to {0}".format(output.prefix))
avulsion.finalize()
@rafem.command()
@click.argument("infile", type=click.Choice(["rafem"]))
def show(infile: str) -> None:
"""Show example input files."""
print(_contents_of_input_file(infile))
@rafem.command()
def setup() -> None:
"""Setup a folder of input files for a simulation."""
files = [pathlib.Path("rafem.yaml")]
existing_files = [name for name in files if name.exists()]
if existing_files:
for name in existing_files:
err(
f"{name}: File exists. Either remove and then rerun or setup in a different folder",
)
else:
for fname in files:
with open(fname, "w") as fp:
print(_contents_of_input_file(fname.stem), file=fp)
if existing_files:
raise click.Abort()
def _contents_of_input_file(infile: str) -> str:
# Avulsion module parameters
conf = OrderedDict(
(
("_version", __version__),
# Space
("shape", [120, 200]), # Length x Width (km)
("spacing", [0.1, 0.1]), # dy, dx (km)
("n0", 5.0), # upstream elevation
("nslope", 0.001), # initial landscape slope
(
"max_rand",
0.1,
), # multiply by slope for max height of a random perturbation
# Time
("days", 7), # number of days to run for
("dt_day", 0.01), # timestep (days)
# Random seed
("rand_seed", 623), # seed for random number generator
# Sea level and subsidence parameters
("Initial_SL", 0.0), # initial sea level
("SLRR_m", 0.0), # sea level rise rate (m/yr)
("SubRate_m", 0.0), # subsidence rate (m/yr)
("Sub_Start", 0), # row where subsidence starts
# River characteristics
("ch_width", 10.0), # characteristic channel width (m)
("ch_depth", 1.0), # characteristic channel depth (m)
("ch_discharge", 10.0), # long-term averaged discharge (m^3/s)
("A", 1.0), # river-dependent const. (1 for meandering; 1.4 for braided)
("c_f", 0.01), # drag coefficient
("C_0", 1.0), # sediment concentration on bed
("sed_sg", 2.65), # sediment specific gravity
("init_cut_frac", 1), # initial cut of the channel into land surface
# Avulsion parameters
("super_ratio", 1.0), # normalized SE ratio to trigger avulsion
("short_path", 1), # flag for using shortest path to complete avulsion
# Floodplain and Wetland parameters
("WL_Z", 0.0), # elevation that wetlands maintain above SL
("WL_dist", 0), # cell distance beyond channel that wetlands exist
("blanket_rate_m", 0.0), # "blanket" deposition rate (m)
(
"fine_dep_frac",
0.0,
), # fraction of channel deposit for adjacent fine deposition
("splay_type", 2), # size/type of splay
# Splay types:
# splay_type = 0: no splay deposition
# splay_type = 1: just the first failed avulsion river cell
# splay_type = 2: first failed cell and adjacent cells
# Saving information
("saveavulsions", False), # flag for saving avulsion info
("savecourseupdates", False), # flag for saving course updates
)
)
contents = {
"rafem": yaml.safe_dump(conf, default_flow_style=False),
}
return contents[infile]
@rafem.command()
@click.option("--time", default=-1, type=int, help="time step to plot")
@click.argument("value", type=click.Choice(["elevation", "profile"]))
def plot(value: str, time: int) -> None:
"""Plot output from a simulation."""
basepath = pathlib.Path("output") / value
if time == -1:
def _alphanum_key(key):
def _convert(text):
return int(text) if text.isdigit() else text
return [_convert(c) for c in re.split("([0-9]+)", key)]
files = sorted(
[name.name for name in basepath.glob("*.out")], key=_alphanum_key
)
filepath = basepath / files[-1]
else:
filepath = list(basepath.glob("*_{0}.out".format(time)))[0]
out(str(filepath))
with open("rafem.yaml", "r") as fp:
params = yaml.safe_load(fp)
shape = params["shape"]
z = np.loadtxt(filepath, delimiter=",", comments="#")
if value == "elevation":
plt.imshow(z.reshape(shape), origin="lower", cmap="terrain")
plt.colorbar().ax.set_label("Elevation (m)")
elif value == "profile":
plt.plot(z)
plt.show()