# ably.do/ollama_service.py
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import ollama
import weaviate
from weaviate.connect import ConnectionParams
from weaviate.collections.classes.filters import Filter
import re
import json
import uvicorn
import httpx
from typing import List, Optional
import asyncio
import os
from elasticsearch import Elasticsearch
from datetime import datetime
import aiohttp
from bs4 import BeautifulSoup
import urllib.parse
import hashlib
app = FastAPI()

# Service hostnames assume the docker-compose network; adjust for other setups.
OLLAMA_BASE_URL = "http://ollama:11434"
ES_BASE_URL = "http://elastic:9200"
WEAVIATE_URL = "http://weaviate:8080"
PROMPT_DIR_PATCH = "./prompts"
SEARXNG_BASE_URL = "http://searxng:8080"

# Initialize clients
ollama_client = ollama.Client(host=OLLAMA_BASE_URL)
es = Elasticsearch(ES_BASE_URL)
weaviate_client = weaviate.WeaviateClient(
    connection_params=ConnectionParams.from_params(
        http_host="weaviate",
        http_port=8080,
        http_secure=False,
        grpc_host="weaviate",
        grpc_port=50051,
        grpc_secure=False,
    )
)
weaviate_client.connect()
collection = weaviate_client.collections.get("Document")

files_content = None
class Message(BaseModel):
    role: str
    content: str


class ChatRequest(BaseModel):
    model: str
    messages: List[Message]
    stream: Optional[bool] = False
    options: Optional[dict] = None


class ChatResponse(BaseModel):
    model: str
    created_at: str
    message: Message
    done: bool
    total_duration: int
    load_duration: int
    prompt_eval_count: int
    prompt_eval_duration: int
    eval_count: int
    eval_duration: int
def read_text_files_from_directory(directory_path):
    """Load every .txt file in the directory into a dict keyed by base name."""
    files_dict = {}
    # Iterate over all files in the directory
    for filename in os.listdir(directory_path):
        # Only consider files with a .txt extension
        if filename.endswith('.txt'):
            file_path = os.path.join(directory_path, filename)
            try:
                # Read the file contents
                with open(file_path, 'r', encoding='utf-8') as file:
                    content = file.read()
                # Store under the file name without its extension
                file_name_without_extension = os.path.splitext(filename)[0]
                files_dict[file_name_without_extension] = content
            except Exception as e:
                print(f"Error reading file {filename}: {e}")
    return files_dict
def analyze_query(content):
    """Ask a small model to extract comma-separated keywords from the prompt."""
    analysis = ollama_client.chat(
        model="gemma2:2b",
        messages=[{"role": "user", "content": content}]
    )
    keywords = [word.strip() for word in analysis['message']['content'].split(',') if word.strip()]
    print("Keywords:", keywords)
    return keywords
def extract_full_article(content, article_number):
    # Match from "Art. <n>." up to the next "Art. <m>." heading or end of text
    pattern = rf"Art\.\s*{article_number}\..*?(?=Art\.\s*\d+\.|\Z)"
    match = re.search(pattern, content, re.DOTALL)
    if match:
        return match.group(0).strip()
    return None
def extract_relevant_fragment(content, query, context_size=100):
    # If the query names a specific article, return that article in full
    article_match = re.match(r"Art\.\s*(\d+)", query)
    if article_match:
        article_number = article_match.group(1)
        full_article = extract_full_article(content, article_number)
        if full_article:
            return full_article
    # Otherwise return the query hit with surrounding context
    index = content.lower().find(query.lower())
    if index != -1:
        start = max(0, index - context_size)
        end = min(len(content), index + len(query) + context_size)
        return f"...{content[start:end]}..."
    # Fall back to the beginning of the document
    return content[:1000] + "..."
def hybrid_search(keywords, limit=100, alpha=0.5):
    """Hybrid (BM25 + vector) search over the Weaviate 'Document' collection."""
    if isinstance(keywords, str):
        keywords = [keywords]
    query = " ".join(keywords)
    response = collection.query.hybrid(
        query=query,
        alpha=alpha,
        limit=limit * 2
    )
    results = []
    for obj in response.objects:
        relevant_fragment = extract_relevant_fragment(obj.properties['content'], query)
        print(f"File name: {obj.properties['fileName']}")
        # Keyword filtering disabled; an 'any' match was used here instead of 'all':
        # if any(term.lower() in relevant_fragment.lower() for term in keywords):
        results.append({
            "uuid": obj.uuid,
            "relevant_fragment": relevant_fragment,
            "file_name": obj.properties['fileName'],
            "content_type": obj.properties['contentType'],
            "keyword": query
        })
        if len(results) >= limit:
            break
    return results[:limit]
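# For reference, Weaviate's hybrid alpha weights the two scorers:
#   alpha=0.0 -> pure keyword (BM25), alpha=1.0 -> pure vector,
#   alpha=0.5 -> equal blend (the default above).
# Illustrative call (hypothetical keywords):
#   hybrid_search(["kodeks cywilny", "Art. 415"], limit=10, alpha=0.75)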
async def fetch_json(session, url):
    async with session.get(url) as response:
        return await response.json()


async def fetch_text(session, url):
    # Strip HTML tags and return visible page text only
    async with session.get(url) as response:
        html = await response.text()
        soup = BeautifulSoup(html, "html.parser")
        return soup.get_text()
async def process_search_results(query):
    """Fetch web results for the query via SearXNG and cache them in Weaviate."""
    search_url = f"{SEARXNG_BASE_URL}/search?q={urllib.parse.quote(query)}&categories=general&format=json"
    async with aiohttp.ClientSession() as session:
        data = await fetch_json(session, search_url)
        results = data.get("results", [])
        # Keep the ten highest-scoring results (SearXNG scores rank relevance)
        results_sorted = sorted(results, key=lambda x: x.get("score", 0), reverse=True)[:10]
        tasks = [fetch_text(session, result["url"]) for result in results_sorted]
        texts = await asyncio.gather(*tasks)
        save_to_weaviate([{
            "fileName": result["url"],
            "content": json.dumps({
                "prompt": query,
                "completion": text
            }),
            "contentHash": generate_content_hash(text)
        } for result, text in zip(results_sorted, texts)])
def generate_content_hash(content):
    return hashlib.sha256(content.encode('utf-8')).hexdigest()
def save_to_weaviate(data):
    try:
        collection = weaviate_client.collections.get("Document")
        for item in data:
            # Skip URLs that are already stored instead of aborting the whole batch
            filters = Filter.by_property("fileName").equal(item["fileName"])
            existing_docs = collection.query.fetch_objects(filters=filters)
            if existing_docs.objects:
                continue
            collection.data.insert({
                "fileName": item["fileName"],
                "content": item["content"],
                "contentHash": item["contentHash"],
                "contentType": "website"
            })
    except Exception as e:
        print(f"Error while adding documents to the database: {e}")
@app.get("/api/tags")
async def tags_proxy():
async with httpx.AsyncClient() as client:
response = await client.get(f"{OLLAMA_BASE_URL}/api/tags")
return response.json()
@app.get("/api/version")
async def tags_proxy():
async with httpx.AsyncClient() as client:
response = await client.get(f"{OLLAMA_BASE_URL}/api/version")
return response.json()
@app.post("/api/generate")
async def generate_proxy(request: Request):
data = await request.json()
async with httpx.AsyncClient() as client:
response = await client.post(f"{OLLAMA_BASE_URL}/api/generate", json=data)
return response.json()
@app.get("/api/models")
async def list_models():
try:
models = ollama_client.list()
return {"models": [model['name'] for model in models['models']]}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
async def stream_chat(model, messages, options):
    try:
        # Stream the Ollama response asynchronously via httpx
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                f"{OLLAMA_BASE_URL}/api/chat",
                json={"model": model, "messages": messages, "stream": True, "options": options},
            ) as response:
                async for line in response.aiter_lines():
                    yield line + "\n"
    except Exception as e:
        yield json.dumps({"error": str(e)}) + "\n"
def save_to_elasticsearch(index, request_data, response_data, search_data=None):
    try:
        def message_to_dict(message):
            # Accept both plain dicts and pydantic Message objects
            if isinstance(message, dict):
                return {
                    "role": message.get("role"),
                    "content": message.get("content")
                }
            return {
                "role": getattr(message, "role", None),
                "content": getattr(message, "content", None)
            }

        if isinstance(request_data.get("messages"), list):
            request_data["messages"] = [message_to_dict(msg) for msg in request_data["messages"]]
        if "message" in response_data:
            response_data["message"] = message_to_dict(response_data["message"])
        if "timestamp" in response_data:
            response_data["timestamp"] = response_data["timestamp"].isoformat()

        document = {
            "request": request_data,
            "response": response_data,
            "timestamp": datetime.utcnow().isoformat(),
        }
        if search_data:
            document["vector_search"] = {
                "keywords": search_data.get("keywords", []),
                "results": search_data.get("results", [])
            }
        json_document = json.dumps(document, default=str)
        es.index(index=index, body=json_document)
    except Exception as e:
        print(f"Error saving to Elasticsearch: {e}")
@app.post("/api/chat")
async def chat_endpoint(request: ChatRequest):
    try:
        files_content = read_text_files_from_directory(PROMPT_DIR_PATCH)
        if not files_content:
            raise KeyError("No prompt files were loaded!")
        prompt_seach = files_content.get("prompt_seach")
        if prompt_seach is None:
            raise KeyError("Prompt named 'prompt_seach' not found.")
        prompt_system = files_content.get("prompt_system")
        if prompt_system is None:
            raise KeyError("Prompt named 'prompt_system' not found.")
        prompt_answer = files_content.get("prompt_answer")
        if prompt_answer is None:
            raise KeyError("Prompt named 'prompt_answer' not found.")
        prompt_data = files_content.get("prompt_data")
        if prompt_data is None:
            raise KeyError("Prompt named 'prompt_data' not found.")

        query = request.messages[-1].content if request.messages else ""
        # Await directly; asyncio.run() would fail inside the running event loop
        await process_search_results(query)
        keywords = analyze_query(prompt_seach.format(query=query))
        weaviate_results = hybrid_search(keywords)
        prompt_data += "\n".join([f"Source: {doc['file_name']}\n{doc['relevant_fragment']}\n\n" for doc in weaviate_results])

        # Pass the full message history along with the retrieved context
        messages_with_context = [
            {"role": "system", "content": prompt_system},
            {"role": "system", "content": prompt_data},
            *[{"role": message.role, "content": message.content} for message in request.messages],
            {"role": "user", "content": prompt_answer.format(query=query)}
        ]

        if request.stream:
            return StreamingResponse(stream_chat(request.model, messages_with_context, request.options), media_type="application/json")

        ollama_response = ollama_client.chat(
            model=request.model,
            messages=messages_with_context,
            stream=False,
            options=request.options
        )
        request_data = {
            "query": query,
            "messages": request.messages,
            "options": request.options
        }
        response_data = {
            "model": request.model,
            "created_at": ollama_response.get('created_at', ''),
            "message": ollama_response['message'],
            "done": ollama_response.get('done', True),
            "total_duration": ollama_response.get('total_duration', 0),
            "load_duration": ollama_response.get('load_duration', 0),
            "prompt_eval_count": ollama_response.get('prompt_eval_count', 0),
            "prompt_eval_duration": ollama_response.get('prompt_eval_duration', 0),
            "eval_count": ollama_response.get('eval_count', 0),
            "eval_duration": ollama_response.get('eval_duration', 0)
        }
        save_to_elasticsearch("ably.do", request_data, response_data, {"keywords": keywords, "results": weaviate_results})
        return ChatResponse(
            model=request.model,
            created_at=ollama_response.get('created_at', ''),
            message=Message(
                role=ollama_response['message']['role'],
                content=ollama_response['message']['content']
            ),
            done=ollama_response.get('done', True),
            total_duration=ollama_response.get('total_duration', 0),
            load_duration=ollama_response.get('load_duration', 0),
            prompt_eval_count=ollama_response.get('prompt_eval_count', 0),
            prompt_eval_duration=ollama_response.get('prompt_eval_duration', 0),
            eval_count=ollama_response.get('eval_count', 0),
            eval_duration=ollama_response.get('eval_duration', 0)
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
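# Example request (a sketch; assumes the service is reachable on localhost:8000
# and the "gemma2:2b" model is pulled in Ollama):
#
#   curl -X POST http://localhost:8000/api/chat \
#     -H "Content-Type: application/json" \
#     -d '{"model": "gemma2:2b",
#          "messages": [{"role": "user", "content": "Art. 415"}],
#          "stream": false}'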