Spaces:
Sleeping
Sleeping
| # ============================================================================= | |
| # βββββββββββββ IMPORTS βββββββββββββ | |
| # ============================================================================= | |
| import base64 | |
| import glob | |
| import hashlib | |
| import json | |
| import os | |
| import pandas as pd | |
| import pytz | |
| import random | |
| import re | |
| import shutil | |
| import streamlit as st | |
| import time | |
| import traceback | |
| import uuid | |
| import zipfile | |
| from PIL import Image | |
| from azure.cosmos import CosmosClient, PartitionKey, exceptions | |
| from datetime import datetime | |
| from git import Repo | |
| from github import Github | |
| from gradio_client import Client, handle_file | |
| import tempfile | |
| import io | |
| import requests | |
| import numpy as np | |
| from urllib.parse import quote | |
| # ============================================================================= | |
| # βββββββββββββ EXTERNAL HELP LINKS (Always visible in sidebar) βββββββββββββ | |
| # ============================================================================= | |
# Static catalog of help links rendered in the sidebar as
# "emoji [title](url)" markdown (consumed by update_file_management_section).
external_links = [
    {"title": "CosmosDB GenAI Full Text Search", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/gen-ai/full-text-search", "emoji": "π»"},
    {"title": "CosmosDB SQL API Client Library", "url": "https://learn.microsoft.com/en-us/python/api/overview/azure/cosmos-readme?view=azure-python", "emoji": "π»"},
    {"title": "CosmosDB Index and Query Vectors", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/how-to-python-vector-index-query", "emoji": "π»"},
    {"title": "CosmosDB NoSQL Materialized Views", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/materialized-views", "emoji": "π»"},
    {"title": "LangChain Vector Store Guide", "url": "https://python.langchain.com/docs/integrations/vectorstores/azure_cosmos_db_no_sql/", "emoji": "π»"},
    {"title": "Vector Database Prompt Engineering RAG for Python", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/vector-database?source=recommendations", "emoji": "π»"},
    {"title": "MergeKit Official GitHub", "url": "https://github.com/arcee-ai/MergeKit", "emoji": "π»"},
    {"title": "MergeKit Sample Usage", "url": "https://github.com/arcee-ai/MergeKit#examples", "emoji": "π"},
    {"title": "DistillKit Official GitHub", "url": "https://github.com/arcee-ai/DistillKit", "emoji": "π»"},
    {"title": "DistillKit Sample Usage", "url": "https://github.com/arcee-ai/DistillKit#usage", "emoji": "π"},
    {"title": "arcee.ai Official Website", "url": "https://arcee.ai", "emoji": "π"},
]
| # ============================================================================= | |
| # βββββββββββββ APP CONFIGURATION βββββββββββββ | |
| # ============================================================================= | |
# ----- Page chrome / branding constants -----
Site_Name = 'π GitCosmos'
title = "π GitCosmos"
helpURL = 'https://huggingface.co/awacke1'
bugURL = 'https://huggingface.co/spaces/awacke1/AzureCosmosDBUI/'
icons = 'πππ«'
# Streamlit requires set_page_config to run before any other st.* call,
# which is why this executes at import time.
st.set_page_config(
    page_title=title,
    page_icon=icons,
    layout="wide",
    initial_sidebar_state="auto",
    menu_items={
        'Get Help': helpURL,
        'Report a bug': bugURL,
        'About': title
    }
)
# Cosmos DB & App URLs
# Credentials/identifiers come from the environment; all three may be None if
# the env vars are unset — downstream code is expected to handle that.
ENDPOINT = "https://acae-afd.documents.azure.com:443/"
DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME")
# NOTE(review): env var is literally named "Key" — presumably the Cosmos
# account key; confirm against the deployment's secret configuration.
Key = os.environ.get("Key")
LOCAL_APP_URL = "https://huggingface.co/spaces/awacke1/AzureCosmosDBUI"
CosmosDBUrl = 'https://portal.azure.com/#@AaronCWackergmail.onmicrosoft.com/resource/subscriptions/003fba60-5b3f-48f4-ab36-3ed11bc40816/resourceGroups/datasets/providers/Microsoft.DocumentDB/databaseAccounts/acae-afd/dataExplorer'
| # ============================================================================= | |
| # βββββββββββββ HELPER FUNCTIONS βββββββββββββ | |
| # ============================================================================= | |
def get_download_link(file_path):
    """Return an HTML anchor that downloads *file_path* inlined as a base64 data URI."""
    file_name = os.path.basename(file_path)
    with open(file_path, "rb") as fh:
        encoded = base64.b64encode(fh.read()).decode()
    return f'<a href="data:file/txt;base64,{encoded}" download="{file_name}">Download {file_name} π</a>'
def generate_unique_id():
    """Create a sortable unique ID: UTC timestamp (to microseconds) + a UUID4.

    Side effect: echoes the new ID to the Streamlit page via st.write.

    Returns:
        str: "<YYYYmmddHHMMSSffffff>-<uuid4>".
    """
    from datetime import timezone  # local import: file top only imports `datetime`
    # Fix: datetime.utcnow() is deprecated (Python 3.12+); use an aware UTC now.
    # The formatted output is identical.
    timestamp = datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S%f')
    unique_uuid = str(uuid.uuid4())
    return_value = f"{timestamp}-{unique_uuid}"
    st.write('New ID: ' + return_value)
    return return_value
def generate_filename(prompt, file_type):
    """Build a filename "<MMDD_HHMM><sanitized-prompt>.<file_type>" in US/Central time.

    The prompt is stripped of all non-word characters and truncated to 90 chars.

    Args:
        prompt: free text used to label the file.
        file_type: extension without the dot (e.g. "md").

    Returns:
        str: the generated filename.
    """
    # Improvement: stdlib zoneinfo replaces the third-party pytz dependency;
    # the timezone and formatted output are unchanged.
    from zoneinfo import ZoneInfo
    safe_date_time = datetime.now(ZoneInfo('US/Central')).strftime("%m%d_%H%M")
    safe_prompt = re.sub(r'\W+', '', prompt)[:90]
    return f"{safe_date_time}{safe_prompt}.{file_type}"
def create_file(filename, prompt, response, should_save=True):
    """Persist *prompt* and *response* to *filename* (UTF-8), separated by a blank line.

    Does nothing when should_save is falsy.
    """
    if should_save:
        with open(filename, 'w', encoding='utf-8') as out:
            out.write(f"{prompt}\n\n{response}")
def load_file(file_name):
    """Read and return the entire text of *file_name* (UTF-8)."""
    with open(file_name, "r", encoding='utf-8') as fh:
        return fh.read()
def display_glossary_entity(k):
    """Render term *k* in the page followed by emoji search links.

    Links point to an in-app query (?q=), Wikipedia, Google, and YouTube,
    with the term URL-quoted where the target is an external search.
    """
    search_urls = {
        "π": lambda k: f"/?q={k}",
        "π": lambda k: f"https://en.wikipedia.org/wiki/{quote(k)}",
        "π": lambda k: f"https://www.google.com/search?q={quote(k)}",
        "π₯": lambda k: f"https://www.youtube.com/results?search_query={quote(k)}",
    }
    # Build one anchor per engine; unsafe_allow_html is required for raw <a> tags.
    links_md = ' '.join([f"<a href='{url(k)}' target='_blank'>{emoji}</a>" for emoji, url in search_urls.items()])
    st.markdown(f"{k} {links_md}", unsafe_allow_html=True)
def create_zip_of_files(files, zip_name="all_files.zip"):
    """Bundle *files* into a zip archive.

    Args:
        files: iterable of file paths to include (stored under their own paths).
        zip_name: output archive path; the default preserves the legacy
            hard-coded name, so existing callers are unaffected.

    Returns:
        str: the archive path.
    """
    with zipfile.ZipFile(zip_name, 'w') as zipf:
        for file_path in files:
            zipf.write(file_path)
    return zip_name
def get_video_html(video_path, width="100%"):
    """Return an HTML5 <video> tag with the file inlined as a base64 data URI.

    Args:
        video_path: path to an mp4 file.
        width: CSS width for the player.

    Fix: the original opened the file without closing it (no context manager),
    leaking the handle until garbage collection.
    """
    with open(video_path, 'rb') as fh:
        video_url = f"data:video/mp4;base64,{base64.b64encode(fh.read()).decode()}"
    return f'''
    <video width="{width}" controls autoplay loop>
        <source src="{video_url}" type="video/mp4">
        Your browser does not support video.
    </video>
    '''
def get_audio_html(audio_path, width="100%"):
    """Return an HTML5 <audio> tag with the file inlined as a base64 data URI.

    Args:
        audio_path: path to an mp3/mpeg audio file.
        width: CSS width applied via inline style.

    Fix: the original opened the file without closing it (no context manager),
    leaking the handle until garbage collection.
    """
    with open(audio_path, 'rb') as fh:
        audio_url = f"data:audio/mpeg;base64,{base64.b64encode(fh.read()).decode()}"
    return f'''
    <audio controls style="width:{width}">
        <source src="{audio_url}" type="audio/mpeg">
        Your browser does not support audio.
    </audio>
    '''
def preprocess_text(text):
    """Flatten *text* into a single-line, ASCII-only, quote-escaped string."""
    # Normalize every newline flavor to a literal backslash-n sequence.
    for newline in ('\r\n', '\r', '\n'):
        text = text.replace(newline, '\\n')
    # Escape double quotes so the result can sit inside a quoted string.
    text = text.replace('"', '\\"')
    # Tabs become single spaces; non-ASCII runs are dropped entirely.
    text = re.sub(r'[\t]', ' ', text)
    text = re.sub(r'[^\x00-\x7F]+', '', text)
    return text.strip()
def sanitize_json_text(text):
    """Make *text* safe for embedding in a JSON string value.

    Control characters JSON forbids outright are removed; the whitespace
    controls (\\n, \\r, \\t) are kept but escaped as literal sequences.
    """
    text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F]', '', text)
    for raw, escaped in (("\n", "\\n"), ("\r", "\\r"), ("\t", "\\t")):
        text = text.replace(raw, escaped)
    return text
| # ============================================================================= | |
| # βββββββββββββ COSMOS DB FUNCTIONS βββββββββββββ | |
| # ============================================================================= | |
def get_databases(client):
    """Return the id of every database visible to *client*."""
    return [props['id'] for props in client.list_databases()]
def get_containers(database):
    """Return the id of every container in *database*."""
    return [props['id'] for props in database.list_containers()]
def get_documents(container, limit=None):
    """Fetch documents from *container*, newest first (ordered by _ts).

    Args:
        container: Cosmos container client (must support query_items).
        limit: maximum number of documents to return; None means all.

    Returns:
        list: the (possibly truncated) result set.

    Fix: `max_item_count` only controls the per-page size of the Cosmos
    iterator, not the total number of results, so the old code returned
    every document regardless of `limit`. The list is now truncated
    explicitly as well.
    """
    query = "SELECT * FROM c ORDER BY c._ts DESC"
    items = list(container.query_items(query=query, enable_cross_partition_query=True, max_item_count=limit))
    return items if limit is None else items[:limit]
def insert_record(container, record):
    """Create *record* in *container*.

    Returns:
        (bool, str): success flag and a human-readable status message.
    """
    try:
        container.create_item(body=record)
    except exceptions.CosmosHttpResponseError as e:
        return False, f"HTTP error: {str(e)} π¨"
    except Exception as e:
        return False, f"Error: {str(e)} π±"
    return True, "Inserted! π"
def update_record(container, updated_record):
    """Upsert *updated_record* into *container*.

    Returns:
        (bool, str): success flag and a status message; unexpected failures
        include the full traceback text.
    """
    try:
        container.upsert_item(body=updated_record)
    except exceptions.CosmosHttpResponseError as e:
        return False, f"HTTP error: {str(e)} π¨"
    except Exception:
        return False, f"Error: {traceback.format_exc()} π±"
    return True, f"Updated {updated_record['id']} π οΈ"
def delete_record(container, record):
    """Delete *record* (identified by its 'id') from *container*.

    The partition key defaults to the document id when no 'pk' field is
    present. A document that is already gone counts as success, since the
    desired end state holds either way.

    Returns:
        (bool, str): success flag and a status message.
    """
    try:
        if "id" not in record:
            return False, "Record must contain an 'id' field. π"
        doc_id = record["id"]
        pk_value = record.get("pk", doc_id)
        container.delete_item(item=doc_id, partition_key=pk_value)
    except exceptions.CosmosResourceNotFoundError:
        return True, f"Record {doc_id} not found in Cosmos DB (already deleted or never existed). ποΈ"
    except exceptions.CosmosHttpResponseError as e:
        return False, f"HTTP error deleting {doc_id}: {str(e)}. π¨"
    except Exception as e:
        return False, f"Unexpected error deleting {doc_id}: {str(traceback.format_exc())}. π±"
    return True, f"Record {doc_id} successfully deleted from Cosmos DB. ποΈ"
def archive_current_container(database_name, container_name, client):
    """Dump every document in a container to JSON files and zip them up.

    Writes each item to ./cosmos_archive_current_container/<container>/<id>.json,
    zips the tree, and returns an HTML download link (via get_download_link).
    On any failure, returns an error string instead of raising.

    Args:
        database_name: Cosmos database id.
        container_name: Cosmos container id.
        client: CosmosClient instance.

    Returns:
        str: download-link HTML on success, or an error message.
    """
    try:
        base_dir = "./cosmos_archive_current_container"
        # Start from a clean staging directory each run.
        if os.path.exists(base_dir):
            shutil.rmtree(base_dir)
        os.makedirs(base_dir)
        db_client = client.get_database_client(database_name)
        container_client = db_client.get_container_client(container_name)
        items = list(container_client.read_all_items())
        container_dir = os.path.join(base_dir, container_name)
        os.makedirs(container_dir)
        for item in items:
            # Items without an id get a timestamp-based placeholder filename.
            item_id = item.get('id', f"unknown_{datetime.now().strftime('%Y%m%d%H%M%S')}")
            with open(os.path.join(container_dir, f"{item_id}.json"), 'w') as f:
                json.dump(item, f, indent=2)
        # NOTE(review): archive is created in the current working directory,
        # not under base_dir — presumably intentional so get_download_link
        # can find it; confirm before changing.
        archive_name = f"{container_name}_archive_{datetime.now().strftime('%Y%m%d%H%M%S')}"
        shutil.make_archive(archive_name, 'zip', base_dir)
        return get_download_link(f"{archive_name}.zip")
    except Exception as e:
        return f"Archive error: {str(e)} π’"
| # ============================================================================= | |
| # βββββββββββββ ADVANCED COSMOS FUNCTIONS βββββββββββββ | |
| # (Include your advanced Cosmos functions here if needed) | |
| # ============================================================================= | |
| # For example, you might have functions like: | |
| # def create_new_container(...): | |
| # ... | |
| # ============================================================================= | |
| # βββββββββββββ NEW COSMOSDB DEMO FUNCTIONS βββββββββββββ | |
| # Each function below corresponds to one of your provided code snippets. | |
| # ============================================================================= | |
def demo_create_database():
    """Streamlit demo: create (or fetch, if it exists) a Cosmos database.

    Connects with ACCOUNT_URI / ACCOUNT_KEY env credentials on button click;
    a pre-existing database is reported via st.info rather than treated as
    an error.
    """
    st.markdown("### Demo: Create Database")
    database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_name")
    if st.button("Create Database", key="btn_create_db"):
        url = os.environ.get("ACCOUNT_URI")
        key_env = os.environ.get("ACCOUNT_KEY")
        if not url or not key_env:
            st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.")
            return
        client_demo = CosmosClient(url, credential=key_env)
        try:
            database = client_demo.create_database(database_name)
            st.success(f"Database '{database_name}' created.")
            st.write(database)
        except exceptions.CosmosResourceExistsError:
            # Already exists: fall back to a handle on the existing database.
            database = client_demo.get_database_client(database_name)
            st.info(f"Database '{database_name}' already exists.")
            st.write(database)
        except Exception as e:
            st.error(f"Error: {str(e)}")
def demo_create_container():
    """Streamlit demo: create a container in an existing database.

    A pre-existing container is fetched and reported via st.info instead of
    being treated as a failure.
    """
    st.markdown("### Demo: Create Container")
    database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_for_container")
    container_name = st.text_input("Enter Container Name", value="products", key="demo_container_name")
    partition_key = st.text_input("Enter Partition Key Path", value="/productName", key="demo_partition_key")
    if st.button("Create Container", key="btn_create_container"):
        url = os.environ.get("ACCOUNT_URI")
        key_env = os.environ.get("ACCOUNT_KEY")
        if not url or not key_env:
            st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.")
            return
        client_demo = CosmosClient(url, credential=key_env)
        try:
            database = client_demo.get_database_client(database_name)
            container = database.create_container(id=container_name, partition_key=PartitionKey(path=partition_key))
            st.success(f"Container '{container_name}' created.")
            st.write(container)
        except exceptions.CosmosResourceExistsError:
            # `database` is defined above before create_container can raise this.
            container = database.get_container_client(container_name)
            st.info(f"Container '{container_name}' already exists.")
            st.write(container)
        except exceptions.CosmosHttpResponseError as e:
            st.error(f"HTTP error: {str(e)}")
        except Exception as e:
            st.error(f"Error: {str(e)}")
def demo_create_analytical_container():
    """Streamlit demo: create a container with the analytical store enabled.

    analytical_storage_ttl=-1 enables the analytical store with infinite
    retention; an existing container is reported via st.info.
    """
    st.markdown("### Demo: Create Analytical Store Enabled Container")
    database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_for_analytical")
    container_name = st.text_input("Enter Container Name", value="products", key="demo_container_analytical")
    partition_key = st.text_input("Enter Partition Key Path", value="/productName", key="demo_partition_key_analytical")
    if st.button("Create Analytical Container", key="btn_create_analytical"):
        url = os.environ.get("ACCOUNT_URI")
        key_env = os.environ.get("ACCOUNT_KEY")
        if not url or not key_env:
            st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.")
            return
        client_demo = CosmosClient(url, credential=key_env)
        try:
            database = client_demo.get_database_client(database_name)
            container = database.create_container(
                id=container_name,
                partition_key=PartitionKey(path=partition_key),
                analytical_storage_ttl=-1
            )
            st.success(f"Analytical container '{container_name}' created.")
            st.write(container)
        except exceptions.CosmosResourceExistsError:
            container = database.get_container_client(container_name)
            st.info(f"Container '{container_name}' already exists.")
            st.write(container)
        except exceptions.CosmosHttpResponseError as e:
            st.error(f"HTTP error: {str(e)}")
        except Exception as e:
            st.error(f"Error: {str(e)}")
def demo_get_existing_container():
    """Streamlit demo: fetch a client handle for an existing container.

    NOTE(review): get_container_client does not hit the service, so this
    "succeeds" even for a container that does not exist — presumably fine
    for a demo; confirm if existence should be verified (e.g. via read()).
    """
    st.markdown("### Demo: Get Existing Container")
    database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_get")
    container_name = st.text_input("Enter Container Name", value="products", key="demo_container_get")
    if st.button("Get Container", key="btn_get_container"):
        url = os.environ.get("ACCOUNT_URI")
        key_env = os.environ.get("ACCOUNT_KEY")
        if not url or not key_env:
            st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.")
            return
        client_demo = CosmosClient(url, credential=key_env)
        try:
            database = client_demo.get_database_client(database_name)
            container = database.get_container_client(container_name)
            st.success(f"Retrieved container '{container_name}'.")
            st.write(container)
        except Exception as e:
            st.error(f"Error: {str(e)}")
def demo_insert_data():
    """Streamlit demo: upsert N sample 'Widget' items (item1..itemN).

    Uses upsert_item so re-running with the same ids overwrites instead of
    failing with a conflict.
    """
    st.markdown("### Demo: Insert Data")
    database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_insert")
    container_name = st.text_input("Enter Container Name", value="products", key="demo_container_insert")
    num_items = st.number_input("Number of items to insert", min_value=1, max_value=20, value=9, key="demo_num_items")
    if st.button("Insert Data", key="btn_insert_data"):
        url = os.environ.get("ACCOUNT_URI")
        key_env = os.environ.get("ACCOUNT_KEY")
        if not url or not key_env:
            st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.")
            return
        client_demo = CosmosClient(url, credential=key_env)
        try:
            database = client_demo.get_database_client(database_name)
            container = database.get_container_client(container_name)
            # number_input may return a float; cast before ranging.
            for i in range(1, int(num_items) + 1):
                container.upsert_item({
                    'id': f'item{i}',
                    'productName': 'Widget',
                    'productModel': f'Model {i}'
                })
            st.success(f"Inserted {num_items} items.")
        except Exception as e:
            st.error(f"Error: {str(e)}")
def demo_delete_data():
    """Streamlit demo: delete every item whose productModel matches the input.

    Security fix: the model value from the text box was previously
    f-string-interpolated into the SQL text, allowing query injection.
    It is now passed as a bound @model parameter, matching the style of
    demo_parameterized_query.
    """
    st.markdown("### Demo: Delete Data")
    database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_delete")
    container_name = st.text_input("Enter Container Name", value="products", key="demo_container_delete")
    query_model = st.text_input("Product Model to delete", value="Model 2", key="demo_query_model")
    if st.button("Delete Data", key="btn_delete_data"):
        url = os.environ.get("ACCOUNT_URI")
        key_env = os.environ.get("ACCOUNT_KEY")
        if not url or not key_env:
            st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.")
            return
        client_demo = CosmosClient(url, credential=key_env)
        try:
            database = client_demo.get_database_client(database_name)
            container = database.get_container_client(container_name)
            items = list(container.query_items(
                query='SELECT * FROM products p WHERE p.productModel = @model',
                parameters=[{"name": "@model", "value": query_model}],
                enable_cross_partition_query=True
            ))
            count = 0
            for item in items:
                # Partition key path is /productName in these demos; fall back
                # to the seeded value when the field is missing.
                container.delete_item(item, partition_key=item.get("productName", "Widget"))
                count += 1
            st.success(f"Deleted {count} items with productModel = '{query_model}'.")
        except Exception as e:
            st.error(f"Error: {str(e)}")
def demo_query_database():
    """Streamlit demo: run an arbitrary SQL query and display results as JSON.

    NOTE(review): the query text is executed verbatim from user input — fine
    for a self-service demo against the user's own account, but this is raw
    query execution by design.
    """
    st.markdown("### Demo: Query Database")
    database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_query")
    container_name = st.text_input("Enter Container Name", value="products", key="demo_container_query")
    query_str = st.text_area("Enter SQL Query", value='SELECT * FROM mycontainer r WHERE r.id="item3"', key="demo_query_str")
    if st.button("Run Query", key="btn_query_database"):
        url = os.environ.get("ACCOUNT_URI")
        key_env = os.environ.get("ACCOUNT_KEY")
        if not url or not key_env:
            st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.")
            return
        client_demo = CosmosClient(url, credential=key_env)
        try:
            database = client_demo.get_database_client(database_name)
            container = database.get_container_client(container_name)
            results = list(container.query_items(query=query_str, enable_cross_partition_query=True))
            if results:
                for item in results:
                    st.json(item)
            else:
                st.info("No results found.")
        except Exception as e:
            st.error(f"Error: {str(e)}")
def demo_parameterized_query():
    """Streamlit demo: query by productModel using a bound @model parameter.

    Demonstrates the injection-safe parameterized form of query_items.
    """
    st.markdown("### Demo: Parameterized Query")
    database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_param")
    container_name = st.text_input("Enter Container Name", value="products", key="demo_container_param")
    model_value = st.text_input("Enter productModel value", value="Model 7", key="demo_model_value")
    if st.button("Run Parameterized Query", key="btn_param_query"):
        url = os.environ.get("ACCOUNT_URI")
        key_env = os.environ.get("ACCOUNT_KEY")
        if not url or not key_env:
            st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.")
            return
        client_demo = CosmosClient(url, credential=key_env)
        try:
            database = client_demo.get_database_client(database_name)
            container = database.get_container_client(container_name)
            discontinued_items = container.query_items(
                query='SELECT * FROM products p WHERE p.productModel = @model',
                parameters=[{"name": "@model", "value": model_value}],
                enable_cross_partition_query=True
            )
            for item in discontinued_items:
                st.json(item)
        except Exception as e:
            st.error(f"Error: {str(e)}")
def demo_get_db_properties():
    """Streamlit demo: read and display a database's properties as JSON."""
    st.markdown("### Demo: Get Database Properties")
    database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_props")
    if st.button("Get Properties", key="btn_db_props"):
        url = os.environ.get("ACCOUNT_URI")
        key_env = os.environ.get("ACCOUNT_KEY")
        if not url or not key_env:
            st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.")
            return
        client_demo = CosmosClient(url, credential=key_env)
        try:
            database = client_demo.get_database_client(database_name)
            # database.read() performs the actual service call and returns properties.
            props = database.read()
            st.json(props)
        except Exception as e:
            st.error(f"Error: {str(e)}")
def demo_get_throughput():
    """Streamlit demo: show provisioned throughput for a database and container.

    The container lookup is wrapped in its own try so a container-level
    failure (e.g. throughput provisioned only at database level) still lets
    the database offer display.
    """
    st.markdown("### Demo: Get Throughput (Database & Container)")
    database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_throughput")
    container_name = st.text_input("Enter Container Name (for container throughput)", value="testContainer", key="demo_container_throughput")
    if st.button("Get Throughput", key="btn_get_throughput"):
        url = os.environ.get("ACCOUNT_URI")
        key_env = os.environ.get("ACCOUNT_KEY")
        if not url or not key_env:
            st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.")
            return
        client_demo = CosmosClient(url, credential=key_env)
        try:
            database = client_demo.get_database_client(database_name)
            db_offer = database.get_throughput()
            st.write(f"Database Offer: {db_offer.properties['id']} with throughput {db_offer.properties['content']['offerThroughput']}")
            try:
                container = database.get_container_client(container_name)
                container_offer = container.get_throughput()
                st.write(f"Container Offer: {container_offer.properties['id']} with throughput {container_offer.properties['content']['offerThroughput']}")
            except exceptions.CosmosHttpResponseError as e:
                st.error(f"Container throughput error: {str(e)}")
        except Exception as e:
            st.error(f"Error: {str(e)}")
def demo_modify_container_properties():
    """Streamlit demo: set a container's default TTL via replace_container.

    NOTE(review): the partition key path is hard-coded to /productName here —
    replace_container requires restating it, and it must match the container's
    existing path; confirm for containers created with a different key.
    """
    st.markdown("### Demo: Modify Container Properties (Set default TTL)")
    database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_modify")
    container_name = st.text_input("Enter Container Name", value="products", key="demo_container_modify")
    new_ttl = st.number_input("Enter new default TTL (seconds)", min_value=0, value=10, key="demo_new_ttl")
    if st.button("Modify Container", key="btn_modify_container"):
        url = os.environ.get("ACCOUNT_URI")
        key_env = os.environ.get("ACCOUNT_KEY")
        if not url or not key_env:
            st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.")
            return
        client_demo = CosmosClient(url, credential=key_env)
        try:
            database = client_demo.get_database_client(database_name)
            container = database.get_container_client(container_name)
            # Note: replace_container is used here as per your snippet.
            database.replace_container(
                container,
                partition_key=PartitionKey(path="/productName"),
                default_ttl=new_ttl,
            )
            # Re-read to show the TTL actually persisted by the service.
            container_props = container.read()
            st.write("New default TTL:", container_props.get("defaultTtl"))
        except Exception as e:
            st.error(f"Error: {str(e)}")
def demo_item_response_headers():
    """Streamlit demo: create an item and display point-operation response headers.

    Shows the ETag and the RU request charge from the create_item response.
    NOTE(review): get_response_headers() exists on newer azure-cosmos SDK
    return types; re-running will also conflict on the fixed "test_item" id —
    both presumably acceptable for a demo.
    """
    st.markdown("### Demo: Using Item Point Operation Response Headers")
    database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_headers")
    container_name = st.text_input("Enter Container Name", value="products", key="demo_container_headers")
    if st.button("Create Item & Show Headers", key="btn_item_headers"):
        url = os.environ.get("ACCOUNT_URI")
        key_env = os.environ.get("ACCOUNT_KEY")
        if not url or not key_env:
            st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.")
            return
        client_demo = CosmosClient(url, credential=key_env)
        try:
            database = client_demo.get_database_client(database_name)
            container = database.get_container_client(container_name)
            operation_response = container.create_item({"id": "test_item", "productName": "test_item"})
            headers = operation_response.get_response_headers()
            st.write("ETag:", headers.get("etag"))
            st.write("Request Charge:", headers.get("x-ms-request-charge"))
        except Exception as e:
            st.error(f"Error: {str(e)}")
| # ============================================================================= | |
| # βββββββββββββ FILE & MEDIA MANAGEMENT FUNCTIONS βββββββββββββ | |
| # ============================================================================= | |
def display_saved_files_in_sidebar():
    """List local .md files (excluding README*) in the sidebar with download/delete controls.

    NOTE(review): the download/delete widgets call st.sidebar.* from inside
    column contexts, so they land in the sidebar rather than the columns —
    presumably intentional; confirm the intended layout. The file handle in
    download_button is opened without a context manager (left untouched here).
    """
    all_files = sorted([f for f in glob.glob("*.md") if not f.lower().startswith('readme')], reverse=True)
    st.sidebar.markdown("## π Files")
    for file in all_files:
        col1, col2, col3 = st.sidebar.columns([6, 2, 1])
        with col1:
            st.markdown(f"π {file}")
        with col2:
            st.sidebar.download_button(
                label="β¬οΈ",
                data=open(file, 'rb').read(),
                file_name=file
            )
        with col3:
            # Deleting triggers an immediate rerun so the list refreshes.
            if st.sidebar.button("π", key=f"delete_{file}"):
                os.remove(file)
                st.rerun()
def display_file_viewer(file_path):
    """Render *file_path* as markdown with mtime/size metadata and a download button.

    Silently renders nothing when the file is empty (falsy content).
    """
    content = load_file(file_path)
    if content:
        st.markdown("### π File Viewer")
        st.markdown(f"**{file_path}**")
        file_stats = os.stat(file_path)
        st.markdown(f"**Mod:** {datetime.fromtimestamp(file_stats.st_mtime).strftime('%Y-%m-%d %H:%M:%S')} | **Size:** {file_stats.st_size} bytes")
        st.markdown("---")
        st.markdown(content)
        st.download_button("β¬οΈ", data=content, file_name=os.path.basename(file_path), mime="text/markdown")
def display_file_editor(file_path):
    """Two-tab (rendered markdown / raw text) editor for *file_path*.

    Edits are cached in st.session_state.file_content keyed by path, so the
    text area survives Streamlit reruns. Saving sanitizes the text and
    validates it as JSON before writing.

    NOTE(review): the json.loads check means a save only succeeds when the
    sanitized content parses as JSON — plain markdown will always fail with
    "Sanitization failed". Looks intentional for JSON documents; confirm for
    .md files.
    """
    if 'file_content' not in st.session_state:
        st.session_state.file_content = {}
    if file_path not in st.session_state.file_content:
        content = load_file(file_path)
        if content is not None:
            st.session_state.file_content[file_path] = content
        else:
            # Nothing to edit; bail out without rendering the editor.
            return
    st.markdown("### βοΈ Edit File")
    st.markdown(f"**Editing:** {file_path}")
    md_tab, code_tab = st.tabs(["Markdown", "Code"])
    with md_tab:
        st.markdown(st.session_state.file_content[file_path])
    with code_tab:
        # hash(file_path) keeps the widget key unique per file.
        new_content = st.text_area("Edit:", value=st.session_state.file_content[file_path], height=400, key=f"editor_{hash(file_path)}", on_change=lambda: None)
        col1, col2 = st.columns([1, 5])
        with col1:
            if st.button("πΎ Save"):
                sanitized = sanitize_json_text(new_content)
                try:
                    json.loads(sanitized)
                except Exception as e:
                    st.error(f"Sanitization failed: {str(e)}")
                    return
                if save_file_content(file_path, sanitized):
                    st.session_state.file_content[file_path] = sanitized
                    st.success("Saved! π")
                    time.sleep(1)
                    st.rerun()
        with col2:
            st.download_button("β¬οΈ", data=new_content, file_name=os.path.basename(file_path), mime="text/markdown")
def save_file_content(file_path, content):
    """Write *content* to *file_path* (UTF-8).

    Returns:
        bool: True on success; on failure the error is shown on the page
        via st.error and False is returned (nothing is raised).
    """
    try:
        with open(file_path, 'w', encoding='utf-8') as out:
            out.write(content)
    except Exception as e:
        st.error(f"Save error: {str(e)}")
        return False
    return True
def update_file_management_section():
    """Sidebar file manager: list .md files with view/download/edit/delete, plus bulk actions.

    Maintains three session-state keys: file_view_mode ('view'/'edit'/None),
    current_file, and a file_content cache dict. After the file rows it
    renders the external_links list, then delegates to display_file_viewer
    or display_file_editor for the selected file in the main pane.
    """
    # Lazily initialize the session-state slots this section relies on.
    if 'file_view_mode' not in st.session_state:
        st.session_state.file_view_mode = None
    if 'current_file' not in st.session_state:
        st.session_state.current_file = None
    if 'file_content' not in st.session_state:
        st.session_state.file_content = {}
    all_files = sorted(glob.glob("*.md"), reverse=True)
    st.sidebar.title("π Files")
    # Bulk delete: remove files and reset all related session state.
    if st.sidebar.button("π Delete All"):
        for file in all_files:
            os.remove(file)
        st.session_state.file_content = {}
        st.session_state.current_file = None
        st.session_state.file_view_mode = None
        st.rerun()
    # Bulk download: zip everything and surface a download link.
    if st.sidebar.button("β¬οΈ Download All"):
        zip_file = create_zip_of_files(all_files)
        st.sidebar.markdown(get_download_link(zip_file), unsafe_allow_html=True)
    for file in all_files:
        col1, col2, col3, col4 = st.sidebar.columns([1, 3, 1, 1])
        with col1:
            # View: select the file, cache its content, rerun to render it.
            if st.button("π", key=f"view_{file}"):
                st.session_state.current_file = file
                st.session_state.file_view_mode = 'view'
                if file not in st.session_state.file_content:
                    content = load_file(file)
                    if content is not None:
                        st.session_state.file_content[file] = content
                st.rerun()
        with col2:
            st.markdown(get_download_link(file), unsafe_allow_html=True)
        with col3:
            # Edit: same as view but switches the mode to the editor.
            if st.button("π", key=f"edit_{file}"):
                st.session_state.current_file = file
                st.session_state.file_view_mode = 'edit'
                if file not in st.session_state.file_content:
                    content = load_file(file)
                    if content is not None:
                        st.session_state.file_content[file] = content
                st.rerun()
        with col4:
            # Delete a single file and clear any state pointing at it.
            # NOTE(review): key f"delete_{file}" collides with the one in
            # display_saved_files_in_sidebar if both render in one run.
            if st.button("π", key=f"delete_{file}"):
                os.remove(file)
                if file in st.session_state.file_content:
                    del st.session_state.file_content[file]
                if st.session_state.current_file == file:
                    st.session_state.current_file = None
                    st.session_state.file_view_mode = None
                st.rerun()
    st.sidebar.markdown("---")
    st.sidebar.title("External Help Links")
    for link in external_links:
        st.sidebar.markdown(f"{link['emoji']} [{link['title']}]({link['url']})", unsafe_allow_html=True)
    # Main-pane rendering for whichever file/mode is currently selected.
    if st.session_state.current_file:
        if st.session_state.file_view_mode == 'view':
            display_file_viewer(st.session_state.current_file)
        elif st.session_state.file_view_mode == 'edit':
            display_file_editor(st.session_state.current_file)
| # ============================================================================= | |
| # βββββββββββββ SIDEBAR DATA GRID (Records with formatted timestamps) βββββββββββββ | |
| # ============================================================================= | |
def show_sidebar_data_grid():
    """Render a Name/Timestamp dataframe of the current container's documents in the sidebar.

    Reads st.session_state.current_container; timestamps are reformatted from
    ISO to "HH:MM AM/PM mm/dd/YYYY", falling back to the raw value when they
    do not parse. Shows an info message when no container is selected.
    """
    if st.session_state.get("current_container"):
        try:
            records = get_documents(st.session_state.current_container)
            data = []
            for rec in records:
                ts = rec.get("timestamp", "")
                try:
                    dt = datetime.fromisoformat(ts)
                    formatted = dt.strftime("%I:%M %p %m/%d/%Y")
                except Exception:
                    # Non-ISO or missing timestamp: show it verbatim.
                    formatted = ts
                data.append({
                    "ID": rec.get("id", ""),
                    "Name": rec.get("name", ""),
                    "Timestamp": formatted
                })
            df = pd.DataFrame(data)
            st.sidebar.markdown("### π Data Grid")
            st.sidebar.dataframe(df[["Name", "Timestamp"]])
        except Exception as e:
            st.sidebar.error(f"Data grid error: {str(e)}")
    else:
        st.sidebar.info("No container selected for data grid.")
| # ============================================================================= | |
| # βββββββββββββ DOCUMENTS LIST VIEW (Editable List with Sorting) βββββββββββββ | |
| # ============================================================================= | |
def edit_documents_list(container):
    """Editable, sortable grid of the container's documents; persists Name/Content edits.

    Documents are sorted by Name (case-insensitive) or Timestamp (newest
    first), shown in st.data_editor, and on save each changed row is written
    back through update_record.

    NOTE(review): rows are matched to documents by positional index into
    `data`, yet the editor allows num_rows="dynamic" — adding or removing
    rows would misalign (or IndexError) the save loop; confirm intended use.
    """
    records = get_documents(container)
    sort_option = st.selectbox("Sort by", ["Timestamp", "Name"], key="sort_option")
    if sort_option == "Name":
        records.sort(key=lambda r: r.get("name", "").lower())
    else:
        records.sort(key=lambda r: r.get("timestamp", ""), reverse=True)
    data = []
    for rec in records:
        ts = rec.get("timestamp", "")
        try:
            dt = datetime.fromisoformat(ts)
            formatted = dt.strftime("%I:%M %p %m/%d/%Y")
        except Exception:
            formatted = ts
        data.append({
            "ID": rec.get("id", ""),
            "Name": rec.get("name", ""),
            # Truncated preview only; the full content lives in `records`.
            "Content": rec.get("content", "")[:100] + "..." if rec.get("content", "") else "",
            "Timestamp": formatted
        })
    df = pd.DataFrame(data)
    edited_df = st.data_editor(df[["Name", "Content", "Timestamp"]], key="docs_editor", num_rows="dynamic")
    if st.button("πΎ Save List Changes"):
        for idx, row in edited_df.iterrows():
            original = data[idx]
            # Only write back rows the user actually changed.
            if row["Name"] != original["Name"] or row["Content"] != original["Content"]:
                doc_id = original["ID"]
                doc = next((r for r in records if r.get("id") == doc_id), None)
                if doc:
                    doc["name"] = row["Name"]
                    doc["content"] = row["Content"]
                    success, message = update_record(container, doc)
                    if success:
                        st.success(f"Updated {doc_id} π")
                    else:
                        st.error(f"Error updating {doc_id}: {message}")
        st.rerun()
| # ============================================================================= | |
| # βββββββββββββ VIDEO & AUDIO UI FUNCTIONS βββββββββββββ | |
| # ============================================================================= | |
def validate_and_preprocess_image(file_data, target_size=(576, 1024)):
    """Normalize an uploaded image for video generation.

    Accepts raw ``bytes``, a file-like object, or a PIL ``Image``. The image is
    converted to RGB, scaled to fit inside ``target_size`` while preserving its
    aspect ratio, rounded to even pixel dimensions, and letterboxed (centered
    on a white canvas of exactly ``target_size``).

    Returns the processed PIL image, or ``None`` on failure (the error is
    surfaced in the Streamlit UI).

    Fix: previously an extreme aspect ratio could round a dimension down to 0
    via ``(x // 2) * 2``, making ``img.resize`` raise; dimensions are now
    clamped to at least 2 px.
    """
    try:
        st.write("Preprocessing image...")
        if isinstance(file_data, bytes):
            img = Image.open(io.BytesIO(file_data))
        elif hasattr(file_data, 'read'):
            if hasattr(file_data, 'seek'):
                file_data.seek(0)  # rewind re-read streams (e.g. Streamlit uploads)
            img = Image.open(file_data)
        elif isinstance(file_data, Image.Image):
            img = file_data
        else:
            raise ValueError(f"Unsupported input: {type(file_data)}")
        if img.mode != 'RGB':
            img = img.convert('RGB')
        # Fit inside target_size while preserving aspect ratio.
        aspect_ratio = img.size[0] / img.size[1]
        if aspect_ratio > target_size[0] / target_size[1]:
            new_width = target_size[0]
            new_height = int(new_width / aspect_ratio)
        else:
            new_height = target_size[1]
            new_width = int(new_height * aspect_ratio)
        # Round down to even dimensions (video codecs require even sizes), but
        # never below 2 px so resize cannot receive a zero-size dimension.
        new_width = max(2, (new_width // 2) * 2)
        new_height = max(2, (new_height // 2) * 2)
        resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
        # Letterbox: paste the scaled image centered on a white canvas.
        final_img = Image.new('RGB', target_size, (255, 255, 255))
        paste_x = (target_size[0] - new_width) // 2
        paste_y = (target_size[1] - new_height) // 2
        final_img.paste(resized_img, (paste_x, paste_y))
        return final_img
    except Exception as e:
        st.error(f"Image error: {str(e)}")
        return None
def add_video_generation_ui(container):
    """Render the image-to-video generation UI.

    Flow: upload an image, preview the original and its preprocessed version,
    then call the `awacke1/stable-video-diffusion` Gradio Space to generate a
    video. On success the video is copied locally, displayed, and (when a
    `container` is provided) a metadata record is inserted into Cosmos DB.
    All failures are reported via Streamlit error widgets rather than raised.
    """
    st.markdown("### π₯ Video Gen")
    col1, col2 = st.columns([2, 1])
    with col1:
        uploaded_file = st.file_uploader("Upload Image πΌοΈ", type=['png', 'jpg', 'jpeg'])
    with col2:
        st.markdown("#### Params")
        motion = st.slider("π Motion", 1, 255, 127)
        fps = st.slider("π¬ FPS", 1, 30, 6)
        with st.expander("Advanced"):
            use_custom = st.checkbox("Custom Seed")
            # Millisecond clock as a default seed when the user opts in.
            seed = st.number_input("Seed", value=int(time.time() * 1000)) if use_custom else None
    if uploaded_file is not None:
        try:
            file_data = uploaded_file.read()
            preview1, preview2 = st.columns(2)
            with preview1:
                st.write("Original")
                st.image(Image.open(io.BytesIO(file_data)), use_column_width=True)
            with preview2:
                proc_img = validate_and_preprocess_image(io.BytesIO(file_data))
                if proc_img:
                    st.write("Processed")
                    st.image(proc_img, use_column_width=True)
                else:
                    # Preprocessing failed; abort before offering generation.
                    st.error("Preprocess failed")
                    return
            if st.button("π₯ Generate"):
                with st.spinner("Generating video..."):
                    # delete=False: the gradio client needs a stable path; the
                    # file is removed explicitly in the finally block below.
                    with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_file:
                        proc_img.save(temp_file.name, format='PNG')
                        try:
                            client = Client("awacke1/stable-video-diffusion", hf_token=os.environ.get("HUGGINGFACE_TOKEN"))
                            result = client.predict(
                                image=temp_file.name,
                                seed=seed if seed is not None else int(time.time() * 1000),
                                randomize_seed=seed is None,
                                motion_bucket_id=motion,
                                fps_id=fps,
                                api_name="/video"
                            )
                            # Expected result shape: tuple whose first element
                            # is a dict containing a 'video' file path.
                            if result and isinstance(result, tuple) and len(result) >= 1:
                                video_path = result[0].get('video') if isinstance(result[0], dict) else None
                                if video_path and os.path.exists(video_path):
                                    video_filename = f"generated_video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
                                    shutil.copy(video_path, video_filename)
                                    st.success(f"Video generated! π")
                                    st.video(video_filename)
                                    if container:
                                        # Persist generation metadata alongside the video.
                                        video_record = {
                                            "id": generate_unique_id(),
                                            "pk": generate_unique_id(),
                                            "type": "generated_video",
                                            "filename": video_filename,
                                            "seed": seed if seed is not None else "random",
                                            "motion": motion,
                                            "fps": fps,
                                            "timestamp": datetime.now().isoformat()
                                        }
                                        success, message = insert_record(container, video_record)
                                        if success:
                                            st.success("DB record saved!")
                                        else:
                                            st.error(f"DB error: {message}")
                                else:
                                    st.error("Invalid result format")
                            else:
                                st.error("No result returned")
                        except Exception as e:
                            st.error(f"Video gen error: {str(e)}")
                        finally:
                            # Best-effort cleanup of the delete=False temp image.
                            try:
                                os.unlink(temp_file.name)
                                st.write("Temp file removed")
                            except Exception as e:
                                st.warning(f"Cleanup error: {str(e)}")
        except Exception as e:
            st.error(f"Upload error: {str(e)}")
| # ============================================================================= | |
| # βββββββββββββ NEW ITEM & FIELD FUNCTIONS βββββββββββββ | |
| # ============================================================================= | |
def new_item_default(container):
    """Insert a boilerplate sample document into *container*.

    Returns the new document dict on success, otherwise None (with the error
    shown in the UI).
    """
    doc_id = generate_unique_id()
    sample = {
        "id": doc_id,
        "pk": doc_id,  # partition key mirrors the id
        "name": "New Sample Document",
        "content": "Start editing your document here...",
        "timestamp": datetime.now().isoformat(),
        "type": "sample",
    }
    ok, message = insert_record(container, sample)
    if not ok:
        st.error("Error creating new item: " + message)
        return None
    st.success("New sample document created! β¨")
    return sample
def auto_save_edit():
    """Persist the JSON currently in the doc editor to the active container.

    Attempts a strict JSON parse first; if that fails, the text is run through
    sanitize_json_text() before parsing and upserting. Any error is reported
    in the UI rather than raised.
    """
    try:
        raw = st.session_state.doc_editor
        try:
            json.loads(raw)
        except Exception:
            # Not valid JSON as-is; clean it up before the real parse.
            raw = sanitize_json_text(raw)
        document = json.loads(raw)
        st.session_state.current_container.upsert_item(document)
        st.success("Auto-saved! πΎ")
    except Exception as e:
        st.error(f"Auto-save error: {str(e)}")
def add_field_to_doc():
    """Add the sidebar's key/value pair as a new field on the edited document.

    Reads the field name and value from session state, injects the pair into
    the JSON held in the editor, and auto-saves the result.

    Fix: a blank or whitespace-only field key is now rejected instead of
    silently creating an empty-string key on the document.
    """
    key = st.session_state.new_field_key
    value = st.session_state.new_field_value
    if not key or not key.strip():
        st.warning("Field key cannot be empty.")
        return
    try:
        doc = json.loads(st.session_state.doc_editor)
        doc[key] = value
        st.session_state.doc_editor = json.dumps(doc, indent=2)
        auto_save_edit()
        st.success(f"Added field {key} π")
    except Exception as e:
        st.error(f"Error adding field: {str(e)}")
| # ============================================================================= | |
| # βββββββββββββ VECTOR SEARCH INTERFACE βββββββββββββ | |
| # ============================================================================= | |
def vector_keyword_search(keyword, container):
    """Full-text CONTAINS search over document content in *container*.

    Returns the list of matching documents, or [] on error (reported in the UI).

    Security fix: the keyword was previously interpolated into the SQL string
    with an f-string, so a quote in the keyword could break out of the literal
    (SQL injection). The query is now parameterized via @keyword.
    """
    try:
        query = "SELECT * FROM c WHERE CONTAINS(c.content, @keyword)"
        results = list(container.query_items(
            query=query,
            parameters=[{"name": "@keyword", "value": keyword}],
            enable_cross_partition_query=True,
        ))
        return results
    except Exception as e:
        st.error(f"Vector search error: {str(e)}")
        return []
def display_search_results(keyword, container):
    """Show each document matching *keyword* in an editable expander.

    Every result gets a JSON text area plus a save button that upserts the
    edited document back into the container.
    """
    matches = vector_keyword_search(keyword, container)
    st.markdown("### π Search Results")
    for match in matches:
        doc_id = match.get("id", "")
        with st.expander(f"Result {doc_id}"):
            edited = st.text_area("Edit Document", value=json.dumps(match, indent=2), key=f"search_{doc_id}")
            if st.button(f"πΎ Save changes for {doc_id}", key=f"save_search_{doc_id}"):
                try:
                    container.upsert_item(body=json.loads(edited))
                    st.success(f"Updated {doc_id}!")
                    st.rerun()
                except Exception as e:
                    st.error(f"Error saving {doc_id}: {str(e)}")
| # ============================================================================= | |
| # βββββββββββββ NEW AI MODALITY RECORD TEMPLATES βββββββββββββ | |
| # ============================================================================= | |
def new_ai_record(container):
    """Insert a templated AI-modality document into *container*.

    The template carries a function URL plus markdown input/output fields.
    Returns the new document dict on success, otherwise None.
    """
    record_id = generate_unique_id()
    template = {
        "id": record_id,
        "pk": record_id,  # partition key mirrors the id
        "name": "AI Modality Record",
        "function_url": "https://example.com/function",
        "input_text": "### Input (markdown)\n\nType your input here.",
        "output_text": "### Output (markdown)\n\nResult will appear here.",
        "timestamp": datetime.now().isoformat(),
        "type": "ai_modality",
    }
    ok, message = insert_record(container, template)
    if not ok:
        st.error("Error creating AI record: " + message)
        return None
    st.success("New AI modality record created! π‘")
    return template
def new_links_record(container):
    """Insert a document whose input_text is a markdown list of the portal links.

    Builds the list from the module-level ``external_links`` and stores it as
    an ai_modality record. Returns the new document dict on success, else None.
    """
    record_id = generate_unique_id()
    links_md = "\n".join(
        f"- {item['emoji']} [{item['title']}]({item['url']})" for item in external_links
    )
    template = {
        "id": record_id,
        "pk": record_id,  # partition key mirrors the id
        "name": "Portal Links Record",
        "function_url": "",
        "input_text": links_md,
        "output_text": "",
        "timestamp": datetime.now().isoformat(),
        "type": "ai_modality",
    }
    ok, message = insert_record(container, template)
    if not ok:
        st.error("Error creating links record: " + message)
        return None
    st.success("New Portal Links record created! π")
    return template
| # ============================================================================= | |
| # βββββββββββββ SIDEBAR DATA GRID (Editable Names Grid) βββββββββββββ | |
| # ============================================================================= | |
def edit_names_grid(container):
    """Sidebar grid for renaming documents in bulk.

    Shows a Name/Timestamp data editor; on save, every changed Name is written
    back to its document via update_record.

    Fix: rows added through the dynamic editor have no backing record and
    previously raised IndexError on ``df.iloc[idx]``; they are now skipped.
    """
    records = get_documents(container)
    data = []
    for rec in records:
        ts = rec.get("timestamp", "")
        try:
            formatted = datetime.fromisoformat(ts).strftime("%I:%M %p %m/%d/%Y")
        except Exception:
            formatted = ts  # keep the raw value when not ISO-formatted
        data.append({
            "ID": rec.get("id", ""),
            "Name": rec.get("name", ""),
            "Timestamp": formatted,
        })
    df = pd.DataFrame(data)
    edited_df = st.sidebar.data_editor(df[["Name", "Timestamp"]], key="names_editor", num_rows="dynamic")
    if st.sidebar.button("πΎ Save Name Changes"):
        for idx, row in edited_df.iterrows():
            if not isinstance(idx, int) or idx >= len(df):
                continue  # new row from the dynamic editor; nothing to update
            original = df.iloc[idx]
            if row["Name"] != original["Name"]:
                doc_id = original["ID"]
                doc = next((r for r in records if r.get("id") == doc_id), None)
                if doc:
                    doc["name"] = row["Name"]
                    success, message = update_record(container, doc)
                    if success:
                        st.sidebar.success(f"Updated Name for {doc_id} to '{row['Name']}'")
                    else:
                        st.sidebar.error(f"Update error for {doc_id}: {message}")
        st.rerun()
| # ============================================================================= | |
| # βββββββββββββ SEARCH DOCUMENTS UI βββββββββββββ | |
| # This function was missing and is now defined. | |
| # ============================================================================= | |
def search_documents_ui(container):
    """Sidebar keyword-search form; results render in the main area.

    Fix: this is called with ``st.session_state.get("current_container")``,
    which may be None before a container is selected; searching then crashed
    with an AttributeError. A warning is now shown instead.
    """
    with st.sidebar.form("search_form"):
        keyword = st.text_input("Search Keyword", key="search_keyword")
        submitted = st.form_submit_button("π Search")
    if submitted and keyword:
        if container is None:
            st.warning("No container selected for search.")
        else:
            display_search_results(keyword, container)
| # ============================================================================= | |
| # βββββββββββββ MAIN FUNCTION βββββββββββββ | |
| # ============================================================================= | |
def main():
    """Top-level Streamlit entry point for the GitCosmos app.

    Renders, in order: header, Cosmos key check, the demo-feature sidebar,
    item-management sidebar controls, vector search, the database/container
    navigator with its per-view document editors (List / Markdown / Code /
    Run AI / Clone / New), file management, and a logout control.

    NOTE(review): indentation below was reconstructed from control flow; the
    original source arrived whitespace-mangled.
    """
    st.markdown("### π GitCosmos - Cosmos & Git Hub")
    st.markdown(f"[π Portal]({CosmosDBUrl})")
    # Session-state bootstrap.
    if "chat_history" not in st.session_state:
        st.session_state.chat_history = []
    st.session_state.setdefault("current_container", None)
    # Key is the Cosmos credential loaded at module scope; without it the app
    # cannot do anything useful, so bail out early.
    if Key:
        st.session_state.primary_key = Key
        st.session_state.logged_in = True
    else:
        st.error("Missing Cosmos Key πβ")
        return
    # ββ New: CosmosDB Demo Features Section in Sidebar ββ
    st.sidebar.markdown("## CosmosDB Demo Features")
    demo_feature = st.sidebar.selectbox("Select a Demo",
        ["Select", "Create Database", "Create Container", "Create Analytical Container",
         "Get Existing Container", "Insert Data", "Delete Data", "Query Database",
         "Parameterized Query", "Get Database Properties", "Get Throughput",
         "Modify Container Properties", "Item Response Headers"],
        key="demo_select")
    # Dispatch the chosen demo to its handler (defined elsewhere in the file).
    if demo_feature != "Select":
        if demo_feature == "Create Database":
            demo_create_database()
        elif demo_feature == "Create Container":
            demo_create_container()
        elif demo_feature == "Create Analytical Container":
            demo_create_analytical_container()
        elif demo_feature == "Get Existing Container":
            demo_get_existing_container()
        elif demo_feature == "Insert Data":
            demo_insert_data()
        elif demo_feature == "Delete Data":
            demo_delete_data()
        elif demo_feature == "Query Database":
            demo_query_database()
        elif demo_feature == "Parameterized Query":
            demo_parameterized_query()
        elif demo_feature == "Get Database Properties":
            demo_get_db_properties()
        elif demo_feature == "Get Throughput":
            demo_get_throughput()
        elif demo_feature == "Modify Container Properties":
            demo_modify_container_properties()
        elif demo_feature == "Item Response Headers":
            demo_item_response_headers()
    # ββ Existing Sidebar Items (Item Management, File Management, etc.)
    st.sidebar.markdown("## π οΈ Item Management")
    if st.sidebar.button("New Item"):
        if st.session_state.get("current_container"):
            new_doc = new_item_default(st.session_state.current_container)
            if new_doc:
                # Load the new document into the Code-view editor.
                st.session_state.doc_editor = json.dumps(new_doc, indent=2)
        else:
            st.warning("No container selected!")
    st.sidebar.text_input("New Field Key", key="new_field_key")
    st.sidebar.text_input("New Field Value", key="new_field_value")
    if st.sidebar.button("Add Field"):
        if "doc_editor" in st.session_state:
            add_field_to_doc()
        else:
            st.warning("No document loaded to add a field.")
    if st.sidebar.button("New AI Record"):
        if st.session_state.get("current_container"):
            new_ai_record(st.session_state.current_container)
        else:
            st.warning("No container selected!")
    if st.sidebar.button("New Links Record"):
        if st.session_state.get("current_container"):
            new_links_record(st.session_state.current_container)
        else:
            st.warning("No container selected!")
    st.sidebar.markdown("## π Vector Search")
    # May receive None before a container is chosen.
    search_documents_ui(st.session_state.get("current_container"))
    show_sidebar_data_grid()
    # (Your remaining navigation, file management, and document editing UI follow)
    try:
        # Lazily create the Cosmos client once per session.
        if st.session_state.get("client") is None:
            st.session_state.client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key)
        st.sidebar.title("π Navigator")
        databases = get_databases(st.session_state.client)
        selected_db = st.sidebar.selectbox("ποΈ DB", databases)
        # Changing DB resets the container/document selection and reruns.
        if selected_db != st.session_state.get("selected_database"):
            st.session_state.selected_database = selected_db
            st.session_state.selected_container = None
            st.session_state.selected_document_id = None
            st.session_state.current_index = 0
            st.rerun()
        if st.session_state.selected_database:
            database = st.session_state.client.get_database_client(st.session_state.selected_database)
            if "show_new_container_form" not in st.session_state:
                st.session_state.show_new_container_form = False
            if st.sidebar.button("π New Container"):
                st.session_state.show_new_container_form = True
            if st.session_state.show_new_container_form:
                with st.sidebar.form("new_container_form"):
                    new_container_id = st.text_input("Container ID", value="aiml-container")
                    new_partition_key = st.text_input("Partition Key", value="/pk")
                    new_analytical = st.checkbox("Enable Analytical Store", value=True)
                    submitted = st.form_submit_button("Create Container")
                    if submitted:
                        # analytical_storage_ttl=-1 enables the analytical store
                        # with no expiry; None leaves it disabled.
                        analytical_ttl = -1 if new_analytical else None
                        # Assuming create_new_container is defined in your advanced functions:
                        new_container = database.create_container(
                            id=new_container_id,
                            partition_key=PartitionKey(path=new_partition_key),
                            analytical_storage_ttl=analytical_ttl
                        )
                        if new_container:
                            st.success(f"Container '{new_container_id}' created.")
                            # Seed the fresh container with a templated item.
                            default_id = generate_unique_id()
                            default_item = {
                                "id": default_id,
                                "pk": default_id,
                                "name": "Default Image Prompt",
                                "prompt": "Enter your image prompt here",
                                "timestamp": datetime.now().isoformat(),
                                "type": "image_prompt"
                            }
                            insert_success, insert_message = insert_record(new_container, default_item)
                            if insert_success:
                                st.info("Default templated item created in new container.")
                            else:
                                st.error(f"Default item insertion error: {insert_message}")
                            st.session_state.show_new_container_form = False
                            st.session_state.new_container_created = new_container_id
                            st.rerun()
            containers = get_containers(database)
            # A just-created container may not be listed yet; show it anyway.
            if "new_container_created" in st.session_state and st.session_state.new_container_created not in containers:
                containers.append(st.session_state.new_container_created)
            selected_container = st.sidebar.selectbox("π Container", containers)
            if selected_container != st.session_state.get("selected_container"):
                st.session_state.selected_container = selected_container
                st.session_state.selected_document_id = None
                st.session_state.current_index = 0
                st.rerun()
            if st.session_state.selected_container:
                container = database.get_container_client(st.session_state.selected_container)
                st.session_state.current_container = container
                if st.sidebar.button("π¦ Export"):
                    download_link = archive_current_container(st.session_state.selected_database, st.session_state.selected_container, st.session_state.client)
                    # archive_current_container returns an <a> download link on
                    # success, or an error message string otherwise.
                    if download_link.startswith('<a'):
                        st.markdown(download_link, unsafe_allow_html=True)
                    else:
                        st.error(download_link)
                documents = get_documents(container)
                total_docs = len(documents)
                num_docs = st.slider("Docs", 1, 20, 1)
                documents_to_display = documents[:num_docs] if total_docs > num_docs else documents
                st.sidebar.info(f"Showing {len(documents_to_display)} docs")
                view_options = ['List', 'Markdown', 'Code', 'Run AI', 'Clone', 'New']
                selected_view = st.sidebar.selectbox("View", view_options, index=1)
                if selected_view == 'List':
                    edit_documents_list(container)
                elif selected_view == 'Markdown':
                    # Read-only JSON view of the current document with prev/next paging.
                    st.markdown("#### π Markdown")
                    if documents:
                        doc = documents[st.session_state.current_index]
                        content = json.dumps(doc, indent=2)
                        st.markdown(f"```json\n{content}\n```")
                        col_prev, col_next = st.columns(2)
                        with col_prev:
                            if st.button("β¬ οΈ") and st.session_state.current_index > 0:
                                st.session_state.current_index -= 1
                                st.rerun()
                        with col_next:
                            if st.button("β‘οΈ") and st.session_state.current_index < total_docs - 1:
                                st.session_state.current_index += 1
                                st.rerun()
                elif selected_view == 'Code':
                    # Editable JSON view with auto-save, paging, save and delete.
                    st.markdown("#### π» Code Editor")
                    if documents:
                        doc = documents[st.session_state.current_index]
                        if "doc_editor" not in st.session_state:
                            st.session_state.doc_editor = json.dumps(doc, indent=2)
                        edited = st.text_area("Edit JSON", value=st.session_state.doc_editor, height=300, key="doc_editor", on_change=lambda: auto_save_edit())
                        col_prev, col_next = st.columns(2)
                        with col_prev:
                            if st.button("β¬ οΈ") and st.session_state.current_index > 0:
                                st.session_state.current_index -= 1
                                st.rerun()
                        with col_next:
                            if st.button("β‘οΈ") and st.session_state.current_index < total_docs - 1:
                                st.session_state.current_index += 1
                                st.rerun()
                        col_save, col_delete = st.columns(2)
                        with col_save:
                            if st.button("πΎ Save", key=f'save_{st.session_state.current_index}'):
                                try:
                                    updated_doc = json.loads(edited)
                                    container.upsert_item(body=updated_doc)
                                    st.success(f"Saved {updated_doc['id']}")
                                    st.rerun()
                                except Exception as e:
                                    st.error(f"Save err: {str(e)}")
                        with col_delete:
                            if st.button("ποΈ Delete", key=f'delete_{st.session_state.current_index}'):
                                try:
                                    current_doc = json.loads(edited)
                                    success, message = delete_record(container, current_doc)
                                    if success:
                                        st.success(message)
                                        st.rerun()
                                    else:
                                        st.error(message)
                                except Exception as e:
                                    st.error(f"Delete err: {str(e)}")
                elif selected_view == 'Run AI':
                    st.markdown("#### π€ Run AI")
                    ai_query = st.text_area("Enter your query for ArXiv search:", key="arxiv_query", height=100)
                    if st.button("Send"):
                        st.session_state.last_query = ai_query
                        # perform_ai_lookup would be your AI function call.
                        st.info("Performing AI lookup (function not implemented in this demo)...")
                elif selected_view == 'Clone':
                    # Duplicate the current document under a fresh id, stripping
                    # Cosmos system fields (_rid, _self, _etag, ...).
                    st.markdown("#### π Clone")
                    if documents:
                        doc = documents[st.session_state.current_index]
                        st.markdown(f"Original ID: {doc.get('id', '')}")
                        new_id = st.text_input("New ID", value=generate_unique_id(), key='new_clone_id')
                        new_name = st.text_input("New Name", value=f"Clone_{new_id[:8]}", key='new_clone_name')
                        new_doc = {'id': new_id, 'pk': new_id, 'name': new_name, **{k: v for k, v in doc.items() if k not in ['id', 'name', 'pk', '_rid', '_self', '_etag', '_attachments', '_ts']}}
                        doc_str = st.text_area("Edit JSON", value=json.dumps(new_doc, indent=2), height=300, key='clone_preview')
                        col1, col2 = st.columns(2)
                        with col1:
                            if st.button("π Regenerate"):
                                new_id = generate_unique_id()
                                st.session_state.new_clone_id = new_id
                                st.rerun()
                        with col2:
                            if st.button("πΎ Save Clone"):
                                try:
                                    final_doc = json.loads(doc_str)
                                    # Strip any system fields the user pasted back in.
                                    for field in ['_rid', '_self', '_etag', '_attachments', '_ts']:
                                        final_doc.pop(field, None)
                                    container.create_item(body=final_doc)
                                    st.success(f"Cloned {final_doc['id']}")
                                    st.rerun()
                                except Exception as e:
                                    st.error(f"Clone err: {str(e)}")
                        col_prev, col_next = st.columns(2)
                        with col_prev:
                            if st.button("β¬ οΈ") and st.session_state.current_index > 0:
                                st.session_state.current_index -= 1
                                st.rerun()
                        with col_next:
                            if st.button("β‘οΈ") and st.session_state.current_index < total_docs - 1:
                                st.session_state.current_index += 1
                                st.rerun()
                elif selected_view == 'New':
                    # Either auto-generate a record, or edit a template manually.
                    st.markdown("#### β New Doc")
                    if st.button("π€ Auto-Gen"):
                        auto_doc = {
                            "id": generate_unique_id(),
                            "pk": generate_unique_id(),
                            "name": f"Auto {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                            "content": "Auto-generated record.",
                            "timestamp": datetime.now().isoformat()
                        }
                        success, message = insert_record(container, auto_doc)
                        if success:
                            st.success(message)
                            st.rerun()
                        else:
                            st.error(message)
                    else:
                        new_id = st.text_input("ID", value=generate_unique_id(), key='new_id')
                        default_doc = {
                            "id": new_id,
                            "pk": new_id,
                            "name": "New Doc",
                            "content": "",
                            "timestamp": datetime.now().isoformat()
                        }
                        new_doc_str = st.text_area("JSON", value=json.dumps(default_doc, indent=2), height=300)
                        if st.button("β Create"):
                            try:
                                cleaned = preprocess_text(new_doc_str)
                                new_doc = json.loads(cleaned)
                                # Force id/pk back to the chosen id regardless of edits.
                                new_doc['id'] = new_id
                                new_doc['pk'] = new_id
                                success, message = insert_record(container, new_doc)
                                if success:
                                    st.success(f"Created {new_doc['id']}")
                                    st.rerun()
                                else:
                                    st.error(message)
                            except Exception as e:
                                st.error(f"Create err: {str(e)}")
                st.subheader(f"π {st.session_state.selected_container}")
                if documents_to_display:
                    df = pd.DataFrame(documents_to_display)
                    st.dataframe(df)
                else:
                    st.info("No docs.")
                update_file_management_section()
    except exceptions.CosmosHttpResponseError as e:
        st.error(f"Cosmos error: {str(e)} π¨")
    except Exception as e:
        st.error(f"Error: {str(e)} π±")
    # Logout clears all session selections and reruns.
    if st.session_state.logged_in and st.sidebar.button("πͺ Logout"):
        st.markdown("#### πͺ Logout")
        st.session_state.logged_in = False
        st.session_state.selected_records = []
        st.session_state.client = None
        st.session_state.selected_database = None
        st.session_state.selected_container = None
        st.session_state.selected_document_id = None
        st.session_state.current_index = 0
        st.rerun()
    show_sidebar_data_grid()
| if __name__ == "__main__": | |
| main() | |