| import json |
| from langchain_community.document_loaders import PyPDFLoader |
| from langchain.text_splitter import RecursiveCharacterTextSplitter |
| from langchain_ollama import OllamaEmbeddings |
| from langchain_community.vectorstores import Chroma |
| from langchain_ollama import OllamaLLM |
| from langchain.chains import RetrievalQA |
| from langchain.prompts import PromptTemplate |
| from langchain_openai import ChatOpenAI, OpenAIEmbeddings |
| from ibm_watsonx_ai.metanames import EmbedTextParamsMetaNames |
| from langchain_ibm import WatsonxLLM, WatsonxEmbeddings |
| from langchain_huggingface import HuggingFaceEndpoint, HuggingFaceEmbeddings |
| from ibm_watsonx_ai import APIClient, Credentials |
| from utils import AI_MODELS, TRANSLATIONS |
| import chromadb |
| import requests |
| import os |
| from dotenv import load_dotenv |
| import re |
| from sklearn.cluster import KMeans |
| from sklearn.metrics.pairwise import cosine_similarity |
|
|
# Pull variables from a local .env file into the process environment before
# any of them are read below.
load_dotenv()

# Ollama model names used when the Granite option runs locally.
OLLAMA_EMBEDDINGS = "granite-embedding:278m"
OLLAMA_LLM = "granite3.1-dense"

# Deployment settings and credentials; each one is None when the variable is
# not set.
ENVIRONMENT = os.environ.get("ENVIRONMENT")
HUGGINGFACE_TOKEN = os.environ.get("HUGGINGFACE_TOKEN")
api_key_watsonx = os.environ.get('WATSONX_APIKEY')
projectid_watsonx = os.environ.get('WATSONX_PROJECT_ID')

# watsonx.ai service endpoint passed to the credentials and embedding client.
endpoint_watsonx = "https://us-south.ml.cloud.ibm.com"
|
|
def set_up_watsonx():
    """Build the watsonx.ai text-generation and embedding clients.

    Authenticates with the module-level API key through
    ``authenticate_watsonx`` and, when a token is obtained, creates a
    ``WatsonxLLM`` and a ``WatsonxEmbeddings`` for the module-level endpoint
    and project id.

    Returns:
        A ``(watsonx_llm, watsonx_embedding)`` tuple, or ``None`` when
        authentication did not produce a token.
    """
    token_watsonx = authenticate_watsonx(api_key_watsonx)
    if token_watsonx is None:
        return None

    # Generation settings handed to the LLM wrapper.
    parameters = {
        "max_new_tokens": 1500,
        "min_new_tokens": 1,
        "temperature": 0.7,
        "top_k": 50,
        "top_p": 1,
    }

    embed_params = {
        # Maximum number of input tokens kept per text. A value of 1 reduced
        # every chunk to a single token before embedding; 512 is used here as
        # the input limit of the Granite embedding model selected below.
        EmbedTextParamsMetaNames.TRUNCATE_INPUT_TOKENS: 512,
        EmbedTextParamsMetaNames.RETURN_OPTIONS: {"input_text": True},
    }

    credentials = Credentials(
        url=endpoint_watsonx,
        api_key=api_key_watsonx,
    )
    client = APIClient(credentials, project_id=projectid_watsonx)
    client.set_token(token_watsonx)

    watsonx_llm = WatsonxLLM(
        model_id="ibm/granite-3-2-8b-instruct",
        watsonx_client=client,
        params=parameters,
    )

    # The embedding client is configured from url/project id only; no client
    # object or key is passed explicitly here.
    watsonx_embedding = WatsonxEmbeddings(
        model_id="ibm/granite-embedding-278m-multilingual",
        url=endpoint_watsonx,
        project_id=projectid_watsonx,
        params=embed_params,
    )

    return watsonx_llm, watsonx_embedding
|
|
def authenticate_watsonx(api_key):
    """Exchange an IBM Cloud API key for an IAM access token.

    On success the token is also stored in the ``WATSONX_TOKEN`` environment
    variable.

    Args:
        api_key: API key sent as the ``apikey`` form field.

    Returns:
        The access token string, or ``None`` when the request cannot be
        completed, the service answers with a non-200 status, the body is not
        JSON, or the body carries no ``access_token``.
    """
    url = "https://iam.cloud.ibm.com/identity/token"
    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    data = {
        "grant_type": "urn:ibm:params:oauth:grant-type:apikey",
        "apikey": api_key
    }

    try:
        # Without a timeout a stalled connection would block the caller
        # indefinitely.
        response = requests.post(url, headers=headers, data=data, timeout=30)
    except requests.RequestException as exc:
        print("Authentication failed. Request error:", exc)
        return None

    if response.status_code != 200:
        print("Authentication failed. Status code:", response.status_code)
        print("Response:", response.text)
        return None

    try:
        token = response.json().get('access_token')
    except ValueError as exc:
        print("Authentication failed. Invalid JSON in response:", exc)
        return None

    if not token:
        # os.environ only accepts strings, so a missing token must not be
        # written to it.
        print("Authentication failed. No access token in response.")
        return None

    os.environ["WATSONX_TOKEN"] = token
    return token
|
|
class PDFProcessor:
    """PDF ingestion, question answering, summarisation and specialist review.

    The instance only stores the active language key. The vector store is
    passed into and returned from the methods rather than kept on the object.
    """

    def __init__(self):
        # Start with the first language declared in TRANSLATIONS.
        self.language = list(TRANSLATIONS.keys())[0]

    def set_language(self, language):
        """Select the TRANSLATIONS key used for messages and LLM answers."""
        self.language = language

    @staticmethod
    def _as_text(llm_output):
        """Return the text of an LLM result.

        Chat models return a message object with a ``content`` attribute,
        while completion models return a plain string; both are reduced to
        text here. Objects without ``content`` are returned unchanged.
        """
        content = getattr(llm_output, "content", None)
        if content is None:
            return llm_output
        return content if isinstance(content, str) else str(content)

    @staticmethod
    def _page_sort_key(label):
        """Sort numeric page labels by value, non-numeric labels after them."""
        text = str(label)
        if text.isdecimal():
            return (0, int(text), "")
        return (1, 0, text)

    @staticmethod
    def _parse_aspects(raw_text, limit=15):
        """Extract the ``aspects`` list from an LLM answer containing JSON.

        Args:
            raw_text: Text that contains a JSON object somewhere inside it.
            limit: Maximum number of aspects returned.

        Returns:
            The first ``limit`` entries of the ``aspects`` list.

        Raises:
            ValueError: If no JSON object is found, it cannot be decoded, or
                it has no ``aspects`` list.
        """
        match = re.search(r'\{.*\}', raw_text, re.DOTALL)
        if not match:
            raise ValueError("No valid JSON found in the response")

        payload = match.group(0).strip()
        try:
            parsed = json.loads(payload)
        except json.JSONDecodeError:
            # Models frequently leave a comma before the closing bracket;
            # retry once with such trailing commas removed.
            parsed = json.loads(re.sub(r',\s*([\]}])', r'\1', payload))

        aspects = parsed.get("aspects") if isinstance(parsed, dict) else None
        if not isinstance(aspects, list):
            raise ValueError('The response JSON has no "aspects" list')
        return aspects[:limit]

    def set_llm(self, ai_model, type_model, api_key, project_id_watsonx):
        """Create the LLM and embedding model for the selected option.

        Args:
            ai_model: Model option label. Two labels are handled explicitly;
                any other value is looked up in ``AI_MODELS`` and served
                through Hugging Face.
            type_model: For the Granite option, ``"Local"`` selects Ollama and
                any other value selects watsonx.ai.
            api_key: Key passed to the OpenAI clients.
            project_id_watsonx: Accepted for interface compatibility; not read
                by this method.

        Returns:
            A ``(llm, embedding_model)`` tuple.

        Raises:
            Exception: If the local Ollama models cannot be used.
            RuntimeError: If watsonx.ai authentication fails.
        """
        if ai_model == "Open AI / GPT-4o-mini":
            # NOTE(review): the option label says GPT-4o-mini but the model
            # requested is "gpt-4o" - confirm which one is intended.
            current_llm = ChatOpenAI(
                model="gpt-4o",
                temperature=0.5,
                max_tokens=None,
                timeout=None,
                max_retries=2,
                api_key=api_key,
            )
            embeding_model = OpenAIEmbeddings(
                model="text-embedding-3-small",
                api_key=api_key,
            )
        elif ai_model == "IBM Granite3.1 dense / Ollama local":
            if type_model == "Local":
                try:
                    current_llm = OllamaLLM(model=OLLAMA_LLM)
                    embeding_model = OllamaEmbeddings(model=OLLAMA_EMBEDDINGS)
                    # Probe call so an unreachable server or missing model
                    # fails here with the guidance below.
                    embeding_model.embed_query("test")
                except Exception as e:
                    print(f"Error with Ollama: {e}")
                    raise Exception("Please ensure Ollama is running and the models are pulled: \n" +
                                    f"ollama pull {OLLAMA_LLM}\n" +
                                    f"ollama pull {OLLAMA_EMBEDDINGS}") from e
            else:
                watsonx_models = set_up_watsonx()
                if watsonx_models is None:
                    # set_up_watsonx returns None when no token was obtained;
                    # unpacking it would raise an unrelated TypeError.
                    raise RuntimeError(
                        "watsonx.ai authentication failed; check WATSONX_APIKEY and WATSONX_PROJECT_ID."
                    )
                current_llm, embeding_model = watsonx_models
        else:
            endpoint_kwargs = {
                "repo_id": AI_MODELS[ai_model],
                "temperature": 0.2,
            }
            if ENVIRONMENT != "dev":
                print("HUGGINGFACE accessing")
                endpoint_kwargs["huggingfacehub_api_token"] = HUGGINGFACE_TOKEN
            current_llm = HuggingFaceEndpoint(**endpoint_kwargs)
            embeding_model = HuggingFaceEmbeddings(
                model_name="ibm-granite/granite-embedding-278m-multilingual",
            )
        return current_llm, embeding_model

    def process_pdf(self, vectorstore, pdf_file, chunk_size, chunk_overlap, ai_model, type_model, api_key, project_id_watsonx):
        """Load a PDF, split it into chunks and index them in a new Chroma store.

        Args:
            vectorstore: Existing store, if any; its collection is deleted
                before the new one is created.
            pdf_file: Object whose ``name`` attribute is the PDF path, or
                ``None``.
            chunk_size: Accepted but not used; see the note below.
            chunk_overlap: Accepted but not used; see the note below.
            ai_model: Model option label (see ``set_llm``).
            type_model: Access mode (see ``set_llm``).
            api_key: OpenAI key; required for the OpenAI option.
            project_id_watsonx: Forwarded to ``set_llm``.

        Returns:
            A ``(status_message, vectorstore)`` tuple. The store is ``None``
            when the API key is missing or no file was given.
        """
        # NOTE(review): the chunk_size / chunk_overlap arguments are ignored;
        # these fixed values are what the splitter receives.
        defined_chunk_size = 1000
        defined_chunk_overlap = 150

        if ai_model == "Open AI / GPT-4o-mini" and not api_key:
            # Same two-element shape as every other return of this method.
            return TRANSLATIONS[self.language]["api_key_required"], None
        if pdf_file is None:
            return TRANSLATIONS[self.language]["load_pdf_first"], None

        loader = PyPDFLoader(file_path=pdf_file.name)
        # Pages without text are dropped before splitting.
        documents = [doc for doc in loader.load() if doc.page_content]

        splitter_kwargs = {
            "chunk_size": defined_chunk_size,
            "chunk_overlap": defined_chunk_overlap,
        }
        named_model = ai_model in (
            "Open AI / GPT-4o-mini",
            "IBM Granite3.1 dense / Ollama local",
        )
        if named_model and type_model == "Api Key":
            # Only this combination restricts the separators; every other
            # case uses the splitter defaults.
            splitter_kwargs["separators"] = ["\n\n", "\n"]
        text_splitter = RecursiveCharacterTextSplitter(**splitter_kwargs)

        texts = text_splitter.split_documents(documents)
        _, embeddings = self.set_llm(ai_model, type_model, api_key, project_id_watsonx)
        print("vectorstore: ", vectorstore)

        if vectorstore:
            vectorstore.delete_collection()

        chromadb.api.client.SharedSystemClient.clear_system_cache()
        new_client = chromadb.EphemeralClient()

        vectorstore = Chroma.from_documents(
            documents=texts,
            embedding=embeddings,
            client=new_client,
            collection_name="pdf_collection",
        )
        print("vectorstore: ", vectorstore)

        return TRANSLATIONS[self.language]["pdf_processed"], vectorstore

    def get_qa_response(self, vectorstore, message, history, ai_model, type_model, api_key, project_id_watsonx, k=4):
        """Answer a question from the indexed PDF and list the source pages.

        Args:
            vectorstore: Store produced by ``process_pdf``.
            message: User question.
            history: Accepted for interface compatibility; not read.
            ai_model: Model option label (see ``set_llm``).
            type_model: Access mode (see ``set_llm``).
            api_key: Forwarded to ``set_llm``.
            project_id_watsonx: Forwarded to ``set_llm``.
            k: Number of chunks retrieved for the answer.

        Returns:
            The answer followed by a ``Sources:`` line, or the
            "load PDF first" message when there is no store.
        """
        # Check the store first so no model is built just to report that
        # nothing is loaded.
        if not vectorstore:
            return TRANSLATIONS[self.language]["load_pdf_first"]

        current_llm, _ = self.set_llm(ai_model, type_model, api_key, project_id_watsonx)
        retriever = vectorstore.as_retriever(search_kwargs={"k": k})

        qa_chain = RetrievalQA.from_chain_type(
            llm=current_llm,
            chain_type="stuff",
            retriever=retriever,
            return_source_documents=True,
        )

        result = qa_chain.invoke({"query": f"{message}.\n You must answer it in {self.language}. Remember not to mention anything that is not in the text. Do not extend information that is not provided in the text. "})

        # Fall back to the "page" key, then "?", when a document carries no
        # "page_label" metadata.
        unique_page_labels = {
            str(doc.metadata.get("page_label", doc.metadata.get("page", "?")))
            for doc in result["source_documents"]
        }
        ordered_labels = sorted(unique_page_labels, key=self._page_sort_key)
        page_labels_text = " & ".join(f"Page: {page}" for page in ordered_labels)

        return result["result"] + "\n\nSources: " + page_labels_text

    def summarizer_by_k_means(self, vectorstore, ai_model, type_model, api_key, project_id_watsonx, k, summary_prompt, just_get_documments=False):
        """Summarise the document from one representative chunk per cluster.

        The stored chunk embeddings are clustered with KMeans; for each
        cluster the chunk with the highest cosine similarity to the centroid
        is kept, and the kept chunks are joined with newlines.

        Args:
            vectorstore: Store produced by ``process_pdf``.
            ai_model: Model option label (see ``set_llm``).
            type_model: Access mode (see ``set_llm``).
            api_key: Forwarded to ``set_llm``.
            project_id_watsonx: Forwarded to ``set_llm``.
            k: Accepted for interface compatibility; not read.
            summary_prompt: Prompt piped into the LLM; invoked with the
                ``texts`` and ``language`` variables.
            just_get_documments: When true, return the joined representative
                chunks without calling the LLM.

        Returns:
            The summary text, the joined chunks, or the "load PDF first"
            message when there is no store or it holds no chunks.
        """
        print("Summarizer by k means in language: ", self.language)
        if not vectorstore:
            return TRANSLATIONS[self.language]["load_pdf_first"]

        stored = vectorstore.get(include=["embeddings", "documents"])
        chunk_ids = stored["ids"]
        chunk_embeddings = stored["embeddings"]
        chunk_texts = stored["documents"]

        # len() is used rather than truthiness because the embeddings may be
        # an array.
        total_chunks = 0 if chunk_embeddings is None else len(chunk_embeddings)
        print("documents length: ", total_chunks)
        if total_chunks == 0:
            # KMeans cannot be fitted on an empty store.
            return TRANSLATIONS[self.language]["load_pdf_first"]

        # Larger documents get proportionally fewer clusters.
        if total_chunks <= 16:
            chunks_per_cluster = 2
        elif total_chunks <= 64:
            chunks_per_cluster = 4
        elif total_chunks <= 128:
            chunks_per_cluster = 8
        else:
            chunks_per_cluster = 12
        num_clusters = max(1, total_chunks // chunks_per_cluster)
        print("num_clusters: ", num_clusters)

        kmeans = KMeans(n_clusters=num_clusters, random_state=42)
        kmeans.fit(chunk_embeddings)

        summary_documents = []
        map_ids_documents = {}
        for cluster_id in range(num_clusters):
            member_indices = [
                idx for idx, label in enumerate(kmeans.labels_) if label == cluster_id
            ]
            if not member_indices:
                continue

            member_embeddings = [chunk_embeddings[idx] for idx in member_indices]
            # One call scores every member against the centroid; argmax keeps
            # the first best match.
            similarities = cosine_similarity(
                member_embeddings, [kmeans.cluster_centers_[cluster_id]]
            )[:, 0]
            representative = member_indices[int(similarities.argmax())]

            summary_documents.append(chunk_texts[representative])
            map_ids_documents[representative] = chunk_ids[representative]

        print("map_ids_documents: ", map_ids_documents)

        summary_text = "\n".join(summary_documents)
        print("summary_documents: ", summary_text)

        if just_get_documments:
            return summary_text

        # The model is only built when a summary is actually generated.
        current_llm, _ = self.set_llm(ai_model, type_model, api_key, project_id_watsonx)
        summary_chain = summary_prompt | current_llm
        final_summary = summary_chain.invoke({"texts": summary_text, "language": self.language})

        return self._as_text(final_summary)

    def get_summary(self, vectorstore, ai_model, type_model, api_key, project_id_watsonx, just_get_documments=False, k=10):
        """Summarise the indexed PDF in the active language as markdown.

        Builds the summary prompt and delegates to ``summarizer_by_k_means``
        with the same arguments.

        Returns:
            Whatever ``summarizer_by_k_means`` returns for these arguments.
        """
        final_summary_prompt = PromptTemplate(
            input_variables=["texts", "language"],
            template="""
            Combine the following texts into a cohesive and structured summary:
            ------------
            {texts}
            ------------
            Preserve the original meaning without adding external information or interpretations.
            Ensure clarity, logical flow, and coherence between the combined points.
            The summary must be in {language}.
            The output must be in markdown format.
            Summary:
            """
        )

        return self.summarizer_by_k_means(vectorstore, ai_model, type_model, api_key, project_id_watsonx, k, final_summary_prompt, just_get_documments)

    def get_specialist_opinion(self, vectorstore, ai_model, type_model, api_key, project_id_watsonx, specialist_prompt):
        """Generate specialist aspects for the document and answer each one.

        The LLM is asked, in the role given by ``specialist_prompt``, for a
        JSON list of aspects based on the representative chunks of the
        document. Each aspect is then answered with ``get_qa_response``.

        Args:
            vectorstore: Store produced by ``process_pdf``.
            ai_model: Model option label (see ``set_llm``).
            type_model: Access mode (see ``set_llm``).
            api_key: Forwarded to ``set_llm``.
            project_id_watsonx: Forwarded to ``set_llm``.
            specialist_prompt: Role instructions inserted into the prompt.

        Returns:
            One ``* aspect: answer`` line per aspect joined with newlines, or
            the "load PDF first" message when there is no store.

        Raises:
            ValueError: If the LLM answer contains no usable JSON aspects.
        """
        if not vectorstore:
            return TRANSLATIONS[self.language]["load_pdf_first"]

        # The JSON example must itself be valid JSON (no trailing comma),
        # since the model tends to copy its shape.
        questions_prompt = PromptTemplate(
            input_variables=["text", "specialist_prompt", "language"],
            template="""
            * Act as a specialist based on the following instructions and behaviour that you will follow:
            ------------
            {specialist_prompt}
            ------------
            * Based on your role as specialist, create some different sintetized and concise aspects to ask to the knowledge base of the document about the following text:
            ------------
            {text}
            ------------
            * The key aspects and questions must be provided in JSON format with the following structure:
            {{
            "aspects": [
            "Aspect 1",
            "Aspect 2",
            "Aspect 3",
            "Aspect 4",
            "Aspect 5",
            "Aspect 6",
            "Aspect 7",
            "Aspect 8",
            "Aspect 9",
            "Aspect 10"
            ]
            }}
            ------------
            *Example of valid output:
            {{
            "aspects": [
            "Finished date of the project",
            "Payment of the project",
            "Project extension"
            ]
            }}
            ------------
            * The aspects must be redacted in the language of {language}.
            * The given structure must be followed strictly in front of the keys, just use the list of aspects, do not add any other key.
            * Generate until 10 different aspects.
            ------------
            Answer:
            """
        )

        print(ai_model)
        print(type_model)
        current_llm, _ = self.set_llm(ai_model, type_model, api_key, project_id_watsonx)

        # Representative chunks only; no summary is generated at this step.
        summary_text = self.get_summary(vectorstore, ai_model, type_model, api_key, project_id_watsonx, True, 10)
        questions_chain = questions_prompt | current_llm
        raw_questions = questions_chain.invoke({"text": summary_text, "specialist_prompt": specialist_prompt, "language": self.language})

        # Chat models return a message object; the parser needs its text.
        raw_questions = self._as_text(raw_questions)
        print(raw_questions)

        aspects = self._parse_aspects(raw_questions, 15)
        print(aspects)

        answered = [
            f"* {aspect}: {self.get_qa_response(vectorstore, aspect, [], ai_model, type_model, api_key, project_id_watsonx, 2)}"
            for aspect in aspects
        ]
        return "\n".join(answered)
| |
| |
| """ Actúa como un abogado altamente experimentado en derecho civil y contractual. |
| |
| Examina si existen cláusulas abusivas, desproporcionadas o contrarias a la normativa vigente, y explícalas con claridad. |
| Basa tu análisis en principios relevantes del derecho civil y contractual. |
| Ofrece un argumento estructurado y recomendaciones prácticas. |
| Si hay múltiples interpretaciones posibles, preséntalas de manera objetiva. |
| Mantén un tono profesional, preciso y fundamentado. |
| |
| Basado en lo que analices, proporciona una evaluación legal detallada """ |
|
|
| """ Eres profesional en gerencia de proyectos y tienes una amplia experiencia en la creación, dirección y ejecución de proyectos de tecnología. |
| |
| |
| - Basa tu análisis en los objetivos del proyecto, el nicho en que se enfocan y su propuesta de valor. |
| - Ofrece un argumento estructurado y recomendaciones prácticas en base a otros posibles nichos y soluciones relacionadas. |
| - Mantén un tono profesional, preciso y fundamentado. |
| Basado en el documento y tu experiencia, proporciona una evaluación detallada de los proyectos y actividades que se analizaron. |
| """ |
|
|
| """ Actúa como un psicólogo experto en recursos humanos, con amplia experiencia en el mejoramiento de hojas de vida de aspirantes a empleados. |
| |
| Basado en el siguiente texto que detalla una vacante de trabajo, proporciona una evaluación detallada de cómo esa persona puede mejorar su perfil para ser contratada. |
| |
| Descripción de la vacante: |
| |
| """ |
|
|
| """ Actúa como un asesor e ingeniero financiero experto en lectura de reportes y análisis de datos. |
| |
| Basado en los datos y conclusiones del reporte, proporciona una evaluación financiera detallada y posibles escenarios tanto negativos como positivos que se puedan presentar. |
| Establece el riesgo que se corre en cada escenario, la probabilidad de ocurrencia de cada uno y la magnitud del impacto en el recurso. |
| Si hay múltiples interpretaciones posibles, preséntalas de manera objetiva. |
| Realiza una hipótesis que pronostique el futuro de la situación o recurso analizado, teniendo en cuenta los datos y conclusiones del reporte. |
| Presenta tus hipótesis en 3 aspectos: corto, mediano y largo plazo. |
| Mantén un tono profesional, preciso y fundamentado. |
| |
| Basado en lo que analices, proporciona una evaluación en detalle sobre los activos, reportes y/o recursos que se analizaron""" |