Visualization Agent¶
The Visualization Agent creates interactive maps using leafmap's MapLibre backend.
geoagent.core.viz_agent
¶
Visualization Agent for creating geospatial maps and visualizations.
The Visualization Agent creates interactive MapLibre GL visualizations using leafmap's maplibregl backend for high-performance 3D mapping.
MockMapLibreMap
¶
Mock MapLibre map object when leafmap.maplibregl is not available.
Source code in geoagent/core/viz_agent.py
class MockMapLibreMap:
    """Mock MapLibre map object when leafmap.maplibregl is not available.

    The mock records what it is asked to display (layers, center, zoom,
    title, basemap style) in plain attributes and can emit a small HTML
    summary of that state; it renders no actual map.
    """

    def __init__(self, center=None, zoom=5, height="600px", **kwargs):
        """Initialize an empty mock map.

        Args:
            center: Initial center. When omitted, a new ``[0, 0]`` list is
                created for this instance.
            zoom: Initial zoom level.
            height: Height value stored on the instance.
            **kwargs: Accepted and ignored.
        """
        self.layers = []
        # Build the default per instance: a list literal in the signature
        # would be one object shared by every map created without a center.
        self.center = [0, 0] if center is None else center
        self.zoom = zoom
        self.height = height
        self.title = ""
        self._style = "open-street-map"

    def set_center(self, lon, lat, zoom=None):
        """Store ``[lon, lat]`` as the center; update zoom only when given."""
        self.center = [lon, lat]
        if zoom is not None:
            self.zoom = zoom

    def add_cog_layer(self, url, name=None, fit_bounds=False, **kwargs):
        """Add Cloud Optimized GeoTIFF layer.

        Args:
            url: Location of the COG.
            name: Layer name; a numbered default is used when falsy.
            fit_bounds: Stored on the layer record.
            **kwargs: Extra entries merged into the layer record (they
                override the standard keys on collision).
        """
        self.layers.append(
            {
                "type": "cog",
                "url": url,
                "name": name or f"COG Layer {len(self.layers)+1}",
                "fit_bounds": fit_bounds,
                **kwargs,
            }
        )

    def add_raster(self, url, layer_name=None, fit_bounds=False, **kwargs):
        """Add raster layer (fallback to COG)."""
        self.add_cog_layer(url, name=layer_name, fit_bounds=fit_bounds, **kwargs)

    def add_geojson(self, data, layer_name=None, style=None, **kwargs):
        """Add GeoJSON layer.

        Args:
            data: GeoJSON payload stored as-is.
            layer_name: Layer name; a numbered default is used when falsy.
            style: Style value stored on the layer record.
            **kwargs: Extra entries merged into the layer record.
        """
        self.layers.append(
            {
                "type": "geojson",
                "data": data,
                "name": layer_name or f"GeoJSON Layer {len(self.layers)+1}",
                "style": style,
                **kwargs,
            }
        )

    def add_pmtiles(self, url, name=None, **kwargs):
        """Add PMTiles vector layer."""
        self.layers.append(
            {
                "type": "pmtiles",
                "url": url,
                "name": name or f"PMTiles Layer {len(self.layers)+1}",
                **kwargs,
            }
        )

    def add_basemap(self, basemap="open-street-map"):
        """Set basemap style."""
        self._style = basemap

    def add_layer(self, layer_dict):
        """Add generic layer (the dict is appended unchanged)."""
        self.layers.append(layer_dict)

    def add_source(self, source_id, source_dict):
        """Add data source (no-op in the mock)."""

    def add_title(self, title):
        """Add title to map."""
        self.title = title

    def to_html(self, filename=None):
        """Export map to HTML.

        The title and each layer's name/type are HTML-escaped so that text
        containing ``<``, ``>`` or ``&`` cannot inject markup.

        Args:
            filename: Optional path. When truthy, the HTML is also written
                to this file as UTF-8.

        Returns:
            The HTML summary as a string.
        """
        # Function-scope import: keeps this class self-contained.
        from html import escape

        items = "".join(
            f"<li>{escape(str(layer.get('name', 'Unnamed')))} "
            f"({escape(str(layer.get('type', 'unknown')))})</li>"
            for layer in self.layers
        )
        document = (
            "\n"
            '<div style="text-align: center;">\n'
            f"<h3>{escape(str(self.title))}</h3>\n"
            "<p>Mock MapLibre Map</p>\n"
            f"<p>Center: {self.center}, Zoom: {self.zoom}</p>\n"
            f"<p>Layers: {len(self.layers)}</p>\n"
            "<ul>\n"
            f"{items}</ul></div>"
        )
        if filename:
            with open(filename, "w", encoding="utf-8") as f:
                f.write(document)
        return document

    def __repr__(self):
        return f"MockMapLibreMap(center={self.center}, zoom={self.zoom}, layers={len(self.layers)})"
add_basemap(self, basemap='open-street-map')
¶
Set basemap style.
Source code in geoagent/core/viz_agent.py
def add_basemap(self, basemap="open-street-map"):
    """Remember the requested basemap style on the mock map.

    Args:
        basemap: Style identifier stored in ``self._style``.
    """
    chosen_style = basemap
    self._style = chosen_style
add_cog_layer(self, url, name=None, fit_bounds=False, **kwargs)
¶
Add Cloud Optimized GeoTIFF layer.
Source code in geoagent/core/viz_agent.py
def add_cog_layer(self, url, name=None, fit_bounds=False, **kwargs):
    """Record a Cloud Optimized GeoTIFF layer on the mock map.

    Args:
        url: Location of the COG.
        name: Layer name; a numbered default is used when falsy.
        fit_bounds: Stored on the layer record.
        **kwargs: Extra entries merged into the record; they override the
            standard keys on collision.
    """
    fallback_name = f"COG Layer {len(self.layers)+1}"
    record = {
        "type": "cog",
        "url": url,
        "name": name or fallback_name,
        "fit_bounds": fit_bounds,
    }
    record.update(kwargs)
    self.layers.append(record)
add_geojson(self, data, layer_name=None, style=None, **kwargs)
¶
Add GeoJSON layer.
Source code in geoagent/core/viz_agent.py
def add_geojson(self, data, layer_name=None, style=None, **kwargs):
    """Record a GeoJSON layer on the mock map.

    Args:
        data: GeoJSON payload, stored as-is.
        layer_name: Layer name; a numbered default is used when falsy.
        style: Style value stored on the layer record.
        **kwargs: Extra entries merged into the record; they override the
            standard keys on collision.
    """
    fallback_name = f"GeoJSON Layer {len(self.layers)+1}"
    record = {
        "type": "geojson",
        "data": data,
        "name": layer_name or fallback_name,
        "style": style,
    }
    record.update(kwargs)
    self.layers.append(record)
add_layer(self, layer_dict)
¶
Add generic layer.
Source code in geoagent/core/viz_agent.py
def add_layer(self, layer_dict):
    """Append a caller-built layer record to ``self.layers`` unchanged.

    Args:
        layer_dict: Layer description to store.
    """
    layer_store = self.layers
    layer_store.append(layer_dict)
add_pmtiles(self, url, name=None, **kwargs)
¶
Add PMTiles vector layer.
Source code in geoagent/core/viz_agent.py
def add_pmtiles(self, url, name=None, **kwargs):
    """Record a PMTiles vector layer on the mock map.

    Args:
        url: Location of the PMTiles archive.
        name: Layer name; a numbered default is used when falsy.
        **kwargs: Extra entries merged into the record; they override the
            standard keys on collision.
    """
    fallback_name = f"PMTiles Layer {len(self.layers)+1}"
    record = {"type": "pmtiles", "url": url, "name": name or fallback_name}
    record.update(kwargs)
    self.layers.append(record)
add_raster(self, url, layer_name=None, fit_bounds=False, **kwargs)
¶
Add raster layer (fallback to COG).
Source code in geoagent/core/viz_agent.py
def add_raster(self, url, layer_name=None, fit_bounds=False, **kwargs):
    """Add a raster by delegating to ``add_cog_layer``.

    Args:
        url: Raster location, forwarded unchanged.
        layer_name: Forwarded as the COG layer's ``name``.
        fit_bounds: Forwarded unchanged.
        **kwargs: Forwarded unchanged.
    """
    register_cog = self.add_cog_layer
    register_cog(url, name=layer_name, fit_bounds=fit_bounds, **kwargs)
add_source(self, source_id, source_dict)
¶
Add data source.
Source code in geoagent/core/viz_agent.py
def add_source(self, source_id, source_dict):
    """Accept a data source without recording it (mock no-op).

    Args:
        source_id: Identifier of the source; unused.
        source_dict: Source definition; unused.
    """
    return None
add_title(self, title)
¶
Add title to map.
Source code in geoagent/core/viz_agent.py
def add_title(self, title):
    """Store the map title on the instance.

    Args:
        title: Title text assigned to ``self.title``.
    """
    new_title = title
    self.title = new_title
to_html(self, filename=None)
¶
Export map to HTML.
Source code in geoagent/core/viz_agent.py
def to_html(self, filename=None):
    """Export map to HTML.

    Builds a small HTML summary of the mock map (title, center, zoom and
    one list entry per layer). The title and each layer's name/type are
    HTML-escaped so text containing ``<``, ``>`` or ``&`` cannot inject
    markup.

    Args:
        filename: Optional path. When truthy, the HTML is also written to
            this file as UTF-8.

    Returns:
        The HTML summary as a string.
    """
    # Function-scope import so the method needs no module-level change.
    from html import escape

    items = "".join(
        f"<li>{escape(str(layer.get('name', 'Unnamed')))} "
        f"({escape(str(layer.get('type', 'unknown')))})</li>"
        for layer in self.layers
    )
    document = (
        "\n"
        '<div style="text-align: center;">\n'
        f"<h3>{escape(str(self.title))}</h3>\n"
        "<p>Mock MapLibre Map</p>\n"
        f"<p>Center: {self.center}, Zoom: {self.zoom}</p>\n"
        f"<p>Layers: {len(self.layers)}</p>\n"
        "<ul>\n"
        f"{items}</ul></div>"
    )
    if filename:
        # Explicit encoding: the platform default may not be UTF-8.
        with open(filename, "w", encoding="utf-8") as f:
            f.write(document)
    return document
VizAgent
¶
Agent responsible for creating geospatial visualizations.
The Visualization Agent takes data and analysis results and creates appropriate leafmap visualizations for display in Jupyter notebooks.
Source code in geoagent/core/viz_agent.py
class VizAgent:
    """Agent that turns data and analysis results into map visualizations.

    Given a query plan plus optional data and analysis results, the agent
    builds a leafmap visualization suitable for display in Jupyter
    notebooks.
    """

    def __init__(self, llm: Any, tools: Optional[Dict[str, Any]] = None):
        """Store the collaborators and run tool setup.

        Args:
            llm: Language model instance for visualization decisions
            tools: Dictionary of available visualization tools; an empty
                dict is used when omitted or falsy
        """
        self.llm = llm
        self.tools = tools if tools else {}
        self._setup_tools()
def _setup_tools(self):
    """Prepare visualization tools.

    Currently a placeholder: no tool is imported or registered, only an
    informational message is logged. The ``ImportError`` handler is kept
    for when the real tool import is enabled.
    """
    try:
        # TODO: Enable when actual tools are implemented:
        #   from ..tools.viz import VizTool
        #   if 'viz' not in self.tools:
        #       self.tools['viz'] = VizTool()
        logger.info("Visualization tools setup (using placeholders)")
    except ImportError as e:
        # Graceful fallback: callers use leafmap directly when no tool exists.
        logger.warning(f"Visualization tools not available: {e}")
def create_visualization(
    self,
    plan: PlannerOutput,
    data: Optional[DataResult] = None,
    analysis: Optional[AnalysisResult] = None,
    target_map: Optional[Any] = None,
) -> Any:
    """Build the visualization that best fits the available inputs.

    The visualization type is chosen by ``_determine_viz_type`` and the
    matching ``_create_*_visualization`` builder is invoked. Any exception
    raised along the way is logged and turned into an error visualization.

    Args:
        plan: Original query plan for context
        data: Data retrieved by Data Agent
        analysis: Analysis results from Analysis Agent
        target_map: Optional existing map to render on instead of creating
            a new one. When provided, layers are added to this map.

    Returns:
        MapLibre Map object ready for display
    """
    # Remembered on the instance so the builders can pick it up.
    self._target_map = target_map
    if target_map is not None:
        self._prepare_target_map(target_map)

    logger.info("Creating visualization")
    try:
        viz_type = self._determine_viz_type(plan, data, analysis)
        builders = {
            "raster_layer": self._create_raster_visualization,
            "vector_layer": self._create_vector_visualization,
            "analysis_result": self._create_analysis_visualization,
            "time_series": self._create_time_series_visualization,
            "split_map": self._create_split_map_visualization,
        }
        # Unknown types fall back to the default builder.
        build = builders.get(viz_type, self._create_default_visualization)
        return build(plan, data, analysis)
    except Exception as e:
        logger.error(f"Visualization creation failed: {e}")
        return self._create_error_visualization(str(e))
def _prepare_target_map(self, target_map: Any) -> None:
"""Ensure an existing map widget can receive dynamic updates."""
try:
if hasattr(target_map, "use_message_queue"):
target_map.use_message_queue(True)
if (
hasattr(target_map, "create_container")
and getattr(target_map, "container", None) is None
):
target_map.create_container()
except Exception as e:
logger.debug(f"Could not prepare target map: {e}")
def _determine_viz_type(
self,
plan: PlannerOutput,
data: Optional[DataResult] = None,
analysis: Optional[AnalysisResult] = None,
) -> str:
"""Determine the appropriate visualization type.
Args:
plan: Query plan with intent
data: Available data
analysis: Analysis results
Returns:
Visualization type string
"""
# If analysis has specific visualization hints, use those
if analysis and analysis.visualization_hints:
viz_hints = analysis.visualization_hints
if viz_hints.get("type") == "time_series":
return "time_series"
elif viz_hints.get("type") == "split_map":
return "split_map"
# Check for change detection (typically needs split map)
if analysis and "change" in plan.intent.lower():
return "split_map"
# Check data type
if data:
if data.data_type == "raster":
if analysis:
return "analysis_result" # Processed raster
else:
return "raster_layer" # Raw raster
elif data.data_type == "vector":
return "vector_layer"
# Check for time series in intent
if any(
term in plan.intent.lower() for term in ["time series", "temporal", "trend"]
):
return "time_series"
return "default"
def _create_raster_visualization(
    self,
    plan: PlannerOutput,
    data: DataResult,
    analysis: Optional[AnalysisResult] = None,
) -> Any:
    """Create visualization for raster data.

    Only the first entry of ``data.items`` is rendered. Items without
    assets, or with any ``mock://`` asset href, are skipped. When MapLibre
    is available and the item names a collection, the item is added via
    ``add_stac_layer``; otherwise the best single asset is added as a COG
    layer. Failures while adding a layer are logged (with traceback) and
    do not prevent the map from being returned.

    Args:
        plan: Query plan
        data: Raster data
        analysis: Optional analysis results (unused here)

    Returns:
        MapLibre Map with raster layers
    """
    m = create_map(target_map=getattr(self, "_target_map", None))

    # Start from the planned location; a successfully added layer refines
    # the view to the item's own bbox further down.
    if plan.location and "bbox" in plan.location:
        bbox = plan.location["bbox"]
        center_lat = (bbox[1] + bbox[3]) / 2
        center_lon = (bbox[0] + bbox[2]) / 2
        m.set_center(center_lon, center_lat, zoom=10)

    # Exact collection id -> colormap. ``None`` means "deliberately no
    # colormap" (the LULC collection has its own).
    exact_colormaps = {
        "cop-dem-glo-30": "terrain",
        "3dep-lidar-hag": "terrain",
        "io-lulc-9-class": None,
        "modis-14A1-061": "hot",
        "modis-10A1-061": "Blues_r",
        "modis-11A1-061": "RdYlBu_r",
    }

    for i, item in enumerate(data.items[:1]):
        item_id = item.get("id", f"Layer {i+1}")
        collection = item.get("collection", "")
        assets = item.get("assets", {})

        # Skip mock items
        if not assets or any(
            v.get("href", "").startswith("mock://") for v in assets.values()
        ):
            logger.debug(f"Skipping mock item {item_id}")
            continue

        try:
            if MAPLIBRE_AVAILABLE and collection:
                viz_assets = self._select_viz_assets(assets, plan.intent, collection)
                # "rendered_preview" is never passed on as an asset.
                if isinstance(viz_assets, list):
                    viz_assets = [a for a in viz_assets if a != "rendered_preview"]
                elif viz_assets == "rendered_preview":
                    viz_assets = []
                if not viz_assets:
                    best_asset = self._select_best_asset(assets, plan.intent)
                    if best_asset and best_asset != "rendered_preview":
                        viz_assets = [best_asset]

                logger.info(
                    f"Adding STAC layer: collection={collection}, "
                    f"item={item_id}, assets={viz_assets}"
                )

                layer_kwargs = {
                    "collection": collection,
                    "item": item_id,
                    "assets": viz_assets,
                    "name": f"{collection[:20]}_{item_id[:15]}",
                    "fit_bounds": True,
                    "overwrite": True,
                }
                # Add before_id if available (keeps layers below labels)
                if hasattr(m, "first_symbol_layer_id"):
                    layer_kwargs["before_id"] = m.first_symbol_layer_id

                # Exact ids first, then the prefix / substring rules.
                if collection in exact_colormaps:
                    colormap_name = exact_colormaps[collection]
                elif collection.startswith("jrc-gsw"):
                    colormap_name = "Blues"
                elif "viirs" in collection:
                    colormap_name = "inferno"
                else:
                    colormap_name = None
                if colormap_name:
                    layer_kwargs["colormap_name"] = colormap_name

                m.add_stac_layer(**layer_kwargs)
                logger.info(f"Successfully added STAC layer: {item_id}")

                # Explicitly fit bounds to the item bbox if available
                item_bbox = item.get("bbox")
                if item_bbox and len(item_bbox) >= 4:
                    try:
                        m.fit_bounds(
                            [
                                [item_bbox[0], item_bbox[1]],
                                [item_bbox[2], item_bbox[3]],
                            ]
                        )
                        logger.info(f"Fitted bounds to: {item_bbox}")
                    except Exception as e:
                        logger.debug(f"Could not fit bounds: {e}")
                        # Fallback to set_center
                        center_lon = (item_bbox[0] + item_bbox[2]) / 2
                        center_lat = (item_bbox[1] + item_bbox[3]) / 2
                        m.set_center(center_lon, center_lat, zoom=10)
            else:
                # Fallback to COG layer with signed URL
                asset_key = self._select_best_asset(assets, plan.intent)
                if asset_key and asset_key in assets:
                    asset_url = assets[asset_key]["href"]
                    logger.info(f"Adding COG layer: {asset_url}")
                    m.add_cog_layer(
                        asset_url,
                        name=item_id,
                        titiler_endpoint=PC_TITILER_ENDPOINT,
                        fit_bounds=True,
                    )
        except Exception as e:
            # logger.exception records the traceback through the logging
            # system instead of printing it straight to stderr.
            logger.exception(f"Could not add raster layer {item_id}: {e}")

    title = f"Raster Visualization: {plan.intent}"
    self._add_title_to_map(m, title)
    return m
def _create_vector_visualization(
    self,
    plan: PlannerOutput,
    data: DataResult,
    analysis: Optional[AnalysisResult] = None,
) -> Any:
    """Build a map showing the vector items in ``data``.

    Every item that has a ``"geometry"`` key becomes one layer, added via
    the ``"viz"`` tool when registered and via ``add_geojson`` otherwise.
    A failure on one layer is logged and the remaining items are still
    processed.

    Args:
        plan: Query plan
        data: Vector data
        analysis: Optional analysis results; a ``"style"`` entry in its
            visualization hints overrides the default layer style

    Returns:
        MapLibre Map with vector layers
    """
    m = create_map(target_map=getattr(self, "_target_map", None))

    location = plan.location
    if location and "bbox" in location:
        bbox = location["bbox"]
        center_lon = (bbox[0] + bbox[2]) / 2
        center_lat = (bbox[1] + bbox[3]) / 2
        m.set_center(center_lon, center_lat, zoom=10)

    for i, item in enumerate(data.items):
        if "geometry" not in item:
            continue
        layer_name = f"Vector Layer {i+1}"

        # A fresh style dict per layer, optionally overridden by hints.
        style = dict(color="blue", weight=2, fillOpacity=0.3)
        if analysis and analysis.visualization_hints:
            style.update(analysis.visualization_hints.get("style", {}))

        try:
            if "viz" in self.tools:
                self.tools["viz"].add_vector_layer(m, item, layer_name, style)
            else:
                # No tool registered: add the item directly as GeoJSON.
                m.add_geojson(item, name=layer_name, style=style)
        except Exception as e:
            logger.warning(f"Could not add vector layer {layer_name}: {e}")

    self._add_title_to_map(m, f"Vector Visualization: {plan.intent}")
    return m
def _create_analysis_visualization(
    self, plan: PlannerOutput, data: DataResult, analysis: AnalysisResult
) -> Any:
    """Build a map showing analysis results.

    The analysis layer is added by the ``"viz"`` tool when registered,
    otherwise by ``_add_analysis_fallback``. If that step raises, the
    method logs a warning and returns a plain raster visualization of
    ``data`` instead.

    Args:
        plan: Query plan
        data: Source data
        analysis: Analysis results with visualization hints

    Returns:
        MapLibre Map showing analysis results
    """
    m = create_map(target_map=getattr(self, "_target_map", None))

    location = plan.location
    if location and "bbox" in location:
        bbox = location["bbox"]
        center_lon = (bbox[0] + bbox[2]) / 2
        center_lat = (bbox[1] + bbox[3]) / 2
        m.set_center(center_lon, center_lat, zoom=10)

    viz_hints = analysis.visualization_hints

    try:
        if "viz" not in self.tools:
            # No tool registered: choose a fallback based on the analysis.
            self._add_analysis_fallback(m, data, analysis)
        else:
            self.tools["viz"].add_analysis_layer(m, analysis.result_data, viz_hints)
    except Exception as e:
        logger.warning(f"Could not create analysis visualization: {e}")
        # Show the source data rather than nothing.
        return self._create_raster_visualization(plan, data)

    self._add_title_to_map(m, f"Analysis Results: {plan.intent}")
    self._add_analysis_legend(m, analysis)
    return m
def _create_time_series_visualization(
    self,
    plan: PlannerOutput,
    data: DataResult,
    analysis: Optional[AnalysisResult] = None,
) -> Any:
    """Build a map for time series data.

    With a ``"viz"`` tool registered, all items are handed to it. Without
    one, only the first and last items are shown, each as a COG layer
    using the asset chosen for the first item. Errors while adding layers
    are logged and the map is still returned.

    Args:
        plan: Query plan
        data: Time series data
        analysis: Optional analysis results (unused here)

    Returns:
        MapLibre Map with time series visualization
    """
    m = create_map(target_map=getattr(self, "_target_map", None))

    location = plan.location
    if location:
        if "bbox" in location:
            bbox = location["bbox"]
            center_lon = (bbox[0] + bbox[2]) / 2
            center_lat = (bbox[1] + bbox[3]) / 2
        elif "lat" in location and "lon" in location:
            center_lon = location["lon"]
            center_lat = location["lat"]
        else:
            # Location present but without usable coordinates.
            center_lat, center_lon = 0, 0
        m.set_center(center_lon, center_lat, zoom=10)

    try:
        if "viz" in self.tools:
            self.tools["viz"].add_time_series_layers(m, data.items)
        elif len(data.items) >= 2:
            # Fallback: show only the start and end of the series.
            first_item, last_item = data.items[0], data.items[-1]
            if "assets" in first_item and "assets" in last_item:
                asset_key = self._select_best_asset(first_item["assets"], plan.intent)
                if asset_key:
                    first_url = first_item["assets"][asset_key]["href"]
                    last_url = last_item["assets"][asset_key]["href"]
                    m.add_cog_layer(first_url, name="Time Series Start", fit_bounds=True)
                    m.add_cog_layer(last_url, name="Time Series End", fit_bounds=False)
    except Exception as e:
        logger.warning(f"Could not create time series visualization: {e}")

    self._add_title_to_map(m, f"Time Series Visualization: {plan.intent}")
    return m
def _create_split_map_visualization(
    self,
    plan: PlannerOutput,
    data: DataResult,
    analysis: Optional[AnalysisResult] = None,
) -> Any:
    """Build a before/after comparison map.

    The first and last items of ``data`` are added as two COG layers
    ("Before" and "After") on a single map. When fewer than two items, no
    assets, or no usable asset key are available, a raster visualization
    is returned instead; on any exception, the default visualization.

    Args:
        plan: Query plan
        data: Comparison data
        analysis: Optional analysis results

    Returns:
        Map with "Before" and "After" layers, or a fallback map
    """
    try:
        # Resolve the pair of URLs to compare, if the data allows it.
        url_pair = None
        if len(data.items) >= 2:
            first_item, last_item = data.items[0], data.items[-1]
            if "assets" in first_item and "assets" in last_item:
                asset_key = self._select_best_asset(first_item["assets"], plan.intent)
                if asset_key:
                    url_pair = (
                        first_item["assets"][asset_key]["href"],
                        last_item["assets"][asset_key]["href"],
                    )

        if url_pair is None:
            # Not enough material for a comparison.
            return self._create_raster_visualization(plan, data, analysis)

        left_url, right_url = url_pair
        m = create_map(target_map=getattr(self, "_target_map", None))

        location = plan.location
        if location and "bbox" in location:
            bbox = location["bbox"]
            center_lon = (bbox[0] + bbox[2]) / 2
            center_lat = (bbox[1] + bbox[3]) / 2
            m.set_center(center_lon, center_lat, zoom=10)

        m.add_cog_layer(left_url, name="Before", fit_bounds=True)
        m.add_cog_layer(right_url, name="After", fit_bounds=False)

        self._add_title_to_map(m, f"Before/After Comparison: {plan.intent}")
        return m
    except Exception as e:
        logger.warning(f"Could not create split map: {e}")
        return self._create_default_visualization(plan, data, analysis)
def _create_default_visualization(
    self,
    plan: PlannerOutput,
    data: Optional[DataResult] = None,
    analysis: Optional[AnalysisResult] = None,
) -> Any:
    """Create default visualization when specific type cannot be determined.

    The map is centered from the plan's bbox or, failing that, the
    centroid of its geometry. If data is available, the first raster item
    (STAC layer, or COG as fallback) or up to three vector items are
    added. Failures while adding data are logged (with traceback) and the
    map is still returned.

    Args:
        plan: Query plan
        data: Available data
        analysis: Available analysis (unused here)

    Returns:
        Basic map, with a data layer when one could be added
    """
    m = create_map(target_map=getattr(self, "_target_map", None))

    # Set center based on location if available
    if plan.location:
        if "bbox" in plan.location:
            bbox = plan.location["bbox"]
            center_lat = (bbox[1] + bbox[3]) / 2
            center_lon = (bbox[0] + bbox[2]) / 2
            m.set_center(center_lon, center_lat, zoom=10)
        elif "geometry" in plan.location:
            # Try to get centroid from geometry
            try:
                import shapely.geometry as sg

                geom = sg.shape(plan.location["geometry"])
                centroid = geom.centroid
                m.set_center(centroid.x, centroid.y, zoom=10)
            except Exception as e:
                # Centering is optional, but leave a trace of why it failed.
                logger.debug(f"Could not center map on geometry: {e}")

    # Exact collection id -> colormap; prefix/substring rules come after.
    exact_colormaps = {
        "cop-dem-glo-30": "terrain",
        "3dep-lidar-hag": "terrain",
        "modis-14A1-061": "hot",
        "modis-10A1-061": "Blues_r",
        "modis-11A1-061": "RdYlBu_r",
    }

    # Add simple data visualization if available
    if data and data.items:
        try:
            if data.data_type == "raster":
                # Only the first raster item is shown.
                item = data.items[0]
                item_id = item.get("id", "")
                collection = item.get("collection", "")
                assets = item.get("assets", {})

                # Skip mock items
                if assets and not any(
                    v.get("href", "").startswith("mock://") for v in assets.values()
                ):
                    if MAPLIBRE_AVAILABLE and collection:
                        viz_assets = self._select_viz_assets(
                            assets, plan.intent, collection
                        )
                        # "rendered_preview" is never passed on as an asset.
                        if isinstance(viz_assets, list):
                            viz_assets = [
                                a for a in viz_assets if a != "rendered_preview"
                            ]
                        elif viz_assets == "rendered_preview":
                            viz_assets = []
                        if not viz_assets:
                            best_asset = self._select_best_asset(assets, plan.intent)
                            if best_asset and best_asset != "rendered_preview":
                                viz_assets = [best_asset]

                        logger.info(
                            f"Default viz: adding STAC layer {collection}/{item_id}"
                        )
                        layer_kwargs = {
                            "collection": collection,
                            "item": item_id,
                            "assets": viz_assets,
                            "name": f"{collection[:20]}_{item_id[:15]}",
                            "fit_bounds": True,
                            "overwrite": True,
                        }
                        if hasattr(m, "first_symbol_layer_id"):
                            layer_kwargs["before_id"] = m.first_symbol_layer_id

                        if collection in exact_colormaps:
                            colormap_name = exact_colormaps[collection]
                        elif collection.startswith("jrc-gsw"):
                            colormap_name = "Blues"
                        elif "viirs" in collection:
                            colormap_name = "inferno"
                        else:
                            colormap_name = None
                        if colormap_name:
                            layer_kwargs["colormap_name"] = colormap_name

                        m.add_stac_layer(**layer_kwargs)

                        # Fit bounds to item
                        item_bbox = item.get("bbox")
                        if item_bbox and len(item_bbox) >= 4:
                            try:
                                m.fit_bounds(
                                    [
                                        [item_bbox[0], item_bbox[1]],
                                        [item_bbox[2], item_bbox[3]],
                                    ]
                                )
                            except Exception:
                                # Fall back to centering on the bbox midpoint.
                                center_lon = (item_bbox[0] + item_bbox[2]) / 2
                                center_lat = (item_bbox[1] + item_bbox[3]) / 2
                                m.set_center(center_lon, center_lat, zoom=10)
                    else:
                        # Fallback to COG
                        asset_key = self._select_best_asset(assets, plan.intent)
                        if asset_key and asset_key in assets:
                            asset_url = assets[asset_key]["href"]
                            if not asset_url.startswith("mock://"):
                                m.add_cog_layer(
                                    asset_url, name="Data Layer", fit_bounds=True
                                )
            elif data.data_type == "vector":
                for item in data.items[:3]:  # Limit to 3 items
                    if "geometry" in item:
                        m.add_geojson(item, name="Vector Data")
        except Exception as e:
            # logger.exception records the traceback through the logging
            # system instead of printing it straight to stderr.
            logger.exception(f"Could not add data to default visualization: {e}")

    title = f"GeoAgent Map: {plan.intent}"
    self._add_title_to_map(m, title)
    return m
def _create_error_visualization(self, error_message: str) -> Any:
    """Build a map that reports a visualization failure.

    The map gets the "OpenStreetMap" basemap and a title containing the
    error text.

    Args:
        error_message: Error description

    Returns:
        Map titled with the error information
    """
    error_map = create_map(target_map=getattr(self, "_target_map", None))
    error_map.add_basemap("OpenStreetMap")
    self._add_title_to_map(error_map, f"Visualization Error: {error_message}")
    return error_map
def _select_viz_assets(
self, assets: Dict[str, Any], intent: str, collection: str = ""
) -> list:
"""Select asset names for STAC layer visualization.
Returns a list of asset keys suitable for add_stac_layer.
Args:
assets: Available STAC assets
intent: Analysis intent
collection: STAC collection name (helps determine appropriate assets)
Returns:
List of asset key strings (e.g., ["visual"] or ["B04", "B03", "B02"])
"""
intent_lower = intent.lower()
collection_lower = collection.lower() if collection else ""
# Collection-specific asset selection (like geoai pattern)
if "sentinel-2" in collection_lower:
# Sentinel-2 true color RGB
if "B04" in assets and "B03" in assets and "B02" in assets:
return ["B04", "B03", "B02"]
if "visual" in assets:
return ["visual"]
if "landsat" in collection_lower:
# Landsat RGB (L2 has blue, L1 may not)
if "red" in assets and "green" in assets and "blue" in assets:
return ["red", "green", "blue"]
# Or use available bands
if "red" in assets and "green" in assets:
if "nir08" in assets:
return ["nir08", "red", "green"] # False color
return ["red", "green", "red"] # Fallback
if "naip" in collection_lower:
# NAIP imagery
if "image" in assets:
return ["image"]
if "sentinel-1" in collection_lower:
# Sentinel-1 SAR
if "vv" in assets:
return ["vv"]
if "cop-dem" in collection_lower or "3dep" in collection_lower:
# DEM data
if "data" in assets:
return ["data"]
if "aster" in collection_lower:
# ASTER imagery
if "VNIR" in assets:
return ["VNIR"]
# MODIS collections typically use "data" or a specific band
if "modis" in collection_lower:
if "data" in assets:
return ["data"]
# Some MODIS products have numbered bands
for candidate in ("500m", "250m", "NDVI", "EVI", "LST_Day_1km"):
if candidate in assets:
return [candidate]
# JRC Global Surface Water
if "jrc-gsw" in collection_lower:
for candidate in ("occurrence", "change", "seasonality", "data"):
if candidate in assets:
return [candidate]
# USDA Cropland Data Layer
if "usda-cdl" in collection_lower:
if "data" in assets:
return ["data"]
# 3DEP LIDAR products
if "3dep-lidar" in collection_lower:
if "data" in assets:
return ["data"]
# HLS (Harmonized Landsat Sentinel)
if "hls" in collection_lower:
# Similar band layout to Sentinel-2
if "B04" in assets and "B03" in assets and "B02" in assets:
return ["B04", "B03", "B02"]
if "visual" in assets:
return ["visual"]
# VIIRS nighttime lights
if "viirs" in collection_lower:
if "data" in assets:
return ["data"]
# DEM and land cover collections use "data" or "map" asset
if any(
term in intent_lower
for term in [
"dem",
"elevation",
"terrain",
"land_cover",
"land cover",
"landcover",
"lulc",
"land use",
]
):
if "data" in assets:
return ["data"]
if "map" in assets:
return ["map"]
# For imagery/visual requests, prefer true color composite
if "visual" in assets:
return ["visual"]
# For NDVI, show NIR-Red false color or just visual
if any(term in intent_lower for term in ["ndvi", "vegetation"]):
if "nir" in assets and "red" in assets and "green" in assets:
return ["nir", "red", "green"]
if "B08" in assets and "B04" in assets and "B03" in assets:
return ["B08", "B04", "B03"]
# RGB composite
if "red" in assets and "green" in assets and "blue" in assets:
return ["red", "green", "blue"]
if "B04" in assets and "B03" in assets and "B02" in assets:
return ["B04", "B03", "B02"]
# Fallback: look for common asset names
for possible in ["visual", "image", "data"]:
if possible in assets:
return [possible]
# Last resort: first available asset
if assets:
return [list(assets.keys())[0]]
return []
def _select_best_asset(self, assets: Dict[str, Any], intent: str) -> Optional[str]:
"""Select the best asset for visualization based on intent.
Args:
assets: Available STAC assets
intent: Analysis intent
Returns:
Best asset key or None
"""
intent_lower = intent.lower()
# For NDVI and vegetation analysis, prefer red or nir
if any(term in intent_lower for term in ["ndvi", "vegetation", "green"]):
for key in ["nir", "red", "B04", "B08"]:
if key in assets:
return key
# For RGB visualization
if any(term in intent_lower for term in ["rgb", "color", "visual"]):
for key in ["visual", "rgb", "red"]:
if key in assets:
return key
# Default preference order
preference_order = [
"visual",
"rgb",
"red",
"nir",
"B04",
"B03",
"B02",
"B08",
"swir",
"B11",
"B12",
]
for key in preference_order:
if key in assets and key != "rendered_preview":
return key
# Return first available asset
if assets:
for key in assets.keys():
if key != "rendered_preview":
return key
return None
def _add_analysis_fallback(
self, m: Any, data: DataResult, analysis: AnalysisResult
):
"""Add analysis visualization fallback when viz tools are not available.
Args:
m: Map to add layers to
data: Source data
analysis: Analysis results
"""
viz_hints = analysis.visualization_hints
viz_type = viz_hints.get("type", "") if viz_hints else ""
# Handle typed data via STAC layer (land cover, elevation, water, fire, etc.)
_stac_viz_types = (
"land_cover",
"elevation",
"water_mapping",
"fire_detection",
"snow_cover",
"surface_temperature",
"event_impact",
)
if viz_type in _stac_viz_types and data and data.items:
item = data.items[0]
item_id = item.get("id", "")
collection = item.get("collection", "")
if MAPLIBRE_AVAILABLE and collection:
asset_key = viz_hints.get("asset_key", "data")
try:
layer_kwargs = {
"collection": collection,
"item": item_id,
"assets": [asset_key],
"name": viz_hints.get(
"title", f"{collection[:20]}_{item_id[:15]}"
),
"fit_bounds": True,
"overwrite": True,
}
# Add before_id if available
if hasattr(m, "first_symbol_layer_id"):
layer_kwargs["before_id"] = m.first_symbol_layer_id
# Add colormap for specific viz types
_viz_colormap_map = {
"elevation": "terrain",
"water_mapping": "Blues",
"fire_detection": "hot",
"snow_cover": "Blues_r",
"surface_temperature": "RdYlBu_r",
}
cmap = _viz_colormap_map.get(viz_type)
if cmap:
layer_kwargs["colormap_name"] = cmap
m.add_stac_layer(**layer_kwargs)
logger.info(f"Added {viz_type} STAC layer: {collection}/{item_id}")
# Fit bounds to item
item_bbox = item.get("bbox")
if item_bbox and len(item_bbox) >= 4:
try:
m.fit_bounds(
[
[item_bbox[0], item_bbox[1]],
[item_bbox[2], item_bbox[3]],
]
)
except Exception:
center_lon = (item_bbox[0] + item_bbox[2]) / 2
center_lat = (item_bbox[1] + item_bbox[3]) / 2
m.set_center(center_lon, center_lat, zoom=10)
return
except Exception as e:
logger.warning(f"Could not add {viz_type} STAC layer: {e}")
# Check if we have a computed NDVI raster to display
ndvi_path = None
if viz_hints and "ndvi_path" in viz_hints:
ndvi_path = viz_hints["ndvi_path"]
elif isinstance(analysis.result_data, dict):
ndvi_path = analysis.result_data.get("ndvi_path")
if ndvi_path and os.path.exists(ndvi_path):
try:
m.add_raster(
ndvi_path,
layer_name="NDVI",
colormap="RdYlGn",
vmin=-0.2,
vmax=0.8,
fit_bounds=True,
)
logger.info(f"Added NDVI raster layer from {ndvi_path}")
return
except Exception as e:
logger.warning(f"Could not add NDVI raster: {e}")
# Fallback: try to add raw band from data
if data.items and "assets" in data.items[0]:
asset_key = self._select_best_asset(data.items[0]["assets"], "ndvi")
if asset_key:
asset_url = data.items[0]["assets"][asset_key]["href"]
if not asset_url.startswith("mock://"):
try:
m.add_cog_layer(
asset_url, name="NDVI Analysis", fit_bounds=True
)
except Exception as e:
logger.warning(f"Could not add COG layer: {e}")
def _add_title_to_map(self, m: Any, title: str):
"""Add title to the map.
Args:
m: MapLibre map to add title to
title: Title text
"""
try:
# Try to add title using MapLibre functionality
if hasattr(m, "add_title"):
m.add_title(title)
elif hasattr(m, "title"):
# For MockMapLibreMap
m.title = title
else:
# Fallback: log the title
logger.info(f"Map title: {title}")
except Exception as e:
logger.debug(f"Could not add title to map: {e}")
def _add_analysis_legend(self, m: Any, analysis: AnalysisResult):
"""Add legend for analysis results.
Args:
m: Map to add legend to
analysis: Analysis results with visualization hints
"""
try:
viz_hints = analysis.visualization_hints
if "colormap" in viz_hints and "vmin" in viz_hints and "vmax" in viz_hints:
# Add colorbar legend (leafmap maplibregl uses cmap/label)
if hasattr(m, "add_colorbar"):
m.add_colorbar(
cmap=viz_hints["colormap"],
vmin=viz_hints["vmin"],
vmax=viz_hints["vmax"],
label=viz_hints.get("title", "Analysis Result"),
)
except Exception as e:
logger.debug(f"Could not add legend to map: {e}")
__init__(self, llm, tools=None)
special
¶
Initialize the Visualization Agent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| llm | Any | Language model instance for visualization decisions | required |
| tools | Optional[Dict[str, Any]] | Dictionary of available visualization tools | None |
Source code in geoagent/core/viz_agent.py
def __init__(self, llm: Any, tools: Optional[Dict[str, Any]] = None):
    """Store the collaborators and run tool setup.

    Args:
        llm: Language model instance for visualization decisions
        tools: Dictionary of available visualization tools; an empty dict
            is used when omitted or falsy
    """
    self.llm = llm
    self.tools = tools if tools else {}
    self._setup_tools()
create_visualization(self, plan, data=None, analysis=None, target_map=None)
¶
Create appropriate visualization based on available data and analysis.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| plan | PlannerOutput | Original query plan for context | required |
| data | Optional[geoagent.core.models.DataResult] | Data retrieved by Data Agent | None |
| analysis | Optional[geoagent.core.models.AnalysisResult] | Analysis results from Analysis Agent | None |
| target_map | Optional[Any] | Optional existing map to render on instead of creating a new one. When provided, layers are added to this map. | None |
Returns:
| Type | Description |
|---|---|
| Any | MapLibre Map object ready for display |
Source code in geoagent/core/viz_agent.py
def create_visualization(
    self,
    plan: PlannerOutput,
    data: Optional[DataResult] = None,
    analysis: Optional[AnalysisResult] = None,
    target_map: Optional[Any] = None,
) -> Any:
    """Build the visualization that best fits the available inputs.

    The visualization type is chosen by ``_determine_viz_type`` and the
    matching ``_create_*_visualization`` builder is invoked. Any exception
    raised along the way is logged and turned into an error visualization.

    Args:
        plan: Original query plan for context
        data: Data retrieved by Data Agent
        analysis: Analysis results from Analysis Agent
        target_map: Optional existing map to render on instead of creating
            a new one. When provided, layers are added to this map.

    Returns:
        MapLibre Map object ready for display
    """
    # Remembered on the instance so the builders can pick it up.
    self._target_map = target_map
    if target_map is not None:
        self._prepare_target_map(target_map)

    logger.info("Creating visualization")
    try:
        viz_type = self._determine_viz_type(plan, data, analysis)
        builders = {
            "raster_layer": self._create_raster_visualization,
            "vector_layer": self._create_vector_visualization,
            "analysis_result": self._create_analysis_visualization,
            "time_series": self._create_time_series_visualization,
            "split_map": self._create_split_map_visualization,
        }
        # Unknown types fall back to the default builder.
        build = builders.get(viz_type, self._create_default_visualization)
        return build(plan, data, analysis)
    except Exception as e:
        logger.error(f"Visualization creation failed: {e}")
        return self._create_error_visualization(str(e))
create_map(target_map=None, **kwargs)
¶
Create a MapLibre map object (real if available, otherwise mock).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| target_map | | If provided, return this map instead of creating a new one. Allows rendering on an existing map widget (e.g., the Solara UI map). | None |
| **kwargs | | Passed to MapLibreMap or MockMapLibreMap constructors. | {} |
Returns:
| Type | Description |
|---|---|
| | Map object for visualization. |
Source code in geoagent/core/viz_agent.py
def create_map(target_map=None, **kwargs):
    """Return a map to draw on: the given one, a real one, or a mock.

    Args:
        target_map: If provided, this map is returned as-is instead of
            creating a new one. Allows rendering on an existing map widget
            (e.g., the Solara UI map).
        **kwargs: Passed to MapLibreMap or MockMapLibreMap constructors.
            For a real map the three sidebar options are always set here,
            overriding any caller-supplied values.

    Returns:
        Map object for visualization.
    """
    if target_map is not None:
        return target_map

    if not MAPLIBRE_AVAILABLE:
        return MockMapLibreMap(**kwargs)

    kwargs.update(
        add_sidebar=True,
        sidebar_visible=False,
        add_floating_sidebar=False,
    )
    return MapLibreMap(**kwargs)