Skip to content

GeoAgent

The main GeoAgent class orchestrates the 4-agent pipeline.

geoagent.core.agent

Main GeoAgent orchestrator using LangGraph for agent coordination.

This module contains the main GeoAgent class that orchestrates the entire geospatial analysis pipeline using multiple specialized agents.

AgentState (dict)

State passed between agents in the LangGraph workflow.

Source code in geoagent/core/agent.py
class AgentState(TypedDict):
    """State passed between agents in the LangGraph workflow."""

    query: str
    plan: Optional[PlannerOutput]
    data: Optional[DataResult]
    analysis: Optional[AnalysisResult]
    map: Optional[Any]
    code: str
    error: Optional[str]
    should_analyze: bool
    should_visualize: bool

GeoAgent

Main GeoAgent orchestrator for geospatial analysis workflows.

GeoAgent coordinates multiple specialized agents to perform end-to-end geospatial data analysis from natural language queries.

Source code in geoagent/core/agent.py
class GeoAgent:
    """Main GeoAgent orchestrator for geospatial analysis workflows.

    GeoAgent coordinates multiple specialized agents to perform end-to-end
    geospatial data analysis from natural language queries.
    """

    def __init__(
        self,
        llm: Optional[Any] = None,
        provider: Optional[str] = None,
        model: Optional[str] = None,
        catalogs: Optional[List[str]] = None,
    ):
        """Initialize GeoAgent with LLM and configuration.

        Args:
            llm: Language model instance. If None, uses get_default_llm()
            provider: LLM provider name (e.g., 'openai', 'anthropic')
            model: Specific model name
            catalogs: List of STAC catalog URLs to search
        """
        self.llm = llm or get_default_llm()
        self.provider = provider
        self.model = model
        self.catalogs = catalogs or []

        # Fetch available collections from the Planetary Computer STAC (with fallback)
        try:
            self.collection_index = get_collection_index()
        except Exception as e:
            logger.warning(
                f"Failed to fetch collection index: {e}. Proceeding without it."
            )
            self.collection_index = []

        # Initialize specialized agents
        self.planner = Planner(self.llm, collections=self.collection_index)
        self.data_agent = DataAgent(self.llm)
        self.analysis_agent = AnalysisAgent(self.llm)
        self.viz_agent = VizAgent(self.llm)

        # Initialize workflow graph
        self.workflow = self._create_workflow()

        logger.info("GeoAgent initialized successfully")

    def chat(self, query: str) -> GeoAgentResponse:
        """Main method to process a natural language query.

        Args:
            query: Natural language geospatial analysis query

        Returns:
            GeoAgentResponse with complete pipeline results
        """
        logger.info(f"Processing query: {query}")
        start_time = time.time()

        try:
            # Initialize state
            initial_state = AgentState(
                query=query,
                plan=None,
                data=None,
                analysis=None,
                map=None,
                code="",
                error=None,
                should_analyze=True,
                should_visualize=True,
            )

            # Execute workflow
            if LANGGRAPH_AVAILABLE and self.workflow:
                final_state = self.workflow.invoke(initial_state)
            else:
                # Fallback to sequential execution
                final_state = self._sequential_execution(initial_state)

            # Create response
            execution_time = time.time() - start_time

            response = GeoAgentResponse(
                plan=final_state["plan"],
                data=final_state["data"],
                analysis=final_state["analysis"],
                map=final_state["map"],
                code=final_state["code"],
                success=final_state["error"] is None,
                error_message=final_state["error"],
                execution_time=execution_time,
            )

            logger.info(f"Query processed successfully in {execution_time:.2f}s")
            return response

        except Exception as e:
            execution_time = time.time() - start_time
            logger.error(f"Query processing failed: {e}")

            return GeoAgentResponse(
                plan=PlannerOutput(intent=query, confidence=0.0),
                success=False,
                error_message=str(e),
                execution_time=execution_time,
            )

    def search(self, query: str) -> DataResult:
        """Shortcut method to just search for data without analysis.

        Args:
            query: Natural language data search query

        Returns:
            DataResult with found data
        """
        logger.info(f"Data search for: {query}")

        try:
            # Parse query into plan
            plan = self._parse_query(query)

            # Search for data
            data_result = self.data_agent.search_data(plan)

            logger.info(f"Found {data_result.total_items} data items")
            return data_result

        except Exception as e:
            logger.error(f"Data search failed: {e}")
            return DataResult(
                items=[], metadata={"error": str(e)}, data_type="unknown", total_items=0
            )

    def analyze(self, query: str) -> GeoAgentResponse:
        """Shortcut method for search + analysis without visualization.

        Args:
            query: Natural language analysis query

        Returns:
            GeoAgentResponse with data and analysis results
        """
        logger.info(f"Analysis for: {query}")

        try:
            # Parse query
            plan = self._parse_query(query)

            # Search data
            data = self.data_agent.search_data(plan)

            # Perform analysis
            analysis = self.analysis_agent.analyze(plan, data)

            response = GeoAgentResponse(
                plan=plan,
                data=data,
                analysis=analysis,
                code=analysis.code_generated,
                success=analysis.success,
                error_message=analysis.error_message,
            )

            logger.info("Analysis completed")
            return response

        except Exception as e:
            logger.error(f"Analysis failed: {e}")
            return GeoAgentResponse(
                plan=PlannerOutput(intent=query, confidence=0.0),
                success=False,
                error_message=str(e),
            )

    def visualize(self, query: str) -> GeoAgentResponse:
        """Run full pipeline including MapLibre GL visualization.

        Args:
            query: Natural language query for complete analysis

        Returns:
            GeoAgentResponse with MapLibre map visualization
        """
        return self.chat(query)  # Full pipeline is the same as chat

    def _create_workflow(self) -> Optional[Any]:
        """Create LangGraph workflow for agent coordination.

        Returns:
            Compiled LangGraph workflow or None if LangGraph unavailable
        """
        if not LANGGRAPH_AVAILABLE:
            return None

        try:
            # Create state graph
            workflow = StateGraph(AgentState)

            # Add nodes
            workflow.add_node("plan", self._plan_node)
            workflow.add_node("fetch_data", self._fetch_data_node)
            workflow.add_node("analyze", self._analyze_node)
            workflow.add_node("visualize", self._visualize_node)

            # Define edges
            workflow.set_entry_point("plan")
            workflow.add_edge("plan", "fetch_data")
            workflow.add_conditional_edges(
                "fetch_data",
                self._should_analyze,
                {True: "analyze", False: "visualize"},
            )
            workflow.add_conditional_edges(
                "analyze", self._should_visualize, {True: "visualize", False: END}
            )
            workflow.add_edge("visualize", END)

            return workflow.compile()

        except Exception as e:
            logger.warning(f"Could not create LangGraph workflow: {e}")
            return None

    def _sequential_execution(self, state: AgentState) -> AgentState:
        """Fallback sequential execution when LangGraph is not available.

        Args:
            state: Initial agent state

        Returns:
            Final agent state
        """
        logger.info("Using sequential execution (LangGraph not available)")

        try:
            # Step 1: Plan
            state = self._plan_node(state)

            # Step 2: Fetch data
            state = self._fetch_data_node(state)

            # Step 3: Analyze (if needed)
            if (
                state["should_analyze"]
                and state["data"]
                and state["data"].total_items > 0
            ):
                state = self._analyze_node(state)

            # Step 4: Visualize (if needed)
            if state["should_visualize"]:
                state = self._visualize_node(state)

            return state

        except Exception as e:
            state["error"] = str(e)
            logger.error(f"Sequential execution failed: {e}")
            return state

    def _plan_node(self, state: AgentState) -> AgentState:
        """Planning node - parse natural language query into structured parameters.

        Args:
            state: Current agent state

        Returns:
            Updated state with plan
        """
        logger.debug("Executing planning node")

        try:
            plan = self._parse_query(state["query"])
            state["plan"] = plan

            # Determine if we need analysis and visualization
            intent_lower = plan.intent.lower()

            # Analysis is needed for computational tasks
            analysis_keywords = [
                "calculate",
                "compute",
                "analyze",
                "ndvi",
                "evi",
                "index",
                "statistics",
                "mean",
                "median",
                "change",
                "trend",
                "zonal",
            ]
            needs_analysis = any(kw in intent_lower for kw in analysis_keywords)
            # Land cover and elevation need analysis routing for proper viz hints
            analysis_type_hint = (plan.analysis_type or "").lower()
            if analysis_type_hint in (
                "land_cover",
                "classification",
                "lulc",
                "elevation",
                "dem",
                "terrain",
            ):
                needs_analysis = True
            state["should_analyze"] = needs_analysis

            # Visualization is usually desired unless explicitly asking for just data
            viz_skip_keywords = ["download", "list", "count", "metadata"]
            state["should_visualize"] = not any(
                kw in intent_lower for kw in viz_skip_keywords
            )

            logger.debug(
                f"Plan created: analyze={state['should_analyze']}, visualize={state['should_visualize']}"
            )

        except Exception as e:
            state["error"] = f"Planning failed: {e}"
            logger.error(state["error"])

        return state

    def _fetch_data_node(self, state: AgentState) -> AgentState:
        """Data fetching node - search and retrieve geospatial data.

        Args:
            state: Current agent state

        Returns:
            Updated state with data
        """
        logger.debug("Executing data fetching node")

        try:
            if state["plan"]:
                data = self.data_agent.search_data(state["plan"])
                state["data"] = data

                # Generate reproducible search code
                state["code"] += self._generate_search_code(state["plan"], data)

                logger.debug(
                    f"Data fetched: {data.total_items} items of type {data.data_type}"
                )
            else:
                state["error"] = "No plan available for data fetching"

        except Exception as e:
            state["error"] = f"Data fetching failed: {e}"
            logger.error(state["error"])

        return state

    def _generate_search_code(self, plan: PlannerOutput, data: Any) -> str:
        """Generate reproducible Python code for the STAC search.

        Args:
            plan: Query plan used for the search
            data: Search results

        Returns:
            Python code string
        """
        bbox = plan.location.get("bbox") if plan.location else None
        location_name = plan.location.get("name", "") if plan.location else ""
        time_range = plan.time_range
        datetime_str = ""
        if time_range:
            datetime_str = (
                f"{time_range.get('start_date', '')}/{time_range.get('end_date', '')}"
            )

        # Build collection: use planner output directly; fallback to Sentinel-2 if absent
        dataset = plan.dataset
        collection = dataset if dataset else "sentinel-2-l2a"

        # Cloud cover filter only for imagery collections
        cloud_filter = ""
        imagery_collections = {
            "sentinel-2-l2a",
            "landsat-c2-l2",
            "naip",
            "sentinel-1-grd",
        }
        max_cc = plan.parameters.get("max_cloud_cover")
        if max_cc is not None and collection in imagery_collections:
            cloud_filter = f'\n    query={{"eo:cloud_cover": {{"lt": {max_cc}}}}},'

        code = f'''import planetary_computer
from pystac_client import Client

# Search Planetary Computer STAC catalog{f" - {location_name}" if location_name else ""}
catalog = Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1",
    modifier=planetary_computer.sign_inplace,
)

search = catalog.search(
    collections=["{collection}"],
    bbox={bbox},{f"""
    datetime="{datetime_str}",""" if datetime_str else ""}{cloud_filter}
    max_items=10,
)

items = list(search.items())
print(f"Found {{len(items)}} items")
for item in items:
    cc = item.properties.get("eo:cloud_cover", "N/A")
    print(f"  {{item.id}} - cloud cover: {{cc}}%")
'''
        return code

    def _analyze_node(self, state: AgentState) -> AgentState:
        """Analysis node - perform geospatial analysis on data.

        Args:
            state: Current agent state

        Returns:
            Updated state with analysis results
        """
        logger.debug("Executing analysis node")

        try:
            if state["plan"] and state["data"]:
                analysis = self.analysis_agent.analyze(state["plan"], state["data"])
                state["analysis"] = analysis
                state["code"] += analysis.code_generated + "\n"

                if not analysis.success:
                    state["error"] = analysis.error_message

                logger.debug(f"Analysis completed: success={analysis.success}")
            else:
                state["error"] = "Missing plan or data for analysis"

        except Exception as e:
            state["error"] = f"Analysis failed: {e}"
            logger.error(state["error"])

        return state

    def _visualize_node(self, state: AgentState) -> AgentState:
        """Visualization node - create map visualization.

        Args:
            state: Current agent state

        Returns:
            Updated state with map
        """
        logger.debug("Executing visualization node")

        try:
            if state["plan"]:
                viz_map = self.viz_agent.create_visualization(
                    state["plan"], state["data"], state["analysis"]
                )
                state["map"] = viz_map

                # Add visualization code
                state["code"] += self._generate_viz_code(state["plan"], state["data"])

                logger.debug("Map visualization created")
            else:
                state["error"] = "Missing plan for visualization"

        except Exception as e:
            state["error"] = f"Visualization failed: {e}"
            logger.error(state["error"])

        return state

    def _generate_viz_code(self, plan: PlannerOutput, data: Any) -> str:
        """Generate reproducible visualization code.

        Args:
            plan: Query plan
            data: Data result

        Returns:
            Python code string for visualization
        """
        if not data or not data.items:
            return ""

        item = data.items[0]
        item_id = item.get("id", "")
        collection = item.get("collection", "")

        if not collection:
            return ""

        # Determine assets
        assets = item.get("assets", {})
        if "visual" in assets:
            assets_str = '"visual"'
        elif "B04" in assets and "B03" in assets and "B02" in assets:
            assets_str = '["B04", "B03", "B02"]'
        elif "red" in assets and "green" in assets and "blue" in assets:
            assets_str = '["red", "green", "blue"]'
        else:
            assets_str = '"visual"'

        code = f"""
# Visualize on an interactive map
import leafmap.maplibregl as leafmap

m = leafmap.Map()
m.add_stac_layer(
    collection="{collection}",
    item="{item_id}",
    assets={assets_str},
    titiler_endpoint="planetary-computer",
    name="{item_id}",
    fit_bounds=True,
)
m
"""
        return code

    def _should_analyze(self, state: AgentState) -> bool:
        """Conditional edge function to determine if analysis is needed.

        Args:
            state: Current agent state

        Returns:
            True if analysis should be performed
        """
        return (
            state["should_analyze"]
            and state["data"] is not None
            and state["data"].total_items > 0
            and state["error"] is None
        )

    def _should_visualize(self, state: AgentState) -> bool:
        """Conditional edge function to determine if visualization is needed.

        Args:
            state: Current agent state

        Returns:
            True if visualization should be performed
        """
        return state["should_visualize"] and state["error"] is None

    def _parse_query(self, query: str) -> PlannerOutput:
        """Parse natural language query into structured plan.

        Uses LLM for intent/parameter extraction, then geocodes the location.
        Falls back to regex-based parsing if LLM fails.

        Args:
            query: Natural language query

        Returns:
            PlannerOutput with parsed parameters
        """
        logger.debug(f"Parsing query: {query}")

        try:
            # Use LLM-based planner for robust parsing
            plan = self.planner.parse_query(query)
            logger.info(
                f"LLM parsed: location={plan.location}, time={plan.time_range}, "
                f"dataset={plan.dataset}, params={plan.parameters}"
            )

            # Geocode location name to bbox if needed
            if (
                plan.location
                and "name" in plan.location
                and "bbox" not in plan.location
            ):
                geocoded = self._geocode_location(plan.location["name"])
                if geocoded:
                    plan.location = geocoded
                else:
                    logger.warning(f"Could not geocode: {plan.location['name']}")

            # Post-process: LLM sometimes puts time_range in parameters
            if plan.time_range is None and "time_range" in plan.parameters:
                tr = plan.parameters.pop("time_range")
                if isinstance(tr, (list, tuple)) and len(tr) == 2:
                    plan.time_range = {
                        "start_date": tr[0],
                        "end_date": tr[1],
                    }

            # Post-process: normalize cloud cover thresholds
            cc = plan.parameters.get("cloud_cover")
            if cc is not None:
                # "cloud-free" (0) is unrealistic; use 10% threshold
                if cc <= 0:
                    plan.parameters["cloud_cover"] = 10
                plan.parameters["max_cloud_cover"] = plan.parameters.pop("cloud_cover")

            return plan

        except Exception as e:
            logger.warning(f"LLM planner failed ({e}), falling back to regex parser")
            return self._parse_query_fallback(query)

    def _parse_query_fallback(self, query: str) -> PlannerOutput:
        """Fallback regex-based query parser when LLM is unavailable.

        Args:
            query: Natural language query

        Returns:
            PlannerOutput with parsed parameters
        """
        query_lower = query.lower()
        intent = query.strip()

        location = self._extract_location(query)
        time_range = self._extract_time_range(query_lower)

        dataset = None
        if "sentinel" in query_lower or "sentinel-2" in query_lower:
            dataset = "sentinel-2"
        elif "landsat" in query_lower:
            dataset = "landsat"
        elif "modis" in query_lower:
            dataset = "modis"

        parameters = {}
        if "cloud-free" in query_lower or "cloud free" in query_lower:
            parameters["max_cloud_cover"] = 10
        elif "low cloud" in query_lower or "low-cloud" in query_lower:
            parameters["max_cloud_cover"] = 20
        elif "cloud cover" in query_lower or "cloudy" in query_lower:
            parameters["max_cloud_cover"] = 20

        return PlannerOutput(
            intent=intent,
            location=location,
            time_range=time_range,
            dataset=dataset,
            parameters=parameters,
            confidence=0.5,
        )

    def _geocode_location(self, place_name: str) -> Optional[Dict[str, Any]]:
        """Geocode a place name to a bbox.

        Args:
            place_name: Place name string (e.g., "Knoxville", "San Francisco")

        Returns:
            Location dict with bbox and name, or None
        """
        try:
            from geopy.geocoders import Nominatim

            geolocator = Nominatim(user_agent="geoagent")
            result = geolocator.geocode(place_name, exactly_one=True, timeout=5)

            if result:
                lat, lon = result.latitude, result.longitude
                bbox = [lon - 0.1, lat - 0.1, lon + 0.1, lat + 0.1]
                name = result.address.split(",")[0]
                logger.info(f"Geocoded '{place_name}' -> {name} ({lat:.4f}, {lon:.4f})")
                return {"bbox": bbox, "name": name}
        except ImportError:
            logger.warning("geopy not installed")
        except Exception as e:
            logger.warning(f"Geocoding failed for '{place_name}': {e}")

        # Try fallback city lookup
        return self._extract_location_fallback(place_name.lower())

    def _extract_location(self, query: str) -> Optional[Dict[str, Any]]:
        """Extract location from query using geocoding (for regex fallback parser).

        Tries to find a place name in the query and geocode it to a bbox.

        Args:
            query: Natural language query

        Returns:
            Location dict with bbox and name, or None
        """
        try:
            from geopy.geocoders import Nominatim

            geolocator = Nominatim(user_agent="geoagent")

            # Try to extract place name - remove common non-location words
            import re

            # Remove analysis terms to isolate location
            cleaned = re.sub(
                r"\b(show|display|compute|calculate|analyze|find|get|plot|map|"
                r"ndvi|evi|savi|imagery|image|images|satellite|sentinel-?\d*|landsat|"
                r"modis|for|in|of|the|from|during|between|and|with|using|"
                r"cloud[- ]?free|low[- ]?cloud|cloud\s*cover|cloudy|"
                r"recent|latest|best|high[- ]?resolution|"
                r"january|february|march|april|may|june|july|august|"
                r"september|october|november|december|"
                r"jan|feb|mar|apr|jun|jul|aug|sep|oct|nov|dec|"
                r"\d{4})\b",
                "",
                query,
                flags=re.IGNORECASE,
            ).strip()

            # Clean up extra whitespace
            cleaned = re.sub(r"\s+", " ", cleaned).strip(" ,.-")

            if not cleaned or len(cleaned) < 2:
                logger.debug("No location found in query")
                return None

            logger.debug(f"Geocoding: '{cleaned}'")
            result = geolocator.geocode(cleaned, exactly_one=True, timeout=5)

            if result:
                lat, lon = result.latitude, result.longitude
                # Create bbox around the point (~0.1 degrees ≈ 10km)
                bbox = [lon - 0.1, lat - 0.1, lon + 0.1, lat + 0.1]
                name = result.address.split(",")[0]
                logger.info(f"Geocoded '{cleaned}' -> {name} ({lat:.4f}, {lon:.4f})")
                return {"bbox": bbox, "name": name}
            else:
                logger.warning(f"Could not geocode: '{cleaned}'")
                return None

        except ImportError:
            logger.warning("geopy not installed, using fallback location parsing")
            return self._extract_location_fallback(query.lower())
        except Exception as e:
            logger.warning(f"Geocoding failed: {e}")
            return self._extract_location_fallback(query.lower())

    def _extract_location_fallback(self, query_lower: str) -> Optional[Dict[str, Any]]:
        """Fallback location extraction using hardcoded city lookups.

        Args:
            query_lower: Lowercased query string

        Returns:
            Location dict or None
        """
        cities = {
            "san francisco": {
                "bbox": [-122.5, 37.7, -122.3, 37.8],
                "name": "San Francisco",
            },
            "new york": {"bbox": [-74.1, 40.6, -73.9, 40.8], "name": "New York"},
            "los angeles": {
                "bbox": [-118.4, 33.9, -118.1, 34.1],
                "name": "Los Angeles",
            },
            "chicago": {"bbox": [-87.8, 41.7, -87.5, 42.0], "name": "Chicago"},
            "seattle": {"bbox": [-122.4, 47.5, -122.2, 47.7], "name": "Seattle"},
            "denver": {"bbox": [-105.1, 39.6, -104.8, 39.8], "name": "Denver"},
            "houston": {"bbox": [-95.5, 29.6, -95.2, 29.9], "name": "Houston"},
            "miami": {"bbox": [-80.3, 25.7, -80.1, 25.9], "name": "Miami"},
            "california": {"bbox": [-124.4, 32.5, -114.1, 42.0], "name": "California"},
        }
        for city, loc in cities.items():
            if city in query_lower:
                return loc
        return None

    def _extract_time_range(self, query_lower: str) -> Optional[Dict[str, str]]:
        """Extract time range from query text.

        Handles patterns like 'July 2024', 'in 2025', 'June 2023', etc.

        Args:
            query_lower: Lowercased query string

        Returns:
            Dict with start_date and end_date, or None
        """
        import re

        months = {
            "january": ("01", "31"),
            "jan": ("01", "31"),
            "february": ("02", "28"),
            "feb": ("02", "28"),
            "march": ("03", "31"),
            "mar": ("03", "31"),
            "april": ("04", "30"),
            "apr": ("04", "30"),
            "may": ("05", "31"),
            "june": ("06", "30"),
            "jun": ("06", "30"),
            "july": ("07", "31"),
            "jul": ("07", "31"),
            "august": ("08", "31"),
            "aug": ("08", "31"),
            "september": ("09", "30"),
            "sep": ("09", "30"),
            "october": ("10", "31"),
            "oct": ("10", "31"),
            "november": ("11", "30"),
            "nov": ("11", "30"),
            "december": ("12", "31"),
            "dec": ("12", "31"),
        }

        # Match "Month YYYY" or "YYYY" patterns
        for month_name, (month_num, last_day) in months.items():
            pattern = rf"\b{month_name}\s+(\d{{4}})\b"
            match = re.search(pattern, query_lower)
            if match:
                year = match.group(1)
                return {
                    "start_date": f"{year}-{month_num}-01",
                    "end_date": f"{year}-{month_num}-{last_day}",
                }

        # Match bare year
        year_match = re.search(r"\b(20\d{2})\b", query_lower)
        if year_match:
            year = year_match.group(1)
            return {"start_date": f"{year}-01-01", "end_date": f"{year}-12-31"}

        return None
__init__(self, llm=None, provider=None, model=None, catalogs=None) special

Initialize GeoAgent with LLM and configuration.

Parameters:

Name Type Description Default
llm Optional[Any]

Language model instance. If None, uses get_default_llm()

None
provider Optional[str]

LLM provider name (e.g., 'openai', 'anthropic')

None
model Optional[str]

Specific model name

None
catalogs Optional[List[str]]

List of STAC catalog URLs to search

None
Source code in geoagent/core/agent.py
def __init__(
    self,
    llm: Optional[Any] = None,
    provider: Optional[str] = None,
    model: Optional[str] = None,
    catalogs: Optional[List[str]] = None,
):
    """Initialize GeoAgent with LLM and configuration.

    Args:
        llm: Language model instance. If None, uses get_default_llm()
        provider: LLM provider name (e.g., 'openai', 'anthropic')
        model: Specific model name
        catalogs: List of STAC catalog URLs to search
    """
    self.llm = llm or get_default_llm()
    self.provider = provider
    self.model = model
    self.catalogs = catalogs or []

    # Fetch available collections from the Planetary Computer STAC (with fallback)
    try:
        self.collection_index = get_collection_index()
    except Exception as e:
        logger.warning(
            f"Failed to fetch collection index: {e}. Proceeding without it."
        )
        self.collection_index = []

    # Initialize specialized agents
    self.planner = Planner(self.llm, collections=self.collection_index)
    self.data_agent = DataAgent(self.llm)
    self.analysis_agent = AnalysisAgent(self.llm)
    self.viz_agent = VizAgent(self.llm)

    # Initialize workflow graph
    self.workflow = self._create_workflow()

    logger.info("GeoAgent initialized successfully")
analyze(self, query)

Shortcut method for search + analysis without visualization.

Parameters:

Name Type Description Default
query str

Natural language analysis query

required

Returns:

Type Description
GeoAgentResponse

GeoAgentResponse with data and analysis results

Source code in geoagent/core/agent.py
def analyze(self, query: str) -> GeoAgentResponse:
    """Shortcut method for search + analysis without visualization.

    Args:
        query: Natural language analysis query

    Returns:
        GeoAgentResponse with data and analysis results
    """
    logger.info(f"Analysis for: {query}")

    try:
        # Parse query
        plan = self._parse_query(query)

        # Search data
        data = self.data_agent.search_data(plan)

        # Perform analysis
        analysis = self.analysis_agent.analyze(plan, data)

        response = GeoAgentResponse(
            plan=plan,
            data=data,
            analysis=analysis,
            code=analysis.code_generated,
            success=analysis.success,
            error_message=analysis.error_message,
        )

        logger.info("Analysis completed")
        return response

    except Exception as e:
        logger.error(f"Analysis failed: {e}")
        return GeoAgentResponse(
            plan=PlannerOutput(intent=query, confidence=0.0),
            success=False,
            error_message=str(e),
        )
chat(self, query)

Main method to process a natural language query.

Parameters:

Name Type Description Default
query str

Natural language geospatial analysis query

required

Returns:

Type Description
GeoAgentResponse

GeoAgentResponse with complete pipeline results

Source code in geoagent/core/agent.py
def chat(self, query: str) -> GeoAgentResponse:
    """Main method to process a natural language query.

    Args:
        query: Natural language geospatial analysis query

    Returns:
        GeoAgentResponse with complete pipeline results
    """
    logger.info(f"Processing query: {query}")
    start_time = time.time()

    try:
        # Initialize state
        initial_state = AgentState(
            query=query,
            plan=None,
            data=None,
            analysis=None,
            map=None,
            code="",
            error=None,
            should_analyze=True,
            should_visualize=True,
        )

        # Execute workflow
        if LANGGRAPH_AVAILABLE and self.workflow:
            final_state = self.workflow.invoke(initial_state)
        else:
            # Fallback to sequential execution
            final_state = self._sequential_execution(initial_state)

        # Create response
        execution_time = time.time() - start_time

        response = GeoAgentResponse(
            plan=final_state["plan"],
            data=final_state["data"],
            analysis=final_state["analysis"],
            map=final_state["map"],
            code=final_state["code"],
            success=final_state["error"] is None,
            error_message=final_state["error"],
            execution_time=execution_time,
        )

        logger.info(f"Query processed successfully in {execution_time:.2f}s")
        return response

    except Exception as e:
        execution_time = time.time() - start_time
        logger.error(f"Query processing failed: {e}")

        return GeoAgentResponse(
            plan=PlannerOutput(intent=query, confidence=0.0),
            success=False,
            error_message=str(e),
            execution_time=execution_time,
        )
search(self, query)

Shortcut method to just search for data without analysis.

Parameters:

Name Type Description Default
query str

Natural language data search query

required

Returns:

Type Description
DataResult

DataResult with found data

Source code in geoagent/core/agent.py
def search(self, query: str) -> DataResult:
    """Shortcut method to just search for data without analysis.

    Args:
        query: Natural language data search query

    Returns:
        DataResult with found data
    """
    logger.info(f"Data search for: {query}")

    try:
        # Parse query into plan
        plan = self._parse_query(query)

        # Search for data
        data_result = self.data_agent.search_data(plan)

        logger.info(f"Found {data_result.total_items} data items")
        return data_result

    except Exception as e:
        logger.error(f"Data search failed: {e}")
        return DataResult(
            items=[], metadata={"error": str(e)}, data_type="unknown", total_items=0
        )
visualize(self, query)

Run full pipeline including MapLibre GL visualization.

Parameters:

Name Type Description Default
query str

Natural language query for complete analysis

required

Returns:

Type Description
GeoAgentResponse

GeoAgentResponse with MapLibre map visualization

Source code in geoagent/core/agent.py
def visualize(self, query: str) -> GeoAgentResponse:
    """Run full pipeline including MapLibre GL visualization.

    Args:
        query: Natural language query for complete analysis

    Returns:
        GeoAgentResponse with MapLibre map visualization
    """
    return self.chat(query)  # Full pipeline is the same as chat