Source code for rafem.avulsion_utils

#! /usr/local/bin/python
import numpy as np
import yaml
from scipy.ndimage import measurements


[docs]def read_params_from_file(fname): """Read model parameters from a file. Parameters ---------- fname : str Name of YAML-formatted parameters file. Returns ------- dict A dict of parameters for the heat model. """ with open(fname, "r") as fp: params = yaml.safe_load(fp) return params
[docs]def is_diagonal_neighbor(sub0, sub1): return (sub0[0] != sub1[0]) and (sub0[1] != sub1[1])
[docs]def is_same_row(sub0, sub1): return sub0[0] == sub1[0]
[docs]def channel_is_superelevated(z, riv, behind, channel_depth, super_ratio, sea_level): """Check if a channel location is super-elevated. Parameters ---------- z : ndarray 2D-array of elevations. sub : tuple of int Row-column subscripts into *z*. channel_depth : float The depth of the channel. super_ratio : float Ratio of bankfull elevation to out-of-channel elevation for channel to be super-elevated. Returns ------- boolean `True` if channel is super-elevated. Otherwise, `False`. """ superelev = 0 z_bankfull = z[riv] + channel_depth # cross-shore river orientation if behind[0] + 1 == riv[0]: if riv[1] == 0: adj1 = z[riv[0], riv[1] + 1] adj2 = z[riv[0], riv[1] + 1] # not sure if this needs to be included elif riv[1] == z.shape[1] - 1: adj1 = z[riv[0], riv[1] - 1] adj2 = z[riv[0], riv[1] - 1] else: adj1, adj2 = z[riv[0], riv[1] - 1], z[riv[0], riv[1] + 1] # alongshore river orientation if behind[0] == riv[0]: if riv[0] == z.shape[0] - 1: adj1 = z[riv[0] + 1, riv[1]] adj2 = z[riv[0] + 1, riv[1]] else: adj1, adj2 = z[riv[0] + 1, riv[1]], z[riv[0] - 1, riv[1]] threshold = super_ratio * channel_depth if (adj1 < sea_level) and (adj2 < sea_level): superelev = 0 elif adj1 < sea_level: if z_bankfull - adj2 >= threshold: superelev = 1 elif adj2 < sea_level: if z_bankfull - adj1 >= threshold: superelev = 1 else: if ((z_bankfull - adj2) or (z_bankfull - adj1)) >= threshold: superelev = 1 return superelev
[docs]def get_channel_distance(path, dx=1.0, dy=1.0): total_distance = get_link_lengths(path, dx=dx, dy=dy).cumsum() return np.append(0, total_distance)
[docs]def find_path_length(n, path, sea_level, ch_depth, slope, dx=1.0, dy=1.0): beach_len = find_new_beach_length( n, (path[0][-2], path[1][-2]), (path[0][-1], path[1][-1]), sea_level, dx=dx, dy=dy, ) lengths = get_link_lengths(path, dx=dx, dy=dy) lengths[-1] = np.divide(lengths[-1], 2.0) + beach_len riv_length = lengths.sum() return riv_length
[docs]def find_riv_path_length(n, path, sea_level, ch_depth, slope, dx=1.0, dy=1.0): beach_len = find_beach_length_riv_cell( n, (path[0][-2], path[1][-2]), (path[0][-1], path[1][-1]), sea_level, ch_depth, slope, dx=dx, dy=dy, ) lengths = get_link_lengths(path, dx=dx, dy=dy) lengths[-1] = np.divide(lengths[-1], 2.0) + beach_len riv_length = lengths.sum() return riv_length
[docs]def find_point_in_path(path, sub): """Find a point in a path. Parameters ---------- path : tuple of array_like Tuple of arrays of int that represent indices into a matrix. sub : tuple in int Indices to search *path* for. Returns ------- int or None Index into *path* if the indices are found. Otherwise, ``None``. Examples -------- >>> path = ((0, 1, 4, 6, 5), (3, 1, 7, 8, 10)) >>> find_point_in_path(path, (4, 7)) 2 >>> find_point_in_path(path, (99, 2)) is None True """ try: return list(zip(*path)).index(sub) except ValueError: return None
[docs]def set_linear_profile(z, riv_ij, dx=1.0, dy=1.0): """Set elevations along a path to be linear. Examples -------- >>> import numpy as np >>> z = np.array([[0, 0, 0, 0, 0], ... [1, 2, 1, 1, 1], ... [2, 2, 2, 2, 2], ... [3, 3, 3, 3, 3]], dtype=float) >>> ij = [(0, 2), (1, 2), (2, 2), (3, 2)] >>> set_linear_profile(z, ij) array([[ 0., 0., 0., 0., 0.], [ 1., 2., 1., 1., 1.], [ 2., 2., 2., 2., 2.], [ 3., 3., 3., 3., 3.]]) >>> ij = [(0, 0), (1, 1), (2, 2), (3, 1)] >>> set_linear_profile(z, ij, dx=3., dy=4.) array([[ 0., 0., 0., 0., 0.], [ 1., 1., 1., 1., 1.], [ 2., 2., 2., 2., 2.], [ 3., 3., 3., 3., 3.]]) """ z[zip(*riv_ij[:-1])] = z[riv_ij[-1]] + np.arange(len(riv_ij) - 1, 0, -1) * 1e-6 return z
[docs]def set_linear_slope(z, riv_ij, dx=1.0, dy=1.0): """Set slope along a path to be linear.""" if len(riv_ij) > 1: z0 = z[riv_ij[0]] dz = z[riv_ij[-1]] - z[riv_ij[0]] lengths = get_link_lengths(zip(*riv_ij), dx=dx, dy=dy) ds = lengths.sum() z[zip(*riv_ij[1:])] = z0 + dz / ds * lengths.cumsum() return z
[docs]def fill_upstream(z, riv_ij, dx=1.0, dy=1.0): """Fill depressions upstream of a pit. Examples -------- >>> import numpy as np >>> z = np.array([[3, 3, 3, 3, 3], ... [2, 2, 2, 2, 2], ... [1, 1, 0, 1, 1], ... [0, 0, 1, 0, 0]], dtype=float) >>> ij = [(0, 2), (1, 2), (2, 2), (3, 2)] >>> fill_upstream(z, ij) array([[ 3. , 3. , 3. , 3. , 3. ], [ 2. , 2. , 2. , 2. , 2. ], [ 1. , 1. , 1.5, 1. , 1. ], [ 0. , 0. , 1. , 0. , 0. ]]) >>> z = np.array([[3, 3, 1, 3, 3], ... [2, 2, 0, 2, 2], ... [1, 1, 1, 1, 1], ... [0, 0, 2, 0, 0]], dtype=float) >>> ij = [(0, 2), (1, 2), (2, 2), (3, 2)] >>> fill_upstream(z, ij) """ fill_to = z[riv_ij[-1]] for n in range(len(riv_ij) - 2, -1, -1): if z[riv_ij[n]] > fill_to: break # z[riv_ij[n]] = z[riv_ij[n + 1]] + 1e-6 if z[riv_ij[n]] <= fill_to: z[riv_ij[n]] = fill_to + 1e-6 set_linear_profile(z, riv_ij[n:], dx=dx, dy=dy) return z
[docs]def sort_lowest_neighbors(n, sub): """Sort neighbor cells by elevation.""" i, j = sub if j == n.shape[1] - 1: di, dj = np.array([0, 1, 1]), np.array([-1, -1, 0]) elif j == 0: di, dj = np.array([1, 1, 0]), np.array([0, 1, 1]) else: di, dj = np.array([0, 1, 1, 1, 0]), np.array([-1, -1, 0, 1, 1]) sorted = np.argsort(n[i + di, j + dj]) return i + di[sorted], j + dj[sorted]
[docs]def find_new_beach_length(n, sub0, sub1, sea_level, dx=1.0, dy=1.0): cell_elev = n[sub1] - sea_level DIAGONAL_LENGTH = np.sqrt(dx ** 2.0 + dy ** 2.0) if is_diagonal_neighbor(sub0, sub1): d_dist = DIAGONAL_LENGTH elif is_same_row(sub0, sub1): d_dist = dx else: d_dist = dy return cell_elev * d_dist
[docs]def find_beach_length_riv_cell( n, sub0, sub1, sea_level, channel_depth, slope, dx=1.0, dy=1.0 ): """Find length of beach in shoreline cell with river. Parameters ---------- z : ndarray 2D-array of elevations. sub0 : tuple of int Row-column subscripts into *z*. (cell prior to beach) sub1 : tuple of int Row-column subscripts into *z*. (beach cell) sea_level : float Elevation of current sea level. channel_depth : float The depth of the channel. Returns ------- Length of beach in a cell. """ cell_elev = n[sub1] + channel_depth - sea_level max_cell_h = slope * dx DIAGONAL_LENGTH = np.sqrt(dx ** 2.0 + dy ** 2.0) if is_diagonal_neighbor(sub0, sub1): d_dist = DIAGONAL_LENGTH elif is_same_row(sub0, sub1): d_dist = dx else: d_dist = dy return (cell_elev / max_cell_h) * d_dist
[docs]def lowest_cell_elev(n, sub): i, j = sub if j == 0 and i == 0: di, dj = np.array([1, 1, 0]), np.array([0, 1, 1]) elif j == 0 and i == n.shape[0] - 1: di, dj = np.array([-1, -1, 0]), np.array([0, 1, 1]) elif j == n.shape[1] - 1 and i == 0: di, dj = np.array([0, 1, 1]), np.array([-1, -1, 0]) elif j == n.shape[1] - 1 and i == n.shape[0] - 1: di, dj = np.array([0, -1, -1]), np.array([-1, -1, 0]) elif j == n.shape[1] - 1: di, dj = np.array([-1, -1, 0, 1, 1]), np.array([0, -1, -1, -1, 0]) elif j == 0: di, dj = np.array([-1, -1, 0, 1, 1]), np.array([0, 1, 1, 1, 0]) elif i == n.shape[0] - 1: di, dj = np.array([0, -1, -1, -1, 0]), np.array([-1, -1, 0, 1, 1]) elif i == 0: di, dj = np.array([0, 1, 1, 1, 0]), np.array([-1, -1, 0, 1, 1]) else: di, dj = ( np.array([0, -1, -1, -1, 0, 1, 1, 1]), np.array([-1, -1, 0, 1, 1, 1, 0, -1]), ) lowest = np.amin(n[i + di, j + dj]) return lowest
[docs]def lowest_face(n, sub): i, j = sub if j == 0 and i == 0: di, dj = np.array([1, 0]), np.array([0, 1]) elif j == 0 and i == n.shape[0] - 1: di, dj = np.array([-1, 0]), np.array([0, 1]) elif j == n.shape[1] - 1 and i == 0: di, dj = np.array([0, 1]), np.array([-1, 0]) elif j == n.shape[1] - 1 and i == n.shape[0] - 1: di, dj = np.array([0, -1]), np.array([-1, 0]) elif j == n.shape[1] - 1: di, dj = np.array([-1, 0, 1]), np.array([0, -1, 0]) elif j == 0: di, dj = np.array([-1, 0, 1]), np.array([0, 1, 0]) elif i == n.shape[0] - 1: di, dj = np.array([0, -1, 0]), np.array([-1, 0, 1]) elif i == 0: di, dj = np.array([0, 1, 0]), np.array([-1, 0, 1]) else: di, dj = np.array([0, -1, 0, 1]), np.array([-1, 0, 1, 0]) lowest_face = np.amin(n[i + di, j + dj]) return lowest_face
[docs]def fix_elevations(z, riv_i, riv_j, ch_depth, sea_level, slope, dx, max_rand, SLRR): test_elev = z - sea_level max_cell_h = slope * dx riv_prof = test_elev[riv_i, riv_j] test_elev[riv_i, riv_j] += 2 * ch_depth # set new subaerial cells to marsh elevation test_elev[test_elev == 0] = max_cell_h # make mask for depressions ocean_mask = test_elev < max_cell_h labeled_ponds, ocean = measurements.label(ocean_mask) # # fill in underwater spots that are below SL (?) # below_SL = [z <= sea_level] # underwater_cells, big_ocean = measurements.label(below_SL) # underwater_cells[underwater_cells == big_ocean] = 0 # test_elev[underwater_cells > 0] = max_cell_h + SLRR + (np.random.rand() * max_rand) # create an ocean and shoreline mask ocean_and_shore = np.copy(labeled_ponds) # create an ocean mask # ocean_cells = np.copy(ocean_and_shore) # ocean_and_shore[test_elev > 0] = 0 # create mask for pond cells and fix them area = measurements.sum( ocean_mask, labeled_ponds, index=np.arange(labeled_ponds.max() + 1) ) areaPonds = area[labeled_ponds] labeled_ponds[areaPonds == areaPonds.max()] = 0 # finish creating ocean and shoreline mask ocean_and_shore[areaPonds != areaPonds.max()] = 0 # something here to get rid of ocean cells test_elev[labeled_ponds > 0] = max_cell_h + SLRR + (np.random.rand() * max_rand) # raise cells close to sea level above it test_elev[(test_elev >= max_cell_h) & (test_elev <= (max_cell_h + SLRR))] = ( max_cell_h + SLRR + (np.random.rand() * max_rand) ) riv_buffer = np.zeros_like(test_elev) riv_buffer[riv_i, riv_j] = 1 riv_buffer[riv_i[1:] - 1, riv_j[1:]] = 1 for i in range(1, test_elev.shape[0] - 1): for j in range(test_elev.shape[1]): if ( not ocean_and_shore[i, j] and not ocean_and_shore[i - 1, j] and not ocean_and_shore[i + 1, j] and not riv_buffer[i, j] ): if test_elev[i + 1, j] >= test_elev[i, j]: test_elev[i, j] = test_elev[i + 1, j] + (np.random.rand() * slope) test_elev[riv_i, riv_j] = riv_prof z = test_elev + sea_level return z
[docs]def fill_abandoned_channel( breach_loc, n, new, riv_i, riv_j, current_SL, ch_depth, slope, dx ): new_profile = n[new[0], new[1]] max_shorecell_h = current_SL + (slope * dx) riv_cells = np.zeros_like(n) riv_cells[riv_i, riv_j] = 1 riv_cells[new[0], new[1]] = 1 n[riv_i[-1], riv_j[-1]] += ch_depth if len(riv_i) - breach_loc > 1: for k in range(breach_loc, len(riv_i) - 1): if (riv_j[k] == n.shape[1] - 1) or (riv_j[k] == 0): n[riv_i[k], riv_j[k]] = max_shorecell_h + (np.random.rand() * slope) elif ( riv_cells[riv_i[k], riv_j[k] + 1] and riv_cells[riv_i[k], riv_j[k] - 1] ): n[riv_i[k], riv_j[k]] = max_shorecell_h + (np.random.rand() * slope) elif riv_cells[riv_i[k], riv_j[k] + 1]: if n[riv_i[k], riv_j[k] - 1] <= max_shorecell_h: n[riv_i[k], riv_j[k]] = max_shorecell_h + (np.random.rand() * slope) else: n[riv_i[k], riv_j[k]] = n[riv_i[k], riv_j[k] - 1] + ( np.random.rand() * slope ) elif riv_cells[riv_i[k], riv_j[k] - 1]: if n[riv_i[k], riv_j[k] + 1] <= max_shorecell_h: n[riv_i[k], riv_j[k]] = max_shorecell_h + (np.random.rand() * slope) else: n[riv_i[k], riv_j[k]] = n[riv_i[k], riv_j[k] + 1] + ( np.random.rand() * slope ) elif (n[riv_i[k], riv_j[k] + 1] <= max_shorecell_h) or ( n[riv_i[k], riv_j[k] - 1] <= max_shorecell_h ): n[riv_i[k], riv_j[k]] = max_shorecell_h + (np.random.rand() * slope) else: n[riv_i[k], riv_j[k]] = ( (n[riv_i[k], riv_j[k] + 1] + n[riv_i[k], riv_j[k] - 1]) / 2 ) + (np.random.rand() * slope) n[new[0], new[1]] = new_profile