Spaces:
Runtime error
Runtime error
| import s3fs | |
| import pandas as pd | |
| import numpy as np | |
| from numpy import arange | |
| from colour import Color | |
| import plotly.graph_objects as go | |
| from nltk import tokenize | |
| from IPython.display import Markdown | |
| from PIL import ImageColor | |
| from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer | |
| import nltk | |
| nltk.download('punkt') | |
| import email | |
| import codecs | |
| import pickle | |
| import string | |
| from scipy import spatial | |
| import re | |
| import pytorch_lightning as pl | |
| from bs4 import BeautifulSoup | |
| import ipywidgets as widgets | |
| from ipywidgets import FileUpload | |
| from urlextract import URLExtract | |
| from transformers import BertTokenizerFast as BertTokenizer, BertModel, BertConfig | |
| import torch.nn as nn | |
| import torch | |
| from ipywidgets import interact, Dropdown | |
| import boto3 | |
| from sagemaker import get_execution_role | |
| from scipy import spatial | |
| from ipyfilechooser import FileChooser | |
| import random | |
# Shared configuration for the tone model and the rate models.
PARAMS = {
    # Tone-model settings
    "BATCH_SIZE": 8,
    "MAX_TOKEN_COUNT": 100,
    "BERT_MODEL_NAME": "google/bert_uncased_L-2_H-128_A-2",
    "N_EPOCHS": 10,
    "n_classes": 8,
    # One output column per tone
    "LABEL_COLUMNS": [
        "label_analytical",
        "label_casual",
        "label_confident",
        "label_friendly",
        "label_joyful",
        "label_optimistic",
        "label_respectful",
        "label_urgent",
    ],
    "TEXTCOL": "text",
    # Feature columns handed to the rate models: tones, then industry
    # indicators, then campaign-type indicators.
    "rf_labels": [
        "label_analytical",
        "label_casual",
        "label_confident",
        "label_friendly",
        "label_joyful",
        "label_optimistic",
        "label_respectful",
        "label_urgent",
        "industry_Academic and Education",
        "industry_Energy",
        "industry_Entertainment",
        "industry_Finance and Banking",
        "industry_Healthcare",
        "industry_Hospitality",
        "industry_Real Estate",
        "industry_Retail",
        "industry_Software and Technology",
        "campaign_type_Abandoned_Cart",
        "campaign_type_Engagement",
        "campaign_type_Newsletter",
        "campaign_type_Product_Announcement",
        "campaign_type_Promotional",
        "campaign_type_Review_Request",
        "campaign_type_Survey",
        "campaign_type_Transactional",
        "campaign_type_Usage_and_Consumption",
        "campaign_type_Webinar",
    ],
}
# Confidence-interval offsets per rate model; `prediction` reads the
# 'model', '2_5' and '97_5' columns of this table.
CI_rates = pd.read_csv('CI_RATES.csv')
### Create the file-uploading widget
def email_upload():
    """Show an ``.eml`` upload widget and return it.

    Returns:
        The ``FileUpload`` widget that was displayed; its ``value`` is what
        ``parse_email`` consumes.
    """
    # `display` is only injected as a builtin inside an IPython/Jupyter
    # session. Importing it explicitly avoids a NameError when this file is
    # imported as an ordinary module.
    from IPython.display import display

    print("Please upload your email (In EML Format)")
    upload = FileUpload(accept='.eml', multiple=True)
    display(upload)
    return upload
def parse_email(uploaded_file):
    """Extract the body text of the first uploaded ``.eml`` file.

    Args:
        uploaded_file: The ``FileUpload`` widget returned by ``email_upload``.

    Returns:
        The concatenated text of every ``text/*`` part of the message. For
        HTML parts this is the text inside ``<body>``.

    Raises:
        IndexError: If nothing has been uploaded yet.
    """
    uploads = uploaded_file.value
    if isinstance(uploads, dict):
        # ipywidgets 7 layout: {filename: {'content': bytes, ...}}
        entries = list(uploads.values())
    else:
        # NOTE(review): ipywidgets 8 exposes a tuple of dicts whose 'content'
        # is a memoryview -- confirm against the installed version.
        entries = list(uploads)
    if not entries:
        raise IndexError("no file has been uploaded")

    # Parse from bytes so the email package handles the declared charsets
    # instead of assuming the whole file is UTF-8.
    message = email.message_from_bytes(bytes(entries[0]['content']))

    check = []
    for part in message.walk():
        # A multipart container's payload is a list of sub-messages, not
        # text; its children are visited by walk() on their own.
        if part.is_multipart() or part.get_content_maintype() != 'text':
            continue
        # decode=True undoes base64 / quoted-printable transfer encoding.
        payload = part.get_payload(decode=True)
        if payload is None:
            continue
        charset = part.get_content_charset() or 'utf-8'
        try:
            body = payload.decode(charset, errors='replace')
        except LookupError:
            # Unknown charset label in the message headers.
            body = payload.decode('utf-8', errors='replace')

        # Name the parser explicitly so the result does not depend on which
        # optional parsers happen to be installed.
        soup = BeautifulSoup(body, 'html.parser')
        bodies = soup.find_all('body')
        if bodies:
            check.extend(tag.text for tag in bodies)
        else:
            # Plain text or an HTML fragment without a <body> element.
            check.append(soup.get_text())
    return "".join(check)
def text_clean(x, punct=True):
    """Normalise raw text before it is tokenised.

    Args:
        x: Text to clean.
        punct: When true, every ``string.punctuation`` character is replaced
            by a space.

    Returns:
        The lower-cased, ASCII-only text with links, ``@mentions``,
        ``#hashtags``, apostrophe suffixes, tokens containing digits and
        isolated single characters replaced by spaces, and with tabs and
        newlines removed.
    """
    ### Light
    x = x.lower()  # lowercase everything
    x = x.encode('ascii', 'ignore').decode()  # drop non-ASCII characters
    x = re.sub(r'https*\S+', ' ', x)  # remove links
    x = re.sub(r'http*\S+', ' ', x)
    # cleaning up text
    x = re.sub(r'\'\w+', ' ', x)  # apostrophe plus the letters after it
    x = re.sub(r'\w*\d+\w*', ' ', x)  # any token containing a digit
    x = re.sub(r'\s{2,}', ' ', x)  # collapse whitespace runs
    x = re.sub(r'\s[^\w\s]\s', ' ', x)  # isolated punctuation mark
    ### Heavy
    # Was r'@\S', which removed only the first character after '@' and left
    # the rest of the handle / e-mail domain behind.
    x = re.sub(r'@\S+', ' ', x)
    x = re.sub(r'#\S+', ' ', x)
    x = x.replace('=', ' ')
    if punct:
        x = re.sub('[%s]' % re.escape(string.punctuation), ' ', x)
    # remove single letters and numbers surrounded by space
    # NOTE(review): the original indentation was lost; this is assumed to run
    # regardless of `punct` -- confirm.
    x = re.sub(r'\s[a-z]\s|\s[0-9]\s', ' ', x)
    # The original also listed mojibake sequences here, but the ASCII filter
    # above already removes every non-ASCII character, so only tab and
    # newline can still occur.
    # NOTE(review): these are deleted, not replaced by a space, so words
    # separated by a single newline are joined together.
    x = x.replace('\t', '').replace('\n', '')
    return x
#### BERT model load requirements ####
class ToneTagger(pl.LightningModule):
    """BERT encoder followed by a linear layer and a sigmoid.

    Args:
        n_classes: Number of outputs of the linear layer.
        n_training_steps: Stored on the instance; not used in this class.
        n_warmup_steps: Stored on the instance; not used in this class.
    """

    def __init__(self, n_classes: int, n_training_steps=None, n_warmup_steps=None):
        super().__init__()
        bert_name = PARAMS['BERT_MODEL_NAME']
        self.bert = BertModel.from_pretrained(bert_name, return_dict=True)
        hidden_size = self.bert.config.hidden_size
        self.classifier = nn.Linear(hidden_size, n_classes)
        self.n_training_steps = n_training_steps
        self.n_warmup_steps = n_warmup_steps
        self.criterion = nn.BCELoss()

    def forward(self, input_ids, attention_mask):
        """Return the sigmoid of the classifier applied to BERT's pooled output."""
        encoded = self.bert(input_ids, attention_mask)
        logits = self.classifier(encoded.pooler_output)
        return torch.sigmoid(logits)
# Load the pre-trained model together with its weights.
# Build the architecture with one output per tone (PARAMS['n_classes'] == 8).
model = ToneTagger(PARAMS['n_classes'])
# map_location="cpu" lets a checkpoint saved on a GPU machine load on a
# CPU-only host; load_state_dict copies the values into this model's own
# parameters either way.
# NOTE: torch.load unpickles the file -- only load trusted checkpoints.
# NOTE(review): strict=False silently ignores missing or unexpected keys;
# confirm that is intended.
model.load_state_dict(torch.load("models/SAMODEL", map_location="cpu"), strict=False)
model.eval()
def bert_tones(text_sentences, model):
    """Clean each sentence and predict its tone vector with the model.

    Args:
        text_sentences: Iterable of sentence strings.
        model: Callable taking ``(input_ids, attention_mask)`` and returning
            a tensor; the first row of its output is kept per sentence.

    Returns:
        A ``(text, predictions)`` pair of equally long lists: each sentence
        cleaned with punctuation kept, and the matching prediction array.
    """
    # Take the tokenizer name and sequence length from PARAMS so they cannot
    # drift from the values the model itself is built with.
    tokenizer = BertTokenizer.from_pretrained(PARAMS['BERT_MODEL_NAME'])
    text = []
    predictions = []
    for sent in text_sentences:
        # Punctuation is kept in the text that is returned to the caller ...
        text.append(text_clean(sent, False))
        # ... but removed from the text that is fed to the model.
        encoding = tokenizer.encode_plus(
            text_clean(sent),
            add_special_tokens=True,
            max_length=PARAMS['MAX_TOKEN_COUNT'],
            return_token_type_ids=False,
            padding="max_length",
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        with torch.no_grad():
            pred = model(encoding['input_ids'], encoding['attention_mask'])
        predictions.append(np.array(pred.cpu().numpy()[0]))
    return text, predictions
def convert_text_to_tone(text, model=model, params=PARAMS):
    """Split text into sentences and compute per-sentence and aggregate tones.

    Args:
        text: Raw text, punctuation included (needed for sentence splitting).
        model: Tone model forwarded to ``bert_tones``.
        params: Configuration dict; ``params['LABEL_COLUMNS']`` names the
            columns of the aggregate frame.

    Returns:
        ``(final, tones)``: ``final`` is a one-row DataFrame with columns
        ``text``, ``sentiment`` and ``sentencetone``; ``tones`` holds the
        result of ``np.mean(..., axis=0)`` over the sentence predictions.
    """
    # VADER sentiment of the fully cleaned text; stored in `final` only.
    analyzer = SentimentIntensityAnalyzer()
    sentiment_dict = analyzer.polarity_scores(text_clean(text))

    # NLTK sentence splitting, then one tone prediction per sentence.
    sentences = tokenize.sent_tokenize(text)
    plain_text, predictions = bert_tones(sentences, model)

    final = pd.DataFrame(
        [[plain_text, sentiment_dict, predictions]],
        columns=['text', 'sentiment', 'sentencetone'],
    )
    agg_tones = final['sentencetone'].apply(np.mean, axis=0)
    tones = pd.DataFrame(agg_tones.tolist(), columns=params['LABEL_COLUMNS'])
    return final, tones
### This will be abstracted away to a more dynamic model
def _load_rate_model(path):
    """Unpickle one rate model and close the file handle afterwards."""
    # NOTE: pickle can execute arbitrary code while loading; only ship
    # trusted .sav files with the app.
    with open(path, 'rb') as handle:
        return pickle.load(handle)


brf = 'Rate_Models/bounce_rate_model.sav'
BRM = _load_rate_model(brf)
orf = 'Rate_Models/open_rate_model.sav'
ORM = _load_rate_model(orf)
urf = 'Rate_Models/unsubscribe_rate_model.sav'
URM = _load_rate_model(urf)
crf = 'Rate_Models/click_trough_rate_model.sav'
CRM = _load_rate_model(crf)
CV = 'Rate_Models/Conversion_rate.sav'
ConM = _load_rate_model(CV)
CTOR = 'Rate_Models/Click-To-Open_Rates.sav'
CTORM = _load_rate_model(CTOR)
RV = 'Rate_Models/Revenue_per_email.sav'
RVM = _load_rate_model(RV)

# Target name -> model; `prediction` looks models up by these keys.
model_dict = {
    'Open_Rate': ORM,
    'Click_Through_Rate': CRM,
    'Unsubscribe_Rate': URM,
    'Bounce_Rate': BRM,
    'Click_To_Open_Rate': CTORM,
    'Conversion_Rate': ConM,
    'Revenue_Per_Email': RVM,
}
## Plot confidence interval
def plot_CI(pred, lower, upper, scale_factor=0.5):
    """Plot a prediction and its confidence interval on a single axis.

    Args:
        pred: The predicted value of the target variable.
        lower: Lower bound of the confidence interval.
        upper: Upper bound of the confidence interval.
        scale_factor: The x-axis runs from 0 to ``upper + upper * scale_factor``.
    """
    title = f'The Predicted Value is {pred}'
    fig = go.Figure()
    fig.update_xaxes(showgrid=False)
    # Hide the y axis except for its zero line, which acts as the number line.
    fig.update_yaxes(showgrid=False,
                     zeroline=True, zerolinecolor='black', zerolinewidth=3,
                     showticklabels=False)
    fig.update_layout(height=200, plot_bgcolor='white')
    # One marker at (pred, 0). The original passed y=[0, 0] against a
    # one-element x; the lengths now match.
    fig.add_trace(go.Scatter(
        x=[pred], y=[0], mode='markers', marker_size=10, line=dict(color="red")
    ))
    fig.update_layout(xaxis_range=[0, upper + upper * scale_factor])
    fig.update_layout(showlegend=False)
    # Interval bounds as labelled vertical lines with a shaded band between.
    fig.add_vline(x=lower, annotation_text=f"{lower}", annotation_position="top")
    fig.add_vline(x=upper, annotation_text=f"{upper}", annotation_position="top")
    fig.add_vrect(lower, upper, fillcolor='red', opacity=0.25,
                  annotation_text='95% CI', annotation_position="outside top")
    fig.update_layout(title_text=title, title_x=0.5)
    fig.show()
def find_max_cat(df, target, industry, campaign):
    """Find the target range and best row for one industry/campaign pair.

    Args:
        df: DataFrame with indicator columns and the target column.
        target: Name of the column to minimise / maximise.
        industry: Indicator column that must equal 1.
        campaign: Indicator column that must equal 1.

    Returns:
        ``(min, max, rec)``: the target's minimum and maximum over the
        matching rows, rounded to 3 decimals, and positions 3..10 of the row
        holding the maximum. ``(0, 0, 0)`` when no row matches.
    """
    # Rows where both indicators are set (1 == True).
    mask = (df[campaign] == 1) & (df[industry] == 1)
    matches = df[mask]
    if len(matches) == 0:
        return 0, 0, 0

    scores = matches[target]
    best_row = df.loc[scores.idxmax()]
    # Positions 3..10 of the best row are taken as its tone values.
    rec = best_row[3:11]
    return round(scores.min(), 3), round(scores.max(), 3), rec
def scale_values(val, tn):
    """Return the tone value scaled by 100.

    Args:
        val: Slider value; accepted but not used in the result.
        tn: Current tone value.
    """
    return tn * 100
# Display names used as the y-axis labels of the tone bar charts.
tone_labels = [
    'Analytical',
    'Casual',
    'Confident',
    'Friendly',
    'Joyful',
    'Optimistic',
    'Respectful',
    'Urgent',
]
| # ## Plot recommendations - ORIGINAL FROM V1.0 | |
| # def recommend(tones,recommend_changes,change,target): | |
| # ''' This function creates the recomended changes plots it takes it the tones, the changes and ''' | |
| # fig = go.Figure() | |
| # fig.add_trace(go.Bar( | |
| # y=tones.columns, | |
| # x=tones.values[0], | |
| # name='Current Tones', | |
| # orientation='h', | |
| # # text=np.round(tones.values[0],3), | |
| # width=.9, | |
| # marker=dict( | |
| # color='#00e6b1', | |
| # line=dict(color='rgba(58, 71, 80, 1.0)', width=3) | |
| # ) | |
| # )) | |
| # fig.add_trace(go.Bar( | |
| # y=tones.columns, | |
| # x=recommend_changes, | |
| # name='Recommend changes', | |
| # orientation='h', | |
| # text=np.round(recommend_changes,3), | |
| # width=.5, | |
| # marker=dict( | |
| # color='#e60f00', | |
| # line=dict(color='rgba(58, 71, 80, 1.0)', width=3) | |
| # ) | |
| # )) | |
| # fig.update_traces(textfont_size=18, textposition="outside", cliponaxis=False) | |
| # fig.update_layout(height=1000, plot_bgcolor='white') | |
| # fig.update_layout(barmode='stack', yaxis={'categoryorder':'array','categoryarray': recommend_changes.sort_values(key=abs,ascending=True).index}) | |
| # fig.update_layout(title_text=f'The following Changes will yield a {round(change,3)} increase in {target}') | |
| # fig.show() | |
## Plot recommendations - MODIFIED
def recommend(tones, recommend_changes, change, target):
    """Plot the recommended tone changes as a horizontal bar chart.

    Args:
        tones: Current tones; accepted for interface compatibility but not
            used by this version of the plot.
        recommend_changes: One recommended change per entry of ``tone_labels``.
        change: Expected change in ``target``, shown in the title.
        target: Name of the target metric. ``'Revenue_Per_Email'`` is shown
            in dollars; every other target is shown as a percentage.
    """
    fig = go.Figure()
    fig.add_trace(go.Bar(
        y=tone_labels,
        x=recommend_changes,
        name='Recommend changes',
        orientation='h',
        text=np.round(recommend_changes, 3),
        width=.5,
        marker=dict(
            color='#e60f00',
            line=dict(color='rgba(58, 71, 80, 1.0)', width=1)
        )
    ))
    fig.update_traces(textfont_size=18, textposition="outside", cliponaxis=False)
    if target == 'Revenue_Per_Email':
        out = f"${round(change, 2)}"
    else:
        # round(change, 2) * 100 is a whole percentage, but the float product
        # can print as e.g. 7.000000000000001; format it without decimals.
        out = f"{round(change, 2) * 100:.0f}%"
    fig.update_layout(title_text=f'The following Changes will yield a {out} increase in {target}')
    fig.show()
def prediction(tones, campaign_val, industry_val, target):
    """Predict a target rate for the given tones, campaign and industry.

    Args:
        tones: Tone values; re-framed onto ``PARAMS['rf_labels']`` with
            missing columns filled with 0.
        campaign_val: Campaign-type column to set to 1.
        industry_val: Industry column to set to 1.
        target: Key into ``model_dict`` and value of the ``model`` column in
            ``CI_rates``.

    Returns:
        ``(pred, lower, higher, model)``: the raw prediction, the interval
        bounds rounded to 3 decimals, and the model that produced them.
    """
    features = pd.DataFrame(tones, columns=PARAMS['rf_labels']).fillna(0)
    # Switch on the indicator columns for the selected campaign and industry.
    for flag_column in (campaign_val, industry_val):
        features.loc[0, flag_column] = 1

    model = model_dict[target]
    pred = model.predict(features)[0]

    # The interval is the prediction shifted by the stored 2.5 / 97.5 offsets.
    ci_row = CI_rates[CI_rates['model'] == target]
    lower = pred + ci_row['2_5'].values[0]
    higher = pred + ci_row['97_5'].values[0]
    return pred, round(lower, 3), round(higher, 3), model
## Plot recommendations for intensity changes
def intensity_changes(tones, recommend_changes, change, target):
    """Plot the tone-intensity changes and the resulting change in the target.

    Args:
        tones: Current tones; accepted for interface compatibility but not
            used by the plot.
        recommend_changes: One intensity change per entry of ``tone_labels``.
        change: Resulting change in ``target``; its sign picks the
            "increase" or "decrease" wording of the title.
        target: Name of the target metric. ``'Revenue_Per_Email'`` is shown
            in dollars; every other target is shown as a percentage.
    """
    fig = go.Figure()
    fig.add_trace(go.Bar(
        y=tone_labels,
        x=recommend_changes,
        name='Recommend changes',
        orientation='h',
        text=np.round(recommend_changes, 3),
        width=.5,
        marker=dict(
            color='#00e6b1',
            line=dict(color='rgba(58, 71, 80, 1.0)', width=1)
        )
    ))
    fig.update_traces(textfont_size=18, textposition="outside", cliponaxis=False)

    # The title always shows the magnitude; the wording carries the sign.
    magnitude = abs(change)
    if target == 'Revenue_Per_Email':
        out = f"${round(magnitude, 2)}"
    else:
        # The decrease branch used to omit the *100 that the increase branch
        # applied, so the same change printed 100x smaller when negative.
        # Formatting without decimals also hides float noise such as
        # 7.000000000000001.
        out = f"{round(magnitude, 2) * 100:.0f}%"

    if change < 0:
        fig.update_layout(title_text=f'The following Changes will decrease the {target} by {out}')
    elif change >= 0:
        fig.update_layout(title_text=f'The following Changes will increase the {target} by {out}')
    fig.show()
def load_data():
    """Load ``Tone_and_target.csv`` and prepare it for the rate models.

    Returns:
        The de-duplicated data with ``industry`` and ``campaign_type``
        one-hot encoded, the ``Unnamed: 0`` and ``body`` columns dropped, and
        three target columns renamed to their underscore forms.
    """
    target_names = {
        'Click-To-Open Rates': 'Click_To_Open_Rate',
        'Conversion Rate': 'Conversion_Rate',
        'Revenue Per email': 'Revenue_Per_Email',
    }
    raw = pd.read_csv('Tone_and_target.csv')
    encoded = pd.get_dummies(raw.drop_duplicates(), columns=['industry', 'campaign_type'])
    trimmed = encoded.drop(columns=['Unnamed: 0', 'body'])
    return trimmed.rename(columns=target_names)
def plot_table(sorted_setence_tuple):
    """Build the sentence table shown at the bottom of the report.

    Args:
        sorted_setence_tuple: List of ``(sentence, score, rgb)`` tuples,
            where ``rgb`` is a tuple such as ``(72, 240, 201)``.

    Returns:
        The plotly ``Figure`` holding the table. It is not shown here; the
        caller decides whether to display it. Previously the figure was
        built and then discarded.
    """
    if sorted_setence_tuple:
        columns = list(zip(*sorted_setence_tuple))
        sentences, scores, colors = columns[0], columns[1], columns[2]
    else:
        # An empty list used to raise IndexError; build an empty table instead.
        sentences, scores, colors = (), (), ()
    # plotly expects colour strings such as 'rgb(72, 240, 201)'.
    rbg_list = ['rgb' + str(color) for color in colors]
    fig = go.Figure(data=[go.Table(
        header=dict(values=['<b>Sentences</b>', '<b>Difference from Recommended Tone</b>'],
                    line_color='darkslategray',
                    fill_color='#010405',
                    align='center',
                    font=dict(family="Metropolis", color='white', size=16)),
        cells=dict(values=[sentences,  # 1st column
                           scores],    # 2nd column
                   line_color='darkslategray',
                   fill_color=[rbg_list],
                   align=['left', 'center'],
                   font=dict(family="Arial", size=12)))
    ])
    return fig
def corrections(best, df):
    """Score each sentence by its cosine distance from the best tone.

    Args:
        best: Tone values of the best email for the current categories.
        df: Frame from ``convert_text_to_tone``; row 0 of ``text`` and
            ``sentencetone`` hold the sentences and their tone vectors.

    Builds ``(sentence, distance * 100, rgb)`` tuples, sorts them by
    descending distance and passes them to ``plot_table``.
    """
    # Loxz green (close to the best tone) through to Loxz light red (far).
    colors = ['#48f0c9', '#6ef5d6', '#94f7e1', '#bbfaec', '#e6fff9',
              '#ffe7e6', '#ffc3bf', '#ffa099', '#ff7c73', '#ff584d']
    last_color = len(colors) - 1
    sentence_order = []
    for text, cur in zip(df['text'][0], df['sentencetone'][0]):
        distance = spatial.distance.cosine(best, cur)
        new_value = round(distance * 100)  # distance as a 0-100 score
        # round(distance * 10) reaches 10 once distance >= 0.95, which is
        # one past the end of the 10-colour palette; clamp to a valid index.
        color_value = min(max(round(distance * 10), 0), last_color)
        rbg = ImageColor.getcolor(colors[color_value], "RGB")
        sentence_order.append((text, new_value, rbg))
    sorted_sentences = sorted(sentence_order, key=lambda x: x[1], reverse=True)
    plot_table(sorted_sentences)
def read_file(fc):
    """Extract the body text of the ``.eml`` file picked in a file chooser.

    Args:
        fc: File chooser whose ``selected`` attribute is the path to read.

    Returns:
        The concatenated text of every ``text/*`` part of the message. For
        HTML parts this is the text inside ``<body>``.
    """
    # Read bytes and let the email package apply each part's declared
    # charset, rather than decoding the file with the locale default.
    with open(fc.selected, 'rb') as handle:
        message = email.message_from_bytes(handle.read())

    check = []
    for part in message.walk():
        # A multipart container's payload is a list of sub-messages, not
        # text; its children are visited by walk() on their own.
        if part.is_multipart() or part.get_content_maintype() != 'text':
            continue
        # decode=True undoes base64 / quoted-printable transfer encoding.
        payload = part.get_payload(decode=True)
        if payload is None:
            continue
        charset = part.get_content_charset() or 'utf-8'
        try:
            body = payload.decode(charset, errors='replace')
        except LookupError:
            # Unknown charset label in the message headers.
            body = payload.decode('utf-8', errors='replace')

        # Name the parser explicitly so the result does not depend on which
        # optional parsers happen to be installed.
        soup = BeautifulSoup(body, 'html.parser')
        bodies = soup.find_all('body')
        if bodies:
            check.extend(tag.text for tag in bodies)
        else:
            # Plain text or an HTML fragment without a <body> element.
            check.append(soup.get_text())
    return "".join(check)