ably.do/hft.py

214 lines
7.3 KiB
Python

import os
import torch
import torch.nn as nn
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer
from datasets import Dataset
from PIL import Image
import re
import pytesseract
import docx2txt
import PyPDF2
import json
from collections import defaultdict
from huggingface_hub import login
# Authenticate against the Hugging Face Hub with a token taken from the
# HF_TOKEN environment variable. Credentials must never live in source
# control: the token that was previously hard-coded on this line has to be
# treated as leaked and revoked in the Hub account settings.
_hf_token = os.environ.get("HF_TOKEN")
if _hf_token:
    login(token=_hf_token)
# Turn off parallelism inside the `tokenizers` library for this process.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# Class for managing sources
class SourceMapper:
    """Two-way mapping between source labels and dense integer ids.

    Ids are handed out in registration order, starting at 0. Falsy sources
    (``None`` or ``""``) are never registered and always map to ``-1``.
    """

    def __init__(self):
        # Plain dicts that are always updated together. The previous
        # self-referencing defaultdict let a bare lookup create an id that
        # had no reverse entry in ``idx_to_source``.
        self.source_to_idx = {}
        self.idx_to_source = {}

    def add_source(self, source):
        """Register *source* if it is truthy and not already known."""
        if source and source not in self.source_to_idx:
            idx = len(self.source_to_idx)
            self.source_to_idx[source] = idx
            self.idx_to_source[idx] = source

    def get_idx(self, source):
        """Return the id of *source*, or ``-1`` when *source* is falsy.

        An unknown source is registered on the fly (the historical
        behaviour), but now in both directions, so ``get_source`` can always
        resolve the id that is returned here.
        """
        if not source:
            return -1
        self.add_source(source)
        return self.source_to_idx[source]

    def get_source(self, idx):
        """Return the label registered under *idx*, or ``"Unknown"``."""
        return self.idx_to_source.get(idx, "Unknown")
def load_file_catalog(catalog_path):
    """Parse the UTF-8 encoded JSON file at *catalog_path* and return its content."""
    with open(catalog_path, mode='r', encoding='utf-8') as catalog_file:
        parsed_catalog = json.load(catalog_file)
    return parsed_catalog
def identify_legal_document(filename, file_catalog):
    """Look up *filename* in *file_catalog*.

    Returns the catalogued value, or the marker string "Opracowanie własne"
    when the file name is not listed.
    """
    fallback_label = "Opracowanie własne"
    document_type = file_catalog.get(filename, fallback_label)
    return document_type
def extract_text_from_file(file_path):
    """Return the text content of *file_path*, dispatching on its extension.

    The extension is compared case-insensitively:

    * ``.txt`` / ``.md`` -- read as UTF-8 text.
    * ``.pdf`` -- text of every page (PyPDF2), concatenated.
    * ``.doc`` / ``.docx`` -- handed to ``docx2txt.process``.
    * ``.jpg`` / ``.jpeg`` / ``.png`` / ``.bmp`` / ``.tiff`` -- OCR through
      pytesseract.

    Any other extension yields an empty string.
    """
    _, ext = os.path.splitext(file_path)
    ext = ext.lower()

    if ext in ('.txt', '.md'):
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()

    if ext == '.pdf':
        with open(file_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            # Join once instead of growing a string with +=; `or ""` keeps
            # the join safe if a page yields no text object at all.
            return "".join(page.extract_text() or "" for page in reader.pages)

    if ext in ('.doc', '.docx'):
        # NOTE(review): docx2txt is a .docx reader; legacy binary .doc files
        # will presumably fail here -- confirm and convert upstream if needed.
        return docx2txt.process(file_path)

    if ext in ('.jpg', '.jpeg', '.png', '.bmp', '.tiff'):
        # Context manager releases the underlying file handle after OCR.
        with Image.open(file_path) as image:
            return pytesseract.image_to_string(image)

    return ""
def prepare_dataset(directory, catalog_path, source_mapper):
    """Walk *directory* and turn every readable file into training records.

    Files listed in the catalog loaded from *catalog_path* are split on
    "Art. <number>" headers; each article becomes one record whose source
    ("<document type>, <article header>") is registered in *source_mapper*.
    Files not in the catalog are cut into consecutive 512-character windows
    with ``source_idx`` set to -1. Files that yield no text are skipped.

    Returns a list of ``{"text": ..., "source_idx": ...}`` dicts.
    """
    catalog = load_file_catalog(catalog_path)
    records = []
    for folder, _subdirs, filenames in os.walk(directory):
        for filename in filenames:
            content = extract_text_from_file(os.path.join(folder, filename))
            if not content:
                continue

            doc_type = identify_legal_document(filename, catalog)
            if doc_type == "Opracowanie własne":
                # Uncatalogued material: fixed-size windows, no source
                records.extend(
                    {"text": content[start:start + 512], "source_idx": -1}
                    for start in range(0, len(content), 512)
                )
                continue

            # The capturing group keeps the headers in the result:
            # [preamble, header_1, body_1, header_2, body_2, ...]
            pieces = re.split(r'(Art\.\s+\d+[\.\s])', content)
            for header, body in zip(pieces[1::2], pieces[2::2]):
                article_number = header.strip()
                source = f"{doc_type}, {article_number}"
                source_mapper.add_source(source)
                records.append({
                    "text": f"{article_number} {body.strip()}",
                    "source_idx": source_mapper.get_idx(source),
                })
    return records
def tokenize_function(examples):
    """Tokenize a batch of examples for causal-LM training.

    Uses the module-level ``tokenizer``. Every text is truncated/padded to
    exactly 512 tokens. ``labels`` mirror ``input_ids`` except at padding
    positions, which are set to -100 so the loss ignores them; ``source_idx``
    is carried over unchanged from *examples*.

    Args:
        examples: batch mapping with a "text" list and a "source_idx" list.

    Returns:
        The tokenizer output extended with "labels" and "source_idx".
    """
    tokenized = tokenizer(
        examples["text"],
        truncation=True,
        padding="max_length",
        max_length=512,
        return_tensors="pt"
    )
    labels = tokenized["input_ids"].clone()
    # Padding is identified through the attention mask rather than the token
    # id, because the pad token may be shared with EOS. Without this masking
    # the loss would mostly be spent on predicting padding.
    labels[tokenized["attention_mask"] == 0] = -100
    tokenized["labels"] = labels
    tokenized["source_idx"] = examples["source_idx"]
    return tokenized
def custom_collate_fn(batch):
    """Collate a list of example dicts into a dict of batched tensors.

    Args:
        batch: list of dicts with equally long "input_ids", "attention_mask"
            and "labels" sequences (lists or tensors) and an optional
            "source_idx" integer.

    Returns:
        Dict with stacked "input_ids", "attention_mask" and "labels" tensors
        plus a 1-D long tensor "source_idx"; a missing or ``None`` source
        index becomes -1.
    """
    def _stack(key):
        # as_tensor accepts both plain lists and ready-made tensors without
        # the copy-construct warning torch.tensor emits for the latter.
        return torch.stack([torch.as_tensor(example[key]) for example in batch])

    source_values = []
    for example in batch:
        value = example.get("source_idx", -1)
        # Use the default source_idx when the field is absent or empty
        source_values.append(-1 if value is None else value)

    return {
        "input_ids": _stack("input_ids"),
        "attention_mask": _stack("attention_mask"),
        "labels": _stack("labels"),
        "source_idx": torch.tensor(source_values, dtype=torch.long),
    }
class CustomModel(AutoModelForCausalLM):
    """Causal language model extended with a per-source embedding table.

    The table is allocated in ``__init__`` but not yet used: ``forward``
    accepts ``source_idx`` and currently ignores it.

    NOTE(review): the transformers documentation describes the ``Auto*``
    classes as factories that are created through ``from_pretrained`` /
    ``from_config``. Subclassing one presumably means that
    ``CustomModel.from_pretrained`` returns the concrete architecture class
    instead of a ``CustomModel``, so ``source_embedding`` and this ``forward``
    may never come into play -- confirm, and consider subclassing the concrete
    model class or wrapping the loaded model in an ``nn.Module``.
    """

    def __init__(self, config):
        super().__init__(config)
        self.source_embedding = nn.Embedding(
            num_embeddings=1000, # Maximum number of unique sources
            embedding_dim=config.hidden_size,
            # NOTE(review): per the torch.nn.Embedding docs a negative
            # padding_idx is counted from the end of the table, so -1 would
            # designate row 999 rather than the "-1 = no source" marker used
            # elsewhere in this file -- verify before indexing with -1.
            padding_idx=-1
        )

    def forward(self, input_ids=None, attention_mask=None, labels=None, source_idx=None, **kwargs):
        """Delegate to the parent ``forward`` and return its outputs unchanged.

        ``source_idx`` is accepted so the training batch can carry it, but no
        source-specific computation is implemented yet.
        """
        outputs = super().forward(
            input_ids=input_ids,
            attention_mask=attention_mask,
            labels=labels,
            **kwargs
        )
        if source_idx is not None:
            # Add the source_idx handling logic here
            pass
        return outputs
class CustomTrainer(Trainer):
    """Trainer whose loss computation forwards ``source_idx`` to the model."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Run *model* on *inputs* and return the loss it reports.

        Args:
            model: the model being trained; called with the batch as kwargs.
            inputs: batch mapping; must contain "labels" and may contain
                "source_idx".
            return_outputs: when true, return ``(loss, outputs)``.
            num_items_in_batch: accepted because newer ``Trainer`` releases
                pass this argument to ``compute_loss``; it is not used, since
                the loss comes straight from the model outputs.

        Returns:
            ``outputs.loss``, or ``(outputs.loss, outputs)`` when
            *return_outputs* is true.

        Raises:
            KeyError: if *inputs* has no "labels" entry.
        """
        # Work on a shallow copy so the caller's batch is left untouched.
        model_inputs = dict(inputs)
        labels = model_inputs.pop("labels")
        # Only pass source_idx along when the batch actually carries one, so
        # models without that parameter keep working on plain batches.
        if model_inputs.get("source_idx") is None:
            model_inputs.pop("source_idx", None)
        outputs = model(**model_inputs, labels=labels)
        return (outputs.loss, outputs) if return_outputs else outputs.loss
# Component initialisation
source_mapper = SourceMapper()
model_name = "crumb/nano-mistral" #"google/gemma-2-2b"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Pad with the EOS token
tokenizer.pad_token = tokenizer.eos_token
# Data preparation
catalog_path = "file_catalog.json"
data = prepare_dataset("files", catalog_path, source_mapper)
dataset = Dataset.from_list(data)
tokenized_dataset = dataset.map(tokenize_function, batched=True, batch_size=8)
# Model initialisation
# from_pretrained resolves the configuration on its own, so the checkpoint is
# loaded once instead of a second time just to read `.config`.
model = CustomModel.from_pretrained(model_name)
config = model.config
model.to("cpu")
# Training configuration
training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=3,
    per_device_train_batch_size=2,
    gradient_accumulation_steps=4,
    learning_rate=2e-5,
    # fp16 mixed precision needs a CUDA device; enable it only when one is
    # present so the script also runs on CPU-only machines.
    fp16=torch.cuda.is_available(),
    logging_steps=1, # More frequent logging
    logging_dir="./logs", # Directory for logs
    save_strategy="steps",
    save_steps=1000,
    #report_to="none"
)
# Training
trainer = CustomTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset,
    data_collator=custom_collate_fn, # Use the custom collate function
)
trainer.train()
# Answer-generating function
def generate_answer(question, model, tokenizer, source_mapper, max_length=200):
    """Generate an answer to *question* and append a source line.

    Args:
        question: prompt text; truncated to 512 tokens.
        model: model exposing ``generate``.
        tokenizer: tokenizer matching *model*.
        source_mapper: object whose ``get_source(idx)`` resolves a source label.
        max_length: ``max_length`` value passed to ``model.generate``.

    Returns:
        The decoded first generated sequence, a blank line, and
        "Źródło: <source>".
    """
    inputs = tokenizer(question, return_tensors="pt", truncation=True, max_length=512)
    # After training the model may live on an accelerator; generation fails
    # when the prompt tensors stay on a different device.
    device = getattr(model, "device", None)
    if device is not None:
        inputs = inputs.to(device)
    outputs = model.generate(
        **inputs,
        max_length=max_length,
        num_return_sequences=1,
        return_dict_in_generate=True,
        output_scores=True,
    )
    answer = tokenizer.decode(outputs.sequences[0], skip_special_tokens=True)
    # Temporary solution: the source is not predicted yet, so the last row of
    # the source embedding table is used as a placeholder index. Models
    # without such a table fall back to -1 instead of raising AttributeError.
    source_embedding = getattr(model, "source_embedding", None)
    if source_embedding is not None:
        source_idx = source_embedding.weight.shape[0] - 1
    else:
        source_idx = -1
    source = source_mapper.get_source(source_idx)
    return f"{answer}\n\nŹródło: {source if source else 'Opracowanie własne'}"
# Example usage: ask one question and print the generated answer
question = "Ile dni urlopu przysługuje pracownikowi?"
answer = generate_answer(
    question,
    model=model,
    tokenizer=tokenizer,
    source_mapper=source_mapper,
)
print(answer)