import os
import sys
import re
import numpy as np
import cv2
import json
import yaml
import vis_utils as v_uts
import struct
from cv_base import (
    Faces, Aux, Obj, DEFAULT_MATERIAL
)

hasTorch = True
try:
    import torch
except ImportError:
    hasTorch = False

import functools
import pandas as pd
from tqdm import tqdm
from PIL import Image

try:
    from plyfile import PlyData
except ImportError:
    PlyData = None  # ply support is optional

import pdb
b = pdb.set_trace

def default(x, val):
    return val if x is None else x


class IOShop:
    def __init__(self, name, **kwargs):
        ioFuncs = {'depth': DepthIO,
                   'image': ImageIO,
                   'flow': FlowIO,
                   'segment': SegmentIO,
                   'prob': ProbIO,
                   'video': VideoIO}
        self.io = ioFuncs[name](**kwargs)

    def load(self, file_name, **kwargs):
        return self.io.load(file_name, **kwargs)

    def dump(self, file_name, file, **kwargs):
        self.io.dump(file_name, file, **kwargs)

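# Minimal usage sketch for IOShop (the './demo/...' paths are hypothetical,
# not files shipped with this module). IOShop just dispatches to one of the
# concrete IO classes below, which append their own extension to the base
# file name.
def _example_ioshop_roundtrip():
    image_io = IOShop('image')                  # dispatches to ImageIO (jpg)
    img = image_io.load('./demo/img_0001')      # reads './demo/img_0001.jpg'
    image_io.dump('./demo/out/img_0001', img)   # writes './demo/out/img_0001.jpg'
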
class BaseIO:
    def __init__(self, appex='jpg'):
        self.type = 'image'
        self.appex = appex

    def load(self, file_name):
        file_name = '%s.%s' % (file_name, self.appex)
        image = cv2.imread(file_name, cv2.IMREAD_UNCHANGED)
        assert image is not None, '%s does not exist' % file_name
        return image

    def dump(self, file_name, file):
        v_uts.mkdir_if_need(os.path.dirname(file_name))
        file_name = '%s.%s' % (file_name, self.appex)
        cv2.imwrite(file_name, file)

class ImageIO(BaseIO):
    def __init__(self, appex='jpg'):
        super(ImageIO, self).__init__(appex=appex)
        self.type = 'image'

    def load(self, file_name):
        if file_name.endswith('heic') or file_name.endswith('HEIC'):
            byte = read2byte(file_name)
            image = decodeImage(byte)
        else:
            image = super(ImageIO, self).load(file_name)
        return image


def imwrite(file_name, data, order='rgb'):
    # `data` is assumed to be RGB and is flipped to BGR for cv2.imwrite;
    # the `order` argument is currently unused.
    cv2.imwrite(file_name, data[:, :, ::-1])

class SegmentIO(BaseIO):
    def __init__(self):
        super(SegmentIO, self).__init__(appex='png')
        self.type = 'segment'


class ProbIO(BaseIO):
    def __init__(self):
        super(ProbIO, self).__init__()
        self.type = 'prob'
        self.max_class = 4

    def load(self, file_name, channels=None):
        # NOTE: the per-channel decoding was never finished in the original
        # source; this returns the raw encoded image as stored on disk.
        image = cv2.imread(file_name, cv2.IMREAD_UNCHANGED)
        channels = default(channels, self.max_class)
        return image

    def dump(self, file_name, file):
        """
        file: probability map of shape [height, width, channel]
        """
        h, w, c = file.shape
        output = np.zeros((h, w), dtype=np.uint16)
        for i in range(c):
            output = output + np.uint16(file[:, :, i] * 255) + i * 256
        cv2.imwrite(file_name, output.astype('uint16'))

class MeshIO(BaseIO):
    def __init__(self):
        super().__init__(appex='obj')
        self.type = 'mesh'

    def dump_obj(self, filename, obj):
        export_obj(filename, obj)

    def load_obj(self, filename):
        return load_obj(filename)


def normalize_normal(mat):
    mat = (mat / 255.0 * 2.0 - 1.0).astype('float32')
    l1 = np.linalg.norm(mat, axis=2)
    for j in range(3):
        mat[:, :, j] /= (l1 + 1e-9)
    return mat

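# A minimal self-check sketch for normalize_normal: after mapping uint8
# values to [-1, 1] and dividing by the per-pixel L2 norm, every pixel
# should be (close to) a unit vector.
def _example_normalize_normal():
    fake = np.random.randint(0, 256, size=(4, 4, 3), dtype=np.uint8)
    unit = normalize_normal(fake)
    norms = np.linalg.norm(unit, axis=2)
    assert np.allclose(norms, 1.0, atol=1e-3)
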
class NormalIO(BaseIO):
    # NOTE: this class is shadowed by the second NormalIO definition further
    # below; it is kept here as in the original source.
    def __init__(self, xyz='rgb'):
        """
        rgb: the normal is stored with x in the r channel, y in g, z in b.
        """
        super().__init__()
        self._xyz = xyz

    def read(self, filename):
        normal = cv2.imread(filename, cv2.IMREAD_UNCHANGED)
        if self._xyz == 'rgb':
            normal = normal[:, :, ::-1]
        normal = normalize_normal(normal)
        return normal

class DepthIO(BaseIO):
    def __init__(self, bit=8):
        super(DepthIO, self).__init__(appex='pfm')
        assert bit in [8, 16]
        scale = {8: 1, 16: 2}
        self.bits = scale[bit]
        self.dump_vis = True

    def load(self, path):
        """Read a pfm file.

        Args:
            path (str): path to file (without the .pfm extension)

        Returns:
            tuple: (data, scale)
        """
        path = '%s.%s' % (path, self.appex)
        with open(path, "rb") as file:
            color = None
            width = None
            height = None
            scale = None
            endian = None

            header = file.readline().rstrip()
            if header.decode("ascii") == "PF":
                color = True
            elif header.decode("ascii") == "Pf":
                color = False
            else:
                raise Exception("Not a PFM file: " + path)

            dim_match = re.match(r"^(\d+)\s(\d+)\s$", file.readline().decode("ascii"))
            if dim_match:
                width, height = list(map(int, dim_match.groups()))
            else:
                raise Exception("Malformed PFM header.")

            scale = float(file.readline().decode("ascii").rstrip())
            if scale < 0:
                # little-endian
                endian = "<"
                scale = -scale
            else:
                # big-endian
                endian = ">"

            data = np.fromfile(file, endian + "f")
            shape = (height, width, 3) if color else (height, width)
            data = np.reshape(data, shape)
            data = np.flipud(data)

            return data, scale

    def dump(self, path, image, scale=1):
        """Write a pfm file.

        Args:
            path (str): path to file (without the .pfm extension)
            image (array): data
            scale (int, optional): Scale. Defaults to 1.
        """
        v_uts.mkdir_if_need(os.path.dirname(path))
        path = path + '.pfm'
        with open(path, "wb") as file:
            color = None
            if image.dtype.name != "float32":
                raise Exception("Image dtype must be float32.")

            image = np.flipud(image)
            if len(image.shape) == 3 and image.shape[2] == 3:  # color image
                color = True
            elif (
                len(image.shape) == 2 or (len(image.shape) == 3 and image.shape[2] == 1)
            ):  # greyscale
                color = False
            else:
                raise Exception("Image must have H x W x 3, H x W x 1 or H x W dimensions.")

            file.write(("PF\n" if color else "Pf\n").encode())
            file.write(("%d %d\n" % (image.shape[1], image.shape[0])).encode())

            endian = image.dtype.byteorder
            if endian == "<" or (endian == "=" and sys.byteorder == "little"):
                scale = -scale
            file.write(("%f\n" % scale).encode())

            image.tofile(file)

        if self.dump_vis:
            self.dump_visualize(path[:-4], image, self.bits)

    @staticmethod
    def to8UC3(depth, scale=1000):
        """
        Convert a float depth map (meters) to a 3-channel 8-bit encoding.
        """
        h, w = depth.shape
        max_depth = (256.0 ** 3 - 1) / scale
        # Clip depth values exceeding the maximum representable depth
        depth = np.clip(depth, 0, max_depth)
        # Scale the depth values to integers
        value = depth * scale
        # Split the depth values into three channels (high, mid, low byte)
        ch = np.zeros((h, w, 3), dtype=np.uint8)
        ch[:, :, 0] = np.uint8(value / (256 ** 2))
        ch[:, :, 1] = np.uint8((value % (256 ** 2)) / 256)
        ch[:, :, 2] = np.uint8(value % 256)
        return ch

    @staticmethod
    def read8UC3(depth, scale=1000):
        """
        Convert an 8UC3 encoded image back to a float depth map.
        """
        if isinstance(depth, str):
            depth = cv2.imread(depth, cv2.IMREAD_UNCHANGED)
        # Merge the three channels into a single integer depth value
        # (cast first so the uint8 channels do not overflow).
        depth = depth.astype(np.uint32)
        depth_int = depth[:, :, 0] * (256 ** 2) + \
            depth[:, :, 1] * 256 + depth[:, :, 2]
        # Convert back to metric depth
        depth = depth_int.astype(np.float32) / scale
        return depth

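    # Worked example of the 24-bit packing in to8UC3/read8UC3 (the numbers
    # below are illustrative arithmetic, not data from this repo):
    #   12.345 m at scale=1000 -> value = 12345
    #     high byte = 12345 // 256**2 = 0
    #     mid  byte = (12345 % 256**2) // 256 = 48
    #     low  byte = 12345 % 256 = 57
    #   read8UC3 recovers 0 * 65536 + 48 * 256 + 57 = 12345 -> 12.345 m,
    #   i.e. the quantization step is 1 / scale = 1 mm.
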
    def dump_visualize(self, path, depth, bits=1):
        # Normalize the depth map to the full range of the target bit depth
        # and save it as a png next to the pfm file.
        depth_min = depth.min()
        depth_max = depth.max()
        max_val = (2 ** (8 * bits)) - 1

        if depth_max - depth_min > np.finfo("float").eps:
            out = max_val * (depth - depth_min) / (depth_max - depth_min)
        else:
            out = np.zeros(depth.shape, dtype=depth.dtype)

        if bits == 1:
            cv2.imwrite(path + ".png", out.astype("uint8"))
        elif bits == 2:
            cv2.imwrite(path + ".png", out.astype("uint16"))
        return

def load_png(path):
    depth = cv2.imread(path, cv2.IMREAD_UNCHANGED)
    return depth


def dump_png(path, depth, bits=2, max_depth=20.0):
    assert path.endswith(".png")
    max_val = (2 ** (8 * bits)) - 1
    depth = depth / max_depth * max_val
    cv2.imwrite(path, depth.astype("uint16"))

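# A minimal sketch of the quantization implied by dump_png/load_png: with
# bits=2 and max_depth=20.0 the depth is stored as uint16, so the step is
# 20.0 / 65535 (about 0.3 mm); values above max_depth should be clipped
# beforehand. The file path below is hypothetical.
def _example_depth_png_roundtrip():
    depth = np.random.rand(120, 160).astype(np.float32) * 20.0
    dump_png("/tmp/_depth_example.png", depth, bits=2, max_depth=20.0)
    decoded = load_png("/tmp/_depth_example.png").astype(np.float32) * 20.0 / 65535
    # round trip is accurate to roughly one quantization step
    assert np.abs(decoded - depth).max() < 2 * 20.0 / 65535
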
def read_depth(filename, scale=6000, sz=None, is_disparity=False):
    if not hasTorch:
        return None
    depth = cv2.imread(filename, cv2.IMREAD_UNCHANGED)
    depth = np.float32(depth) / scale
    if sz:
        h, w = sz
        depth = cv2.resize(depth, (w, h),
                           interpolation=cv2.INTER_NEAREST)
    depth = torch.from_numpy(depth)
    if is_disparity:  # convert to depth
        depth = 1.0 / torch.clamp(depth, min=1e-10)
    return depth

def write_depth(path, depth, grayscale, bits=1):
    """Write depth map to png file.

    Args:
        path (str): filepath without extension
        depth (array): depth
        grayscale (bool): use a grayscale colormap?
    """
    if not grayscale:
        bits = 1

    if not np.isfinite(depth).all():
        depth = np.nan_to_num(depth, nan=0.0, posinf=0.0, neginf=0.0)
        print("WARNING: Non-finite depth values present")

    depth_min = depth.min()
    depth_max = depth.max()
    max_val = (2 ** (8 * bits)) - 1

    if depth_max - depth_min > np.finfo("float").eps:
        out = max_val * (depth - depth_min) / (depth_max - depth_min)
    else:
        out = np.zeros(depth.shape, dtype=depth.dtype)

    if not grayscale:
        out = cv2.applyColorMap(np.uint8(out), cv2.COLORMAP_INFERNO)

    if bits == 1:
        cv2.imwrite(path + ".png", out.astype("uint8"))
    elif bits == 2:
        cv2.imwrite(path + ".png", out.astype("uint16"))
    return

class NormalIO(BaseIO):
    # NOTE: this redefinition shadows the NormalIO class defined above;
    # it is the one that takes effect at import time.
    def __init__(self):
        super(NormalIO, self).__init__(appex='npy')
        self.dump_vis = False

    @staticmethod
    def read_normal(filename, sz=None, to_torch=False):
        if not hasTorch:
            return None
        if not os.path.exists(filename):
            h, w = sz
            return torch.ones((h, w, 3)) * 0.3
        image = cv2.imread(filename)[:, :, ::-1]
        image = np.float32(image)
        image = (image / 127.5 - 1)
        if sz:
            h, w = sz
            image = cv2.resize(image, (w, h),
                               interpolation=cv2.INTER_NEAREST)
        return torch.from_numpy(image)

    def to8UC3(self, normal):
        return np.uint8((normal + 1) * 127.5)

class FlowIO(BaseIO):
    def __init__(self):
        super(FlowIO, self).__init__(appex='npy')
        self.dump_vis = False

    def normalize(self, flow, shape=None):
        if shape is None:
            shape = flow.shape[:2]
        flow[:, :, 0] /= shape[1]
        flow[:, :, 1] /= shape[0]
        return flow

    def denormalize(self, flow, shape=None):
        if shape is None:
            shape = flow.shape[:2]
        flow[:, :, 0] *= shape[1]
        flow[:, :, 1] *= shape[0]
        return flow

    def visualization(self, flow):
        pass

    def load(self, path, shape=None):
        path = path + '.npy'
        flow = np.load(path)
        flow = self.denormalize(flow, shape)
        assert flow is not None
        return flow

    def dump(self, path, flow):
        v_uts.mkdir_if_need(os.path.dirname(path))
        path = path + '.npy'
        flow = self.normalize(flow)
        np.save(path, flow)
        if self.dump_vis:
            self.dump_visualize(path[:-4], flow)

    def dump_visualize(self, path, flow):
        _, flow_c = v_uts.flow2color(flow)
        cv2.imwrite(path + '.png', flow_c)

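# Minimal sketch of the FlowIO convention: flow is stored normalized by the
# image size (x by width, y by height) and denormalized on load. Note that
# normalize/denormalize modify the array in place.
def _example_flow_normalization():
    fio = FlowIO()
    flow = np.random.randn(60, 80, 2).astype(np.float32)
    reference = flow.copy()
    flow = fio.normalize(flow)              # x /= 80, y /= 60 (in place)
    flow = fio.denormalize(flow, (60, 80))  # back to pixel units
    assert np.allclose(flow, reference, atol=1e-5)
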
class VideoIO(BaseIO):
    def __init__(self, longside_len=None):
        super(VideoIO, self).__init__()
        self.longside_len = longside_len

    def get_fps(self, path):
        vidcap = cv2.VideoCapture(path)
        return vidcap.get(cv2.CAP_PROP_FPS)

    def load_first_frame(self, path):
        import skvideo.io as vio
        video = vio.vreader(path)
        frame = next(video)
        if self.longside_len is not None:
            frame = v_uts.resize2maxsize(frame, self.longside_len)
        return frame

    def load(self, path, sample_rate=1, max_len=1e10,
             load_to_dir=False,
             dir_name=None,
             pre_len=5,
             save_transform=None):
        import skvideo.io as vio

        def default_transform(x):
            # skvideo yields RGB frames; flip to BGR for cv2.imwrite
            if x.ndim == 2:
                return x
            if x.ndim == 3 and x.shape[2] == 3:
                return x[:, :, ::-1]
            return x

        frames = []
        reader = vio.vreader(path)
        if load_to_dir:
            v_uts.mkdir(dir_name)
            if save_transform is None:
                save_transform = default_transform

        for count, frame in enumerate(reader):
            if count == max_len:
                break
            if count % sample_rate == 0:
                if self.longside_len is not None:
                    frame = v_uts.resize2maxsize(
                        frame, self.longside_len)
                if load_to_dir:
                    img_file = f"{dir_name}/{count:05}.png"
                    frame = save_transform(frame)
                    cv2.imwrite(img_file, frame)
                else:
                    frames.append(frame)
        if not load_to_dir:
            return frames

    def load_till_end(self, path, sample_rate=1):
        import skvideo.io as vio
        frames = []
        reader = vio.vreader(path)
        count = 0
        while True:
            try:
                frame = next(reader)
            except Exception:
                break
            if count % sample_rate == 0:
                if self.longside_len is not None:
                    frame = v_uts.resize2maxsize(
                        frame, self.longside_len)
                frames.append(frame)
            count += 1
        return frames

    def load_w_cv(self, path, out_dir, sample_rate=1, ext="jpg"):
        v_uts.video_to_frame(path,
                             out_dir,
                             max_len=self.longside_len,
                             sample_rate=sample_rate,
                             ext=ext)

    def dump_to_images(self, frames, image_path):
        v_uts.mkdir_if_need(image_path)
        for count, frame in tqdm(enumerate(frames)):
            image_file = '%s/%04d.jpg' % (image_path, count)
            cv2.imwrite(image_file, frame[:, :, ::-1])

    def dump(self, path, frames, fps=30, lossless=False):
        from moviepy.editor import ImageSequenceClip, VideoFileClip
        if isinstance(frames[0], str):
            frame_np = []
            for frame in tqdm(frames):
                cur_frame = cv2.imread(frame, cv2.IMREAD_UNCHANGED)[:, :, ::-1]
                frame_np.append(cur_frame)
            frames = frame_np

        clip = ImageSequenceClip(frames, fps)
        if lossless:
            assert path.endswith('avi')
            clip.write_videofile(path, codec='png')
        else:
            clip.write_videofile(path, fps=fps)

    def dump_skv(self, path, frames, fps=30):
        if frames[0].ndim == 2:
            frames = [cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB) for frame in frames]
        else:
            frames = [frame[:, :, ::-1] for frame in frames]
        v_uts.frame_to_video_simple(frames, fps, video_name=path)
        # Previous skvideo-based implementation, kept for reference:
        # import skvideo.io as vio
        # fps = str(int(fps))
        # vid_out = vio.FFmpegWriter(path,
        #                            inputdict={'-r': fps},
        #                            outputdict={
        #                                '-vcodec': 'libx264',
        #                                '-pix_fmt': 'yuv420p',
        #                                '-r': fps,
        #                            },
        #                            verbosity=1)
        # for idx, frame in enumerate(frames):
        #     vid_out.writeFrame(frame)
        # vid_out.close()

    def resave_video(self, video_file, start, end,
                     outvideo_file):
        """
        :param start: start time in seconds
        :param end: end time in seconds
        """
        fps = self.get_fps(video_file)
        frames = self.load(video_file)
        start_frame = int(start * fps)
        end_frame = int(end * fps)
        frames = frames[start_frame:end_frame]
        self.dump_skv(outvideo_file, frames, fps)

    def frame2video(self, folder, output, ext=".jpg"):
        image_files = v_uts.list_all_files(folder, exts=[ext])
        frames = []
        for name in tqdm(image_files):
            frames.append(cv2.imread(name)[:, :, ::-1])
        self.dump(output, frames)

class NpEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super(NpEncoder, self).default(obj)

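# Minimal sketch showing why NpEncoder exists: json.dump cannot serialize
# numpy scalars/arrays directly, but dump_json(..., w_np=True) below routes
# them through NpEncoder (the output path is hypothetical).
def _example_np_json():
    record = {"score": np.float32(0.75), "ids": np.arange(3)}
    dump_json("/tmp/_np_example.json", record, w_np=True)
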
def read2byte(filename):
    with open(filename, 'rb') as f:
        file_data = f.read()
    return file_data


def decodeImage(bytesIo):
    import whatimage
    import pyheif
    fmt = whatimage.identify_image(bytesIo)
    if fmt in ['heic', 'avif']:
        i = pyheif.read_heif(bytesIo)
        # Convert to another file format such as jpeg
        pi = Image.frombytes(
            mode=i.mode, size=i.size, data=i.data)
        image = np.asarray(pi)
        image = image[:, :, ::-1]  # to BGR
        return image
    else:
        return None

def image2Normal(imagePath):
    from skimage import io
    normal = io.imread(imagePath)
    normal = ((np.float32(normal) / 255.0) * 2 - 1.0)
    return normal


def normal2Image(normal):
    nm_pred_val = (normal + 1.) / 2.
    nm_pred_val = np.uint8(nm_pred_val * 255.)
    return nm_pred_val


def dump_normal(filename, normal):
    normal = normal2Image(normal)
    cv2.imwrite(filename + '.png', normal)

def dump_prob2image(filename, array):
    """
    Dump a probability map to an image.
    array: [x, height, width] (x = 1, 3, 4)
    """
    class_num = array.shape[0]
    # assert class_num <= 4
    if class_num >= 4:
        print('warning: only save the first 3 channels')
        array = array[:3, :, :]
    if class_num == 2:
        raise ValueError('not implement')
    array = np.transpose(np.uint8(array * 255), (1, 2, 0))
    if filename.endswith('.png'):
        cv2.imwrite(filename, array)
        return
    cv2.imwrite(filename + '.png', array)
    assert os.path.exists(filename + '.png')


def load_image2prob(filename):
    if not filename.endswith('.png'):
        filename = filename + '.png'
    array = cv2.imread(filename, cv2.IMREAD_UNCHANGED)
    array = np.transpose(array, (2, 0, 1)) / 255
    return array

def shape_match(images):
    assert len(images) > 1
    shape = images[0].shape[:2]
    for image in images[1:]:
        cur_shape = image.shape[:2]
        if np.sum(np.abs(np.array(shape) -
                         np.array(cur_shape))):
            return False
    return True


def append_apex(filename, appex):
    filename = filename.split('.')
    prefix = '.'.join(filename[:-1])
    filetype = filename[-1]
    return '%s_%s.%s' % (prefix, appex, filetype)

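# Quick sketch of append_apex: it inserts a suffix before the file extension,
# e.g. append_apex('scene/depth.png', 'vis') -> 'scene/depth_vis.png'.
def _example_append_apex():
    assert append_apex('scene/depth.png', 'vis') == 'scene/depth_vis.png'
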
def load_json(json_file):
    with open(json_file) as f:
        res = json.load(f)
    return res


def dump_numpy(filename, x: np.ndarray):
    np.savetxt(filename, x, delimiter=' ', fmt='%1.6f')


def dump_json(filename, odgt, w_np=False):
    with open(filename, 'w') as f:
        if not w_np:
            json.dump(odgt, f, indent=4)
        else:
            json.dump(odgt, f, indent=4, cls=NpEncoder)


def dump_jsonl(filename, odgt):
    with open(filename, 'w') as file:
        for entry in odgt:
            json.dump(entry, file)
            file.write('\n')

def dump_pair_data(image_list,
                   label_list,
                   outfile,
                   root='',
                   data_type='txt',
                   fields=None):
    if fields is None:
        fields = ["image", "segment"]
    if data_type == 'txt':
        fp = open(outfile, 'w')
        for imagefile, labelfile in zip(image_list, label_list):
            imagefile = imagefile.replace(root, '.')
            labelfile = labelfile.replace(root, '.')
            fp.write('%s %s\n' % (imagefile, labelfile))
        fp.close()
    elif data_type == "odgt":
        odgt = []
        for imagefile, labelfile in zip(image_list, label_list):
            imagefile = imagefile.replace(root, '.')
            labelfile = labelfile.replace(root, '.')
            item = {fields[0]: imagefile,
                    fields[1]: labelfile}
            odgt.append(item)
        dump_json(outfile, odgt)

def save_xlsx(filename, dicts, sheets=None):
    """
    Save a list of dicts to an xlsx file.
    """
    with pd.ExcelWriter(filename, mode='w') as writer:
        if sheets is None:
            df1 = pd.DataFrame(dicts)
            df1.to_excel(writer, index=False)
            return
        for sheet in sheets:
            df1 = pd.DataFrame(dicts[sheet])
            df1.to_excel(writer, sheet_name=sheet, index=False)


def load_xlsx(filename, sheets=None):
    assert os.path.exists(filename), f"File not found: {filename}"
    if sheets is None:
        df = pd.read_excel(filename)
        res = {}
        for column in df.columns:
            res[column] = df[column].tolist()
    else:
        res = {}
        for sheet in sheets:
            df = pd.read_excel(filename, sheet_name=sheet)
            cur_dict = {}
            for column in df.columns:
                cur_dict[column] = df[column].tolist()
            print(cur_dict.keys())
            res[sheet] = cur_dict
    print(res.keys())
    return res

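# Minimal round-trip sketch for save_xlsx/load_xlsx with named sheets
# (file name and sheet contents are made up for illustration; requires an
# xlsx engine such as openpyxl to be installed).
def _example_xlsx_roundtrip():
    tables = {"train": {"url": ["a.jpg", "b.jpg"], "label": [0, 1]},
              "val": {"url": ["c.jpg"], "label": [1]}}
    save_xlsx("/tmp/_example.xlsx", tables, sheets=["train", "val"])
    loaded = load_xlsx("/tmp/_example.xlsx", sheets=["train", "val"])
    assert loaded["train"]["url"] == ["a.jpg", "b.jpg"]
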
def dump_lines(filename, file_list):
    f = open(filename, 'w')
    tbar = tqdm(file_list)
    for i, elements in enumerate(tbar):
        if isinstance(elements, (tuple, list)):
            line = ' '.join(elements)
        elif isinstance(elements, str):
            line = elements
        appex = '' if i == len(file_list) - 1 else '\n'
        f.write('%s%s' % (line, appex))
    f.close()


def load_lines(txt_file):
    lines = [line.strip() for line in open(txt_file, 'r')]
    return lines

def load_jsonl(jsonl_file):
    # List to hold all JSON objects
    data = []
    # Open the file and read line by line
    with open(jsonl_file, 'r') as file:
        for line in file:
            # Each line is a JSON object; parse it and append to the list
            json_object = json.loads(line)
            data.append(json_object)
    return data

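# Minimal sketch of the jsonl round trip using dump_jsonl/load_jsonl above
# (the path is hypothetical).
def _example_jsonl_roundtrip():
    records = [{"image": "a.jpg", "segment": "a.png"},
               {"image": "b.jpg", "segment": "b.png"}]
    dump_jsonl("/tmp/_example.jsonl", records)
    assert load_jsonl("/tmp/_example.jsonl") == records
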
def load_yaml(yaml_file):
    with open(yaml_file, "r") as f:
        yaml_dict = yaml.safe_load(f)
    return yaml_dict


def load_odgt(odgt):
    try:
        samples = [json.loads(x.rstrip())
                   for x in open(odgt, 'r')][0]
    except Exception:
        samples = load_json(odgt)
    print(samples[0].keys())
    return samples

def fuse_odgt(odgt_files):
    """
    Concatenate the samples from a list of odgt files.
    """
    odgt_full = []
    for odgt_file in odgt_files:
        odgt = load_odgt(odgt_file)
        odgt_full = odgt_full + odgt
    return odgt_full


def load_video_first_frame(video_name):
    cap = cv2.VideoCapture(video_name)
    if cap.isOpened():
        ret, frame = cap.read()
    else:
        raise ValueError("can not read %s" % video_name)
    return frame

def load_csv(csv_file):
    import csv
    lines = []
    with open(csv_file) as f:
        reader = csv.reader(f, delimiter=',')
        for row in reader:
            lines.append(row)
    # Skip the header row
    return lines[1:]


# Concatenate multiple text files into a single file
def cat_files(files, output):
    all_lines = []
    for filename in files:
        lines = load_lines(filename)
        all_lines = all_lines + lines
    dump_lines(output, all_lines)

class SkipExist:
    """
    Wrap a processor so it is skipped when its output file already exists.
    """
    def __init__(self,
                 processor,
                 ioType='image',
                 need_res=False,
                 rerun=False):
        self.ioType = ioType
        self.io = IOShop(self.ioType).io
        self.processor = processor
        self.rerun = rerun
        self.need_res = need_res

    def __call__(self, *args, **kwargs):
        assert 'filename' in kwargs
        true_file = '%s.%s' % (kwargs['filename'], self.io.appex)
        if os.path.exists(true_file):
            if self.need_res:
                res = self.io.load(kwargs['filename'])
                return res
        else:
            filename = kwargs['filename']
            del kwargs['filename']
            res = self.processor(*args, **kwargs)
            self.io.dump(filename, res)
            if self.need_res:
                return res

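# Minimal usage sketch for SkipExist, wrapping a hypothetical `run_model`
# function whose result is an image: the wrapped call only recomputes when
# './cache/frame_0.jpg' (a made-up path) does not exist yet.
def _example_skip_exist():
    def run_model(image):
        return (image * 0.5).astype(np.uint8)

    cached = SkipExist(run_model, ioType='image', need_res=True)
    image = np.zeros((32, 32, 3), dtype=np.uint8)
    result = cached(image, filename='./cache/frame_0')
    return result
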
def dump_pkl(filename, data):
    import pickle as pkl
    with open(filename, "wb") as fl:
        pkl.dump(data, fl)


def load_pkl(filename):
    import pickle as pkl
    with open(filename, 'rb') as fl:
        res = pkl.load(fl)
    return res

def write_pointcloud(filename, xyz_points, faces=None, rgb_points=None):
    """
    Write the given point cloud (and optional faces) to a binary .ply file.
    """
    assert xyz_points.shape[1] == 3, 'Input XYZ points should be Nx3 float array'
    if rgb_points is None:
        rgb_points = np.ones(xyz_points.shape).astype(np.uint8) * 255
    else:
        rgb_points = rgb_points.astype(np.uint8)
    assert xyz_points.shape == rgb_points.shape, \
        f'Input RGB colors should be Nx3 {rgb_points.shape} float array ' \
        f'and have same size as input XYZ points {xyz_points.shape}'

    # Write header of .ply file
    fid = open(filename, 'wb')
    fid.write(bytes('ply\n', 'utf-8'))
    fid.write(bytes('format binary_little_endian 1.0\n', 'utf-8'))
    fid.write(bytes('element vertex %d\n' % xyz_points.shape[0], 'utf-8'))
    fid.write(bytes('property float x\n', 'utf-8'))
    fid.write(bytes('property float y\n', 'utf-8'))
    fid.write(bytes('property float z\n', 'utf-8'))
    fid.write(bytes('property uchar red\n', 'utf-8'))
    fid.write(bytes('property uchar green\n', 'utf-8'))
    fid.write(bytes('property uchar blue\n', 'utf-8'))
    if faces is not None:
        # Declare the face element so the written face data is valid PLY
        fid.write(bytes('element face %d\n' % len(faces), 'utf-8'))
        fid.write(bytes('property list uchar int vertex_indices\n', 'utf-8'))
    fid.write(bytes('end_header\n', 'utf-8'))

    # Write 3D points to .ply file
    for i in range(xyz_points.shape[0]):
        fid.write(bytearray(struct.pack(
            "fffccc", xyz_points[i, 0], xyz_points[i, 1], xyz_points[i, 2],
            rgb_points[i, 0].tobytes(), rgb_points[i, 1].tobytes(),
            rgb_points[i, 2].tobytes())))
    if faces is not None:
        for face in faces:
            fid.write(struct.pack("<B", face[0]))
            fid.write(struct.pack("<{}i".format(face[0]), *face[1]))
    fid.close()

def read_ply(filename):
    # Load the PLY file
    ply_data = PlyData.read(filename)
    # Access the vertex data
    vertex_data = ply_data['vertex']
    # Extract x, y, z coordinates as a numpy array
    points = np.vstack((vertex_data['x'], vertex_data['y'], vertex_data['z'])).T
    return points

def load_obj(file_path):
    verts = []
    normals = []
    uvs = []
    material_colors = []
    texture_images = []
    texture_atlas = []
    faces_verts = []
    faces_normals = []
    faces_textures = []
    faces_materials = []

    with open(file_path, 'r') as file:
        for line in file:
            if line.startswith('v '):
                vertex = [float(v) for v in line.split()[1:]]
                verts.append(vertex)
            elif line.startswith('vn '):
                normal = [float(n) for n in line.split()[1:]]
                normals.append(normal)
            elif line.startswith('vt '):
                uv = [float(u) for u in line.split()[1:]]
                uvs.append(uv)
            elif line.startswith("mtllib "):
                mtl_name = line.split()[1]
            elif line.startswith('vc '):
                color = [float(c) for c in line.split()[1:]]
                material_colors.append(color)
            elif line.startswith('usemtl '):
                material = line.split()[1]
                texture_images.append(material)
            elif line.startswith('f '):
                face_data = line.split()[1:]
                face_verts = []
                face_normals = []
                face_textures = []
                for face in face_data:
                    # face entries are v, v/vt, v//vn or v/vt/vn
                    res = face.split('/')
                    face_verts.append(int(res[0]))
                    if len(res) >= 2 and res[1]:
                        face_textures.append(int(res[1]))
                    if len(res) == 3 and res[2]:
                        face_normals.append(int(res[2]))
                faces_verts.append(face_verts)
                faces_normals.append(face_normals)
                faces_textures.append(face_textures)
                faces_materials.append(len(texture_images) - 1)

    mtl_file = f"{os.path.dirname(file_path)}/{mtl_name}"
    with open(mtl_file, 'r') as file:
        for line in file:
            if line.startswith("map_Kd"):
                image_name = line.split()[1]
                break

    assert len(texture_images) == 1
    texture_name = texture_images[0]
    image = cv2.imread(f"{os.path.dirname(file_path)}/{image_name}")
    properties = Aux(
        normals=np.array(normals),
        verts_uvs=np.array(uvs),
        material_colors=DEFAULT_MATERIAL,
        texture_images={texture_name: np.float32(image) / 255.0},
        texture_atlas=None)

    faces_verts = np.array(faces_verts)
    num_faces = faces_verts.shape[0]
    faces = Faces(
        verts_idx=faces_verts,
        normals_idx=np.ones(faces_verts.shape) * -1,
        textures_idx=np.array(faces_textures),
        materials_idx=np.zeros(num_faces))
    obj = Obj(np.array(verts), faces, properties)
    return obj

def export_obj(filename, obj,
               include_normals=False,
               include_textures=True):
    """
    Export the given object to an .obj file with optional normals and textures.

    Args:
        filename (str): Path to the output .obj file (without the extension).
        obj (namedtuple): Object containing vertices, faces, and properties.
        include_normals (bool): Flag to include normals in the .obj file.
        include_textures (bool): Flag to include textures in the .obj file.
    """
    material_name = list(obj.properties.texture_images.keys())[0]

    # Write obj file
    name = os.path.basename(filename)
    with open(filename + ".obj", "w") as f:
        f.write("\n")
        if include_textures:
            f.write(f"mtllib {name}.mtl\n")
            f.write("\n")
        for vert in obj.verts:
            x, y, z = vert
            f.write(f"v {x} {y} {z}\n")
        if include_textures:
            for uv in obj.properties.verts_uvs:
                x, y = uv
                f.write(f"vt {x} {y}\n")
            f.write(f"usemtl {material_name}\n")

        num_faces = obj.faces.verts_idx.shape[0]
        for i in range(num_faces):
            f0, f1, f2 = obj.faces.verts_idx[i]
            if include_textures:
                t0, t1, t2 = obj.faces.textures_idx[i]
                if t0 == -1:
                    f.write(f"f {f0} {f1} {f2}\n")
                    continue
                f.write(f"f {f0}/{t0} {f1}/{t1} {f2}/{t2}\n")
            else:
                f.write(f"f {f0} {f1} {f2}\n")

    # Write mtl file
    if include_textures:
        output_dir = os.path.dirname(filename)
        with open(f"{output_dir}/{name}.mtl", "w") as f:
            f.write(f"newmtl {material_name}\n")
            f.write(f"map_Kd {name}.png\n")
            material_colors = obj.properties.material_colors[material_name]
            r, g, b = material_colors["ambient_color"]
            f.write(f"Ka {r} {g} {b}\n")
            r, g, b = material_colors["diffuse_color"]
            f.write(f"Kd {r} {g} {b}\n")
            r, g, b = material_colors["specular_color"]
            f.write(f"Ks {r} {g} {b}\n")
            s = material_colors["shininess"]
            f.write(f"Ns {s}\n")

        # Save texture image
        image = obj.properties.texture_images[material_name] * 255
        texture_img = f"{output_dir}/{name}.png"
        cv2.imwrite(texture_img, image)
    return

def resave_to_video():
    folder = "/Users/peng/Downloads/DenseAR/Mesh/"
    vname = "0037438511"
    image_num = 125
    frames = []
    d_frames = []
    crop = [0, 650, 1080, 1270]
    for i in tqdm(range(image_num)):
        name = f"{folder}/{vname}/{i}.jpg"
        d_name = f"{folder}/{vname}/{i}.tiff"
        img = np.array(Image.open(name))
        depth = np.array(Image.open(d_name))
        if img is None:
            continue
        img = img[crop[0]:crop[2], crop[1]:crop[3]]
        depth = depth[crop[0]:crop[2], crop[1]:crop[3]]
        depth = 1.0 / np.maximum(depth, 1e-10)
        # p_uts is an external color-mapping helper (not imported in this module)
        depth = p_uts.depth2color(depth, max_d=50)
        frames.append(img)
        d_frames.append(depth)

    vio = VideoIO()
    video_file = f"{folder}/{vname}.mp4"
    d_video_file = f"{folder}/{vname}_d.mp4"
    vio.dump_skv(video_file, frames, fps=24)
    vio.dump_skv(d_video_file, d_frames, fps=24)


def test_depth_8uc3_encode():
    depth = np.random.rand(480, 640) * 200
    dio = DepthIO()
    depth_encode = dio.to8UC3(depth)
    depth_decode = dio.read8UC3(depth_encode)
    print(depth, depth_decode)
    assert np.sum(np.abs(depth - depth_decode)) / (480 * 640) < 1e-3

########### copy from gta code ################
def build_mesh(w, h):
    w = np.linspace(-1.0, 1.0, num=w, dtype=np.float32)
    h = np.linspace(1.0, -1.0, num=h, dtype=np.float32)
    return np.stack(np.meshgrid(w, h), axis=0)


def build_proj_matrix(fov, aspect):
    proj = np.zeros((4, 4))
    proj[0, 0] = 1.0 / np.tan(np.radians(fov / 2)) / aspect
    proj[1, 1] = 1.0 / np.tan(np.radians(fov / 2))
    proj[2, 2] = 0.00001502  # reverse-engineered from the game shader
    proj[2, 3] = 0.15000225  # reverse-engineered from the game shader
    proj[3, 2] = -1.0
    return proj


def zbuffer_to_depth(zbuffer, fov):
    height, width = zbuffer.shape[:2]
    aspect = width / height
    mesh = build_mesh(width, height)
    if len(zbuffer.shape) != 3:
        zbuffer = np.expand_dims(zbuffer, 0)
    # Back-project NDC coordinates and z-buffer through the inverse projection
    pcloud = np.concatenate((mesh, zbuffer, np.ones_like(zbuffer)), 0)
    pcloud = pcloud.reshape(4, height * width)
    proj_matrix = build_proj_matrix(fov, aspect)
    pcloud = np.linalg.inv(proj_matrix) @ pcloud
    depth = -pcloud[2] / pcloud[3]
    focal_cv = proj_matrix[0, 0] * width / 2.0
    return depth.reshape(height, width), focal_cv

def test_zbuffer_to_depth():
    # root = "E:/Dataset/GTA/Stereo_0/"
    # name = root + "1-130423915874"
    name = "E:/depth_video/0036696165/1"
    config = load_json(name + ".json")
    fov = config["fov"]
    zbuffer = cv2.imread(name + ".tiff", cv2.IMREAD_UNCHANGED)
    depth, focal = zbuffer_to_depth(zbuffer, fov)
    print(depth)


def fuse_frames_of_depth_video():
    """
    Fuse per-frame images, z-buffers and camera files into rgb/depth videos.
    """
    def frame_to_video(video_dir, video_name):
        frames = v_uts.list_all_files(video_dir, exts=['jpg'])
        rgb_video = f"{video_name}.mp4"
        depth_video = f"{video_name}_d.avi"
        cam_file = f"{video_name}.json"
        dio = DepthIO()
        imgs = []
        depths = []
        cams = []
        print("seq len:", len(frames))
        for i, frame in tqdm(enumerate(frames)):
            name = f"{video_dir}/{i}.jpg"
            d_name = f"{video_dir}/{i}.tiff"
            c_name = f"{video_dir}/{i}.json"
            img = np.array(Image.open(name))
            depth = np.array(Image.open(d_name))
            cam = load_json(c_name)
            depth, focal = zbuffer_to_depth(depth, cam['fov'])
            depth = dio.to8UC3(depth)
            imgs.append(img)
            depths.append(depth)
            cam['focal'] = focal
            cams.append(cam)
            # if i > 30:
            #     break

        vio = VideoIO()
        vio.dump(rgb_video, imgs)
        vio.dump(depth_video, depths, lossless=True)
        dump_json(cam_file, cams)

    folder = "E:/depth_video/"
    output = "E:/depth_video_resave/"
    v_uts.mkdir_if_need(output)
    folder_names = v_uts.list_all_folders(folder)
    for folder_name in tqdm(folder_names[1:]):
        folder_name = folder_name.replace('\\', '/')
        vid_name = folder_name.split('/')[-2]
        print(folder_name, vid_name)
        output_video = f"{output}/{vid_name}"
        frame_to_video(folder_name, video_name=output_video)
        # break

def get_sheet_list(dict, sheets=None, key="url"):
    if sheets is None:
        return list(dict[key])
    images_list = [dict[sheet_name][key] for sheet_name in sheets]
    images_full = []
    for images, sheet in zip(images_list, sheets):
        print(f"{sheet}: {len(images)}")
        images_full = images_full + images
    return images_full

def test_load_save_obj():
    image_name = "000000243355_zebra"
    obj = f"./unit_test/{image_name}.obj"
    obj = load_obj(obj)
    export_obj(f"./unit_test/{image_name}_resave", obj)


if __name__ == '__main__':
    # test = [(1, 2), (3, 4)]
    # dump_pkl('test.pkl', test)
    # print(load_pkl('test.pkl'))
    # xyz = np.random.rand(1000, 3)
    # write_pointcloud("test.ply", xyz)
    # pass
    # test_depth_8uc3_encode()
    # test_zbuffer_to_depth()
    # fuse_frames_of_depth_video()
    test_load_save_obj()