diff --git a/examples/cholesky/blocked_cholesky_automatic.py b/examples/cholesky/blocked_cholesky_automatic.py index 0c705e3..87fcd5f 100644 --- a/examples/cholesky/blocked_cholesky_automatic.py +++ b/examples/cholesky/blocked_cholesky_automatic.py @@ -208,7 +208,7 @@ def cholesky_blocked_inplace(a, block_size): @spawn(gemm1[j, k], [solve[j, k], gemm1[j, 0:k]], input=[a[j][k]], inout=[a[j][j]], placement=loc_syrk, memory=mem) def t1(): - #print(f"+SYRK: ({j}, {k}) - Requires rw({j},{j}) r({j}, {k})", flush=True) + print(f"+SYRK: ({j}, {k}) - Requires rw({j},{j}) r({j}, {k})", flush=True) out = a[j][j].array rhs = a[j][k].array out = update(rhs, rhs, out) @@ -217,7 +217,7 @@ def t1(): stream.synchronize() a[j][j].update(out) stream.synchronize() - #print(f"-SYRK: ({j}, {k}) - Requires rw({j},{j}) r({j}, {k})", flush=True) + print(f"-SYRK: ({j}, {k}) - Requires rw({j},{j}) r({j}, {k})", flush=True) # Cholesky on block mem = 8*block_size**2 @@ -229,7 +229,7 @@ def t1(): @spawn(subcholesky[j], [gemm1[j, 0:j]], inout=[a[j][j]], placement=loc_potrf, memory=mem) def t2(): - #print(f"+POTRF: ({j}) - Requires rw({j},{j})", flush=True) + print(f"+POTRF: ({j}) - Requires rw({j},{j})", flush=True) dblock = a[j][j].array log_memory() @@ -239,7 +239,7 @@ def t2(): stream.synchronize() a[j][j].update(dblock) stream.synchronize() - #print(f"-POTRF: ({j}) - Requires rw({j},{j})", flush=True) + print(f"-POTRF: ({j}) - Requires rw({j},{j})", flush=True) for i in range(j+1, len(a)): for k in range(j): # Inter-block GEMM @@ -251,7 +251,7 @@ def t2(): @spawn(gemm2[i, j, k], [solve[j, k], solve[i, k], gemm2[i, j, 0:k]], inout=[a[i][j]], input=[a[i][k], a[j][k]], placement=loc_gemm, memory=mem) def t3(): - #print(f"+GEMM: ({i}, {j}, {k}) - Requires rw({i},{j}), r({i}, {k}), r({j}, {k})", flush=True) + print(f"+GEMM: ({i}, {j}, {k}) - Requires rw({i},{j}), r({i}, {k}), r({j}, {k})", flush=True) out = a[i][j].array rhs1 = a[i][k].array rhs2 = a[j][k].array @@ -263,7 +263,7 @@ def t3(): stream.synchronize() a[i][j].update(out) stream.synchronize() - #print(f"-GEMM: ({i}, {j}, {k}) - Requires rw({i},{j}), r({i}, {k}), r({j}, {k})", flush=True) + print(f"-GEMM: ({i}, {j}, {k}) - Requires rw({i},{j}), r({i}, {k}), r({j}, {k})", flush=True) # Triangular solve mem = 8*2*block_size**2 @@ -275,7 +275,7 @@ def t3(): @spawn(solve[i, j], [gemm2[i, j, 0:j], subcholesky[j]], inout=[a[i][j]], input=[a[j][j]], placement=loc_trsm, memory=mem) def t4(): - #print(f"+TRSM: ({i}, {j}) - Requires rw({i},{j}), r({j}, {j})", flush=True) + print(f"+TRSM: ({i}, {j}) - Requires rw({i},{j}), r({j}, {j})", flush=True) factor = a[j][j].array panel = a[i][j].array @@ -285,7 +285,7 @@ def t4(): stream.synchronize() a[i][j].update(out) stream.synchronize() - #print(f"-TRSM: ({i}, {j}) - Requires rw({i},{j}), r({j}, {j})", flush=True) + print(f"-TRSM: ({i}, {j}) - Requires rw({i},{j}), r({j}, {j})", flush=True) return subcholesky[len(a) - 1] diff --git a/parla/parray/coherence.py b/parla/parray/coherence.py index 9fdd07b..6547270 100644 --- a/parla/parray/coherence.py +++ b/parla/parray/coherence.py @@ -535,6 +535,7 @@ def evict(self, device_id: int, keep_one_copy: bool = True) -> List[MemoryOperat """ device_local_state = self._local_states[device_id] operations = [] + evict_last_copy = False if device_local_state == self.INVALID: # already evicted, do nothing operations.append(MemoryOperation.noop()) @@ -547,40 +548,35 @@ def evict(self, device_id: int, keep_one_copy: bool = True) -> List[MemoryOperat new_owner = device break - # this device 
owns the last copy - if new_owner is None: - if keep_one_copy: - if device_id == CPU_INDEX: - # the last copy is already at CPU, - # do nothing and skip the rest of the code - return [MemoryOperation.noop()] - else: - # write back the last copy to CPU - operations.append(MemoryOperation.load(CPU_INDEX, device_id)) - - # now CPU has exclusive access to the data - self._global_state = self.MODIFIED - self._local_states[CPU_INDEX] = self.MODIFIED - - new_owner = CPU_INDEX - else: - self._global_state = self.INVALID # the system lose the last copy - self.owner = new_owner + if new_owner is None: + evict_last_copy = True + else: + # update states + self._local_states[device_id] = self.INVALID + operations.append(MemoryOperation.evict(device_id)) + self._versions[device_id] = -1 + self._is_complete[device_id] = None - # update states - self._local_states[device_id] = self.INVALID - operations.append(MemoryOperation.evict(device_id)) else: # Modified, this device owns the last copy + evict_last_copy = True + + if evict_last_copy: if keep_one_copy: # write back to CPU - self.owner = CPU_INDEX - self._local_states[CPU_INDEX] = self.MODIFIED + if device_id != CPU_INDEX: + operations.extend(self._write_back_to(CPU_INDEX, self.MODIFIED, on_different_device=True, this_device_id=device_id)[0]) - operations.append(MemoryOperation.load(CPU_INDEX, device_id)) + self.owner = CPU_INDEX + self._local_states[CPU_INDEX] = self.MODIFIED + self._is_complete[device_id] = True + else: + return [MemoryOperation.noop()] else: self._global_state = self.INVALID # the system lose the last copy self.owner = None + self._versions[device_id] = -1 + self._is_complete[device_id] = None - self._local_states[device_id] = self.INVALID - operations.append(MemoryOperation.evict(device_id)) + self._local_states[device_id] = self.INVALID + operations.append(MemoryOperation.evict(device_id)) return operations diff --git a/parla/parray/core.py b/parla/parray/core.py index 0b469a8..742a9e9 100644 --- a/parla/parray/core.py +++ b/parla/parray/core.py @@ -206,8 +206,8 @@ def update(self, array) -> None: # update shape self._array.shape = array.shape - - cupy.cuda.stream.get_current_stream().synchronize() + if num_gpu > 0: + cupy.cuda.stream.get_current_stream().synchronize() # slicing/indexing @@ -282,8 +282,7 @@ def evict(self, device_id: int = None, keep_one_copy: bool = True) -> None: with self._coherence_cv[device_id]: operations = self._coherence.evict(device_id, keep_one_copy) - for op in operations: - self._process_operation(op) + self._process_operations(operations) # Coherence update operations: @@ -362,7 +361,8 @@ def _process_operations(self, operations: List[MemoryOperation], slices: SlicesT self._array.copy_data_between_device(op.dst, op.src, dst_is_current_device) # sync stream before set it as ready, so asyc call is ensured to be done - cupy.cuda.stream.get_current_stream().synchronize() + if num_gpu > 0: + cupy.cuda.stream.get_current_stream().synchronize() # data is ready now if MemoryOperation.LOAD_SUBARRAY in op.flag: diff --git a/parla/parray/memory.py b/parla/parray/memory.py index 5ead20d..db97a6b 100644 --- a/parla/parray/memory.py +++ b/parla/parray/memory.py @@ -3,6 +3,9 @@ import numpy +import sys +#import gc + #TODO: Fix this to be more stable and less of a hack. 
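+# NOTE: clear() below now also calls free_all_blocks() on CuPy's default
+# memory pool, so memory freed by evicting a PArray is returned to the
+# device instead of staying cached in the pool.
+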
try: import cupy @@ -514,5 +517,19 @@ def clear(self, device_id) -> None: """ Clear data in device_id """ + import psutil + import os + mempool = cupy.get_default_memory_pool() + pinned_mempool = cupy.get_default_pinned_memory_pool() +#proc = psutil.Process(os.getpid()) +#mem0 = proc.memory_info().rss + #print("Before:\n\t Used bytes:", mempool.used_bytes(), " total bytes: ", mempool.total_bytes(), " free blocks:", pinned_mempool.n_free_blocks(), flush=True) +#del self._buffer[device_id] + import psutil self._indices_map[device_id] = None self._buffer[device_id] = None +#mem1 = proc.memory_info().rss +#gc.collect() + mempool.free_all_blocks() + #print("After:\n\t Used bytes:", mempool.used_bytes(), " total bytes: ", mempool.total_bytes(), " free blocks:", pinned_mempool.n_free_blocks(), flush=True) +#print("\t Before deallcation: ", mem0, " after: ", mem1, flush=True) diff --git a/parla/task_runtime.py b/parla/task_runtime.py index 2c902ef..f65ba11 100644 --- a/parla/task_runtime.py +++ b/parla/task_runtime.py @@ -17,6 +17,8 @@ from parla.cpu_impl import cpu from parla.dataflow import Dataflow +from parla.tracking import LRUManager + # Logger configuration (uncomment and adjust level if needed) #logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) @@ -353,6 +355,11 @@ def _finish(self, ctx: 'SchedulerContext'): """Cleanup works after executing the task.""" raise NotImplementedError() + @abstractmethod + def _invoke_garbage_collector(self): + """Invoke a garbage collector; for now, invoke it for + each task execution.""" + raise NotImplementedError() def run(self): assert self._assigned, "Task was not assigned before running." @@ -362,6 +369,7 @@ def run(self): with self._mutex: # A default state to avoid exceptions during catch task_state = TaskException(RuntimeError("Unknown fatal error")) + event_exists = False # Run the task and assign the new task state try: assert isinstance(self._state, TaskRunning), " Task is not running state: {} on task: {}".format(self._state, self.taskid) @@ -383,10 +391,12 @@ def run(self): # Events could be multiple for multiple devices task. env.record_events() if len(events) > 0: + event_exists = True # If any event created by the current task exist, # notify dependents and make them wait for that event, # not Parla task completion. if not isinstance(task_state, TaskRunning): + self._invoke_garbage_collector() self._notify_dependents(events) env.sync_events() task_state = task_state or TaskCompleted(None) @@ -404,10 +414,12 @@ def run(self): # new dependents could be added after the above # notifications, while other devices are running # their kernels asynchronously. 
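+                # The event_exists flag keeps the garbage collector from running
+                # twice for one task: it is invoked on the event path above or on
+                # this no-event path, but not both. For ComputeTasks it releases
+                # the task's PArray references in the scheduler's LRUManager and
+                # then evicts any copies left with a zero reference count.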
- if not isinstance(task_state, TaskRunning): + if event_exists == False and not isinstance(task_state, TaskRunning): + self._invoke_garbage_collector() self._notify_dependents() self._set_state(task_state) - self._finish(ctx) + if isinstance(self._state, TaskCompleted): + self._finish(ctx) except Exception as e: logger.exception("Task %r: Exception in task handling", self) raise e @@ -611,8 +623,30 @@ def _handle_dependency_spawn(self, dependency: "Task"): if self.num_unspawned_dependencies == 0: self._ready_to_map() + def acquire_parray(self): + ctx = get_scheduler_context() + if self.dataflow is not None: + for parray in (self.dataflow.input + \ + self.dataflow.inout + \ + self.dataflow.output): + for d in self.req.devices: + ctx.scheduler.lrum._acquire_data(parray, d, str(self.taskid)) + + def _invoke_garbage_collector(self): + print(f"Garbage collector is activated", flush=True) + ctx = get_scheduler_context() + if self.dataflow is not None: + for parray in (self.dataflow.input + \ + self.dataflow.inout + \ + self.dataflow.output): + for d in self.req.devices: + ctx.scheduler.lrum._release_data(parray, d, str(self.taskid)) + ctx.scheduler.lrum._evict() + def _execute_task(self): - return self._state.func(self, *self._state.args) + self.acquire_parray() + result = self._state.func(self, *self._state.args) + return result def cleanup(self): self._func = None @@ -630,14 +664,26 @@ def _finish(self, ctx): ctx.scheduler.update_mapped_task_count_mutex(self, d, -1) ctx.scheduler.update_launched_task_count_mutex(self, d, -1) - # _finish() can be called more than once on global task. if (self.dataflow != None): + """ + self.release_parray() + for parray in self.dataflow.input: + for d in self.req.devices: + ctx.scheduler.lrum._release_data(parray, d, str(self.taskid)) + """ # Update OUT parrays which may have changed size from 0 to something # We assume all IN and INOUT params don't change size - for parray in (self.dataflow.output): + for parray in (self.dataflow.output + self.dataflow.inout): + """ + for d in self.req.devices: + ctx.scheduler.lrum._release_data(parray, d, str(self.taskid)) + """ ctx.scheduler._available_resources.update_parray_nbytes( parray, self.req.devices) + """ + ctx.scheduler.lrum._evict() + """ ctx.scheduler.decr_active_compute_tasks() self.cleanup() @@ -669,13 +715,19 @@ def _execute_task(self): if (self._operand_type == OperandType.IN): write_flag = False # Move data to current device - dev_type = get_current_devices()[0] + self.dev_type = get_current_devices()[0] dev_no = -1 - if (dev_type.architecture is not cpu): - dev_no = dev_type.index + if (self.dev_type.architecture is not cpu): + dev_no = self.dev_type.index + ctx = get_scheduler_context() + ctx.scheduler.lrum._start_prefetch_data(self._target_data, self.dev_type, str(self.taskid)) self._target_data._auto_move(device_id=dev_no, do_write=write_flag) + ctx.scheduler.lrum._stop_prefetch_data(self._target_data, self.dev_type, str(self.taskid)) return TaskCompleted(None) + def _invoke_garbage_collector(self): + pass + def cleanup(self): self._target_data = None @@ -688,7 +740,7 @@ def _finish(self, ctx): # Don't update parray tracking information either # The scheduler already registered the new location # If size changes, the ComputeTask will take care of that - + ctx = get_scheduler_context() # Decrease the number of running tasks on the device d. 
for d in self.req.devices: ctx.scheduler.update_mapped_task_count_mutex(self, d, -1) @@ -1479,6 +1531,8 @@ def __init__(self, environments: Collection[TaskEnvironment], n_threads: Optiona self._device_launched_datamove_task_counts = { dev: 0 for dev in self._available_resources.get_resources()} + self._lrum = LRUManager() + # Dictionary mapping data block to task lists. self._datablock_dict = defaultdict(list) @@ -1506,6 +1560,10 @@ def components(self) -> List[EnvironmentComponentInstance]: def scheduler(self): return self + @ property + def lrum(self): + return self._lrum + def __enter__(self): if self._active_task_count != 1: raise InvalidSchedulerAccessException( diff --git a/parla/tracking.py b/parla/tracking.py new file mode 100644 index 0000000..fbeb17e --- /dev/null +++ b/parla/tracking.py @@ -0,0 +1,791 @@ +""" +Contains data structures for logging and tracking PArray instances across devices. + +DataTracker: +- Hash-map of PArrays to devices +- Hash-map of Devices to EvictionManagers + +EvictionManager: + - Implements an eviction policy for a managed list of PArray objects on a single device +""" + +#TODO(hc): how to guarantee that the evicted data can be refetched correctly when a later task needs it. +# synchronization between data movement tasks and data eviction; for example, if a data is evicted +# after a data movement task is executed, then it breaks correctness. +# also, it could cause thrashing. +# condition 1. if a mapped task in a queue will use an evicted data, its data movement task should exist. +# condition 2. + +#TODO(hc): so, eviction should happen at the proper timing. +# let's assume that we evicted a proper data at a proper timing. then what do we need? +# 1) parray coherency protocol should be aware of that. +# 2) update priority of the data object; or we can just remove that data from the list. + +#TODO(hc): how to handle slicing? + + +import threading + + +#TODO(wlr): Nothing in this file is threadsafe at the moment. Developing structure first, then we'll add locks. + +#TODO(wlr): I assume PArrays hash to a unique value during their lifetime. If not, we'll need to add such a hash function to PArray. + +#TODO(wlr): This is developed without considering sliced PArrays. +# For slices, I imagine we might need something with the following rules: +# - An access of a slice, locks the parent on that device +# - An eviction of a slice, may not evict the parent +# - An eviction of a parent, evicts all slices + +# I'm less certain about the following: +# - Updating the priority of a slice, updates the priority of the parent +# - Updating the priority of a parent, updates the priority of all slices + + +#Needs: +# - wrap in locks (use external locks on base class) + +from enum import Enum +from typing import TypedDict, Dict +from parla.cpu_impl import cpu + +print_log=False + + +# TODO(hc): It should be declared on PArray. +class DataNode: + """ + A node containing a data object (PArray) on a specific device. + Used in the linked lists in the EvictionManager + """ + def __init__(self, data, device, priority=0): + self._data = data + self._device = device + self._priority = priority + + self.next = None + self.prev = None + + @property + def data(self): + return self._data + + @property + def device(self): + return self._device + + def __str__(self): + return self.__repr__() + + def __repr__(self): + return f"DataNode({self.data}, {self.device})" + + +class ListNode: + """ + A node containing a linked list of DataNodes with an associated value (typically priority). 
+ Useful for more complex eviction data structures. + """ + + def __init__(self, value, list): + self.value = value + self.list = list + + self.next = None + self.prev = None + + def __str__(self): + return self.__repr__() + + def __repr__(self): + return f"ListNode({self.list})" + + +# TODO(hc): This list should be protected by a lock. +class DLList: + """ + A doubly linked list used in the EvictionManager. + """ + def __init__(self): + self.head = None + self.tail = None + self.next = None + self.prev = None + self.length = 0 + self._list_lock = threading.Condition(threading.Lock()) + + def __str__(self): + return self.__repr__() + + def append(self, node): + with self._list_lock: + if self.length == 0: + self.head = node + self.tail = node + node = self.tail = self.head + else: + self.tail.next = node + node.prev = self.tail + node.next = None + self.tail = node + self.length += 1 + + def remove_mutex(self, node): + with self._list_lock: + self.remove(node) + + def remove(self, node): + edit = False + if self.length == 1 and (node == self.head or node == self.tail): + edit = True + self.head = None + self.tail = None + node.prev = None + node.next = None + self.length = 0 + return edit + elif node.prev is None and node.next is None: + return edit + + if self.head == node: + self.head = node.next + edit = True + elif self.tail == node: + self.tail = node.prev + edit = True + else: + node.prev.next = node.next + node.next.prev = node.prev + edit = True + + node.prev = None + node.next = None + if print_log: + print(f"After remove {id(node)} next: {id(node.next)} and prev: {id(node.prev)}: new head: {id(self.head)} and tail: {id(self.tail)}", flush=True) + + if edit: + self.length -= 1 + return edit + + def remove_head(self): + with self._list_lock: + if self.head is None: + return None + old_head = self.head + self.remove(old_head) + return old_head + + def insert_before(self, node, new_node): + with self._list_lock: + if node.prev is not None: + node.prev.next = new_node + new_node.prev = node.prev + else: + self.head = new_node + node.prev = new_node + new_node.next = node + + self.length += 1 + + def insert_after(self, node, new_node): + with self._list_lock: + if node.next is not None: + node.next.prev = new_node + new_node.next = node.next + else: + self.tail = new_node + node.next = new_node + new_node.prev = node + + self.length += 1 + + def is_head_none(self): + with self._list_lock: + if self.head is None: + return True + return False + + def __len__(self): + return self.length + + def __repr__(self): + with self._list_lock: + repr_str = ":\n" + tmp_node = self.head + while (tmp_node is not None): + repr_str += str(id(tmp_node)) + " -> " + print("Head:", id(self.head), " tail:", id(self.tail), " and current node:", id(tmp_node), flush=True) + if id(tmp_node) == id(self.tail): + break + tmp_node = tmp_node.next + repr_str += "\n" + return repr_str + + +class GCDataState(Enum): + """ + Enum of data states. + """ + PREFETCHING = "Prefetching" # Data is being prefetched. + RESERVED = "Reserved" # Data's ref. count is >= 1, but not acquired. + ACQUIRED = "Acquired" # A task is using data. + FREE = "Free" # None of tasks (mapped/running) does not need data. + + +class DataMapType(TypedDict): + """ + Track information of data instance in a device + """ + # TODO(hc): state should be an enum type. + state: GCDataState + ref_count: int + ref_list_node: DataNode + + +#class LRUManager(EvictionManager): +class LRUManager: + """ + LRU policy for garbage collecting. 
+    It maintains a list of the zero-referenced data objects for each device.
+    The head of the list is the next data object to be evicted
+    and the tail of the list is the data object used most recently.
+    """
+
+    def __init__(self, memory_limit = 999999):
+#super().__init__(device, memory_limit)
+        # A list containing zero-referenced data objects on a specific device.
+        self.zr_data_list = DLList()
+        # A dictionary containing all data information on a device.
+        self.data_dict: Dict[str, DataMapType] = {}
+        # A lock for guarding a data state.
+        self._data_state_lock = threading.Condition(threading.Lock())
+        # A lock for guarding a reference count.
+        self._data_ref_count_lock = threading.Condition(threading.Lock())
+
+        #Note(wlr): These tracking dictionaries are optional, I just think it's interesting to track.
+        #Holds data objects on this device that are being prefetched.
+        #XXX(hc): This might be necessary as data being prefetched cannot be used yet but it can avoid
+        #         unnecessary future data prefetching or movement.
+        self.prefetch_map = {}
+        #Holds data objects on this device that are needed by tasks that have not yet completed (this includes data in the process of being prefetched).
+        self.active_map = {}
+        #Holds data objects that are currently being used by tasks.
+        self.used_map = {}
+
+    def _increase_ref_count(self, data_info):
+        with self._data_ref_count_lock:
+            assert(data_info["ref_count"] >= 0)
+            data_info["ref_count"] += 1
+
+    def _decrease_ref_count(self, data_info):
+        with self._data_ref_count_lock:
+            data_info["ref_count"] -= 1
+            assert(data_info["ref_count"] >= 0)
+
+    def _check_ref_count_zero(self, data_info):
+        with self._data_ref_count_lock:
+            return data_info["ref_count"] == 0
+
+    def _update_data_state(self, data_id, new_state, taskid):
+        with self._data_state_lock:
+            # prefetching, reserved, acquired, free
+            data_info = self.data_dict[data_id]
+            data_state = data_info["state"]
+            if print_log:
+                print(f"[GC] (Task: {taskid}) Data (ID: {data_id})'s state is updated from "+
+                      f"{data_state} to {new_state}", flush=True)
+            if data_state == new_state:
+                return
+            if new_state == GCDataState.PREFETCHING:
+                if data_state == GCDataState.FREE:
+                    data_info["state"] = new_state
+                return
+            elif new_state == GCDataState.RESERVED:
+                if data_state == GCDataState.PREFETCHING or \
+                   data_state == GCDataState.FREE:
+                    data_info["state"] = new_state
+                return
+            elif new_state == GCDataState.ACQUIRED:
+                assert(data_state == GCDataState.RESERVED or data_state == GCDataState.ACQUIRED)
+                data_info["state"] = new_state
+                return
+            elif new_state == GCDataState.FREE:
+                assert(data_state == GCDataState.ACQUIRED)
+                with self._data_ref_count_lock:
+                    if data_info["ref_count"] == 0:
+                        data_info["state"] = new_state
+                return
+
+    def _dict_id(self, data, dev):
+        """ Generate the key of a (data, device) pair in the data information dictionary. """
+        dev_index = "G" + str(dev.index) if (dev.architecture is not cpu) else "C"
+        return str(data.ID) + "." + dev_index
+
+    def _get_devid(self, dev):
+        return -1 if dev.architecture is cpu else dev.index
+
+    def _start_prefetch_data(self, data, dev, taskid = ""):
+        data_id = self._dict_id(data, dev)
+        if data_id in self.data_dict:
+            #This is a prefetch of a data object that is already on the device (or is being prefetched).
+            #This means the data is no longer evictable as it's about to be in use by data movement and compute tasks.
+            #Remove it from the evictable list.
+
+            # TODO(hc): but if a data movement task will be executed after a very long time, that also can be evictable. 
+ # if memory is full and any task cannot proceed, we can still evict one of data that was prefetched. + # but this is very rare case and I am gonna leave it as the future work. + + # TODO(hc): PArray should point to a corresponding data node. + data_info = self.data_dict[data_id] + if print_log: + print(f"[GC] (Task: {taskid}) Before Zero-referenced list after prefetching data: "+ + f" node id: {id(data_info['ref_list_node'])}", + f"\n{self.zr_data_list}", flush=True) + + self._increase_ref_count(data_info) + success = self.zr_data_list.remove_mutex(data_info["ref_list_node"]) + self._update_data_state(data_id, GCDataState.PREFETCHING, taskid) + + if success and print_log: + print(f"[GC] (Task: {taskid}) Data (ID: {data_id}) is removed "+ + f"from zero-referenced list during prefetching "+ + f"(Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + if print_log: + print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is ready to be updated "+ + f"through prefetching (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + + #if success: + #This is the first prefetch of a data object that is already on the device. + #Update the evictable memory size (as this data object is no longer evictable). + #self.evictable_memory -= data.size + if print_log: + print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated "+ + f"through prefetching (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + else: + self.data_dict[data_id] = { "state" : GCDataState.PREFETCHING, \ + "ref_count" : 1, \ + "ref_list_node" : DataNode(data, dev) } + if print_log: + print(f"[GC] (Task: {taskid}) New data (ID: {data_id}) is added through "+ + f"prefetching (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + if print_log: + print(f"[GC] (Task: {taskid}) Zero-referenced list after prefetching data: "+ + f"\n{self.zr_data_list}", flush=True) + #This is a new block, update the used memory size. + #self.used_memory += data.size + #self.prefetch_map[data] = data + #self.active_map[data] = data + #assert(self.used_memory <= self.memory_limit) + + def _stop_prefetch_data(self, data, dev, taskid=""): + data_id = self._dict_id(data, dev) + assert(data_id in self.data_dict) + self._update_data_state(data_id, GCDataState.RESERVED, taskid) + if print_log: + print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated as "+ + f"prefetching completes (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + if data._array.get(self._get_devid(dev)) is None: + print(f"[GC] prefetching-stop (Task: {taskid}) ID: {data_id} is None", flush=True) + + def _acquire_data(self, data, dev, taskid = ""): + #NOTE(wlr): The data should already be removed from the evictable list in the prefetching stage. + # Any logic here would be a sanity check. I'm removing it for now. 
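+        # Acquiring therefore only transitions this entry's state from RESERVED
+        # to ACQUIRED; its reference count was already incremented when the
+        # corresponding data movement task started prefetching it.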
+ #node = self.data_dict.get(data, None) + #if node is not None: + # #Remove the node from the eviction list while it's in use + # success = self.data_list.remove(node) + #self.used_map[data] = data + #data.add_use(self.device) + data_id = self._dict_id(data, dev) + assert(data_id in self.data_dict) + self._update_data_state(data_id, GCDataState.ACQUIRED, taskid) + if print_log: + print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated as "+ + f"a compute task acquires"+ + f" (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + if data._array.get(self._get_devid(dev)) is None: + print(f"[GC] acquire (Task: {taskid}) ID: {data_id} is None", flush=True) + + def _release_data(self, data, dev, taskid = ""): + data_id = self._dict_id(data, dev) + assert(data_id in self.data_dict) + data_info = self.data_dict[data_id] + self._decrease_ref_count(data_info) + + #active_count = data.get_active(self.device) + #use_count = data.get_use(self.device) + + #data.remove_active(self.device) + #data.remove_use(self.device) + + self._update_data_state(data_id, GCDataState.FREE, taskid) + with self._data_ref_count_lock: + if data_info["ref_count"] == 0: + #del self.active_map[data] + #If the data object is no longer needed by any already prefetched tasks, it can be evicted. + node = data_info["ref_list_node"] + self.zr_data_list.remove(node) + self.zr_data_list.append(node) + #self.evictable_memory += data.nbytes + """ + #if use_count == 1: + #del self.used_map[data] + if print_log: + print(f"[GC] (Task: {taskid}) Existing data (ID: {data_id}) is updated as a compute "+ + f"task releases (Ref. count: {self.data_dict[data_id]['ref_count']}, "+ + f"Ref. node ID: {id(self.data_dict[data_id]['ref_list_node'])})", flush=True) + print(f"[GC] (Task: {taskid}) Zero-referenced list after releasing data: "+ + f"\n{self.zr_data_list}", flush=True) + if data._array.get(self._get_devid(dev)) is None: + print(f"[GC] release (Task: {taskid}) ID: {data_id} is None", flush=True) + """ + + def _evict_data(self, target_data, target_dev): + data_id = self._dict_id(target_data, target_dev) + data_info = self.data_dict[data_id] + dev_id = -1 if target_dev.architecture is cpu else target_dev.index + if print_log: + print(f"[GC] Zero-referenced data (ID: {data_id}) is evicted: " + + f"reference count = {data_info['ref_count']}", flush=True) + # TODO(hc): It is possible that this data has a reference count being bigger + # than 1 in parallel environment. + #assert(self._check_ref_count_zero(data_info)) + #Call internal data object evict method + #This should: + # - Backup the data if its not in a SHARED state + # (SHARED state means the data has a valid copy on multiple devices. 
Eviction should never destroy the only remaining copy) + # - Mark the data for deletion (this may be done by the CuPy/Python GC) + target_data.evict(dev_id) +#del data_info + #self.used_memory -= data.nbytes + #self.evictable_memory -= data.nbytes + + def _evict(self): + if print_log: + print(f"[GC] Zero-referenced list before evict starts: "+ + f"\n{self.zr_data_list}", flush=True) + + # Get the oldest data object + # Because we append after use this is at the front of the list + while not self.zr_data_list.is_head_none(): + node = self.zr_data_list.remove_head() + if node is not None: + n_data = node.data + n_dev = node.device + self._evict_data(n_data, n_dev) + + +''' +class LFUManager(EvictionManager): + """ + Eviction Manager for a LFU (Least Frequently Used) policy. + Use is updated when a data object is accessed and released by a task. + + The data structure follows the O(1) implementation described by (Ketan Shah/Anirban Mitra/Dhruv Matani, 2010): http://dhruvbird.com/lfu.pdf + """ + + def __init__(self, device, memory_limit): + super().__init__(device, memory_limit) + self.priority_map = {} + self.data_map = {} + + self.priority_list = DLList() + + #NOTE(wlr): These tracking dictionaries are optional, I just think it's interesting to track. + #Holds data objects on this device that are being prefetched. + self.prefetch_map = {} + #Holds data objects on this device that are needed by tasks that have not yet completed (this includes data in the process of being prefetched). + self.active_map = {} + #holds data objects that are currently being used by tasks. + self.used_map = {} + + def _add(self, node): + + #Increment usage count + node.priority += 1 + + #Lookup ListNode in priority table + list_node = self.priority_map.get(node.priority, None) + + if list_node is None: + #Create new list node + list_node = ListNode(node.priority, DLList()) + self.priority_list[node.priority] = list_node + + if node.priority == 1: + #Add to the head of the list + self.priority_list.append(list_node) + else: + #Add as the next node after the previous priority + self.priority_list.insert_after(list_node, self.priority_list[node.priority - 1]) + + #Add data node to internal list + list_node.list.append(node) + + def _remove(self, node, delete=False): + + #Lookup ListNode in priority table + list_node = self.priority_map.get(node.priority, None) + + assert(list_node is not None) + + success = list_node.list.remove(node) + + if len(list_node.list) == 0: + #Remove the list node from the priority list + + self.priority_list.remove(list_node) + del self.priority_map[list_node.priority] + + if delete: + del self.data_map[node] + + return success + + + def _get_evict_target(self): + + #Get least frequently used node + + #Get the first list node in the priority list + + list_node = self.priority_list.head + data_node = list_node.list.head + + while list_node is not None: + + #Check all data nodes with the same priority + while data_node is not None: + data_node = data_node.next + + if data_node.data.get_active(self.device) == 0: + break + + #Continue search in next priority list + if data_node.data.get_active(self.device) == 0: + break + + list_node = list_node.next + + return data_node + + def _start_prefetch_data(self, data): + + data.add_prefetch(self.device) + data.add_active(self.device) + + if data in self.data_map: + #This is a prefetch of a data object that is already on the device (or is being prefetched). 
+ #This means the data is no longer evictable as its about to be in-use by data movement and compute tasks. + + if data.get_active(self.device) == 1: + #This is the first prefetch of a data object that is already on the device. + #Update the evictable memory size (as this data object is no longer evictable). + self.evictable_memory -= data.size + else: + #This is a new block, update the used memory size. + self.used_memory += data.size + + self.prefetch_map[data] = data + self.active_map[data] = data + + assert(self.used_memory <= self.memory_limit) + + def _stop_prefetch_data(self, data): + + count = data.get_prefetch(self.device) + data.remove_prefetch(self.device) + + if count == 1: + del self.prefetch_map[data] + + def _access_data(self, data): + + #NOTE(wlr): The data should already be removed from the evictable list in the prefetching stage. + # Any logic here would be a sanity check. I'm removing it for now. + + #node = self.data_map.get(data, None) + #if node is not None: + # #Remove the node from the eviction list while it's in use + # success = self.data_list.remove(node) + + self.used_map[data] = data + data.add_use(self.device) + + + def _release_data(self, data): + node = self.data_map[data] + + active_count = data.get_active(self.device) + use_count = data.get_use(self.device) + + data.remove_active(self.device) + data.remove_use(self.device) + + self._add(node) + + if active_count == 1: + del self.active_map[data] + self.evictable_memory += data.nbytes + + if use_count == 1: + del self.used_map[data] + + def _evict_data(self, data): + node = self.data_map[data] + + assert(data.get_use(self.device) == 0) + assert(data.get_active(self.device) == 0) + + #Call internal data object evict method + #This should: + # - Backup the data if its not in a SHARED state + # (SHARED state means the data has a valid copy on multiple devices. Eviction should never destroy the only remaining copy) + # - Mark the data for deletion (this may be done by the CuPy/Python GC) + data.evict(self.device) + + self._remove(node, delete=True) + + self.used_memory -= data.nbytes + self.evictable_memory -= data.nbytes + + + def _evict(self): + # Get the oldest data object and remove it + node = self._get_evict_target() + self._evict_data(node) + +class EvictionManager: + """ + Track usage of data objects on devices. Used to chose which blocks to evict. + """ + + def __init__(self, device, memory_limit): + self.device = device + + #values in bytes + self.memory_limit = memory_limit + self.used_memory = 0 + self.evictable_memory = 0 + + self.lock = threading.Condition(threading.Lock()) + + def map_data(self, data): + """ + Called when a data object is mapped to a device. + """ + with self.lock: + self._map_data(data) + + def _map_data(self, data): + """ + Called when a data object is mapped to a device. + """ + pass + + def _unmap_data(self, data): + pass + + def unmap_data(self, data): + """ + Called when a data object is unmapped from a device. + """ + with self.lock: + self._unmap_data(self, data) + + def _start_prefetch_data(self, data): + pass + + def start_prefetch_data(self, data): + """ + Called when a data object starts a prefetch. + Updates the used memory size. + + Can update the priority of the data object. + """ + with self.lock: + self._start_prefetch_data(data) + + def _stop_prefetch_data(self, data): + """ + Called when a data object is no longer being prefetched. + + Updates the priority of the data object. + """ + # TODO(hc): what does it mean? 
+ pass + + def stop_prefetch_data(self, data): + """ + Called when a data object is no longer being prefetched. + + Updates the priority of the data object. + """ + with self.lock: + self._stop_prefetch_data(data) + + def _access_data(self, data): + pass + + + def access_data(self, data): + """ + Called when a data object is accessed. + + Can update the priority of the data object. + Locks the data object (cannot be evicted while in use) + Updates the evictable memory size. + """ + with self.lock: + self._access_data(data) + + + def _release_data(self, data): + pass + + def release_data(self, data): + """ + Called when a data object is no longer in use. + + Updates the priority of the data object. + Unlocks the data object (can be evicted) + Updates the evictable memory size. + """ + with self.lock: + self._release_data(data) + + def _evict_data(self, data): + pass + + def evict_data(self, data): + """ + Called to evict a specific data object. + Updates the used memory size and evictable memory size. + """ + with self.lock: + self._evict_data(data) + + def _evict(self): + """ + Called when memory is needed. + + Evicts the data object with the highest priority (based on the policy). + """ + pass + + + def evict(self): + """ + Called when memory is needed. + + Evicts the data object with the highest priority (based on the policy). + """ + with self.lock: + self._evict() +''' diff --git a/tests/test_gc.py b/tests/test_gc.py new file mode 100644 index 0000000..64a8b57 --- /dev/null +++ b/tests/test_gc.py @@ -0,0 +1,48 @@ +# Import Parla +from parla import Parla, spawn, parray, TaskSpace +# Import the 'cpu' device type +from parla.cpu import cpu +from parla.cuda import gpu + +import parla.tracking + +import numpy as np + +import time + +def main(): + A = parray.asarray(np.random.rand(10000, 10000)) + B = parray.asarray(np.random.rand(10000, 10000)) + t = TaskSpace("AxB") + + for i in range(0, 4): + # Spawn a task to be scheduled by the Parla runtime + """ + @spawn(t[i], input=[A, B], placement=gpu(i)) + def axb(): + C = A @ B + print("axb is called", flush=True) + time.sleep(10) + """ + @spawn(t[i], input=[A], placement=gpu(i)) + def axb(): + print("axb is called", flush=True) + time.sleep(3) + + @spawn(t[4], dependencies=[t[:4]]) + def apb(): + print("apb", flush=True) + time.sleep(2) + + @spawn(dependencies=[t[4]], input=[A], placement=gpu(1)) + def apb(): + print("apb2", flush=True) + print(A.array) + time.sleep(2) + + + +# Execute the Parla program within the Parla context +if __name__ == "__main__": + with Parla(): + main() diff --git a/tutorial/0_hello_world/hello.py b/tutorial/0_hello_world/hello.py index caeeaf0..f591293 100644 --- a/tutorial/0_hello_world/hello.py +++ b/tutorial/0_hello_world/hello.py @@ -10,20 +10,30 @@ ''' # Import Parla -from parla import Parla, spawn +from parla import Parla, spawn, parray # Import the 'cpu' device type from parla.cpu import cpu +import parla.tracking + +import numpy as np # Define the main function (recommended when using Parla) # Tasks cannot be defined in the global scope def main(): + A = parray.asarray([[1, 2], [3, 4]]) + data = np.random.rand(2, 2) + B = parray.asarray(data) # Spawn a task to be scheduled by the Parla runtime - @spawn() + @spawn(input=[A], output=[B], placement=cpu) def hello_world(): print("Hello, World!", flush=True) + @spawn(input=[A], placement=cpu) + def hello_world2(): + print("Grab task A", flush=True) + # Execute the Parla program within the Parla context if __name__ == "__main__": with Parla():