from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
import torch
from mosestokenizer import *
from indicnlp.tokenize import sentence_tokenize
from docx import Document
import docx
import os
import re
import time
import json
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler
from transformers import (
    AutoConfig,
    AutoModelForQuestionAnswering,
    AutoTokenizer,
    squad_convert_examples_to_features
)
from transformers.data.processors.squad import SquadResult, SquadV2Processor, SquadExample
from transformers.data.metrics.squad_metrics import compute_predictions_logits

# Fetch the CUAD dataset and the fine-tuned RoBERTa-base contract QA checkpoint.
os.system('git clone https://github.com/TheAtticusProject/cuad.git')
os.system('mv cuad cuad-training')
os.system('unzip cuad-training/data.zip -d cuad-data/')
os.system('mkdir cuad-models')
os.system('curl https://zenodo.org/record/4599830/files/roberta-base.zip?download=1 --output cuad-models/roberta-base.zip')
os.system('unzip cuad-models/roberta-base.zip -d cuad-models/')
trans_tokenizer = AutoTokenizer.from_pretrained("facebook/nllb-200-distilled-600M")
trans_model = AutoModelForSeq2SeqLM.from_pretrained("facebook/nllb-200-distilled-600M")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
trans_model = trans_model.to(device)

lang_dict = {
    'english': 'eng_Latn',
    'assamese': 'asm_Beng',
    'awadhi': 'awa_Deva',
    'bengali': 'ben_Beng',
    'bhojpuri': 'bho_Deva',
    'gujarati': 'guj_Gujr',
    'hindi': 'hin_Deva',
    'kannada': 'kan_Knda',
    'kashmiri': 'kas_Deva',
    'maithili': 'mai_Deva',
    'malayalam': 'mal_Mlym',
    'marathi': 'mar_Deva',
    'odia': 'ory_Orya',
    'punjabi': 'pan_Guru',
    'sanskrit': 'san_Deva',
    'sindhi': 'snd_Arab',
    'tamil': 'tam_Taml',
    'telugu': 'tel_Telu',
    'urdu': 'urd_Arab'
}

def translate_sentence(article, target):
    inputs = trans_tokenizer(article.replace("\"", ""), return_tensors="pt").to(device)
    translated_tokens = trans_model.generate(
        **inputs, forced_bos_token_id=trans_tokenizer.lang_code_to_id[lang_dict[target]], max_length=100)
    return trans_tokenizer.batch_decode(translated_tokens, skip_special_tokens=True)[0]
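
# Minimal usage sketch (illustrative only, never called by the app): translate a single
# English sentence into Hindi with the NLLB helper above. The sample string is hypothetical.
def _example_translate_sentence():
    sample = "This Agreement shall commence on the Effective Date."
    print(translate_sentence(sample, "hindi"))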
# Map language names to the ISO codes expected by the Indic NLP sentence splitter.
INDIC_DICT = {'assamese': 'as', 'bengali': 'bn', 'gujarati': 'gu',
              'hindi': 'hi',
              'kannada': 'kn',
              'malayalam': 'ml',
              'marathi': 'mr',
              'odia': 'or',
              'punjabi': 'pa',
              'tamil': 'ta',
              'telugu': 'te'}

def split_sentences(paragraph, language):
    if language in INDIC_DICT.keys():
        return sentence_tokenize.sentence_split(paragraph, lang=INDIC_DICT[language])
    elif language == 'english':
        with MosesSentenceSplitter('en') as splitter:
            return splitter([paragraph])
    else:
        return paragraph.split(".")

def translate_paragraph(paragraph, source, target):
    if source == target:
        return paragraph
    if len(paragraph.split()) < 100:
        return translate_sentence(paragraph, target)
    else:
        # Long paragraphs are split into sentences and translated one at a time.
        sentences = split_sentences(paragraph, source)
        outputs = []
        for each_sentence in sentences:
            outputs.append(translate_sentence(each_sentence, target))
        return " ".join(outputs)
def docx_replace(doc, data):
    paragraphs = list(doc.paragraphs)
    for t in doc.tables:
        for row in t.rows:
            for cell in row.cells:
                for paragraph in cell.paragraphs:
                    paragraphs.append(paragraph)
    for each in data:
        key = list(each.keys())[0]
        val = list(each.values())[0]
        for p in paragraphs:
            # key_name = '${{{}}}'.format(key)  # placeholders in the form ${PlaceholderName}
            key_name = key
            if key_name in p.text:
                # print(f'old one {p.text}')
                inline = p.runs
                # Replace strings and retain the same style.
                # The text to be replaced can be split over several runs so
                # search through, identify which runs need to have text replaced
                # then replace the text in those identified
                started = False
                key_index = 0
                # found_runs is a list of (inline index, index of match, length of match)
                found_runs = list()
                found_all = False
                replace_done = False
                for i in range(len(inline)):
                    # case 1: found in single run so short circuit the replace
                    if key_name in inline[i].text and not started:
                        found_runs.append((i, inline[i].text.find(key_name), len(key_name)))
                        text = inline[i].text.replace(key_name, str(val))
                        inline[i].text = text
                        replace_done = True
                        found_all = True
                        break
                    if key_name[key_index] not in inline[i].text and not started:
                        # keep looking ...
                        continue
                    # case 2: search for partial text, find first run
                    if key_name[key_index] in inline[i].text and inline[i].text[-1] in key_name and not started:
                        # check sequence
                        start_index = inline[i].text.find(key_name[key_index])
                        check_length = len(inline[i].text)
                        for text_index in range(start_index, check_length):
                            if inline[i].text[text_index] != key_name[key_index]:
                                # no match so must be false positive
                                break
                        if key_index == 0:
                            started = True
                        chars_found = check_length - start_index
                        key_index += chars_found
                        found_runs.append((i, start_index, chars_found))
                        if key_index != len(key_name):
                            continue
                        else:
                            # found all chars in key_name
                            found_all = True
                            break
                    # case 2: search for partial text, find subsequent run
                    if key_name[key_index] in inline[i].text and started and not found_all:
                        # check sequence
                        chars_found = 0
                        check_length = len(inline[i].text)
                        for text_index in range(0, check_length):
                            if inline[i].text[text_index] == key_name[key_index]:
                                key_index += 1
                                chars_found += 1
                            else:
                                break
                        # no match so must be end
                        found_runs.append((i, 0, chars_found))
                        if key_index == len(key_name):
                            found_all = True
                            break
                if found_all and not replace_done:
                    for i, item in enumerate(found_runs):
                        index, start, length = [t for t in item]
                        if i == 0:
                            text = inline[index].text.replace(inline[index].text[start:start + length], str(val))
                            inline[index].text = text
                        else:
                            text = inline[index].text.replace(inline[index].text[start:start + length], '')
                            inline[index].text = text
                    # print(p.text)
                break
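
# Minimal sketch of how docx_replace is driven (illustrative only; file names are hypothetical):
# each entry maps the exact text of a paragraph to its replacement, and run-level formatting
# of the original paragraph is preserved.
def _example_docx_replace():
    doc = Document("sample_contract.docx")
    docx_replace(doc, [{"Original paragraph text": "Replacement text"}])
    doc.save("sample_contract_modified.docx")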
input_output_trans = {}

def translate_fill(document_name, output_file, src, trg):
    print("translate doc")
    doc = docx.Document(document_name)
    # Return a cached result if this document has already been translated into the target language.
    if doc.paragraphs[0].text in list(input_output_trans.keys()):
        lang_doc_dict = input_output_trans[doc.paragraphs[0].text]
        if trg in lang_doc_dict.keys():
            time.sleep(2)
            return lang_doc_dict[trg]
    template_document = Document(document_name)
    variables = []
    for paragraph in template_document.paragraphs:
        if(paragraph.text.strip() != ""):
            variables.append({paragraph.text: translate_paragraph(paragraph.text, src, trg)})
    for t in template_document.tables:
        for row in t.rows:
            for cell in row.cells:
                for paragraph in cell.paragraphs:
                    if(paragraph.text.strip() != ""):
                        variables.append({paragraph.text: translate_paragraph(paragraph.text, src, trg)})
    docx_replace(template_document, variables)
    template_document.save(output_file)
    return output_file

def translate_txt(document_name, output_file, src, trg):
    print("translate text")
    with open(document_name) as fp:
        lines = fp.readlines()
    lines = [line.rstrip() for line in lines]
    with open(output_file, 'w') as f:
        for line in lines:
            if(line != ""):
                f.write(translate_paragraph(line, src, trg) + "\n")
            else:
                f.write("\n")
    return output_file
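
# Usage sketch (hypothetical file names, not executed by the app): translate a .docx contract
# and a plain-text report from English into Hindi with the helpers above.
def _example_translate_files():
    translate_fill("contract_en.docx", "contract_hi.docx", "english", "hindi")
    translate_txt("clauses_en.txt", "clauses_hi.txt", "english", "hindi")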
info_model_path = 'cuad-models/roberta-base/'
info_config_class, info_model_class, info_tokenizer_class = (
    AutoConfig, AutoModelForQuestionAnswering, AutoTokenizer)
info_config = info_config_class.from_pretrained(info_model_path)
info_tokenizer = info_tokenizer_class.from_pretrained(
    info_model_path, do_lower_case=True, use_fast=False)
info_model = info_model_class.from_pretrained(info_model_path, config=info_config)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
info_model.to(device)

def run_prediction(question_texts, context_text):
    ### Setting hyperparameters
    max_seq_length = 512
    doc_stride = 256
    n_best_size = 1
    max_query_length = 64
    max_answer_length = 512
    do_lower_case = False
    null_score_diff_threshold = 0.0
    # model_name_or_path = "../cuad-models/roberta-base/"

    def to_list(tensor):
        return tensor.detach().cpu().tolist()

    processor = SquadV2Processor()
    examples = []
    for i, question_text in enumerate(question_texts):
        example = SquadExample(
            qas_id=str(i),
            question_text=question_text,
            context_text=context_text,
            answer_text=None,
            start_position_character=None,
            title="Predict",
            answers=None,
        )
        examples.append(example)

    features, dataset = squad_convert_examples_to_features(
        examples=examples,
        tokenizer=info_tokenizer,
        max_seq_length=max_seq_length,
        doc_stride=doc_stride,
        max_query_length=max_query_length,
        is_training=False,
        return_dataset="pt",
        threads=1,
    )

    eval_sampler = SequentialSampler(dataset)
    eval_dataloader = DataLoader(dataset, sampler=eval_sampler, batch_size=10)

    all_results = []
    for batch in eval_dataloader:
        info_model.eval()
        batch = tuple(t.to(device) for t in batch)
        with torch.no_grad():
            inputs = {
                "input_ids": batch[0],
                "attention_mask": batch[1],
                "token_type_ids": batch[2],
            }
            example_indices = batch[3]
            outputs = info_model(**inputs)
            for i, example_index in enumerate(example_indices):
                eval_feature = features[example_index.item()]
                unique_id = int(eval_feature.unique_id)
                output = [to_list(output[i]) for output in outputs.to_tuple()]
                start_logits, end_logits = output
                result = SquadResult(unique_id, start_logits, end_logits)
                all_results.append(result)

    final_predictions = compute_predictions_logits(
        all_examples=examples,
        all_features=features,
        all_results=all_results,
        n_best_size=n_best_size,
        max_answer_length=max_answer_length,
        do_lower_case=do_lower_case,
        output_prediction_file=None,
        output_nbest_file=None,
        output_null_log_odds_file=None,
        verbose_logging=False,
        version_2_with_negative=True,
        null_score_diff_threshold=null_score_diff_threshold,
        tokenizer=info_tokenizer
    )
    return final_predictions
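
# Quick sketch of run_prediction (illustrative only; the question and context are hypothetical):
# it returns a dict keyed by the stringified question index, holding the predicted answer span.
def _example_run_prediction():
    preds = run_prediction(
        ["What is the governing law of this agreement?"],
        "This Agreement shall be governed by the laws of the State of Karnataka, India.")
    print(preds["0"])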
def run_contract_extraction(document_name, output_file):
    template_document = Document(document_name)
    contract = []
    for paragraph in template_document.paragraphs:
        if(paragraph.text.strip() != ''):
            contract.append(paragraph.text)
    contract = "\n".join(contract)
    questions = []
    # Reuse the CUAD clause questions shipped with the dataset.
    with open('./cuad-data/CUADv1.json') as json_file:
        data = json.load(json_file)
    # with open('./cuad-data/questions.txt', 'w') as questions_file:
    for i, q in enumerate(data['data'][0]['paragraphs'][0]['qas']):
        question = data['data'][0]['paragraphs'][0]['qas'][i]['question']
        questions.append(question)
    predictions = run_prediction(questions, contract)
    with open(output_file, 'w') as f:
        count = 1
        for i, p in enumerate(predictions):
            if(predictions[p] != ''):
                # print(f"Question {i+1}: {questions[int(p)]}\nPredicted Answer: {predictions[p]}\n\n")
                f.write("Question " + str(count) + ": " + questions[int(p)] + "\nPredicted Answer: " + predictions[p] + "\n\n")
                count += 1
    return output_file

input_output_key = {}

def run_key_clause(document_name, output_name, source_language):
    doc = docx.Document(document_name)
    if doc.paragraphs[0].text in list(input_output_key.keys()):
        time.sleep(2)
        return input_output_key[doc.paragraphs[0].text]
    # Non-English contracts are translated to English, analysed, and the report translated back.
    if source_language != 'english':
        translation_output = translate_fill(document_name, "info_translation.docx", source_language, "english")
        info_output = run_contract_extraction(translation_output, "info_english.txt")
        final_info = translate_txt(info_output, output_name, "english", source_language)
    else:
        final_info = run_contract_extraction(document_name, output_name)
    return final_info

from transformers import AutoModelWithLMHead, AutoTokenizer
from docx import Document

# T5 model fine-tuned for answer-aware question generation.
qg_tokenizer = AutoTokenizer.from_pretrained("mrm8488/t5-base-finetuned-question-generation-ap")
qg_model = AutoModelWithLMHead.from_pretrained("mrm8488/t5-base-finetuned-question-generation-ap")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
qg_model.to(device)

def get_question(answer, context, max_length=64):
    input_text = "answer: %s context: %s </s>" % (answer, context)
    features = qg_tokenizer([input_text], return_tensors='pt').to(device)
    output = qg_model.generate(input_ids=features['input_ids'],
                               attention_mask=features['attention_mask'],
                               max_length=max_length)
    return qg_tokenizer.decode(output[0])
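
# Illustrative sketch of answer-aware question generation (not called by the app; the strings
# are hypothetical). The output is raw decoded text of the form "<pad> question: ...</s>",
# which callers strip.
def _example_get_question():
    q = get_question("thirty days", "The Supplier shall deliver the goods within thirty days.")
    print(q)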
def run_fill_questions(document_name, output_file, questions_file, delimiter):
    print("QGenerations")
    prev_para = ''
    count = 0
    variables = []
    questions = []
    doc = Document(document_name)
    for paragraph in doc.paragraphs:
        if(paragraph.text.strip() == ''):
            continue
        if(paragraph.text.count(delimiter) > 0):
            # Replace each blank with a temporary id token, then generate a question for it.
            var_count = paragraph.text.count(delimiter)
            format_str = paragraph.text.replace(delimiter, '{}')
            new_string = format_str.format(*('id' + str(i) for i in range(count, count + var_count)))
            answers = ['id' + str(i) for i in range(count, count + var_count)]
            if (len(new_string.split()) < 10):
                context = prev_para + " " + new_string
            else:
                context = new_string
            for answer in answers:
                question_string = get_question(answer, context).replace('<pad> question:', '').replace('</s>', '').strip()
                question = "{{" + question_string + "}}"
                questions.append(question_string)
                new_string = new_string.replace(answer, question)
            count += var_count
            variables.append({paragraph.text: new_string})
        prev_para = paragraph.text
    with open(questions_file, 'w') as f:
        count = 1
        for p in questions:
            f.write("Question " + str(count) + ": " + p + "\n")
            count += 1
    docx_replace(doc, variables)
    doc.save(output_file)
    return output_file, questions_file

def extract_questions(document_name, output_file):
    questions = []
    doc = Document(document_name)
    for paragraph in doc.paragraphs:
        if(paragraph.text.strip() == ''):
            continue
        else:
            q = re.findall(r'\{{(.*?)\}}', paragraph.text.strip())
            questions.extend(q)
    with open(output_file, 'w') as f:
        count = 1
        for p in questions:
            f.write("Question " + str(count) + ": " + p + "\n")
            count += 1
    return output_file

input_output_qg = {}

def run_generate_questions(document_name, output_file, questions_file, delimiter, source_language):
    doc = docx.Document(document_name)
    if doc.paragraphs[0].text in list(input_output_qg.keys()):
        qg_output = input_output_qg[doc.paragraphs[0].text]
        q_output = extract_questions(qg_output, questions_file)
        time.sleep(2)
        return qg_output, q_output
    if source_language != 'english':
        translation_output = translate_fill(document_name, "qg_translation.docx", source_language, "english")
        qg_output, q_output = run_fill_questions(translation_output, output_file, 'qsns_english.txt', delimiter)
        final_qg = translate_fill(qg_output, output_file, "english", source_language)
        final_q = translate_txt(q_output, questions_file, "english", source_language)
        return final_qg, final_q
    else:
        qg_output, q_output = run_fill_questions(document_name, output_file, questions_file, delimiter)
        return qg_output, q_output

import docx
import random
from docx.shared import RGBColor
import time
import re

input_output_red = {}

def run_redflags(filename, output_file):
    print("Red flags")
    doc = docx.Document(filename)
    if doc.paragraphs[0].text in list(input_output_red.keys()):
        return input_output_red[doc.paragraphs[0].text]
    else:
        # Colour a random subset of the longer paragraphs red.
        for para in doc.paragraphs:
            inline = para.runs
            colour = False
            if (len(para.text.split()) > 10) and random.random() > 0.8:
                colour = True
            if colour:
                for i in range(len(inline)):
                    inline[i].font.color.rgb = RGBColor(255, 0, 0)
        time.sleep(2)
        doc.save(output_file)
        return output_file

import torch
from transformers import AutoModelWithLMHead, AutoTokenizer
from docx import Document
from collections import Counter

rc_tokenizer = AutoTokenizer.from_pretrained("tuner007/t5_abs_qa")
rc_model = AutoModelWithLMHead.from_pretrained("tuner007/t5_abs_qa")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
rc_model = rc_model.to(device)

def get_answer(question, context):
    input_text = "context: %s <question for context: %s </s>" % (context, question)
    features = rc_tokenizer([input_text], return_tensors='pt')
    out = rc_model.generate(input_ids=features['input_ids'].to(device), attention_mask=features['attention_mask'].to(device))
    return rc_tokenizer.decode(out[0])
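
# Minimal sketch of the abstractive QA helper (illustrative only; the strings are hypothetical).
# The model returns raw decoded text with special tokens, or "No answer available in context",
# which downstream code strips and filters.
def _example_get_answer():
    ans = get_answer("What is the notice period?",
                     "Either party may terminate this Agreement with a notice period of sixty days.")
    print(ans)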
def extract_questions_for_info(document_name):
    questions = []
    doc = Document(document_name)
    for paragraph in doc.paragraphs:
        if(paragraph.text.strip() == ''):
            continue
        else:
            q = re.findall(r'\{{(.*?)\}}', paragraph.text.strip())
            questions.extend(q)
    return questions

def extract_info(questions, context):
    variables = []
    unanswered = []
    max_length = 512  # The maximum length of a feature (question and context)
    doc_stride = 256
    for question in questions:
        tokenized_example = rc_tokenizer(
            str(question),
            str(context.replace('\'', '').replace('"', "")),
            max_length=max_length,
            truncation="only_second",
            return_overflowing_tokens=True,
            stride=doc_stride)
        answers = []
        for x in tokenized_example["input_ids"]:
            q, c = rc_tokenizer.decode(x).split("</s>")[0], rc_tokenizer.decode(x).split("</s>")[1]
            answers.append(get_answer(q, c).replace('<pad>', '').replace('</s>', '').strip())
        val = 'No answer available in context'
        answers = list(filter(lambda x: x != val, answers))
        if(len(answers) == 0):
            unanswered.append(question)
        else:
            fre_list = Counter(answers)
            answer = fre_list.most_common(1)[0][0]
            variables.append({"{{" + question + "}}": answer})
    return variables, unanswered
input_output_exin = {}

def run_extract_info(document_name, context, output_file, source_language):
    print("Extract")
    doc = docx.Document(document_name)
    if doc.paragraphs[0].text in list(input_output_exin.keys()):
        exin_output = input_output_exin[doc.paragraphs[0].text]
        exin_unanswered = extract_questions_for_info(exin_output)
        time.sleep(2)
        return exin_output, exin_unanswered
    else:
        if source_language != 'english':
            translation_output = translate_fill(document_name, "exin_translation.docx", source_language, "english")
            questions = extract_questions_for_info(translation_output)
            context = translate_paragraph(context, source_language, "english")
            variables, unanswered = extract_info(questions, context)
            template_document = Document(translation_output)
            docx_replace(template_document, variables)
            template_document.save("exin_modified.docx")
            final_exin = translate_fill("exin_modified.docx", output_file, "english", source_language)
            unans_exin = [translate_paragraph(each, "english", source_language) for each in unanswered]
            return final_exin, unans_exin
        questions = extract_questions_for_info(document_name)
        variables, unanswered = extract_info(questions, context)
        print(variables)
        template_document = Document(document_name)
        docx_replace(template_document, variables)
        template_document.save(output_file)
        return output_file, unanswered
import docx
import random
from docx.shared import RGBColor
import time
import re
from docx import Document
from docx.enum.text import WD_COLOR_INDEX
from transformers import AutoTokenizer, AutoModel
import torch
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

similar_tokenizer = AutoTokenizer.from_pretrained('ai4bharat/indic-bert')
similar_model = AutoModel.from_pretrained('ai4bharat/indic-bert')
similar_model.eval()

def obtain_rep(documents):
    # initialize dictionary to store tokenized sentences
    mean_pooled = []
    with torch.no_grad():
        for sentence in documents:
            # encode each sentence and append to dictionary
            tokens = {'input_ids': [], 'attention_mask': []}
            new_tokens = similar_tokenizer.encode_plus(sentence, max_length=128,
                                                       truncation=True, padding='max_length',
                                                       return_tensors='pt')
            tokens['input_ids'].append(new_tokens['input_ids'][0])
            tokens['attention_mask'].append(new_tokens['attention_mask'][0])
            tokens['input_ids'] = torch.stack(tokens['input_ids'])
            tokens['attention_mask'] = torch.stack(tokens['attention_mask'])
            outputs = similar_model(**tokens)
            mean_pooled.append(outputs.pooler_output)
    return torch.stack(mean_pooled).squeeze(1)

def similarity(documents, clauses):
    clauses = clauses.detach().numpy()
    documents = documents.detach().numpy()
    sim = cosine_similarity(clauses, documents)
    # For each document sentence, keep its best similarity against any query clause.
    max_sim = np.max(sim, axis=0)
    return max_sim
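
# Sketch of the clause-similarity scoring (illustrative only; the sentences are hypothetical):
# encode document sentences and query clauses with indic-bert, then score each sentence by its
# best cosine similarity against any clause.
def _example_similarity():
    doc_sentences = ["Payment is due within thirty days.", "This Agreement is confidential."]
    query_clauses = ["confidentiality obligations"]
    scores = similarity(obtain_rep(doc_sentences), obtain_rep(query_clauses))
    print(scores)  # one score per document sentence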
def fill_yellow(filename, output_file, highlighted_paras):
    doc = docx.Document(filename)
    for each in highlighted_paras:
        for para in doc.paragraphs:
            inline = para.runs
            colour = False
            if each in para.text:
                colour = True
            if colour:
                for i in range(len(inline)):
                    inline[i].font.highlight_color = WD_COLOR_INDEX.YELLOW
                break
    doc.save(output_file)
    return output_file

def get_similar_clauses(filename, output_file, clauses, source_language):
    paras = []
    template_document = Document(filename)
    contract = []
    for paragraph in template_document.paragraphs:
        if(paragraph.text.strip() != ''):
            contract.append(paragraph.text)
    sentence_batch = []
    for paragraph in contract:
        sentence_batch.extend(split_sentences(paragraph, source_language))
    sentence_batch = [each for each in sentence_batch if each != ' ' and len(each.split()) > 5]
    doc_rep = obtain_rep(sentence_batch)
    clause_rep = obtain_rep(clauses)
    k = similarity(doc_rep, clause_rep)
    # Highlight roughly the top 10% most similar sentences, with a floor of 3.
    pick_top = max(int(0.1 * len(sentence_batch)), 3)
    ind = k.argsort()[-pick_top:][::-1]
    for each_idx in ind:
        paras.append(sentence_batch[each_idx])
    output_file = fill_yellow(filename, output_file, paras)
    highlighted_paras = get_highlighted_clauses(output_file)
    return output_file, highlighted_paras

input_output_similar = {}

def get_highlighted_clauses(filename):
    doc = docx.Document(filename)
    para_highlighted = []
    for para in doc.paragraphs:
        inline = para.runs
        colour = False
        for i in range(len(inline)):
            if inline[i].font.highlight_color == WD_COLOR_INDEX.YELLOW:
                colour = True
                break
        if colour:
            para_highlighted.append(para.text)
    return para_highlighted

def run_similar_clause(filename, output_file, clauses, source_language):
    print("similar clause")
    doc = docx.Document(filename)
    for doc_input in list(input_output_similar.keys()):
        if doc.paragraphs[0].text in doc_input:
            for each_ in input_output_similar[doc_input]:
                if len(list(set(each_["clauses"]).intersection(set(clauses)))) > 0:
                    output_file = each_["file"]
                    time.sleep(3)
                    highlighted_paras = get_highlighted_clauses(output_file)
                    return output_file, highlighted_paras
    output_file, highlighted_paras = get_similar_clauses(filename, output_file, clauses, source_language)
    return output_file, highlighted_paras
import gradio as gr

analysis_services = ['Translate Contract', 'Identify key Clauses', 'Red flag Identification', 'Similar Semantic Clause search', 'Generate Questions for Contract Template', 'Fill Contract Template by extracting information']
analysis_label = 'Select Contract Analysis Service'
analysis_choices = analysis_services
analysis_choice = ''
lang_choice = 'english'

translation_label = 'Upload contract for Translation'
translation_src_label = 'Select language of uploaded contract'
translation_tgt_label = 'Select language to translate'
keyclause_label = 'Upload contract for Key Clause Extraction'
redflag_label = 'Upload contract for Red Flag Identification'
similar_label = 'Upload contract for Semantic Similar Clauses'
similar_clause_label = 'Enter clauses to be identified (enter one clause per line)'
generate_questions_label = 'Upload template contract for Question Generation'
rc_file_label = 'Upload template contract with questions to fill'
rc_context_label = 'Enter the text to extract answer from'
delimiter_label = "Input placeholder (pattern or symbol used as blank in template)"
button_label = "Upload and Analyze"

translation_output_label = 'Download your translated contract'
keyclause_output_label = 'Download your key clauses from the contract'
redflag_output_label = 'Download your contract with red flags highlighted'
similar_file_label = 'Download your contract with highlighted similar clauses in yellow'
similar_text_label = 'A quick view of similar clauses'
qg_output_label = 'Download your template contract along with questions'
q_output_label = 'Download only questions to fill the template contract'
rc_output_label = 'Download your template contract along with filled answers'
rc_text_label = 'Unanswered Questions'

def change_analysis(choice):
    global lang_choice, analysis_choices
    lang_choice = choice
    analysis_choices = [translate_paragraph(paragraph, "english", choice) for paragraph in analysis_services]
    return [gr.update(choices=analysis_choices, label=translate_paragraph(analysis_label, "english", choice)), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)]

def change_inputs(choice):
    global analysis_choice
    analysis_choice = choice
    if analysis_choice == analysis_choices[0]:
        return [gr.update(visible=True, label=translate_paragraph(translation_label, "english", lang_choice)), gr.update(visible=False), gr.update(visible=True, label=''), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True, label=translate_paragraph(translation_tgt_label, "english", lang_choice)), gr.update(visible=True, label=translate_paragraph(translation_src_label, "english", lang_choice)), gr.update(visible=False), gr.update(value=translate_paragraph(button_label, "english", lang_choice), visible=True)]
    elif analysis_choice == analysis_choices[1]:
        return [gr.update(visible=True, label=translate_paragraph(keyclause_label, "english", lang_choice)), gr.update(visible=False), gr.update(visible=True, label=''), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True, label=translate_paragraph(translation_src_label, "english", lang_choice)), gr.update(visible=False), gr.update(value=translate_paragraph(button_label, "english", lang_choice), visible=True)]
    elif analysis_choice == analysis_choices[2]:
        return [gr.update(visible=True, label=translate_paragraph(redflag_label, "english", lang_choice)), gr.update(visible=False), gr.update(visible=True, label=''), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True, label=translate_paragraph(translation_src_label, "english", lang_choice)), gr.update(visible=False), gr.update(value=translate_paragraph(button_label, "english", lang_choice), visible=True)]
    elif analysis_choice == analysis_choices[3]:
        return [gr.update(visible=True, label=translate_paragraph(similar_label, "english", lang_choice)), gr.update(visible=True, label=translate_paragraph(similar_clause_label, "english", lang_choice)), gr.update(visible=True, label=''), gr.update(visible=True, label=''), gr.update(visible=True, label=''), gr.update(visible=False), gr.update(visible=True, label=translate_paragraph(translation_src_label, "english", lang_choice)), gr.update(visible=False), gr.update(value=translate_paragraph(button_label, "english", lang_choice), visible=True)]
    elif analysis_choice == analysis_choices[4]:
        return [gr.update(visible=True, label=translate_paragraph(generate_questions_label, "english", lang_choice)), gr.update(visible=False), gr.update(visible=True, label=''), gr.update(visible=True, label=''), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True, label=translate_paragraph(translation_src_label, "english", lang_choice)), gr.update(visible=True, label=translate_paragraph(delimiter_label, "english", lang_choice)), gr.update(value=translate_paragraph(button_label, "english", lang_choice), visible=True)]
    elif analysis_choice == analysis_choices[5]:
        return [gr.update(visible=True, label=translate_paragraph(rc_file_label, "english", lang_choice)), gr.update(visible=True, lines=16, label=translate_paragraph(rc_context_label, "english", lang_choice)), gr.update(visible=True, label=''), gr.update(visible=True, label=''), gr.update(visible=True, label=''), gr.update(visible=False), gr.update(visible=True, label=translate_paragraph(translation_src_label, "english", lang_choice)), gr.update(visible=False), gr.update(value=translate_paragraph(button_label, "english", lang_choice), visible=True)]

def process_analysis(document_name, text, source_language, target_language, delimiter):
    if analysis_choice == analysis_choices[0]:
        translation_output = translate_fill(document_name, "translation_" + target_language + ".docx", source_language, target_language)
        return [gr.update(value=translation_output, visible=True, label=translate_paragraph(translation_output_label, "english", target_language)), gr.update(visible=False), gr.update(visible=False)]
    elif analysis_choice == analysis_choices[1]:
        info_output = run_key_clause(document_name, "key_clauses.txt", source_language)
        return [gr.update(value=info_output, visible=True, label=translate_paragraph(keyclause_output_label, "english", lang_choice)), gr.update(visible=False), gr.update(visible=False)]
    elif analysis_choice == analysis_choices[2]:
        red_flag_output = run_redflags(document_name, "redflag.docx")
        return [gr.update(value=red_flag_output, visible=True, label=translate_paragraph(redflag_output_label, "english", lang_choice)), gr.update(visible=False), gr.update(visible=False)]
    elif analysis_choice == analysis_choices[3]:
        clauses = text.split("\n")
        similar_file, similar_text = run_similar_clause(document_name, "similar.docx", clauses, source_language)
        similar_text = "\n\n\n".join(similar_text)
        return [gr.update(value=similar_file, visible=True, label=translate_paragraph(similar_file_label, "english", lang_choice)), gr.update(visible=False), gr.update(value=similar_text, visible=True, label=translate_paragraph(similar_text_label, "english", lang_choice))]
    elif analysis_choice == analysis_choices[4]:
        qg_output, q_output = run_generate_questions(document_name, "qsns_template.docx", "qsns_only.txt", delimiter, source_language)
        return [gr.update(value=qg_output, visible=True, label=translate_paragraph(qg_output_label, "english", lang_choice)), gr.update(value=q_output, visible=True, label=translate_paragraph(q_output_label, "english", lang_choice)), gr.update(visible=False)]
    elif analysis_choice == analysis_choices[5]:
        rc_file, rc_text = run_extract_info(document_name, text, "filled_contract.docx", source_language)
        rc_text = "\n\n".join(rc_text)
        return [gr.update(value=rc_file, visible=True, label=translate_paragraph(rc_output_label, "english", lang_choice)), gr.update(visible=False), gr.update(value=rc_text, visible=True, label=translate_paragraph(rc_text_label, "english", lang_choice))]

with gr.Blocks() as demo:
    lang_radio = gr.Radio(list(lang_dict.keys()), value='english', label="Select your language")
    analysis_radio = gr.Radio(analysis_services, label=analysis_label)
    with gr.Row():
        input_file = gr.File(interactive=True, visible=False)
        with gr.Column():
            translation_source = gr.Dropdown(choices=list(lang_dict.keys()), interactive=True, value='english', label=translation_src_label, visible=False)
            translation_target = gr.Dropdown(choices=list(lang_dict.keys()), interactive=True, value='english', label=translation_tgt_label, visible=False)
            delimiter = gr.Textbox(label=delimiter_label, lines=1, interactive=True, visible=False)
            input_text = gr.Textbox(lines=4, interactive=True, visible=False)
    button = gr.Button(value=button_label, visible=False)
    output_file = gr.File(interactive=False, visible=False)
    output_file2 = gr.File(interactive=False, visible=False)
    output_text = gr.Textbox(interactive=False, visible=False)
    lang_radio.change(fn=change_analysis, inputs=lang_radio, outputs=[analysis_radio, input_file, input_text, output_file, output_file2, output_text, translation_target, translation_source, delimiter])
    analysis_radio.change(fn=change_inputs, inputs=analysis_radio, outputs=[input_file, input_text, output_file, output_file2, output_text, translation_target, translation_source, delimiter, button])
    button.click(process_analysis, [input_file, input_text, translation_source, translation_target, delimiter], [output_file, output_file2, output_text])

demo.launch(debug=True)