Data Agent¶
The Data Agent searches and retrieves geospatial data from STAC catalogs and other sources.
geoagent.core.data_agent
¶
Data Agent for fetching geospatial data from various sources.
The Data Agent is responsible for searching and retrieving geospatial data based on structured query parameters from the Planner Agent.
DataAgent
¶
Agent responsible for fetching geospatial data from various sources.
The Data Agent takes structured query parameters and searches for relevant geospatial data using STAC catalogs, DuckDB queries, and other data sources.
Source code in geoagent/core/data_agent.py
class DataAgent:
    """Agent responsible for fetching geospatial data from various sources.

    The Data Agent takes structured query parameters and searches for relevant
    geospatial data using STAC catalogs, DuckDB queries, and other data sources.
    """

    # Substrings of a dataset name that mark it as a satellite (raster) product.
    _RASTER_DATASETS = ("sentinel", "landsat", "modis", "aster")

    # Substrings of the intent that indicate raster analysis.
    _RASTER_KEYWORDS = (
        "ndvi",
        "spectral",
        "index",
        "satellite",
        "imagery",
        "sentinel",
        "landsat",
        "modis",
        "pixel",
        "band",
    )

    # Substrings of the intent that indicate vector analysis.
    _VECTOR_KEYWORDS = (
        "boundary",
        "polygon",
        "point",
        "line",
        "geometry",
        "administrative",
        "road",
        "building",
        "parcel",
    )

    # Exact ``analysis_type`` values mapped to a default collection.
    _LAND_COVER_ANALYSES = ("land_cover", "classification", "lulc")
    _ELEVATION_ANALYSES = ("elevation", "dem", "terrain", "slope", "hillshade")

    # Substrings of the intent mapped to a default collection.
    _LAND_COVER_INTENTS = ("land cover", "landcover", "lulc", "land use")
    _ELEVATION_INTENTS = ("dem", "elevation", "terrain", "height", "topograph")
    _IMAGERY_INTENTS = (
        "ndvi",
        "evi",
        "vegetation",
        "spectral",
        "band",
        "imagery",
    )

    # Substrings of a collection ID that mark it as imagery, i.e. a
    # collection for which a cloud-cover filter is applied.
    _IMAGERY_COLLECTION_HINTS = ("sentinel", "landsat", "naip", "modis")

    def __init__(self, llm: Any, tools: Optional[Dict[str, Any]] = None):
        """Initialize the Data Agent.

        Args:
            llm: Language model instance for decision making
            tools: Dictionary of available data tools (stac, duckdb, etc.).
                A non-empty dictionary is stored as-is (not copied), so the
                default STAC tool may be added to the caller's dictionary.
        """
        self.llm = llm
        self.tools = tools or {}
        self._setup_tools()

    def _setup_tools(self):
        """Setup and initialize data tools.

        Registers a default ``STACSearchWrapper`` under ``"stac"`` unless the
        caller already supplied one. Failures are logged as warnings and do
        not propagate.
        """
        try:
            if "stac" not in self.tools:
                self.tools["stac"] = STACSearchWrapper()
            logger.info("Data tools initialized (STAC search available)")
        except Exception as e:
            logger.warning(f"Some data tools not available: {e}")

    def search_data(self, plan: PlannerOutput) -> DataResult:
        """Search for geospatial data based on the query plan.

        Args:
            plan: Structured query parameters from Planner Agent

        Returns:
            DataResult containing found data items and metadata. If the
            search raises, an empty result is returned with the error message
            under ``metadata["error"]`` and ``data_type="unknown"``.
        """
        logger.info(f"Searching data for intent: {plan.intent}")
        try:
            # Determine data type based on intent and dataset
            data_type = self._determine_data_type(plan)
            if data_type == "raster":
                return self._search_raster_data(plan)
            elif data_type == "vector":
                return self._search_vector_data(plan)
            elif data_type == "tabular":
                return self._search_tabular_data(plan)
            else:
                # Fallback: try multiple sources
                return self._search_multi_source(plan)
        except Exception as e:
            logger.error(f"Error searching data: {e}")
            return DataResult(
                items=[], metadata={"error": str(e)}, data_type="unknown", total_items=0
            )

    def _determine_data_type(self, plan: PlannerOutput) -> str:
        """Determine the primary data type needed based on the query plan.

        Matching is by case-insensitive substring. A satellite dataset name
        wins first, then raster keywords in the intent, then vector keywords;
        anything else defaults to raster.

        Args:
            plan: Query plan with intent and parameters

        Returns:
            Data type: "raster" or "vector". The current heuristics never
            produce "tabular".
        """
        intent = (plan.intent or "").lower()
        dataset = (plan.dataset or "").lower()

        # Check for explicit raster datasets
        if any(sat in dataset for sat in self._RASTER_DATASETS):
            return "raster"
        # Check intent for raster operations
        if any(keyword in intent for keyword in self._RASTER_KEYWORDS):
            return "raster"
        # Check intent for vector operations
        if any(keyword in intent for keyword in self._VECTOR_KEYWORDS):
            return "vector"
        # Default to raster for satellite-based analysis
        return "raster"

    def _search_raster_data(self, plan: PlannerOutput) -> DataResult:
        """Search for raster/satellite imagery using STAC catalogs.

        Args:
            plan: Query plan with spatial/temporal parameters

        Returns:
            DataResult with STAC items. A mock result is returned when no
            STAC tool is registered or when the search raises.
        """
        if "stac" not in self.tools:
            logger.warning("STAC tool not available")
            return self._create_mock_result("raster", plan)
        try:
            # Build STAC search parameters
            search_params = self._build_stac_params(plan)
            # Execute STAC search
            stac_tool = self.tools["stac"]
            items = stac_tool.search(**search_params)
            return DataResult(
                items=items,
                metadata={
                    "search_params": search_params,
                    # Custom tools may not expose a catalog URL.
                    "catalog": getattr(stac_tool, "catalog_url", "unknown"),
                },
                data_type="raster",
                total_items=len(items),
                search_query=search_params,
            )
        except Exception as e:
            logger.error(f"STAC search failed: {e}")
            return self._create_mock_result("raster", plan)

    def _search_vector_data(self, plan: PlannerOutput) -> DataResult:
        """Search for vector data using various sources.

        Args:
            plan: Query plan with spatial parameters

        Returns:
            DataResult with vector data references. A mock result is returned
            when no DuckDB tool is registered or when the query raises.
        """
        if "duckdb" not in self.tools:
            logger.warning("DuckDB tool not available")
            return self._create_mock_result("vector", plan)
        try:
            # Build query for vector data
            query_params = self._build_vector_params(plan)
            # Execute DuckDB query
            duckdb_tool = self.tools["duckdb"]
            results = duckdb_tool.query(**query_params)
            return DataResult(
                items=results,
                metadata={"query_params": query_params, "source": "duckdb"},
                data_type="vector",
                total_items=len(results),
                search_query=query_params,
            )
        except Exception as e:
            logger.error(f"Vector search failed: {e}")
            return self._create_mock_result("vector", plan)

    def _search_tabular_data(self, plan: PlannerOutput) -> DataResult:
        """Search for tabular data using DuckDB.

        Args:
            plan: Query plan with data requirements

        Returns:
            DataResult with tabular data. A mock result is returned when no
            DuckDB tool is registered or when the query raises.
        """
        if "duckdb" not in self.tools:
            logger.warning("DuckDB tool not available")
            return self._create_mock_result("tabular", plan)
        try:
            # Build tabular query
            query_params = self._build_tabular_params(plan)
            # Execute query
            duckdb_tool = self.tools["duckdb"]
            results = duckdb_tool.query(**query_params)
            return DataResult(
                items=results,
                metadata={"query_params": query_params, "source": "duckdb"},
                data_type="tabular",
                total_items=len(results),
                search_query=query_params,
            )
        except Exception as e:
            logger.error(f"Tabular search failed: {e}")
            return self._create_mock_result("tabular", plan)

    def _search_multi_source(self, plan: PlannerOutput) -> DataResult:
        """Search across multiple data sources and return the first hit.

        Args:
            plan: Query plan

        Returns:
            The raster result if it has items, otherwise the vector result if
            it has items, otherwise an empty DataResult listing the sources
            that were searched.
        """
        # Try raster first, then vector if no results
        raster_result = self._search_raster_data(plan)
        if raster_result.total_items > 0:
            return raster_result
        vector_result = self._search_vector_data(plan)
        if vector_result.total_items > 0:
            return vector_result
        # Return empty result if nothing found
        return DataResult(
            items=[],
            metadata={"searched_sources": ["raster", "vector"]},
            data_type="unknown",
            total_items=0,
        )

    def _infer_collection(self, plan: PlannerOutput) -> Optional[str]:
        """Pick a default collection ID when the plan names no dataset.

        Args:
            plan: Query plan with ``analysis_type`` and ``intent``

        Returns:
            A collection ID, or None when neither field gives a usable hint.
        """
        analysis_type = (plan.analysis_type or "").lower()
        intent = (plan.intent or "").lower()

        # Check analysis_type first (most reliable signal from planner)
        if analysis_type in self._LAND_COVER_ANALYSES:
            return "io-lulc-9-class"
        if analysis_type in self._ELEVATION_ANALYSES:
            return "cop-dem-glo-30"
        # Then check intent keywords
        if any(kw in intent for kw in self._LAND_COVER_INTENTS):
            return "io-lulc-9-class"
        if any(kw in intent for kw in self._ELEVATION_INTENTS):
            return "cop-dem-glo-30"
        if any(kw in intent for kw in self._IMAGERY_INTENTS):
            return "sentinel-2-l2a"
        return None

    def _build_stac_params(self, plan: PlannerOutput) -> Dict[str, Any]:
        """Build STAC search parameters from query plan.

        Args:
            plan: Query plan with spatial/temporal info

        Returns:
            STAC search parameters dictionary. ``max_items`` is always
            present; ``bbox``/``intersects``, ``datetime``, ``collections``
            and ``query`` are included only when the plan provides them.
        """
        params: Dict[str, Any] = {}
        # A plan without extra parameters must not break the lookups below.
        parameters = plan.parameters or {}

        # Add spatial parameters. A bbox key holding None must not hide a
        # usable geometry.
        location = plan.location or {}
        if location.get("bbox") is not None:
            params["bbox"] = location["bbox"]
        elif location.get("geometry") is not None:
            params["intersects"] = location["geometry"]

        # Add temporal parameters (only when both ends are known)
        if plan.time_range:
            start_date = plan.time_range.get("start_date")
            end_date = plan.time_range.get("end_date")
            if start_date and end_date:
                params["datetime"] = f"{start_date}/{end_date}"

        # Add collection/dataset filters
        if plan.dataset:
            # Use the planner-provided dataset directly as the collection ID
            params["collections"] = [plan.dataset]
        else:
            # Resolve collection from analysis_type or intent
            inferred = self._infer_collection(plan)
            if inferred is not None:
                params["collections"] = [inferred]

        # Add cloud cover filter only for imagery collections (not DEM/land cover)
        # Heuristic: imagery collections often contain these keywords
        is_imagery = any(
            hint in (collection or "").lower()
            for collection in params.get("collections", [])
            for hint in self._IMAGERY_COLLECTION_HINTS
        )
        if is_imagery:
            # Test against None explicitly: a threshold of 0 is a legitimate
            # request and must not be treated as "not provided".
            max_cloud = parameters.get("max_cloud_cover")
            if max_cloud is None:
                max_cloud = parameters.get("cloud_cover")
            if max_cloud is not None:
                params["query"] = {"eo:cloud_cover": {"lt": max_cloud}}

        # Add limit to prevent too many results
        max_items = parameters.get("max_items")
        params["max_items"] = 10 if max_items is None else max_items
        return params

    def _build_vector_params(self, plan: PlannerOutput) -> Dict[str, Any]:
        """Build vector query parameters from plan.

        Args:
            plan: Query plan

        Returns:
            Vector query parameters: the plan's intent, location and
            parameters, passed through unchanged.
        """
        params = {
            "intent": plan.intent,
            "location": plan.location,
            "parameters": plan.parameters,
        }
        return params

    def _build_tabular_params(self, plan: PlannerOutput) -> Dict[str, Any]:
        """Build tabular query parameters from plan.

        Args:
            plan: Query plan

        Returns:
            Tabular query parameters: the plan's intent, dataset and
            parameters, passed through unchanged.
        """
        params = {
            "intent": plan.intent,
            "dataset": plan.dataset,
            "parameters": plan.parameters,
        }
        return params

    def _create_mock_result(self, data_type: str, plan: PlannerOutput) -> DataResult:
        """Create a mock result when tools are not available.

        The search methods also use this as their fallback when a search or
        query raises.

        Args:
            data_type: Type of data that was requested
            plan: Original query plan

        Returns:
            Mock DataResult for development/testing, flagged with
            ``metadata["mock"] = True``. Only "raster" yields a placeholder
            item; other data types yield an empty item list.
        """
        logger.info(f"Creating mock {data_type} result for development")
        mock_items = []
        if data_type == "raster":
            mock_items = [
                {
                    "id": "mock_sentinel2_item",
                    "collection": "sentinel-2-l2a",
                    "geometry": (
                        plan.location.get("geometry") if plan.location else None
                    ),
                    "properties": {
                        "datetime": "2024-07-15T10:30:00Z",
                        "cloud_cover": 5.2,
                    },
                    "assets": {
                        "red": {"href": "mock://red.tif"},
                        "nir": {"href": "mock://nir.tif"},
                    },
                }
            ]
        return DataResult(
            items=mock_items,
            metadata={"mock": True, "reason": "tools_not_available"},
            data_type=data_type,
            total_items=len(mock_items),
        )
__init__(self, llm, tools=None)
special
¶
Initialize the Data Agent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `llm` | `Any` | Language model instance for decision making | required |
| `tools` | `Optional[Dict[str, Any]]` | Dictionary of available data tools (stac, duckdb, etc.) | `None` |
Source code in geoagent/core/data_agent.py
def __init__(self, llm: Any, tools: Optional[Dict[str, Any]] = None):
    """Create a Data Agent and register its default data tools.

    Args:
        llm: Language model instance for decision making
        tools: Dictionary of available data tools (stac, duckdb, etc.).
            When omitted or empty, the agent starts from a fresh dictionary.
    """
    # A non-empty mapping is kept by reference; anything falsy becomes a
    # fresh dictionary.
    self.tools = tools if tools else {}
    self.llm = llm
    self._setup_tools()
search_data(self, plan)
¶
Search for geospatial data based on the query plan.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `plan` | `PlannerOutput` | Structured query parameters from Planner Agent | required |
Returns:
| Type | Description |
|---|---|
| `DataResult` | DataResult containing found data items and metadata |
Source code in geoagent/core/data_agent.py
def search_data(self, plan: PlannerOutput) -> DataResult:
    """Route a query plan to the search routine matching its data type.

    Args:
        plan: Structured query parameters from Planner Agent

    Returns:
        DataResult containing found data items and metadata. When the
        search raises, an empty result carrying the error message under
        ``metadata["error"]`` is returned instead.
    """
    logger.info(f"Searching data for intent: {plan.intent}")
    # Search routine for each recognised data type; any other value is
    # handled by the multi-source fallback.
    routes = {
        "raster": "_search_raster_data",
        "vector": "_search_vector_data",
        "tabular": "_search_tabular_data",
    }
    try:
        kind = self._determine_data_type(plan)
        handler_name = routes.get(kind, "_search_multi_source")
        return getattr(self, handler_name)(plan)
    except Exception as e:
        logger.error(f"Error searching data: {e}")
        failure_info = {"error": str(e)}
        return DataResult(
            items=[], metadata=failure_info, data_type="unknown", total_items=0
        )
STACSearchWrapper
¶
Wrapper around pystac_client for STAC searches.
Source code in geoagent/core/data_agent.py
class STACSearchWrapper:
    """Wrapper around pystac_client for STAC searches."""

    def __init__(self, catalog_url: Optional[str] = None):
        """Store the catalog URL; the client itself is opened lazily.

        Args:
            catalog_url: STAC API root URL. Defaults to the Planetary
                Computer STAC endpoint when omitted or empty.
        """
        self.catalog_url = (
            catalog_url or "https://planetarycomputer.microsoft.com/api/stac/v1"
        )
        self._client = None

    @property
    def client(self):
        """Return the pystac_client ``Client``, opening it on first access.

        ``pystac_client`` is imported here rather than at module import time.
        For Planetary Computer URLs the ``planetary_computer.sign_inplace``
        modifier is attached when that package is installed; otherwise a
        warning is logged and the client is opened without a modifier.
        """
        if self._client is None:
            from pystac_client import Client

            # Use planetary_computer modifier for signed URLs if available
            modifier = None
            if "planetarycomputer" in self.catalog_url:
                try:
                    import planetary_computer

                    modifier = planetary_computer.sign_inplace
                except ImportError:
                    logger.warning(
                        "planetary_computer not installed. "
                        "Run: pip install planetary-computer"
                    )
            self._client = Client.open(self.catalog_url, modifier=modifier)
        return self._client

    def search(
        self,
        bbox=None,
        datetime=None,
        collections=None,
        max_items=10,
        **kwargs,
    ):
        """Search STAC catalog and return list of item dicts.

        Args:
            bbox: Bounding box filter; left out of the request when falsy.
            datetime: Datetime or datetime-range filter; left out when falsy.
            collections: Collection IDs to search; left out when falsy.
            max_items: Maximum number of items to return, also sent as the
                request ``limit``. ``None`` removes the cap; zero or a
                negative value returns an empty list without searching.
            **kwargs: ``query`` and ``intersects`` are forwarded to the
                client; any other keyword is ignored. ``intersects`` is only
                used when no ``bbox`` is given, because a request carries
                one spatial filter.

        Returns:
            A list of plain dicts with the keys ``id``, ``collection``,
            ``geometry``, ``bbox``, ``properties`` and ``assets``.
        """
        from itertools import islice

        if max_items is not None and max_items <= 0:
            return []

        search_params = {}
        if bbox:
            search_params["bbox"] = bbox
        elif kwargs.get("intersects"):
            # Without this, a geometry-only query would run with no spatial
            # filter at all.
            search_params["intersects"] = kwargs["intersects"]
        if datetime:
            search_params["datetime"] = datetime
        if collections:
            search_params["collections"] = collections
        if "query" in kwargs:
            search_params["query"] = kwargs["query"]
        if max_items is not None:
            search_params["limit"] = max_items

        item_search = self.client.search(**search_params)

        # islice stops after max_items without requesting the next item, so
        # no item is pulled from the iterator beyond the cap.
        return [
            {
                "id": item.id,
                "collection": item.collection_id,
                "geometry": item.geometry,
                "bbox": list(item.bbox) if item.bbox else None,
                "properties": item.properties,
                "assets": {
                    asset_key: {
                        "href": asset.href,
                        "type": asset.media_type,
                        "title": asset.title,
                        "roles": asset.roles if asset.roles else [],
                    }
                    for asset_key, asset in item.assets.items()
                },
            }
            for item in islice(item_search.items(), max_items)
        ]
search(self, bbox=None, datetime=None, collections=None, max_items=10, **kwargs)
¶
Search STAC catalog and return list of item dicts.
Source code in geoagent/core/data_agent.py
def search(
    self,
    bbox=None,
    datetime=None,
    collections=None,
    max_items=10,
    **kwargs,
):
    """Search STAC catalog and return list of item dicts.

    Args:
        bbox: Bounding box filter; left out of the request when falsy.
        datetime: Datetime or datetime-range filter; left out when falsy.
        collections: Collection IDs to search; left out when falsy.
        max_items: Maximum number of items to return, also sent as the
            request ``limit``. ``None`` removes the cap; zero or a negative
            value returns an empty list without searching.
        **kwargs: ``query`` and ``intersects`` are forwarded to the client;
            any other keyword is ignored. ``intersects`` is only used when
            no ``bbox`` is given, because a request carries one spatial
            filter.

    Returns:
        A list of plain dicts with the keys ``id``, ``collection``,
        ``geometry``, ``bbox``, ``properties`` and ``assets``.
    """
    from itertools import islice

    if max_items is not None and max_items <= 0:
        return []

    search_params = {}
    if bbox:
        search_params["bbox"] = bbox
    elif kwargs.get("intersects"):
        # Without this, a geometry-only query would run with no spatial
        # filter at all.
        search_params["intersects"] = kwargs["intersects"]
    if datetime:
        search_params["datetime"] = datetime
    if collections:
        search_params["collections"] = collections
    if "query" in kwargs:
        search_params["query"] = kwargs["query"]
    if max_items is not None:
        search_params["limit"] = max_items

    item_search = self.client.search(**search_params)

    # islice stops after max_items without requesting the next item, so no
    # item is pulled from the iterator beyond the cap.
    return [
        {
            "id": item.id,
            "collection": item.collection_id,
            "geometry": item.geometry,
            "bbox": list(item.bbox) if item.bbox else None,
            "properties": item.properties,
            "assets": {
                asset_key: {
                    "href": asset.href,
                    "type": asset.media_type,
                    "title": asset.title,
                    "roles": asset.roles if asset.roles else [],
                }
                for asset_key, asset in item.assets.items()
            },
        }
        for item in islice(item_search.items(), max_items)
    ]