diff --git a/data_split_merge/general_splitter.py b/data_split_merge/general_splitter.py
index 9e01cabbd525cda83be9f9a250f23ca6a0fef141..0fb7b672a87482e30a1ea154d24c8f49712bc3d0 100644
--- a/data_split_merge/general_splitter.py
+++ b/data_split_merge/general_splitter.py
@@ -1,4 +1,6 @@
 import json
+from multiprocessing import Pool
 import os
 import argparse
 import laspy
+import pdal
@@ -31,32 +33,25 @@ LIST_OF_LAS_FIELDS = ['label', 'treeID']
 
 
 def split(file_path, output_folder, capacity=5000):
-    # create the output folder if it does not exist
     if not os.path.exists(output_folder):
         os.makedirs(output_folder)
 
-    # Generate unique filename
-    unique_filename = 'split_pipeline_' + str(os.getpid()) + '.json'
-
-    with open(unique_filename, 'w') as f:
-        f.write(
-            SPLIT_TEMPLATE
-            .replace('input.las', file_path)
-            .replace('5000', str(capacity))
-            .replace('output_#.las',
-                     os.path.join(output_folder,
-                                  Path(file_path).stem + '---#.las'))
-        )
-    # run the pipeline
-
-    try:
-        subprocess.run(['pdal', 'pipeline', unique_filename], check=True)
-    except subprocess.CalledProcessError as e:
-        print(f'Command failed with error {e.returncode}. Output was:\n{e.output}')
+    # parse the JSON pipeline template; json.loads returns a fresh dict on
+    # every call, so no explicit copy of SPLIT_TEMPLATE is needed
+    split_template_copy = json.loads(SPLIT_TEMPLATE)
 
-    # remove the pipeline file
-    os.remove(unique_filename)
+    # point the pipeline at the input file, chunk capacity and output pattern
+    split_template_copy['pipeline'][0] = file_path
+    split_template_copy['pipeline'][1]['capacity'] = str(capacity)
+    split_template_copy['pipeline'][2]['filename'] = os.path.join(output_folder, Path(file_path).stem + '---#.las')
+    pipeline = pdal.Pipeline(json.dumps(split_template_copy))
+    try:
+        pipeline.execute()
+        metadata = pipeline.metadata  # retained for debugging
+        log = pipeline.log
+    except RuntimeError as e:
+        print(f'An error occurred: {e}')
 
 
 def transfer_extra_fields(file_path, output_folder):
     # get list of extra fields in the input file using laspy
@@ -111,8 +106,8 @@ class GeneralSplitter(object):
 
     def process_file(self, file, capacity, output_folder):
-        # create the temporary output folder
-        temp_output_folder = 'temp_output_folder'
+        # create the temporary output folder with a unique name per input file
+        temp_output_folder = Path(file).stem + '_temp'
         if not os.path.exists(temp_output_folder):
             os.makedirs(temp_output_folder)
 
 
@@ -127,31 +122,30 @@ class GeneralSplitter(object):
 
         subprocess.run(['rm', '-r', temp_output_folder], check=True)
 
-    def split_and_transfer_in_folder(self):
-        # get list of files in the input folder
+    def split_and_transfer_in_folder_parallel(self, processes=4):
+        files = [os.path.join(self.input_folder, f) for f in os.listdir(self.input_folder) if f.endswith('.las')]
+        with Pool(processes=processes) as pool:
+            pool.starmap(self.process_file, [(file, self.capacity, self.output_folder) for file in files])
+
+    def split_and_transfer_in_folder_serial(self):
         files = [os.path.join(self.input_folder, f) for f in os.listdir(self.input_folder) if f.endswith('.las')]
         for file in tqdm(files):
             self.process_file(file, self.capacity, self.output_folder)
-        # use parallel processing
-        # Parallel(n_jobs=4)(delayed(self.process_file)(file, self.capacity, self.output_folder) for file in tqdm(files))
-
-
-
-
-# filepath = '/home/nibio/mutable-outside-world/code/nibio_graph_sem_seg/data_split_merge/input_folder'
-# output_folder = '/home/nibio/mutable-outside-world/code/nibio_graph_sem_seg/data_split_merge/output'
-# splitter = GeneralSplitter(filepath, output_folder)
-# splitter.split_and_transfer_in_folder()
-
+
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(description='Split a LAS file into multiple LAS files')
     parser.add_argument('--input_folder', type=str, help='Input LAS file')
     parser.add_argument('--output_folder', type=str, help='Output folder')
+    parser.add_argument('--processes', type=int, default=4, help='Number of processes to use')
+    parser.add_argument('--parallel', action=argparse.BooleanOptionalAction, default=True, help='Use parallel processing (pass --no-parallel for serial)')
     parser.add_argument('--capacity', type=int, default=5000, help='Capacity of each output LAS file')
     args = parser.parse_args()
 
     splitter = GeneralSplitter(args.input_folder, args.output_folder, args.capacity)
-    splitter.split_and_transfer_in_folder()
\ No newline at end of file
+    if args.parallel:
+        splitter.split_and_transfer_in_folder_parallel(args.processes)
+    else:
+        splitter.split_and_transfer_in_folder_serial()
\ No newline at end of file
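
Note on the new pipeline construction: SPLIT_TEMPLATE itself is defined outside the hunks shown. Judging from the index-based edits in split() (stage 0 is the reader filename, stage 1 takes a 'capacity' option, stage 2 is the writer 'filename') and from the placeholders the old .replace() calls targeted ('input.las', '5000', 'output_#.las'), it is presumably a JSON string along the lines of the sketch below. The filters.chipper stage type is an assumption here; any PDAL stage exposing a 'capacity' option would fit the same slot.

# Sketch of the assumed SPLIT_TEMPLATE; the stage types are illustrative
# guesses, not taken from this diff. PDAL's writers.las expands '#' in the
# filename into an incrementing number, one output file per point chunk.
SPLIT_TEMPLATE = """
{
    "pipeline": [
        "input.las",
        {
            "type": "filters.chipper",
            "capacity": "5000"
        },
        {
            "type": "writers.las",
            "filename": "output_#.las"
        }
    ]
}
"""

With the argparse defaults above, python data_split_merge/general_splitter.py --input_folder <in> --output_folder <out> splits every .las file in the folder in parallel across 4 worker processes; pass --no-parallel to take the serial path, which is simpler to step through when debugging.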