# nettime/lib/archive.py

import numpy as np
import pandas as pd
import email, email.parser
import os, datetime, json, gzip, re
from random import randint
def format_from(from_str):
    """Extract and normalize the sender address from a From: header.

    nettime archives obfuscate addresses as ``user {AT} domain``.  When
    ``email.utils.parseaddr`` fails to capture that form, the header is
    re-tokenized and the three tokens around '{AT}' are glued back together.

    Returns the lowercased address string, or None when no obfuscated
    address can be recovered.
    """
    from_addr = email.utils.parseaddr(from_str)[1]
    if '{AT}' not in from_addr:
        # parseaddr did not capture the obfuscated form; rebuild from tokens
        tok = from_str.split()
        try:
            at = tok.index('{AT}')
        except ValueError:
            # no '{AT}' token at all -> nothing recoverable
            return None
        if at == 0:
            # '{AT}' with no user part before it is malformed
            # (fix: the original sliced tok[-1:2], joining unrelated tokens)
            return None
        from_addr = ''.join(tok[at - 1:at + 2])
        if from_addr.startswith('<') or from_addr.endswith('>'):
            from_addr = from_addr.strip('<').strip('>')
    return from_addr.lower()
def format_date(date_str):
    """Parse an RFC 2822 Date header into a pandas Timestamp (local time).

    Returns None (after logging a short diagnostic) when the header is
    missing/unparseable or the timestamp is out of the supported range.
    """
    try:
        date_tz = email.utils.parsedate_tz(date_str)
        time_tz = email.utils.mktime_tz(date_tz)  # UTC timestamp (seconds)
    except TypeError:
        # parsedate_tz returned None (unparseable or missing header)
        print("Format Date TypeError")
        print(" > " + str(date_str))  # str(): date_str may be None
        return None
    except ValueError:
        print("Format Date ValueError")
        print(" > " + str(date_str))
        return None
    try:
        dt = datetime.datetime.fromtimestamp(time_tz)
    except (ValueError, OverflowError, OSError):
        # timestamp outside the platform's supported datetime range
        print('time out of bound')
        print(time_tz)
        return None
    try:
        return pd.to_datetime(dt)
    except pd.errors.OutOfBoundsDatetime:
        # pd.tslib was removed in pandas 0.25; pd.errors is the public home
        print('time out of bound')
        print(dt)
        return None
def message_to_tuple_record(msg, records, references=None):
    """Append a flat tuple record for ``msg`` to ``records``, then recurse
    into its follow-ups (tagging them with this message's message-id).

    Messages with an unparseable date, a date outside the plausible archive
    window, or an unrecoverable sender address are silently dropped.
    """
    date_time = format_date(msg['date'])
    if not date_time:
        return
    # drop messages outside the plausible window (the list started Oct 1995)
    nettime_min_date = pd.to_datetime('01/10/1995', format='%d/%m/%Y')
    nettime_max_date = pd.to_datetime(datetime.datetime.now())
    if date_time < nettime_min_date or date_time > nettime_max_date:
        return
    from_addr = format_from(msg['from'])
    if not from_addr:
        return
    # dict.has_key() was removed in Python 3; .get() also guards a None value
    follow_ups = msg.get('follow-up')
    records.append((msg['message-id'],
                    from_addr,
                    msg['author_name'],
                    msg['subject'],
                    date_time,
                    msg['url'],
                    len(msg['content']),
                    len(follow_ups) if follow_ups else 0,
                    references))
    if follow_ups:
        for f in follow_ups:
            message_to_tuple_record(f, records, references=msg['message-id'])
def json_data_to_pd_dataframe(json_data):
    """Flatten a list of archive month dicts into a date-indexed DataFrame.

    Each element of ``json_data`` carries a 'threads' list; every thread
    (and, via recursion, its follow-ups) becomes one row.
    """
    records = []
    for month in json_data:
        for thread in month['threads']:
            message_to_tuple_record(thread, records)
    record_columns = ['message-id',
                      'from',
                      'author',
                      'subject',
                      'date',
                      'url',
                      'content-length',
                      'nbr-references',
                      'references']
    frame = pd.DataFrame.from_records(records, index='date',
                                      columns=record_columns)
    frame.index.name = 'date'
    return frame
def _load_gz_dataframe(file_path):
    """Load one gzipped JSON archive dump and build the dataframe from its
    top-level 'threads' list."""
    with gzip.open(file_path, 'r') as fp:
        json_data = json.load(fp)
    return json_data_to_pd_dataframe(json_data['threads'])


def load_from_file(filename, archive_dir):
    """Locate and load an archive named ``filename`` inside ``archive_dir``.

    Resolution order:
      1. ``archive_dir/filename[.json.gz]`` as a single gzipped dump;
      2. the most recent ``filename*.json.gz`` in ``archive_dir``
         (file names sort alpha-chronologically);
      3. a directory ``archive_dir/filename`` of plain ``.json`` thread files.

    Returns a DataFrame, or None when nothing matches.
    """
    if not filename.endswith('.json.gz'):
        file_path = os.path.join(archive_dir, filename + '.json.gz')
    else:
        file_path = os.path.join(archive_dir, filename)
    if os.path.isfile(file_path):
        return _load_gz_dataframe(file_path)
    # fallback 1: newest "filename*.json.gz" in archive_dir
    candidates = sorted(f for f in os.listdir(archive_dir)
                        if os.path.isfile(os.path.join(archive_dir, f))
                        and f.startswith(filename)
                        and f.endswith('.json.gz'))
    if candidates:
        file_path = os.path.join(archive_dir, candidates[-1])
        if os.path.isfile(file_path):
            return _load_gz_dataframe(file_path)
    # fallback 2: a directory of per-thread .json files
    dir_path = os.path.join(archive_dir, filename)
    if not os.path.isdir(dir_path):
        return None
    json_files = [os.path.join(dir_path, f) for f in os.listdir(dir_path)
                  if os.path.isfile(os.path.join(dir_path, f))
                  and f.endswith('.json')]
    if not json_files:
        return None
    threads = []
    for json_path in json_files:
        with open(json_path, 'r') as fp:
            threads.append(json.load(fp))
    return json_data_to_pd_dataframe(threads)
class Archive:
    """Wrapper around a nettime mailing-list archive DataFrame, with lazily
    computed per-sender activity / content-length / thread statistics."""

    data = None            # "raw" json data
    dataframe = None       # main pd dataframe (index=date)
    activity = None        # sparse frame: index=month, columns=from, values=activity
    content_length = None  # sparse frame: index=month, columns=from, values=bytes
    threads = None         # thread-starter frame (built by _threads)

    def __init__(self, data="nettime-l", archive_dir="archives"):
        """Build from an existing DataFrame, or load archive ``data`` from
        ``archive_dir`` when a name string is given."""
        if isinstance(data, pd.core.frame.DataFrame):
            self.dataframe = data.copy()
        elif isinstance(data, str):
            self.dataframe = load_from_file(data, archive_dir)
'''
activity
'''
def _activity(self):
if self.activity is None:
from_index = self.dataframe.reindex(columns=['from'])
self.activity = from_index.groupby([pd.TimeGrouper(freq='M'), 'from']).size().unstack('from').fillna(0)
return self.activity
def activity_from(self, email_address, resolution='M'):
eaddr = email_address.replace('@', '{at}').lower()
self._activity()
try:
if resolution.lower() == 'm':
return self.activity[eaddr]
elif resolution.lower() == 'y':
y = self.activity[eaddr].resample('AS').sum()
y.index = y.index.year
return y
else:
return None
except KeyError:
return None
def activity_overall(self, resolution='M'):
self._activity()
try:
sum_activity_month = self.activity.sum(axis=1)
if resolution.lower() == 'm':
2016-07-08 13:55:27 +02:00
sum_activity_month.rename
return sum_activity_month
elif resolution.lower() == 'y':
y = sum_activity_month.resample('AS').sum()
y.index = y.index.year
return y
else:
return None
except:
return None
def activity_from_ranking(self, resolution='M', rank=5, filter_nettime=True):
# finish this -- re resolution AND filtering
self._activity()
afr = self.activity.sum(axis=0).order(ascending=False)
if filter_nettime:
p = r'^((?!nettime*).)*$'
afr = afr[afr.index.str.contains(p)]
return afr[:rank]
def plot_activity_from_ranking(self, resolution='y', rank=5, figsize=(8, 7)):
activity_rank = self.activity_from_ranking(rank=rank).keys()
series = []
for k in activity_rank:
series.append(self.activity_from(k, resolution))
df = pd.concat(series, axis=1)
colors = np.random.rand(len(df),3)
if figsize:
df.plot(colors=colors, figsize=figsize)
else:
df.plot(colors=colors)
'''
content lenght
'''
def _content_length(self):
if self.content_length is None:
from_content_index = self.dataframe.reindex(columns=['from', 'content-length'])
self.content_length = from_content_index.groupby([pd.TimeGrouper(freq='M'), 'from']).sum()
self.content_length = self.content_length.reset_index().pivot(columns='from', index='date', values='content-length').fillna(0)
return self.content_length
def content_length_from(self, email_address, resolution='M'):
eaddr = email_address.replace('@', '{at}').lower()
self._content_length()
try:
if resolution.lower() == 'm':
return self.content_length[eaddr]
elif resolution.lower() == 'y':
y = self.content_length[eaddr].resample('AS').sum()
y.index = y.index.year
return y
else:
return None
except KeyError:
return None
def content_length_overall(self):
self._content_length()
try:
sum_content_length_month = self.content_length.sum(axis=1)
if resolution.lower() == 'm':
return sum_content_length_month
elif resolution.lower() == 'y':
y = sum_content_length_month.resample('AS').sum()
y.index = y.index.year
return y
else:
return None
except:
return None
def content_length_from_ranking(self, resolution='M', rank=5, filter_nettime=True):
# finish this -- re resolution
self._content_length()
cfr = self.content_length.sum(axis=0).order(ascending=False)
if filter_nettime:
p = r'^((?!nettime*).)*$'
cfr = cfr[cfr.index.str.contains(p)]
return cfr[:rank]
def plot_content_length_from_ranking(self, resolution='y', rank=5, figsize=(8, 7)):
content_rank = self.content_length_from_ranking(rank=rank).keys()
series = []
for k in content_rank:
series.append(self.content_length_from(k, resolution))
df = pd.concat(series, axis=1)
colors = np.random.rand(len(df),3)
if figsize:
df.plot(colors=colors, figsize=figsize)
else:
df.plot(colors=colors)
'''
threads
'''
2016-07-08 13:55:27 +02:00
def _threads(self, thresh=0):
if self.threads is None:
2016-07-08 13:55:27 +02:00
self.threads = self.dataframe[self.dataframe['nbr-references'] > thresh].reindex(columns=['from','nbr-references','subject', 'url', 'message-id']).sort_values('nbr-references', ascending=False)
return self.threads;
def threads_ranking(self, rank=5):
self._threads()
return self.threads.drop('message-id', axis=1)[:rank]
2016-07-08 13:55:27 +02:00
def threads_from(self, email_address, resolution='y'):
freq = 'M'
if resolution.lower() == 'y':
freq = 'AS'
elif resolution.lower() == 'm':
freq = 'M'
else:
return None
self._threads()
eaddr = email_address.replace('@', '{at}').lower()
self._threads()
threads_from = self.threads.reindex(columns=['from', 'nbr-references'])
threads_from_ranking = threads_from.groupby([pd.TimeGrouper(freq=freq), 'from']).sum()
threads_from_ranking = threads_from_ranking.reset_index().pivot(columns='from', index='date', values='nbr-references').fillna(0)
return threads_from_ranking[eaddr]
def threads_from_ranking(self, rank=5, filter_nettime=True):
self._threads()
threads_from = self.threads.reindex(columns=['from', 'nbr-references'])
threads_from_ranking = threads_from.groupby([pd.TimeGrouper(freq='AS'), 'from']).sum()
threads_from_ranking = threads_from_ranking.reset_index().pivot(columns='from', index='date', values='nbr-references').fillna(0)
tfr = threads_from_ranking.sum(axis=0).order(ascending=False)
if filter_nettime:
p = r'^((?!nettime*).)*$'
tfr = tfr[tfr.index.str.contains(p)]
return tfr[:rank]
def plot_threads_from_ranking(self, resolution='y', rank=5, figsize=(8, 7)):
threads_rank = self.threads_from_ranking(rank=rank).keys()
series = []
for k in threads_rank:
series.append(self.threads_from(k, resolution))
df = pd.concat(series, axis=1)
colors = np.random.rand(len(df),3)
if figsize:
df.plot(colors=colors, figsize=figsize)
else:
df.plot(colors=colors)
def threads_overall(self, resolution='y', aggregate='sum', tresh=0):
freq = 'M'
if resolution.lower() == 'y':
freq = 'AS'
elif resolution.lower() == 'm':
freq = 'M'
else:
return None
agg = aggregate.lower()
if not agg in ['sum', 'mean']:
return None
if not self.threads is None:
del self.threads
self.threads = None
self._threads(tresh)
if agg == 'sum':
y = self.threads.groupby([pd.TimeGrouper(freq=freq)]).sum()
else:
y = self.threads.groupby([pd.TimeGrouper(freq=freq)]).mean()
if freq == 'AS':
y.index = y.index.year
return y