ably.do/monitoring.py

227 lines
7.9 KiB
Python
Raw Permalink Normal View History

2025-02-26 14:38:27 -05:00
import os
2025-02-27 07:09:02 -05:00
import time
2025-02-26 14:38:27 -05:00
import subprocess
import threading
import weaviate
2025-02-27 07:09:02 -05:00
from weaviate.connect import ConnectionParams
from weaviate.collections import Collection
from weaviate.classes.config import Configure, Property, DataType
from weaviate.collections.classes.filters import Filter
2025-02-26 14:38:27 -05:00
import pytesseract
from PIL import Image
from docx import Document
2025-02-27 07:09:02 -05:00
from pypdf import PdfReader
2025-02-26 14:38:27 -05:00
import textract
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
2025-02-27 07:09:02 -05:00
#from flask import Flask, request, jsonify, cli
from fastapi import FastAPI, Request, HTTPException
import uvicorn
2025-02-26 14:38:27 -05:00
import hmac
import hashlib
# Konfiguracja
2025-02-27 07:09:02 -05:00
REPO_PATH = "/home/ably.do/docs"
2025-02-26 14:38:27 -05:00
WEBHOOK_SECRET = "twoj_tajny_klucz"
WEBHOOK_PORT = 5000
WEAVIATE_URL = "http://weaviate:8080"
2025-02-27 07:09:02 -05:00
app = FastAPI()
client = weaviate.WeaviateClient(
connection_params=ConnectionParams.from_params(
http_host="weaviate",
http_port=8080,
http_secure=False,
grpc_host="weaviate",
grpc_port=50051,
grpc_secure=False,
)
)
2025-02-26 14:38:27 -05:00
def read_text_file(file_path):
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
def read_docx(file_path):
doc = Document(file_path)
return ' '.join([paragraph.text for paragraph in doc.paragraphs])
def read_pdf(file_path):
reader = PdfReader(file_path)
return ' '.join([page.extract_text() for page in reader.pages])
def read_image(file_path):
return pytesseract.image_to_string(Image.open(file_path))
def read_file(file_path):
_, ext = os.path.splitext(file_path.lower())
if ext in ['.txt', '.md']:
return read_text_file(file_path)
elif ext == '.docx':
return read_docx(file_path)
elif ext == '.pdf':
return read_pdf(file_path)
elif ext in ['.png', '.jpg', '.jpeg', '.gif', '.bmp']:
return read_image(file_path)
elif ext in ['.doc', '.rtf']:
return textract.process(file_path).decode('utf-8')
else:
return None
2025-02-27 07:09:02 -05:00
def generate_content_hash(content):
return hashlib.sha256(content.encode('utf-8')).hexdigest()
def add_to_weaviate(file_name, content, content_hash):
try:
collection = client.collections.get("Document")
# Poprawne użycie klasy Filter
filters = Filter.by_property("fileName").equal(file_name)
# Sprawdzenie, czy dokument już istnieje
existing_docs = collection.query.fetch_objects(filters=filters)
if existing_docs.objects:
print(f"Dokument {file_name} już istnieje w bazie.")
return
# Dodanie nowego dokumentu
collection.data.insert(
properties={
"fileName": file_name,
"content": content,
"contentHash": content_hash,
"contentType": "publication"
2025-02-27 07:09:02 -05:00
}
)
print(f"Dodano dokument {file_name} do Weaviate.")
except Exception as e:
print(f"Błąd podczas dodawania {file_name} do Weaviate: {e}")
2025-02-26 14:38:27 -05:00
def process_file(file_path):
2025-02-27 07:09:02 -05:00
if not os.path.exists(file_path):
print(f"Plik nie istnieje: {file_path}")
return
try:
content = read_file(file_path)
if content:
file_name = os.path.basename(file_path)
content_hash = generate_content_hash(content)
add_to_weaviate(file_name, content, content_hash)
else:
print(f"Plik jest pusty lub nie można go odczytać: {file_path}")
except Exception as e:
print(f"Błąd podczas przetwarzania pliku {file_path}: {str(e)}")
2025-02-26 14:38:27 -05:00
class RepoHandler(FileSystemEventHandler):
def on_any_event(self, event):
if not event.is_directory:
print(f"Wykryto zmianę: {event.src_path}")
self.pull_changes()
process_file(event.src_path)
def pull_changes(self):
try:
subprocess.run(["git", "pull"], check=True, cwd=REPO_PATH)
print("Zmiany pobrane z Gitea")
except subprocess.CalledProcessError as e:
print(f"Błąd podczas pobierania zmian: {e}")
def start_file_monitor():
2025-02-27 07:09:02 -05:00
print(f"Rozpoczeto monitoring folderu")
2025-02-26 14:38:27 -05:00
event_handler = RepoHandler()
observer = Observer()
observer.schedule(event_handler, REPO_PATH, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
2025-02-27 07:09:02 -05:00
@app.post("/webhook")
async def webhook(request: Request):
signature = request.headers.get("X-Gitea-Signature")
2025-02-26 14:38:27 -05:00
if not signature:
2025-02-27 07:09:02 -05:00
raise HTTPException(status_code=400, detail="No signature")
2025-02-26 14:38:27 -05:00
2025-02-27 07:09:02 -05:00
payload = await request.body()
2025-02-26 14:38:27 -05:00
computed_signature = hmac.new(WEBHOOK_SECRET.encode(), payload, hashlib.sha256).hexdigest()
if hmac.compare_digest(signature, computed_signature):
print("Otrzymano ważny webhook z Gitea")
RepoHandler().pull_changes()
for root, dirs, files in os.walk(REPO_PATH):
for file in files:
process_file(os.path.join(root, file))
2025-02-27 07:09:02 -05:00
return {"message": "Zmiany pobrane i przetworzone pomyślnie"}
2025-02-26 14:38:27 -05:00
else:
2025-02-27 07:09:02 -05:00
raise HTTPException(status_code=401, detail="Invalid signature")
def load_all_documents():
print("Wczytywanie wszystkich dokumentów z katalogu...")
for root, dirs, files in os.walk(REPO_PATH):
for file in files:
process_file(os.path.join(root, file))
print("Zakończono wczytywanie dokumentów.")
2025-02-26 14:38:27 -05:00
if __name__ == "__main__":
2025-02-27 07:09:02 -05:00
client.connect()
try:
collection_name = "Document"
# Sprawdzenie, czy kolekcja istnieje i czy należy ją usunąć
if client.collections.exists(collection_name):
print(f"Usuwanie istniejącej kolekcji '{collection_name}' (CLEAR_COLLECTION=true)...")
client.collections.delete(collection_name)
print(f"Kolekcja '{collection_name}' została usunięta.")
else:
print(f"Kolekcja '{collection_name}' nie istnieje.")
# Tworzenie kolekcji od nowa, jeśli została usunięta lub nie istniała
if not client.collections.exists(collection_name):
print(f"Tworzenie nowej kolekcji '{collection_name}'...")
client.collections.create(
name=collection_name,
properties=[
Property(name="content", data_type=DataType.TEXT),
Property(name="fileName", data_type=DataType.TEXT),
Property(name="contentHash", data_type=DataType.TEXT), # Nowe pole
Property(name="contentType", data_type=DataType.TEXT) # Nowe pole
2025-02-27 07:09:02 -05:00
],
vectorizer_config=Configure.Vectorizer.text2vec_transformers()
)
print(f"Kolekcja '{collection_name}' została utworzona.")
# Wczytanie dokumentów po utworzeniu nowej kolekcji
print("Wczytywanie dokumentów do nowej kolekcji...")
load_all_documents()
print("Wszystkie dokumenty zostały wgrane.")
else:
print("Kolekcja już istnieje. Pominięto jej ponowne tworzenie.")
# Sprawdzenie, czy kolekcja jest pusta i ewentualne wczytanie dokumentów
collection = client.collections.get(collection_name)
if collection.aggregate.over_all(total_count=True).total_count == 0:
print("Kolekcja jest pusta. Wczytywanie dokumentów...")
load_all_documents()
print("Wszystkie dokumenty zostały wgrane do istniejącej kolekcji.")
except Exception as e:
print(f"Wystąpił błąd podczas operacji na kolekcji '{collection_name}': {e}")
print(client.collections.list_all())
2025-02-26 14:38:27 -05:00
# Uruchom monitorowanie plików w osobnym wątku
monitor_thread = threading.Thread(target=start_file_monitor)
monitor_thread.start()
# Uruchom serwer Flask dla webhooka
2025-02-27 07:09:02 -05:00
try:
uvicorn.run(app, host="0.0.0.0", port=WEBHOOK_PORT)
finally:
client.close()