Próximo seminario web: Seguridad empresarial para Claude Code | 21 de abril · 11:00 a. m. PST. Regístrese aquí →

Cognita: Creación de aplicaciones RAG modulares y de código abierto para la producción

Por TrueFoundry

Actualizado: April 16, 2024

Resumir con

Supongamos que hay un equipo A asignado para desarrollar la aplicación RAG para caso de uso-1, luego está equipo B que está desarrollando la aplicación RAG para caso de uso-2, y luego está equipo C, que solo está planificando su próximo caso de uso de la aplicación RAG. ¿Ha deseado que la creación de canalizaciones de RAG en varios equipos hubiera sido fácil? ¿No es necesario que cada equipo comience desde cero, sino de forma modular en la que cada equipo pueda usar la misma funcionalidad básica y desarrollar sus propias aplicaciones de manera efectiva sobre ella sin ninguna interferencia?

¡¡No te preocupes!! Es por eso que creamos Cognita. Si bien RAG es innegablemente impresionante, el proceso de crear una aplicación funcional con él puede resultar abrumador. Hay mucho que entender en relación con las prácticas de implementación y desarrollo, que van desde la selección de los modelos de IA adecuados para el caso de uso específico hasta la organización de los datos de manera eficaz para obtener la información deseada. Si bien herramientas como Cadena LANG y Índice Llama existen para simplificar el proceso de diseño de prototipos, aún no existe una plantilla RAG de código abierto accesible y lista para usar que incorpore las mejores prácticas y ofrezca soporte modular, lo que permita a cualquier persona utilizarla rápida y fácilmente.

Ventajas de Cognita:

  1. Un repositorio central reutilizable de analizadores, cargadores, integradores y recuperadores.
  2. Capacidad para que los usuarios no técnicos jueguen con la interfaz de usuario: cargue documentos y ejecute QnA utilizando módulos creados por el equipo de desarrollo.
  3. Totalmente impulsado por API, lo que permite la integración con otros sistemas.

Visión general

Profundizando en el funcionamiento interno de Cognita, nuestro objetivo era lograr un equilibrio entre la personalización total y la adaptabilidad y, al mismo tiempo, garantizar la facilidad de uso desde el primer momento. Dado el rápido ritmo de los avances en RAG e IA, era imperativo para nosotros diseñar Cognita teniendo en cuenta la escalabilidad, lo que permitiría una integración perfecta de los nuevos avances y los diversos casos de uso. Esto nos llevó a dividir el proceso RAG en distintos pasos modulares (como se muestra en el diagrama anterior, que se analizará en las secciones siguientes), lo que facilitó el mantenimiento del sistema, la adición de nuevas funcionalidades, como la interoperabilidad con otras bibliotecas de IA, y permitió a los usuarios adaptar la plataforma a sus requisitos específicos. Nuestro objetivo sigue siendo proporcionar a los usuarios una herramienta sólida que no solo satisfaga sus necesidades actuales, sino que también evolucione junto con la tecnología, incluidos cambios arquitectónicos más amplios, como MCP frente a RAG, garantizando un valor a largo plazo.

Componentes

Cognita está diseñado en torno a siete módulos diferentes, cada uno personalizable y controlable para adaptarse a diferentes necesidades:

  1. Cargadores de datos
  2. Analizadores
  3. Embebedores
  4. Cambiadores de posición
  5. DB vectoriales
  6. Almacén de metadatos
  7. Controladores de consultas

Cargadores de datos

Estos cargan los datos de diferentes fuentes, como directorios locales, cubos de S3, bases de datos, Verdadera fundición artefactos, etc. Actualmente, Cognita admite la carga de datos desde el directorio local, la URL web, el repositorio de Github y los artefactos de Truefoundry. Se pueden añadir fácilmente más cargadores de datos en backend/módulos/cargadores de datos/ . Una vez que se agrega un cargador de datos, debe registrarlo para que la aplicación RAG pueda usarlo en backend/modules/dataloaders/__init__.py Para registrar un cargador de datos, añada lo siguiente:

register_dataloader("MyDataLoader", MyDataLoaderClass)

Analizadores

En este paso, tratamos diferentes tipos de datos, como archivos de texto normales, PDF e incluso archivos Markdown. El objetivo es convertir todos estos tipos diferentes en un formato común para que podamos trabajar con ellos más fácilmente más adelante. Esta parte, llamada análisis, suele ser la que lleva más tiempo y es difícil de implementar cuando estamos configurando un sistema como este. Sin embargo, el uso de Cognita puede ayudar porque ya nos permite gestionar la ardua tarea de gestionar las canalizaciones de datos.

Después de esto, dividimos los datos analizados en fragmentos uniformes. Pero, ¿por qué necesitamos esto? El texto que obtenemos de los archivos puede tener diferentes longitudes. Si utilizamos estos textos largos directamente, acabaremos añadiendo un montón de información innecesaria. Además, dado que todos los LLM solo pueden procesar una cierta cantidad de texto a la vez, no podremos incluir todo el contexto importante necesario para la pregunta. Por lo tanto, vamos a dividir el texto en partes más pequeñas para cada sección. Intuitivamente, los fragmentos más pequeños contendrán conceptos relevantes y serán menos ruidosos en comparación con los fragmentos más grandes.

Actualmente admitimos el análisis de Markdown, PDF y Texto archivos. Se pueden agregar fácilmente más analizadores de datos en backend/módulos/analizadores/ . Una vez que se agrega un analizador, debe registrarlo para que la aplicación RAG pueda usarlo en backend/modules/parsers/__init__.py Para registrar un analizador, añada lo siguiente:

register_parser("MyParser", MyParserClass)

Embebedores

Una vez que hayamos dividido los datos en partes más pequeñas, queremos encontrar los fragmentos más importantes para una pregunta específica. Una forma rápida y eficaz de hacerlo es utilizar un modelo previamente entrenado (modelo de incrustación) para convertir nuestros datos y la pregunta en códigos especiales denominados incrustaciones. Luego, comparamos las incrustaciones de cada fragmento de datos con las de la pregunta. Al medir el similitud de coseno entre estas incrustaciones, podemos averiguar qué partes están más estrechamente relacionadas con la pregunta, lo que nos ayuda a encontrar las mejores para usar.

Hay muchos modelos previamente entrenados disponibles para incrustar los datos, como los modelos de OpenAI, Cohere, etc. Los más populares se pueden descubrir a través de El punto de referencia de incrustación masiva de texto de HuggingFace (MTEB) tabla de clasificación. Brindamos soporte para incrustaciones de OpenAI, embeddings de TrueFoundry y también para las actuales SOTA incrustaciones (a partir de abril de 2024) de pan mixto - ai.

Se pueden agregar fácilmente más incrustadores debajo backend/módulos/integrador/ . Una vez que se agrega un incrustador, debe registrarlo para que la aplicación RAG pueda usarlo en backend/modules/embedders/__init__.py Para registrar un analizador, añada lo siguiente:

register_embedder("MyEmbedder", MyEmbedderClass)

Nota: Recuerda que las incrustaciones no son el único método para encontrar partes importantes. ¡También podríamos usar un LLM para esta tarea! Sin embargo, los LLM son mucho más grandes que los modelos de incrustación y tienen un límite en cuanto a la cantidad de texto que pueden procesar a la vez. Por eso es más inteligente usar las incrustaciones para elegir primero los k fragmentos principales. Luego, podemos usar los LLM en estos pocos fragmentos para determinar cuáles son los mejores para usarlos como contexto para responder a nuestra pregunta.

Cambiadores de posición

Una vez que el paso de incrustación encuentra algunas posibles coincidencias, que pueden ser muchas, se aplica un paso de reclasificación. Cambie la clasificación para asegurarse de que los mejores resultados estén en la parte superior. Como resultado, podemos elegir los x documentos principales, lo que hace que nuestro contexto sea más conciso y la consulta rápida sea más corta. Brindamos soporte para SOTA reranker (a partir de abril de 2024) desde pan mixto - ai que se implementa en backend/módulos/cambio de posicionamiento/

VectorDB

Una vez que creamos vectores para textos, los almacenamos en algo llamado base de datos vectorial. Esta base de datos realiza un seguimiento de estos vectores para que podamos encontrarlos rápidamente más adelante utilizando diferentes métodos. Las bases de datos normales organizan los datos en tablas, como filas y columnas, pero las bases de datos vectoriales son especiales porque almacenan y encuentran datos basados en estos vectores. Esto es muy útil para cosas como reconocer imágenes, entender el lenguaje o recomendar cosas. Por ejemplo, en un sistema de recomendaciones, cada artículo que quieras recomendar (como una película o un producto) se convierte en un vector, con diferentes partes del vector que representan diferentes características del artículo, como su género o precio. Del mismo modo, en el lenguaje, cada palabra o documento se convierte en un vector, y partes del vector representan características de la palabra o el documento, como la frecuencia con la que se usa la palabra o su significado. Estas bases de datos vectoriales están diseñadas para gestionarlos de manera eficiente. Usando diferentes formas de medir qué tan cerca están los vectores entre sí, por ejemplo, qué tan similares son o qué tan separados están, encontramos los vectores que están más cerca de la consulta de usuario dada. Las formas más comunes de medir esto son la distancia euclidiana, la similitud de cosenos y el producto de puntos.

Hay varias bases de datos vectoriales disponibles en el mercado, como Qdrant, SingleStore, Weaviate, etc. Actualmente admitimos Qarant y Tienda única. La clase db vectorial Qdrant se define en /backend/modules/vector_db/qdrant.py, mientras que la clase db vectorial SingleStore se define en /backend/modules/vector_db/singlestore.py

También se pueden agregar otros dbs vectoriales en el vector_db carpeta y se puede registrar en /backend/modules/vector_db/__init__.py

Para añadir cualquier soporte de base de datos vectorial en Cognita, el usuario debe hacer lo siguiente:

  • Crea la clase que hereda de Base Vector DB (desde backend.modules.vector_db.base importar BaseVectorDB) e inicialízalo con Configuración de VectorDB (desde backend.types importar VectorDBConfig)
  • Implemente los siguientes métodos:
    • crear_colección: Para inicializar la colección/proyecto/tabla en vector db.
    • upsert_documents: Para insertar los documentos en la base de datos.
    • get_collections: Obtenga todas las colecciones presentes en la base de datos.
    • eliminar_colección: Para eliminar la colección de la base de datos.
    • get_vector_store: Para obtener el almacén de vectores de la colección dada.
    • get_vector_client: Para obtener el cliente vectorial para la colección dada, si lo hubiera.
    • lista_data_punto_vectores: Para enumerar los vectores ya presentes en la base de datos que son similares a los documentos que se están insertando.
    • eliminar vectores de puntos de datos: Para eliminar los vectores de la base de datos, se utiliza para eliminar los vectores antiguos del documento actualizado.

Ahora mostramos cómo podemos agregar una nueva base de datos vectorial al sistema RAG. Tomamos un ejemplo de ambos Qarant y Tienda única vectores dbs.

Integración de Qdrant

Qarant es una base de datos vectorial de código abierto y un motor de búsqueda vectorial escrito en Rust. Proporciona vectores rápidos y escalables búsqueda de similitud servicio con una cómoda API. Para agregar Qdrant vector db al sistema RAG, siga los pasos que se indican a continuación:

En el archivo.env puede agregar lo siguiente

VECTOR_DB_CONFIG = '{"url»: "<url_here>«, «provider»: «qdrant"}' # URL de Qdrant para la instancia implementada
VECTOR_DB_CONFIG=' {"proveedor» :"qdrant», "local» :"true "} ' # Para una instancia de Qdrant basada en archivos locales sin Docker
  1. Crear una clase nueva Base de datos vectorial QDRANT en backend/modules/vector_db/qdrant.py que hereda de Base Vector DB e inicialízalo con Configuración de VectorDB
from langchain_community.vectorstores.qdrant import Qdrant
from qdrant_client.http.models import VectorParams, Distance
from qdrant_client import QdrantClient, models

class QdrantVectorDB(BaseVectorDB):
    def __init__(self, config: VectorDBConfig):
        ...
        # Initialize the qdrant client
        self.qdrant_client = QdrantClient(
            url=self.url,
            port=self.port,
            prefer_grpc=self.prefer_grpc,
            prefix=self.prefix,
        )

  1. Anular el crear_colección método para crear una colección en Qdrant
 def create_collection(self, collection_name: str, embeddings: Embeddings):
    # Calculate embedding size
    partial_embeddings = embeddings.embed_documents(["Initial document"])
    vector_size = len(partial_embeddings[0])

    # Create collection given the embeding dimension and simalrity metric
    self.qdrant_client.create_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(
            size=vector_size,  # embedding dimension
            distance=Distance.COSINE,
        ),
        replication_factor=3,
    )

    # Create metadata index for the collection
    self.qdrant_client.create_payload_index(
        collection_name=collection_name,
        field_name=f"metadata.{DATA_POINT_FQN_METADATA_KEY}",
        field_schema=models.PayloadSchemaType.KEYWORD,
    )

  1. Anular el upsert_documents método para insertar los documentos en la base de datos
def upsert_documents(
    self,
    collection_name: str,
    documents,
    embeddings: Embeddings,
    incremental: bool = True,
):
    # Get the data point fqns from the documents
    # fqns uniquely identify the documents
    # This is used to delete the outdated documents from the db
    data_point_fqns = []
    for document in documents:
        if document.metadata.get(DATA_POINT_FQN_METADATA_KEY):
            data_point_fqns.append(
                document.metadata.get(DATA_POINT_FQN_METADATA_KEY)
            )

    # Add Documents
    Qdrant(
        client=self.qdrant_client,
        collection_name=collection_name,
        embeddings=embeddings,
    ).add_documents(documents=documents)

    # Get the record ids that are already present in the db (Existing documents that are modified)
    # These docs should be removed from the db
    # Used when incremental indexing is enabled
    record_ids_to_be_upserted: List[str] = self._get_records_to_be_upserted(
        collection_name=collection_name,
        data_point_fqns=data_point_fqns,
        incremental=incremental,
    )

    # Delete Documents
    if len(record_ids_to_be_upserted):
        for i in range(0, len(record_ids_to_be_upserted), BATCH_SIZE):
            record_ids_to_be_processed = record_ids_to_be_upserted[
                i : i + BATCH_SIZE
            ]
            self.qdrant_client.delete(
                collection_name=collection_name,
                points_selector=models.PointIdsList(
                    points=record_ids_to_be_processed,
                ),
            )

# This method is used to get the records that are already present in the db
def _get_records_to_be_upserted(
    self, collection_name: str,
    data_point_fqns: List[str],
    incremental: bool
):
    if not incremental:
        return []
    # For incremental deletion, we delete the documents with the same document_id
    stop = False
    offset = None
    record_ids_to_be_upserted = []

    # we fetch the records in batches based on the data_point_fqns
    while stop is not True:
        records, next_offset = self.qdrant_client.scroll(
            collection_name=collection_name,
            scroll_filter=models.Filter(
                should=[
                    models.FieldCondition(
                        key=f"metadata.{DATA_POINT_FQN_METADATA_KEY}",
                        match=models.MatchAny(
                            any=data_point_fqns,
                        ),
                    ),
                ]
            ),
            limit=BATCH_SIZE,
            offset=offset,
            with_payload=False,
            with_vectors=False,
        )
        for record in records:
            record_ids_to_be_upserted.append(record.id)
            if len(record_ids_to_be_upserted) > MAX_SCROLL_LIMIT:
                stop = True
                break
        if next_offset is None:
            stop = True
        else:
            offset = next_offset
    return record_ids_to_be_upserted

  1. Anular el get_collections método para obtener todas las colecciones presentes en la base de datos
def get_collections(self):
    collections = self.qdrant_client.list_collections()
    return [collection.name for collection in collections]

  1. Anular el eliminar_colección método para eliminar la colección de la base de datos
def delete_collection(self, collection_name: str):
    self.qdrant_client.delete_collection(collection_name=collection_name)

  1. Anular el get_vector_store método para obtener el almacén vectorial de la colección dada
def get_vector_store(self, collection_name: str, embeddings: Embeddings):
    return Qdrant(
            client=self.qdrant_client,
            embeddings=embeddings,
            collection_name=collection_name,
        )

  1. Anular el get_vector_client método para obtener el cliente vectorial para la colección dada, si lo hay
def get_vector_client(self):
    return self.qdrant_client

  1. Anular el lista_data_punto_vectores método para enumerar los vectores ya presentes en la base de datos que son similares a los documentos que se están insertando
def list_data_point_vectors(
    self, collection_name: str,
    data_source_fqn: str,
    batch_size: int
) -> List[DataPointVector]:

    stop = False
    offset = None
    data_point_vectors: List[DataPointVector] = []

    # Get the vectors that match with the doc unique identifier
    # Data point is an object that has vector_id, fqn and hash
    # hash is calculated during parsing and chunking phase
    while stop is not True:
        records, next_offset = self.qdrant_client.scroll(
            collection_name=collection_name,
            limit=batch_size,
            with_payload=[
                f"metadata.{DATA_POINT_FQN_METADATA_KEY}",
                f"metadata.{DATA_POINT_HASH_METADATA_KEY}",
            ],
            # filter the records based on the data_point_fqn
            scroll_filter=models.Filter(
                should=[
                    models.FieldCondition(
                        key=f"metadata.{DATA_POINT_FQN_METADATA_KEY}",
                        match=models.MatchText(
                            text=data_source_fqn,
                        ),
                    ),
                ]
            ),
            with_vectors=False,
            offset=offset,
        )
        for record in records:
            metadata: dict = record.payload.get("metadata")
            if (
                metadata
                and metadata.get(DATA_POINT_FQN_METADATA_KEY)
                and metadata.get(DATA_POINT_HASH_METADATA_KEY)
            ):
                # Create a DataPointVector object
                data_point_vectors.append(
                    DataPointVector(
                        data_point_vector_id=record.id,
                        data_point_fqn=metadata.get(DATA_POINT_FQN_METADATA_KEY),
                        data_point_hash=metadata.get(DATA_POINT_HASH_METADATA_KEY),
                    )
                )
            if len(data_point_vectors) > MAX_SCROLL_LIMIT:
                stop = True
                break
        if next_offset is None:
            stop = True
        else:
            offset = next_offset
    return data_point_vectors

  1. Anular el eliminar vectores de puntos de datos método para eliminar los vectores de la base de datos, utilizado para eliminar los vectores antiguos del documento actualizado
def delete_data_point_vectors(
    self,
    collection_name: str,
    data_point_vectors: List[DataPointVector],
    batch_size: int = BATCH_SIZE,
):

    vectors_to_be_deleted_count = len(data_point_vectors)
    deleted_vectors_count = 0

    # delete the vectors in batches
    for i in range(0, vectors_to_be_deleted_count, batch_size):
        data_point_vectors_to_be_processed = data_point_vectors[i : i + batch_size]
        # Delete the vectors from the db
        self.qdrant_client.delete(
            collection_name=collection_name,
            points_selector=models.PointIdsList(
                points=[
                    document_vector_point.data_point_vector_id
                    for document_vector_point in data_point_vectors_to_be_processed
                ],
            ),
        )
        # Increment the deleted vectors count
        deleted_vectors_count = deleted_vectors_count + len(data_point_vectors_to_be_processed)

Integración con una sola tienda

SingleStore ofrece una potente funcionalidad de base de datos vectorial que se adapta perfectamente a aplicaciones basadas en inteligencia artificial, chatbots, reconocimiento de imágenes y más, lo que elimina la necesidad de ejecutar una base de datos vectorial especializada únicamente para sus cargas de trabajo vectoriales. A diferencia de las bases de datos vectoriales tradicionales, SingleStore almacena los datos vectoriales en tablas relacionales junto con otros tipos de datos. La ubicación conjunta de los datos vectoriales con los datos relacionados le permite consultar fácilmente los metadatos ampliados y otros atributos de los datos vectoriales con toda la potencia de SQL.

SingleStore ofrece un nivel gratuito para que los desarrolladores comiencen con su base de datos vectorial. Puedes registrarte para obtener una cuenta gratuita aquí. Al registrarte, ve a Nube -> espacio de trabajo -> Crear usuario. Usa las credenciales para conectarte a la instancia de SingleStore.

En el archivo.env puede agregar lo siguiente

VECTOR_DB_CONFIG = '{"url»: "<url_here>«, «proveedor»: «tienda única"}' # url: mysql://{usuario}: {contraseña} @ {host}: {port}/{db}

Para agregar la base de datos vectorial SingleStore al sistema RAG, siga los pasos que se indican a continuación:

  1. Queremos agregar columnas adicionales a la tabla para almacenar la identificación del vector, por lo tanto, anulamos SingleStoreDB.
from langchain_community.vectorstores.singlestoredb import SingleStoreDB
import singlestoredb as s2

class SSDB(SingleStoreDB):
    def _create_table(self: SingleStoreDB) -> None:
        """Create table if it doesn't exist."""
        conn = self.connection_pool.connect()
        try:
            cur = conn.cursor()
            # Overriding the default table creation behaviour this adds id as autoinc primary key
            try:
                if self.use_vector_index:
                    index_options = ""
                    if self.vector_index_options and len(self.vector_index_options) > 0:
                        index_options = "INDEX_OPTIONS '{}'".format(
                            json.dumps(self.vector_index_options)
                        )
                    cur.execute(
                        """CREATE TABLE IF NOT EXISTS {}
                        (id BIGINT AUTO_INCREMENT PRIMARY KEY, {} TEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci,
                        {} VECTOR({}, F32) NOT NULL, {} JSON,
                        VECTOR INDEX {} ({}) {});""".format(
                            self.table_name,
                            self.content_field,
                            self.vector_field,
                            self.vector_size,
                            self.metadata_field,
                            self.vector_index_name,
                            self.vector_field,
                            index_options,
                        ),
                    )
                else:
                    cur.execute(
                        """CREATE TABLE IF NOT EXISTS {}
                        (id BIGINT AUTO_INCREMENT PRIMARY KEY, {} TEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci,
                        {} BLOB, {} JSON);""".format(
                            self.table_name,
                            self.content_field,
                            self.vector_field,
                            self.metadata_field,
                        ),
                    )
            finally:
                cur.close()
        finally:
            conn.close()


    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        embeddings: Optional[List[List[float]]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Add more texts to the vectorstore.

        Args:
            texts (Iterable[str]): Iterable of strings/text to add to the vectorstore.
            metadatas (Optional[List[dict]], optional): Optional list of metadatas.
                Defaults to None.
            embeddings (Optional[List[List[float]]], optional): Optional pre-generated
                embeddings. Defaults to None.

        Returns:
            List[str]: empty list
        """
        conn = self.connection_pool.connect()
        try:
            cur = conn.cursor()
            try:
                # Write data to singlestore db
                for i, text in enumerate(texts):
                    # Use provided values by default or fallback
                    metadata = metadatas[i] if metadatas else {}
                    embedding = (
                        embeddings[i]
                        if embeddings
                        else self.embedding.embed_documents([text])[0]
                    )
                    # Overriding insert statement to handle autoincrement id
                    cur.execute(
                        "INSERT INTO {} (content, vector, metadata) VALUES (%s, JSON_ARRAY_PACK(%s), %s)".format(
                            self.table_name
                        ),
                        (
                            text,
                            "[{}]".format(",".join(map(str, embedding))),
                            json.dumps(metadata),
                        ),
                    )
                if self.use_vector_index:
                    cur.execute("OPTIMIZE TABLE {} FLUSH;".format(self.table_name))
            finally:
                cur.close()
        finally:
            conn.close()
        return []

  1. Crear una clase nueva Base de datos vectorial de una sola tienda en backend/modules/vector_db/singlestore.py que hereda de Base Vector DB e inicialízalo con Configuración de VectorDB
class SingleStoreVectorDB(BaseVectorDB):
    def __init__(self, config: VectorDBConfig):
        # url: mysql://{user}:{password}@{host}:{port}/{db}
        self.host = config.url

  1. Anular el crear_colección método para crear una colección en SingleStore
def create_collection(self, collection_name: str, embeddings: Embeddings):
    # Calculate embedding size
    partial_embeddings = embeddings.embed_documents(["Initial document"])
    vector_size = len(partial_embeddings[0])

    # Create collection
    # we keep vector_filed, content_field as default values
    SSDB(
        embedding=embeddings,
        host=self.host,
        table_name=collection_name,
        vector_size=vector_size,
        use_vector_index=True,
    )

  1. Anular el upsert_documents método para insertar los documentos en la base de datos
def upsert_documents(
    self,
    collection_name: str,
    documents: List[Document],
    embeddings: Embeddings,
    incremental: bool = True,
):
    # Get the data point fqns from the documents
    data_point_fqns = []
    for document in documents:
        if document.metadata.get(DATA_POINT_FQN_METADATA_KEY):
            data_point_fqns.append(
                document.metadata.get(DATA_POINT_FQN_METADATA_KEY)
            )

    try:
        # Add documents to the table
        SSDB.from_documents(
            embedding=embeddings,
            documents=documents,
            table_name=collection_name,
            host=self.host,
        )
        logger.debug(
            f"[SingleStore] Added {len(documents)} documents to collection {collection_name}"
        )
    except Exception as e:
        logger.error(
            f"[SingleStore] Failed to add documents to collection {collection_name}: {e}"
        )

  1. Anular el get_collections método para obtener todas las colecciones presentes en la base de datos
def get_collections(self) -> List[str]:
    # Create connection to SingleStore
    conn = s2.connect(self.host)
    try:
        cur = conn.cursor()
        try:
            # Get all the tables in the db
            cur.execute("SHOW TABLES")
            return [row[0] for row in cur.fetchall()]
        except Exception as e:
            logger.error(
                f"[SingleStore] Failed to get collections: {e}"
            )
        finally:
            cur.close()
    except Exception as e:
        logger.error(
            f"[SingleStore] Failed to get collections: {e}"
        )
    finally:
        conn.close()

  1. Anular el eliminar_colección método para eliminar la colección de la base de datos

def delete_collection(self, collection_name: str):
    # Create connection to SingleStore
    conn = s2.connect(self.host)
    try:
        cur = conn.cursor()
        try:
            # Drop the table
            cur.execute(f"DROP TABLE {collection_name}")
            logger.debug(f"[SingleStore] Deleted collection {collection_name}")
        except Exception as e:
            logger.error(
                f"[SingleStore] Failed to delete collection {collection_name}: {e}"
            )
        finally:
            cur.close()
    except Exception as e:
        logger.error(
            f"[SingleStore] Failed to delete collection {collection_name}: {e}"
        )
    finally:
        conn.close()

  1. Anular el get_vector_store método para obtener el almacén vectorial de la colección dada
def get_vector_store(self, collection_name: str, embeddings: Embeddings):
    return SSDB(
        embedding=embeddings,
        host=self.host,
        table_name=collection_name,
    )

  1. Anular el get_vector_client método para obtener el cliente vectorial para la colección dada, si lo hay
def get_vector_client(self):
    return s2.connect(self.host)

  1. Anular el lista_data_punto_vectores método para enumerar los vectores ya presentes en la base de datos que son similares a los documentos que se están insertando
def list_data_point_vectors(
    self,
    collection_name: str,
    data_source_fqn: str,
) -> List[DataPointVector]:
    data_point_vectors: List[DataPointVector] = []
    logger.debug(f"data_source_fqn: {data_source_fqn}")

    # Get all data point vectors from table upto MAX_SCROLL_LIMIT
    conn = s2.connect(self.host)
    try:
        curr = conn.cursor()

        # Remove all data point vectors with the same data_source_fqn
        curr.execute(f"SELECT * FROM {collection_name} WHERE JSON_EXTRACT_JSON(metadata, '{DATA_POINT_FQN_METADATA_KEY}') LIKE '%{data_source_fqn}%' LIMIT {MAX_SCROLL_LIMIT}")

        for record in curr:
            # id, content, vector, metadata
            id, _, _, metadata = record
            if (
                metadata
                and metadata.get(DATA_POINT_FQN_METADATA_KEY)
                and metadata.get(DATA_POINT_HASH_METADATA_KEY)
            ):
                # Create a DataPointVector object
                data_point_vectors.append(
                    DataPointVector(
                        data_point_vector_id=id,
                        data_point_fqn=metadata.get(DATA_POINT_FQN_METADATA_KEY),
                        data_point_hash=metadata.get(DATA_POINT_HASH_METADATA_KEY),
                    )
                )
    except Exception as e:
        logger.error(
            f"[SingleStore] Failed to list data point vectors: {e}"
        )
    finally:
        conn.close()

    logger.debug(
        f"[SingleStore] Listing {len(data_point_vectors)} data point vectors for collection {collection_name}"
    )
    return data_point_vectors

  1. Anular el eliminar vectores de puntos de datos método para eliminar los vectores de la base de datos, utilizado para eliminar los vectores antiguos del documento actualizado
def delete_data_point_vectors(
    self,
    collection_name: str,
    data_point_vectors: List[DataPointVector],
    batch_size: int =BATCH_SIZE,
):
    if len(data_point_vectors) > 0:
        # Create connection to SingleStore
        conn = s2.connect(self.host)

        try:
            vectors_to_be_deleted_count = len(data_point_vectors)
            curr = conn.cursor()

            # Delete the vectors as per the data_point_vector_id
            curr.execute(
                f"DELETE FROM {collection_name} WHERE id in ({', '.join(data_point_vector.data_point_vector_id for data_point_vector in data_point_vectors)})"
            )
            logger.debug(
                f"[SingleStore] Deleted {vectors_to_be_deleted_count} data point vectors"
            )
        except Exception as e:
            logger.error(
                f"[SingleStore] Failed to delete data point vectors: {e}"
            )
        finally:
            conn.close()

Almacén de metadatos

Contiene las configuraciones necesarias que definen de forma exclusiva un proyecto o una aplicación RAG. Una aplicación RAG puede contener un conjunto de documentos de una o más fuentes de datos combinadas, lo que denominamos colección. Los documentos de estas fuentes de datos se indexan en la base de datos vectorial mediante métodos de carga, análisis e incrustación de datos. Para cada caso de uso de RAG, el almacén de metadatos contiene:

  • Nombre de la colección
  • Nombre de la base de datos vectorial asociada utilizada
  • Fuentes de datos vinculadas
  • Configuración de análisis para cada fuente de datos
  • Modelo de incrustación y su configuración a utilizar

Actualmente definimos dos formas de almacenar estos datos, una localmente y otros usos True foudry. Estas tiendas se definen en - backend/modules/metada_store/

Controladores de consultas

Una vez que los datos estén indexados y almacenados en una base de datos vectorial, ahora es el momento de combinar todas las partes para usar nuestra aplicación. ¡Los Query Controllers solo hacen eso! Nos ayudan a recuperar la respuesta a la consulta del usuario correspondiente. Los pasos típicos de un controlador de consultas son los siguientes:

  1. Los usuarios envían una carga útil de solicitud que contiene la consulta, el nombre de la colección, la configuración de llm, el mensaje, el recuperador y su configuración.
  2. Basado en el nombre de la colección relevante base de datos vectorial se completa con su configuración, como el incrustador utilizado, el tipo de base de datos vectorial, etc.
  3. Basado en el consulta, los documentos relevantes se recuperan mediante el perro perdiguero de vector db.
  4. Los documentos recuperados forman el contexto y junto con la consulta a.k.a pregunta se entrega a la LLM para generar la respuesta. Este paso también puede implicar un ajuste rápido.
  5. Si es necesario, junto con la respuesta generada, también se devuelven fragmentos de documentos relevantes en la respuesta.

Nota: En el caso de los agentes, las etapas intermedias también se pueden transmitir. La decisión depende de la aplicación específica.

Los métodos del controlador de consultas se pueden exponer directamente como una API, añadiendo decoradores http a las funciones respectivas.

Para agregar su propio controlador de consultas, lleve a cabo los siguientes pasos:

  1. Cada caso de uso de RAG normalmente debe tener una carpeta independiente que contenga el controlador de consultas. Supongamos que nuestra carpeta es aplicación-2. Por lo tanto, escribiremos nuestro controlador en /backend/modules/query_controller/app-2/controller.py
  2. Añadir controlador de consultas decorador a tu clase y pasa el nombre de tu controlador personalizado como argumento
  3. Agregue métodos a este controlador según sus necesidades y use nuestros decoradores http como correo, conseguir, eliminar para convertir tus métodos en una API
from backend.server.decorator import post
@query_controller("/app-2")
class MyCustomController():
    ...
    @post("/answer")
    def answer(query: str):
        # Write code to express your logic for answer
        # This API will be exposed as POST /my-controller/answer
        ...

  1. Importa tu clase de controlador personalizada en backend/modules/query_controllers/__init__.py
from backend.modules.query_controllers.sample_controller.controller import MyCustomController

Un ejemplo de controlador de consultas está escrito en: /backend/modules/query_controller/example/controller.py Consulte para una mejor comprensión

Cognita - Flujo de procesos

Un proceso típico de Cognita consta de dos fases:

  1. Indexación de datos
  2. Generación de respuestas

Indexación de datos

Esta fase implica cargar datos de las fuentes, analizar los documentos presentes en estas fuentes e indexarlos en la base de datos vectorial. Para gestionar las grandes cantidades de documentos que se encuentran en la producción, Cognita va un paso más allá.

  1. Cognita agrupa los documentos en lotes, en lugar de indexarlos todos juntos.
  2. Cognita mantiene los cálculos y realiza un seguimiento del hash de los documentos, de modo que cada vez que se agrega un nuevo documento a la fuente de datos para indexar solo esos documentos se indexan en lugar de indexar la fuente de datos completa, lo que ahorra mucho tiempo y computación.
  3. Este modo de indexación también se conoce como INCREMENTAL indexación, también hay otro modo que es compatible con Cognita, que es COMPLETO indexación. COMPLETO la indexación vuelve a introducir los datos en la base de datos vectorial independientemente de los datos vectoriales presentes en la colección dada.

Generación de respuestas

La fase de generación de respuestas hace un llamado al /respuesta punto final de su definido Controlador de consultas y genera la respuesta para la consulta solicitada.

Uso de Cognita UI

Los siguientes pasos mostrarán cómo usar la interfaz de usuario de cognita para consultar documentos:

1. Crear fuente de datos

  • Haga clic en Fuentes de datos lengüeta
Datasource
  • Haga clic + Nueva fuente de datos
  • El tipo de fuente de datos puede ser un archivo del directorio local, una URL web, una URL de github o proporcionar un FQN del artefacto de Truefoundry.
    • Por ejemplo: Si directorio local está seleccionado, suba los archivos desde su máquina y haga clic Enviar.
  • La lista de fuentes de datos creadas estará disponible en la pestaña Fuentes de datos.
DataSourceList

2. Crear colección

  • Haga clic en Colecciones lengüeta
  • Haga clic + Nueva colección
collection
  • Introduzca el nombre de la colección
  • Seleccione el modelo de incrustación
  • Agregue la fuente de datos creada anteriormente y la configuración necesaria
  • Haga clic Proceso para crear la colección e indexar los datos.
ingestionstarted

3. Tan pronto como crees la colección, comienza la ingesta de datos. Puedes ver su estado seleccionando tu colección en la pestaña de colecciones. También puedes añadir fuentes de datos adicionales más adelante e indexarlas en la colección.

ingestioncomplete

4. Generación de respuestas

responsegen
  • Selecciona la colección
  • Seleccione el LLM y su configuración
  • Seleccione el recuperador de documentos
  • Escriba el mensaje o utilice el mensaje predeterminado
  • Haga la consulta

¡Empieza ahora!

Reserva una pDemo personalizada o regístrate hoy para empezar a crear sus casos de uso de RAG.

La forma más rápida de crear, gobernar y escalar su IA

Inscríbase
Tabla de contenido

Controle, implemente y rastree la IA en su propia infraestructura

Reserva 30 minutos con nuestro Experto en IA

Reserve una demostración

La forma más rápida de crear, gobernar y escalar su IA

Demo del libro

Descubra más

July 20, 2023
|
5 minutos de lectura

LLMOps CoE: la próxima frontera en el panorama de los MLOps

May 25, 2023
|
5 minutos de lectura

LLM de código abierto: abrazar o perecer

August 27, 2025
|
5 minutos de lectura

Mapeando el mercado de la IA local: desde chips hasta aviones de control

November 13, 2025
|
5 minutos de lectura

GPT-5.1 frente a GPT-5:9 mejoras importantes que necesita saber

April 22, 2026
|
5 minutos de lectura

Mercados de agentes de IA: el futuro de la automatización de nivel empresarial

No se ha encontrado ningún artículo.
Detailed Guide to What is an AI Gateway?
April 22, 2026
|
5 minutos de lectura

¿Qué es AI Gateway? Conceptos básicos y guía

No se ha encontrado ningún artículo.
April 22, 2026
|
5 minutos de lectura

Aprovechar la puerta de enlace de IA de TrueFoundry para el cumplimiento de FIPS

No se ha encontrado ningún artículo.
April 22, 2026
|
5 minutos de lectura

Integración de GraySwan con TrueFoundry

No se ha encontrado ningún artículo.
April 22, 2026
|
5 minutos de lectura

Un chatbot de control de calidad impulsado por LLM sobre sus datos en la nube

Ingeniería y producto
LLMS y GenAI
April 22, 2026
|
5 minutos de lectura

Evaluación comparativa de Llama-2-13B

LLMS y GenAI
April 22, 2026
|
5 minutos de lectura

¿Qué es Lora Fine Tuning? La guía definitiva

LLMS y GenAI
April 22, 2026
|
5 minutos de lectura

Reduzca sus costos de infraestructura para los modelos ML/LLM

Ingeniería y producto

Blogs recientes

Realice un recorrido rápido por el producto
Comience el recorrido por el producto
Visita guiada por el producto