Skip to article frontmatter | Skip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

Sentinel-2 burn scar analysis

University of Tasmania
import datetime
import json
import logging
from pathlib import Path

import boto3
import geopandas as gpd
import leafmap
import numpy as np
import rasterio as rio
from botocore import UNSIGNED
from botocore.config import Config
from pystac_client import Client
from rasterio.mask import mask
from rasterio.warp import Resampling, reproject

logging.basicConfig(level=logging.ERROR)

# Local download directory for Sentinel-2 assets. Set to None to use the
# repository-relative default below. (The original assigned the hard-coded
# path and then tested `is None`, so the fallback branch was unreachable
# dead code; this ordering makes the sentinel pattern actually work.)
imagery_dir = Path(r"E:\tassie-trees-geospatial\imagery")
if imagery_dir is None:
    imagery_dir = Path("../data/imagery")
imagery_dir.mkdir(parents=True, exist_ok=True)  # no-op when it already exists

# Area of interest: one named region from the proposed study-sites GeoJSON.
aoi_path = Path("../data/vector/proposed_study_sites.geojson")
aoi_subset = {"column": "region", "value": "Tarkine south"}

# Nominal pre-fire / post-fire dates and how far (days) either side of each
# to search for imagery.
start_date = "2025-01-01"
end_date = "2025-03-31"
search_range = 14
start_datetime = datetime.datetime.strptime(start_date, "%Y-%m-%d")
end_datetime = datetime.datetime.strptime(end_date, "%Y-%m-%d")
max_cloud = 5  # maximum acceptable scene cloud cover, percent
region_code = None  # confine search to this tile

# +/- search_range days around each nominal date, formatted for the STAC API.
date_ranges = {
    "pre": [
        (start_datetime - datetime.timedelta(days=search_range)).strftime("%Y-%m-%d"),
        (start_datetime + datetime.timedelta(days=search_range)).strftime("%Y-%m-%d"),
    ],
    "post": [
        (end_datetime - datetime.timedelta(days=search_range)).strftime("%Y-%m-%d"),
        (end_datetime + datetime.timedelta(days=search_range)).strftime("%Y-%m-%d"),
    ],
}

# DEA STAC endpoint and the Sentinel-2 A/B/C ARD collections to query.
stac_api_url = "https://explorer.dea.ga.gov.au/stac/"
collections = ["ga_s2am_ard_3", "ga_s2bm_ard_3", "ga_s2cm_ard_3"]
client = Client.open(stac_api_url)

# Load the study sites and keep only the configured region.
aoi = gpd.read_file(aoi_path)
subset_mask = aoi[aoi_subset["column"]] == aoi_subset["value"]
aoi = aoi[subset_mask]
if aoi.empty:
    raise ValueError(f"No AOI found for {aoi_subset['column']} = {aoi_subset['value']}")

# STAC "intersects" queries expect WGS84 coordinates.
if aoi.crs != "EPSG:4326":
    aoi = aoi.to_crs("EPSG:4326")

# Merge all remaining features into a single geometry for the spatial query.
aoi_dis = aoi.dissolve()
if aoi_dis.empty:
    raise ValueError("AOI geometry is empty after dissolving.")

geom = aoi_dis.geometry.values[0].__geo_interface__
# For each date group ("pre" / "post"), find the least-cloudy Sentinel-2
# scene intersecting the AOI, widening the search window a week at a time
# until one meets the max_cloud threshold, then download its assets from S3.
final_items = {}  # best STAC item chosen per date group
datasets = []  # one record per downloaded asset: name, local path, date group
for date_set, range_minmax in date_ranges.items():
    # TODO: ensure tiles are the same for pre and post dates
    logging.info(
        f"Searching for {date_set} date range: {range_minmax[0]} to {range_minmax[1]}"
    )
    search_offset = 0  # extra days added to the window on each failed pass
    item = None

    # Keep searching until a sufficiently cloud-free item is found, or the
    # expanded window exceeds a year (then the raises below fire).
    while item is None or item.properties["eo:cloud_cover"] > max_cloud:
        # "pre" widens the window earlier in time; "post" widens it later.
        if date_set == "pre":
            expanded_start = (
                datetime.datetime.strptime(range_minmax[0], "%Y-%m-%d")
                - datetime.timedelta(days=search_offset)
            ).strftime("%Y-%m-%d")
            expanded_end = range_minmax[1]
        elif date_set == "post":
            expanded_start = range_minmax[0]
            expanded_end = (
                datetime.datetime.strptime(range_minmax[1], "%Y-%m-%d")
                + datetime.timedelta(days=search_offset)
            ).strftime("%Y-%m-%d")

        logging.info(
            f"Searching with expanded range: {expanded_start} to {expanded_end}"
        )

        search = client.search(
            collections=collections,
            datetime=f"{expanded_start}/{expanded_end}",
            intersects=geom,
            max_items=100,
        )

        items = list(search.get_items())
        if not items:
            # Nothing at all in this window: widen by a week and retry.
            search_offset += 7
            if search_offset > 365:
                raise ValueError(
                    f"No items found for {date_set} within 1 year. Please check the date range or AOI."
                )
            continue

        # Least-cloudy candidate in the current window.
        item = min(items, key=lambda x: x.properties["eo:cloud_cover"])

        if item.properties["eo:cloud_cover"] > max_cloud:
            # Even the clearest scene is too cloudy: widen and retry.
            search_offset += 7
            if search_offset > 365:
                raise ValueError(
                    f"No items with cloud cover <= {max_cloud}% found for {date_set} within 1 year. Please check the date range or AOI."
                )
            item = None
        else:
            logging.info(
                f"Found item for {date_set} with {item.properties['eo:cloud_cover']}% cloud cover"
            )

    # Footprints of all candidates from the final search pass; the chosen
    # item's footprint is filtered out of this frame and saved below.
    gdf = gpd.GeoDataFrame.from_features([item.to_dict() for item in items])
    if gdf.empty:
        raise ValueError("GeoDataFrame is empty after converting search results.")

    final_items[date_set] = item
    # Unsigned client: the bucket allows anonymous access, no credentials needed.
    s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))
    for asset, properties in item.assets.items():
        # Only surface-reflectance (nbart_*) bands and the s2cloudless
        # mask/probability layers are needed downstream.
        if asset.startswith("nbart_") or asset in [
            "oa_s2cloudless_mask",
            "oa_s2cloudless_prob",
        ]:
            # Assumes s3://<bucket>/<key...>-style hrefs — TODO confirm for
            # all DEA assets; index 2 is the bucket, the rest is the key.
            bucket = properties.href.split("/")[2]
            key = "/".join(properties.href.split("/")[3:])
            asset_source_basename = key.split("/")[-1]
            local_path = imagery_dir / item.properties["title"] / asset_source_basename
            if local_path.exists():
                logging.info(f"File {local_path} already exists, skipping download.")
            else:
                logging.info(f"Downloading {asset} from {bucket}/{key} to {local_path}")
                local_path.parent.mkdir(parents=True, exist_ok=True)
                s3.download_file(bucket, key, str(local_path))
            datasets.append(
                {
                    "asset": asset,
                    "filepath": Path(local_path).resolve(),
                    "date_group": date_set,
                }
            )
        else:
            logging.info(
                f"Skipping asset {asset} as it is not a NBAR or cloudless asset."
            )

    # Save the chosen scene's footprint alongside the downloaded bands.
    item_footprint_filename = (
        imagery_dir / item.properties["title"] / f"{item.properties['title']}.geojson"
    )
    gdf[gdf["title"] == item.properties["title"]].to_file(
        item_footprint_filename, driver="GeoJSON"
    )

    # Save the full STAC asset metadata for provenance.
    asset_metadata_filename = (
        imagery_dir
        / item.properties["title"]
        / f"{item.properties['title']}_assets.json"
    )

    assets_dict = {key: asset.to_dict() for key, asset in item.assets.items()}
    with open(asset_metadata_filename, "w") as f:
        json.dump(assets_dict, f, indent=2)
def calculate_nbr(datasets, date_group):
    """Compute the Normalised Burn Ratio for one date group.

    Reads the NIR and SWIR-2 rasters downloaded for ``date_group``
    ("pre" or "post"), computes NBR = (NIR - SWIR2) / (NIR + SWIR2),
    and writes it as a float32 GeoTIFF under ``imagery_dir``.

    Returns a tuple of (output path, NBR array).
    """
    subset = [entry for entry in datasets if entry["date_group"] == date_group]

    nir_entry = next(entry for entry in subset if entry["asset"] == "nbart_nir_2")
    swir_entry = next(entry for entry in subset if entry["asset"] == "nbart_swir_2")

    with rio.open(nir_entry["filepath"]) as src:
        nir = src.read(1).astype(np.float32)
        profile = src.profile

    with rio.open(swir_entry["filepath"]) as src:
        swir = src.read(1).astype(np.float32)

    # Pixels where NIR + SWIR2 == 0 would divide by zero; mark them NaN.
    total = nir + swir
    nbr = np.where(total != 0, (nir - swir) / total, np.nan)

    profile.update(dtype=rio.float32, nodata=np.nan)

    output_path = imagery_dir / f"nbr_{date_group}.tif"
    with rio.open(output_path, "w", **profile) as dst:
        dst.write(nbr, 1)

    logging.info(f"NBR for {date_group} saved to {output_path}")
    return output_path, nbr


# Compute pre- and post-fire NBR, then their difference (dNBR).
nbr_results = {group: calculate_nbr(datasets, group) for group in ("pre", "post")}
nbr_pre_path, nbr_pre = nbr_results["pre"]
nbr_post_path, nbr_post = nbr_results["post"]
dnbr = nbr_pre - nbr_post

dnbr_output_path = imagery_dir / "dnbr.tif"

# Reuse the pre-fire NBR raster's grid as the template for the dNBR output.
with rio.open(nbr_pre_path) as src:
    profile = src.profile
profile.update(dtype=rio.float32, nodata=np.nan)

with rio.open(dnbr_output_path, "w", **profile) as dst:
    dst.write(dnbr, 1)
# Burn-severity classes as closed [min, max] dNBR ranges; pixels falling in
# none of the ranges (including NaN) keep class 0, written as nodata.
dnbr_classes = [
    {"class": 1, "label": "Enhanced regrowth", "min": -0.500, "max": -0.251},
    {"class": 2, "label": "Regrowth", "min": -0.250, "max": -0.101},
    {"class": 3, "label": "Unburned", "min": -0.100, "max": 0.099},
    {"class": 4, "label": "Low burn", "min": 0.100, "max": 0.269},
    {"class": 5, "label": "Moderate-low burn", "min": 0.270, "max": 0.439},
    {"class": 6, "label": "Moderate-high burn", "min": 0.440, "max": 0.659},
    {"class": 7, "label": "High burn", "min": 0.660, "max": 1.300},
]

# The ranges are disjoint, so np.select is equivalent to assigning each
# class mask in turn (NaN comparisons are False and fall through to 0).
conditions = [(dnbr >= cls["min"]) & (dnbr <= cls["max"]) for cls in dnbr_classes]
class_values = [cls["class"] for cls in dnbr_classes]
dnbr_classified = np.select(conditions, class_values, default=0).astype(np.uint8)

dnbr_classified_path = imagery_dir / "dnbr_classified.tif"

profile.update(dtype=rio.uint8, nodata=0)

with rio.open(dnbr_classified_path, "w", **profile) as dst:
    dst.write(dnbr_classified, 1)

logging.info(f"Classified dNBR saved to {dnbr_classified_path}")
def create_false_color_composite(datasets, date_group):
    """Build a SWIR2/NIR2/Green false-colour composite for one date group.

    Reads the green band as the reference grid, reprojects the SWIR-2 and
    NIR-2 bands onto it, percentile-stretches each band to 8-bit, and
    writes a 3-band JPEG-compressed, tiled GeoTIFF under ``imagery_dir``.

    Parameters
    ----------
    datasets : list of dict
        Records produced by the download loop, with keys "asset",
        "filepath", and "date_group".
    date_group : str
        "pre" or "post".

    Returns
    -------
    Path to the written composite GeoTIFF.
    """
    group_datasets = [d for d in datasets if d["date_group"] == date_group]

    green_dataset = next(d for d in group_datasets if d["asset"] == "nbart_green")
    swir2_dataset = next(d for d in group_datasets if d["asset"] == "nbart_swir_2")
    nir2_dataset = next(d for d in group_datasets if d["asset"] == "nbart_nir_2")

    # The green band defines the output grid (transform / CRS / shape).
    with rio.open(green_dataset["filepath"]) as green_src:
        green_data = green_src.read(1)
        profile = green_src.profile.copy()

    with rio.open(swir2_dataset["filepath"]) as swir2_src:
        swir2_data = np.empty_like(green_data, dtype=np.int16)
        reproject(
            source=swir2_src.read(1),
            destination=swir2_data,
            src_transform=swir2_src.transform,
            src_crs=swir2_src.crs,
            dst_transform=profile["transform"],
            dst_crs=profile["crs"],
            resampling=Resampling.bilinear,
        )

    with rio.open(nir2_dataset["filepath"]) as nir2_src:
        nir2_data = np.empty_like(green_data, dtype=np.int16)
        reproject(
            source=nir2_src.read(1),
            destination=nir2_data,
            src_transform=nir2_src.transform,
            src_crs=nir2_src.crs,
            dst_transform=profile["transform"],
            dst_crs=profile["crs"],
            resampling=Resampling.bilinear,
        )

    def normalize_to_byte(data):
        """Stretch the 2nd-98th percentile of positive pixels to 0-255.

        Returns an all-zero band when there are no positive pixels or the
        percentile range collapses (constant data) — the original raised
        on an empty selection and divided by zero on constant bands.
        """
        valid = data[data > 0]
        if valid.size == 0:
            return np.zeros_like(data, dtype=np.uint8)
        p2, p98 = np.percentile(valid, [2, 98])
        if p98 == p2:
            return np.zeros_like(data, dtype=np.uint8)
        stretched = np.clip((data - p2) / (p98 - p2) * 255, 0, 255)
        return stretched.astype(np.uint8)

    swir2_norm = normalize_to_byte(swir2_data)
    nir2_norm = normalize_to_byte(nir2_data)
    green_norm = normalize_to_byte(green_data)

    # Band order SWIR2 / NIR2 / Green maps to R / G / B in the composite.
    composite_data = np.stack([swir2_norm, nir2_norm, green_norm], axis=0)

    profile.update(
        count=3,
        dtype=rio.uint8,
        nodata=0,
        compress="jpeg",
        photometric="ycbcr",
        interleave="pixel",
        tiled=True,
        blockxsize=512,
        blockysize=512,
    )

    band_descriptions = [
        "SWIR2",
        "NIR2",
        "Green",
    ]

    output_path = imagery_dir / f"false_color_{date_group}.tif"
    with rio.open(output_path, "w", **profile) as dst:
        dst.write(composite_data)
        for i, desc in enumerate(band_descriptions, 1):
            dst.set_band_description(i, desc)

    logging.info(f"False color composite for {date_group} saved to {output_path}")
    return output_path


# Build the composites and show them in an interactive before/after map.
false_color_pre = create_false_color_composite(datasets, "pre")
false_color_post = create_false_color_composite(datasets, "post")
m = leafmap.Map()
m.split_map(
    left_layer=str(false_color_pre),
    right_layer=str(false_color_post),
    # NOTE(review): these hard-coded dates look like the actual scene dates
    # found by the search — confirm they match the downloaded items.
    left_label="Pre-fire (2024-11-12)",
    right_label="Post-fire (2025-03-27)",
)
# Outline style for the study-site polygons. The original key was misspelled
# "opactity", so the stroke opacity was silently ignored; "opacity" is the
# key Leaflet path styling actually reads.
aoi_style = {
    "stroke": True,
    "color": "red",
    "weight": 2,
    "opacity": 1,
    "fill": True,
    "fillColor": "#ffffff",
    "fillOpacity": 0.0,
    "clickable": True,
}
m.add_gdf(
    aoi[aoi["name"].isin(["Harman River", "Stanley River", "Wilson River"])],
    layer_name="Study site (proposed)",
    style=aoi_style,
    hover_style={"weight": aoi_style["weight"] + 1, "fillOpacity": 0.1},
)
m.zoom_to_gdf(aoi)

m
Loading...