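"""Helpers for INT8 post-training quantization of an OpenVINO grammar-correction
pipeline with NNCF, plus utilities to compare model footprint, inference time,
and word accuracy on the JFLEG dataset."""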
import time
from pathlib import Path
from typing import Dict, List

import datasets
import nncf
import openvino as ov
from jiwer import wer, wer_standardize
from nncf.quantization.range_estimator import (
    RangeEstimatorParameters,
    StatisticsCollectorParameters,
    StatisticsType,
)
from optimum.intel import OVModelForSeq2SeqLM
from optimum.intel.openvino.quantization import InferRequestWrapper
from tqdm.auto import tqdm
from transformers import Pipeline, PreTrainedTokenizer, pipeline

CALIBRATION_DATASET_SIZE = 10


def collect_calibration_data(grammar_corrector_pipe_fp32: Pipeline, calibration_dataset_size: int) -> List[Dict]:
    """Run the FP32 pipeline on JFLEG validation sentences and capture the
    decoder-with-past inference inputs to use as a calibration dataset."""
    calibration_data = []
    ov_decoder = grammar_corrector_pipe_fp32.model.decoder_with_past

    # Wrap decoder inference requests so that every input fed to the decoder
    # during generation is recorded into calibration_data
    ov_decoder.request = InferRequestWrapper(ov_decoder.request, calibration_data, apply_caching=True)

    # Run inference over the calibration sentences to collect the data
    try:
        calibration_dataset = datasets.load_dataset("jfleg", split="validation")
        calibration_dataset = calibration_dataset.shuffle(seed=42)[:calibration_dataset_size]
        for data_item in tqdm(
            calibration_dataset["sentence"],
            total=calibration_dataset_size,
            desc="Collecting calibration data",
        ):
            grammar_corrector_pipe_fp32(data_item)
    finally:
        # Restore the original, unwrapped inference request
        ov_decoder.request = ov_decoder.request.request

    return calibration_data


def quantize(
    grammar_corrector_pipe_fp32: Pipeline,
    core: ov.Core,
    quantized_model_path: Path,
    calibration_dataset_size: int,
):
    """Quantize the decoder-with-past model to INT8 with NNCF, reusing a
    previously saved result at quantized_model_path when available."""
    if quantized_model_path.exists():
        print("Loading quantized model")
        quantized_model = core.read_model(model=quantized_model_path)
    else:
        calibration_data = collect_calibration_data(grammar_corrector_pipe_fp32, calibration_dataset_size)
        ov_decoder = grammar_corrector_pipe_fp32.model.decoder_with_past
        quantized_model = nncf.quantize(
            ov_decoder.model,
            calibration_dataset=nncf.Dataset(calibration_data),
            subset_size=len(calibration_data),
            model_type=nncf.ModelType.TRANSFORMER,
            advanced_parameters=nncf.AdvancedQuantizationParameters(
                # Disable bias correction because the model does not contain
                # quantizable operations with bias
                disable_bias_correction=True,
                activations_range_estimator_params=RangeEstimatorParameters(
                    # The quantile statistic is employed because of outliers in some
                    # activations; this parameter was found useful by the
                    # quantize_with_accuracy_control method
                    max=StatisticsCollectorParameters(StatisticsType.QUANTILE)
                ),
            ),
        )
        quantized_model_path.parent.mkdir(parents=True, exist_ok=True)
        ov.save_model(quantized_model, quantized_model_path)
    return quantized_model


def get_quantized_pipeline(
    grammar_corrector_pipe: Pipeline,
    grammar_corrector_tokenizer: PreTrainedTokenizer,
    core: ov.Core,
    grammar_corrector_dir: Path,
    quantized_model_path: Path,
    device: str,
    calibration_dataset_size: int = CALIBRATION_DATASET_SIZE,
):
    """Build a text2text-generation pipeline whose decoder-with-past runs the
    INT8 quantized model."""
    # Get the quantized OpenVINO model
    quantized_model = quantize(grammar_corrector_pipe, core, quantized_model_path, calibration_dataset_size)

    # Load the model and swap the quantized decoder into the grammar correction pipeline
    grammar_corrector_model_int8 = OVModelForSeq2SeqLM.from_pretrained(grammar_corrector_dir, device=device)
    grammar_corrector_model_int8.decoder_with_past.model = quantized_model
    grammar_corrector_model_int8.decoder_with_past.request = None  # force recompilation
    grammar_corrector_model_int8.decoder_with_past._compile()

    grammar_corrector_pipe_int8 = pipeline(
        "text2text-generation",
        model=grammar_corrector_model_int8,
        tokenizer=grammar_corrector_tokenizer,
    )
    return grammar_corrector_pipe_int8


def calculate_compression_rate(model_path_ov: Path, model_path_ov_int8: Path):
    """Compare on-disk sizes (in KB) of the FP32 and INT8 model weights."""
    model_size_fp32 = model_path_ov.with_suffix(".bin").stat().st_size / 1024
    model_size_int8 = model_path_ov_int8.with_suffix(".bin").stat().st_size / 1024
    print("Model footprint comparison:")
    print(f" * FP32 IR model size: {model_size_fp32:.2f} KB")
    print(f" * INT8 IR model size: {model_size_int8:.2f} KB")
    return model_size_fp32, model_size_int8


def calculate_inference_time_and_accuracy(grammar_corrector_pipe: Pipeline, test_subset_size: int):
    """Measure total inference time and word accuracy (1 - WER) of the pipeline
    on a subset of the JFLEG test split."""
    ground_truths = []
    predictions = []
    inference_time = []

    test_dataset = datasets.load_dataset("jfleg", split="test").shuffle(seed=42)[:test_subset_size]
    zipped_dataset = zip(test_dataset["sentence"], test_dataset["corrections"])

    for input_text, references in tqdm(zipped_dataset, total=test_subset_size, desc="Evaluation"):
        # For example, a sample pair may look like:
        # input_text: "For not use car . "
        # references: ["Not for use with a car . ", "Do not use in the car . ", "Car not for use . "]
        start_time = time.perf_counter()
        corrected_text = grammar_corrector_pipe(input_text)[0]["generated_text"]
        end_time = time.perf_counter()
        delta_time = end_time - start_time

        # Each reference correction is paired with the same prediction so that
        # WER is computed against every acceptable rewrite
        ground_truths.extend(references)
        predictions.extend([corrected_text] * len(references))
        inference_time.append(delta_time)

    # Word accuracy is the complement of the word error rate, in percent
    word_accuracy = (
        1
        - wer(
            ground_truths,
            predictions,
            reference_transform=wer_standardize,
            hypothesis_transform=wer_standardize,
        )
    ) * 100
    sum_inference_time = sum(inference_time)
    return sum_inference_time, word_accuracy
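

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original helpers). It
# assumes an FP32 seq2seq grammar corrector was already exported to OpenVINO
# IR with optimum-intel; the directory names, file layout, target device, and
# test subset size below are placeholder assumptions.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from transformers import AutoTokenizer

    core = ov.Core()
    device = "CPU"  # assumed target device

    grammar_corrector_dir = Path("grammar-corrector-fp32")  # assumed export dir
    quantized_model_path = Path("grammar-corrector-int8") / "openvino_decoder_with_past_model.xml"

    # Build the FP32 pipeline from the exported model
    grammar_corrector_tokenizer = AutoTokenizer.from_pretrained(grammar_corrector_dir)
    grammar_corrector_model_fp32 = OVModelForSeq2SeqLM.from_pretrained(grammar_corrector_dir, device=device)
    grammar_corrector_pipe_fp32 = pipeline(
        "text2text-generation",
        model=grammar_corrector_model_fp32,
        tokenizer=grammar_corrector_tokenizer,
    )

    # Quantize the decoder-with-past and build the INT8 pipeline
    grammar_corrector_pipe_int8 = get_quantized_pipeline(
        grammar_corrector_pipe_fp32,
        grammar_corrector_tokenizer,
        core,
        grammar_corrector_dir,
        quantized_model_path,
        device,
    )

    # Compare footprint (assumes the optimum-intel decoder-with-past filename)
    calculate_compression_rate(
        grammar_corrector_dir / "openvino_decoder_with_past_model.xml",
        quantized_model_path,
    )

    # Compare speed and word accuracy on a small JFLEG test subset
    fp32_time, fp32_accuracy = calculate_inference_time_and_accuracy(grammar_corrector_pipe_fp32, 10)
    int8_time, int8_accuracy = calculate_inference_time_and_accuracy(grammar_corrector_pipe_int8, 10)
    print(f"FP32: {fp32_time:.2f} s total, {fp32_accuracy:.2f}% word accuracy")
    print(f"INT8: {int8_time:.2f} s total, {int8_accuracy:.2f}% word accuracy")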