Source code for langchain.document_loaders.cube_semantic

import json
import logging
import time
from typing import List

import requests

from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader

logger = logging.getLogger(__name__)


[docs]class CubeSemanticLoader(BaseLoader): """Load Cube semantic layer metadata. Args: cube_api_url: REST API endpoint. Use the REST API of your Cube's deployment. Please find out more information here: https://cube.dev/docs/http-api/rest#configuration-base-path cube_api_token: Cube API token. Authentication tokens are generated based on your Cube's API secret. Please find out more information here: https://cube.dev/docs/security#generating-json-web-tokens-jwt load_dimension_values: Whether to load dimension values for every string dimension or not. dimension_values_limit: Maximum number of dimension values to load. dimension_values_max_retries: Maximum number of retries to load dimension values. dimension_values_retry_delay: Delay between retries to load dimension values. """ def __init__( self, cube_api_url: str, cube_api_token: str, load_dimension_values: bool = True, dimension_values_limit: int = 10_000, dimension_values_max_retries: int = 10, dimension_values_retry_delay: int = 3, ): self.cube_api_url = cube_api_url self.cube_api_token = cube_api_token self.load_dimension_values = load_dimension_values self.dimension_values_limit = dimension_values_limit self.dimension_values_max_retries = dimension_values_max_retries self.dimension_values_retry_delay = dimension_values_retry_delay def _get_dimension_values(self, dimension_name: str) -> List[str]: """Makes a call to Cube's REST API load endpoint to retrieve values for dimensions. These values can be used to achieve a more accurate filtering. """ logger.info("Loading dimension values for: {dimension_name}...") headers = { "Content-Type": "application/json", "Authorization": self.cube_api_token, } query = { "query": { "dimensions": [dimension_name], "limit": self.dimension_values_limit, } } retries = 0 while retries < self.dimension_values_max_retries: response = requests.request( "POST", f"{self.cube_api_url}/load", headers=headers, data=json.dumps(query), ) if response.status_code == 200: response_data = response.json() if ( "error" in response_data and response_data["error"] == "Continue wait" ): logger.info("Retrying...") retries += 1 time.sleep(self.dimension_values_retry_delay) continue else: dimension_values = [ item[dimension_name] for item in response_data["data"] ] return dimension_values else: logger.error("Request failed with status code:", response.status_code) break if retries == self.dimension_values_max_retries: logger.info("Maximum retries reached.") return []
[docs] def load(self) -> List[Document]: """Makes a call to Cube's REST API metadata endpoint. Returns: A list of documents with attributes: - page_content=column_title + column_description - metadata - table_name - column_name - column_data_type - column_member_type - column_title - column_description - column_values """ headers = { "Content-Type": "application/json", "Authorization": self.cube_api_token, } response = requests.get(f"{self.cube_api_url}/meta", headers=headers) response.raise_for_status() raw_meta_json = response.json() cubes = raw_meta_json.get("cubes", []) docs = [] for cube in cubes: if cube.get("type") != "view": continue cube_name = cube.get("name") measures = cube.get("measures", []) dimensions = cube.get("dimensions", []) for item in measures + dimensions: column_member_type = "measure" if item in measures else "dimension" dimension_values = [] item_name = str(item.get("name")) item_type = str(item.get("type")) if ( self.load_dimension_values and column_member_type == "dimension" and item_type == "string" ): dimension_values = self._get_dimension_values(item_name) metadata = dict( table_name=str(cube_name), column_name=item_name, column_data_type=item_type, column_title=str(item.get("title")), column_description=str(item.get("description")), column_member_type=column_member_type, column_values=dimension_values, ) page_content = f"{str(item.get('title'))}, " page_content += f"{str(item.get('description'))}" docs.append(Document(page_content=page_content, metadata=metadata)) return docs