Spaces:
Running
Running
| import os | |
| from tempfile import NamedTemporaryFile | |
| import streamlit as st | |
| import cv2 | |
| import numpy as np | |
| import insightface | |
| from insightface.app import FaceAnalysis | |
| import time | |
| import requests | |
| app = '' | |
| swapper = '' | |
| st.set_page_config(page_title="FaceSwap App by Adil Khan") | |
| def download_model(): | |
| url = "https://cdn.adikhanofficial.com/python/insightface/models/inswapper_128.onnx" | |
| filename = url.split('/')[-1] | |
| filepath = os.path.join(os.path.dirname(__file__),filename) | |
| if not os.path.exists(filepath): | |
| print(f"Downloading {filename}...") | |
| response = requests.get(url) | |
| with open(filepath, 'wb') as file: | |
| file.write(response.content) | |
| print(f"{filename} downloaded successfully.") | |
| else: | |
| print(f"{filename} already exists in the directory.") | |
| def swap_faces(target_image, target_face, source_face): | |
| try: | |
| return swapper.get(target_image, target_face, source_face, paste_back=True) | |
| except Exception as e: | |
| st.error(f"Error during swaping: {e}") | |
| def image_faceswap_app(): | |
| st.title("Face Swapper for Image") | |
| source_image = st.file_uploader("Upload Source Image", type=["jpg", "jpeg", "png"]) | |
| target_image = st.file_uploader("Upload Target Image", type=["jpg", "jpeg", "png"]) | |
| if source_image and target_image: | |
| with st.spinner("Swapping... Please wait."): | |
| try: | |
| source_image = cv2.imdecode(np.frombuffer(source_image.read(), np.uint8), -1) | |
| target_image = cv2.imdecode(np.frombuffer(target_image.read(), np.uint8), -1) | |
| source_image = cv2.cvtColor(source_image, cv2.COLOR_BGR2RGB) | |
| target_image = cv2.cvtColor(target_image, cv2.COLOR_BGR2RGB) | |
| source_faces = app.get(source_image) | |
| source_faces = sorted(source_faces, key=lambda x: x.bbox[0]) | |
| if len(source_faces) == 0: | |
| raise ValueError("No faces found in the source image.") | |
| source_face = source_faces[0] | |
| target_faces = app.get(target_image) | |
| target_faces = sorted(target_faces, key=lambda x: x.bbox[0]) | |
| if len(target_faces) == 0: | |
| raise ValueError("No faces found in the target image.") | |
| target_face = target_faces[0] | |
| swapped_image = swap_faces(target_image, target_face, source_face) | |
| message_placeholder = st.empty() | |
| message_placeholder.success("Swapped Successfully!") | |
| col1, col2, col3 = st.columns([1, 1, 1]) | |
| with col1: | |
| st.image(source_image, caption="Source Image", use_column_width=True) | |
| with col2: | |
| st.image(target_image, caption="Target Image", use_column_width=True) | |
| with col3: | |
| st.image(swapped_image, caption="Swapped Image", use_column_width=True) | |
| except Exception as e: | |
| st.error(f"Error during image processing: {e}") | |
| def process_video(source_img, video_path, output_video_path): | |
| try: | |
| cap = cv2.VideoCapture(video_path) | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) | |
| source_faces = app.get(source_img) | |
| source_faces = sorted(source_faces, key=lambda x: x.bbox[0]) | |
| if len(source_faces) == 0: | |
| raise ValueError("No faces found in the source image.") | |
| source_face = source_faces[0] | |
| progress_placeholder = st.empty() | |
| frame_count = 0 | |
| start_time = time.time() | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| target_faces = app.get(frame) | |
| target_faces = sorted(target_faces, key=lambda x: x.bbox[0]) | |
| if len(target_faces) > 0: | |
| frame = swap_faces(frame, target_faces[0], source_face) | |
| out.write(frame) | |
| elapsed_time = time.time() - start_time | |
| frames_per_second = frame_count / elapsed_time if elapsed_time > 0 else 0 | |
| remaining_time_seconds = max(0, (total_frames - frame_count) / frames_per_second) if frames_per_second > 0 else 0 | |
| remaining_minutes, remaining_seconds = divmod(remaining_time_seconds, 60) | |
| elapsed_minutes, elapsed_seconds = divmod(elapsed_time, 60) | |
| progress_placeholder.text( | |
| f"Processed Frames: {frame_count}/{total_frames} | Elapsed Time: {int(elapsed_minutes)}m {int(elapsed_seconds)}s | Remaining Time: {int(remaining_minutes)}m {int(remaining_seconds)}s") | |
| frame_count += 1 | |
| cap.release() | |
| out.release() | |
| except Exception as e: | |
| st.error(f"Error during video processing: {e}") | |
| def video_faceswap_app(): | |
| st.title("Face Swapper for Video") | |
| source_image = st.file_uploader("Upload Source Face Image", type=["jpg", "jpeg", "png"]) | |
| if source_image is not None: | |
| source_image = cv2.imdecode(np.frombuffer(source_image.read(), np.uint8), -1) | |
| target_video = st.file_uploader("Upload Target Video", type=["mp4"]) | |
| if target_video is not None: | |
| temp_video = NamedTemporaryFile(delete=False, suffix=".mp4") | |
| temp_video.write(target_video.read()) | |
| output_video_path = os.path.splitext(temp_video.name)[0] + '_output.mp4' | |
| status_placeholder = st.empty() | |
| try: | |
| with st.spinner("Processing... This may take a while."): | |
| process_video(source_image, temp_video.name, output_video_path) | |
| status_placeholder.success("Processing complete!") | |
| st.subheader("Your video is ready:") | |
| st.video(output_video_path) | |
| except Exception as e: | |
| st.error(f"Error during video processing: {e}") | |
| def main(): | |
| app_selection = st.sidebar.radio("Select App", ("Image Face Swapping", "Video Face Swapping")) | |
| if app_selection == "Image Face Swapping": | |
| image_faceswap_app() | |
| elif app_selection == "Video Face Swapping": | |
| video_faceswap_app() | |
| if __name__ == "__main__": | |
| app = FaceAnalysis(name='buffalo_l') | |
| app.prepare(ctx_id=0, det_size=(640, 640)) | |
| download_model() #download model if not available | |
| swapper = insightface.model_zoo.get_model('inswapper_128.onnx', root=os.path.dirname(__file__)) | |
| main() | |