From 5e6c3bb095a021ea699d9cafc9488b386d2ad95c Mon Sep 17 00:00:00 2001 From: gauthiier Date: Thu, 7 Jul 2016 16:11:08 +0200 Subject: [PATCH] archive (loading activity, content-length) --- .gitignore | 2 +- lib/archive.py | 317 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 318 insertions(+), 1 deletion(-) create mode 100644 lib/archive.py diff --git a/.gitignore b/.gitignore index a836002..42e7600 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,4 @@ __pycache__/ archives -setenv \ No newline at end of file +setenv* \ No newline at end of file diff --git a/lib/archive.py b/lib/archive.py new file mode 100644 index 0000000..bb6e272 --- /dev/null +++ b/lib/archive.py @@ -0,0 +1,317 @@ +import numpy as np +import pandas as pd +import email, email.parser +import os, datetime, json, gzip, re +from random import randint + +def format_from(from_str): + from_addr = email.utils.parseaddr(from_str)[1] + if '{AT}' not in from_addr: + tok = from_str.split() + try: + at = tok.index('{AT}') + from_addr = ''.join(tok[at-1:at+2]) + if from_addr.startswith('<') or from_addr.endswith('>'): + from_addr = from_addr.strip('<').strip('>') + except ValueError: + return None + return from_addr.lower() + +def format_date(date_str): + try: + date_tz = email.utils.parsedate_tz(date_str) + time_tz = email.utils.mktime_tz(date_tz) #utc timestamp + except TypeError: + print "Format Date TypeError" + print " > " + date_str + return None + except ValueError: + print "Format Date ValueError" + print " > " + date_str + return None + + dt = datetime.datetime.fromtimestamp(time_tz) + + try: + pdt = pd.to_datetime(dt) + return pdt + except pd.tslib.OutOfBoundsDatetime: + print 'time out of bound' + print dt + return None + +def message_to_tuple_record(msg, records, references=None): + + # check date first? + date_time = format_date(msg['date']) + if not date_time: + return + + # filter date? + nettime_min_date = pd.to_datetime('01/10/1995', format='%d/%m/%Y') + nettime_max_date = pd.to_datetime(datetime.datetime.now()) + if date_time < nettime_min_date or date_time > nettime_max_date: + return None + + # check / filter from email address second? + from_addr = format_from(msg['from']) + if not from_addr: + return + + records.append((msg['message-id'], + from_addr, + msg['author_name'], + msg['subject'], + date_time, + msg['url'], + len(msg['content']), + 0 if not msg.has_key('follow-up') else len(msg['follow-up']), + references)) + + if msg.has_key('follow-up'): + for f in msg['follow-up']: + message_to_tuple_record(f, records, references=msg['message-id']) + + return + +def json_data_to_pd_dataframe(json_data): + + records = [] + for d in json_data: + for dd in d['threads']: + message_to_tuple_record(dd, records) + + df = pd.DataFrame.from_records(records, + index='date', + columns=['message-id', + 'from', + 'author', + 'subject', + 'date', + 'url', + 'content-length', + 'nbr-references', + 'references']) + + df.index.name = 'date' + + return df + +def load_from_file(filename, archive_dir): + + json_data = None + if not filename.endswith('.json.gz'): + file_path = os.path.join(archive_dir, filename + '.json.gz') + else: + file_path = os.path.join(archive_dir, filename) + + if os.path.isfile(file_path): + with gzip.open(file_path, 'r') as fp: + json_data = json.load(fp) + return json_data_to_pd_dataframe(json_data['threads']) + else: + #list of all "filename[...].json.gz" in archive_dir + files = sorted([f for f in os.listdir(archive_dir) if os.path.isfile(os.path.join(archive_dir, f)) and f.startswith(filename) and f.endswith('.json.gz')]) + if files: + filename = files[-1] # take the most recent (listed alpha-chronological) + file_path = os.path.join(archive_dir, filename) + if os.path.isfile(file_path): + with gzip.open(file_path, 'r') as fp: + json_data = json.load(fp) + return json_data_to_pd_dataframe(json_data['threads']) + else: + #list of all json files in archive_dir/filename + dir_path = os.path.join(archive_dir, filename) + if not os.path.isdir(dir_path): + return None + + files = [os.path.join(dir_path, f) for f in os.listdir(dir_path) if os.path.isfile(os.path.join(dir_path, f)) and f.endswith('.json')] + if not files: + return None + + # load all json files + threads = [] + for file_path in files: + with open(file_path, 'r') as fp: + json_data = json.load(fp) + threads.append(json_data) + + return json_data_to_pd_dataframe(threads) + + +class Archive: + + + data = None # "raw" json data + dataframe = None # main pd dataframe + + activity = None # (very) sparse dataframe (index=date(month), columns=from, values=activity(month)) + content_length = None # (very) sparse dataframe (index=date(month), columns=from, values=content-length(month in bytes)) + + threads = None + + def __init__(self, data="nettime-l", archive_dir="archives"): + + if isinstance(data, pd.core.frame.DataFrame): + self.dataframe = data.copy() + + if isinstance(data, str): + self.dataframe = load_from_file(data, archive_dir) + + ''' + activity + ''' + + def _activity(self): + + if self.activity is None: + from_index = self.dataframe.reindex(columns=['from']) + self.activity = from_index.groupby([pd.TimeGrouper(freq='M'), 'from']).size().unstack('from').fillna(0) + + return self.activity + + def activity_from(self, email_address, resolution='M'): + + eaddr = email_address.replace('@', '{at}').lower() + + self._activity() + try: + if resolution.lower() == 'm': + return self.activity[eaddr] + elif resolution.lower() == 'y': + y = self.activity[eaddr].resample('AS').sum() + y.index = y.index.year + return y + else: + return None + except KeyError: + return None + + def activity_overall(self, resolution='M'): + + self._activity() + try: + sum_activity_month = self.activity.sum(axis=1) + if resolution.lower() == 'm': + return sum_activity_month + elif resolution.lower() == 'y': + y = sum_activity_month.resample('AS').sum() + y.index = y.index.year + return y + else: + return None + except: + return None + + def activity_from_ranking(self, resolution='M', rank=5, filter_nettime=True): + # finish this -- re resolution AND filtering + self._activity() + afr = self.activity.sum(axis=0).order(ascending=False) + if filter_nettime: + p = r'^((?!nettime*).)*$' + afr = afr[afr.index.str.contains(p)] + return afr[:rank] + + def plot_activity_from_ranking(self, resolution='y', rank=5, figsize=(8, 7)): + + activity_rank = self.activity_from_ranking(rank=rank).keys() + series = [] + for k in activity_rank: + series.append(self.activity_from(k, resolution)) + + df = pd.concat(series, axis=1) + + colors = np.random.rand(len(df),3) + + if figsize: + df.plot(colors=colors, figsize=figsize) + else: + df.plot(colors=colors) + + ''' + content lenght + ''' + + def _content_length(self): + + if self.content_length is None: + from_content_index = self.dataframe.reindex(columns=['from', 'content-length']) + self.content_length = from_content_index.groupby([pd.TimeGrouper(freq='M'), 'from']).sum() + self.content_length = self.content_length.reset_index().pivot(columns='from', index='date', values='content-length').fillna(0) + + return self.content_length + + def content_length_from(self, email_address, resolution='M'): + + eaddr = email_address.replace('@', '{at}').lower() + + self._content_length() + try: + if resolution.lower() == 'm': + return self.content_length[eaddr] + elif resolution.lower() == 'y': + y = self.content_length[eaddr].resample('AS').sum() + y.index = y.index.year + return y + else: + return None + except KeyError: + return None + + def content_length_overall(self): + + self._content_length() + try: + sum_content_length_month = self.content_length.sum(axis=1) + if resolution.lower() == 'm': + return sum_content_length_month + elif resolution.lower() == 'y': + y = sum_content_length_month.resample('AS').sum() + y.index = y.index.year + return y + else: + return None + except: + return None + + def content_length_from_ranking(self, resolution='M', rank=5, filter_nettime=True): + # finish this -- re resolution + self._content_length() + cfr = self.content_length.sum(axis=0).order(ascending=False) + if filter_nettime: + p = r'^((?!nettime*).)*$' + cfr = cfr[cfr.index.str.contains(p)] + return cfr[:rank] + + def plot_content_length_from_ranking(self, resolution='y', rank=5, figsize=(8, 7)): + + content_rank = self.content_length_from_ranking(rank=rank).keys() + series = [] + for k in content_rank: + series.append(self.content_length_from(k, resolution)) + + df = pd.concat(series, axis=1) + + colors = np.random.rand(len(df),3) + + if figsize: + df.plot(colors=colors, figsize=figsize) + else: + df.plot(colors=colors) + + ''' + threads + ''' + + def _threads(self): + + if self.threads is None: + self.threads = self.dataframe[self.dataframe['nbr-references'] > 0].reindex(columns=['from','nbr-references','subject', 'url', 'message-id']).sort_values('nbr-references', ascending=False) + return self.threads; + + def threads_ranking(self, rank=5): + + self._threads() + return self.threads.drop('message-id', axis=1)[:rank] + +