# events.py (authored by Millian Poquet)
import datetime
import hashlib
from html import unescape
import icalendar
from io import StringIO
import itertools
import logging
import math
import pandas as pd
import re
from . import fetch
# Matches a room field such as 'FSI / U3-01' or 'F2SMH / ...'; group 1 captures
# everything after the 'FSI / ' or 'F2SMH / ' prefix.
ROOM_RE = re.compile(r'^(?:FSI|F2SMH) / (.*)$')
# Known course type labels. parse_description() applies this with .match(), so a
# field only needs to START with one of these labels to be accepted.
COURSE_TYPE_RE = re.compile(r'COURS|COURS/TD|TD|TP|CONTROLE CONTINU|CONTROLE PARTIEL|EXAMEN|Controle de Substitution|CONSULTATION DE COPIES')
# Student group codes: an optional leading 'K', then 'IN', then one or more
# uppercase letters or digits (e.g. 'KINB7TPA41', 'INXIB11A').
STUDENT_GROUP_RE = re.compile(r'K?IN[A-Z0-9]+')
class CelcatEvents:
    """Tabular view of a raw CELCAT JSON response.

    The JSON text is loaded into ``self.df``, a DataFrame restricted to the
    columns ``start``, ``end``, ``allDay``, ``description``,
    ``eventCategory`` and ``modules``, plus a ``timeslot_id`` column that
    mirrors the row index.
    """

    def __init__(self, celcat_raw_response):
        """Build ``self.df`` from the JSON text ``celcat_raw_response``."""
        events = pd.read_json(StringIO(celcat_raw_response))

        # Both time bounds are converted to pandas datetimes.
        for bound in ('start', 'end'):
            events[bound] = events[bound].astype('datetime64[ns]')

        kept = events[["start", "end", "allDay", "description", "eventCategory", "modules"]]
        # Each row gets an identifier equal to its position in the response.
        self.df = kept.assign(timeslot_id=kept.index)
class FilteredCelcatEvents:
    """CELCAT time slots matched against a set of course requests.

    Every CELCAT event is paired with every course request (cross join), the
    event description is parsed, and only the pairs accepted by
    ``timeslot_matches_course`` are kept.

    Attributes:
        crossed_df: the full cross join, with the parsed description columns
            and a boolean ``keep`` column.
        df: the rows of ``crossed_df`` for which ``keep`` is true.
    """

    def __init__(self, course_request, celcat_events):
        """Cross ``celcat_events.df`` with ``course_request.df`` and filter it.

        Args:
            course_request: object exposing a ``df`` DataFrame of requests.
            celcat_events: object exposing a ``df`` DataFrame of CELCAT events
                (including a ``description`` column).
        """
        self._course_request = course_request
        self.crossed_df = celcat_events.df.merge(course_request.df, how='cross')

        # parse descriptions
        parsed_desc_df = self.crossed_df.apply(parse_description, axis=1)
        self.crossed_df = pd.concat([self.crossed_df.reset_index(drop=True), parsed_desc_df], axis=1)

        self.crossed_df['keep'] = self.crossed_df.apply(FilteredCelcatEvents.timeslot_matches_course, axis=1)
        self.df = self.crossed_df[self.crossed_df['keep'] == True]

    @staticmethod
    def timeslot_matches_course(row):
        """Tell whether one (event, course request) pair should be kept.

        Args:
            row: mapping with the event fields (``allDay``, ``eventCategory``,
                ``modules``, ``description``, ``start``, ``end``), the parsed
                fields (``course_type_parsed``, ``groups_parsed``) and the
                request fields (``course_type``, ``module_apogee``, ``group``,
                ``begin_date``, ``end_date``).

        Returns:
            True when the event is not an all-day event, its type, module and
            group match the request, and it lies within the requested dates.
        """
        if row['allDay'] == True:
            return False

        wanted_type = row['course_type'].lower()
        if (wanted_type not in row['eventCategory'].lower()) and (wanted_type not in row['course_type_parsed'].lower()):
            return False

        wanted_module = row['module_apogee']
        if (wanted_module not in row['modules']) and (wanted_module.lower() not in row['description'].lower()):
            return False

        # groups_parsed is a list of group codes; a bare string is still
        # accepted. The requested group matches when it is a (case-insensitive)
        # substring of any parsed group.
        parsed_groups = row['groups_parsed']
        if isinstance(parsed_groups, str):
            parsed_groups = [parsed_groups]
        wanted_group = row['group'].lower()
        if not any(wanted_group in parsed_group.lower() for parsed_group in parsed_groups):
            return False

        if row['start'] < row['begin_date']:
            return False
        if row['end'] > row['end_date']:
            return False
        return True

    def check_expected_nb_timeslots(self):
        """Warn about course requests whose fetched slot count is unexpected.

        The number of kept time slots is counted per ``course_request_id`` and
        compared with the ``expected_nb_slots`` column of the course requests.
        Requests without any kept time slot count as 0. Mismatches, and the
        time slots involved, are reported through ``logging.warning``.
        """
        # Keep the real course_request_id of each group: ids are not assumed
        # to be contiguous, and requests without any slot have no group at all.
        fetched_counts = (
            self.df.groupby('course_request_id')['timeslot_id']
            .count()
            .rename('fetched_timeslot_count')
            .reset_index()
        )

        reordered_course_req_df = self._course_request.df[['course_request_id', 'module_apogee', 'module_readable', 'begin_date', 'end_date', 'course_type', 'group', 'expected_nb_slots']]
        checked_df = reordered_course_req_df.merge(fetched_counts, how='outer', on='course_request_id')
        # A request absent from the counts has no matching time slot.
        checked_df['fetched_timeslot_count'] = checked_df['fetched_timeslot_count'].fillna(0).astype(int)

        fetch_problem_df = checked_df[checked_df['expected_nb_slots'] != checked_df['fetched_timeslot_count']]
        if len(fetch_problem_df) > 0:
            logging.warning('The number of time slots fetched from CELCAT does not match the expected number of time slots for some courses')
            logging.warning(f'\n{fetch_problem_df}')

            logging.warning('Details of the involved time slots:')
            problematic_courses = fetch_problem_df[['course_request_id']]
            problematic_time_slots = problematic_courses.merge(self.df, how='inner', on='course_request_id')
            problematic_time_slots = problematic_time_slots.sort_values(by=['course_request_id', 'start'])[['course_request_id', 'module_apogee', 'module_readable', 'start', 'end', 'course_type', 'group']]
            logging.warning(f'\n{problematic_time_slots}')
def parse_description(row):
    r'''
    Parse the 'description' of a CELCAT event into rooms, teacher, course type and groups.

    Expecting an HTML text with this information, separated by HTML/CRLF line breaks:
    - (The rooms where the course takes place): optional and there can be multiple rooms
    - The apogee code of the course and its readable name
    - A list of student groups that should attend this course
    - (The teacher name): optional
    - The course type
    - (Random misc. info): optional

    Example: 'FSI / U3-01\r\n\r\n<br />\r\n\r\nKINX7AD1 - Parallélisme [KINX7AD1]\r\n\r\n<br />\r\n\r\nKINB7TPA41<br />KINB7TPA42\r\n\r\n<br />\r\n\r\nTD\r\n'
             'FSI / Amphi GRIGNARD (bat 2A)\r\n\r\n<br />\r\n\r\nKINXIB11 - Bas\r\n\r\n<br />\r\n\r\nINXIB11A\r\n\r\n<br />\r\n\r\nCOLLET CHRISTOPHE\r\n\r\n<br />\r\n\r\nCOURS\r\n\r\n<br />\r\n\r\nSem 36 à 42 partie système\nSem 43 à 50 parti Archi\r\n'
             'FSI / Amphi VANDEL (U2-A4)<br />FSI / U2-115\r\n\r\n<br />\r\n\r\nKINXIB11 - Bas\r\n\r\n<br />\r\n\r\nINXIB11A\r\n\r\n<br />\r\n\r\nCOLLET CHRISTOPHE\r\n\r\n<br />\r\n\r\nCONTROLE CONTINU\r\n\r\n<br />\r\n\r\nSalle TD en U2 pour ESH 22012044, 22307815, 22304984, 22400685, 22307573\nPartie système CC1 = Sem39, CC2=Sem42 et CC4 = Sem45\nPartie Archi CC3=Sem48 et CC4 = Sem50\r\n'
             'FSI / U3-105\r\n\r\n<br />\r\n\r\nKINX7AD1 - Parallélisme\r\n\r\n<br />\r\n\r\nKINM7CM<br />KINM7TDA5\r\n\r\n<br />\r\n\r\nTD\r\n'

    Args:
        row: mapping with a 'description' entry holding the HTML text.

    Returns:
        A pandas Series indexed by 'rooms_parsed' (list), 'teacher_parsed' (str),
        'course_type_parsed' (str) and 'groups_parsed' (list). Missing rooms or
        groups are reported as ['unset'], a missing teacher as 'unset'.

    Raises:
        ValueError: when a multi-field description contains no field that looks
            like a course type.
    '''
    desc = unescape(row['description']).replace('\n', '').replace('\r', '')
    fields = [x.strip() for x in desc.split('<br />')]
    preparse_fields = fields[:]

    rooms = []
    teacher = 'unset'
    groups = []
    course_type = 'unset'

    try:
        if len(fields) == 0:
            raise ValueError(f'There should be at least 1 field, but fields are {fields}')
        elif len(fields) == 1:
            # probably not a course. examples: "CONGES\r\n" or "FERIE\r\n"
            course_type = fields[0]
        else:
            # first fields should be the room, but this is not always set
            while fields and (m := ROOM_RE.match(fields[0])) is not None:
                rooms.append(m[1])
                fields = fields[1:]

            # assume that the next field is the course name, and skip it
            fields = fields[1:]

            # skip notes at the end of the fields until they look like a course type
            while fields and COURSE_TYPE_RE.match(fields[-1]) is None:
                fields = fields[:-1]
            if not fields:
                # fail with an explicit message instead of an opaque IndexError
                raise ValueError(f'Could not find a course type in fields {preparse_fields}')

            # last field is a course type
            course_type = fields[-1]
            fields = fields[:-1]

            # the last field may be a teacher, but this is optional
            # (there may be nothing left at all when no group is listed)
            if fields and STUDENT_GROUP_RE.match(fields[-1]) is None:
                teacher = fields[-1]
                fields = fields[:-1]

            # all remaining fields should be student groups
            while fields and (m := STUDENT_GROUP_RE.match(fields[0])) is not None:
                groups.append(m[0])
                fields = fields[1:]

        if len(rooms) == 0:
            rooms = ['unset']
        if len(groups) == 0:
            groups = ['unset']

        return pd.Series([rooms, teacher, course_type, groups], index=['rooms_parsed', 'teacher_parsed', 'course_type_parsed', 'groups_parsed'])
    except Exception:
        print(f"Could not parse an event description. fields={preparse_fields}")
        # bare raise keeps the original traceback intact
        raise
def request_slots_by_mod_code(flat_slot_df, session):
    """Fetch the CELCAT events of the given modules and join them to the slots.

    Args:
        flat_slot_df: DataFrame with at least the 'mod_code', 'start_dt' and
            'end_dt' columns.
        session: passed through to ``fetch.do_celcat_request_subjects``.

    Returns:
        A ``(merged, raw_response)`` tuple: ``merged`` is ``flat_slot_df``
        left-merged with the flattened CELCAT events (one row per room, group
        and module combination) and sorted by 'start_dt' then 'end_dt';
        ``raw_response`` is what the fetch call returned.
    """
    raw_response = fetch.do_celcat_request_subjects(
        flat_slot_df['start_dt'].min(),
        flat_slot_df['end_dt'].max(),
        list(flat_slot_df['mod_code'].dropna().unique()),
        session,
    )

    events_df = CelcatEvents(raw_response).df
    parsed_df = events_df.apply(parse_description, axis=1)
    events_df = pd.concat(
        [events_df.reset_index(drop=True), parsed_df.reset_index(drop=True)],
        axis=1,
    ).reset_index(drop=True)

    def flatten(event):
        # One output row per (room, group, module) combination of the event.
        combinations = itertools.product(event['rooms_parsed'], event['groups_parsed'], event['modules'])
        for room, group, module in combinations:
            yield {
                'start_dt': event['start'],
                'end_dt': event['end'],
                'eventCategory': event['eventCategory'],
                'room_parsed': room,
                'teacher_parsed': event['teacher_parsed'],
                'course_type_parsed': event['course_type_parsed'],
                'student_group': group,
                'mod_code': module,
                'slot_in_celcat': True
            }

    flat_celcat_rows = []
    for _, event in events_df.iterrows():
        flat_celcat_rows.extend(flatten(event))
    flat_celcat_df = pd.DataFrame(flat_celcat_rows)

    # No explicit key: the merge is done on the columns both frames share.
    merged = pd.merge(flat_slot_df, flat_celcat_df, how='left')
    return merged.sort_values(by=['start_dt', 'end_dt']), raw_response
def events_to_calendar_df(events):
    """Turn a DataFrame of time slots into a DataFrame of calendar entries.

    Args:
        events: DataFrame with the 'display_name', 'room_parsed',
            'student_group', 'slot_in_celcat', 'start_dt' and 'end_dt' columns.

    Returns:
        A DataFrame with the 'subject', 'start_dt', 'end_dt' and 'location'
        columns, sorted by start, end, subject and location. The subject is
        prefixed with 'NOT IN CELCAT ! ' when 'slot_in_celcat' is missing.
        An empty input gives an empty DataFrame with the same four columns.
    """
    cal_events = []
    for _, row in events.iterrows():
        subject = ", ".join([
            f"{row['display_name']}",
            f"{row['room_parsed']}",
            f"{row['student_group']}",
        ])
        # pd.isna also accepts None and other missing markers, on which
        # math.isnan would raise a TypeError.
        if pd.isna(row['slot_in_celcat']):
            subject = f"NOT IN CELCAT ! {subject}"
        cal_events.append({
            'subject': subject,
            'start_dt': row['start_dt'],
            'end_dt': row['end_dt'],
            'location': row['room_parsed'],
        })

    # Explicit columns so that sorting also works when there is no event.
    cal_df = pd.DataFrame(cal_events, columns=['subject', 'start_dt', 'end_dt', 'location'])
    return cal_df.sort_values(by=['start_dt', 'end_dt', 'subject', 'location'])
def calendar_df_to_ics(df):
    """Build an icalendar ``Calendar`` from a DataFrame of calendar entries.

    Args:
        df: DataFrame with the 'subject', 'start_dt', 'end_dt' and 'location'
            columns; the two date columns are localized to 'Europe/Paris'.

    Returns:
        The ``icalendar.Calendar`` holding one event per row. Each event uid is
        the MD5 hex digest of the row's subject, dates and location, followed
        by '@pff'.
    """
    calendar = icalendar.Calendar()
    calendar.add('version', '2.0')
    calendar.add('prodid', '-//mpoquet//survival//')

    for _, entry in df.iterrows():
        subject = entry['subject']
        begin = entry['start_dt']
        finish = entry['end_dt']
        location = entry['location']

        vevent = icalendar.Event()
        vevent.add('name', icalendar.vText(subject))
        vevent.add('summary', icalendar.vText(subject))
        vevent.add('dtstart', begin.tz_localize(tz='Europe/Paris'))
        vevent.add('dtend', finish.tz_localize(tz='Europe/Paris'))
        vevent.add('location', icalendar.vText(location))

        # The uid only depends on the entry content, so it is identical from
        # one run to the next for an unchanged entry.
        fingerprint = f"{subject}{begin}{finish}{location}"
        digest = hashlib.md5(fingerprint.encode('utf-8')).hexdigest()
        vevent.add('uid', f"{digest}@pff")

        calendar.add_component(vevent)

    return calendar