import os import sys import time from subprocess import CalledProcessError, check_call from typing import List, IO, Optional, Tuple def which(command: str, paths: Optional[str] = None) -> Optional[str]: """which(command, [paths]) - Look up the given command in the paths string (or the PATH environment variable, if unspecified).""" if paths is None: paths = os.environ.get('PATH', '') # Check for absolute match first. if os.path.exists(command): return command # Would be nice if Python had a lib function for this. if not paths: paths = os.defpath # Get suffixes to search. # On Cygwin, 'PATHEXT' may exist but it should not be used. if os.pathsep == ';': pathext = os.environ.get('PATHEXT', '').split(';') else: pathext = [''] # Search the paths... for path in paths.split(os.pathsep): for ext in pathext: p = os.path.join(path, command + ext) if os.path.exists(p): return p return None def has_no_extension(file_name: str) -> bool: root, ext = os.path.splitext(file_name) return ext == "" def is_valid_single_input_file(file_name: str) -> bool: root, ext = os.path.splitext(file_name) return ext in (".i", ".ii", ".c", ".cpp", ".m", "") def time_to_str(time: float) -> str: """ Convert given time in seconds into a human-readable string. """ return f"{time:.2f}s" def memory_to_str(memory: int) -> str: """ Convert given number of bytes into a human-readable string. """ if memory: try: import humanize return humanize.naturalsize(memory, gnu=True) except ImportError: # no formatter installed, let's keep it in bytes return f"{memory}B" # If memory is 0, we didn't succeed measuring it. return "N/A" def check_and_measure_call(*popenargs, **kwargs) -> Tuple[float, int]: """ Run command with arguments. Wait for command to complete and measure execution time and peak memory consumption. If the exit code was zero then return, otherwise raise CalledProcessError. The CalledProcessError object will have the return code in the returncode attribute. The arguments are the same as for the call and check_call functions. Return a tuple of execution time and peak memory. """ peak_mem = 0 start_time = time.time() try: import psutil as ps def get_memory(process: ps.Process) -> int: mem = 0 # we want to gather memory usage from all of the child processes descendants = list(process.children(recursive=True)) descendants.append(process) for subprocess in descendants: try: mem += subprocess.memory_info().rss except (ps.NoSuchProcess, ps.AccessDenied): continue return mem with ps.Popen(*popenargs, **kwargs) as process: # while the process is running calculate resource utilization. while (process.is_running() and process.status() != ps.STATUS_ZOMBIE): # track the peak utilization of the process peak_mem = max(peak_mem, get_memory(process)) time.sleep(.5) if process.is_running(): process.kill() if process.returncode != 0: cmd = kwargs.get("args") if cmd is None: cmd = popenargs[0] raise CalledProcessError(process.returncode, cmd) except ImportError: # back off to subprocess if we don't have psutil installed peak_mem = 0 check_call(*popenargs, **kwargs) return time.time() - start_time, peak_mem def run_script(script_path: str, build_log_file: IO, cwd: str, out=sys.stdout, err=sys.stderr, verbose: int = 0): """ Run the provided script if it exists. """ if os.path.exists(script_path): try: if verbose == 1: out.write(f" Executing: {script_path}\n") check_call(f"chmod +x '{script_path}'", cwd=cwd, stderr=build_log_file, stdout=build_log_file, shell=True) check_call(f"'{script_path}'", cwd=cwd, stderr=build_log_file, stdout=build_log_file, shell=True) except CalledProcessError: err.write(f"Error: Running {script_path} failed. " f"See {build_log_file.name} for details.\n") sys.exit(-1) def is_comment_csv_line(entries: List[str]) -> bool: """ Treat CSV lines starting with a '#' as a comment. """ return len(entries) > 0 and entries[0].startswith("#")