From 4b7c38debf14939541321614dfdf0c453f938366 Mon Sep 17 00:00:00 2001
From: Millian Poquet <millian.poquet@irit.fr>
Date: Sat, 10 Sep 2022 03:06:41 +0200
Subject: [PATCH] POC script -> reusable functions&types

---
 default.nix   |   1 -
 src/script.py | 290 +++++++++++++++++++++++++++++++++++---------------
 2 files changed, 205 insertions(+), 86 deletions(-)

diff --git a/default.nix b/default.nix
index 32c8511..ba8ea8f 100644
--- a/default.nix
+++ b/default.nix
@@ -10,7 +10,6 @@ pkgs.mkShell {
     python3Packages.ipython
     python3Packages.requests
     python3Packages.pandas
-    python3Packages.nltk
     python3Packages.ics
   ];
 }
diff --git a/src/script.py b/src/script.py
index eb72e8f..1206869 100644
--- a/src/script.py
+++ b/src/script.py
@@ -2,91 +2,211 @@
 import ics
 import requests
 import pandas as pd
-import nltk
-
-input_dtypes = {
-    'module_apogee': 'str',
-    'module_readable': 'str',
-    'begin_date': 'str',
-    'end_date': 'str',
-    'course_type': 'str',
-    'group': 'str',
-    'expected_nb_slots': 'int64'
-}
-input_data = pd.read_csv('input-data.csv', parse_dates=['begin_date', 'end_date'])
-input_data['input_id'] = input_data.index
-
-input_date_range_min = min(input_data['begin_date']).strftime("%Y-%m-%d")
-input_date_range_max = (max(input_data['end_date']) + pd.Timedelta(days=1)).strftime("%Y-%m-%d")
-
-apogee_codes = input_data['module_apogee'].unique()
-
-request_data = [
-    f'start={input_date_range_min}',
-    f'end={input_date_range_max}',
-    'resType=100',
-    'calView=agendaWeek',
-] + ['federationIds%5B%5D={}'.format(apogee_code) for apogee_code in apogee_codes]
-
-url = 'https://edt.univ-tlse3.fr/calendar2/Home/GetCalendarData';
-request_headers = {
-    "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"
-}
-response = requests.post(url, '&'.join(request_data), headers=request_headers)
-
-with open('out.json', 'w') as f:
-    f.write(response.text)
-
-celcat_data = pd.read_json(response.text)
-celcat_data['start'] = celcat_data['start'].astype('datetime64[ns]')
-celcat_data['end'] = celcat_data['end'].astype('datetime64[ns]')
-celcat_data = celcat_data[["start", "end", "allDay", "description", "eventCategory", "modules"]]
-celcat_data['timeslot_id'] = celcat_data.index
-
-crossed = celcat_data.merge(input_data, how='cross')
-
-def timeslot_matches_course(row):
-    if row['allDay'] == True:
-        return False
-    if (row['course_type'].lower() not in row['eventCategory'].lower()): #and (row['course_type'].lower() not in row['description'].lower()):
-        return False
-    if (row['module_apogee'] not in row['modules']) and (row['module_apogee'].lower() not in row['description'].lower()):
-        return False
-    if row['group'].lower() not in row['description'].lower():
-        return False
-    if row['start'] < row['begin_date']:
-        return False
-    if row['end'] > row['end_date']:
-        return False
+import io
+import logging
+
+class CourseRequest:
+    '''
+    Course requests read from a CSV file, and the calendar request built from them.
+
+    Attributes:
+        df: the content of the CSV file, plus a 'course_request_id' column set from the index.
+    '''
+
+    def __init__(self, filename):
+        '''Load the course requests from the CSV file `filename`.'''
+        self.df = pd.read_csv(filename, parse_dates=['begin_date', 'end_date'])
+        self.df['course_request_id'] = self.df.index
+
+    def generate_request_body(self):
+        '''
+        Return the form-urlencoded body of the calendar request.
+
+        The requested range goes from the earliest begin_date to one day after
+        the latest end_date, and covers every distinct module_apogee code.
+        '''
+        date_range_min = min(self.df['begin_date']).strftime("%Y-%m-%d")
+        date_range_max = (max(self.df['end_date']) + pd.Timedelta(days=1)).strftime("%Y-%m-%d")
+        apogee_codes = self.df['module_apogee'].unique()
+
+        fields = [
+            f'start={date_range_min}',
+            f'end={date_range_max}',
+            'resType=100',
+            'calView=agendaWeek',
+        ] + ['federationIds%5B%5D={}'.format(apogee_code) for apogee_code in apogee_codes]
+        return '&'.join(fields)
+
+    def do_request(self, url='https://edt.univ-tlse3.fr/calendar2/Home/GetCalendarData'):
+        '''
+        POST the request body to `url` and return the text of the response.
+
+        If the request failed, the response is logged then requests.HTTPError is raised.
+        '''
+        headers = {"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"}
+        request_body = self.generate_request_body()
+        response = requests.post(url, request_body, headers=headers)
+
+        if not response.ok:
+            logging.error(f'POST HTTP request failed (status code {response.status_code}): {response.reason}')
+            logging.error(f'Request response text:\n---\n{response.text}\n---')
+            response.raise_for_status()
+
+        return response.text
+
+class CelcatEvents:
+    '''
+    Events returned by the calendar request.
+
+    Attributes:
+        df: the start, end, allDay, description, eventCategory and modules columns of the response, plus a 'timeslot_id' column set from the index.
+    '''
+
+    def __init__(self, celcat_raw_response):
+        '''Build the events from `celcat_raw_response`, the JSON text of the response.'''
+        # wrapped in StringIO, as passing a literal JSON string to read_json is deprecated (pandas >= 2.1)
+        self.df = pd.read_json(io.StringIO(celcat_raw_response))
+        self.df['start'] = self.df['start'].astype('datetime64[ns]')
+        self.df['end'] = self.df['end'].astype('datetime64[ns]')
+        # explicit copy, so that the column added below is not written into a slice of the parsed frame
+        self.df = self.df[["start", "end", "allDay", "description", "eventCategory", "modules"]].copy()
+        self.df['timeslot_id'] = self.df.index
 
-    return True
-
-crossed['keep'] = crossed.apply(lambda row: timeslot_matches_course(row), axis=1)
-crossed.to_csv('/tmp/debug.csv', index=False)
-keep = crossed[crossed['keep'] == True]
-
-check_grp = keep.groupby(['input_id'])
-check_grp['timeslot_id'].count()
-
-check_df = pd.DataFrame({
-    'input_id': [x for x in range(len(check_grp))],
-    'fetched_timeslot_count': check_grp['timeslot_id'].count(),
-}).reset_index(drop=True)
-
-reordered_input_data = input_data[['input_id', 'module_apogee', 'module_readable', 'begin_date', 'end_date', 'course_type', 'group', 'expected_nb_slots']]
-checked_df = reordered_input_data.merge(check_df, how='inner', on='input_id')
-fetch_problem_df = checked_df[checked_df['expected_nb_slots'] != checked_df['fetched_timeslot_count']]
-print(fetch_problem_df)
-
-c = ics.Calendar()
-for _, row in keep.sort_values(by='start').iterrows():
-    event = ics.Event(
-        name = f'{row["module_readable"]} - {row["course_type"]} - {row["group"]}',
-        begin = row['start'].tz_localize(tz='Europe/Paris'),
-        end = row['end'].tz_localize(tz='Europe/Paris'),
-        description = row['description']
-    )
-    c.events.add(event)
+class FilteredCelcatEvents:
+    '''
+    Pairs of (event, course request) where the event matches the course request.
+
+    Attributes:
+        crossed_df: all the (event, course request) pairs, with the parsed description columns and a 'keep' column.
+        df: the rows of crossed_df whose 'keep' value is True.
+    '''
+
+    def __init__(self, course_request, celcat_events):
+        '''Cross the events of `celcat_events` with the requests of `course_request`, then filter the pairs.'''
+        self._course_request = course_request
+        self.crossed_df = celcat_events.df.merge(course_request.df, how='cross')
+
+        # parse descriptions
+        parsed_desc_df = self.crossed_df.apply(FilteredCelcatEvents.parse_description, axis=1)
+        self.crossed_df = pd.concat([self.crossed_df.reset_index(drop=True), parsed_desc_df], axis=1)
+
+        self.crossed_df['keep'] = self.crossed_df.apply(FilteredCelcatEvents.timeslot_matches_course, axis=1)
+        self.df = self.crossed_df[self.crossed_df['keep'] == True]
+
+    @staticmethod
+    def timeslot_matches_course(row):
+        '''Return True if the event columns of `row` (a row of crossed_df) match its course request columns.'''
+        if row['allDay'] == True:
+            return False
+        if (row['course_type'].lower() not in row['eventCategory'].lower()) and (row['course_type'].lower() not in row['course_type_parsed'].lower()):
+            return False
+        if (row['module_apogee'] not in row['modules']) and (row['module_apogee'].lower() not in row['description'].lower()):
+            return False
+        if row['group'].lower() not in row['groups_parsed'].lower():
+            return False
+        if row['start'] < row['begin_date']:
+            return False
+        if row['end'] > row['end_date']:
+            return False
+
+        return True
+
+    def check_expected_nb_timeslots(self):
+        '''
+        Log a warning about the course requests whose number of fetched time slots is not the expected one.
+
+        Return these course requests as a DataFrame (which is empty if there is no problem).
+        '''
+        # count per actual course_request_id: a request without any matching event has no group,
+        # so the ids of the groups are not guaranteed to be 0..n-1
+        fetched_counts = self.df.groupby('course_request_id')['timeslot_id'].count()
+
+        checked_df = self._course_request.df[['course_request_id', 'module_apogee', 'module_readable', 'begin_date', 'end_date', 'course_type', 'group', 'expected_nb_slots']].copy()
+        # a request without any matching event is checked with a count of 0
+        checked_df['fetched_timeslot_count'] = checked_df['course_request_id'].map(fetched_counts).fillna(0).astype('int64')
+        fetch_problem_df = checked_df[checked_df['expected_nb_slots'] != checked_df['fetched_timeslot_count']]
+
+        if len(fetch_problem_df) > 0:
+            logging.warning('The number of time slots fetched from CELCAT does not match the expected number of time slots for some courses')
+            logging.warning(f'\n{fetch_problem_df}')
+
+        return fetch_problem_df
+
+    @staticmethod
+    def parse_description(row):
+        '''
+        Return the room, course type and groups parsed from the description of `row`, as a Series.
+
+        Expecting an HTML text with this information, separated by HTML/CRLF line breaks:
+        - (The room where the course takes place): optional
+        - The apogee code of the course and its readable name
+        - A list of student groups that should attend this course
+        - The course type
+
+        Example: 'FSI / U3-01\r\n\r\n<br />\r\n\r\nKINX7AD1 - Parallélisme [KINX7AD1]\r\n\r\n<br />\r\n\r\nKINB7TPA41<br />KINB7TPA42\r\n\r\n<br />\r\n\r\nTD\r\n'
+
+        A value that is not found in the description is set to 'unset'.
+        '''
+
+        desc = row['description'].replace('\n', '').replace('\r', '')
+        fields = [x.strip() for x in desc.split('<br />')]
+
+        room = 'unset'
+        groups_joined = 'unset'
+        course_type = 'unset'
+
+        # split() always returns at least one field, so there is no empty case to handle
+        if len(fields) == 1:
+            # probably not a course. examples: "CONGES\r\n" or "FERIE\r\n"
+            course_type = fields[0]
+        else:
+            # first field should be the room, but this is not always set
+            if fields[0].startswith('FSI / '):
+                room = fields[0].replace('FSI / ', '')
+                fields = fields[1:]
+
+            # let us assume that the second field is the course name
+            fields = fields[1:]
+
+            # no field may remain here (room + course name only): keep 'unset' rather than fail on fields[-1]
+            if fields:
+                # last field should be the course type
+                course_type = fields[-1]
+
+                # all remaining fields should be student groups
+                groups = fields[:-1]
+                groups_joined = ' '.join(groups)
+
+        return pd.Series([room, course_type, groups_joined], index=['room_parsed', 'course_type_parsed', 'groups_parsed'])
+
+def course_df_to_ics(df):
+    '''
+    Return an ics.Calendar that contains one event per row of `df`.
+
+    The rows of `df` are read like rows of FilteredCelcatEvents.df.
+    Start and end times are localized to the Europe/Paris time zone.
+    '''
+    c = ics.Calendar()
+    for _, row in df.iterrows():
+        event = ics.Event(
+            name = f'{row["module_readable"]} - {row["course_type"]} - {row["groups_parsed"]}',
+            begin = row['start'].tz_localize(tz='Europe/Paris'),
+            end = row['end'].tz_localize(tz='Europe/Paris'),
+        )
+        if row['room_parsed'] != 'unset':
+            event.location = row['room_parsed']
+        c.events.add(event)
+
+    return c
+
+req = CourseRequest('input-data.csv')
+celcat_raw_response = req.do_request()
+
+celcat_events = CelcatEvents(celcat_raw_response)
+filtered_celcat_events = FilteredCelcatEvents(req, celcat_events)
+filtered_celcat_events.check_expected_nb_timeslots()
+
+c = course_df_to_ics(filtered_celcat_events.df)
 
 with open('out.ics', 'w') as f:
-    f.write(str(c))
+    f.writelines(c)
-- 
GitLab