Skip to content

Analysis Agent

The Analysis Agent performs geospatial analysis with transparent, reproducible code generation.

geoagent.core.analysis_agent

Analysis Agent for performing geospatial analysis operations.

The Analysis Agent takes data references and performs various geospatial analyses while generating transparent Python code for reproducibility.

AnalysisAgent

Agent responsible for geospatial analysis operations.

The Analysis Agent performs raster and vector analysis operations and generates Python code showing exactly what was computed.

Source code in geoagent/core/analysis_agent.py
class AnalysisAgent:
    """Agent responsible for geospatial analysis operations.

    The Analysis Agent performs raster and vector analysis operations
    and generates Python code showing exactly what was computed.
    """

    def __init__(self, llm: Any, tools: Optional[Dict[str, Any]] = None):
        """Initialize the Analysis Agent.

        Args:
            llm: Language model instance for analysis planning
            tools: Dictionary of available analysis tools (raster, vector, etc.)
        """
        self.llm = llm
        self.tools = tools or {}
        self._setup_tools()

    def _setup_tools(self):
        """Setup and initialize analysis tools."""
        try:
            # TODO: Import and register RasterAnalysisTool and VectorAnalysisTool
            # once the tool implementations are finalized.

            logger.info("Analysis tools setup (using placeholders)")

        except ImportError as e:
            logger.warning(f"Some analysis tools not available: {e}")
            # Graceful fallback - tools will be added when available

    def analyze(self, plan: PlannerOutput, data: DataResult) -> AnalysisResult:
        """Perform analysis on the provided data.

        Args:
            plan: Original query plan with analysis intent
            data: Data retrieved by the Data Agent

        Returns:
            AnalysisResult with computed results and generated code
        """
        logger.info(f"Starting analysis for intent: {plan.intent}")

        try:
            # Determine analysis type based on intent and data type
            analysis_type = self._determine_analysis_type(plan, data)

            if analysis_type == "land_cover":
                return self._handle_land_cover(plan, data)
            elif analysis_type == "elevation":
                return self._handle_elevation(plan, data)
            elif analysis_type == "spectral_index":
                return self._compute_spectral_index(plan, data)
            elif analysis_type == "zonal_statistics":
                return self._compute_zonal_statistics(plan, data)
            elif analysis_type == "time_series":
                return self._compute_time_series(plan, data)
            elif analysis_type == "change_detection":
                return self._compute_change_detection(plan, data)
            elif analysis_type == "vector_analysis":
                return self._perform_vector_analysis(plan, data)
            else:
                return self._perform_general_analysis(plan, data)

        except Exception as e:
            logger.error(f"Analysis failed: {e}")
            return AnalysisResult(
                result_data={"error": str(e)},
                code_generated=f"# Analysis failed: {e}",
                success=False,
                error_message=str(e),
            )

    def _determine_analysis_type(self, plan: PlannerOutput, data: DataResult) -> str:
        """Determine the type of analysis to perform.

        Args:
            plan: Query plan with intent
            data: Available data

        Returns:
            Analysis type string
        """
        intent = plan.intent.lower()
        analysis_type_hint = (plan.analysis_type or "").lower()

        # Check plan.analysis_type first (most reliable signal from planner)
        if analysis_type_hint in ("land_cover", "classification", "lulc"):
            return "land_cover"
        if analysis_type_hint in ("elevation", "dem", "terrain", "slope", "hillshade"):
            return "elevation"
        if analysis_type_hint in ("ndvi", "evi", "savi", "ndbi", "ndwi", "mndwi"):
            return "spectral_index"

        # Land cover analysis (from intent keywords)
        if any(
            term in intent
            for term in [
                "land_cover",
                "land cover",
                "landcover",
                "lulc",
                "land use",
                "classification",
            ]
        ):
            return "land_cover"

        # DEM / elevation analysis (from intent keywords)
        if any(
            term in intent
            for term in [
                "dem",
                "elevation",
                "terrain",
                "slope",
                "hillshade",
                "topograph",
            ]
        ):
            return "elevation"

        # Spectral index analysis
        if any(
            index in intent
            for index in ["ndvi", "evi", "savi", "ndbi", "spectral", "index"]
        ):
            return "spectral_index"

        # Zonal statistics
        if any(
            term in intent
            for term in ["zonal", "statistics", "mean", "median", "sum", "aggregate"]
        ):
            return "zonal_statistics"

        # Time series analysis
        if any(
            term in intent
            for term in ["time series", "temporal", "trend", "change over time"]
        ):
            return "time_series"

        # Change detection
        if any(
            term in intent
            for term in ["change", "detection", "difference", "before", "after"]
        ):
            return "change_detection"

        # Vector operations
        if data.data_type == "vector" or any(
            term in intent for term in ["buffer", "intersect", "union", "clip"]
        ):
            return "vector_analysis"

        return "general"

    def _compute_spectral_index(
        self, plan: PlannerOutput, data: DataResult
    ) -> AnalysisResult:
        """Compute spectral indices from satellite imagery.

        Args:
            plan: Query plan with index specifications
            data: Satellite imagery data

        Returns:
            AnalysisResult with computed indices
        """
        intent = plan.intent.lower()

        # Determine which index to compute
        if "ndvi" in intent:
            index_type = "ndvi"
        elif "evi" in intent:
            index_type = "evi"
        elif "savi" in intent:
            index_type = "savi"
        else:
            index_type = "ndvi"  # Default

        # Try real computation first
        if index_type == "ndvi" and data.items:
            try:
                return self._compute_ndvi_real(plan, data)
            except Exception as e:
                logger.warning(f"Real NDVI computation failed: {e}")

        # Fall back to mock if real computation fails
        return self._create_mock_analysis(index_type, data)

    def _compute_ndvi_real(
        self, plan: PlannerOutput, data: DataResult
    ) -> AnalysisResult:
        """Actually compute NDVI from STAC item COG bands.

        Uses rasterio to read red/NIR bands and compute NDVI,
        then saves result as a temporary GeoTIFF for visualization.
        """
        import numpy as np
        import rasterio
        from rasterio.windows import from_bounds

        # Find first item with red and nir bands
        # Planetary Computer uses B04 (red) and B08 (NIR); other catalogs use
        # lowercase "red"/"nir"
        RED_KEYS = ["red", "B04", "b04", "Red"]
        NIR_KEYS = ["nir", "B08", "b08", "Nir", "NIR"]

        item = None
        red_key = None
        nir_key = None
        for candidate in data.items:
            assets = candidate.get("assets", {})
            found_red = next((k for k in RED_KEYS if k in assets), None)
            found_nir = next((k for k in NIR_KEYS if k in assets), None)
            if found_red and found_nir:
                red_href = assets[found_red].get("href", "")
                nir_href = assets[found_nir].get("href", "")
                if red_href.startswith("http") and nir_href.startswith("http"):
                    item = candidate
                    red_key = found_red
                    nir_key = found_nir
                    break

        if item is None:
            raise ValueError("No STAC item found with red and nir COG bands")

        red_url = item["assets"][red_key]["href"]
        nir_url = item["assets"][nir_key]["href"]
        item_id = item.get("id", "unknown")

        logger.info(f"Computing NDVI for item {item_id}")

        # Read within bbox if available (windowed read for performance)
        bbox_wgs84 = None
        if plan.location and "bbox" in plan.location:
            bbox_wgs84 = plan.location["bbox"]

        with rasterio.open(red_url) as red_src:
            if bbox_wgs84:
                # Transform bbox from WGS84 to raster CRS if needed
                from pyproj import Transformer

                raster_crs = red_src.crs
                if raster_crs and str(raster_crs) != "EPSG:4326":
                    transformer = Transformer.from_crs(
                        "EPSG:4326", raster_crs, always_xy=True
                    )
                    x_min, y_min = transformer.transform(bbox_wgs84[0], bbox_wgs84[1])
                    x_max, y_max = transformer.transform(bbox_wgs84[2], bbox_wgs84[3])
                    bbox_native = [x_min, y_min, x_max, y_max]
                else:
                    bbox_native = bbox_wgs84

                window = from_bounds(*bbox_native, red_src.transform)
                # Ensure window has positive dimensions
                window = window.round_offsets().round_lengths()
                if window.width < 1 or window.height < 1:
                    raise ValueError("Bbox too small for raster resolution")
                red = red_src.read(1, window=window).astype("float32")
                win_transform = red_src.window_transform(window)
            else:
                red = red_src.read(1).astype("float32")
                win_transform = red_src.transform
                window = None
            red_profile = red_src.profile.copy()

        with rasterio.open(nir_url) as nir_src:
            if window is not None:
                nir = nir_src.read(1, window=window).astype("float32")
            else:
                nir = nir_src.read(1).astype("float32")

        # Compute NDVI
        with np.errstate(divide="ignore", invalid="ignore"):
            ndvi = (nir - red) / (nir + red)
            ndvi = np.where(np.isfinite(ndvi), ndvi, np.nan)

        # Compute statistics
        valid = ndvi[np.isfinite(ndvi)]
        stats = {
            "mean": float(np.nanmean(valid)) if len(valid) > 0 else None,
            "min": float(np.nanmin(valid)) if len(valid) > 0 else None,
            "max": float(np.nanmax(valid)) if len(valid) > 0 else None,
            "std": float(np.nanstd(valid)) if len(valid) > 0 else None,
            "pixels": int(len(valid)),
        }

        # Save NDVI as temporary GeoTIFF
        ndvi_path = os.path.join(tempfile.gettempdir(), f"ndvi_{item_id}.tif")
        out_profile = red_profile.copy()
        out_profile.update(
            dtype="float32",
            count=1,
            driver="GTiff",
            compress="deflate",
            height=ndvi.shape[0],
            width=ndvi.shape[1],
            transform=win_transform,
            nodata=np.nan,
        )
        with rasterio.open(ndvi_path, "w", **out_profile) as dst:
            dst.write(ndvi, 1)

        logger.info(f"NDVI saved to {ndvi_path}, mean={stats['mean']:.3f}")

        # Generate reproducible code
        code = f"""import os
import rasterio
import numpy as np
from rasterio.windows import from_bounds
from pyproj import Transformer

os.environ["AWS_NO_SIGN_REQUEST"] = "YES"

red_url = "{red_url}"
nir_url = "{nir_url}"
bbox_wgs84 = {bbox_wgs84}  # [west, south, east, north] in EPSG:4326

# Read red and NIR bands (windowed read within bbox)
with rasterio.open(red_url) as red_src:
    # Transform bbox to raster CRS
    transformer = Transformer.from_crs("EPSG:4326", red_src.crs, always_xy=True)
    x_min, y_min = transformer.transform(bbox_wgs84[0], bbox_wgs84[1])
    x_max, y_max = transformer.transform(bbox_wgs84[2], bbox_wgs84[3])
    window = from_bounds(x_min, y_min, x_max, y_max, red_src.transform)
    red = red_src.read(1, window=window).astype("float32")
    profile = red_src.profile.copy()
    transform = red_src.window_transform(window)

with rasterio.open(nir_url) as nir_src:
    nir = nir_src.read(1, window=window).astype("float32")

# Compute NDVI = (NIR - Red) / (NIR + Red)
ndvi = (nir - red) / (nir + red)
ndvi = np.where(np.isfinite(ndvi), ndvi, np.nan)

print(f"NDVI shape: {{ndvi.shape}}")
print(f"NDVI range: {{np.nanmin(ndvi):.3f}} to {{np.nanmax(ndvi):.3f}}")
print(f"NDVI mean: {{np.nanmean(ndvi):.3f}}")
"""

        result_data = {
            "analysis_type": "ndvi",
            "item_id": item_id,
            "ndvi_stats": stats,
            "ndvi_path": ndvi_path,
        }

        viz_hints = {
            "type": "ndvi",
            "ndvi_path": ndvi_path,
            "colormap": "RdYlGn",
            "vmin": -0.2,
            "vmax": 0.8,
            "title": f"NDVI - {item_id}",
        }

        return AnalysisResult(
            result_data=result_data,
            code_generated=code,
            visualization_hints=viz_hints,
        )

    def _handle_land_cover(
        self, plan: PlannerOutput, data: DataResult
    ) -> AnalysisResult:
        """Handle land cover data - passthrough with appropriate viz hints.

        Args:
            plan: Query plan
            data: Land cover data from STAC

        Returns:
            AnalysisResult with land cover viz configuration
        """
        location_name = ""
        if plan.location:
            location_name = plan.location.get("name", "")

        result_data = {
            "analysis_type": "land_cover",
            "data_type": "categorical",
            "items_found": data.total_items,
            "collection": (data.items[0].get("collection", "") if data.items else ""),
        }

        bbox = plan.location.get("bbox") if plan.location else None
        code = f"""import planetary_computer
import leafmap.maplibregl as leafmap
from pystac_client import Client

# Search for land cover data - {location_name}
catalog = Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1",
    modifier=planetary_computer.sign_inplace,
)

search = catalog.search(
    collections=["io-lulc-9-class"],
    bbox={bbox},
    max_items=1,
)

items = list(search.items())
print(f"Found {{len(items)}} land cover items")

# Visualize
m = leafmap.Map()
if items:
    item = items[0]
    m.add_stac_layer(
        collection="io-lulc-9-class",
        item=item.id,
        assets=["data"],
        titiler_endpoint="planetary-computer",
        name="Land Cover",
        fit_bounds=True,
    )
m
"""

        viz_hints = {
            "type": "land_cover",
            "asset_key": "data",
            "title": f"Land Cover - {location_name}" if location_name else "Land Cover",
        }

        return AnalysisResult(
            result_data=result_data,
            code_generated=code,
            visualization_hints=viz_hints,
        )

    def _handle_elevation(
        self, plan: PlannerOutput, data: DataResult
    ) -> AnalysisResult:
        """Handle DEM/elevation data - passthrough with appropriate viz hints.

        Args:
            plan: Query plan
            data: DEM data from STAC

        Returns:
            AnalysisResult with elevation viz configuration
        """
        location_name = ""
        if plan.location:
            location_name = plan.location.get("name", "")

        result_data = {
            "analysis_type": "elevation",
            "data_type": "continuous",
            "items_found": data.total_items,
            "collection": (data.items[0].get("collection", "") if data.items else ""),
        }

        bbox = plan.location.get("bbox") if plan.location else None
        code = f"""import planetary_computer
import leafmap.maplibregl as leafmap
from pystac_client import Client

# Search for DEM data - {location_name}
catalog = Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1",
    modifier=planetary_computer.sign_inplace,
)

search = catalog.search(
    collections=["cop-dem-glo-30"],
    bbox={bbox},
    max_items=1,
)

items = list(search.items())
print(f"Found {{len(items)}} DEM items")

# Visualize
m = leafmap.Map()
if items:
    item = items[0]
    m.add_stac_layer(
        collection="cop-dem-glo-30",
        item=item.id,
        assets=["data"],
        titiler_endpoint="planetary-computer",
        name="Elevation (DEM)",
        fit_bounds=True,
    )
m
"""

        viz_hints = {
            "type": "elevation",
            "colormap": "terrain",
            "asset_key": "data",
            "title": (
                f"Elevation (DEM) - {location_name}"
                if location_name
                else "Elevation (DEM)"
            ),
        }

        return AnalysisResult(
            result_data=result_data,
            code_generated=code,
            visualization_hints=viz_hints,
        )

    def _compute_zonal_statistics(
        self, plan: PlannerOutput, data: DataResult
    ) -> AnalysisResult:
        """Compute zonal statistics for areas of interest.

        Args:
            plan: Query plan with zones specification
            data: Raster or vector data

        Returns:
            AnalysisResult with computed statistics
        """
        if "raster" in self.tools:
            try:
                raster_tool = self.tools["raster"]

                # Extract zones from plan parameters or location
                zones = plan.parameters.get("zones") or plan.location

                result = raster_tool.zonal_statistics(data.items, zones)
                code = self._generate_zonal_code(data.items, zones)

                viz_hints = {
                    "type": "choropleth",
                    "color_column": "mean",
                    "colormap": "viridis",
                }

                return AnalysisResult(
                    result_data=result,
                    code_generated=code,
                    visualization_hints=viz_hints,
                )

            except Exception as e:
                logger.error(f"Zonal statistics failed: {e}")
                return self._create_mock_analysis("zonal", data)
        else:
            return self._create_mock_analysis("zonal", data)

    def _compute_time_series(
        self, plan: PlannerOutput, data: DataResult
    ) -> AnalysisResult:
        """Compute time series analysis from temporal data.

        Args:
            plan: Query plan with temporal specifications
            data: Time series data

        Returns:
            AnalysisResult with time series analysis
        """
        if "raster" in self.tools:
            try:
                raster_tool = self.tools["raster"]

                # Extract location for time series
                location = plan.location

                result = raster_tool.time_series_analysis(data.items, location)
                code = self._generate_timeseries_code(data.items, location)

                viz_hints = {
                    "type": "time_series",
                    "x_column": "date",
                    "y_column": "value",
                    "title": "Time Series Analysis",
                }

                return AnalysisResult(
                    result_data=result,
                    code_generated=code,
                    visualization_hints=viz_hints,
                )

            except Exception as e:
                logger.error(f"Time series analysis failed: {e}")
                return self._create_mock_analysis("time_series", data)
        else:
            return self._create_mock_analysis("time_series", data)

    def _compute_change_detection(
        self, plan: PlannerOutput, data: DataResult
    ) -> AnalysisResult:
        """Perform change detection analysis.

        Args:
            plan: Query plan with change detection parameters
            data: Multi-temporal data

        Returns:
            AnalysisResult with change analysis
        """
        if "raster" in self.tools:
            try:
                raster_tool = self.tools["raster"]

                result = raster_tool.change_detection(data.items)
                code = self._generate_change_code(data.items)

                viz_hints = {
                    "type": "change_map",
                    "colormap": "RdYlBu",
                    "center_zero": True,
                }

                return AnalysisResult(
                    result_data=result,
                    code_generated=code,
                    visualization_hints=viz_hints,
                )

            except Exception as e:
                logger.error(f"Change detection failed: {e}")
                return self._create_mock_analysis("change", data)
        else:
            return self._create_mock_analysis("change", data)

    def _perform_vector_analysis(
        self, plan: PlannerOutput, data: DataResult
    ) -> AnalysisResult:
        """Perform vector analysis operations.

        Args:
            plan: Query plan with vector operations
            data: Vector data

        Returns:
            AnalysisResult with vector analysis results
        """
        if "vector" in self.tools:
            try:
                vector_tool = self.tools["vector"]

                # Determine vector operation
                intent = plan.intent.lower()
                if "buffer" in intent:
                    distance = plan.parameters.get("distance", 1000)
                    result = vector_tool.buffer(data.items, distance)
                    code = self._generate_buffer_code(data.items, distance)
                elif "intersect" in intent:
                    result = vector_tool.intersect(data.items)
                    code = self._generate_intersect_code(data.items)
                else:
                    result = vector_tool.general_analysis(data.items, plan.intent)
                    code = self._generate_general_vector_code(data.items, plan.intent)

                viz_hints = {
                    "type": "vector_map",
                    "style": {"color": "blue", "weight": 2},
                }

                return AnalysisResult(
                    result_data=result,
                    code_generated=code,
                    visualization_hints=viz_hints,
                )

            except Exception as e:
                logger.error(f"Vector analysis failed: {e}")
                return self._create_mock_analysis("vector", data)
        else:
            return self._create_mock_analysis("vector", data)

    def _perform_general_analysis(
        self, plan: PlannerOutput, data: DataResult
    ) -> AnalysisResult:
        """Perform general analysis when specific type cannot be determined.

        Args:
            plan: Query plan
            data: Available data

        Returns:
            AnalysisResult with general analysis
        """
        # Basic data summary
        result = {
            "data_summary": {
                "total_items": data.total_items,
                "data_type": data.data_type,
                "intent": plan.intent,
            },
            "items_preview": data.items[:5] if data.items else [],
        }

        code = f"""# General analysis for: {plan.intent}
# Data type: {data.data_type}
# Total items: {data.total_items}

import json
data_summary = {json.dumps(result, indent=2)}
print("Analysis Summary:", data_summary)
"""

        viz_hints = {"type": "summary", "show_data_info": True}

        return AnalysisResult(
            result_data=result, code_generated=code, visualization_hints=viz_hints
        )

    def _generate_index_code(self, items: List[Dict], index_type: str) -> str:
        """Generate Python code for spectral index calculation.

        Args:
            items: STAC items or data references
            index_type: Type of spectral index

        Returns:
            Generated Python code string
        """
        if index_type == "ndvi":
            return f"""# NDVI Calculation
import rasterio
import numpy as np

# Load red and NIR bands from STAC items
items = {json.dumps(items[:2], indent=2)}

def calculate_ndvi(red_band, nir_band):
    \"\"\"Calculate NDVI from red and NIR bands.\"\"\"
    ndvi = (nir_band - red_band) / (nir_band + red_band)
    return ndvi

# Process each item
for item in items:
    red_asset = item['assets']['red']['href']
    nir_asset = item['assets']['nir']['href']

    with rasterio.open(red_asset) as red_src:
        red = red_src.read(1).astype(float)

    with rasterio.open(nir_asset) as nir_src:
        nir = nir_src.read(1).astype(float)

    ndvi = calculate_ndvi(red, nir)
    print(f"NDVI calculated for {{item['id']}}")
    print(f"NDVI range: {{np.nanmin(ndvi):.3f}} to {{np.nanmax(ndvi):.3f}}")
"""

        elif index_type == "evi":
            return f"""# EVI Calculation
import rasterio
import numpy as np

# Enhanced Vegetation Index calculation
items = {json.dumps(items[:2], indent=2)}

def calculate_evi(red, nir, blue, G=2.5, C1=6.0, C2=7.5, L=1.0):
    \"\"\"Calculate EVI from RGB and NIR bands.\"\"\"
    evi = G * ((nir - red) / (nir + C1*red - C2*blue + L))
    return evi

# Process items for EVI
for item in items:
    # Load required bands
    red_asset = item['assets']['red']['href']
    nir_asset = item['assets']['nir']['href']
    blue_asset = item['assets']['blue']['href']

    # Calculate EVI
    evi = calculate_evi(red, nir, blue)
    print(f"EVI calculated for {{item['id']}}")
"""

        return f"# {index_type.upper()} calculation code would be generated here"

    def _generate_zonal_code(self, items: List[Dict], zones: Any) -> str:
        """Generate code for zonal statistics."""
        return f"""# Zonal Statistics Calculation
import rasterio
import geopandas as gpd
from rasterio.mask import mask
import numpy as np

# Load raster data
items = {json.dumps(items[:1], indent=2)}
zones = {json.dumps(zones, default=str)}

def zonal_statistics(raster_path, zones_geom):
    \"\"\"Calculate zonal statistics for geometries.\"\"\"
    with rasterio.open(raster_path) as src:
        # Mask raster with each zone
        stats = []
        for zone in zones_geom:
            masked, transform = mask(src, [zone], crop=True)
            data = masked[0]
            valid_data = data[data != src.nodata]

            if len(valid_data) > 0:
                zone_stats = {{
                    'mean': float(np.mean(valid_data)),
                    'median': float(np.median(valid_data)),
                    'std': float(np.std(valid_data)),
                    'min': float(np.min(valid_data)),
                    'max': float(np.max(valid_data))
                }}
            else:
                zone_stats = {{'mean': None, 'median': None}}

            stats.append(zone_stats)

    return stats

# Calculate statistics for each zone
print("Computing zonal statistics...")
"""

    def _generate_timeseries_code(self, items: List[Dict], location: Any) -> str:
        """Generate code for time series analysis."""
        return f"""# Time Series Analysis
import rasterio
import pandas as pd
import numpy as np
from datetime import datetime

# Time series data
items = {json.dumps(items[:5], indent=2)}
location = {json.dumps(location, default=str)}

def extract_time_series(items, point_location):
    \"\"\"Extract time series values at a location.\"\"\"
    ts_data = []

    for item in items:
        # Get date from item
        date_str = item['properties']['datetime']
        date = pd.to_datetime(date_str)

        # Extract value at location
        asset_href = item['assets']['red']['href']  # or chosen band
        with rasterio.open(asset_href) as src:
            # Convert location to raster coordinates
            row, col = src.index(location['lon'], location['lat'])
            value = src.read(1)[row, col]

            ts_data.append({{
                'date': date,
                'value': float(value) if value != src.nodata else None
            }})

    return pd.DataFrame(ts_data)

# Extract time series
ts_df = extract_time_series(items, location)
print("Time series extracted:")
print(ts_df.head())
"""

    def _generate_change_code(self, items: List[Dict]) -> str:
        """Generate code for change detection."""
        return f"""# Change Detection Analysis
import rasterio
import numpy as np

# Multi-temporal data
items = {json.dumps(items[:2], indent=2)}

def detect_change(before_raster, after_raster):
    \"\"\"Compute change between two raster datasets.\"\"\"
    with rasterio.open(before_raster) as src1:
        before = src1.read(1).astype(float)

    with rasterio.open(after_raster) as src2:
        after = src2.read(1).astype(float)

    # Calculate change
    change = after - before
    percent_change = ((after - before) / before) * 100

    return {{
        'absolute_change': change,
        'percent_change': percent_change,
        'change_stats': {{
            'mean_change': float(np.nanmean(change)),
            'std_change': float(np.nanstd(change))
        }}
    }}

# Detect changes between time periods
if len(items) >= 2:
    before_asset = items[0]['assets']['red']['href']
    after_asset = items[-1]['assets']['red']['href']

    change_result = detect_change(before_asset, after_asset)
    print("Change detection completed")
    print(f"Mean change: {{change_result['change_stats']['mean_change']:.3f}}")
"""

    def _generate_buffer_code(self, items: List[Dict], distance: float) -> str:
        """Generate code for buffer analysis."""
        return f"""# Buffer Analysis
import geopandas as gpd

# Vector data
items = {json.dumps(items[:1], indent=2)}
buffer_distance = {distance}

def create_buffer(geometries, distance):
    \"\"\"Create buffer around geometries.\"\"\"
    gdf = gpd.GeoDataFrame(items)
    buffered = gdf.buffer(distance)
    return buffered

# Create buffers
buffered_geoms = create_buffer(items, buffer_distance)
print(f"Created buffers with {{buffer_distance}}m radius")
"""

    def _generate_intersect_code(self, items: List[Dict]) -> str:
        """Generate code for intersection analysis."""
        return f"""# Intersection Analysis
import geopandas as gpd

# Vector data for intersection
items = {json.dumps(items[:2], indent=2)}

def intersect_geometries(geom_list):
    \"\"\"Find intersections between geometries.\"\"\"
    gdf1 = gpd.GeoDataFrame([geom_list[0]])
    gdf2 = gpd.GeoDataFrame([geom_list[1]])

    intersection = gpd.overlay(gdf1, gdf2, how='intersection')
    return intersection

# Perform intersection
intersections = intersect_geometries(items)
print("Intersection analysis completed")
"""

    def _generate_general_vector_code(self, items: List[Dict], intent: str) -> str:
        """Generate general vector analysis code."""
        return f"""# General Vector Analysis: {intent}
import geopandas as gpd
import pandas as pd

# Vector data
items = {json.dumps(items[:3], indent=2)}

def analyze_vector_data(data, intent):
    \"\"\"Perform general vector analysis.\"\"\"
    gdf = gpd.GeoDataFrame(data)

    analysis_result = {{
        'total_features': len(gdf),
        'geometry_types': gdf.geom_type.value_counts().to_dict(),
        'bounds': gdf.total_bounds.tolist(),
        'area_stats': {{
            'mean_area': float(gdf.area.mean()),
            'total_area': float(gdf.area.sum())
        }} if 'Polygon' in gdf.geom_type.values else None
    }}

    return analysis_result

# Analyze vector data
result = analyze_vector_data(items, "{intent}")
print("Vector analysis completed:")
print(result)
"""

    def _get_index_viz_hints(self, index_type: str, result: Dict) -> Dict[str, Any]:
        """Get visualization hints for spectral indices."""
        index_configs = {
            "ndvi": {"colormap": "RdYlGn", "vmin": -1, "vmax": 1, "title": "NDVI"},
            "evi": {"colormap": "Greens", "vmin": -1, "vmax": 1, "title": "EVI"},
            "savi": {"colormap": "YlOrRd", "vmin": -1, "vmax": 1, "title": "SAVI"},
        }

        return index_configs.get(
            index_type, {"colormap": "viridis", "title": index_type.upper()}
        )

    def _create_mock_analysis(
        self, analysis_type: str, data: DataResult
    ) -> AnalysisResult:
        """Create mock analysis result when tools are not available.

        Args:
            analysis_type: Type of analysis attempted
            data: Input data

        Returns:
            Mock AnalysisResult for development
        """
        logger.info(f"Creating mock {analysis_type} analysis result")

        mock_result = {
            "analysis_type": analysis_type,
            "data_processed": data.total_items,
            "mock": True,
            "summary": f"Mock {analysis_type} analysis completed",
        }

        if analysis_type == "ndvi":
            mock_result.update(
                {"ndvi_stats": {"mean": 0.65, "min": -0.2, "max": 0.95, "std": 0.18}}
            )

        mock_code = f"""# Mock {analysis_type} analysis
# This is a placeholder while tools are being developed

import numpy as np
import matplotlib.pyplot as plt

# Mock analysis for {analysis_type}
print("Mock {analysis_type} analysis completed")
result = {mock_result}
"""

        return AnalysisResult(
            result_data=mock_result,
            code_generated=mock_code,
            visualization_hints={"type": analysis_type, "mock": True},
        )
__init__(self, llm, tools=None) special

Initialize the Analysis Agent.

Parameters:

Name Type Description Default
llm Any

Language model instance for analysis planning

required
tools Optional[Dict[str, Any]]

Dictionary of available analysis tools (raster, vector, etc.)

None
Source code in geoagent/core/analysis_agent.py
def __init__(self, llm: Any, tools: Optional[Dict[str, Any]] = None):
    """Initialize the Analysis Agent.

    Args:
        llm: Language model instance for analysis planning
        tools: Dictionary of available analysis tools (raster, vector, etc.)
    """
    self.llm = llm
    self.tools = tools or {}
    self._setup_tools()
analyze(self, plan, data)

Perform analysis on the provided data.

Parameters:

Name Type Description Default
plan PlannerOutput

Original query plan with analysis intent

required
data DataResult

Data retrieved by the Data Agent

required

Returns:

Type Description
AnalysisResult

AnalysisResult with computed results and generated code

Source code in geoagent/core/analysis_agent.py
def analyze(self, plan: PlannerOutput, data: DataResult) -> AnalysisResult:
    """Perform analysis on the provided data.

    Args:
        plan: Original query plan with analysis intent
        data: Data retrieved by the Data Agent

    Returns:
        AnalysisResult with computed results and generated code
    """
    logger.info(f"Starting analysis for intent: {plan.intent}")

    try:
        # Determine analysis type based on intent and data type
        analysis_type = self._determine_analysis_type(plan, data)

        if analysis_type == "land_cover":
            return self._handle_land_cover(plan, data)
        elif analysis_type == "elevation":
            return self._handle_elevation(plan, data)
        elif analysis_type == "spectral_index":
            return self._compute_spectral_index(plan, data)
        elif analysis_type == "zonal_statistics":
            return self._compute_zonal_statistics(plan, data)
        elif analysis_type == "time_series":
            return self._compute_time_series(plan, data)
        elif analysis_type == "change_detection":
            return self._compute_change_detection(plan, data)
        elif analysis_type == "vector_analysis":
            return self._perform_vector_analysis(plan, data)
        else:
            return self._perform_general_analysis(plan, data)

    except Exception as e:
        logger.error(f"Analysis failed: {e}")
        return AnalysisResult(
            result_data={"error": str(e)},
            code_generated=f"# Analysis failed: {e}",
            success=False,
            error_message=str(e),
        )