"""
Gemini Web UI Tools
===================
This module contains reusable utility functions and core logic for the Gemini Web UI.
"""
import colorsys
import hashlib
import io
import json
import logging
import mimetypes
import os
import re
import subprocess
import webbrowser
from pathlib import Path
from typing import Any, Final
from collections.abc import Generator
import streamlit as st
from PIL import Image, UnidentifiedImageError
from streamlit.delta_generator import DeltaGenerator
from streamlit.runtime.uploaded_file_manager import UploadedFile
from gwebui.schemas import ChatMessage, SessionInfo, StreamEvent, ToolStat
# Pre-compiled regex patterns
# Matches any single character that is not an ASCII letter, digit, "_" or "-".
SAFE_KEY_PATTERN: Final[re.Pattern[str]] = re.compile(r"[^A-Za-z0-9_-]")
# Matches runs of characters other than ASCII letters, digits, ".", "_" and "-".
FILENAME_SAFE_PATTERN: Final[re.Pattern[str]] = re.compile(r"[^A-Za-z0-9._-]+")
# Greedy match from the first "{" to the last "}"; DOTALL lets "." span newlines.
JSON_MATCH_PATTERN: Final[re.Pattern[str]] = re.compile(
    r"\{.*\}", flags=re.DOTALL
)
# File extension to emoji mapping
# Keys are lower-case suffixes including the leading dot; get_file_emoji()
# looks them up with ``path.suffix.lower()``.
# NOTE(review): the emoji values look mis-decoded in this copy of the file --
# confirm the source is stored and read as UTF-8.
FILE_EMOJI_MAP: dict[str, str] = {
    ".pdf": "đ",
    ".py": "đ",
    ".txt": "đ",
    ".md": "đ",
    ".rst": "đ",
    ".csv": "đ",
    ".json": "đ",
    ".xlsx": "đ",
    ".xls": "đ",
    ".png": "đŧī¸",
    ".jpg": "đŧī¸",
    ".jpeg": "đŧī¸",
    ".gif": "đŧī¸",
    ".svg": "đŧī¸",
    ".webp": "đŧī¸",
    ".zip": "đĻ",
    ".tar": "đĻ",
    ".gz": "đĻ",
    ".7z": "đĻ",
    ".rar": "đĻ",
    ".mp3": "đĩ",
    ".wav": "đĩ",
    ".ogg": "đĩ",
    ".flac": "đĩ",
    ".mp4": "đĨ",
    ".mov": "đĨ",
    ".avi": "đĨ",
    ".mkv": "đĨ",
    ".html": "đ",
    ".css": "đ",
    ".js": "đ",
    ".ts": "đ",
    ".jsx": "đ",
    ".tsx": "đ",
}
[docs]
def get_file_emoji(path: Path) -> str:
    """Pick the marker shown next to *path* in the file tree.

    Directories get a fixed marker. Anything else is looked up by its
    lower-cased suffix in ``FILE_EMOJI_MAP``, with a generic fallback for
    unknown extensions.
    """
    if not path.is_dir():
        extension = path.suffix.lower()
        return FILE_EMOJI_MAP.get(extension, "đ")
    return "đ"
[docs]
def open_in_browser(path: Path) -> None:
    """Hand *path* to the default web browser as a local file URI.

    Failures are reported through ``st.error`` instead of being raised.
    """
    try:
        file_uri = path.absolute().as_uri()
        webbrowser.open(file_uri)
    except Exception as exc:  # pragma: no cover
        st.error(f"Failed to open {path.name} in browser: {exc}")
[docs]
def adjust_color_nuance(
    hex_color: str,
    saturation_factor: float = 0.5,
    lightness_factor: float = 0.1,
) -> str:
    """Adjust the nuance of a colour by scaling saturation and shifting lightness.

    Args:
        hex_color: Colour as ``#rrggbb`` or shorthand ``#rgb``; the leading
            ``#`` is optional.
        saturation_factor: Multiplier applied to the HLS saturation.
        lightness_factor: Amount subtracted from the lightness of light
            colours (lightness > 0.5) and added to that of dark ones, so the
            result always moves towards mid-grey.

    Returns:
        The adjusted colour as a lower-case ``#rrggbb`` string.

    Raises:
        ValueError: If the hex digits cannot be parsed.
    """
    digits = hex_color.lstrip("#")
    if len(digits) == 3:
        # Expand CSS-style shorthand: "abc" -> "aabbcc".
        digits = "".join(ch * 2 for ch in digits)
    red, green, blue = (
        int(digits[i : i + 2], base=16) / 255.0 for i in (0, 2, 4)
    )
    hue, lightness, saturation = colorsys.rgb_to_hls(red, green, blue)
    saturation *= saturation_factor
    if lightness > 0.5:
        lightness -= lightness_factor
    else:
        lightness += lightness_factor
    lightness = max(0.0, min(1.0, lightness))
    saturation = max(0.0, min(1.0, saturation))
    red, green, blue = colorsys.hls_to_rgb(hue, lightness, saturation)
    # round() rather than int(): truncation turns e.g. 127.999... into 127,
    # so a no-op adjustment could still darken a channel by one step.
    return "#{:02x}{:02x}{:02x}".format(
        round(red * 255), round(green * 255), round(blue * 255)
    )
[docs]
def get_text_color(hex_color: str) -> str:
    """Determine the best text colour (black or white) for a given background.

    Args:
        hex_color: Background colour as ``#rrggbb`` or shorthand ``#rgb``;
            the leading ``#`` is optional.

    Returns:
        ``"#000000"`` when the weighted luminance of the background is above
        0.5, otherwise ``"#ffffff"``.

    Raises:
        ValueError: If the hex digits cannot be parsed.
    """
    digits = hex_color.lstrip("#")
    if len(digits) == 3:
        # Expand CSS-style shorthand: "abc" -> "aabbcc".
        digits = "".join(ch * 2 for ch in digits)
    r, g, b = (int(digits[i : i + 2], 16) for i in (0, 2, 4))
    # Weighted sum of the channels, normalised to the 0..1 range.
    luminance: float = (0.299 * r + 0.587 * g + 0.114 * b) / 255
    return "#000000" if luminance > 0.5 else "#ffffff"
[docs]
def get_upload_dir() -> Path:
    """Return the upload directory, creating it if it is missing.

    The location comes from the ``GEMINI_WEBUI_UPLOAD_DIR`` environment
    variable when it is set to a non-empty value; otherwise ``uploads``
    under the current working directory is used.
    """
    configured = os.environ.get("GEMINI_WEBUI_UPLOAD_DIR")
    target = Path(configured) if configured else Path.cwd() / "uploads"
    target.mkdir(parents=True, exist_ok=True)
    return target
[docs]
def node_key(path: Path) -> str:
    """Turn a filesystem path into an identifier usable as a widget key.

    Every character of the POSIX form of *path* that ``SAFE_KEY_PATTERN``
    matches is replaced with an underscore.
    """
    posix_form = path.as_posix()
    return SAFE_KEY_PATTERN.sub("_", posix_form)
[docs]
def iter_visible_children(
    root: Path, include_hidden: bool = False
) -> list[Path]:
    """Return the entries of *root*, directories first, then by name.

    Entries whose name starts with a dot are left out unless
    *include_hidden* is true. Names are compared case-insensitively.
    A missing or non-directory *root* yields an empty list.
    """
    if not (root.exists() and root.is_dir()):
        return []
    visible: list[Path] = []
    for entry in root.iterdir():
        if include_hidden or not entry.name.startswith("."):
            visible.append(entry)
    return sorted(
        visible, key=lambda entry: (not entry.is_dir(), entry.name.lower())
    )
[docs]
def render_file_tree(
    root: Path,
    *,
    container: DeltaGenerator,
    allow_delete: bool = False,
    include_hidden: bool = False,
    key_prefix: str,
    level: int = 0,
) -> None:
    """Render a recursive file tree using nested containers and buttons.

    Directories are drawn as toggle buttons whose expanded state is kept in
    ``st.session_state``; files are drawn as buttons that open the file in
    the browser, optionally with a delete button beside them.

    Args:
        root: Directory whose children are rendered.
        container: Streamlit container that receives the rows.
        allow_delete: When true, each file row gets a delete button.
        include_hidden: Passed through to ``iter_visible_children``.
        key_prefix: Prefix that keeps widget keys unique across trees and
            nesting levels.
        level: Nesting depth; only used to size the indent column.
    """
    children: list[Path] = iter_visible_children(root, include_hidden)
    if not children:
        container.caption("Empty")
        return
    # The indent is the same for every row at this depth. The floor of 0.001
    # keeps the weight positive at level 0 -- presumably because
    # ``columns`` rejects a zero width; confirm against the Streamlit docs.
    indent_weight: float = max(level * 0.08, 0.001)
    for child in children:
        n_key: str = f"{key_prefix}_{node_key(child)}"
        if child.is_dir():
            expanded_key: str = f"exp_{n_key}"
            expanded: bool = st.session_state.get(expanded_key, False)
            caret: str = "âž" if expanded else "â¸"
            dir_cols: list[DeltaGenerator] = container.columns(
                [indent_weight, 0.84, 0.16],
                gap="small",
                vertical_alignment="center",
            )
            with dir_cols[1]:
                if st.button(
                    label=f"{caret} {get_file_emoji(child)} {child.name}",
                    key=f"dir_{n_key}",
                    use_container_width=True,
                ):
                    st.session_state[expanded_key] = not expanded
                    st.rerun()
            # Re-read the flag: the value captured above may be stale.
            expanded = st.session_state.get(expanded_key, False)
            if expanded:
                subtree: DeltaGenerator = container.container()
                render_file_tree(
                    child,
                    container=subtree,
                    allow_delete=allow_delete,
                    include_hidden=include_hidden,
                    key_prefix=n_key,
                    level=level + 1,
                )
        else:
            cols_spec: list[float] = (
                [indent_weight, 0.78, 0.22]
                if allow_delete
                else [indent_weight, 1.0]
            )
            cols: list[DeltaGenerator] = container.columns(
                cols_spec, gap="small", vertical_alignment="center"
            )
            # cols[0] is the indent spacer; the spec always has >= 2 entries.
            with cols[1]:
                if st.button(
                    label=f"{get_file_emoji(child)} {child.name}",
                    key=f"open_{n_key}",
                    use_container_width=True,
                ):
                    open_in_browser(child)
            if allow_delete:
                with cols[2]:
                    if st.button(
                        "đī¸",
                        key=f"del_{n_key}",
                        use_container_width=True,
                    ):
                        try:
                            child.unlink()
                        except OSError as exc:
                            # E.g. permission denied or the file vanished:
                            # report it instead of crashing the script run.
                            container.error(
                                f"Failed to delete {child.name}: {exc}"
                            )
                        else:
                            # Kept outside the try block so the rerun signal
                            # is never caught by the handler above.
                            st.rerun()
[docs]
def resize_image_if_needed(file_bytes: bytes, filename: str) -> bytes:
    """Shrink an image so that its longest side is at most 512 pixels.

    The decision to treat the payload as an image is based on the file
    extension only. Non-image files, images that are already small enough,
    and anything that fails to decode or re-encode are returned unchanged.

    Args:
        file_bytes: Raw file content.
        filename: Name used for the extension check and in log messages.

    Returns:
        The re-encoded, down-scaled image bytes, or *file_bytes* as given.
    """
    image_suffixes: tuple[str, ...] = (
        ".jpg",
        ".jpeg",
        ".png",
        ".webp",
        ".bmp",
        ".gif",
        ".heic",
        ".heif",
        ".tiff",
        ".jfif",
    )
    longest_side_limit: int = 512
    try:
        if not filename.lower().endswith(image_suffixes):
            return file_bytes
        with Image.open(io.BytesIO(file_bytes)) as source:
            detected_format: str | None = source.format
            longest_side: int = max(source.size)
            if longest_side <= longest_side_limit:
                return file_bytes
            scale: float = longest_side_limit / longest_side
            target_size: tuple[int, int] = (
                int(source.width * scale),
                int(source.height * scale),
            )
            # Newer Pillow exposes the filter on Image.Resampling, older
            # releases directly on the Image module.
            lanczos = getattr(getattr(Image, "Resampling", Image), "LANCZOS")
            shrunk = source.resize(target_size, lanczos)
            output_format: str = detected_format or "PNG"
            # Convert alpha/palette modes to RGB before saving as JPEG.
            if output_format == "JPEG" and shrunk.mode in ("RGBA", "LA", "P"):
                shrunk = shrunk.convert("RGB")
            encoded = io.BytesIO()
            shrunk.save(encoded, format=output_format)
            return encoded.getvalue()
    except UnidentifiedImageError:
        # Not decodable as an image: fall through and keep the original bytes.
        pass
    except OSError as e:
        logging.error(f"Image processing bound exception for {filename}: {e}")
    except Exception as e:
        logging.error(f"Unexpected image resizing error for {filename}: {e}")
    return file_bytes
[docs]
def read_uploaded_file_bytes(uploaded_file: UploadedFile) -> bytes:
    """Return the full content of an uploaded file as ``bytes``.

    ``getvalue()`` is used when the object provides it; otherwise the
    content is taken from ``getbuffer()``.
    """
    reader = getattr(uploaded_file, "getvalue", None)
    payload = reader() if callable(reader) else uploaded_file.getbuffer()
    return bytes(payload)
[docs]
def safe_upload_filename(
    original_name: str | None, mime_type: str | None
) -> str:
    """Build a filesystem-safe filename from a browser-supplied name.

    Only the final path component is kept. Runs of characters matched by
    ``FILENAME_SAFE_PATTERN`` become a single underscore, and leading or
    trailing dots and underscores are removed. An empty result falls back
    to ``"upload"``. When the cleaned name has no suffix, an extension is
    derived from *mime_type* where one is known.
    """
    base_name: str = Path((original_name or "").strip()).name
    cleaned: str = (
        FILENAME_SAFE_PATTERN.sub("_", base_name).strip("._") or "upload"
    )
    if Path(cleaned).suffix or not mime_type:
        return cleaned
    preferred_extensions: dict[str, str] = {
        "image/jpeg": ".jpg",
        "image/png": ".png",
        "image/webp": ".webp",
        "text/markdown": ".md",
        "application/pdf": ".pdf",
    }
    # Drop parameters such as "; charset=utf-8" before the lookup.
    mime_main: str = mime_type.split(";", 1)[0].strip().lower()
    extension: str | None = preferred_extensions.get(
        mime_main
    ) or mimetypes.guess_extension(mime_main)
    return cleaned + extension if extension else cleaned
[docs]
def parse_chat_submission(
    chat_submission: Any,
) -> tuple[str, list[UploadedFile]]:
    """Parse a Streamlit chat input submission into prompt text and files.

    Accepted shapes:

    * ``None`` -- nothing was submitted; yields ``("", [])``.
    * ``str`` -- returned unchanged with no files.
    * an object with a callable ``get`` (mapping-like) -- fields are read
      through ``get``.
    * any other object -- fields are read as attributes.

    The prompt is the first truthy value among ``text``, ``value`` and
    ``prompt``; the files are the first truthy value among ``files``,
    ``uploaded_files`` and ``attachments`` (only lists and tuples are
    accepted).

    Returns:
        A ``(prompt, attached_files)`` tuple. Except for the plain ``str``
        case, the prompt is stripped of surrounding whitespace.
    """
    if chat_submission is None:
        # Previously fell through to str(None) and produced the prompt "None".
        return "", []
    if isinstance(chat_submission, str):
        return chat_submission, []

    getter: Any | None = getattr(chat_submission, "get", None)
    mapping_like: bool = callable(getter)

    def lookup(field: str) -> Any:
        if mapping_like:
            return getter(field)
        return getattr(chat_submission, field, None)

    def first_truthy(fields: tuple[str, ...]) -> Any:
        # Mirrors an ``a or b or c`` chain: later fields are only read
        # when the earlier ones are falsy.
        for field in fields:
            value = lookup(field)
            if value:
                return value
        return None

    prompt: str = str(first_truthy(("text", "value", "prompt")) or "").strip()
    raw_files: Any = first_truthy(("files", "uploaded_files", "attachments"))
    attached_files: list[UploadedFile] = (
        list(raw_files) if isinstance(raw_files, (list, tuple)) else []
    )
    if not mapping_like and not prompt and not attached_files:
        # Unknown object with no recognised fields: use its string form.
        # Skipped when files were found, because the repr of an
        # attachment-only submission is not a user prompt.
        prompt = str(chat_submission).strip()
    return prompt, attached_files
[docs]
def get_project_hash(project_path: Path | None = None) -> str:
"""Compute the SHA256 hash of the project path for gemini-cli storage."""
if project_path is None:
project_path = Path.cwd()
return hashlib.sha256(
project_path.absolute().as_posix().encode("utf-8")
).hexdigest()
[docs]
def get_session_dir() -> Path:
    """Return ``~/.gemini/tmp/<project hash>/chats`` for the current project.

    The project hash comes from ``get_project_hash()`` with its default
    argument. The directory is not created or checked here.
    """
    gemini_tmp = Path.home().joinpath(".gemini", "tmp")
    return gemini_tmp / get_project_hash() / "chats"
@st.cache_data(ttl=60)
def _cached_list_sessions(
    session_dir_str: str, session_dir_mtime: float
) -> list[SessionInfo]:
    """Scan a session directory and summarise every ``session-*.json`` file.

    Args:
        session_dir_str: Directory to scan, as a string.
        session_dir_mtime: Not read in the body. It is part of the cache
            key, so a changed directory mtime produces a fresh scan.

    Returns:
        One ``SessionInfo`` per readable session file, newest timestamp
        first. Files that cannot be read or parsed are skipped with a
        warning in the log.
    """
    session_dir: Path = Path(session_dir_str)
    if not session_dir.exists():
        return []
    sessions: list[SessionInfo] = []
    for session_file in session_dir.glob("session-*.json"):
        try:
            data: dict[str, Any] = json.loads(
                session_file.read_text(encoding="utf-8")
            )
            messages: list[dict[str, Any]] = data.get("messages", [])
            # The title is the start of the first user message, if any.
            first_user_msg: str = next(
                (
                    str(m.get("content", ""))
                    for m in messages
                    if m.get("type") == "user"
                ),
                "New Chat",
            )
            title: str = (
                (first_user_msg[:30] + "...")
                if len(first_user_msg) > 30
                else first_user_msg
            )
            # Trailing ``or ""`` keeps the sort key from becoming None when
            # both fields are missing or null.
            timestamp: str = (
                data.get("lastUpdated") or data.get("startTime") or ""
            )
            sessions.append(
                SessionInfo(
                    session_id=str(data.get("sessionId", "")),
                    title=title,
                    timestamp=timestamp,
                    file_path=str(session_file.absolute()),
                )
            )
        except Exception as exc:
            # Best effort: one broken file must not hide the other sessions,
            # but leave a trace so the omission can be diagnosed.
            logging.warning(
                "Skipping unreadable session file %s: %s", session_file, exc
            )
            continue
    sessions.sort(key=lambda info: info.timestamp, reverse=True)
    return sessions
[docs]
def list_available_sessions() -> list[SessionInfo]:
    """Return the sessions stored for the current project.

    Yields an empty list when the session directory does not exist.
    The directory mtime is passed to the cached lister; it falls back to
    ``0.0`` when the directory cannot be stat'ed.
    """
    directory: Path = get_session_dir()
    if directory.exists():
        try:
            modified_at = directory.stat().st_mtime
        except OSError:
            modified_at = 0.0
        return _cached_list_sessions(str(directory), modified_at)
    return []
def _summarize_tool_calls(tool_calls: list[dict[str, Any]]) -> list[ToolStat]:
    """Collapse raw tool-call records into one ``ToolStat`` per tool name."""
    totals: dict[str, int] = {}
    successes: dict[str, int] = {}
    for call in tool_calls:
        name = call.get("name", "Unknown")
        totals[name] = totals.get(name, 0) + 1
        if call.get("status", "unknown") == "success":
            successes[name] = successes.get(name, 0) + 1
    stats: list[ToolStat] = []
    for name, total in totals.items():
        succeeded = successes.get(name, 0)
        if succeeded == total:
            color = "green"  # every call succeeded
        elif succeeded == 0:
            color = "red"  # no call succeeded
        else:
            color = "orange"  # mixed outcome
        stats.append(ToolStat(name=name, color=color, count=total))
    return stats


@st.cache_data(ttl=60)
def _cached_load_session(
    session_id: str, session_dir_str: str, session_dir_mtime: float
) -> list[ChatMessage]:
    """Load one session file and convert its messages to ``ChatMessage``.

    Candidate files are those matching ``session-*-<first 8 id chars>.json``.
    The first candidate whose text contains the full *session_id* is used;
    if none does, the first candidate is used.

    Tool calls on assistant messages with empty content are held back and
    attached, as per-tool statistics, to the next assistant message that
    has content. A user message discards any held-back tool calls.

    Args:
        session_id: Full session identifier.
        session_dir_str: Session directory, as a string.
        session_dir_mtime: Not read in the body. It is part of the cache
            key, so a changed directory mtime produces a fresh load.

    Returns:
        The converted messages, or an empty list when no file matches or
        the file cannot be read or parsed (the error is logged).
    """
    session_dir: Path = Path(session_dir_str)
    short_id: str = session_id[:8]
    files: list[Path] = list(session_dir.glob(f"session-*-{short_id}.json"))
    if not files:
        return []

    # Keep the text of the matching file so it is not read a second time.
    session_text: str | None = None
    for candidate in files:
        try:
            text = candidate.read_text(encoding="utf-8")
        except (OSError, UnicodeError):
            continue
        if session_id in text:
            session_text = text
            break

    try:
        if session_text is None:
            session_text = files[0].read_text(encoding="utf-8")
        data: dict[str, Any] = json.loads(session_text)
        formatted_messages: list[ChatMessage] = []
        pending_tool_calls: list[dict[str, Any]] = []
        for m in data.get("messages", []):
            msg_type: str = m.get("type", "unknown")
            raw_content: Any = m.get("content", "")
            # A null or non-string content used to abort the whole load on
            # ``.strip()``. NOTE(review): structured content is shown via
            # str() -- confirm the real schema of the session files.
            if raw_content is None:
                content = ""
            elif isinstance(raw_content, str):
                content = raw_content
            else:
                content = str(raw_content)
            # Everything that is not a user message is shown as assistant.
            role: str = "user" if msg_type == "user" else "assistant"
            if msg_type == "info":
                content = f"âšī¸ *{content}*"
            elif msg_type == "error":
                content = f"â *{content}*"
            model: str | None = m.get("model")
            # ``or []`` also covers an explicit null in the JSON.
            current_tool_calls: list[dict[str, Any]] = m.get("toolCalls") or []

            if role == "user":
                pending_tool_calls = []
                formatted_messages.append(
                    ChatMessage(role=role, content=content, model=model)
                )  # type: ignore
            elif content.strip():
                formatted_messages.append(
                    ChatMessage(
                        role=role,
                        content=content,
                        model=model,
                        tools=_summarize_tool_calls(
                            pending_tool_calls + current_tool_calls
                        ),
                    )
                )  # type: ignore
                pending_tool_calls = []
            else:
                # Tool-only step: keep the calls for the next visible reply.
                pending_tool_calls.extend(current_tool_calls)
        return formatted_messages
    except Exception as e:
        logging.error(f"Failed to load session {session_id}: {e}")
        return []
[docs]
def load_session_from_disk(session_id: str) -> list[ChatMessage]:
    """Return the message history of *session_id* read from disk.

    The session directory mtime is passed to the cached loader; it falls
    back to ``0.0`` when the directory cannot be stat'ed.
    """
    directory: Path = get_session_dir()
    modified_at: float = 0.0
    try:
        modified_at = directory.stat().st_mtime
    except OSError:
        pass
    return _cached_load_session(session_id, str(directory), modified_at)
[docs]
def save_uploaded_files(
    attached_files: list[UploadedFile], upload_dir: Path
) -> list[str]:
    """Write each uploaded file into *upload_dir* and return the saved paths.

    Names are sanitised with ``safe_upload_filename`` and image payloads go
    through ``resize_image_if_needed``. An existing file is never
    overwritten: ``_1``, ``_2``, ... is appended to the stem until a free
    name is found.

    Returns:
        Absolute paths of the written files, in input order.
    """
    written: list[str] = []
    for upload in attached_files:
        safe_name: str = safe_upload_filename(
            getattr(upload, "name", None),
            getattr(upload, "type", None),
        )
        payload: bytes = resize_image_if_needed(
            read_uploaded_file_bytes(upload), safe_name
        )
        destination: Path = upload_dir / safe_name
        stem, suffix = destination.stem, destination.suffix
        attempt: int = 0
        while destination.exists():
            attempt += 1
            destination = upload_dir / f"{stem}_{attempt}{suffix}"
        with open(destination, mode="wb") as handle:
            handle.write(payload)
        written.append(str(destination.absolute()))
    return written
[docs]
def build_gemini_command(
prompt: str,
upload_dir: Path,
session_id: str | None,
allowed_tools: list[str],
model: str | None = None,
stream: bool = False,
) -> list[str]:
"""Construct the Gemini CLI command."""
output_format = "stream-json" if stream else "json"
cmd: list[str] = [
"gemini",
prompt,
"-o",
output_format,
"--include-directories",
str(upload_dir.absolute()),
"--allowed-tools",
",".join(allowed_tools),
]
if model:
cmd.extend(["--model", model])
if session_id is not None:
cmd.extend(["--resume", str(session_id)])
return cmd
[docs]
def run_gemini_cli_stream(cmd: list[str]) -> Generator[StreamEvent, None, None]:
    """Execute the Gemini CLI command and yield streaming events.

    Each non-empty stdout line is parsed as JSON and turned into a
    ``StreamEvent``. Lines that are not valid JSON are skipped; lines that
    cannot be turned into an event are skipped with a warning. A non-zero
    exit code yields a final ``error`` event carrying the captured stderr
    and the exit code. Failures to start or run the process are also
    reported as an ``error`` event instead of being raised.

    Args:
        cmd: Full command line, as produced for ``subprocess.Popen``.

    Yields:
        ``StreamEvent`` objects in the order the CLI emits them.
    """
    import tempfile

    process: subprocess.Popen[str] | None = None
    try:
        # stderr is spooled to a temp file rather than a pipe: nothing reads
        # stderr while stdout is streamed, so a full pipe would block the
        # child and stall this generator.
        with tempfile.TemporaryFile() as stderr_spool:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=stderr_spool,
                text=True,
                bufsize=1,
                encoding="utf-8",
            )
            if process.stdout:
                for line in process.stdout:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        data = json.loads(line)
                        yield StreamEvent(**data)
                    except json.JSONDecodeError:
                        continue
                    except Exception as e:
                        logging.warning(f"Failed to parse StreamEvent: {e}")
                        continue
            return_code = process.wait()
            if return_code != 0:
                stderr_spool.seek(0)
                stderr = stderr_spool.read().decode("utf-8", errors="replace")
                yield StreamEvent(type="error", content=stderr, code=return_code)
    except Exception as e:
        yield StreamEvent(type="error", content=str(e))
    finally:
        if process is not None:
            if process.poll() is None:
                # The consumer stopped early or an error interrupted the
                # loop: do not leave the CLI running in the background.
                process.kill()
                process.wait()
            if process.stdout:
                process.stdout.close()
[docs]
def delete_session(session_id: str) -> bool:
    """Remove the file that stores *session_id* from the session directory.

    Candidate files are those matching ``session-*-<first 8 id chars>.json``.
    The first candidate whose text contains the full id is deleted; if none
    does, the first candidate is deleted instead.

    Returns:
        ``True`` when a file was removed, ``False`` when no candidate exists
        or the removal failed.
    """
    session_dir: Path = get_session_dir()
    pattern: str = f"session-*-{session_id[:8]}.json"
    candidates: list[Path] = list(session_dir.glob(pattern))
    if not candidates:
        return False

    doomed: Path | None = None
    for candidate in candidates:
        try:
            if session_id in candidate.read_text(encoding="utf-8"):
                doomed = candidate
                break
        except Exception:
            continue
    if doomed is None:
        # NOTE(review): this fallback deletes a file that was not confirmed
        # to contain the full session id -- verify that this is intended.
        doomed = candidates[0]

    try:
        doomed.unlink()
    except Exception:
        return False
    return True