Python Script for Generating and Parsing Histogram Files of Synthetic Populations
This Python script is designed to generate histogram files from synthetic populations data. It includes functions to create histograms based on the data in CSV format, read specific headers and meta information from binary files, and iterate through binary files to extract, decode, and store histogram data efficiently. Primarily, it utilizes libraries such as NumPy for data manipulation and Pandas for reading CSV files. The script demonstrates advanced file handling, usage of regular expressions for data extraction, and object-oriented programming principles for structuring the code logically. It is intended for use in scientific research or data analysis tasks, where processing and visualizing large datasets of synthetic populations is required.
import numpy as np import os import pandas as pd import re from tqdm import tqdm from typing import Tuple, Union from typing import BinaryIO from io import BytesIO from itertools import tee, chain class subArrayFixer: def start(self, n): return None fixer = subArrayFixer() def mkhist(path : str, bins=100) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: df = pd.read_csv(path) assert isinstance(df, pd.DataFrame) G = df['G'] BP = df['BP'] RP = df['RP'] assert G is not None and BP is not None and RP is not None C = BP-RP hist, x, y = np.histogram2d(C, G, bins=bins) return hist, x, y def get_hist_file_meta(path): with open(path, 'rb') as f: numLengthBytes = f.read(16) length = int.from_bytes(numLengthBytes, 'little') bins = f.read(16) bins = int.from_bytes(bins, 'little') return length, bins def get_hist_file_length(path): with open(path, 'rb') as f: numLengthBytes = f.read(16) length = int.from_bytes(numLengthBytes, 'little') return length def get_hist_file_bins(path): with open(path, 'rb') as f: f.seek(16) numBinsBytes = f.read(16) numBins = int.from_bytes(numBinsBytes, 'little') return numBins def iter_hist_file(path, bs=1024): with open(path, 'rb') as infile: headerSizeOffset = 32 start = headerSizeOffset idx = 0 fSize = os.stat(path).st_size while (idx*bs) < fSize: # after the first table use a three block wide scanner to avoid straddling Header seekPos = headerSizeOffset+(idx-1)*bs if idx >= 1 else headerSizeOffset + idx*bs blockSize = bs*3 if idx >= 1 else bs infile.seek(seekPos,0) block = infile.read(blockSize) pop = re.search(b"HIST AGE:.+ BINS:.+ HEADEREND", block) if pop and idx > 4: age = float(re.findall(r"HIST AGE:(\d+\.\d+)", pop.group().decode())[0]) popLocation = headerSizeOffset + (idx-1)*bs + pop.start(0) dataSize = popLocation - start infile.seek(start) byteBlock = infile.read(dataSize) with open('byteBlockTest.bin', 'wb') as f: f.write(byteBlock) popData = read_population(byteBlock) start = popLocation idx += 4 yield age, popData idx += 1 def read_population(byteBlock): subArrays = re.finditer(b"\x93NUMPY\x01\x00.\x00", byteBlock) TrailingSubArrays, LeadingSubArrays = tee(subArrays) next(LeadingSubArrays) LeadingSubArrays = chain(LeadingSubArrays, [fixer]) output = {"H": list(), "X": list(), "Y": list()} translation = {0: "H", 1: "X", 2: "Y"} # print("===============") for ID, (subArrayLeft, subArrayRight) in enumerate(zip(TrailingSubArrays, LeadingSubArrays)): # print(subArrayLeft, subArrayRight) arrayContents = byteBlock[subArrayLeft.start(0):subArrayRight.start(0)] arrayAsFileObject = BytesIO(arrayContents) arr = np.load(arrayAsFileObject) output[translation[ID]] = arr return output def decode_hist_file(path : Union[str, BinaryIO]) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: if isinstance(path, str): infIO = open(path, 'rb') infile = infIO.read() else: infile = path.read() populations = re.finditer(b"HIST AGE:.+ BINS:.+ HEADEREND", infile) TrailingPopulation, LeadingPopulations = tee(populations) next(LeadingPopulations) LeadingPopulations = chain(LeadingPopulations, [fixer]) H = list() X = list() Y = list() ages = list() for mIDX, (matchleft, matchright) in tqdm(enumerate(zip(TrailingPopulation, LeadingPopulations))): popAge = matchleft.group(0).decode() try: age = float(re.findall(r"HIST AGE:(\d+\.\d+)", popAge)[0]) except IndexError: print(popAge) return 1 ages.append(age) subPop = infile[matchleft.end(0):matchright.start(0)] subArrays = re.finditer(b"\x93NUMPY\x01\x00.\x00", subPop) TrailingSubArrays, LeadingSubArrays = tee(subArrays) next(LeadingSubArrays) LeadingSubArrays = chain(LeadingSubArrays, [fixer]) for ID, (subArrayLeft, subArrayRight) in enumerate(zip(TrailingSubArrays, LeadingSubArrays)): arrayContents = subPop[subArrayLeft.start(0):subArrayRight.start(0)] arrayAsFileObject = BytesIO(arrayContents) arr = np.load(arrayAsFileObject) if ID == 0: H.append(arr) elif ID == 1: X.append(arr) elif ID == 2: Y.append(arr) return np.array(ages), np.array(H), np.array(X), np.array(Y) if isinstance(path, str): infIO.close() if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Generate histogram files from the synthetic populations stored in a directory") parser.add_argument("dir", help="directory with synthetic populations, each must start with fCMD and end with .out") parser.add_argument("bn", help="number of bins", type=int) parser.add_argument("-o", "--output", help="path to save all histograms too", default="histograms.npy", type=str) args = parser.parse_args() files = os.listdir(args.dir) files = list(filter(lambda x: x.startswith("fCMD") and x.endswith(".out"), files)) metadata = list() for file in files: cleaned = file[:-4] split = cleaned.split('_') metadata.append((float(split[1]), float(split[2]), float(split[3]))) paths = list(map(lambda x: os.path.join(args.dir, x), files)) with open(args.output, 'wb') as outfile: numOut = len(paths) outfile.write(numOut.to_bytes(16,'little')) outfile.write(args.bn.to_bytes(16,'little')) for pIDX, (path, meta) in tqdm(enumerate(zip(paths, metadata)), total=len(paths)): Header = f"HIST AGE:{meta[0]} BINS:{args.bn} HEADEREND" outfile.write(Header.encode('utf-8')) hist,x,y = mkhist(path, bins=args.bn) np.save(outfile, hist) np.save(outfile, x) np.save(outfile, y) # age, H, X, Y = decode_hist_file(args.output) # import matplotlib.pyplot as plt # plt.imshow(H[0]) # plt.show()