From b14dc7f2783377b817b105c5da58a4a2e7f0c90c Mon Sep 17 00:00:00 2001
From: "l.gabrysiak"
Date: Tue, 25 Feb 2025 20:38:44 +0100
Subject: [PATCH] mod

---
 hft.py | 188 +++++++++++++++++++++++++++------------------------
 1 file changed, 89 insertions(+), 99 deletions(-)

diff --git a/hft.py b/hft.py
index 97698af..a3069f3 100644
--- a/hft.py
+++ b/hft.py
@@ -12,9 +12,9 @@
 import json
 from collections import defaultdict
 from huggingface_hub import login
+# Configuration
 os.environ['TORCH_USE_CUDA_DSA'] = '1'
 os.environ["TOKENIZERS_PARALLELISM"] = "false"
-
 login(token="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")
 
 class SourceMapper:
@@ -90,7 +90,7 @@ def prepare_dataset(directory, catalog_path, source_mapper):
     for chunk in chunks:
         data.append({
             "text": chunk,
-            "source_idx": -1 # Brak źródła
+            "source_idx": -1
         })
     return data
 
@@ -111,127 +111,117 @@ def custom_collate_fn(batch):
     attention_mask = torch.stack([torch.tensor(b["attention_mask"]) for b in batch])
     labels = torch.stack([torch.tensor(b["labels"]) for b in batch])
     source_idx = torch.tensor([b.get("source_idx", -1) for b in batch], dtype=torch.long)
-    return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels, "source_idx": source_idx}
+    return {
+        "input_ids": input_ids,
+        "attention_mask": attention_mask,
+        "labels": labels,
+        "source_idx": source_idx
+    }
 
 class CustomModel(nn.Module):
     def __init__(self, model_name, config):
         super().__init__()
         self.base_model = AutoModelForCausalLM.from_pretrained(model_name, config=config)
-        self.source_embedding = nn.Embedding(
-            num_embeddings=1000,
-            embedding_dim=config.hidden_size,
-            padding_idx=-1
-        )
+        self.source_embedding = nn.Embedding(1000, config.hidden_size, padding_idx=-1)
 
     def forward(self, input_ids=None, attention_mask=None, labels=None, source_idx=None, **kwargs):
         if source_idx is not None:
-            source_idx = torch.clamp(source_idx, 0, self.source_embedding.num_embeddings - 1)
+            source_idx = torch.clamp(source_idx, 0, self.source_embedding.num_embeddings-1)
             source_embeds = self.source_embedding(source_idx).unsqueeze(1).expand(-1, input_ids.size(1), -1)
             inputs_embeds = self.base_model.get_input_embeddings()(input_ids) + source_embeds
-            outputs = self.base_model(inputs_embeds=inputs_embeds, attention_mask=attention_mask, labels=labels, **kwargs)
-        else:
-            outputs = self.base_model(input_ids=input_ids, attention_mask=attention_mask, labels=labels, **kwargs)
-        return outputs
+            return self.base_model(inputs_embeds=inputs_embeds, attention_mask=attention_mask, labels=labels, **kwargs)
+        return self.base_model(input_ids=input_ids, attention_mask=attention_mask, labels=labels, **kwargs)
 
     def generate(self, *args, **kwargs):
         return self.base_model.generate(*args, **kwargs)
 
-class CustomTrainer(Trainer):
-    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
-        labels = inputs.pop("labels")
-        source_idx = inputs.pop("source_idx", None)
-        outputs = model(input_ids=inputs["input_ids"],
-                        attention_mask=inputs["attention_mask"],
-                        labels=labels,
-                        source_idx=source_idx)
-        return (outputs.loss, outputs) if return_outputs else outputs.loss
+def main():
+    # Initialization
+    source_mapper = SourceMapper()
+    model_name = "crumb/nano-mistral"
+    tokenizer = AutoTokenizer.from_pretrained(model_name)
+    tokenizer.pad_token = tokenizer.eos_token
 
-# Inicjalizacja komponentów
-source_mapper = SourceMapper()
-model_name = "crumb/nano-mistral"
-tokenizer = AutoTokenizer.from_pretrained(model_name)
-tokenizer.pad_token = tokenizer.eos_token
+    # Data preparation
+    catalog_path = "file_catalog.json"
= "file_catalog.json" + data = prepare_dataset("files", catalog_path, source_mapper) + dataset = Dataset.from_list(data) + tokenized_dataset = dataset.map(tokenize_function, batched=True, batch_size=8) -# Przygotowanie danych -catalog_path = "file_catalog.json" -data = prepare_dataset("files", catalog_path, source_mapper) -dataset = Dataset.from_list(data) -tokenized_dataset = dataset.map(tokenize_function, batched=True, batch_size=8) + # Model + config = AutoModelForCausalLM.from_pretrained(model_name).config + model = CustomModel(model_name, config) + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + model.to(device) -# Inicjalizacja modelu -config = AutoModelForCausalLM.from_pretrained(model_name).config -model = CustomModel(model_name, config) -device = torch.device("cuda" if torch.cuda.is_available() else "cpu") -model = model.to(device) + # Trening + training_args = TrainingArguments( + output_dir="./results", + num_train_epochs=3, + per_device_train_batch_size=2, + gradient_accumulation_steps=4, + learning_rate=2e-5, + fp16=torch.cuda.is_available(), + logging_steps=1, + save_strategy="steps", + save_steps=1000, + report_to="none" + ) -# Konfiguracja treningu -training_args = TrainingArguments( - output_dir="./results", - num_train_epochs=3, - per_device_train_batch_size=2, - gradient_accumulation_steps=4, - learning_rate=2e-5, - fp16=torch.cuda.is_available(), - logging_steps=1, - save_strategy="steps", - save_steps=1000, - logging_strategy="no", - report_to="none" -) + trainer = Trainer( + model=model, + args=training_args, + train_dataset=tokenized_dataset, + data_collator=custom_collate_fn, + ) + print("Rozpoczęcie treningu...") + trainer.train() -# Trening -trainer = CustomTrainer( - model=model, - args=training_args, - train_dataset=tokenized_dataset, - data_collator=custom_collate_fn, -) -trainer.train() - -# Funkcja testująca -def generate_answer_with_source(question, model, tokenizer, source_mapper, max_length=200): - device = next(model.parameters()).device - inputs = tokenizer(question, return_tensors="pt", truncation=True, max_length=512).to(device) - - with torch.no_grad(): + # Testowanie + def generate_answer(question): + inputs = tokenizer(question, return_tensors="pt").to(device) + outputs = model.generate( **inputs, - max_length=max_length, - num_return_sequences=1, + max_new_tokens=200, temperature=0.7, top_p=0.9, + do_sample=True, + repetition_penalty=1.2, + no_repeat_ngram_size=2, pad_token_id=tokenizer.eos_token_id ) - - answer = tokenizer.decode(outputs[0], skip_special_tokens=True) - - # Wyszukiwanie źródeł - sources = set() - for idx in source_mapper.idx_to_source: - if source_mapper.idx_to_source[idx] in answer: - sources.add(source_mapper.idx_to_source[idx]) - - return { - "question": question, - "answer": answer, - "sources": list(sources) if sources else ["Opracowanie własne"] - } + + answer = tokenizer.decode(outputs[0], skip_special_tokens=True) + answer = answer.replace(question, "").strip() + + sources = set() + for match in re.finditer(r'Art\.\s+\d+', answer): + article_ref = match.group(0).strip() + for idx, source in source_mapper.idx_to_source.items(): + if article_ref in source: + sources.add(source) + + return { + "question": question, + "answer": answer, + "sources": list(sources) if sources else ["Opracowanie własne"] + } -# Testowanie -test_questions = [ - "Jak brzmi art. 154 kodeksu pracy?" -] + # Przykładowe testy + test_questions = [ + "Jakie są zasady udzielania urlopu wypoczynkowego?", + "Co mówi art. 
+        "Jakie są obowiązki pracodawcy w zakresie BHP?"
+    ]
+
+    print("\n" + "="*50 + "\nWYNIKI TESTOW\n" + "="*50)
+    for question in test_questions:
+        result = generate_answer(question)
+        print(f"\nPYTANIE: {result['question']}")
+        print(f"ODPOWIEDŹ: {result['answer'][:500]}")
+        print(f"ŹRÓDŁA: {', '.join(result['sources'])}")
+        print("-"*80)
 
-print("\n=== TEST MODELU ===")
-for question in test_questions:
-    result = generate_answer_with_source(question, model, tokenizer, source_mapper)
-    print(f"\nPytanie: {result['question']}")
-    print(f"Odpowiedź: {result['answer']}")
-    print(f"Źródła: {', '.join(result['sources'])}")
-    print("="*80)
-
-# Zapis modelu
-save_directory = "./trained_model"
-os.makedirs(save_directory, exist_ok=True)
-torch.save(model.state_dict(), os.path.join(save_directory, "model.bin"))
-tokenizer.save_pretrained(save_directory)
\ No newline at end of file
+if __name__ == "__main__":
+    main()
\ No newline at end of file
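Note on persisting the result: with the explicit save step from the old script tail removed, main() relies only on the periodic save_steps=1000 checkpoints, which a short run may never reach. A minimal sketch of how the final weights could still be written out after trainer.train(), reusing the model and tokenizer objects from main() and the module-level os/torch imports already present in hft.py (the "./trained_model" path is illustrative and mirrors the removed code):

    # Illustrative sketch: persist the fine-tuned weights and the tokenizer
    # once training finishes; path and file name follow the old save step.
    save_directory = "./trained_model"
    os.makedirs(save_directory, exist_ok=True)
    torch.save(model.state_dict(), os.path.join(save_directory, "model.bin"))
    tokenizer.save_pretrained(save_directory)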