from multiprocessing.shared_memory import SharedMemory
from typing import Any, NamedTuple, TypeVar, cast
import numpy as np
[docs]
class ArrayInfo(NamedTuple):
    """Describes where one array lives inside a shared buffer and how to view it.

    The three fields are exactly what is needed to rebuild the array with
    ``np.ndarray(shape, dtype, buffer=..., offset=ofs)``.
    """

    # Dimensions of the array.
    shape: tuple[int, ...]
    # Element type of the array, forwarded as-is to ``np.ndarray``.
    dtype: Any
    # Offset of the array's first byte from the start of the buffer.
    ofs: int
# Generic key type of the dictionaries accepted and returned by the functions
# below; it ties the keys of an input mapping to the keys of the result.
A = TypeVar("A")
[docs]
def map_to_memory(
    c: dict[A, list[np.ndarray]]
) -> tuple[SharedMemory, dict[A, list[ArrayInfo]]]:
    """Copy every array of ``c`` back-to-back into a newly created shared memory block.

    Args:
        c: Mapping from a key to the list of arrays stored under that key.

    Returns:
        A tuple ``(mem, layout)``. ``mem`` is the new ``SharedMemory`` block
        holding the array bytes. ``layout`` has the same keys as ``c`` and,
        for each key, one ``ArrayInfo`` (shape, dtype, byte offset) per input
        array, in the same order.
    """
    total = sum(src.nbytes for group in c.values() for src in group)

    # A block is always created, even when there is nothing to store, so that
    # callers never have to deal with a missing block; hence the 1-byte floor.
    mem = SharedMemory(name=None, create=True, size=max(total, 1))

    layout = {}
    cursor = 0
    for key, group in c.items():
        infos = []
        layout[key] = infos
        for src in group:
            # View the next free region of the block as an array shaped like
            # the source and fill it; casting="no" forbids any dtype conversion.
            dst = np.ndarray(src.shape, src.dtype, buffer=mem.buf, offset=cursor)
            np.copyto(dst, src, casting="no")

            infos.append(ArrayInfo(src.shape, src.dtype, cursor))
            cursor += dst.nbytes

    assert cursor == total, "Shared memory buffer overflown"
    return mem, layout
[docs]
def load_from_memory(
    mem: SharedMemory,
    c: dict[A, list[ArrayInfo]],
    copy: bool = False,
    range: tuple[int, int] | None = None,
) -> dict[A, list[np.ndarray]]:
    """Rebuild the arrays described by ``c`` on top of a shared memory block.

    Args:
        mem: Shared memory block that holds the array bytes.
        c: Mapping from a key to ``(shape, dtype, ofs)`` records, one per
            array stored in ``mem``.
        copy: When true, each returned array is a private copy; otherwise the
            arrays are views backed by ``mem.buf``.
        range: Optional ``(start, stop)`` pair; when given, every array is
            sliced as ``a[start:stop]`` along its first axis. The name shadows
            the builtin inside this function and is kept for compatibility.

    Returns:
        A dictionary with the same keys as ``c`` whose values are the lists of
        arrays, in the same order as their records.

    Raises:
        ValueError: If ``mem`` exposes no buffer (it has been closed).
    """
    import mmap

    buf = mem.buf
    if buf is None:
        # Without this check NumPy would treat ``buffer=None`` as a request
        # to allocate fresh, uninitialised memory.
        raise ValueError("Cannot load arrays from a closed shared memory segment")

    # Advise the OS that the mapping will be read sequentially. Both the
    # ``madvise`` method and the ``MADV_*`` constants only exist on platforms
    # that provide the madvise() system call, so the hint is skipped elsewhere.
    backing = getattr(mem, "_mmap", None)
    advice = getattr(mmap, "MADV_SEQUENTIAL", None)
    if advice is not None and hasattr(backing, "madvise"):
        backing.madvise(advice)

    out = {}
    for name, infos in c.items():
        arrays = []
        out[name] = arrays
        for shape, dtype, ofs in infos:
            a = np.ndarray(shape, dtype, buffer=buf, offset=ofs)
            if range is not None:
                a = a[range[0] : range[1]]
            # TODO: fix performance issue with shared_memory and remove .copy()
            if copy:
                a = a.copy()
            arrays.append(a)
    return out
[docs]
def merge_memory(
    mem_data: list[tuple[SharedMemory, dict[A, list[ArrayInfo]], Any]], close: bool = True
) -> tuple[SharedMemory, dict[A, list[ArrayInfo]]]:
    """Stack the arrays of several shared memory blocks row-wise into one new block.

    For every key, and every array position under that key, the matching
    arrays of all entries are concatenated along their first axis. The set of
    keys, the number of arrays per key, the column count and the dtype are
    all taken from the first entry. Arrays are expected to be 1-D or 2-D;
    1-D arrays are stored as a single column, so every merged array is 2-D.

    Args:
        mem_data: Non-empty list of ``(memory, layout, extra)`` tuples, where
            ``layout`` maps each key to the ``ArrayInfo`` records of the
            arrays stored in ``memory``. The third element is not used.
        close: When true, every source block is closed and unlinked once its
            contents have been copied.

    Returns:
        A tuple ``(out_mem, out_info)``: the new shared memory block and, for
        each key, the ``ArrayInfo`` records of the merged arrays inside it.
    """
    assert len(mem_data) > 0

    # Create views over the arrays of every source block
    cols = [load_from_memory(a, b) for a, b, _ in mem_data]

    # Calculate required size for final shared memory
    nbytes = sum(arr.nbytes for inst in cols for col in inst.values() for arr in col)

    # Allocate memory even without arrays, to avoid optional logic, with min size 1
    out_mem = SharedMemory(name=None, create=True, size=max(nbytes, 1))
    out_info = {}

    # Start merging into new memory
    ofs = 0
    template = {k: len(v) for k, v in mem_data[0][1].items()}
    for name, max_height in template.items():
        out_info[name] = []
        for height in range(max_height):
            # Create new array using memory and append info
            new_rows = sum(len(inst[name][height]) for inst in cols)
            first = cols[0][name][height]
            shape = first.shape
            new_cols = shape[1] if len(shape) > 1 else 1
            new_shape = (new_rows, new_cols)
            dtype = first.dtype
            arr = np.ndarray(new_shape, dtype, buffer=out_mem.buf, offset=ofs)
            out_info[name].append(ArrayInfo(new_shape, dtype, ofs))
            ofs += arr.nbytes

            # Fill in array with previous memory
            start = 0
            for inst in cols:
                col = inst[name][height]
                end = start + len(col)
                # Test the rank, not the shape tuple: 2-D blocks are copied
                # unchanged, 1-D blocks are turned into a single column.
                arr[start:end] = col if col.ndim == 2 else col.reshape(-1, 1)
                start = end

    if close:
        # Drop the list of source views before releasing the source blocks
        del cols
        for mem, _, _ in mem_data:
            mem.close()
            mem.unlink()

    return out_mem, out_info