Source code for rafem.riverbmi

#! /usr/bin/env python
"""Basic Model Interface implementation for River Module"""

import numpy as np
import yaml
from bmipy import Bmi

from .rivermodule import RiverModule


[docs]class BmiRiverModule(Bmi): """The BMI for the River Avulsion Floodplain Evolution Model.""" _name = "Rafem" _input_var_names = ( "land_surface__elevation", "channel_exit__x_coordinate", "channel_exit__y_coordinate", ) _output_var_names = ( "channel_centerline__x_coordinate", "channel_centerline__y_coordinate", "channel_centerline__elevation", "channel_exit_water_sediment~bedload__volume_flow_rate", "channel_exit__x_coordinate", "channel_exit__y_coordinate", "land_surface__elevation", "sea_water_surface__elevation", # 'avulsion_record', ) def __init__(self): """Create a BmiRiver module that is ready for initialization.""" self._model = None self._values = {} self._var_units = {}
[docs] def initialize(self, config_file): with open(config_file, "r") as fp: params = yaml.safe_load(fp) params.pop("_version", None) self._n_days = params.pop("days", np.finfo("d").max) self._model = RiverModule(**params) # self._model = RiverModule.from_path(config_file) self._values = { "channel_centerline__x_coordinate": lambda: self._model.river_x_coordinates, "channel_centerline__y_coordinate": lambda: self._model.river_y_coordinates, "channel_exit_water_sediment~bedload__volume_flow_rate": lambda: np.array( self._model.sediment_flux ), "channel_exit__x_coordinate": lambda: np.array( self._model.river_x_coordinates[-1] ), "channel_exit__y_coordinate": lambda: np.array( self._model.river_y_coordinates[-1] ), "land_surface__elevation": lambda: np.array(self._model.elevation), "channel_centerline__elevation": lambda: self._model.profile, "sea_water_surface__elevation": lambda: np.array(self._model.sea_level), "avulsion_record": lambda: self._model.avulsions, } self._var_units = { "channel_centerline__x_coordinate": "m", "channel_centerline__y_coordinate": "m", "channel_exit_water_sediment~bedload__volume_flow_rate": "m^3 s^-1", "channel_exit__x_coordinate": "m", "channel_exit__y_coordinate": "m", "land_surface__elevation": "m", "channel_centerline__elevation": "m", "sea_water_surface__elevation": "m", "avulsion_record": "none", } self._var_type = {} for name in self._input_var_names + self._output_var_names: self._var_type[name] = str(np.dtype(float)) self._var_grid = { "channel_centerline__x_coordinate": 1, "channel_centerline__y_coordinate": 1, "channel_exit_water_sediment~bedload__volume_flow_rate": 2, "channel_exit__x_coordinate": 2, "channel_exit__y_coordinate": 2, "land_surface__elevation": 0, "channel_centerline__elevation": 1, "sea_water_surface__elevation": 2, "avulsion_record": None, } self._grid_rank = {0: 2, 1: 1, 2: 0}
[docs] def update(self): self._model.advance_in_time() self.river_mouth_location = ( self._model.river_x_coordinates[-1], self._model.river_y_coordinates[-1], )
[docs] def update_frac(self, time_frac): """Update model by a fraction of a time step.""" time_step = self.get_time_step() self._model.time_step = time_frac * time_step self.update() self._model.time_step = time_step
[docs] def update_until(self, then): """Update model until a particular time.""" n_steps = (then - self.get_current_time()) / self.get_time_step() for _ in range(int(n_steps)): self.update()
# self.update_frac(n_steps - int(n_steps))
[docs] def finalize(self): pass
[docs] def get_var_type(self, name): return self._var_type[name]
[docs] def get_var_units(self, name): return self._var_units[name]
[docs] def get_var_nbytes(self, name): return self._values[name]().nbytes
[docs] def get_var_itemsize(self, name): return np.dtype(self.get_var_type(name)).itemsize
[docs] def get_var_location(self, name): return "node"
[docs] def get_var_grid(self, name): return self._var_grid[name]
[docs] def get_grid_rank(self, grid): return self._grid_rank[grid]
[docs] def get_grid_size(self, grid): if grid == 0: return int(np.prod(self._model.grid_shape)) elif grid == 1: return len(self._model.river_x_coordinates) elif grid == 2: return 1 else: raise KeyError(grid)
[docs] def get_grid_shape(self, grid, shape): if grid == 0: shape[:] = self._model.grid_shape elif grid == 1: shape[:] = self._model.river_x_coordinates.shape elif grid == 2: shape[0] = 1 # = (1, ) else: raise KeyError(grid) return shape
[docs] def get_grid_spacing(self, grid, spacing): if grid == 0: spacing[:] = self._model.grid_spacing else: raise KeyError(grid) return spacing
[docs] def get_grid_origin(self, grid, origin): if grid == 0: origin[:] = (0.0, 0.0) else: raise KeyError(grid) return origin
[docs] def get_grid_type(self, grid): if grid == 0: return "uniform_rectilinear" elif grid == 1: return "vector" elif grid == 2: return "scalar" else: raise KeyError(grid)
[docs] def get_value(self, name, dest): dest[:] = self._values[name]().flat return dest
[docs] def set_value(self, var_name, new_vals): """Set model values.""" if var_name not in self._input_var_names: raise KeyError(var_name) if var_name == "land_surface__elevation": np.copyto(self._model.elevation.reshape((-1,)), new_vals.reshape((-1,))) elif var_name == "channel_exit__x_coordinate": self._model.river_x_coordinates = np.append( self._model.river_x_coordinates, new_vals ) elif var_name == "channel_exit__y_coordinate": self._model.river_y_coordinates = np.append( self._model.river_y_coordinates, new_vals )
# Remove duplicate river mouth coordinates (if they exist). # This seems clunky... must be better way to get values without # duplicating each time? # if (self._model.river_x_coordinates[-1] == self._model.river_x_coordinates[-2] and # self._model.river_y_coordinates[-1] == self._model.river_y_coordinates[-2]): # self._model.river_x_coordinates.pop() # self._model.river_y_coordinates.pop()
[docs] def get_component_name(self): return self._name
[docs] def get_input_var_names(self): return self._input_var_names
[docs] def get_output_var_names(self): return self._output_var_names
[docs] def get_input_item_count(self): return len(self._input_var_names)
[docs] def get_output_item_count(self): return len(self._output_var_names)
[docs] def get_start_time(self): return 0.0
[docs] def get_end_time(self): return self._n_days
[docs] def get_current_time(self): return self._model.time
[docs] def get_time_step(self): return self._model.time_step
[docs] def get_time_units(self): return "d"
[docs] def get_grid_node_count(self, grid): if grid == 0: return int(np.prod(self._model.grid_shape)) elif grid == 1: return len(self._model.river_x_coordinates) elif grid == 2: return 1 else: raise KeyError(grid)
[docs] def get_grid_edge_count(self, grid): raise NotImplementedError("get_grid_edge_count")
[docs] def get_grid_face_count(self, grid): raise NotImplementedError("get_grid_face_count")
[docs] def get_grid_edge_nodes(self, grid, edge_nodes): raise NotImplementedError("get_grid_edge_nodes")
[docs] def get_grid_face_edges(self, grid, face_edges): raise NotImplementedError("get_grid_face_edges")
[docs] def get_grid_face_nodes(self, grid, face_nodes): raise NotImplementedError("get_grid_edge_nodes")
[docs] def get_grid_nodes_per_face(self, grid, nodes_per_face): raise NotImplementedError("get_grid_nodes_per_face")
[docs] def get_grid_x(self, grid, x): raise NotImplementedError("get_grid_x")
[docs] def get_grid_y(self, grid, y): raise NotImplementedError("get_grid_y")
[docs] def get_grid_z(self, grid, z): raise NotImplementedError("get_grid_z")
[docs] def get_value_at_indices(self, name, dest, inds): raise NotImplementedError("get_value_at_indices")
[docs] def get_value_ptr(self, name): raise NotImplementedError("get_value_ptr")
[docs] def set_value_at_indices(self, name, ids, src): raise NotImplementedError("set_value_at_indices")