- Aperçu du Projet
- TP1: Découverte de Gymnasium et CartPole
- TP2: Q-Learning sur FrozenLake
- TP3: Gestion de Trafic avec Q-Learning et SARSA
- TP4: Apprentissage Profond sur Taxi avec PPO
- TP5: Reinforcement Learning avec TF-Agents
- Visualisation des Résultats
- Guide de Réglage des Hyperparamètres
- Résultats Clés
- Installation
- Workflows
- Contribuer au Projet
- FAQ
- Ressources et Références
- Licence
Ce projet regroupe une série de travaux pratiques (TP) sur l'apprentissage par renforcement (RL), réalisés dans le cadre du cours de Machine Learning II à l'École Nationale de l'Intelligence Artificielle et du Digital. Chaque TP explore différents aspects du RL, allant de la prise en main d'environnements simples comme CartPole, FrozenLake, et Taxi à l'implémentation d'algorithmes avancés comme Q-Learning, SARSA, PPO, et DQN avec TF-Agents.
- Comprendre les concepts fondamentaux du RL (environnements, agents, politiques, récompenses).
- Implémenter et comparer différents algorithmes RL (Q-Learning, SARSA, PPO, DQN).
- Utiliser des bibliothèques modernes comme Gymnasium et TF-Agents.
- Visualiser et analyser les performances des agents.
- Prise en main de Gymnasium
- Exploration de l'environnement CartPole-v1
- Contrôle manuel et évaluation d'une politique aléatoire
- Création de l'environnement :
import gymnasium as gym env = gym.make("CartPole-v1", render_mode="human") observation, info = env.reset() print(f"Espace d'actions : {env.action_space}") print(f"Espace d'observations : {env.observation_space}")
- Boucle d'interaction :
iteration = 0 while iteration < 100: action = env.action_space.sample() observation, reward, terminated, truncated, info = env.step(action) print(f"Action : {action}, Observation : {observation}, Reward : {reward}") if terminated or truncated: observation, info = env.reset() iteration += 1
- Exécution de quelques pas :
for i in range(5): action = env.action_space.sample() observation, reward, terminated, truncated, info = env.step(action) print(f"Pas {i+1} - Action: {action}, Observation: {observation}, Reward: {reward}") if terminated or truncated: observation, info = env.reset()
- Contrôle interactif :
running = True total_reward = 0 steps = 0 while running: user_input = input("Entrez une action (0 ou 1, q pour quitter): ") if user_input.lower() == 'q': running = False continue action = int(user_input) observation, reward, terminated, truncated, info = env.step(action) steps += 1 total_reward += reward print(f"Position: {observation[0]}, Angle: {observation[2]}, Reward: {reward}, Total: {total_reward}") if terminated or truncated: print(f"Épisode terminé après {steps} pas avec une récompense totale de {total_reward}") observation, info = env.reset() total_reward = 0 steps = 0
- Évaluation :
num_episodes = 10 episode_durations = [] episode_steps = [] for episode in range(num_episodes): observation, info = env.reset() steps = 0 start_time = time.time() while True: action = env.action_space.sample() observation, reward, terminated, truncated, info = env.step(action) steps += 1 if terminated or truncated: duration = time.time() - start_time episode_durations.append(duration) episode_steps.append(steps) print(f"Épisode {episode+1}: {steps} pas, durée: {duration:.2f} secondes") break average_steps = np.mean(episode_steps) print(f"Nombre de pas moyen: {average_steps:.2f}")
flowchart TD
A["[Début]"] --> B["[Créer environnement]"]
B --> C["[Réinitialiser environnement]"]
C --> D["[Choisir action]"]
D --> E["[Exécuter action]"]
E --> F{"[Terminé ?]"}
F -->|Non| D
F -->|Oui| G["[Évaluer performance]"]
G --> H{"[Plus d'épisodes ?]"}
H -->|Oui| C
H -->|Non| I["[Fin]"]
- Test initial (politique aléatoire) :
- Récompense moyenne (nombre de pas) : ~20-40.
- Exemple de résultat :
Épisode 1: 19 pas, durée: 0.19 secondes Épisode 2: 17 pas, durée: 0.17 secondes Épisode 3: 35 pas, durée: 0.35 secondes Nombre de pas moyen: 23.67
- Implémenter l'algorithme Q-Learning sur FrozenLake-v1
- Initialiser et mettre à jour une Q-Table
- Évaluer les performances de l'agent
- Création et exploration :
import gymnasium as gym env = gym.make("FrozenLake-v1", is_slippery=False, render_mode="human") print("Espace d'états :", env.observation_space.n) print("Espace d'actions :", env.action_space.n) episode = 0 episode_max = 100 while episode < episode_max: action = env.action_space.sample() observation, reward, done, _, _ = env.step(action) print("episode:", episode, "action:", action, "observation:", observation, "reward:", reward) if done: env.reset() episode += 1
- Initialisation :
import numpy as np q_table = np.zeros((env.observation_space.n, env.action_space.n))
- Entraînement :
alpha = 0.01 gamma = 0.99 epsilon = 0.5 num_episodes = 100 for episode in range(num_episodes): state, _ = env.reset() done = False while not done: if np.random.rand() < epsilon: action = env.action_space.sample() else: action = np.argmax(q_table[state]) next_state, reward, done, _, _ = env.step(action) best_next_action = np.max(q_table[next_state]) q_table[state, action] += alpha * (reward + gamma * best_next_action - q_table[state, action]) state = next_state
- Évaluation :
success = 0 for episode in range(num_episodes): state, _ = env.reset() done = False while not done: if np.random.rand() > epsilon: action = env.action_space.sample() else: action = np.argmax(q_table[state]) next_state, reward, done, _, _ = env.step(action) best_next_action = np.max(q_table[next_state]) q_table[state, action] += alpha * (reward + gamma * best_next_action - q_table[state, action]) state = next_state if done and reward == 1.0: success += 1 success_taux = success / num_episodes print(f"Taux de succès sur {num_episodes} épisodes : {success_taux * 100:.2f}%")
flowchart TD
A["[Début]"] --> B["[Créer environnement]"]
B --> C["[Initialiser Q-Table]"]
C --> D["[Boucle d'entraînement]"]
D --> E["[Choisir action (epsilon-greedy)]"]
E --> F["[Mettre à jour Q-Table]"]
F --> G{"[Épisode terminé ?]"}
G -->|Non| E
G -->|Oui| H{"[Tous les épisodes terminés ?]"}
H -->|Non| D
H -->|Oui| I["[Évaluation]"]
I --> J["[Fin]"]
- Après entraînement :
- Taux de succès : ~70-90% (FrozenLake donne une récompense de 1 pour atteindre l'objectif).
- Exemple de résultat :
Taux de succès sur 100 épisodes : 78.00%
- Implémenter Q-Learning et SARSA dans un environnement de gestion de trafic
- Comparer les performances des deux algorithmes
- Analyser la stabilité et l'efficacité des politiques apprises
- Exploration :
from env_traffic import TrafficEnvironment env = TrafficEnvironment() state = env.reset() for _ in range(10): action = 0 next_state, reward = env.step(action) print(f"Etat : {next_state}, Recompense : {reward}")
- Initialisation et entraînement :
import numpy as np q_table = np.zeros((10, 10, 10, 10, 2)) def train_q_learning(env, episodes=1000, alpha=0.1, gamma=0.9, epsilon=1.0, decay=0.995): q_learning_rewards = [] for episode in range(episodes): state = tuple(np.clip(env.reset(), 0, 9)) total_reward = 0 for step in range(50): if np.random.rand() < epsilon: action = np.random.choice([0, 1]) else: action = np.argmax(q_table[state]) next_state, reward = env.step(action) next_state = tuple(np.clip(next_state, 0, 9)) total_reward += reward best_next_action = np.argmax(q_table[next_state]) q_table[state + (action,)] += alpha * (reward + gamma * q_table[next_state + (best_next_action,)] - q_table[state + (action,)]) state = next_state q_learning_rewards.append(total_reward) epsilon = max(0.01, epsilon * decay) return q_table, q_learning_rewards q_table, q_learning_rewards = train_q_learning(env)
- Entraînement :
sarsa_table = np.zeros((10, 10, 10, 10, 2)) def train_sarsa(env, episodes=1000, alpha=0.1, gamma=0.9, epsilon=1.0, decay=0.995): sarsa_rewards = [] for episode in range(episodes): state = tuple(np.clip(env.reset(), 0, 9)) total_reward = 0 if np.random.rand() < epsilon: action = np.random.choice([0, 1]) else: action = np.argmax(sarsa_table[state]) for step in range(50): next_state, reward = env.step(action) next_state = tuple(np.clip(next_state, 0, 9)) total_reward += reward if np.random.rand() < epsilon: next_action = np.random.choice([0, 1]) else: next_action = np.argmax(sarsa_table[next_state]) sarsa_table[state + (action,)] += alpha * (reward + gamma * sarsa_table[next_state + (next_action,)] - sarsa_table[state + (action,)]) state = next_state action = next_action sarsa_rewards.append(total_reward) epsilon = max(0.01, epsilon * decay) return sarsa_table, sarsa_rewards sarsa_table, sarsa_rewards = train_sarsa(env)
- Visualisation et comparaison :
import matplotlib.pyplot as plt plt.plot(q_learning_rewards, label="Q-learning", color='b') plt.plot(sarsa_rewards, label="SARSA", color='r') plt.xlabel("Épisodes") plt.ylabel("Récompense") plt.legend() plt.show() final_q_learning_reward = np.mean(q_learning_rewards[-100:]) final_sarsa_reward = np.mean(sarsa_rewards[-100:]) print(f"Récompense finale moyenne (Q-Learning) : {final_q_learning_reward}") print(f"Récompense finale moyenne (SARSA) : {final_sarsa_reward}")
flowchart TD
A["[Début]"] --> B["[Créer environnement]"]
B --> C["[Initialiser Q-Table/SARSA-Table]"]
C --> D["[Boucle d'entraînement]"]
D --> E["[Q-Learning]"]
D --> F["[SARSA]"]
E --> G["[Mettre à jour Q-Table]"]
F --> H["[Mettre à jour SARSA-Table]"]
G --> I{"[Épisode terminé ?]"}
H --> I
I -->|Non| D
I -->|Oui| J{"[Tous les épisodes terminés ?]"}
J -->|Non| D
J -->|Oui| K["[Évaluation comparative]"]
K --> L["[Fin]"]
- Après entraînement :
- Q-Learning Récompense Moyenne : ~42.7 ± 3.2
- SARSA Récompense Moyenne : ~39.1 ± 2.8
- Exemple de résultat :
Récompense finale moyenne (Q-Learning) : 42.7 Récompense finale moyenne (SARSA) : 39.1
- Implémenter l'algorithme PPO sur l'environnement Taxi-v3
- Initialiser une politique et une fonction de valeur
- Entraîner et évaluer un agent avec PPO
- Initialisation :
import gymnasium as gym import numpy as np env = gym.make("Taxi-v3", render_mode="human") state_size = env.observation_space.n action_size = env.action_space.n policy_table = np.ones((state_size, action_size)) / action_size value_table = np.zeros(state_size)
- Collecte d'épisodes :
n_episodes = 20 for episode in range(n_episodes): state = env.reset() done = False total_reward = 0 while not done: action = env.action_space.sample() next_state, reward, done, _, _ = env.step(action) print(f"Étape {episode + 1} - Action choisie : {action}, Récompense : {reward}, Nouvel état : {next_state}") total_reward += reward print(f"Récompense totale pour l'épisode {episode + 1}: {total_reward}")
- Classes Policy et ValueFunction :
class Policy: def __init__(self, action_size): self.action_size = action_size self.policy_table = np.ones(self.action_size) / self.action_size def get_action(self): action = np.random.choice(self.action_size, p=self.policy_table) return action def update(self, old_probs, new_probs, advantages, epsilon=0.2): ratio = new_probs / old_probs clipped_ratio = np.clip(ratio, 1 - epsilon, 1 + epsilon) objective = np.minimum(ratio * advantages, clipped_ratio * advantages) loss = -np.mean(objective) return loss class ValueFunction: def __init__(self, state_size): self.value_table = np.zeros(state_size) def get_value(self, state): if not isinstance(state, int): state = state[0] return self.value_table[state] def update(self, states, discounted_rewards, alpha=0.01): for state, reward in zip(states, discounted_rewards): if not isinstance(state, int): state = state[0] self.value_table[state] += alpha * (reward - self.value_table[state])
- Entraînement :
gamma = 0.99 epsilon = 0.2 epochs = 1000 max_timesteps = 200 policy = Policy(action_size) value_function = ValueFunction(state_size) for episode in range(epochs): state = env.reset() done = False episode_rewards = [] old_probs = [] values = [] rewards = [] states = [] actions = [] for t in range(max_timesteps): action = policy.get_action() old_prob = policy.policy_table[action] next_state, reward, done, _, _ = env.step(action) states.append(state) actions.append(action) rewards.append(reward) old_probs.append(old_prob) value = value_function.get_value(state) values.append(value) state = next_state episode_rewards.append(reward) if done: break discounted_rewards = compute_discounted_rewards(rewards, gamma) advantages = np.array(discounted_rewards) - np.array(values) for action, old_prob, advantage in zip(actions, old_probs, advantages): new_prob = policy.policy_table[action] policy.update(old_prob, new_prob, advantage, epsilon) value_function.update(states, discounted_rewards) print(f"Episode {episode+1} récompense totale : {sum(episode_rewards)}")
flowchart TD
A["[Début]"] --> B["[Créer environnement]"]
B --> C["[Initialiser Policy/Value]"]
C --> D["[Boucle d'entraînement]"]
D --> E["[Collecter trajectoire]"]
E --> F["[Calculer avantages]"]
F --> G["[Mettre à jour Policy/Value]"]
G --> H{"[Épisode terminé ?]"}
H -->|Non| E
H -->|Oui| I{"[Tous les épisodes terminés ?]"}
I -->|Non| D
I -->|Oui| J["[Fin]"]
- Exploration initiale :
- Récompense totale par épisode : Variable, souvent négative (e.g., -200 à 20).
- Exemple de résultat :
Récompense totale pour l'épisode 1: -200 Récompense totale pour l'épisode 2: 10
- Après entraînement :
- Récompense totale : Devrait augmenter, atteignant des valeurs positives (e.g., 10-20).
- Exemple de résultat :
Episode 1000 récompense totale : 15
- Découvrir l'utilisation pratique de la bibliothèque TensorFlow Agents (TF-Agents)
- Entraîner un agent d'apprentissage par renforcement dans un environnement simple (CartPole-v0)
- Manipuler les composants fondamentaux d'un agent RL : environnement, réseau, agent, buffer, politique et entraînement
- Création de l'environnement CartPole-v0 :
import tf_agents.environments.suite_gym as suite_gym import tf_agents.environments.tf_py_environment as tf_py_environment train_py_env = suite_gym.load('CartPole-v0') eval_py_env = suite_gym.load('CartPole-v0') train_env = tf_py_environment.TFPyEnvironment(train_py_env) eval_env = tf_py_environment.TFPyEnvironment(eval_py_env)
- Affichage des spécifications :
print("Observation Spec:", train_env.observation_spec()) print("Action Spec:", train_env.action_spec())
- Test avec un acteur aléatoire :
for episode in range(3): time_step = eval_env.reset() episode_reward = 0 while not time_step.is_last(): action = np.random.choice([0, 1]) time_step = eval_env.step(action) episode_reward += time_step.reward print(f"Test Episode {episode + 1}, Reward: {episode_reward}")
- Création du Q-Network :
from tf_agents.networks import q_network fc_layer_params = (100, 50) q_net = q_network.QNetwork( train_env.observation_spec(), train_env.action_spec(), fc_layer_params=fc_layer_params )
- Création de l'agent DQN :
from tf_agents.agents.dqn import dqn_agent import tensorflow as tf from tf_agents.utils import common optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3) train_step_counter = tf.Variable(0) agent = dqn_agent.DqnAgent( train_env.time_step_spec(), train_env.action_spec(), q_network=q_net, optimizer=optimizer, td_errors_loss_fn=common.element_wise_squared_loss, train_step_counter=train_step_counter, epsilon_greedy=0.1, target_update_period=100, gamma=0.99 ) agent.initialize()
- Création du replay buffer :
from tf_agents.replay_buffers import tf_uniform_replay_buffer replay_buffer_max_length = 100000 replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer( data_spec=agent.collect_data_spec, batch_size=train_env.batch_size, max_length=replay_buffer_max_length )
- Boucle d'entraînement :
num_iterations = 20000 for iteration in range(num_iterations): for _ in range(collect_steps_per_iteration): collect_step(train_env, collect_policy, replay_buffer) experience, unused_info = next(iterator) train_loss = agent.train(experience).loss step = agent.train_step_counter.numpy() if step % log_interval == 0: print(f"Step {step}, Loss: {train_loss}") if step % eval_interval == 0: total_return = 0 for _ in range(num_eval_episodes): time_step = eval_env.reset() episode_return = 0 while not time_step.is_last(): action_step = agent.policy.action(time_step) time_step = eval_env.step(action_step.action) episode_return += time_step.reward total_return += episode_return avg_return = total_return / num_eval_episodes print(f"Step {step}, Average Return: {avg_return}")
flowchart TD
A["[Début]"] --> B["[Créer environnement]"]
B --> C["[Créer Q-Network]"]
C --> D["[Créer agent DQN]"]
D --> E["[Configurer replay buffer]"]
E --> F["[Collecter données initiales]"]
F --> G["[Boucle d'entraînement]"]
G --> H{"[Évaluer ?]"}
H -->|Oui| I["[Calculer récompense moyenne]"]
I --> J{"[Fin iterations ?]"}
H -->|Non| G
J -->|Non| G
J -->|Oui| K["[Fin]"]
- Test initial (politique aléatoire) :
Test Episode 1, Reward: [19.] Test Episode 2, Reward: [17.] Test Episode 3, Reward: [35.]- Récompense moyenne : ~23.67.
- Après entraînement :
- Récompense moyenne : ~200 (ou ~500 si Gym ≥ 0.25.0).
- Exemple de log :
Step 1000, Average Return: 50.0 Step 2000, Average Return: 120.0 Final Average Return after training: 195.0
Ajoutez ce code pour visualiser les performances :
- Installer W&B :
pip install wandb
- Initialiser W&B :
import wandb wandb.init(project="reinforcement-learning", name="tpX-experiment")
- Logger les métriques :
- TP1/TP2/TP4 :
wandb.log({"episode": episode, "reward": total_reward}) - TP3 :
wandb.log({"episode": episode, "q_learning_reward": total_reward, "sarsa_reward": total_reward}) - TP5 :
wandb.log({"step": step, "average_return": avg_return})
- TP1/TP2/TP4 :
- Pas d'hyperparamètres (politique aléatoire ou manuelle).
| Hyperparamètre | Valeur par Défaut | Description | Plage Recommandée |
|---|---|---|---|
| Alpha | 0.01 | Taux d'apprentissage | 0.01 à 0.5 |
| Gamma | 0.99 | Facteur de réduction | 0.9 à 0.999 |
| Epsilon | 0.5 | Probabilité d'exploration | 0.1 à 1.0 |
| Hyperparamètre | Valeur par Défaut | Description | Plage Recommandée |
|---|---|---|---|
| Alpha | 0.1 | Taux d'apprentissage | 0.05 à 0.5 |
| Gamma | 0.9 | Facteur de réduction | 0.8 à 0.99 |
| Epsilon | 1.0 (décroît à 0.01) | Probabilité d'exploration | 1.0 à 0.01 |
| Epsilon Decay | 0.995 | Taux de décroissance d'epsilon | 0.99 à 0.999 |
| Hyperparamètre | Valeur par Défaut | Description | Plage Recommandée |
|---|---|---|---|
| Gamma | 0.99 | Facteur de réduction | 0.9 à 0.999 |
| Epsilon | 0.2 | Seuil de clipping PPO | 0.1 à 0.3 |
| Alpha (Value Update) | 0.01 | Taux d'apprentissage pour la fonction de valeur | 0.001 à 0.1 |
| Hyperparamètre | Valeur par Défaut | Description | Plage Recommandée |
|---|---|---|---|
| Learning Rate | 1e-3 | Taux d'apprentissage de l'optimiseur Adam | 1e-4 à 1e-2 |
| Epsilon (epsilon_greedy) | 0.1 | Probabilité d'exploration | 0.05 à 0.3 |
| Target Update Period | 100 | Fréquence de mise à jour du réseau cible | 50 à 500 |
| Gamma | 0.99 | Facteur de réduction des récompenses futures | 0.9 à 0.999 |
- 🏎️ TP1 : Une politique aléatoire sur CartPole atteint ~23.67 pas en moyenne
- 🔍 TP2 : Q-Learning atteint un taux de succès de 78% sur FrozenLake-v1
- 🚦 TP3 : Q-Learning (42.7 ± 3.2) converge 25% plus vite que SARSA (39.1 ± 2.8), mais SARSA est plus stable
- 🚕 TP4 : PPO sur Taxi-v3 améliore les récompenses de -200 à ~15 après 1000 épisodes
- 🤖 TP5 : DQN atteint une récompense moyenne de ~195 après 20,000 itérations sur CartPole-v0
git clone https://github.com/ennajari/reinforcement-learning.git
cd reinforcement-learning
pip install -r requirements.txt
tp1.py # CartPole Exploration
tp2.py # FrozenLake Q-Learning
tp3.py # Traffic Management
tp4.py # Taxi PPO
tp5.py # CartPole with TF-Agents
- Matplotlib pour la visualisation :
pip install matplotlib
- Weights & Biases pour le suivi des expériences :
pip install wandb
flowchart TB
subgraph TP1
A1[Environnement CartPole] --> B1[Exploration]
B1 --> C1[Contrôle manuel]
end
subgraph TP2
A2[Q-Table FrozenLake] --> B2[Apprentissage Q-Learning]
B2 --> C2[Évaluation]
end
subgraph TP3
A3[Environnement Traffic] --> B3[Q-Learning vs SARSA]
B3 --> C3[Analyse comparative]
end
subgraph TP4
A4[Environnement Taxi] --> B4[PPO Entraînement]
B4 --> C4[Évaluation]
end
subgraph TP5
A5[Environnement CartPole] --> B5[DQN Agent]
B5 --> C5[Entraînement et Évaluation]
end
TP1 --> TP2 --> TP3 --> TP4 --> TP5
Nous accueillons les contributions ! Suivez ces étapes pour contribuer :
- Fork le dépôt : Cliquez sur le bouton "Fork" sur GitHub.
- Clonez votre fork :
git clone https://github.com/votre-utilisateur/reinforcement-learning.git
- Créez une branche :
git checkout -b ma-nouvelle-fonctionnalite
- Faites vos modifications et testez-les.
- Commit et push :
git commit -m "Ajout de ma nouvelle fonctionnalité" git push origin ma-nouvelle-fonctionnalite - Créez une Pull Request : Allez sur GitHub et soumettez une PR depuis votre branche.
- Suivez les conventions de codage Python (PEP 8).
- Ajoutez des tests pour les nouvelles fonctionnalités.
- Documentez votre code et mettez à jour le README si nécessaire.
R : L'entraînement DQN peut être lent sur CPU. Essayez de réduire num_iterations (e.g., à 5000) ou utilisez un GPU.
R : Utilisez render_mode="human" lors de la création de l'environnement et appelez env.render(). Cela peut ne pas fonctionner sur des serveurs sans interface graphique.
R : Mettez à jour TensorFlow et TF-Agents à leurs dernières versions :
bash pip install --upgrade tensorflow tf-agents
- TF-Agents Documentation
- Gymnasium Documentation
- DQN Paper
- PPO Paper
- Reinforcement Learning: An Introduction (Sutton & Barto)
- Weights & Biases Documentation
Ce projet est sous licence MIT. Voir le fichier LICENSE pour plus de détails.
École Nationale de l'Intelligence Artificielle et du Digital Professeur : Mohamed Khalifa BOUTAHIR
