Spaces:
Runtime error
Runtime error
| import pandas as pd | |
| import tensorflow as tf | |
| from tensorflow.keras.models import load_model | |
| import cv2 | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import gradio as gr | |
| # Creating a numpy array of shape (8, 16, 1) | |
| flow_field = np.ones((128,256), dtype = np.uint8) | |
| # Changing the left input side | |
| flow_field[:,0] = 3 | |
| # Changing the right output side | |
| flow_field[:,-1] = 4 | |
| # Changing the top layer | |
| flow_field[0,:] = 2 | |
| # Changing the bottom layer | |
| flow_field[-1,:] = 2 | |
| def nvs_loss(y_pred, rho=10, nu=0.0001): #arbitary rho and nu(Later use values of air) | |
| u,v,p = tf.split(y_pred, 3, axis=3) | |
| #First order derivative | |
| du_dx, du_dy = tf.image.image_gradients(u) # tf.image.image_gradients returns a tuple containing two tensors: u-grad along the x dir and u-grad along the y dir | |
| dv_dx, dv_dy = tf.image.image_gradients(v) | |
| dp_dx, dp_dy = tf.image.image_gradients(p) | |
| #Second order derivatives | |
| du_dx2, du_dydx = tf.image.image_gradients(du_dx) # du_dydx will be unused | |
| du_dxdy, du_dy2 = tf.image.image_gradients(du_dy) # du_dxdy will be unused | |
| dv_dx2, dv_dydx = tf.image.image_gradients(dv_dx) | |
| dv_dxdy, dv_dy2 = tf.image.image_gradients(dv_dy) | |
| #Momentum equation | |
| er1_tensor = tf.math.multiply(u, du_dx) + tf.math.multiply(v, du_dy) + 1.0*dp_dx/rho - nu*(du_dx2 + du_dy2) | |
| er2_tensor = tf.math.multiply(u, dv_dx) + tf.math.multiply(v, dv_dy) + 1.0*dp_dy/rho - nu*(dv_dx2 + dv_dy2) | |
| # # #Continuity equation | |
| er3_tensor = du_dx + dv_dy | |
| er1 = tf.reduce_mean(er1_tensor) | |
| er2 = tf.reduce_mean(er2_tensor) | |
| er3 = tf.reduce_mean(er3_tensor) | |
| return er1*er1 + er2*er2 + er3*er3 | |
| # Initiating the Loss Function- | |
| def custom_loss(y_true, y_pred): | |
| nv_loss = nvs_loss(y_pred) | |
| mse_loss = tf.reduce_mean(tf.square(y_true-y_pred)) # Try mse loss function here | |
| return mse_loss + nv_loss | |
| import torch | |
| import matplotlib | |
| def colorize(value, vmin=None, vmax=None, cmap='gray_r', invalid_val=-99, invalid_mask=None, background_color=(128, 128, 128, 255), gamma_corrected=False, value_transform=None): | |
| """Converts a depth map to a color image. | |
| Args: | |
| value (torch.Tensor, numpy.ndarry): Input depth map. Shape: (H, W) or (1, H, W) or (1, 1, H, W). All singular dimensions are squeezed | |
| vmin (float, optional): vmin-valued entries are mapped to start color of cmap. If None, value.min() is used. Defaults to None. | |
| vmax (float, optional): vmax-valued entries are mapped to end color of cmap. If None, value.max() is used. Defaults to None. | |
| cmap (str, optional): matplotlib colormap to use. Defaults to 'magma_r'. | |
| invalid_val (int, optional): Specifies value of invalid pixels that should be colored as 'background_color'. Defaults to -99. | |
| invalid_mask (numpy.ndarray, optional): Boolean mask for invalid regions. Defaults to None. | |
| background_color (tuple[int], optional): 4-tuple RGB color to give to invalid pixels. Defaults to (128, 128, 128, 255). | |
| gamma_corrected (bool, optional): Apply gamma correction to colored image. Defaults to False. | |
| value_transform (Callable, optional): Apply transform function to valid pixels before coloring. Defaults to None. | |
| Returns: | |
| numpy.ndarray, dtype - uint8: Colored depth map. Shape: (H, W, 4) | |
| """ | |
| if isinstance(value, torch.Tensor): | |
| value = value.detach().cpu().numpy() | |
| value = value.squeeze() | |
| if invalid_mask is None: | |
| invalid_mask = value == invalid_val | |
| mask = np.logical_not(invalid_mask) | |
| # normalize | |
| # vmin = np.percentile(value[mask],2) if vmin is None else vmin | |
| # vmax = np.percentile(value[mask],85) if vmax is None else vmax | |
| vmin = np.min(value[mask]) if vmin is None else vmin | |
| vmax = np.max(value[mask]) if vmax is None else vmax | |
| if vmin != vmax: | |
| value = (value - vmin) / (vmax - vmin) # vmin..vmax | |
| else: | |
| # Avoid 0-division | |
| value = value * 0. | |
| # squeeze last dim if it exists | |
| # grey out the invalid values | |
| value[invalid_mask] = np.nan | |
| cmapper = matplotlib.cm.get_cmap(cmap) | |
| if value_transform: | |
| value = value_transform(value) | |
| # value = value / value.max() | |
| value = cmapper(value, bytes=True) # (nxmx4) | |
| # img = value[:, :, :] | |
| img = value[...] | |
| img[invalid_mask] = background_color | |
| # return img.transpose((2, 0, 1)) | |
| if gamma_corrected: | |
| # gamma correction | |
| img = img / 255 | |
| img = np.power(img, 2.2) | |
| img = img * 255 | |
| img = img.astype(np.uint8) | |
| return img | |
| def img_preprocess(image, h, w): | |
| # Convert the drawn image to grayscale | |
| img_gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) | |
| # Threshold the grayscale image to create a binary image | |
| _, binary_img = cv2.threshold(img_gray, 1, 255, cv2.THRESH_BINARY) | |
| # Perform flood fill starting from a point inside the shape. Fill the inside with pixel value 0 | |
| seed_point = (int(h/2), int(w/2)) | |
| retval, flooded_image, mask, rect = cv2.floodFill(binary_img, None, seed_point, 0) | |
| flooded_image = (flooded_image/255).astype(np.uint8) | |
| return flooded_image | |
| def patch_stiching(flooded_image, h, w, x0, y0): # ((x0, y0) = center of channel, (w1, h1) = height and width of patch) | |
| flow_field_updated = np.copy(flow_field) | |
| print('flow field updated - ', flow_field_updated[:,-1]) | |
| flow_field_updated[int(x0-w/2):int(x0+w/2),int(y0-h/2):int(y0+h/2)] = flooded_image | |
| # flow_field_updated is the main thing that we will use to make our predictions on - | |
| test_img = np.expand_dims(flow_field_updated, axis = 0) | |
| test_img = np.expand_dims(test_img, axis = 3) # Shape of test_img = (1, 128, 256) | |
| return test_img | |
| # Define grid points | |
| x_points = np.linspace(0, 255, 256) | |
| y_points = np.linspace(0, 127, 128) | |
| X, Y = np.meshgrid(x_points, y_points) | |
| def return_quiver_plot(u, v): | |
| velocity = np.sqrt(u**2 + v**2) | |
| ax = plt.subplot() | |
| ax.imshow(velocity, origin = 'lower', extent = (0,256, 0,128), cmap = 'gray') | |
| q = ax.quiver(X[5::8,5::8], Y[5::8,5::8], u[5::8,5::8], u[5::8,5::8], pivot = 'middle', color = 'red') | |
| # ax.quiverkey(q, X=0.9, Y=1.05, U=2, | |
| # label='m/s', labelpos='E') | |
| # plt.title("Velocity distribution") | |
| # plt.show() | |
| return q | |
| def squeeze_function(img): | |
| img = np.squeeze(img, axis = 0) | |
| img = np.squeeze(img, axis = 2) | |
| return img | |
| # Taking a shape from the user on sketchpad and placing it inside the fluid flow - | |
| h, w = 48, 48 # patch_size in which the obstacle will be drawn | |
| x0, y0 = 64, 128 # (x0, y0) = center of channel | |
| def fill_shape_with_pixels(img): #img is taken by gradio as uint8 | |
| if img is None: | |
| return np.zeros((h, w), dtype=np.uint8) # "No input sketch" | |
| # Calling the the flooded image function to fill inside the obstacle | |
| flooded_image = img_preprocess(img, h, w) | |
| # Performing patch statching to put the obstacle at the required center position | |
| test_img = patch_stiching(flooded_image, h, w, x0, y0) | |
| # Loading and Compiling the Model | |
| model_path = "/content/drive/MyDrive/Pinns_Loss_file.h5" | |
| model = load_model(model_path, compile = False) | |
| model.compile(loss=custom_loss, optimizer=tf.keras.optimizers.AdamW(learning_rate = 0.0001), metrics=['mae', 'cosine_proximity']) | |
| # Making Model prediction from input sketch shape | |
| prediction = model.predict(test_img) # (prediction.shape = (1, 128, 256, 3)) | |
| u_pred, v_pred, p_pred = np.split(prediction, 3, axis=3) # shape of u_pred, v_pred, p_pred = (1, 128, 256, 1) | |
| # Making test_img in shape required by zero_pixel_location | |
| req_img = squeeze_function(test_img) | |
| # Storing the location of 0 pixel values | |
| #req_img = req_img.astype(int) | |
| zero_pixel_locations = np.argwhere(req_img == 0) | |
| # Reducing the dimensions- | |
| u_profile = u_pred[0][:,:,0] # shape of u profile to compatible shape (H, W) = (128, 256) | |
| v_profile = v_pred[0][:,:,0] | |
| p_profile = p_pred[0][:,:,0] | |
| p_profile[p_profile>1.6] = 1.6 | |
| # Creating a copy of the above profiles- | |
| u_profile_dash = np.copy(u_profile) | |
| v_profile_dash = np.copy(v_profile) | |
| # Creating a copy of the above profiles- | |
| u_profile_dash_1 = np.copy(u_profile) | |
| v_profile_dash_1 = np.copy(v_profile) | |
| # Hollowing the obstacle out from the u and v plots. Origin of imae is lop left and origin of plot is top right | |
| for y, x in zero_pixel_locations: | |
| u_profile_dash[128 - y, x] = 0 | |
| v_profile_dash[128 - y, x] = 0 | |
| # will be used for image | |
| u_profile_dash_1[y, x] = 0 | |
| v_profile_dash_1[y, x] = 0 | |
| # Quiver Plot | |
| quiver_plot = plt.figure(figsize = (14,6), edgecolor = "gray") | |
| velocity = np.sqrt(u_profile_dash_1**2 + v_profile_dash_1**2) | |
| ax = plt.subplot() | |
| ax.imshow(velocity, cmap = 'gray', extent = (0,256, 0,128)) | |
| q = ax.quiver(X[5::7,5::7], Y[5::7,5::7], u_profile_dash[5::7,5::7], v_profile_dash[5::7,5::7], pivot = 'middle', color = 'red') | |
| ax.quiverkey(q, X=0.9, Y=1.07, U=2, | |
| label='m/s', labelpos='E') | |
| plt.title("Velocity distribution", fontsize = 11) | |
| plt.xlabel("Length of Channel", fontsize = 11) | |
| plt.ylabel("Height of Channel", fontsize = 11) | |
| # StreamLine Plot | |
| streamline_plot = plt.figure(figsize = (14,6), edgecolor = "gray") | |
| plt.streamplot(X, Y, u_profile_dash, v_profile_dash, density = 3.5) | |
| plt.axis('scaled') | |
| plt.title("Streamline Plot", fontsize = 11) | |
| plt.xlabel("Length of Channel", fontsize = 11) | |
| plt.ylabel("Height of Channel", fontsize = 11) | |
| # Colorize taken from ZoeDepth Model | |
| u_colored = colorize(u_profile, cmap = 'jet') | |
| #cbar_u = plt.colorbar(u_profile,fraction=0.025, pad=0.05) | |
| v_colored = colorize(v_profile, cmap = 'jet') | |
| #cbar_v = plt.colorbar(v_colored,fraction=0.025, pad=0.05) | |
| p_colored = colorize(p_profile, cmap = 'jet') | |
| #cbar_p = plt.colorbar(p_colored,fraction=0.025, pad=0.05) | |
| return colorize(req_img, cmap = 'jet'), quiver_plot, streamline_plot, u_colored, v_colored, p_colored | |
| # Importing gr.Blocks() | |
| with gr.Blocks(theme="Taithrah/Minimal") as demo: | |
| gr.Markdown( | |
| """ | |
| # Physics Constrained DNN for Predicting Mean Turbulent Flows | |
| The App solves 2-D incompressible steady state NS equations for any given 2-D closed geometry. Geometry needs to be drawn around the center of the patch.\n | |
| It predicts the streamlines,horizontal & vertical velocity profiles and the pressure profiles using a hybrid loss function.\n | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| input_sketch = gr.Image(label = "Draw any Obstacle contour around the patch center", | |
| tool="sketch", source="canvas", shape=(h, w), brush_radius = 3) | |
| Process_button = gr.Button("Process Flow Parameters") | |
| with gr.Column(): | |
| filled_channel = gr.Image(label = "Drawn object inside a Channel of dimensions 128*256", container = True) | |
| with gr.Row(): | |
| quiver_plot = gr.Plot(label = "Velocity Distribution Around The Obstacle", scale = 2) | |
| with gr.Row(): | |
| streamline_plot = gr.Plot(label = "Stream Lines Around The Obstacle", scale = 2) | |
| with gr.Row(): | |
| u_image = gr.Image(label = "Horizontal Velocity") | |
| v_image = gr.Image(label = "Vertical Velocity") | |
| p_image = gr.Image(label = "Pressure") | |
| Process_button.click(fn=fill_shape_with_pixels, inputs=input_sketch, outputs=[filled_channel, quiver_plot, streamline_plot, u_image, v_image, p_image]) | |
| demo.launch(debug=True, inline = False) |