Module ttemtoolbox.core.process_well
Expand source code
#!/usr/bin/env python
# process_well.py
# Created: 2023-11-18
# Version 11.18.2023
# Author: Jiawei Li
import os
import pathlib
import re
import requests
import numpy as np
import pandas as pd
import geopandas as gpd
from pyproj import Transformer
from itertools import compress
from pathlib import Path
from ttemtoolbox.defaults import constants
from ttemtoolbox import utils
from ttemtoolbox.utils import tools
from collections import namedtuple
class ProcessWell:
"""
This class is use to process and format lithology well logs (from excel or csv) and water level data (from USGS).\
All data were assume under metric unit (m). \n
:param lithologyfname: one or a list of string, pathlib.PurePath object, pandas dataframe. The input files(s) \
shall be either csv or excel file(s) that contains lithology and location data. sheet name and column name \
needs to be clearly marked as Lithology, Location, Latitude, Longitude, Depth_top, Depth_bottom or anything\
similiar, keyword(s) can be modified under tTEM_toolbox/defaults/constants.py.\
"""
def __init__(self,
fname: str| pathlib.PurePath | list,
crs: str = 'epsg:4326',
unit: str = 'feet'):
if isinstance(fname, str | pathlib.PurePath):
self.fname = [fname]
print('reading lithology from {}'.format(Path(fname).name))
elif isinstance(fname, list):
if len(fname) == 0:
raise ValueError('Input file path is empty')
else:
self.fname = fname
print('reading lithology from {}'.format([Path(f).name for f in fname]))
if unit == 'feet':
self.unit = 'feet'
self.unitconvert = 3.28084
elif unit == 'meter':
self.unit = 'meter'
self.unitconvert = 1
self._crs = crs
self.data = self._format_well()
self.crs = self.data.crs
@staticmethod
def _find_all_readable(path:pathlib.PurePath)->list:
"""
This will receive a single path-like input and try to filter all readable file paths for well logs uses.
:param path: path-like pathlib.PurePath object or string
:return: list of pathlib.PurePath objects
"""
readable_ext = constants.CSV_EXTENSION + constants.EXCEL_EXTENSION
if not isinstance(path, (str, pathlib.PurePath)):
raise TypeError('Input path must be a string or pathlib.PurePath object')
if Path(path).is_dir():
file_list = [f for f in Path(path).iterdir() if f.suffix in readable_ext]
if len(file_list) == 0:
raise ValueError('No {} file found in {}'.format(readable_ext, path))
return file_list
elif Path(path).is_file():
if Path(path).suffix in readable_ext:
file_list = [path]
else:
raise ValueError('Input file does not have extension of {}'.format(readable_ext))
return file_list
@staticmethod
def _format_input(fname:str| pathlib.PurePath| list| pd.DataFrame) -> list:
"""
This will format input file path(s) to a list of pandas dataframe (read from csv) and/or dict that includes all sheets in the excel\
file, each sheet were pandas dataframe. If input is a pandas dataframe, it will return the input dataframe in a list.
:param fname: one or a list of string, pathlib.PurePath object, pandas dataframe
:return: a list of pandas dataframe and/or dict
"""
if isinstance(fname, (str, pathlib.PurePath)):
fname = [fname]
elif isinstance(fname, list):
pass
else:
raise TypeError('Input must be one or a list of string, pathlib.PurePath objects')
export_list = []
for path in fname:
file_list = ProcessWell._find_all_readable(path)
excels = [file for file in file_list if Path(file).suffix in constants.EXCEL_EXTENSION]
csvs = [file for file in file_list if Path(file).suffix in constants.CSV_EXTENSION]
read_excels = [pd.read_excel(file, sheet_name=None) for file in excels]
read_csvs = [pd.read_csv(file) for file in csvs]
combined = read_excels + read_csvs
export_list.append(combined)
result = [item for sublist in export_list for item in sublist]
return result
@staticmethod
def _read_lithology(fname: str| pathlib.PurePath |list| pd.DataFrame, mtoft=1) -> pd.DataFrame:
"""
Try to read lithology sheet from Excel file with tab name similar to 'Lithology', or csv file contains lithology data.
:param fname: one or a list of string, pathlib.PurePath object, pandas dataframe
:return:
"""
result = ProcessWell._format_input(fname)
lithology_list = []
for single_file in result:
if isinstance(single_file, dict): # which means it is an Excel file
match_sheet_name = tools.keyword_search(single_file, constants.LITHOLOGY_SHEET_NAMES)
if len(match_sheet_name) == 0:
continue
lithology_sheet = single_file[match_sheet_name[0]]
lithology_list.append(lithology_sheet)
if isinstance(single_file, pd.DataFrame): # which means it is a csv file
match_column_lithology = tools.keyword_search(single_file, constants.LITHOLOGY_COLUMN_NAMES_KEYWORD)
if match_column_lithology > 0:
lithology_sheet = single_file
lithology_list.append(lithology_sheet)
concat_list = []
for sheet in lithology_list:
match_column_lithology = tools.keyword_search(sheet, constants.LITHOLOGY_COLUMN_NAMES_KEYWORD)
match_column_bore = tools.keyword_search(sheet, constants.LITHOLOGY_COLUMN_NAMES_BORE)
match_column_depth_top = tools.keyword_search(sheet, constants.LITHOLOGY_COLUMN_NAMES_DEPTH_TOP)
match_column_depth_bottom = tools.keyword_search(sheet, constants.LITHOLOGY_COLUMN_NAMES_DEPTH_BOTTOM)
lithology = pd.DataFrame(sheet[match_column_lithology[0]])
lithology.columns = ['Keyword']
lithology['Bore'] = sheet[match_column_bore[0]]
lithology['Depth_top'] = sheet[match_column_depth_top[0]]/mtoft
lithology['Depth_top']= lithology['Depth_top'].round(2)
lithology['Depth_bottom'] = sheet[match_column_depth_bottom[0]]/mtoft
lithology['Depth_bottom'] = lithology['Depth_bottom'].round(2)
lithology['Thickness'] = lithology['Depth_bottom'].subtract(lithology['Depth_top'])
concat_list.append(lithology)
result = pd.concat(concat_list)
result = result[['Bore', 'Depth_top', 'Depth_bottom', 'Thickness', 'Keyword']]
return result
@staticmethod
def _read_spatial(fname: str| pathlib.PurePath, mtoft=1) -> pd.DataFrame:
"""
Similiar to _read_lithology, but read location sheet from Excel file with tab name similar to 'Location', \
or csv file contains location data.
:param fname: fname: one or a list of string, pathlib.PurePath object, pandas dataframe
:return:
"""
result = ProcessWell._format_input(fname)
location_list = []
for single_file in result:
if isinstance(single_file, dict):
match_sheet_name = utils.tools.keyword_search(single_file, constants.LOCATION_SHEET_NAMES)
if len(match_sheet_name) == 0:
continue
location_sheet = single_file[match_sheet_name[0]]
location_list.append(location_sheet)
if isinstance(single_file, pd.DataFrame):
match_column_location = utils.tools.keyword_search(single_file, constants.LOCATION_COLUMN_NAMES_LON)
if match_column_location > 0:
location_sheet = single_file
location_list.append(location_sheet)
concat_list = []
for sheet in location_list:
match_column_lat = utils.tools.keyword_search(sheet, constants.LOCATION_COLUMN_NAMES_LAT)
match_column_lon = utils.tools.keyword_search(sheet, constants.LOCATION_COLUMN_NAMES_LON)
match_column_elevation = utils.tools.keyword_search(sheet, constants.LOCATION_COLUMN_NAMES_ELEVATION)
location = pd.DataFrame(sheet[match_column_lat[0]])
location.columns = ['Latitude']
location['Longitude'] = sheet[match_column_lon[0]]
location['Bore'] = sheet['Bore']
location['Elevation'] = sheet[match_column_elevation[0]]/mtoft
location['Elevation'] = location['Elevation'].round(2)
concat_list.append(location)
result = pd.concat(concat_list)
return result
@staticmethod
def _fill(group, factor=100) -> pd.DataFrame:
newgroup = group.loc[group.index.repeat(group.Thickness * factor)]
mul_per_gr = newgroup.groupby('Elevation_top').cumcount()
newgroup['Elevation_top'] = newgroup['Elevation_top'].subtract(mul_per_gr * 1 / factor)
newgroup['Depth_top'] = newgroup['Depth_top'].add(mul_per_gr * 1 / factor)
newgroup['Depth_bottom'] = newgroup['Depth_top'].add(1 / factor)
newgroup['Elevation_bottom'] = newgroup['Elevation_top'].subtract(1 / factor)
newgroup['Thickness'] = 1 / factor
return newgroup
@staticmethod
def _lithology_location_connect(lithology: pd.DataFrame,
location: pd.DataFrame) -> pd.DataFrame:
"""
Connect lithology and location data by Borehole ID
:param lithology: lithology dataframe
:param location: location dataframe
:return: combined dataframe
"""
lithology_group = lithology.groupby('Bore')
concatlist = []
for name, group in lithology_group:
group_location = location[location['Bore'] == name]
if group_location.empty:
continue
group['Y'] = group_location['Latitude'].iloc[0]
group['X'] = group_location['Longitude'].iloc[0]
group['Z'] = group_location['Elevation'].iloc[0]
group['Elevation_top'] = group['Z'].subtract(group['Depth_top'])
group['Elevation_bottom'] = group['Z'].subtract(group['Depth_bottom'])
concatlist.append(group)
result = pd.concat(concatlist)
return result
@staticmethod
def _assign_keyword_as_value(welllog_df) -> pd.DataFrame:
conditionlist = [
(welllog_df["Keyword"] == "fine grain"),
(welllog_df["Keyword"] == "mix grain"),
(welllog_df["Keyword"] == "coarse grain")
]
choicelist = [1, 2, 3]
welllog_df["Keyword_n"] = np.select(conditionlist, choicelist)
return welllog_df
def _format_well(self) -> gpd.GeoDataFrame:
lithology = self._read_lithology(self.fname, self.unitconvert)
location = self._read_spatial(self.fname, self.unitconvert)
self.data = self._lithology_location_connect(lithology, location)
self.data = ProcessWell._assign_keyword_as_value(self.data)
self.data.reset_index(drop=True, inplace=True)
gdf = gpd.GeoDataFrame(self.data, geometry=gpd.points_from_xy(self.data['X'], self.data['Y']),
crs=self._crs)
self.data = gdf
return self.data
def reproject(self, crs: str) -> gpd.GeoDataFrame:
"""
Reproject the data to a given coordinate system.
Parameters:
- crs (str): The coordinate system to reproject the data to.
Returns:
- geopandas.GeoDataFrame: The reprojected data.
"""
self.data = self.data.to_crs(crs)
self._crs = crs
self.data['X'] = self.data.geometry.x
self.data['Y'] = self.data.geometry.y
return self.data
def resample(self, scale: int) -> gpd.GeoDataFrame:
"""
Upscales the data by a given scale factor.
Parameters:
- scale (int): The scale factor to upscale the data by.
Returns:
- geopandas.GeoDataFrame: The upscaled data.
"""
group = self.data.groupby('Bore')
self.data = group.apply(lambda x:ProcessWell._fill(x, scale))
self.data.reset_index(drop=True, inplace=True)
print('resampling lithology to {} '.format(1/scale))
return self.data
def summary(self):
groups = self.data.groupby('Bore')
concat_list = []
for bore, group in groups:
total_thickness = group['Thickness'].sum()
keywordgroup = group.groupby('Keyword')
keyword_summary = keywordgroup.agg({
'Thickness': 'sum',
'X': 'first',
'Y': 'first',
'Z': 'first'
})
keyword_summary[keyword_summary.index.name] = keyword_summary.index.values
keyword_summary['ratio'] = keyword_summary['Thickness'] / total_thickness
keyword_summary.reset_index(drop=True, inplace=True)
keyword_summary['bore'] = bore
keyword_summary['unit'] = 'meter'
keyword_summary['total_thickness'] = total_thickness
concat_list.append(keyword_summary)
output = pd.concat(concat_list)
return output
def to_shp(self, output_filepath: str| pathlib.PurePath) -> None:
"""
Save the data to a shapefile.
Parameters:
- path (str | pathlib.PurePath): The path to save the shapefile to.
"""
summary = self.summary()
gdf = gpd.GeoDataFrame(summary, geometry=gpd.points_from_xy(summary['X'], summary['Y']),
crs=self._crs)
if Path(output_filepath).suffix.lower() == '.shp':
gdf.to_file(output_filepath, driver='ESRI Shapefile')
print('The output file saved to {}'.format(Path(output_filepath).resolve()))
elif Path(output_filepath).suffix.lower() == '.gpkg':
gdf.to_file(output_filepath, driver='GPKG', layer=Path(self.fname[0]).stem)
print('The output file saved to {}'.format(Path(output_filepath).resolve()))
elif Path(output_filepath).suffix.lower() == '.geojson':
gdf.to_file(output_filepath, driver='GeoJSON')
print('The output file saved to {}'.format(Path(output_filepath).resolve()))
else:
raise ValueError("The output file format is not supported, please use .shp, .gpkg, or .geojson")
if __name__ == "__main__":
    import sys

    print('This is a module, please import it to use it.')
    # The original always instantiated ProcessWell on a developer-local Windows
    # path, which fails on every other machine. Only process well logs when
    # the caller passes the path(s) explicitly on the command line.
    if len(sys.argv) > 1:
        a = ProcessWell(sys.argv[1:])
Classes
class ProcessWell (fname: pathlib.PurePath | str | list, crs: str = 'epsg:4326', unit: str = 'feet')
-
This class is used to process and format lithology well logs (from Excel or CSV) and water level data (from USGS). All data are assumed to be in metric units (m).
:param fname: one or a list of strings, pathlib.PurePath objects, or pandas dataframes. The input file(s) shall be either CSV or Excel file(s) that contain lithology and location data. Sheet names and column names need to be clearly marked as Lithology, Location, Latitude, Longitude, Depth_top, Depth_bottom or anything similar; the keyword(s) can be modified under tTEM_toolbox/defaults/constants.py.
Expand source code
class ProcessWell: """ This class is use to process and format lithology well logs (from excel or csv) and water level data (from USGS).\ All data were assume under metric unit (m). \n :param lithologyfname: one or a list of string, pathlib.PurePath object, pandas dataframe. The input files(s) \ shall be either csv or excel file(s) that contains lithology and location data. sheet name and column name \ needs to be clearly marked as Lithology, Location, Latitude, Longitude, Depth_top, Depth_bottom or anything\ similiar, keyword(s) can be modified under tTEM_toolbox/defaults/constants.py.\ """ def __init__(self, fname: str| pathlib.PurePath | list, crs: str = 'epsg:4326', unit: str = 'feet'): if isinstance(fname, str | pathlib.PurePath): self.fname = [fname] print('reading lithology from {}'.format(Path(fname).name)) elif isinstance(fname, list): if len(fname) == 0: raise ValueError('Input file path is empty') else: self.fname = fname print('reading lithology from {}'.format([Path(f).name for f in fname])) if unit == 'feet': self.unit = 'feet' self.unitconvert = 3.28084 elif unit == 'meter': self.unit = 'meter' self.unitconvert = 1 self._crs = crs self.data = self._format_well() self.crs = self.data.crs @staticmethod def _find_all_readable(path:pathlib.PurePath)->list: """ This will receive a single path-like input and try to filter all readable file paths for well logs uses. 
:param path: path-like pathlib.PurePath object or string :return: list of pathlib.PurePath objects """ readable_ext = constants.CSV_EXTENSION + constants.EXCEL_EXTENSION if not isinstance(path, (str, pathlib.PurePath)): raise TypeError('Input path must be a string or pathlib.PurePath object') if Path(path).is_dir(): file_list = [f for f in Path(path).iterdir() if f.suffix in readable_ext] if len(file_list) == 0: raise ValueError('No {} file found in {}'.format(readable_ext, path)) return file_list elif Path(path).is_file(): if Path(path).suffix in readable_ext: file_list = [path] else: raise ValueError('Input file does not have extension of {}'.format(readable_ext)) return file_list @staticmethod def _format_input(fname:str| pathlib.PurePath| list| pd.DataFrame) -> list: """ This will format input file path(s) to a list of pandas dataframe (read from csv) and/or dict that includes all sheets in the excel\ file, each sheet were pandas dataframe. If input is a pandas dataframe, it will return the input dataframe in a list. 
:param fname: one or a list of string, pathlib.PurePath object, pandas dataframe :return: a list of pandas dataframe and/or dict """ if isinstance(fname, (str, pathlib.PurePath)): fname = [fname] elif isinstance(fname, list): pass else: raise TypeError('Input must be one or a list of string, pathlib.PurePath objects') export_list = [] for path in fname: file_list = ProcessWell._find_all_readable(path) excels = [file for file in file_list if Path(file).suffix in constants.EXCEL_EXTENSION] csvs = [file for file in file_list if Path(file).suffix in constants.CSV_EXTENSION] read_excels = [pd.read_excel(file, sheet_name=None) for file in excels] read_csvs = [pd.read_csv(file) for file in csvs] combined = read_excels + read_csvs export_list.append(combined) result = [item for sublist in export_list for item in sublist] return result @staticmethod def _read_lithology(fname: str| pathlib.PurePath |list| pd.DataFrame, mtoft=1) -> pd.DataFrame: """ Try to read lithology sheet from Excel file with tab name similar to 'Lithology', or csv file contains lithology data. 
:param fname: one or a list of string, pathlib.PurePath object, pandas dataframe :return: """ result = ProcessWell._format_input(fname) lithology_list = [] for single_file in result: if isinstance(single_file, dict): # which means it is an Excel file match_sheet_name = tools.keyword_search(single_file, constants.LITHOLOGY_SHEET_NAMES) if len(match_sheet_name) == 0: continue lithology_sheet = single_file[match_sheet_name[0]] lithology_list.append(lithology_sheet) if isinstance(single_file, pd.DataFrame): # which means it is a csv file match_column_lithology = tools.keyword_search(single_file, constants.LITHOLOGY_COLUMN_NAMES_KEYWORD) if match_column_lithology > 0: lithology_sheet = single_file lithology_list.append(lithology_sheet) concat_list = [] for sheet in lithology_list: match_column_lithology = tools.keyword_search(sheet, constants.LITHOLOGY_COLUMN_NAMES_KEYWORD) match_column_bore = tools.keyword_search(sheet, constants.LITHOLOGY_COLUMN_NAMES_BORE) match_column_depth_top = tools.keyword_search(sheet, constants.LITHOLOGY_COLUMN_NAMES_DEPTH_TOP) match_column_depth_bottom = tools.keyword_search(sheet, constants.LITHOLOGY_COLUMN_NAMES_DEPTH_BOTTOM) lithology = pd.DataFrame(sheet[match_column_lithology[0]]) lithology.columns = ['Keyword'] lithology['Bore'] = sheet[match_column_bore[0]] lithology['Depth_top'] = sheet[match_column_depth_top[0]]/mtoft lithology['Depth_top']= lithology['Depth_top'].round(2) lithology['Depth_bottom'] = sheet[match_column_depth_bottom[0]]/mtoft lithology['Depth_bottom'] = lithology['Depth_bottom'].round(2) lithology['Thickness'] = lithology['Depth_bottom'].subtract(lithology['Depth_top']) concat_list.append(lithology) result = pd.concat(concat_list) result = result[['Bore', 'Depth_top', 'Depth_bottom', 'Thickness', 'Keyword']] return result @staticmethod def _read_spatial(fname: str| pathlib.PurePath, mtoft=1) -> pd.DataFrame: """ Similiar to _read_lithology, but read location sheet from Excel file with tab name similar to 'Location', \ 
or csv file contains location data. :param fname: fname: one or a list of string, pathlib.PurePath object, pandas dataframe :return: """ result = ProcessWell._format_input(fname) location_list = [] for single_file in result: if isinstance(single_file, dict): match_sheet_name = utils.tools.keyword_search(single_file, constants.LOCATION_SHEET_NAMES) if len(match_sheet_name) == 0: continue location_sheet = single_file[match_sheet_name[0]] location_list.append(location_sheet) if isinstance(single_file, pd.DataFrame): match_column_location = utils.tools.keyword_search(single_file, constants.LOCATION_COLUMN_NAMES_LON) if match_column_location > 0: location_sheet = single_file location_list.append(location_sheet) concat_list = [] for sheet in location_list: match_column_lat = utils.tools.keyword_search(sheet, constants.LOCATION_COLUMN_NAMES_LAT) match_column_lon = utils.tools.keyword_search(sheet, constants.LOCATION_COLUMN_NAMES_LON) match_column_elevation = utils.tools.keyword_search(sheet, constants.LOCATION_COLUMN_NAMES_ELEVATION) location = pd.DataFrame(sheet[match_column_lat[0]]) location.columns = ['Latitude'] location['Longitude'] = sheet[match_column_lon[0]] location['Bore'] = sheet['Bore'] location['Elevation'] = sheet[match_column_elevation[0]]/mtoft location['Elevation'] = location['Elevation'].round(2) concat_list.append(location) result = pd.concat(concat_list) return result @staticmethod def _fill(group, factor=100) -> pd.DataFrame: newgroup = group.loc[group.index.repeat(group.Thickness * factor)] mul_per_gr = newgroup.groupby('Elevation_top').cumcount() newgroup['Elevation_top'] = newgroup['Elevation_top'].subtract(mul_per_gr * 1 / factor) newgroup['Depth_top'] = newgroup['Depth_top'].add(mul_per_gr * 1 / factor) newgroup['Depth_bottom'] = newgroup['Depth_top'].add(1 / factor) newgroup['Elevation_bottom'] = newgroup['Elevation_top'].subtract(1 / factor) newgroup['Thickness'] = 1 / factor return newgroup @staticmethod def 
_lithology_location_connect(lithology: pd.DataFrame, location: pd.DataFrame) -> pd.DataFrame: """ Connect lithology and location data by Borehole ID :param lithology: lithology dataframe :param location: location dataframe :return: combined dataframe """ lithology_group = lithology.groupby('Bore') concatlist = [] for name, group in lithology_group: group_location = location[location['Bore'] == name] if group_location.empty: continue group['Y'] = group_location['Latitude'].iloc[0] group['X'] = group_location['Longitude'].iloc[0] group['Z'] = group_location['Elevation'].iloc[0] group['Elevation_top'] = group['Z'].subtract(group['Depth_top']) group['Elevation_bottom'] = group['Z'].subtract(group['Depth_bottom']) concatlist.append(group) result = pd.concat(concatlist) return result @staticmethod def _assign_keyword_as_value(welllog_df) -> pd.DataFrame: conditionlist = [ (welllog_df["Keyword"] == "fine grain"), (welllog_df["Keyword"] == "mix grain"), (welllog_df["Keyword"] == "coarse grain") ] choicelist = [1, 2, 3] welllog_df["Keyword_n"] = np.select(conditionlist, choicelist) return welllog_df def _format_well(self) -> gpd.GeoDataFrame: lithology = self._read_lithology(self.fname, self.unitconvert) location = self._read_spatial(self.fname, self.unitconvert) self.data = self._lithology_location_connect(lithology, location) self.data = ProcessWell._assign_keyword_as_value(self.data) self.data.reset_index(drop=True, inplace=True) gdf = gpd.GeoDataFrame(self.data, geometry=gpd.points_from_xy(self.data['X'], self.data['Y']), crs=self._crs) self.data = gdf return self.data def reproject(self, crs: str) -> gpd.GeoDataFrame: """ Reproject the data to a given coordinate system. Parameters: - crs (str): The coordinate system to reproject the data to. Returns: - geopandas.GeoDataFrame: The reprojected data. 
""" self.data = self.data.to_crs(crs) self._crs = crs self.data['X'] = self.data.geometry.x self.data['Y'] = self.data.geometry.y return self.data def resample(self, scale: int) -> gpd.GeoDataFrame: """ Upscales the data by a given scale factor. Parameters: - scale (int): The scale factor to upscale the data by. Returns: - geopandas.GeoDataFrame: The upscaled data. """ group = self.data.groupby('Bore') self.data = group.apply(lambda x:ProcessWell._fill(x, scale)) self.data.reset_index(drop=True, inplace=True) print('resampling lithology to {} '.format(1/scale)) return self.data def summary(self): groups = self.data.groupby('Bore') concat_list = [] for bore, group in groups: total_thickness = group['Thickness'].sum() keywordgroup = group.groupby('Keyword') keyword_summary = keywordgroup.agg({ 'Thickness': 'sum', 'X': 'first', 'Y': 'first', 'Z': 'first' }) keyword_summary[keyword_summary.index.name] = keyword_summary.index.values keyword_summary['ratio'] = keyword_summary['Thickness'] / total_thickness keyword_summary.reset_index(drop=True, inplace=True) keyword_summary['bore'] = bore keyword_summary['unit'] = 'meter' keyword_summary['total_thickness'] = total_thickness concat_list.append(keyword_summary) output = pd.concat(concat_list) return output def to_shp(self, output_filepath: str| pathlib.PurePath) -> None: """ Save the data to a shapefile. Parameters: - path (str | pathlib.PurePath): The path to save the shapefile to. 
""" summary = self.summary() gdf = gpd.GeoDataFrame(summary, geometry=gpd.points_from_xy(summary['X'], summary['Y']), crs=self._crs) if Path(output_filepath).suffix.lower() == '.shp': gdf.to_file(output_filepath, driver='ESRI Shapefile') print('The output file saved to {}'.format(Path(output_filepath).resolve())) elif Path(output_filepath).suffix.lower() == '.gpkg': gdf.to_file(output_filepath, driver='GPKG', layer=Path(self.fname[0]).stem) print('The output file saved to {}'.format(Path(output_filepath).resolve())) elif Path(output_filepath).suffix.lower() == '.geojson': gdf.to_file(output_filepath, driver='GeoJSON') print('The output file saved to {}'.format(Path(output_filepath).resolve())) else: raise ValueError("The output file format is not supported, please use .shp, .gpkg, or .geojson")
Methods
def reproject(self, crs: str) ‑> geopandas.geodataframe.GeoDataFrame
-
Reproject the data to a given coordinate system.
Parameters: - crs (str): The coordinate system to reproject the data to.
Returns: - geopandas.GeoDataFrame: The reprojected data.
Expand source code
def reproject(self, crs: str) -> "gpd.GeoDataFrame":
    """
    Reproject the data to a given coordinate system.

    The X and Y columns are refreshed from the reprojected geometry, and both
    ``self._crs`` and the public ``self.crs`` are updated.

    :param crs: the coordinate system to reproject the data to.
    :return: the reprojected data.
    """
    self.data = self.data.to_crs(crs)
    self._crs = crs
    # The constructor sets self.crs from the data; keep it in step here,
    # otherwise it keeps reporting the pre-reprojection CRS.
    self.crs = self.data.crs
    self.data['X'] = self.data.geometry.x
    self.data['Y'] = self.data.geometry.y
    return self.data
def resample(self, scale: int) ‑> geopandas.geodataframe.GeoDataFrame
-
Resamples every lithology interval into slices of thickness 1/scale.
Parameters: - scale (int): The number of slices per unit of thickness; each resampled interval is 1/scale thick.
Returns: - geopandas.GeoDataFrame: The upscaled data.
Expand source code
def resample(self, scale: int) -> gpd.GeoDataFrame: """ Upscales the data by a given scale factor. Parameters: - scale (int): The scale factor to upscale the data by. Returns: - geopandas.GeoDataFrame: The upscaled data. """ group = self.data.groupby('Bore') self.data = group.apply(lambda x:ProcessWell._fill(x, scale)) self.data.reset_index(drop=True, inplace=True) print('resampling lithology to {} '.format(1/scale)) return self.data
def summary(self)
-
Expand source code
def summary(self): groups = self.data.groupby('Bore') concat_list = [] for bore, group in groups: total_thickness = group['Thickness'].sum() keywordgroup = group.groupby('Keyword') keyword_summary = keywordgroup.agg({ 'Thickness': 'sum', 'X': 'first', 'Y': 'first', 'Z': 'first' }) keyword_summary[keyword_summary.index.name] = keyword_summary.index.values keyword_summary['ratio'] = keyword_summary['Thickness'] / total_thickness keyword_summary.reset_index(drop=True, inplace=True) keyword_summary['bore'] = bore keyword_summary['unit'] = 'meter' keyword_summary['total_thickness'] = total_thickness concat_list.append(keyword_summary) output = pd.concat(concat_list) return output
def to_shp(self, output_filepath: str | pathlib.PurePath) ‑> None
-
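Save the per-borehole lithology summary to a shapefile (.shp), GeoPackage (.gpkg), or GeoJSON (.geojson) file.
Parameters: - output_filepath (str | pathlib.PurePath): The path to save the output file to; its suffix selects the format.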
Expand source code
def to_shp(self, output_filepath: str| pathlib.PurePath) -> None: """ Save the data to a shapefile. Parameters: - path (str | pathlib.PurePath): The path to save the shapefile to. """ summary = self.summary() gdf = gpd.GeoDataFrame(summary, geometry=gpd.points_from_xy(summary['X'], summary['Y']), crs=self._crs) if Path(output_filepath).suffix.lower() == '.shp': gdf.to_file(output_filepath, driver='ESRI Shapefile') print('The output file saved to {}'.format(Path(output_filepath).resolve())) elif Path(output_filepath).suffix.lower() == '.gpkg': gdf.to_file(output_filepath, driver='GPKG', layer=Path(self.fname[0]).stem) print('The output file saved to {}'.format(Path(output_filepath).resolve())) elif Path(output_filepath).suffix.lower() == '.geojson': gdf.to_file(output_filepath, driver='GeoJSON') print('The output file saved to {}'.format(Path(output_filepath).resolve())) else: raise ValueError("The output file format is not supported, please use .shp, .gpkg, or .geojson")