Spaces:
Runtime error
Runtime error
| import sys | |
| import time | |
| import os | |
| import subprocess # nosec - disable B404:import-subprocess check | |
| import csv | |
| import json | |
| import shutil | |
| import platform | |
| from argparse import ArgumentParser | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple, TypedDict | |
| ROOT = Path(__file__).parents[1] | |
| NOTEBOOKS_DIR = Path("notebooks") | |
| class NotebookStatus: | |
| SUCCESS = "SUCCESS" | |
| FAILED = "FAILED" | |
| TIMEOUT = "TIMEOUT" | |
| SKIPPED = "SKIPPED" | |
| NOT_RUN = "NOT_RUN" | |
| EMPTY = "EMPTY" | |
| class NotebookReport(TypedDict): | |
| status: str | |
| path: Path | |
| duration: float = 0 | |
| TestPlan = Dict[Path, NotebookReport] | |
| def parse_arguments(): | |
| parser = ArgumentParser() | |
| parser.add_argument("--ignore_list", required=False, nargs="+") | |
| parser.add_argument("--test_list", required=False, nargs="+") | |
| parser.add_argument("--early_stop", action="store_true") | |
| parser.add_argument("--report_dir", default="report") | |
| parser.add_argument("--keep_artifacts", action="store_true") | |
| parser.add_argument("--collect_reports", action="store_true") | |
| parser.add_argument("--move_notebooks_dir") | |
| parser.add_argument("--job_name") | |
| parser.add_argument("--device_used") | |
| parser.add_argument("--upload_to_db") | |
| parser.add_argument( | |
| "--timeout", | |
| type=int, | |
| default=7200, | |
| help="Timeout for running single notebook in seconds", | |
| ) | |
| return parser.parse_args() | |
| def move_notebooks(nb_dir): | |
| current_notebooks_dir = ROOT / NOTEBOOKS_DIR | |
| shutil.copytree(current_notebooks_dir, nb_dir) | |
| def collect_python_packages(output_file: Path): | |
| reqs = subprocess.check_output( | |
| [sys.executable, "-m", "pip", "freeze"], | |
| shell=(platform.system() == "Windows"), | |
| ) | |
| with output_file.open("wb") as f: | |
| f.write(reqs) | |
| def prepare_test_plan(test_list: Optional[List[str]], ignore_list: List[str], nb_dir: Optional[Path] = None) -> TestPlan: | |
| orig_nb_dir = ROOT / NOTEBOOKS_DIR | |
| notebooks_dir = nb_dir or orig_nb_dir | |
| notebooks: List[Path] = sorted(list([n for n in notebooks_dir.rglob("**/*.ipynb") if not n.name.startswith("test_")])) | |
| test_plan: TestPlan = {notebook.relative_to(notebooks_dir): NotebookReport(status="", path=notebook, duration=0) for notebook in notebooks} | |
| ignored_notebooks: List[Path] = [] | |
| if ignore_list is not None: | |
| for ignore_item in ignore_list: | |
| if ignore_item.endswith(".txt"): | |
| # Paths to ignore files are provided to `--ignore_list` argument | |
| with open(ignore_item, "r") as f: | |
| ignored_notebooks.extend(list(map(lambda line: Path(line.strip()), f.readlines()))) | |
| else: | |
| # Ignored notebooks are provided as several items to `--ignore_list` argument | |
| ignored_notebooks.append(Path(ignore_item)) | |
| try: | |
| ignored_notebooks = list(set(map(lambda n: n.relative_to(NOTEBOOKS_DIR), ignored_notebooks))) | |
| except ValueError: | |
| raise ValueError( | |
| f"Ignore list items should be relative to repo root (e.g. 'notebooks/subdir/notebook.ipynb').\nInvalid ignored notebooks: {ignored_notebooks}" | |
| ) | |
| print(f"Ignored notebooks: {ignored_notebooks}") | |
| testing_notebooks: List[Path] = [] | |
| if not test_list: | |
| testing_notebooks = [Path(n) for n in test_plan.keys()] | |
| elif len(test_list) == 1 and test_list[0].endswith(".txt"): | |
| with open(test_list[0], "r") as f: | |
| for line in f.readlines(): | |
| changed_file_path = Path(line.strip()) | |
| if changed_file_path.resolve() == (ROOT / "requirements.txt").resolve(): | |
| print("requirements.txt changed, check all notebooks") | |
| testing_notebooks = [Path(n) for n in test_plan.keys()] | |
| break | |
| if changed_file_path.suffix != ".ipynb": | |
| continue | |
| try: | |
| testing_notebook_path = changed_file_path.relative_to(NOTEBOOKS_DIR) | |
| except ValueError: | |
| raise ValueError( | |
| "Items in test list file should be relative to repo root (e.g. 'notebooks/subdir/notebook.ipynb').\n" | |
| f"Invalid line: {changed_file_path}" | |
| ) | |
| testing_notebooks.append(testing_notebook_path) | |
| else: | |
| raise ValueError( | |
| "Testing notebooks should be provided to '--test_list' argument as a txt file or should be empty to test all notebooks.\n" | |
| f"Received test list: {test_list}" | |
| ) | |
| testing_notebooks = list(set(testing_notebooks)) | |
| print(f"Testing notebooks: {testing_notebooks}") | |
| for notebook in test_plan: | |
| if notebook not in testing_notebooks: | |
| test_plan[notebook]["status"] = NotebookStatus.SKIPPED | |
| if notebook in ignored_notebooks: | |
| test_plan[notebook]["status"] = NotebookStatus.SKIPPED | |
| return test_plan | |
| def clean_test_artifacts(before_test_files: List[Path], after_test_files: List[Path]): | |
| for file_path in after_test_files: | |
| if file_path in before_test_files or not file_path.exists(): | |
| continue | |
| if file_path.is_file(): | |
| try: | |
| file_path.unlink() | |
| except Exception: | |
| pass | |
| else: | |
| shutil.rmtree(file_path, ignore_errors=True) | |
| def get_openvino_version() -> str: | |
| try: | |
| import openvino as ov | |
| version = ov.get_version() | |
| except ImportError: | |
| print("Openvino is missing in validation environment.") | |
| version = "Openvino is missing" | |
| return version | |
| def run_test(notebook_path: Path, root, timeout=7200, keep_artifacts=False, report_dir=".") -> Optional[Tuple[str, int, float, str, str]]: | |
| os.environ["HUGGINGFACE_HUB_CACHE"] = str(notebook_path.parent) | |
| print(f"RUN {notebook_path.relative_to(root)}", flush=True) | |
| result = None | |
| if notebook_path.is_dir(): | |
| print(f'Notebook path "{notebook_path}" is a directory, but path to "*.ipynb" file was expected.') | |
| return result | |
| if notebook_path.suffix != ".ipynb": | |
| print(f'Notebook path "{notebook_path}" should have "*.ipynb" extension.') | |
| return result | |
| with cd(notebook_path.parent): | |
| files_before_test = sorted(Path(".").iterdir()) | |
| ov_version_before = get_openvino_version() | |
| patched_notebook = Path(f"test_{notebook_path.name}") | |
| if not patched_notebook.exists(): | |
| print(f'Patched notebook "{patched_notebook}" does not exist.') | |
| return result | |
| collect_python_packages(report_dir / (patched_notebook.stem + "_env_before.txt")) | |
| main_command = [sys.executable, "-m", "treon", str(patched_notebook)] | |
| start = time.perf_counter() | |
| try: | |
| retcode = subprocess.run( | |
| main_command, | |
| shell=(platform.system() == "Windows"), | |
| timeout=timeout, | |
| ).returncode | |
| except subprocess.TimeoutExpired: | |
| retcode = -42 | |
| duration = time.perf_counter() - start | |
| ov_version_after = get_openvino_version() | |
| result = (str(patched_notebook), retcode, duration, ov_version_before, ov_version_after) | |
| if not keep_artifacts: | |
| clean_test_artifacts(files_before_test, sorted(Path(".").iterdir())) | |
| collect_python_packages(report_dir / (patched_notebook.stem + "_env_after.txt")) | |
| return result | |
| def finalize_status(failed_notebooks: List[str], timeout_notebooks: List[str], test_plan: TestPlan, report_dir: Path, root: Path) -> int: | |
| return_status = 0 | |
| if failed_notebooks: | |
| return_status = 1 | |
| print("FAILED: \n{}".format("\n".join(failed_notebooks))) | |
| if timeout_notebooks: | |
| print("FAILED BY TIMEOUT: \n{}".format("\n".join(timeout_notebooks))) | |
| test_report = [] | |
| for notebook, status in test_plan.items(): | |
| test_status = status["status"] or NotebookStatus.NOT_RUN | |
| test_report.append( | |
| {"name": notebook.as_posix(), "status": test_status, "full_path": str(status["path"].relative_to(root)), "duration": status["duration"]} | |
| ) | |
| with (report_dir / "test_report.csv").open("w") as f: | |
| writer = csv.DictWriter(f, fieldnames=["name", "status", "full_path", "duration"]) | |
| writer.writeheader() | |
| writer.writerows(test_report) | |
| return return_status | |
| class cd: | |
| """Context manager for changing the current working directory""" | |
| def __init__(self, new_path): | |
| self.new_path = os.path.expanduser(new_path) | |
| def __enter__(self): | |
| self.saved_path = os.getcwd() | |
| os.chdir(self.new_path) | |
| def __exit__(self, etype, value, traceback): | |
| os.chdir(self.saved_path) | |
| def write_single_notebook_report( | |
| base_version: str, | |
| notebook_name: str, | |
| status_code: int, | |
| duration: float, | |
| ov_version_before: str, | |
| ov_version_after: str, | |
| job_name: str, | |
| device_used: str, | |
| saving_dir: Path, | |
| ) -> Path: | |
| report_file = saving_dir / notebook_name.replace(".ipynb", ".json") | |
| report = { | |
| "version": base_version, | |
| "notebook_name": notebook_name.replace("test_", ""), | |
| "status": status_code, | |
| "duration": duration, | |
| "ov_version_before": ov_version_before, | |
| "ov_version_after": ov_version_after, | |
| "job_name": job_name, | |
| "device_used": device_used, | |
| } | |
| with report_file.open("w") as f: | |
| json.dump(report, f) | |
| return report_file | |
| def main(): | |
| failed_notebooks = [] | |
| timeout_notebooks = [] | |
| args = parse_arguments() | |
| reports_dir = Path(args.report_dir) | |
| reports_dir.mkdir(exist_ok=True, parents=True) | |
| notebooks_moving_dir = args.move_notebooks_dir | |
| root = ROOT | |
| if notebooks_moving_dir is not None: | |
| notebooks_moving_dir = Path(notebooks_moving_dir) | |
| root = notebooks_moving_dir.parent | |
| move_notebooks(notebooks_moving_dir) | |
| keep_artifacts = False | |
| if args.keep_artifacts: | |
| keep_artifacts = True | |
| base_version = get_openvino_version() | |
| test_plan = prepare_test_plan(args.test_list, args.ignore_list, notebooks_moving_dir) | |
| for notebook, report in test_plan.items(): | |
| if report["status"] == NotebookStatus.SKIPPED: | |
| continue | |
| test_result = run_test(report["path"], root, args.timeout, keep_artifacts, reports_dir.absolute()) | |
| timing = 0 | |
| if not test_result: | |
| print(f'Testing notebooks "{str(notebook)}" is not found.') | |
| report["status"] = NotebookStatus.EMPTY | |
| report["duration"] = timing | |
| else: | |
| patched_notebook, status_code, duration, ov_version_before, ov_version_after = test_result | |
| if status_code: | |
| if status_code == -42: | |
| status = NotebookStatus.TIMEOUT | |
| timeout_notebooks.append(patched_notebook) | |
| else: | |
| status = NotebookStatus.FAILED | |
| failed_notebooks.append(patched_notebook) | |
| report["status"] = status | |
| else: | |
| report["status"] = NotebookStatus.SUCCESS if not report["status"] in [NotebookStatus.TIMEOUT, NotebookStatus.FAILED] else report["status"] | |
| timing += duration | |
| report["duration"] = timing | |
| if args.collect_reports: | |
| job_name = args.job_name or "Unknown" | |
| device_used = args.device_used or "Unknown" | |
| report_path = write_single_notebook_report( | |
| base_version, patched_notebook, status_code, duration, ov_version_before, ov_version_after, job_name, device_used, reports_dir | |
| ) | |
| if args.upload_to_db: | |
| cmd = [sys.executable, args.upload_to_db, report_path] | |
| print(f"\nUploading {report_path} to database. CMD: {cmd}") | |
| try: | |
| dbprocess = subprocess.Popen( | |
| cmd, shell=(platform.system() == "Windows"), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True | |
| ) | |
| for line in dbprocess.stdout: | |
| sys.stdout.write(line) | |
| sys.stdout.flush() | |
| except subprocess.CalledProcessError as e: | |
| print(e.output) | |
| if args.early_stop: | |
| break | |
| exit_status = finalize_status(failed_notebooks, timeout_notebooks, test_plan, reports_dir, root) | |
| return exit_status | |
| if __name__ == "__main__": | |
| exit_code = main() | |
| sys.exit(exit_code) | |