#! /usr/local/bin/python
import numpy as np
from . import downcut
from .avulsion_utils import lowest_cell_elev, sort_lowest_neighbors
[docs]def lowest_neighbor(n, sub):
"""Find lowest neighbor value around a point.
Parameters
----------
n : ndarray
Grid of elevations.
sub : tuple of int
Row/column subscript into elevation matrix.
Returns
-------
tuple of int
Subscripts into elevation matrix of the lowest neighbor.
"""
i, j = sub
if j == n.shape[1] - 1:
di, dj = np.array([0, 1, 1]), np.array([-1, -1, 0])
elif j == 0:
di, dj = np.array([1, 1, 0]), np.array([0, 1, 1])
else:
di, dj = np.array([0, 1, 1, 1, 0]), np.array([-1, -1, 0, 1, 1])
lowest = np.argmin(n[i + di, j + dj])
return i + di[lowest], j + dj[lowest]
[docs]def lowest_neighbor_prograde(n, sub):
i, j = sub
if j == n.shape[1] - 1:
di, dj = np.array([0, 1, 1]), np.array([-1, -1, 0])
elif j == 0:
di, dj = np.array([1, 1, 0]), np.array([0, 1, 1])
else:
di, dj = np.array([0, 1, 1, 1, 0]), np.array([-1, -1, 0, 1, 1])
subaerial_low = min(x for x in (n[i + di, j + dj]) if x > 0)
lowest = np.where((n[i + di, j + dj]) == subaerial_low)[0][0]
return i + di[lowest], j + dj[lowest]
[docs]def below_sea_level(z, sea_level):
"""Check if an elevation is below sea level.
Parameters
----------
z : float
Elevation.
sea_level : float
Elevation of sea level.
Returns
-------
boolean
`True` if at or below sea level. Otherwise, `False`.
"""
return z <= sea_level
[docs]def at_river_mouth(z, sub, z0):
"""Check if a cell is at the river mouth.
Parameters
----------
z : ndarray
2D-array of elevations.
sub : tuple of int
Row and column subscript into *z*.
z0 : float
Elevation of sea level (or `None`).
Returns
-------
boolean
True if the cell at the given subscript is at the river mouth.
"""
try:
return sub[0] == z.shape[0] - 1 or below_sea_level(z[sub], z0)
except IndexError:
return True
[docs]def at_end_of_domain(z, sub):
"""Check if a cell a river mouth at the end of domain.
Parameters
----------
z : ndarray
2D-array of elevations.
sub : tuple of int
Row and column subscript into *z*.
Returns
-------
boolean
True if the cell at the given subscript is at the river mouth.
"""
try:
return sub[0] == z.shape[0] - 1
except IndexError:
return True
[docs]def riv_cell_at_sea_level(z, sub, z0):
"""Check if a river cell is at sea level.
Parameters
----------
z : ndarray
2D-array of elevations.
sub : tuple of int
Row and column subscript into *z*.
z0 : float
Elevation of sea level (or `None`).
Returns
-------
boolean
True if the cell at the given subscript is at the river mouth.
"""
try:
below_sea_level(z[sub], z0)
except IndexError:
return True
[docs]def find_course(z, riv_i, riv_j, SE_loc, channel_depth, sea_level=None):
"""Find the course of a river.
Given a river course as subscripts, (*i*, *j*), into an array of
elevations, (*z*), find the river path until the coast (*sea_level*) or
the end of the domain.
Parameters
----------
z : ndarray
Grid elevations.
riv_i : array_like of int
Row subscripts (into *n*) for river.
riv_j : array_like of int
Column subscripts (into *n*) for river.
sea_level : None, optional
The current sea level.
Returns
-------
tuple of array_like
Row and column indices of the new river path.
Examples
--------
>>> import numpy as np
>>> z = np.array([[4., 3., 4.],
... [2., 3., 3.],
... [2., 1., 2.]])
>>> riv_i, riv_j = np.zeros(9, dtype=int), np.zeros(9, dtype=int)
>>> riv_i[0], riv_j[0] = 0, 1
>>> find_course(z, riv_i[:1], riv_j[:1], 1)
(array([0, 1, 2]), array([1, 0, 1]))
>>> find_course(z, riv_i[:1], riv_j[:1], sea_level=2.)
(array([0, 1]), array([1, 0]))
>>> z = np.array([[4., 3., 4.],
... [2., 3., 3.],
... [2., 1., 2.],
... [2., 1.5, 2]])
>>> find_course(z, riv_i[:1], riv_j[:1], sea_level=0.)
(array([0, 1, 2, 3]), array([1, 0, 1, 1]))
>>> z
"""
# function to find the steepest descent route
# note: this needs to be improved to remove potential bias that may occur
# if two or more cells have the steepest descent elevation
old_course = zip(riv_i, riv_j)
n_levee = np.copy(z)
n_levee[riv_i, riv_j] += channel_depth
riv_i = riv_i[:SE_loc]
riv_j = riv_j[:SE_loc]
assert riv_i.size > 0 and riv_j.size > 0
if sea_level is None:
sea_level = -np.finfo(float).max
for n in range(1, riv_i.size):
if at_end_of_domain(z, (riv_i[n - 1], riv_j[n - 1])):
return riv_i[:n], riv_j[:n]
# for n in range(1, riv_i.size):
# if riv_cell_at_sea_level(z, (riv_i[n - 1], riv_j[n - 1]), sea_level):
# return riv_i[:n-1], riv_j[:n-1]
new_i = np.empty(z.size, dtype=np.int)
new_j = np.empty(z.size, dtype=np.int)
new_i[: len(riv_j)] = riv_i[:]
new_j[: len(riv_i)] = riv_j[:]
pits = True
while pits:
for n in range(riv_i.size, new_i.size):
# if at_river_mouth(z, (new_i[n - 1], new_j[n - 1]), sea_level):
# pits = False
# break
if at_end_of_domain(z, (new_i[n - 1], new_j[n - 1])):
pits = False
break
sorted_n = sort_lowest_neighbors(n_levee, (new_i[n - 1], new_j[n - 1]))
if (sorted_n[0][0], sorted_n[1][0]) not in zip(
new_i[: n - 1], new_j[: n - 1]
):
downstream_ij = (sorted_n[0][0], sorted_n[1][0])
elif (sorted_n[0][1], sorted_n[1][1]) not in zip(
new_i[: n - 1], new_j[: n - 1]
):
downstream_ij = (sorted_n[0][1], sorted_n[1][1])
elif (sorted_n[0][2], sorted_n[1][2]) not in zip(
new_i[: n - 1], new_j[: n - 1]
):
downstream_ij = (sorted_n[0][2], sorted_n[1][2])
else:
raise RuntimeError("river course is going crazy!")
if downstream_ij not in old_course and below_sea_level(
z[downstream_ij], sea_level
):
pits = False
break
if downstream_ij in old_course:
new_i[n], new_j[n] = downstream_ij
n += 1
pits = False
break
if z[downstream_ij] > z[new_i[n - 1], new_j[n - 1]]:
new_i[n], new_j[n] = downstream_ij
z[new_i[n - 1], new_j[n - 1]] += 1e-6
else:
new_i[n], new_j[n] = downstream_ij
# new_i[n], new_j[n] = lowest_neighbor(z, (new_i[n - 1], new_j[n - 1]))
if n == 0:
raise RuntimeError("new river length is zero!")
return new_i[:n], new_j[:n]
[docs]def update_course(z, riv_i, riv_j, ch_depth, slope, sea_level=None, dx=1.0, dy=1.0):
if sea_level is None:
sea_level = -np.finfo(float).max
course_update = 0
last_elev = z[riv_i[-1], riv_j[-1]] + ch_depth - sea_level
max_cell_h = slope * dx
test_elev = np.copy(z)
test_elev -= sea_level
test_elev[riv_i, riv_j] += 2 * ch_depth
low_adj_cell = lowest_cell_elev(test_elev, (riv_i[-1], riv_j[-1]))
# check for coastal avulsion (happens if river is prograding too far alongshore)
if (
(
riv_i[-1] == riv_i[-2] == riv_i[-3] == riv_i[-4] == riv_i[-5]
) # if last 5 river cells flowing alongshore
and (z[riv_i[-2] + 1, riv_j[-2]] > sea_level)
# (z[riv_i[-1+1],riv_j[-1]] > sea_level) and # if land between river and ocean for last 3 cells
and (z[riv_i[-3] + 1, riv_j[-3]] > sea_level)
and (z[riv_i[-4] + 1, riv_j[-4]] > sea_level)
and (z[riv_i[-5] + 1, riv_j[-5]] > sea_level)
and (z[riv_i[-1] + 2, riv_j[-1]] <= sea_level)
and (z[riv_i[-2] + 2, riv_j[-2]] <= sea_level)
and (z[riv_i[-3] + 2, riv_j[-3]] <= sea_level)
and (z[riv_i[-4] + 2, riv_j[-4]] <= sea_level)
and (z[riv_i[-5] + 2, riv_j[-5]] <= sea_level)
):
# fill up old river mouth
z[riv_i[-1], riv_j[-1]] += ch_depth
# turn river towards ocean
riv_i[-1] = riv_i[-2] + 1
riv_j[-1] = riv_j[-2]
if (z[riv_i[-1], riv_j[-1]] - sea_level) < (0.001 * max_cell_h):
z[riv_i[-1], riv_j[-1]] = (0.001 * max_cell_h) + sea_level
# z[riv_i[-1],riv_j[-1]] -= ch_depth
downcut.cut_new(riv_i[-3:], riv_j[-3:], z, sea_level, ch_depth, dx=dx, dy=dy)
course_update = 7 # coastal avulsion
elif last_elev <= 0:
riv_i = riv_i[:-1]
riv_j = riv_j[:-1]
course_update = 4 # shortened course
# if river mouth surrounded by land
elif low_adj_cell > 0:
new_riv_i, new_riv_j = find_course(
z, riv_i, riv_j, len(riv_i), ch_depth, sea_level=sea_level
)
new_riv_length = new_riv_i.size - riv_i.size
if new_riv_length > 0:
riv_i = new_riv_i
riv_j = new_riv_j
if (z[riv_i[-1], riv_j[-1]] - sea_level) < (0.001 * max_cell_h):
z[riv_i[-1], riv_j[-1]] = (0.001 * max_cell_h) + sea_level
downcut.cut_new(
riv_i[-(new_riv_length + 2) :],
riv_j[-(new_riv_length + 2) :],
z,
sea_level,
ch_depth,
dx=dx,
dy=dy,
)
course_update = 6 # lengthened land-locked course
else:
riv_i = riv_i
riv_j = riv_j
# if river mouth needs to prograde
elif last_elev >= max_cell_h:
sorted_n = sort_lowest_neighbors(test_elev, (riv_i[-1], riv_j[-1]))
subaerial_loc = np.where(test_elev[sorted_n] > 0)
if len(subaerial_loc[0]):
subaerial_cells = sorted_n[0][subaerial_loc], sorted_n[1][subaerial_loc]
if (subaerial_cells[0][0], subaerial_cells[1][0]) not in zip(riv_i, riv_j):
riv_i = np.append(riv_i, subaerial_cells[0][0])
riv_j = np.append(riv_j, subaerial_cells[1][0])
if (z[riv_i[-1], riv_j[-1]] - sea_level) < (0.001 * max_cell_h):
z[riv_i[-1], riv_j[-1]] = (0.001 * max_cell_h) + sea_level
# line below not needed if downcutting
# z[riv_i[-1], riv_j[-1]] -= ch_depth
downcut.cut_new(
riv_i[-3:], riv_j[-3:], z, sea_level, ch_depth, dx=dx, dy=dy
)
course_update = 5
else:
riv_i = riv_i
riv_j = riv_j
else:
riv_i = riv_i
riv_j = riv_j
else:
riv_i = riv_i
riv_j = riv_j
return riv_i, riv_j, course_update