import os import time import subprocess import threading import weaviate from weaviate.connect import ConnectionParams from weaviate.collections import Collection from weaviate.classes.config import Configure, Property, DataType from weaviate.collections.classes.filters import Filter import pytesseract from PIL import Image from docx import Document from pypdf import PdfReader import textract from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler #from flask import Flask, request, jsonify, cli from fastapi import FastAPI, Request, HTTPException import uvicorn import hmac import hashlib # Konfiguracja REPO_PATH = "/home/ably.do/docs" WEBHOOK_SECRET = "twoj_tajny_klucz" WEBHOOK_PORT = 5000 WEAVIATE_URL = "http://weaviate:8080" app = FastAPI() client = weaviate.WeaviateClient( connection_params=ConnectionParams.from_params( http_host="weaviate", http_port=8080, http_secure=False, grpc_host="weaviate", grpc_port=50051, grpc_secure=False, ) ) def read_text_file(file_path): with open(file_path, 'r', encoding='utf-8') as file: return file.read() def read_docx(file_path): doc = Document(file_path) return ' '.join([paragraph.text for paragraph in doc.paragraphs]) def read_pdf(file_path): reader = PdfReader(file_path) return ' '.join([page.extract_text() for page in reader.pages]) def read_image(file_path): return pytesseract.image_to_string(Image.open(file_path)) def read_file(file_path): _, ext = os.path.splitext(file_path.lower()) if ext in ['.txt', '.md']: return read_text_file(file_path) elif ext == '.docx': return read_docx(file_path) elif ext == '.pdf': return read_pdf(file_path) elif ext in ['.png', '.jpg', '.jpeg', '.gif', '.bmp']: return read_image(file_path) elif ext in ['.doc', '.rtf']: return textract.process(file_path).decode('utf-8') else: return None def generate_content_hash(content): return hashlib.sha256(content.encode('utf-8')).hexdigest() def add_to_weaviate(file_name, content, content_hash): try: collection = client.collections.get("Document") # Poprawne użycie klasy Filter filters = Filter.by_property("fileName").equal(file_name) # Sprawdzenie, czy dokument już istnieje existing_docs = collection.query.fetch_objects(filters=filters) if existing_docs.objects: print(f"Dokument {file_name} już istnieje w bazie.") return # Dodanie nowego dokumentu collection.data.insert( properties={ "fileName": file_name, "content": content, "contentHash": content_hash } ) print(f"Dodano dokument {file_name} do Weaviate.") except Exception as e: print(f"Błąd podczas dodawania {file_name} do Weaviate: {e}") def process_file(file_path): if not os.path.exists(file_path): print(f"Plik nie istnieje: {file_path}") return try: content = read_file(file_path) if content: file_name = os.path.basename(file_path) content_hash = generate_content_hash(content) add_to_weaviate(file_name, content, content_hash) else: print(f"Plik jest pusty lub nie można go odczytać: {file_path}") except Exception as e: print(f"Błąd podczas przetwarzania pliku {file_path}: {str(e)}") class RepoHandler(FileSystemEventHandler): def on_any_event(self, event): if not event.is_directory: print(f"Wykryto zmianę: {event.src_path}") self.pull_changes() process_file(event.src_path) def pull_changes(self): try: subprocess.run(["git", "pull"], check=True, cwd=REPO_PATH) print("Zmiany pobrane z Gitea") except subprocess.CalledProcessError as e: print(f"Błąd podczas pobierania zmian: {e}") def start_file_monitor(): print(f"Rozpoczeto monitoring folderu") event_handler = RepoHandler() observer = Observer() observer.schedule(event_handler, REPO_PATH, recursive=True) observer.start() try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join() @app.post("/webhook") async def webhook(request: Request): signature = request.headers.get("X-Gitea-Signature") if not signature: raise HTTPException(status_code=400, detail="No signature") payload = await request.body() computed_signature = hmac.new(WEBHOOK_SECRET.encode(), payload, hashlib.sha256).hexdigest() if hmac.compare_digest(signature, computed_signature): print("Otrzymano ważny webhook z Gitea") RepoHandler().pull_changes() for root, dirs, files in os.walk(REPO_PATH): for file in files: process_file(os.path.join(root, file)) return {"message": "Zmiany pobrane i przetworzone pomyślnie"} else: raise HTTPException(status_code=401, detail="Invalid signature") def load_all_documents(): print("Wczytywanie wszystkich dokumentów z katalogu...") for root, dirs, files in os.walk(REPO_PATH): for file in files: process_file(os.path.join(root, file)) print("Zakończono wczytywanie dokumentów.") if __name__ == "__main__": client.connect() try: collection_name = "Document" # Sprawdzenie, czy kolekcja istnieje i czy należy ją usunąć if client.collections.exists(collection_name): print(f"Usuwanie istniejącej kolekcji '{collection_name}' (CLEAR_COLLECTION=true)...") client.collections.delete(collection_name) print(f"Kolekcja '{collection_name}' została usunięta.") else: print(f"Kolekcja '{collection_name}' nie istnieje.") # Tworzenie kolekcji od nowa, jeśli została usunięta lub nie istniała if not client.collections.exists(collection_name): print(f"Tworzenie nowej kolekcji '{collection_name}'...") client.collections.create( name=collection_name, properties=[ Property(name="content", data_type=DataType.TEXT), Property(name="fileName", data_type=DataType.TEXT), Property(name="contentHash", data_type=DataType.TEXT) # Nowe pole ], vectorizer_config=Configure.Vectorizer.text2vec_transformers() ) print(f"Kolekcja '{collection_name}' została utworzona.") # Wczytanie dokumentów po utworzeniu nowej kolekcji print("Wczytywanie dokumentów do nowej kolekcji...") load_all_documents() print("Wszystkie dokumenty zostały wgrane.") else: print("Kolekcja już istnieje. Pominięto jej ponowne tworzenie.") # Sprawdzenie, czy kolekcja jest pusta i ewentualne wczytanie dokumentów collection = client.collections.get(collection_name) if collection.aggregate.over_all(total_count=True).total_count == 0: print("Kolekcja jest pusta. Wczytywanie dokumentów...") load_all_documents() print("Wszystkie dokumenty zostały wgrane do istniejącej kolekcji.") except Exception as e: print(f"Wystąpił błąd podczas operacji na kolekcji '{collection_name}': {e}") print(client.collections.list_all()) # Uruchom monitorowanie plików w osobnym wątku monitor_thread = threading.Thread(target=start_file_monitor) monitor_thread.start() # Uruchom serwer Flask dla webhooka try: uvicorn.run(app, host="0.0.0.0", port=WEBHOOK_PORT) finally: client.close()