Skip to content
Snippets Groups Projects
Commit 152eb5d6 authored by Carl Philipp Klemm's avatar Carl Philipp Klemm
Browse files

add data aggrigation code

parent d5a9aafd
No related branches found
No related tags found
No related merge requests found
from chargefile import ChargeFile
CYCLES_PER_STEP = 4
STEP_COUNT = 12
def charge_cylces_in_step(globalstep: int, substep: int = -1):
out = 0
if substep < 0:
substep = STEP_COUNT
if substep >= 7:
out += 1
if substep >= 11:
out += CYCLES_PER_STEP
if substep >= 1 and (globalstep / STEP_COUNT) % 10 == 0:
out += 1
return out
def charge_cycles_at_step(globalstep: int, substep: int):
count = 0
for i in range(globalstep - 1):
count += charge_cylces_in_step(globalstep)
count += charge_cylces_in_step(globalstep, substep)
return count
def thermal_cylces_in_step(globalstep: int, substep: int = -1):
out = 0
if substep < 0:
substep = STEP_COUNT
if substep >= 1 and (globalstep / STEP_COUNT) % 10 != 0:
out += CYCLES_PER_STEP
if substep >= 2:
out += CYCLES_PER_STEP
if substep >= 4:
out += CYCLES_PER_STEP
if substep >= 6:
out += CYCLES_PER_STEP
if substep >= 8:
out += CYCLES_PER_STEP
if substep >= 10:
out += CYCLES_PER_STEP
if substep >= 12:
out += 1
return out
def thermal_cycles_at_step(globalstep: int, substep: int):
count = 0
for i in range(globalstep - 1):
count += thermal_cylces_in_step(globalstep)
count += thermal_cylces_in_step(globalstep, substep)
return count
non_charge_cycle_cell = list(range(0, 4))
non_thermal_cycle_cell = list(range(11, 21))
cell_thermal_range = {
0: [35, 55],
1: [35, 55],
2: [35, 55],
3: [35, 55],
4: [35, 55],
5: [35, 55],
6: [35, 55],
7: [35, 45],
8: [35, 45],
9: [35, 45],
10: [35, 45],
11: [35, 35],
12: [35, 35],
13: [35, 35],
14: [45, 45],
15: [45, 45],
16: [45, 45],
17: [35, 55],
18: [35, 55],
19: [35, 55],
20: [35, 55],
}
class CellMeta:
def __init__(self, cellid: int, globalstep: int, substep: int, charge_files: list[ChargeFile], total_cells: int):
closest_avg = ChargeFile.FindClosest(charge_files, globalstep, -1)
closest_charge = ChargeFile.FindClosest(charge_files, globalstep, cellid)
assert closest_charge.cell == cellid
self.charge_cycles = charge_cycles_at_step(globalstep, substep) if cellid not in non_charge_cycle_cell else 0
self.thermal_cycles = thermal_cycles_at_step(globalstep, substep) if cellid not in non_thermal_cycle_cell else 0
self.last_avg_cap = abs(closest_avg.capacity) / total_cells if closest_avg is not None else -1
self.last_avg_cap_step = closest_avg.step if closest_avg is not None else -1
self.last_cap = abs(closest_charge.capacity) if closest_charge is not None else -1
self.last_cap_step = closest_charge.step if closest_charge is not None else -1
self.thermal_range = cell_thermal_range[cellid]
import csv
from parseerror import ParseError
import os
def calc_capacity(charge_curve: list[dict]):
capacity = 0.0
prev_time = -1
prev_current = -1
for entry in charge_curve:
if prev_time > 0:
delta_s = entry['time'] - prev_time
current = (entry['current'] + prev_current) / 2
capacity += current * (delta_s / (60.0 * 60.0))
prev_time = entry['time']
prev_current = entry['current']
return capacity
class ChargeFile:
def __init__(self, filename: str):
self.start_voltage = 0
self.end_voltage = 0
self.capacity = 0
self.cell = -1
self.discharge = False
self.current = 0
self.full_cycle = False
self.step = 0
if os.path.split(filename)[1].startswith("single_cell_charge") or os.path.split(filename)[1].startswith("single_cell_discharge"):
tokens = filename.split('.')[0].split('_')
self.step = int(tokens[-2])
self.cell = int(tokens[-1])
elif os.path.split(filename)[1].startswith("charge_for"):
self.step = int(filename.split('.')[0].split('_')[-1])
else:
raise ParseError(f"File name {os.path.split(filename)[1]} not in the expected sheme for ChargeFile")
with open(filename, newline='') as csvfile:
reader = csv.reader(csvfile, delimiter=',', quotechar='"')
reader.__next__()
timestr = reader.__next__()[0]
if timestr != "time":
raise ParseError(f"Expected time got {timestr}")
charge_curve = list()
for row in reader:
charge_curve.append({'time': int(row[0]), 'voltage': float(row[1]), 'current': float(row[2])})
self.current = charge_curve[int(len(charge_curve) / 2)]['current']
self.discharge = self.current < 0
self.start_voltage = charge_curve[0]['voltage']
self.end_voltage = charge_curve[-1]['voltage']
self.capacity = calc_capacity(charge_curve)
self.full_cycle = self.start_voltage > 4.05 and self.end_voltage < 3.15 or self.start_voltage < 3.15 and self.end_voltage > 4.05
def FindClosest(charge_files: list, step, cellid=-1, full_cycle=True):
closest_file = None
for charge_file in charge_files:
if charge_file.cell != cellid:
continue
if not full_cycle or charge_file.full_cycle:
if closest_file is not None:
if abs(step - closest_file.step) > abs(step - charge_file.step):
closest_file = charge_file
else:
closest_file = charge_file
if closest_file is None:
print(f"Warning could not find a charge {"full" if full_cycle else ""} file close to {step} for cell {cellid}")
return closest_file
import argparse
import os
from tqdm import tqdm
from chargefile import ChargeFile
from spectrafile import SpectraFile
if __name__ == "__main__":
parser = argparse.ArgumentParser("KissExpiramentCreateDataset")
parser.add_argument('--data', '-d', required=True, help="Data input directory")
parser.add_argument('--out', '-o', required=True, help="output directory")
parser.add_argument('--cell_count', '-c', type=int, required=True, help="number of active cells")
args = parser.parse_args()
filenames = [f for f in os.listdir(args.data) if os.path.isfile(os.path.join(args.data, f))]
charge_filenames = [f for f in filenames if f.startswith("charge") or f.startswith("single_cell_")]
spectra_filenames = [f for f in filenames if not f.startswith("charge") and not f.startswith("single_cell_") and f != "expiramentlog.csv"]
print(f"found {len(spectra_filenames)} spectra")
print(f"found {len(charge_filenames)} charge/discharge sequences")
if not os.path.exists(args.out):
os.makedirs(args.out)
charge_files = list()
for filename in charge_filenames:
charge_files.append(ChargeFile(os.path.join(args.data, filename)))
cells = set()
for filename in tqdm(spectra_filenames):
tokens = filename.split('.')[0].split('-')
step = int(tokens[0])
cellid = int(tokens[1])
substep = int(tokens[2])
cells.add(cellid)
celldir = os.path.join(args.out, str(cellid))
if not os.path.exists(celldir):
os.makedirs(celldir)
sf = SpectraFile(os.path.join(args.data, filename), cellid, step, substep, charge_files, args.cell_count)
sf.write(celldir)
if len(cells) != int(args.cell_count):
print(f"INCORRECT CELL COUNT!! found {len(cells)} but expected {args.cell_count}")
class ParseError(Exception):
def __init__(self, message):
self.message = message
import os
from cellmeta import CellMeta
from eisgenerator import EisSpectra
from parseerror import ParseError
from chargefile import ChargeFile
class SpectraFile:
def __init__(self, filename: str, cellid: int, step: int, substep: int, charge_files: list[ChargeFile], total_cells: int):
self.cellid = cellid
self.step = step
self.substep = substep
self.filename = filename
self.temperature = -1
self.ocv = -1
self.meta = CellMeta(cellid, step, substep, charge_files, total_cells)
self.filename = os.path.split(filename)[1]
self.spectra = EisSpectra.loadFromDisk(filename)
header = self.spectra.header.split('"')[1].split(',')
self.temperature = float(header[2])
self.ocv = float(header[3])
if int(header[0]) != step or int(header[1]) != cellid:
raise ParseError(f"file name and file content of SpectraFile {filename} do not match")
def write(self, directory: str):
meta_dsc_string = "step, substep, cellid, temparature, ocv, charge_cycles, thermal_cycles, last_avg_cap, last_avg_step, last_cap, last_cap_step"
metastring = f"{self.step}, {self.substep}, {self.cellid}, {self.temperature}, {self.ocv}, {self.meta.charge_cycles}, {self.meta.thermal_cycles}, "
metastring += f"{self.meta.last_avg_cap}, {self.meta.last_avg_cap_step}, {self.meta.last_cap}, {self.meta.last_cap_step}"
self.spectra.headerDescription = meta_dsc_string
self.spectra.header = metastring
self.spectra.saveToDisk(os.path.join(directory, self.filename))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment