Module ttemtoolbox.main

Expand source code
#!/usr/bin/env python
from ttemtoolbox import process_ttem, process_gamma, process_well, process_water, lithology_connect
from ttemtoolbox import tools
from ttemtoolbox import __version__
from pathlib import Path
import argparse
import shutil
import sys
import geopandas as gpd
import pandas as pd


def create_parser():
    """Build the command line parser for the ttemtoolbox program.

    The top-level parser holds the run-wide options (``--config_path``,
    ``--get_config``, ``--force_clean``, ``--example_data``, ``--version``).
    Four sub-commands (``ttem``, ``lithology``, ``water``, ``connect``) each
    take the path to a config file as a positional argument stored under the
    sub-command's own name; ``main`` tests for those names to pick the step.

    Returns:
        argparse.ArgumentParser: the configured parser.
    """
    synopsis = 'This is a python interface for ttemtoolbox program'
    name = 'ttemtoolbox'
    parser = argparse.ArgumentParser(
        name, description=synopsis)
    parser.add_argument('-c', "--config_path", metavar="PATH",
                        help='Run entire ttem rock physics tranform process')
    parser.add_argument("--get_config", action='store_true', help='Generate default config file')
    parser.add_argument("-f", "--force_clean", help="To force remove all files for new program",
                        action="store_true")
    parser.add_argument("--example_data", help="To download example data",
                        action="store_true")
    parser.add_argument("-v", "--version", action='version', version='ttemtoolbox {}'.format(__version__))
    subparser = parser.add_subparsers()

    # NOTE: metavars are kept free of "(" and "[" on purpose. Brackets inside a
    # metavar are known to confuse argparse's usage-line formatter on older
    # Python releases, and nargs='+' already renders as "int [int ...]".

    # --- ttem ---------------------------------------------------------------
    subparser_ttem = subparser.add_parser('ttem')
    subparser_ttem.add_argument('ttem', metavar='PATH', help='Path to config file')
    subparser_ttem.add_argument('--doi_path', metavar='PATH', help='Path to doi file')
    subparser_ttem.add_argument('--layer_exclude', nargs='+', metavar='int', type=int,
                                help='Specify exclude layers when processing ttem data, '
                                     'this can also be done in config file')
    subparser_ttem.add_argument('--line_exclude', nargs='+', metavar='int', type=int,
                                help='Specify exclude lines when processing ttem data, '
                                     'this can also be done in config file')
    subparser_ttem.add_argument('--ID_exclude', nargs='+', metavar='int', type=int,
                                help='Specify exclude ID when processing ttem data, '
                                     'this can also be done in config file')
    subparser_ttem.add_argument('--resample', metavar='int', type=int,
                                help='Specify resample factor when processing ttem data, '
                                     'this can also be done in config file')
    subparser_ttem.add_argument('--reproject', metavar='str', type=str,
                                help='Reproject ttemdata to a new crs, '
                                     'e.g: EPSG:4326')
    subparser_ttem.add_argument('--unit', metavar='str', help='Use "meter" or "feet", default is meter')

    # --- lithology ----------------------------------------------------------
    subparser_lithology = subparser.add_parser('lithology')
    subparser_lithology.add_argument('lithology', metavar='PATH', help='Path to config file')
    subparser_lithology.add_argument('--reproject', metavar='str', type=str,
                                     help='Reproject welllog data to a new crs')
    subparser_lithology.add_argument('--resample', metavar='int', type=int, help='Resample welllog data')
    subparser_lithology.add_argument('--unit', metavar='str', help='"meter" or "feet", default is meter')

    # --- water --------------------------------------------------------------
    subparser_water = subparser.add_parser('water')
    subparser_water.add_argument('water', metavar='PATH', help='Path to config file')
    # nargs='+' so the value is always a list of well numbers; a bare string
    # would be iterated character by character by the water step.
    subparser_water.add_argument('-w', '--well_no', nargs='+', metavar='str',
                                 help='Download specific well number')

    # --- connect ------------------------------------------------------------
    subparser_connect = subparser.add_parser('connect')
    subparser_connect.add_argument('connect', metavar='PATH', help='Path to config file')
    return parser

def cmd_line_parse(iargs=None):
    """Parse the command line and serve the options that need no config file.

    ``--get_config`` copies the packaged default CONFIG into the current
    working directory and exits. ``--example_data`` copies the packaged
    example data into ``./data`` and exits. Otherwise the parsed namespace is
    returned, with ``config_path`` resolved to an absolute path (a directory
    is taken to mean the ``CONFIG`` file inside it).

    Args:
        iargs: list of argument strings; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        argparse.Namespace: the parsed arguments.

    Raises:
        SystemExit: after ``--get_config`` or ``--example_data`` completes
            (exit code 0), or when argparse rejects the arguments.
    """
    default_config_path = Path(__file__).parent.joinpath('defaults/CONFIG')
    default_data_path = Path(__file__).parents[2].joinpath('data')
    parser = create_parser()
    inps = parser.parse_args(args=iargs)
    if inps.config_path:
        inps.config_path = Path(inps.config_path).resolve()
        if inps.config_path.is_dir():
            inps.config_path = inps.config_path.joinpath('CONFIG')
    if inps.force_clean:
        print('All result will be purged')
    if inps.get_config:
        copypath = Path.cwd().joinpath('CONFIG')
        shutil.copyfile(default_config_path, copypath)
        print('Default CONFIG file generated in {}'.format(copypath))
        sys.exit(0)
    if inps.example_data:
        copypath = Path.cwd().joinpath('data')
        # Without an existing target directory shutil.copy would write every
        # source file to a single file called "data", each overwriting the last.
        copypath.mkdir(parents=True, exist_ok=True)
        for item in default_data_path.glob('*'):
            if item.is_dir():
                # shutil.copy cannot copy directories; mirror them instead.
                shutil.copytree(item, copypath.joinpath(item.name), dirs_exist_ok=True)
            else:
                shutil.copy(item, copypath)
        sys.exit(0)

    return inps

def step_ttem(config: dict, inps: dict) -> gpd.GeoDataFrame:
    """Run the tTEM processing step and write its outputs.

    Command line options that were given override the matching ``config``
    entries (``config`` is modified in place). The tTEM file is then loaded
    through ``process_ttem.ProcessTTEM``, optionally assigned a CRS and
    reprojected, and three outputs are written: ``<stem>_summary.csv`` and
    ``<stem>.shp`` in ``config['deliver']`` and ``<stem>.csv`` in
    ``config['ttem_temp']``.

    Args:
        config: run configuration; must contain ``ttem_path``, ``doi_path``,
            the three ``*_exclude`` keys, ``ttem_unit``, ``ttem_crs``,
            ``ttem_reproject_crs``, ``deliver`` and ``ttem_temp``.
        inps: parsed command line arguments as a dict.

    Returns:
        The ``data`` attribute of the processed tTEM object.

    Raises:
        TypeError: if ``config['ttem_path']`` is not an existing file.
    """
    print('Step1: Process ttem')
    # Command line option -> config key it overrides when supplied.
    cli_overrides = {
        'layer_exclude': 'layer_exclude',
        'line_exclude': 'line_exclude',
        'ID_exclude': 'ID_exclude',
        'resample': 'ttem_resample',
        'doi_path': 'doi_path',
        'reproject': 'ttem_reproject_crs',
        'unit': 'ttem_unit',
    }
    for option, key in cli_overrides.items():
        if inps.get(option):
            config[key] = inps[option]
    # Keep a resample factor that came from the config file; only fall back to
    # None when neither the command line nor the config provides one.
    config.setdefault('ttem_resample', None)

    ttem_path = Path(config['ttem_path'])
    if not ttem_path.is_file():
        raise TypeError('TTEM file not found in {}'.format(config['ttem_path']))

    ttem = process_ttem.ProcessTTEM(
        fname=config['ttem_path'],
        doi_path=config['doi_path'],
        layer_exclude=config['layer_exclude'],
        line_exclude=config['line_exclude'],
        ID_exclude=config['ID_exclude'],
        resample=config['ttem_resample'],
        unit=config['ttem_unit'],
    )
    if config['ttem_crs'] is not None:
        ttem.set_crs(config['ttem_crs'])
    if config['ttem_reproject_crs'] is not None:
        ttem.reproject(config['ttem_reproject_crs'])

    # Path(...) accepts both str and Path values, matching step_connect.
    deliver_dir = Path(config['deliver'])
    temp_dir = Path(config['ttem_temp'])
    stem = ttem_path.stem
    ttem.summary().to_csv(deliver_dir.joinpath(stem + '_summary.csv'), index=False)
    ttem.to_shp(deliver_dir.joinpath(stem + '.shp'))
    # The temp CSV is what step_connect reloads when run on its own.
    ttem.data.to_csv(temp_dir.joinpath(stem + '.csv'), index=False)
    return ttem.data


def step_lithology(config: dict, inps: dict)-> gpd.GeoDataFrame:
    """Run the lithology (well log) step and write its outputs.

    Command line values for ``reproject``, ``resample`` and ``unit`` replace
    the matching ``lithology_*`` entries of ``config`` in place. The well file
    is loaded through ``process_well.ProcessWell``, optionally resampled and
    reprojected, then ``<stem>_summary.csv`` and ``<stem>.shp`` are written to
    ``config['deliver']`` and ``<stem>.csv`` to ``config['well_temp']``.

    Args:
        config: run configuration for this step.
        inps: parsed command line arguments as a dict.

    Returns:
        The ``data`` attribute of the processed well object.

    Raises:
        TypeError: if ``config['well_path']`` is not an existing file.
    """
    print('Step2: Process lithology')
    option_to_key = (
        ('reproject', 'lithology_reproject_crs'),
        ('resample', 'lithology_resample'),
        ('unit', 'lithology_unit'),
    )
    for option, key in option_to_key:
        if inps.get(option):
            config[key] = inps[option]

    well_file = Path(config['well_path'])
    if not well_file.is_file():
        raise TypeError('Lithology file not found in {}'.format(config['well_path']))

    lithology = process_well.ProcessWell(
        fname=config['well_path'],
        crs=config['lithology_crs'],
        unit=config['lithology_unit'],
    )
    resample_factor = config['lithology_resample']
    if resample_factor is not None:
        lithology.resample(resample_factor)
    target_crs = config['lithology_reproject_crs']
    if target_crs is not None:
        lithology.reproject(target_crs)

    stem = well_file.stem
    summary_file = config['deliver'].joinpath(stem + '_summary.csv')
    lithology.summary().to_csv(summary_file)
    shape_file = config['deliver'].joinpath(stem + '.shp')
    lithology.to_shp(shape_file)
    temp_file = config['well_temp'].joinpath(stem + '.csv')
    lithology.data.to_csv(temp_file, index=False)
    return lithology.data


def step_gamma(config, inps):
    """Placeholder for the gamma step: ignores both arguments, returns None."""
    return None

def step_water(config : dict, inps: dict) -> tuple:
    """Collect water level data for the requested wells and save it to Excel.

    A ``well_no`` given on the command line replaces
    ``config['USGS_well_NO']``. Each well is passed to
    ``process_water.format_usgs_water`` together with ``config['water_temp']``;
    the returned tables are concatenated, and each table's ``attrs`` becomes
    one row of a metadata table. Both tables are written to
    ``water_level.xlsx`` in ``config['deliver']`` (sheets ``water_level`` and
    ``metadata``).

    Args:
        config: run configuration; must contain ``USGS_well_NO``,
            ``water_temp`` and ``deliver``.
        inps: parsed command line arguments as a dict.

    Returns:
        tuple: ``(water, meta)`` -- the combined water level table and the
        per-well metadata table, both with a fresh 0..n-1 index.

    Raises:
        ValueError: if no well number is configured.
    """
    print('Step4: Process water level data')
    if inps.get('well_no'):
        config['USGS_well_NO'] = inps['well_no']

    wells = config['USGS_well_NO']
    if wells is None:
        wells = []
    elif isinstance(wells, (str, int)):
        # A single well number must not be iterated character by character.
        wells = [wells]
    else:
        wells = list(wells)
    if not wells:
        raise ValueError('No USGS well number given, set USGS_well_NO in the config file or use --well_no')

    water_frames = []
    meta_frames = []
    for well in wells:
        well_water = process_water.format_usgs_water(well, config['water_temp'])
        water_frames.append(well_water)
        # attrs carries the per-well metadata; index=[0] makes it a one-row frame.
        meta_frames.append(pd.DataFrame(well_water.attrs, index=[0]))

    water = pd.concat(water_frames)
    water.reset_index(drop=True, inplace=True)
    meta = pd.concat(meta_frames)
    meta.reset_index(drop=True, inplace=True)

    # Path(...) accepts both str and Path values, matching step_connect.
    excel_path = Path(config['deliver']).joinpath('water_level.xlsx')
    with pd.ExcelWriter(excel_path) as writer:
        water.to_excel(writer, sheet_name='water_level', index=False)
        meta.to_excel(writer, sheet_name='metadata', index=False)
    print('Water level data saved in {}'.format(excel_path))
    return water, meta


def _load_points(csv_dir, crs, label):
    """Rebuild a point GeoDataFrame from the X/Y columns of every CSV in *csv_dir*."""
    # sorted() makes the row order independent of the file system's listing order.
    frames = [pd.read_csv(csv_file) for csv_file in sorted(Path(csv_dir).glob('*.csv'))]
    if not frames:
        raise ValueError('No {} csv file found in {}'.format(label, csv_dir))
    table = pd.concat(frames)
    return gpd.GeoDataFrame(table,
                            geometry=gpd.points_from_xy(table['X'], table['Y']),
                            crs=crs)


def step_connect(config: dict, inps:dict,
                 ttem: gpd.GeoDataFrame = None,
                 lithology: gpd.GeoDataFrame = None):
    """Match tTEM soundings with nearby wells and save the stitched table.

    Any input that is not passed in is rebuilt from the CSV files an earlier
    run left in ``config['ttem_temp']`` / ``config['well_temp']``. The two
    data sets are paired with ``lithology_connect.select_closest`` (using
    ``config['search_radius']``), joined with
    ``lithology_connect.ttem_well_connect`` and written to
    ``ttem_well_connect.csv`` in ``config['deliver']``.

    Args:
        config: run configuration.
        inps: parsed command line arguments as a dict (not used here).
        ttem: tTEM data; loaded from the temp folder when ``None``.
        lithology: well data; loaded from the temp folder when ``None``.

    Returns:
        The table returned by ``lithology_connect.ttem_well_connect``.

    Raises:
        ValueError: if an input has to be reloaded and its temp folder holds
            no CSV file.
    """
    # Each input is loaded independently, so supplying only one of them no
    # longer passes None on to the matching step.
    if ttem is None:
        # The temp CSV is in the reprojected CRS when one was configured and
        # in the source CRS otherwise.
        ttem_crs = config.get('ttem_reproject_crs') or config.get('ttem_crs')
        ttem = _load_points(config['ttem_temp'], ttem_crs, 'ttem')
    if lithology is None:
        lithology_crs = config.get('lithology_reproject_crs') or config.get('lithology_crs')
        lithology = _load_points(config['well_temp'], lithology_crs, 'lithology')

    matched_ttem, matched_lithology = lithology_connect.select_closest(
        ttem, lithology, search_radius=config['search_radius'])
    stitched = lithology_connect.ttem_well_connect(matched_ttem, matched_lithology)
    output_path = Path(config['deliver']).joinpath('ttem_well_connect.csv')
    stitched.to_csv(output_path)
    print('connected file saved to {}'.format(output_path))
    return stitched
    
def _load_run_config(config_path):
    """Parse *config_path*, clean its output folder and build the run directories."""
    user_config = tools.parse_config(config_path)
    tools.clean_output(Path(user_config['output']))
    return tools.create_dir_structure(user_config)


def main(iargs=None):
    """Command line entry point: parse the arguments and run the chosen steps.

    With ``--config_path`` the whole chain runs (ttem, lithology, water,
    connect). With a sub-command only that step runs, using the config file
    given to the sub-command. Every run parses the config, cleans the
    configured output folder and creates the directory structure first.

    Args:
        iargs: list of argument strings; ``None`` lets argparse read
            ``sys.argv``.
    """
    inps = vars(cmd_line_parse(iargs))  # parse CLI input to dict

    # Run the entire ttem rock physics transform process.
    if inps.get('config_path'):
        print('Run entire ttem rock physics tranform process')
        config = _load_run_config(inps['config_path'])
        ttemdata = step_ttem(config, inps)
        lithology = step_lithology(config, inps)
        step_water(config, inps)
        step_connect(config, inps, ttemdata, lithology)

    # NOTE(review): this purge runs after the full pipeline above when both
    # -c and -f are given -- confirm that is the intended order.
    if inps.get('force_clean'):
        print('All result will be purged')
        tools.clean_output()

    # Step1: Process ttem
    if inps.get('ttem'):
        step_ttem(_load_run_config(inps['ttem']), inps)

    # Step2: Process lithology
    if inps.get('lithology'):
        step_lithology(_load_run_config(inps['lithology']), inps)

    # Step3: Process gamma (no 'gamma' sub-command is registered yet)
    if inps.get('gamma'):
        print('This feature is still under development ')

    # Step4: Process water level
    if inps.get('water'):
        step_water(_load_run_config(inps['water']), inps)

    # Step5: ttem well connect
    # NOTE(review): the output folder is cleaned before step_connect reloads
    # the temp CSVs of earlier steps -- verify clean_output keeps those files.
    if inps.get('connect'):
        step_connect(_load_run_config(inps['connect']), inps)

if __name__ == '__main__':
    # Invoked without any argument: ask argparse for the help text instead.
    cli_args = sys.argv
    if len(cli_args) == 1:
        cli_args.append('-h')
    main(cli_args[1:])

Functions

def cmd_line_parse(iargs=None)
Expand source code
def cmd_line_parse(iargs=None):
    """Parse the command line and serve the options that need no config file.

    ``--get_config`` copies the packaged default CONFIG into the current
    working directory and exits. ``--example_data`` copies the packaged
    example data into ``./data`` and exits. Otherwise the parsed namespace is
    returned, with ``config_path`` resolved to an absolute path (a directory
    is taken to mean the ``CONFIG`` file inside it).

    Args:
        iargs: list of argument strings; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        argparse.Namespace: the parsed arguments.

    Raises:
        SystemExit: after ``--get_config`` or ``--example_data`` completes
            (exit code 0), or when argparse rejects the arguments.
    """
    default_config_path = Path(__file__).parent.joinpath('defaults/CONFIG')
    default_data_path = Path(__file__).parents[2].joinpath('data')
    parser = create_parser()
    inps = parser.parse_args(args=iargs)
    if inps.config_path:
        inps.config_path = Path(inps.config_path).resolve()
        if inps.config_path.is_dir():
            inps.config_path = inps.config_path.joinpath('CONFIG')
    if inps.force_clean:
        print('All result will be purged')
    if inps.get_config:
        copypath = Path.cwd().joinpath('CONFIG')
        shutil.copyfile(default_config_path, copypath)
        print('Default CONFIG file generated in {}'.format(copypath))
        sys.exit(0)
    if inps.example_data:
        copypath = Path.cwd().joinpath('data')
        # Without an existing target directory shutil.copy would write every
        # source file to a single file called "data", each overwriting the last.
        copypath.mkdir(parents=True, exist_ok=True)
        for item in default_data_path.glob('*'):
            if item.is_dir():
                # shutil.copy cannot copy directories; mirror them instead.
                shutil.copytree(item, copypath.joinpath(item.name), dirs_exist_ok=True)
            else:
                shutil.copy(item, copypath)
        sys.exit(0)

    return inps
def create_parser()
Expand source code
def create_parser():
    """Build the command line parser for the ttemtoolbox program.

    The top-level parser holds the run-wide options (``--config_path``,
    ``--get_config``, ``--force_clean``, ``--example_data``, ``--version``).
    Four sub-commands (``ttem``, ``lithology``, ``water``, ``connect``) each
    take the path to a config file as a positional argument stored under the
    sub-command's own name; ``main`` tests for those names to pick the step.

    Returns:
        argparse.ArgumentParser: the configured parser.
    """
    synopsis = 'This is a python interface for ttemtoolbox program'
    name = 'ttemtoolbox'
    parser = argparse.ArgumentParser(
        name, description=synopsis)
    parser.add_argument('-c', "--config_path", metavar="PATH",
                        help='Run entire ttem rock physics tranform process')
    parser.add_argument("--get_config", action='store_true', help='Generate default config file')
    parser.add_argument("-f", "--force_clean", help="To force remove all files for new program",
                        action="store_true")
    parser.add_argument("--example_data", help="To download example data",
                        action="store_true")
    parser.add_argument("-v", "--version", action='version', version='ttemtoolbox {}'.format(__version__))
    subparser = parser.add_subparsers()

    # NOTE: metavars are kept free of "(" and "[" on purpose. Brackets inside a
    # metavar are known to confuse argparse's usage-line formatter on older
    # Python releases, and nargs='+' already renders as "int [int ...]".

    # --- ttem ---------------------------------------------------------------
    subparser_ttem = subparser.add_parser('ttem')
    subparser_ttem.add_argument('ttem', metavar='PATH', help='Path to config file')
    subparser_ttem.add_argument('--doi_path', metavar='PATH', help='Path to doi file')
    subparser_ttem.add_argument('--layer_exclude', nargs='+', metavar='int', type=int,
                                help='Specify exclude layers when processing ttem data, '
                                     'this can also be done in config file')
    subparser_ttem.add_argument('--line_exclude', nargs='+', metavar='int', type=int,
                                help='Specify exclude lines when processing ttem data, '
                                     'this can also be done in config file')
    subparser_ttem.add_argument('--ID_exclude', nargs='+', metavar='int', type=int,
                                help='Specify exclude ID when processing ttem data, '
                                     'this can also be done in config file')
    subparser_ttem.add_argument('--resample', metavar='int', type=int,
                                help='Specify resample factor when processing ttem data, '
                                     'this can also be done in config file')
    subparser_ttem.add_argument('--reproject', metavar='str', type=str,
                                help='Reproject ttemdata to a new crs, '
                                     'e.g: EPSG:4326')
    subparser_ttem.add_argument('--unit', metavar='str', help='Use "meter" or "feet", default is meter')

    # --- lithology ----------------------------------------------------------
    subparser_lithology = subparser.add_parser('lithology')
    subparser_lithology.add_argument('lithology', metavar='PATH', help='Path to config file')
    subparser_lithology.add_argument('--reproject', metavar='str', type=str,
                                     help='Reproject welllog data to a new crs')
    subparser_lithology.add_argument('--resample', metavar='int', type=int, help='Resample welllog data')
    subparser_lithology.add_argument('--unit', metavar='str', help='"meter" or "feet", default is meter')

    # --- water --------------------------------------------------------------
    subparser_water = subparser.add_parser('water')
    subparser_water.add_argument('water', metavar='PATH', help='Path to config file')
    # nargs='+' so the value is always a list of well numbers; a bare string
    # would be iterated character by character by the water step.
    subparser_water.add_argument('-w', '--well_no', nargs='+', metavar='str',
                                 help='Download specific well number')

    # --- connect ------------------------------------------------------------
    subparser_connect = subparser.add_parser('connect')
    subparser_connect.add_argument('connect', metavar='PATH', help='Path to config file')
    return parser
def main(iargs=None)
Expand source code
def _load_run_config(config_path):
    """Parse *config_path*, clean its output folder and build the run directories."""
    user_config = tools.parse_config(config_path)
    tools.clean_output(Path(user_config['output']))
    return tools.create_dir_structure(user_config)


def main(iargs=None):
    """Command line entry point: parse the arguments and run the chosen steps.

    With ``--config_path`` the whole chain runs (ttem, lithology, water,
    connect). With a sub-command only that step runs, using the config file
    given to the sub-command. Every run parses the config, cleans the
    configured output folder and creates the directory structure first.

    Args:
        iargs: list of argument strings; ``None`` lets argparse read
            ``sys.argv``.
    """
    inps = vars(cmd_line_parse(iargs))  # parse CLI input to dict

    # Run the entire ttem rock physics transform process.
    if inps.get('config_path'):
        print('Run entire ttem rock physics tranform process')
        config = _load_run_config(inps['config_path'])
        ttemdata = step_ttem(config, inps)
        lithology = step_lithology(config, inps)
        step_water(config, inps)
        step_connect(config, inps, ttemdata, lithology)

    # NOTE(review): this purge runs after the full pipeline above when both
    # -c and -f are given -- confirm that is the intended order.
    if inps.get('force_clean'):
        print('All result will be purged')
        tools.clean_output()

    # Step1: Process ttem
    if inps.get('ttem'):
        step_ttem(_load_run_config(inps['ttem']), inps)

    # Step2: Process lithology
    if inps.get('lithology'):
        step_lithology(_load_run_config(inps['lithology']), inps)

    # Step3: Process gamma (no 'gamma' sub-command is registered yet)
    if inps.get('gamma'):
        print('This feature is still under development ')

    # Step4: Process water level
    if inps.get('water'):
        step_water(_load_run_config(inps['water']), inps)

    # Step5: ttem well connect
    # NOTE(review): the output folder is cleaned before step_connect reloads
    # the temp CSVs of earlier steps -- verify clean_output keeps those files.
    if inps.get('connect'):
        step_connect(_load_run_config(inps['connect']), inps)
def step_connect(config: dict, inps: dict, ttem: geopandas.geodataframe.GeoDataFrame = None, lithology: geopandas.geodataframe.GeoDataFrame = None)
Expand source code
def _load_points(csv_dir, crs, label):
    """Rebuild a point GeoDataFrame from the X/Y columns of every CSV in *csv_dir*."""
    # sorted() makes the row order independent of the file system's listing order.
    frames = [pd.read_csv(csv_file) for csv_file in sorted(Path(csv_dir).glob('*.csv'))]
    if not frames:
        raise ValueError('No {} csv file found in {}'.format(label, csv_dir))
    table = pd.concat(frames)
    return gpd.GeoDataFrame(table,
                            geometry=gpd.points_from_xy(table['X'], table['Y']),
                            crs=crs)


def step_connect(config: dict, inps:dict,
                 ttem: gpd.GeoDataFrame = None,
                 lithology: gpd.GeoDataFrame = None):
    """Match tTEM soundings with nearby wells and save the stitched table.

    Any input that is not passed in is rebuilt from the CSV files an earlier
    run left in ``config['ttem_temp']`` / ``config['well_temp']``. The two
    data sets are paired with ``lithology_connect.select_closest`` (using
    ``config['search_radius']``), joined with
    ``lithology_connect.ttem_well_connect`` and written to
    ``ttem_well_connect.csv`` in ``config['deliver']``.

    Args:
        config: run configuration.
        inps: parsed command line arguments as a dict (not used here).
        ttem: tTEM data; loaded from the temp folder when ``None``.
        lithology: well data; loaded from the temp folder when ``None``.

    Returns:
        The table returned by ``lithology_connect.ttem_well_connect``.

    Raises:
        ValueError: if an input has to be reloaded and its temp folder holds
            no CSV file.
    """
    # Each input is loaded independently, so supplying only one of them no
    # longer passes None on to the matching step.
    if ttem is None:
        # The temp CSV is in the reprojected CRS when one was configured and
        # in the source CRS otherwise.
        ttem_crs = config.get('ttem_reproject_crs') or config.get('ttem_crs')
        ttem = _load_points(config['ttem_temp'], ttem_crs, 'ttem')
    if lithology is None:
        lithology_crs = config.get('lithology_reproject_crs') or config.get('lithology_crs')
        lithology = _load_points(config['well_temp'], lithology_crs, 'lithology')

    matched_ttem, matched_lithology = lithology_connect.select_closest(
        ttem, lithology, search_radius=config['search_radius'])
    stitched = lithology_connect.ttem_well_connect(matched_ttem, matched_lithology)
    output_path = Path(config['deliver']).joinpath('ttem_well_connect.csv')
    stitched.to_csv(output_path)
    print('connected file saved to {}'.format(output_path))
    return stitched
def step_gamma(config, inps)
Expand source code
def step_gamma(config, inps):
    """Placeholder for the gamma step: ignores both arguments, returns None."""
    return None
def step_lithology(config: dict, inps: dict) ‑> geopandas.geodataframe.GeoDataFrame
Expand source code
def step_lithology(config: dict, inps: dict)-> gpd.GeoDataFrame:
    """Run the lithology (well log) step and write its outputs.

    Command line values for ``reproject``, ``resample`` and ``unit`` replace
    the matching ``lithology_*`` entries of ``config`` in place. The well file
    is loaded through ``process_well.ProcessWell``, optionally resampled and
    reprojected, then ``<stem>_summary.csv`` and ``<stem>.shp`` are written to
    ``config['deliver']`` and ``<stem>.csv`` to ``config['well_temp']``.

    Args:
        config: run configuration for this step.
        inps: parsed command line arguments as a dict.

    Returns:
        The ``data`` attribute of the processed well object.

    Raises:
        TypeError: if ``config['well_path']`` is not an existing file.
    """
    print('Step2: Process lithology')
    option_to_key = (
        ('reproject', 'lithology_reproject_crs'),
        ('resample', 'lithology_resample'),
        ('unit', 'lithology_unit'),
    )
    for option, key in option_to_key:
        if inps.get(option):
            config[key] = inps[option]

    well_file = Path(config['well_path'])
    if not well_file.is_file():
        raise TypeError('Lithology file not found in {}'.format(config['well_path']))

    lithology = process_well.ProcessWell(
        fname=config['well_path'],
        crs=config['lithology_crs'],
        unit=config['lithology_unit'],
    )
    resample_factor = config['lithology_resample']
    if resample_factor is not None:
        lithology.resample(resample_factor)
    target_crs = config['lithology_reproject_crs']
    if target_crs is not None:
        lithology.reproject(target_crs)

    stem = well_file.stem
    summary_file = config['deliver'].joinpath(stem + '_summary.csv')
    lithology.summary().to_csv(summary_file)
    shape_file = config['deliver'].joinpath(stem + '.shp')
    lithology.to_shp(shape_file)
    temp_file = config['well_temp'].joinpath(stem + '.csv')
    lithology.data.to_csv(temp_file, index=False)
    return lithology.data
def step_ttem(config: dict, inps: dict) ‑> geopandas.geodataframe.GeoDataFrame
Expand source code
def step_ttem(config: dict, inps: dict) -> gpd.GeoDataFrame:
    """Run the tTEM processing step and write its outputs.

    Command line options that were given override the matching ``config``
    entries (``config`` is modified in place). The tTEM file is then loaded
    through ``process_ttem.ProcessTTEM``, optionally assigned a CRS and
    reprojected, and three outputs are written: ``<stem>_summary.csv`` and
    ``<stem>.shp`` in ``config['deliver']`` and ``<stem>.csv`` in
    ``config['ttem_temp']``.

    Args:
        config: run configuration; must contain ``ttem_path``, ``doi_path``,
            the three ``*_exclude`` keys, ``ttem_unit``, ``ttem_crs``,
            ``ttem_reproject_crs``, ``deliver`` and ``ttem_temp``.
        inps: parsed command line arguments as a dict.

    Returns:
        The ``data`` attribute of the processed tTEM object.

    Raises:
        TypeError: if ``config['ttem_path']`` is not an existing file.
    """
    print('Step1: Process ttem')
    # Command line option -> config key it overrides when supplied.
    cli_overrides = {
        'layer_exclude': 'layer_exclude',
        'line_exclude': 'line_exclude',
        'ID_exclude': 'ID_exclude',
        'resample': 'ttem_resample',
        'doi_path': 'doi_path',
        'reproject': 'ttem_reproject_crs',
        'unit': 'ttem_unit',
    }
    for option, key in cli_overrides.items():
        if inps.get(option):
            config[key] = inps[option]
    # Keep a resample factor that came from the config file; only fall back to
    # None when neither the command line nor the config provides one.
    config.setdefault('ttem_resample', None)

    ttem_path = Path(config['ttem_path'])
    if not ttem_path.is_file():
        raise TypeError('TTEM file not found in {}'.format(config['ttem_path']))

    ttem = process_ttem.ProcessTTEM(
        fname=config['ttem_path'],
        doi_path=config['doi_path'],
        layer_exclude=config['layer_exclude'],
        line_exclude=config['line_exclude'],
        ID_exclude=config['ID_exclude'],
        resample=config['ttem_resample'],
        unit=config['ttem_unit'],
    )
    if config['ttem_crs'] is not None:
        ttem.set_crs(config['ttem_crs'])
    if config['ttem_reproject_crs'] is not None:
        ttem.reproject(config['ttem_reproject_crs'])

    # Path(...) accepts both str and Path values, matching step_connect.
    deliver_dir = Path(config['deliver'])
    temp_dir = Path(config['ttem_temp'])
    stem = ttem_path.stem
    ttem.summary().to_csv(deliver_dir.joinpath(stem + '_summary.csv'), index=False)
    ttem.to_shp(deliver_dir.joinpath(stem + '.shp'))
    # The temp CSV is what step_connect reloads when run on its own.
    ttem.data.to_csv(temp_dir.joinpath(stem + '.csv'), index=False)
    return ttem.data
def step_water(config: dict, inps: dict) ‑> tuple
Expand source code
def step_water(config : dict, inps: dict) -> tuple:
    """Collect water level data for the requested wells and save it to Excel.

    A ``well_no`` given on the command line replaces
    ``config['USGS_well_NO']``. Each well is passed to
    ``process_water.format_usgs_water`` together with ``config['water_temp']``;
    the returned tables are concatenated, and each table's ``attrs`` becomes
    one row of a metadata table. Both tables are written to
    ``water_level.xlsx`` in ``config['deliver']`` (sheets ``water_level`` and
    ``metadata``).

    Args:
        config: run configuration; must contain ``USGS_well_NO``,
            ``water_temp`` and ``deliver``.
        inps: parsed command line arguments as a dict.

    Returns:
        tuple: ``(water, meta)`` -- the combined water level table and the
        per-well metadata table, both with a fresh 0..n-1 index.

    Raises:
        ValueError: if no well number is configured.
    """
    print('Step4: Process water level data')
    if inps.get('well_no'):
        config['USGS_well_NO'] = inps['well_no']

    wells = config['USGS_well_NO']
    if wells is None:
        wells = []
    elif isinstance(wells, (str, int)):
        # A single well number must not be iterated character by character.
        wells = [wells]
    else:
        wells = list(wells)
    if not wells:
        raise ValueError('No USGS well number given, set USGS_well_NO in the config file or use --well_no')

    water_frames = []
    meta_frames = []
    for well in wells:
        well_water = process_water.format_usgs_water(well, config['water_temp'])
        water_frames.append(well_water)
        # attrs carries the per-well metadata; index=[0] makes it a one-row frame.
        meta_frames.append(pd.DataFrame(well_water.attrs, index=[0]))

    water = pd.concat(water_frames)
    water.reset_index(drop=True, inplace=True)
    meta = pd.concat(meta_frames)
    meta.reset_index(drop=True, inplace=True)

    # Path(...) accepts both str and Path values, matching step_connect.
    excel_path = Path(config['deliver']).joinpath('water_level.xlsx')
    with pd.ExcelWriter(excel_path) as writer:
        water.to_excel(writer, sheet_name='water_level', index=False)
        meta.to_excel(writer, sheet_name='metadata', index=False)
    print('Water level data saved in {}'.format(excel_path))
    return water, meta