# events.py
import datetime
import hashlib
from html import unescape
import icalendar
from io import StringIO
import itertools
import logging
import math
import pandas as pd
import re
from . import fetch

# Matches a description field of the form 'FSI / <room>' or 'F2SMH / <room>';
# capture group 1 is everything after the ' / ' separator (the room name).
ROOM_RE = re.compile(r'^(?:FSI|F2SMH) / (.*)$')
# Known course-type labels. This pattern is applied with `.match()`, so it only
# anchors at the start of the field: any field *beginning* with one of these
# labels is accepted. Because alternatives are tried left to right, 'COURS'
# already matches 'COURS/TD'; the explicit 'COURS/TD' alternative is therefore
# never the one selected (harmless, since only match/no-match is tested).
COURSE_TYPE_RE = re.compile(r'COURS|COURS/TD|TD|TP|CONTROLE CONTINU|CONTROLE PARTIEL|EXAMEN|Controle de Substitution|CONSULTATION DE COPIES')
# Student group code: optional leading 'K', then 'IN', then one or more
# uppercase letters or digits. Also applied with `.match()`, so trailing text
# after the code is tolerated and excluded from the matched text.
STUDENT_GROUP_RE = re.compile(r'K?IN[A-Z0-9]+')

class CelcatEvents:
    '''
    Tabular view of a raw CELCAT JSON response.

    After construction, `df` holds one row per event with the columns
    start, end, allDay, description, eventCategory, modules and timeslot_id
    (timeslot_id is a copy of the row index).
    '''

    def __init__(self, celcat_raw_response):
        '''
        - celcat_raw_response: JSON text readable by pandas.read_json; it must
          provide at least the six event columns listed on the class
        '''
        kept_columns = ["start", "end", "allDay", "description", "eventCategory", "modules"]

        frame = pd.read_json(StringIO(celcat_raw_response))
        # both bounds of the time slot are stored as datetime64[ns]
        for bound in ('start', 'end'):
            frame[bound] = frame[bound].astype('datetime64[ns]')

        # drop every column the rest of the module does not use
        frame = frame[kept_columns]
        frame['timeslot_id'] = frame.index
        self.df = frame

class FilteredCelcatEvents:
    '''
    CELCAT events crossed with the requested courses, keeping only the
    (event, requested course) pairs that match each other.

    Attributes:
    - crossed_df: every (CELCAT event, requested course) pair, extended with
      the columns returned by parse_description and a boolean 'keep' column
    - df: the rows of crossed_df whose 'keep' value is True
    '''

    def __init__(self, course_request, celcat_events):
        '''
        - course_request: object whose `df` DataFrame lists the requested
          courses (columns used here: course_request_id, module_apogee,
          module_readable, begin_date, end_date, course_type, group,
          expected_nb_slots)
        - celcat_events: object whose `df` DataFrame lists the CELCAT events
          (start, end, allDay, description, eventCategory, modules, timeslot_id)
        '''
        self._course_request = course_request
        self.crossed_df = celcat_events.df.merge(course_request.df, how='cross')

        # parse descriptions
        parsed_desc_df = self.crossed_df.apply(parse_description, axis=1)
        self.crossed_df = pd.concat([self.crossed_df.reset_index(drop=True), parsed_desc_df], axis=1)

        self.crossed_df['keep'] = self.crossed_df.apply(FilteredCelcatEvents.timeslot_matches_course, axis=1)
        self.df = self.crossed_df[self.crossed_df['keep'] == True]

    @staticmethod
    def timeslot_matches_course(row):
        '''
        Return True if the CELCAT event of `row` is a time slot of the
        requested course of the same `row`, False otherwise.

        `row` is any mapping-like object (DataFrame row, dict) holding both the
        event columns and the course request columns. All text comparisons are
        case-insensitive substring tests, except the module code lookup in
        `modules`, which is an exact membership test.
        '''
        # `== True` rather than truthiness: a missing (NaN) flag must not count as all-day
        if row['allDay'] == True:  # noqa: E712
            return False

        # the course type may come from the CELCAT category or from the description
        wanted_type = row['course_type'].lower()
        if (wanted_type not in row['eventCategory'].lower()) and (wanted_type not in row['course_type_parsed'].lower()):
            return False

        # the module code may be in the event's module list or in its description
        try:
            module_listed = row['module_apogee'] in row['modules']
        except TypeError:
            # `modules` is not a container (e.g. null in the CELCAT response)
            module_listed = False
        if (not module_listed) and (row['module_apogee'].lower() not in row['description'].lower()):
            return False

        # parse_description returns the groups as a list of codes;
        # a plain string is still accepted and treated as a single entry
        parsed_groups = row['groups_parsed']
        if isinstance(parsed_groups, str):
            parsed_groups = [parsed_groups]
        wanted_group = row['group'].lower()
        if not any(wanted_group in parsed_group.lower() for parsed_group in parsed_groups):
            return False

        # the time slot must lie inside the requested period
        if row['start'] < row['begin_date']:
            return False
        if row['end'] > row['end_date']:
            return False

        return True

    def check_expected_nb_timeslots(self):
        '''
        Log warnings for every requested course whose number of matched CELCAT
        time slots differs from its expected_nb_slots.

        A requested course without any matched time slot counts as 0 fetched
        time slots. Nothing is logged when all counts match. Returns None.
        '''
        # count per real course_request_id: requests without any matched slot
        # are simply absent here and are filled with 0 after the merge below
        fetched_counts = (
            self.df.groupby('course_request_id')['timeslot_id']
            .count()
            .rename('fetched_timeslot_count')
            .reset_index()
        )

        reordered_course_req_df = self._course_request.df[['course_request_id', 'module_apogee', 'module_readable', 'begin_date', 'end_date', 'course_type', 'group', 'expected_nb_slots']]
        checked_df = reordered_course_req_df.merge(fetched_counts, how='outer', on='course_request_id')
        checked_df['fetched_timeslot_count'] = checked_df['fetched_timeslot_count'].fillna(0).astype(int)
        fetch_problem_df = checked_df[checked_df['expected_nb_slots'] != checked_df['fetched_timeslot_count']]

        if len(fetch_problem_df) > 0:
            logging.warning('The number of time slots fetched from CELCAT does not match the expected number of time slots for some courses')
            logging.warning(f'\n{fetch_problem_df}')

            logging.warning('Details of the involved time slots:')
            problematic_courses = fetch_problem_df[['course_request_id']]
            problematic_time_slots = problematic_courses.merge(self.df, how='inner', on='course_request_id')
            problematic_time_slots = problematic_time_slots.sort_values(by=['course_request_id', 'start'])[['course_request_id', 'module_apogee', 'module_readable', 'start', 'end', 'course_type', 'group']]
            logging.warning(f'\n{problematic_time_slots}')

def parse_description(row):
    r'''
    Parse the 'description' of a CELCAT event.

    Expecting an HTML text with this information, separated by HTML/CRLF line breaks:
    - (The rooms where the course takes place): optional and there can be multiple rooms
    - The apogee code of the course and its readable name
    - A list of student groups that should attend this course
    - (The teacher name): optional
    - The course type
    - (Random misc. info): optional

    Example: 'FSI / U3-01\r\n\r\n<br />\r\n\r\nKINX7AD1 - Parall&#233;lisme [KINX7AD1]\r\n\r\n<br />\r\n\r\nKINB7TPA41<br />KINB7TPA42\r\n\r\n<br />\r\n\r\nTD\r\n'
             'FSI / Amphi GRIGNARD (bat 2A)\r\n\r\n<br />\r\n\r\nKINXIB11 - Bas\r\n\r\n<br />\r\n\r\nINXIB11A\r\n\r\n<br />\r\n\r\nCOLLET CHRISTOPHE\r\n\r\n<br />\r\n\r\nCOURS\r\n\r\n<br />\r\n\r\nSem 36 &#224; 42 partie syst&#232;me\nSem 43 &#224; 50 parti Archi\r\n'
             'FSI / Amphi VANDEL (U2-A4)<br />FSI / U2-115\r\n\r\n<br />\r\n\r\nKINXIB11 - Bas\r\n\r\n<br />\r\n\r\nINXIB11A\r\n\r\n<br />\r\n\r\nCOLLET CHRISTOPHE\r\n\r\n<br />\r\n\r\nCONTROLE CONTINU\r\n\r\n<br />\r\n\r\nSalle TD en U2 pour ESH 22012044, 22307815, 22304984, 22400685, 22307573\nPartie syst&#232;me CC1 = Sem39, CC2=Sem42 et CC4 = Sem45\nPartie Archi CC3=Sem48 et CC4 = Sem50\r\n'
             'FSI / U3-105\r\n\r\n<br />\r\n\r\nKINX7AD1 - Parall&#233;lisme\r\n\r\n<br />\r\n\r\nKINM7CM<br />KINM7TDA5\r\n\r\n<br />\r\n\r\nTD\r\n'

    - row: mapping-like object (e.g. a DataFrame row) with a 'description' string

    Returns a pandas Series with the entries:
    - rooms_parsed: list of room names (['unset'] if none was found)
    - teacher_parsed: teacher field ('unset' if none was found)
    - course_type_parsed: course type field ('unset' if none was found)
    - groups_parsed: list of student group codes (['unset'] if none was found)

    A description made of a single field (e.g. "CONGES\r\n") is returned as a
    course type with everything else unset. Fields missing from a longer
    description are left unset instead of raising.
    '''

    desc = unescape(row['description']).replace('\n', '').replace('\r', '')
    # str.split always returns at least one element, so `fields` is never empty here
    fields = [x.strip() for x in desc.split('<br />')]
    preparse_fields = fields[:]

    rooms = []
    teacher = 'unset'
    groups = []
    course_type = 'unset'

    try:
        if len(fields) == 1:
            # probably not a course. examples: "CONGES\r\n" or "FERIE\r\n"
            course_type = fields[0]
        else:
            # first fields should be the room, but this is not always set
            while fields and (m := ROOM_RE.match(fields[0])) is not None:
                rooms.append(m[1])
                fields = fields[1:]

            # assume that the next field is the course name, and skip it
            fields = fields[1:]

            # skip notes at the end of the fields until they look like a course type
            while fields and COURSE_TYPE_RE.match(fields[-1]) is None:
                fields = fields[:-1]

            if fields:
                # last field is a course type
                course_type = fields[-1]
                fields = fields[:-1]
            else:
                # every remaining field was skipped as a note: nothing is left to parse
                logging.warning(f"No course type found in an event description. fields={preparse_fields}")

            # the last field may be a teacher, but this is optional
            # (there may be no field left at all when the event has neither group nor teacher)
            if fields and STUDENT_GROUP_RE.match(fields[-1]) is None:
                teacher = fields[-1]
                fields = fields[:-1]

            # all remaining fields should be student groups
            while fields and (m := STUDENT_GROUP_RE.match(fields[0])) is not None:
                groups.append(m[0])
                fields = fields[1:]

        if not rooms:
            rooms = ['unset']
        if not groups:
            groups = ['unset']

        return pd.Series([rooms, teacher, course_type, groups], index=['rooms_parsed', 'teacher_parsed', 'course_type_parsed', 'groups_parsed'])
    except Exception:
        # report the offending description, then let the original error propagate unchanged
        logging.error(f"Could not parse an event description. fields={preparse_fields}")
        raise


def request_slots_by_mod_code(flat_slot_df, session):
    '''
    Fetch the CELCAT events of the modules listed in `flat_slot_df` and
    left-join them onto `flat_slot_df`.

    - flat_slot_df: DataFrame with at least the columns mod_code, start_dt and end_dt
    - session: passed through unchanged to fetch.do_celcat_request_subjects

    Returns a tuple (merged, raw_response):
    - merged: `flat_slot_df` left-merged with the flattened CELCAT events on
      all the columns they have in common, sorted by start_dt then end_dt.
      Rows that found a CELCAT event have slot_in_celcat set to True
    - raw_response: the value returned by fetch.do_celcat_request_subjects
    '''
    requested_codes = list(flat_slot_df['mod_code'].dropna().unique())
    window_start = flat_slot_df['start_dt'].min()
    window_end = flat_slot_df['end_dt'].max()

    raw_response = fetch.do_celcat_request_subjects(window_start, window_end, requested_codes, session)

    events_df = CelcatEvents(raw_response).df
    parsed_df = events_df.apply(parse_description, axis=1)
    events_df = pd.concat(
        [events_df.reset_index(drop=True), parsed_df.reset_index(drop=True)],
        axis=1,
    ).reset_index(drop=True)

    # one output row per (room, student group, module) combination of each event
    flat_celcat_rows = [
        {
            'start_dt': event['start'],
            'end_dt': event['end'],
            'eventCategory': event['eventCategory'],
            'room_parsed': room,
            'teacher_parsed': event['teacher_parsed'],
            'course_type_parsed': event['course_type_parsed'],
            'student_group': student_group,
            'mod_code': module_code,
            'slot_in_celcat': True
        }
        for _, event in events_df.iterrows()
        for room, student_group, module_code in itertools.product(
            event['rooms_parsed'], event['groups_parsed'], event['modules']
        )
    ]
    flat_celcat_df = pd.DataFrame(flat_celcat_rows)

    merged = pd.merge(flat_slot_df, flat_celcat_df, how='left')
    return merged.sort_values(by=['start_dt', 'end_dt']), raw_response

def events_to_calendar_df(events):
    '''
    Turn a DataFrame of time slots into a DataFrame of calendar entries.

    - events: DataFrame with the columns display_name, room_parsed,
      student_group, slot_in_celcat, start_dt and end_dt. A missing
      slot_in_celcat value (NaN, None, pd.NA) marks a slot that has no
      CELCAT counterpart

    Returns a DataFrame with the columns subject, start_dt, end_dt and
    location, sorted by these columns in the order start_dt, end_dt, subject,
    location. The subject of a slot without CELCAT counterpart is prefixed
    with 'NOT IN CELCAT ! '. An empty input gives an empty DataFrame that
    still has the four columns.
    '''
    calendar_columns = ['subject', 'start_dt', 'end_dt', 'location']

    cal_events = []
    for _, row in events.iterrows():
        subject = ", ".join([
            f"{row['display_name']}",
            f"{row['room_parsed']}",
            f"{row['student_group']}",
        ])
        # pd.isna accepts any missing-value marker, where math.isnan rejects None and pd.NA
        if pd.isna(row['slot_in_celcat']):
            subject = f"NOT IN CELCAT ! {subject}"

        cal_events.append({
            'subject': subject,
            'start_dt': row['start_dt'],
            'end_dt': row['end_dt'],
            'location': row['room_parsed'],
        })

    # explicit columns: without them an empty list gives a column-less frame
    # and the sort below fails with a KeyError
    cal_df = pd.DataFrame(cal_events, columns=calendar_columns)
    return cal_df.sort_values(by=['start_dt', 'end_dt', 'subject', 'location'])

def calendar_df_to_ics(df):
    '''
    Build an icalendar.Calendar holding one event per row of `df`.

    - df: DataFrame with the columns subject, start_dt, end_dt and location;
      start_dt and end_dt are localized to Europe/Paris with tz_localize

    Each event gets the properties name, summary, dtstart, dtend, location and
    uid. The uid is '<md5>@pff', where <md5> is the hex MD5 digest of the
    concatenated subject, start_dt, end_dt and location of the row, so the same
    row always yields the same uid.

    NOTE(review): no DTSTAMP property is emitted although RFC 5545 lists it as
    required for a VEVENT - confirm that the omission is deliberate.
    '''
    uid_source_columns = ('subject', 'start_dt', 'end_dt', 'location')
    paris = 'Europe/Paris'

    calendar = icalendar.Calendar()
    calendar.add('version', '2.0')
    calendar.add('prodid', '-//mpoquet//survival//')

    for _, entry in df.iterrows():
        subject_text = icalendar.vText(entry['subject'])

        # uid derived from the row content only
        digest_input = "".join(f"{entry[column]}" for column in uid_source_columns)
        event_hash = hashlib.md5(digest_input.encode('utf-8')).hexdigest()

        event = icalendar.Event()
        event.add('name', subject_text)
        event.add('summary', icalendar.vText(entry['subject']))
        event.add('dtstart', entry['start_dt'].tz_localize(tz=paris))
        event.add('dtend', entry['end_dt'].tz_localize(tz=paris))
        event.add('location', icalendar.vText(entry['location']))
        event.add('uid', f"{event_hash}@pff")
        calendar.add_component(event)

    return calendar